| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557 |
- package raft
- import (
- "errors"
- "fmt"
- "sync"
- "time"
- )
// ==================== Custom Errors ====================

// Sentinel errors reported by the raft package. They may arrive wrapped (for
// example inside a *RaftError, which implements Unwrap), so compare with
// errors.Is rather than ==.
var (
	// ErrNoLeader means no leader is currently known.
	ErrNoLeader = errors.New("no leader available")

	// ErrNotLeader means the operation needs the leader and this node is not it.
	ErrNotLeader = errors.New("not leader")

	// ErrConfigInFlight means a membership change is already in progress.
	ErrConfigInFlight = errors.New("configuration change in progress")

	// ErrTimeout means the operation did not finish in time.
	ErrTimeout = errors.New("operation timed out")

	// ErrShutdown means the raft node is shutting down.
	ErrShutdown = errors.New("raft is shutting down")

	// ErrPersistFailed means writing state to stable storage failed.
	ErrPersistFailed = errors.New("failed to persist state")

	// ErrLeadershipLost means leadership was lost while the operation ran.
	ErrLeadershipLost = errors.New("leadership lost")
)
// RaftError wraps an error with context a client can use to retry: the
// currently known leader (if any) and a suggested retry delay.
//
// RaftError implements Unwrap, so errors.Is and errors.As see through it to
// the wrapped Err.
type RaftError struct {
	Err      error         // Underlying error; nil is tolerated and reported generically
	LeaderID string        // Known leader, if any
	RetryIn  time.Duration // Suggested retry delay
}

// Error implements the error interface.
//
// It returns the wrapped error's message, followed by " (leader: <id>)" when
// LeaderID is set. A nil Err is reported as "unknown raft error" instead of
// panicking on a nil dereference.
func (e *RaftError) Error() string {
	msg := "unknown raft error"
	if e.Err != nil {
		msg = e.Err.Error()
	}
	if e.LeaderID == "" {
		return msg
	}
	return fmt.Sprintf("%s (leader: %s)", msg, e.LeaderID)
}

// Unwrap returns the wrapped error so errors.Is/errors.As can inspect it.
func (e *RaftError) Unwrap() error {
	return e.Err
}

// NewRaftError creates a RaftError that wraps err and records the known
// leader and a suggested retry delay.
func NewRaftError(err error, leaderID string, retryIn time.Duration) *RaftError {
	return &RaftError{
		Err:      err,
		LeaderID: leaderID,
		RetryIn:  retryIn,
	}
}
// NodeState is the role a Raft node currently plays. The zero value is
// Follower.
type NodeState int

// The three Raft roles, numbered from zero.
const (
	Follower NodeState = iota
	Candidate
	Leader
)

// nodeStateNames holds the display name of each defined NodeState, indexed
// by the state's numeric value.
var nodeStateNames = [...]string{
	Follower:  "Follower",
	Candidate: "Candidate",
	Leader:    "Leader",
}

// String returns the role's name, or "Unknown" for any value outside the
// defined constants (including negative values).
func (s NodeState) String() string {
	if s < 0 || int(s) >= len(nodeStateNames) {
		return "Unknown"
	}
	return nodeStateNames[s]
}
// EntryType tags the kind of payload a LogEntry carries.
type EntryType int

const (
	// EntryNormal carries a command for the state machine. It is the zero value.
	EntryNormal EntryType = iota

	// EntryConfig carries a cluster configuration change.
	EntryConfig

	// EntryNoop carries no payload; the leader uses it to commit entries
	// from previous terms.
	EntryNoop
)
- // LogEntry represents a single entry in the replicated log
- type LogEntry struct {
- Index uint64 `json:"index"` // Log index (1-based)
- Term uint64 `json:"term"` // Term when entry was received
- Type EntryType `json:"type,omitempty"` // Entry type (normal or config change)
- Command []byte `json:"command,omitempty"` // Command to be applied to state machine (for normal entries)
- Config *ClusterConfig `json:"config,omitempty"` // New cluster configuration (for config entries)
- }
// ConfigChangeType says whether a ConfigChange adds or removes a node.
type ConfigChangeType int

const (
	// ConfigAddNode adds a node to the cluster.
	ConfigAddNode ConfigChangeType = iota
	// ConfigRemoveNode removes a node from the cluster.
	ConfigRemoveNode
)

// ConfigChange describes a membership change that affects exactly one node.
type ConfigChange struct {
	// Type selects add or remove.
	Type ConfigChangeType `json:"type"`
	// NodeID identifies the node being added or removed.
	NodeID string `json:"node_id"`
	// Address is the node's address; only meaningful when adding.
	Address string `json:"address"`
}

// ClusterConfig is a cluster membership configuration.
type ClusterConfig struct {
	// Nodes maps the node ID of every member, self included, to its address.
	Nodes map[string]string `json:"nodes"`
}
// PersistentState is the state every server must write to stable storage
// before it responds to RPCs.
type PersistentState struct {
	// CurrentTerm is the latest term this server has seen.
	CurrentTerm uint64 `json:"current_term"`
	// VotedFor is the ID of the candidate that got this server's vote in the
	// current term (presumably empty when no vote was cast — confirm).
	VotedFor string `json:"voted_for"`
}

// VolatileState is in-memory state kept by every server.
type VolatileState struct {
	// CommitIndex is the highest log index known to be committed.
	CommitIndex uint64
	// LastApplied is the highest log index applied to the state machine.
	LastApplied uint64
}

// LeaderVolatileState is in-memory state kept only by the leader and
// reinitialized after each election. Both maps are keyed per server
// (NOTE(review): node ID vs. address is decided by the caller — confirm).
type LeaderVolatileState struct {
	// NextIndex is, per server, the index of the next log entry to send.
	NextIndex map[string]uint64
	// MatchIndex is, per server, the highest log index known to be replicated.
	MatchIndex map[string]uint64
}
- // Config holds the configuration for a Raft node
- type Config struct {
- // NodeID is the unique identifier for this node
- NodeID string
- // Peers is the list of peer node addresses (excluding self)
- // Deprecated: Use ClusterNodes instead for dynamic membership
- Peers []string
- // PeerMap maps nodeID to address for all nodes (including self)
- // Used for request forwarding. Example: {"node1": "127.0.0.1:9001", "node2": "127.0.0.1:9002"}
- // Deprecated: Use ClusterNodes instead
- PeerMap map[string]string
- // ClusterNodes maps nodeID to address for all nodes (including self)
- // This is the canonical cluster membership configuration
- // Example: {"node1": "127.0.0.1:9001", "node2": "127.0.0.1:9002", "node3": "127.0.0.1:9003"}
- ClusterNodes map[string]string
- // ListenAddr is the address this node listens on
- ListenAddr string
- // DataDir is the directory for persistent storage
- DataDir string
- // ElectionTimeoutMin is the minimum election timeout
- ElectionTimeoutMin time.Duration
- // ElectionTimeoutMax is the maximum election timeout
- ElectionTimeoutMax time.Duration
- // HeartbeatInterval is the interval between heartbeats from leader
- HeartbeatInterval time.Duration
- // MaxLogEntriesPerRequest limits entries sent in a single AppendEntries
- MaxLogEntriesPerRequest int
- // MemoryLogCapacity is the maximum number of log entries to keep in memory
- MemoryLogCapacity int
- // LogCompactionEnabled determines if log compaction is enabled
- // If false, the log will grow indefinitely (safest for data, but consumes disk)
- // Default: false
- LogCompactionEnabled bool
- // SnapshotThreshold triggers snapshot when log grows beyond this
- SnapshotThreshold uint64
- // SnapshotMinRetention is the minimum number of log entries to retain after compaction
- // This ensures followers have enough entries for catch-up without needing a full snapshot
- // Default: 1000
- SnapshotMinRetention uint64
- // SnapshotProvider is a callback function that returns the current state machine snapshot
- // If set, automatic log compaction will be enabled
- // The function should return the serialized state that can restore the state machine
- // minIncludeIndex: The snapshot MUST include all entries up to at least this index.
- SnapshotProvider func(minIncludeIndex uint64) ([]byte, error)
- // GetHandler is a callback function to handle remote Get requests
- // If set, clients can read values from this node via RPC
- GetHandler func(key string) (value string, found bool)
- // SnapshotChunkSize is the size of each chunk when transferring snapshots
- // Default: 1MB
- SnapshotChunkSize int
- // LastAppliedIndex is the index of the last log entry applied to the state machine
- // Used for initialization to avoid re-applying entries
- LastAppliedIndex uint64
- // RPC Timeout configurations
- RPCTimeout time.Duration // Default: 500ms for normal RPCs
- SnapshotRPCTimeout time.Duration // Default: 30s for snapshot transfers
- ProposeTimeout time.Duration // Default: 3s for propose forwarding
- // Retry configurations
- MaxRetries int // Default: 3
- RetryBackoff time.Duration // Default: 100ms
- // Batching configurations
- BatchMinWait time.Duration // Minimum wait time for batching (Default: 1ms)
- BatchMaxWait time.Duration // Maximum wait time for batching (Default: 10ms)
- BatchMaxSize int // Maximum batch size before forcing flush (Default: 100)
- // Logger for debug output
- Logger Logger
- }
- // Clone creates a deep copy of the config
- func (c *Config) Clone() *Config {
- clone := *c
- if c.Peers != nil {
- clone.Peers = make([]string, len(c.Peers))
- copy(clone.Peers, c.Peers)
- }
- if c.PeerMap != nil {
- clone.PeerMap = make(map[string]string)
- for k, v := range c.PeerMap {
- clone.PeerMap[k] = v
- }
- }
- if c.ClusterNodes != nil {
- clone.ClusterNodes = make(map[string]string)
- for k, v := range c.ClusterNodes {
- clone.ClusterNodes[k] = v
- }
- }
- return &clone
- }
- // GetPeerAddresses returns the addresses of all peers (excluding self)
- func (c *Config) GetPeerAddresses() []string {
- if c.ClusterNodes != nil {
- peers := make([]string, 0, len(c.ClusterNodes)-1)
- for nodeID, addr := range c.ClusterNodes {
- if nodeID != c.NodeID {
- peers = append(peers, addr)
- }
- }
- return peers
- }
- return c.Peers
- }
- // GetClusterSize returns the total number of nodes in the cluster
- func (c *Config) GetClusterSize() int {
- if c.ClusterNodes != nil {
- return len(c.ClusterNodes)
- }
- return len(c.Peers) + 1
- }
- // DefaultConfig returns a configuration with sensible defaults
- func DefaultConfig() *Config {
- return &Config{
- ElectionTimeoutMin: 150 * time.Millisecond,
- ElectionTimeoutMax: 300 * time.Millisecond,
- HeartbeatInterval: 50 * time.Millisecond,
- MaxLogEntriesPerRequest: 5000,
- MemoryLogCapacity: 10000,
- LogCompactionEnabled: false, // 默认不启用压缩
- SnapshotThreshold: 100000, // 默认阈值(仅在 LogCompactionEnabled=true 时生效)
- SnapshotMinRetention: 10000, // 保留1万条用于 follower 追赶
- SnapshotChunkSize: 1024 * 1024, // 1MB chunks
- RPCTimeout: 500 * time.Millisecond,
- SnapshotRPCTimeout: 30 * time.Second,
- ProposeTimeout: 3 * time.Second,
- MaxRetries: 3,
- RetryBackoff: 100 * time.Millisecond,
- BatchMinWait: 1 * time.Millisecond,
- BatchMaxWait: 10 * time.Millisecond,
- BatchMaxSize: 100,
- }
- }
// ==================== Metrics ====================

// Metrics is a set of runtime counters for monitoring, grouped by
// subsystem. Every field is a uint64 serialized under a snake_case JSON key.
type Metrics struct {
	// Current term.
	Term uint64 `json:"term"`

	// Client proposals.
	ProposalsTotal     uint64 `json:"proposals_total"`
	ProposalsSuccess   uint64 `json:"proposals_success"`
	ProposalsFailed    uint64 `json:"proposals_failed"`
	ProposalsForwarded uint64 `json:"proposals_forwarded"`

	// Log replication (AppendEntries traffic).
	AppendsSent     uint64 `json:"appends_sent"`
	AppendsReceived uint64 `json:"appends_received"`
	AppendsSuccess  uint64 `json:"appends_success"`
	AppendsFailed   uint64 `json:"appends_failed"`

	// Elections, including the pre-vote phase.
	ElectionsStarted uint64 `json:"elections_started"`
	ElectionsWon     uint64 `json:"elections_won"`
	PreVotesStarted  uint64 `json:"pre_votes_started"`
	PreVotesGranted  uint64 `json:"pre_votes_granted"`

	// Snapshot activity.
	SnapshotsTaken     uint64 `json:"snapshots_taken"`
	SnapshotsInstalled uint64 `json:"snapshots_installed"`
	SnapshotsSent      uint64 `json:"snapshots_sent"`

	// Linearizable reads via ReadIndex.
	ReadIndexRequests uint64 `json:"read_index_requests"`
	ReadIndexSuccess  uint64 `json:"read_index_success"`

	// Leadership transfers.
	LeadershipTransfers       uint64 `json:"leadership_transfers"`
	LeadershipTransferSuccess uint64 `json:"leadership_transfer_success"`
}
// HealthStatus is a JSON-serializable report of a Raft node's health: its
// role and term, the known leader, cluster membership, and log progress.
// Uptime is a time.Duration, so it encodes as an integer nanosecond count.
type HealthStatus struct {
	// Identity and role.
	NodeID   string `json:"node_id"`
	State    string `json:"state"`
	Term     uint64 `json:"term"`
	LeaderID string `json:"leader_id"`

	// Membership.
	ClusterSize  int               `json:"cluster_size"`
	ClusterNodes map[string]string `json:"cluster_nodes"`

	// Log progress. LogBehind is commitIndex - lastApplied.
	CommitIndex uint64 `json:"commit_index"`
	LastApplied uint64 `json:"last_applied"`
	LogBehind   uint64 `json:"log_behind"`

	// Liveness.
	LastHeartbeat time.Time     `json:"last_heartbeat"`
	IsHealthy     bool          `json:"is_healthy"`
	Uptime        time.Duration `json:"uptime"`
}
// Logger is the logging sink used by this package. Each method receives a
// printf-style format string plus its arguments; there is one method per
// severity level.
type Logger interface {
	// Debug records verbose diagnostic output.
	Debug(format string, args ...interface{})
	// Info records routine operational events.
	Info(format string, args ...interface{})
	// Warn records unexpected but recoverable conditions.
	Warn(format string, args ...interface{})
	// Error records failures.
	Error(format string, args ...interface{})
}
// DefaultLogger is a minimal Logger that writes one line per message to
// standard output, in the form
//
//	2006-01-02 15:04:05.000 [LEVEL] <Prefix> message
//
// The Prefix segment is omitted when Prefix is empty. Every level, including
// Debug, is printed; use NoopLogger or a custom Logger to filter output.
// DefaultLogger contains a mutex, so share it by pointer and do not copy it
// after first use.
type DefaultLogger struct {
	// Prefix is printed after the level tag (for example a node ID).
	Prefix string
	mu     sync.Mutex
}

// log formats one message and writes it to stdout. Formatting happens
// outside the lock; the mutex only serializes the write so lines from
// concurrent callers of this logger do not interleave. Write errors are
// ignored because logging is best effort.
func (l *DefaultLogger) log(level, format string, args ...interface{}) {
	msg := fmt.Sprintf(format, args...)
	ts := time.Now().Format("2006-01-02 15:04:05.000")

	l.mu.Lock()
	defer l.mu.Unlock()
	if l.Prefix == "" {
		fmt.Printf("%s [%s] %s\n", ts, level, msg)
		return
	}
	fmt.Printf("%s [%s] %s %s\n", ts, level, l.Prefix, msg)
}

// Debug logs a message at DEBUG level.
func (l *DefaultLogger) Debug(format string, args ...interface{}) { l.log("DEBUG", format, args...) }

// Info logs a message at INFO level.
func (l *DefaultLogger) Info(format string, args ...interface{}) { l.log("INFO", format, args...) }

// Warn logs a message at WARN level.
func (l *DefaultLogger) Warn(format string, args ...interface{}) { l.log("WARN", format, args...) }

// Error logs a message at ERROR level.
func (l *DefaultLogger) Error(format string, args ...interface{}) { l.log("ERROR", format, args...) }
// NoopLogger is a Logger whose methods discard every message.
type NoopLogger struct{}

// Debug discards the message.
func (*NoopLogger) Debug(string, ...interface{}) {}

// Info discards the message.
func (*NoopLogger) Info(string, ...interface{}) {}

// Warn discards the message.
func (*NoopLogger) Warn(string, ...interface{}) {}

// Error discards the message.
func (*NoopLogger) Error(string, ...interface{}) {}
// StateMachine is the application state driven by the replicated log.
// Users of this package must provide an implementation.
type StateMachine interface {
	// Apply applies one command to the state machine and returns its result.
	Apply(command []byte) (interface{}, error)

	// Snapshot serializes the current state.
	Snapshot() ([]byte, error)

	// Restore replaces the current state with the given snapshot.
	Restore(snapshot []byte) error
}
// ApplyMsg is delivered to the application layer when a log entry is
// committed, or to hand over a snapshot. CommandValid and SnapshotValid flag
// which group of fields a message carries.
type ApplyMsg struct {
	// Committed command.
	CommandValid bool
	Command      []byte
	CommandIndex uint64
	CommandTerm  uint64

	// Snapshot.
	SnapshotValid bool
	Snapshot      []byte
	SnapshotIndex uint64
	SnapshotTerm  uint64
}
// RequestVoteArgs carries a candidate's request for a vote. When PreVote is
// set, the request is a pre-vote probe rather than a real vote.
type RequestVoteArgs struct {
	// Term is the candidate's term.
	Term uint64 `json:"term"`
	// CandidateID identifies the candidate requesting the vote.
	CandidateID string `json:"candidate_id"`
	// LastLogIndex is the index of the candidate's last log entry.
	LastLogIndex uint64 `json:"last_log_index"`
	// LastLogTerm is the term of the candidate's last log entry.
	LastLogTerm uint64 `json:"last_log_term"`
	// PreVote is true for a pre-vote request.
	PreVote bool `json:"pre_vote"`
}

// RequestVoteReply answers a RequestVoteArgs.
type RequestVoteReply struct {
	// Term is the responder's current term, so the candidate can update itself.
	Term uint64 `json:"term"`
	// VoteGranted is true when the candidate received the vote.
	VoteGranted bool `json:"vote_granted"`
}
- // AppendEntriesArgs is the arguments for AppendEntries RPC
- type AppendEntriesArgs struct {
- Term uint64 `json:"term"` // Leader's term
- LeaderID string `json:"leader_id"` // So follower can redirect clients
- PrevLogIndex uint64 `json:"prev_log_index"` // Index of log entry immediately preceding new ones
- PrevLogTerm uint64 `json:"prev_log_term"` // Term of PrevLogIndex entry
- Entries []LogEntry `json:"entries"` // Log entries to store (empty for heartbeat)
- LeaderCommit uint64 `json:"leader_commit"` // Leader's commitIndex
- }
- // AppendEntriesReply is the response for AppendEntries RPC
- type AppendEntriesReply struct {
- Term uint64 `json:"term"` // CurrentTerm, for leader to update itself
- Success bool `json:"success"` // True if follower contained entry matching PrevLogIndex and PrevLogTerm
- // Optimization: help leader find correct NextIndex faster
- ConflictIndex uint64 `json:"conflict_index"` // Index of first entry with conflicting term
- ConflictTerm uint64 `json:"conflict_term"` // Term of conflicting entry
- }
// InstallSnapshotArgs carries one chunk of a snapshot from the leader.
// Chunks are positioned by Offset, and Done marks the final chunk.
type InstallSnapshotArgs struct {
	// Term is the leader's term.
	Term uint64 `json:"term"`
	// LeaderID lets followers redirect clients to the leader.
	LeaderID string `json:"leader_id"`
	// LastIncludedIndex: the snapshot replaces all entries up through this index.
	LastIncludedIndex uint64 `json:"last_included_index"`
	// LastIncludedTerm is the term of the entry at LastIncludedIndex.
	LastIncludedTerm uint64 `json:"last_included_term"`
	// Offset is the byte offset of Data within the whole snapshot.
	Offset uint64 `json:"offset"`
	// Data is this chunk of snapshot bytes.
	Data []byte `json:"data"`
	// Done is true for the last chunk.
	Done bool `json:"done"`
}

// InstallSnapshotReply answers an InstallSnapshotArgs.
type InstallSnapshotReply struct {
	// Term is the follower's current term, so the leader can update itself.
	Term uint64 `json:"term"`
	// Success is true when the chunk was accepted.
	Success bool `json:"success"`
}
// TimeoutNowArgs is the request of the TimeoutNow RPC, used for leadership
// transfer.
type TimeoutNowArgs struct {
	Term     uint64 `json:"term"`
	LeaderID string `json:"leader_id"`
}

// TimeoutNowReply answers a TimeoutNowArgs.
type TimeoutNowReply struct {
	Term    uint64 `json:"term"`
	Success bool   `json:"success"`
}

// ReadIndexArgs is the request of the ReadIndex RPC. It has no fields: the
// call only needs to confirm leadership.
type ReadIndexArgs struct{}

// ReadIndexReply answers a ReadIndexArgs. Error, if set, describes why the
// request failed.
type ReadIndexReply struct {
	ReadIndex uint64 `json:"read_index"`
	Success   bool   `json:"success"`
	Error     string `json:"error,omitempty"`
}
// RPCType identifies the payload carried by an RPCMessage or RPCResponse.
// Values are numbered from zero and serialized as integers, so appending new
// kinds keeps existing values stable.
type RPCType int

const (
	// RPCRequestVote is a RequestVote (or pre-vote) request.
	RPCRequestVote RPCType = iota
	// RPCAppendEntries is an AppendEntries request or heartbeat.
	RPCAppendEntries
	// RPCInstallSnapshot is an InstallSnapshot chunk.
	RPCInstallSnapshot
	// RPCPropose forwards a proposal to the leader.
	RPCPropose
	// RPCAddNode forwards an AddNode request.
	RPCAddNode
	// RPCRemoveNode forwards a RemoveNode request.
	RPCRemoveNode
	// RPCTimeoutNow is used for leadership transfer.
	RPCTimeoutNow
	// RPCReadIndex is used for linearizable reads.
	RPCReadIndex
	// RPCGet is a remote key-value read.
	RPCGet
)
// ProposeArgs forwards a client command to the leader.
type ProposeArgs struct {
	Command []byte `json:"command"`
}

// ProposeReply answers a ProposeArgs. On success Index and Term identify
// the log position assigned to the command; otherwise Error explains why.
type ProposeReply struct {
	Success bool   `json:"success"`
	Index   uint64 `json:"index,omitempty"`
	Term    uint64 `json:"term,omitempty"`
	Error   string `json:"error,omitempty"`
}

// AddNodeArgs forwards a request to add NodeID at Address to the cluster.
type AddNodeArgs struct {
	NodeID  string `json:"node_id"`
	Address string `json:"address"`
}

// AddNodeReply answers an AddNodeArgs.
type AddNodeReply struct {
	Success bool   `json:"success"`
	Error   string `json:"error,omitempty"`
}

// RemoveNodeArgs forwards a request to remove NodeID from the cluster.
type RemoveNodeArgs struct {
	NodeID string `json:"node_id"`
}

// RemoveNodeReply answers a RemoveNodeArgs.
type RemoveNodeReply struct {
	Success bool   `json:"success"`
	Error   string `json:"error,omitempty"`
}

// GetArgs asks a node for the value stored under Key (remote KV read).
type GetArgs struct {
	Key string `json:"key"`
}

// GetReply answers a GetArgs. Found reports whether Key existed.
type GetReply struct {
	Value string `json:"value,omitempty"`
	Found bool   `json:"found"`
	Error string `json:"error,omitempty"`
}
- // RPCMessage wraps all RPC types for network transmission
- type RPCMessage struct {
- Type RPCType `json:"type"`
- Payload interface{} `json:"payload"`
- }
- // RPCResponse wraps all RPC response types
- type RPCResponse struct {
- Type RPCType `json:"type"`
- Payload interface{} `json:"payload"`
- Error string `json:"error,omitempty"`
- }
|