package raft import ( "errors" "fmt" "sync" "time" ) // ==================== Custom Errors ==================== var ( // ErrNoLeader indicates no leader is available ErrNoLeader = errors.New("no leader available") // ErrNotLeader indicates this node is not the leader ErrNotLeader = errors.New("not leader") // ErrConfigInFlight indicates a configuration change is in progress ErrConfigInFlight = errors.New("configuration change in progress") // ErrTimeout indicates an operation timed out ErrTimeout = errors.New("operation timed out") // ErrShutdown indicates the raft node is shutting down ErrShutdown = errors.New("raft is shutting down") // ErrPersistFailed indicates persistent storage failed ErrPersistFailed = errors.New("failed to persist state") // ErrLeadershipLost indicates leadership was lost during operation ErrLeadershipLost = errors.New("leadership lost") ) // RaftError wraps errors with additional context type RaftError struct { Err error LeaderID string // Known leader, if any RetryIn time.Duration // Suggested retry delay } func (e *RaftError) Error() string { if e.LeaderID != "" { return fmt.Sprintf("%s (leader: %s)", e.Err, e.LeaderID) } return e.Err.Error() } func (e *RaftError) Unwrap() error { return e.Err } // NewRaftError creates a new RaftError func NewRaftError(err error, leaderID string, retryIn time.Duration) *RaftError { return &RaftError{ Err: err, LeaderID: leaderID, RetryIn: retryIn, } } // NodeState represents the current state of a Raft node type NodeState int const ( Follower NodeState = iota Candidate Leader ) func (s NodeState) String() string { switch s { case Follower: return "Follower" case Candidate: return "Candidate" case Leader: return "Leader" default: return "Unknown" } } // EntryType represents the type of a log entry type EntryType int const ( EntryNormal EntryType = iota // Normal command entry EntryConfig // Configuration change entry EntryNoop // No-op entry (used by leader to commit previous term entries) ) // LogEntry represents a single entry in the replicated log type LogEntry struct { Index uint64 `json:"index"` // Log index (1-based) Term uint64 `json:"term"` // Term when entry was received Type EntryType `json:"type,omitempty"` // Entry type (normal or config change) Command []byte `json:"command,omitempty"` // Command to be applied to state machine (for normal entries) Config *ClusterConfig `json:"config,omitempty"` // New cluster configuration (for config entries) } // ConfigChangeType represents the type of configuration change type ConfigChangeType int const ( ConfigAddNode ConfigChangeType = iota ConfigRemoveNode ) // ConfigChange represents a single node configuration change type ConfigChange struct { Type ConfigChangeType `json:"type"` // Add or remove NodeID string `json:"node_id"` // Node to add/remove Address string `json:"address"` // Node address (for add) } // ClusterConfig represents the cluster membership configuration type ClusterConfig struct { Nodes map[string]string `json:"nodes"` // NodeID -> Address mapping for all nodes including self } // PersistentState represents the persistent state on all servers // (Updated on stable storage before responding to RPCs) type PersistentState struct { CurrentTerm uint64 `json:"current_term"` // Latest term server has seen VotedFor string `json:"voted_for"` // CandidateId that received vote in current term } // VolatileState represents the volatile state on all servers type VolatileState struct { CommitIndex uint64 // Index of highest log entry known to be committed LastApplied uint64 // Index of highest log entry applied to state machine } // LeaderVolatileState represents volatile state on leaders // (Reinitialized after election) type LeaderVolatileState struct { NextIndex map[string]uint64 // For each server, index of the next log entry to send MatchIndex map[string]uint64 // For each server, index of highest log entry known to be replicated } // Config holds the configuration for a Raft node type Config struct { // NodeID is the unique identifier for this node NodeID string // Peers is the list of peer node addresses (excluding self) // Deprecated: Use ClusterNodes instead for dynamic membership Peers []string // PeerMap maps nodeID to address for all nodes (including self) // Used for request forwarding. Example: {"node1": "127.0.0.1:9001", "node2": "127.0.0.1:9002"} // Deprecated: Use ClusterNodes instead PeerMap map[string]string // ClusterNodes maps nodeID to address for all nodes (including self) // This is the canonical cluster membership configuration // Example: {"node1": "127.0.0.1:9001", "node2": "127.0.0.1:9002", "node3": "127.0.0.1:9003"} ClusterNodes map[string]string // ListenAddr is the address this node listens on ListenAddr string // DataDir is the directory for persistent storage DataDir string // ElectionTimeoutMin is the minimum election timeout ElectionTimeoutMin time.Duration // ElectionTimeoutMax is the maximum election timeout ElectionTimeoutMax time.Duration // HeartbeatInterval is the interval between heartbeats from leader HeartbeatInterval time.Duration // MaxLogEntriesPerRequest limits entries sent in a single AppendEntries MaxLogEntriesPerRequest int // MemoryLogCapacity is the maximum number of log entries to keep in memory MemoryLogCapacity int // LogCompactionEnabled determines if log compaction is enabled // If false, the log will grow indefinitely (safest for data, but consumes disk) // Default: false LogCompactionEnabled bool // SnapshotThreshold triggers snapshot when log grows beyond this SnapshotThreshold uint64 // SnapshotMinRetention is the minimum number of log entries to retain after compaction // This ensures followers have enough entries for catch-up without needing a full snapshot // Default: 1000 SnapshotMinRetention uint64 // SnapshotProvider is a callback function that returns the current state machine snapshot // If set, automatic log compaction will be enabled // The function should return the serialized state that can restore the state machine // minIncludeIndex: The snapshot MUST include all entries up to at least this index. SnapshotProvider func(minIncludeIndex uint64) ([]byte, error) // GetHandler is a callback function to handle remote Get requests // If set, clients can read values from this node via RPC GetHandler func(key string) (value string, found bool) // SnapshotChunkSize is the size of each chunk when transferring snapshots // Default: 1MB SnapshotChunkSize int // LastAppliedIndex is the index of the last log entry applied to the state machine // Used for initialization to avoid re-applying entries LastAppliedIndex uint64 // RPC Timeout configurations RPCTimeout time.Duration // Default: 500ms for normal RPCs SnapshotRPCTimeout time.Duration // Default: 30s for snapshot transfers ProposeTimeout time.Duration // Default: 3s for propose forwarding // Retry configurations MaxRetries int // Default: 3 RetryBackoff time.Duration // Default: 100ms // Batching configurations BatchMinWait time.Duration // Minimum wait time for batching (Default: 1ms) BatchMaxWait time.Duration // Maximum wait time for batching (Default: 10ms) // BatchMaxSize is the maximum batch size before forcing flush (Default: 100) BatchMaxSize int // EnableCLI determines if the CLI should be started automatically // Default: false EnableCLI bool // Logger for debug output Logger Logger } // Clone creates a deep copy of the config func (c *Config) Clone() *Config { clone := *c if c.Peers != nil { clone.Peers = make([]string, len(c.Peers)) copy(clone.Peers, c.Peers) } if c.PeerMap != nil { clone.PeerMap = make(map[string]string) for k, v := range c.PeerMap { clone.PeerMap[k] = v } } if c.ClusterNodes != nil { clone.ClusterNodes = make(map[string]string) for k, v := range c.ClusterNodes { clone.ClusterNodes[k] = v } } return &clone } // GetPeerAddresses returns the addresses of all peers (excluding self) func (c *Config) GetPeerAddresses() []string { if c.ClusterNodes != nil { peers := make([]string, 0, len(c.ClusterNodes)-1) for nodeID, addr := range c.ClusterNodes { if nodeID != c.NodeID { peers = append(peers, addr) } } return peers } return c.Peers } // GetClusterSize returns the total number of nodes in the cluster func (c *Config) GetClusterSize() int { if c.ClusterNodes != nil { return len(c.ClusterNodes) } return len(c.Peers) + 1 } // DefaultConfig returns a configuration with sensible defaults func DefaultConfig() *Config { return &Config{ ElectionTimeoutMin: 150 * time.Millisecond, ElectionTimeoutMax: 300 * time.Millisecond, HeartbeatInterval: 50 * time.Millisecond, MaxLogEntriesPerRequest: 5000, MemoryLogCapacity: 10000, LogCompactionEnabled: false, // 默认不启用压缩 SnapshotThreshold: 100000, // 默认阈值(仅在 LogCompactionEnabled=true 时生效) SnapshotMinRetention: 10000, // 保留1万条用于 follower 追赶 SnapshotChunkSize: 1024 * 1024, // 1MB chunks RPCTimeout: 500 * time.Millisecond, SnapshotRPCTimeout: 30 * time.Second, ProposeTimeout: 3 * time.Second, MaxRetries: 3, RetryBackoff: 100 * time.Millisecond, BatchMinWait: 1 * time.Millisecond, BatchMaxWait: 10 * time.Millisecond, BatchMaxSize: 100, EnableCLI: true, } } // ==================== Metrics ==================== // Metrics holds runtime metrics for monitoring type Metrics struct { // Term metrics Term uint64 `json:"term"` // Proposal metrics ProposalsTotal uint64 `json:"proposals_total"` ProposalsSuccess uint64 `json:"proposals_success"` ProposalsFailed uint64 `json:"proposals_failed"` ProposalsForwarded uint64 `json:"proposals_forwarded"` // Replication metrics AppendsSent uint64 `json:"appends_sent"` AppendsReceived uint64 `json:"appends_received"` AppendsSuccess uint64 `json:"appends_success"` AppendsFailed uint64 `json:"appends_failed"` // Election metrics ElectionsStarted uint64 `json:"elections_started"` ElectionsWon uint64 `json:"elections_won"` PreVotesStarted uint64 `json:"pre_votes_started"` PreVotesGranted uint64 `json:"pre_votes_granted"` // Snapshot metrics SnapshotsTaken uint64 `json:"snapshots_taken"` SnapshotsInstalled uint64 `json:"snapshots_installed"` SnapshotsSent uint64 `json:"snapshots_sent"` // Read metrics ReadIndexRequests uint64 `json:"read_index_requests"` ReadIndexSuccess uint64 `json:"read_index_success"` // Leadership transfer metrics LeadershipTransfers uint64 `json:"leadership_transfers"` LeadershipTransferSuccess uint64 `json:"leadership_transfer_success"` } // HealthStatus represents the health status of a Raft node type HealthStatus struct { NodeID string `json:"node_id"` State string `json:"state"` Term uint64 `json:"term"` LeaderID string `json:"leader_id"` ClusterSize int `json:"cluster_size"` ClusterNodes map[string]string `json:"cluster_nodes"` CommitIndex uint64 `json:"commit_index"` LastApplied uint64 `json:"last_applied"` LogBehind uint64 `json:"log_behind"` // commitIndex - lastApplied LastHeartbeat time.Time `json:"last_heartbeat"` IsHealthy bool `json:"is_healthy"` Uptime time.Duration `json:"uptime"` } // Logger interface for logging type Logger interface { Debug(format string, args ...interface{}) Info(format string, args ...interface{}) Warn(format string, args ...interface{}) Error(format string, args ...interface{}) } // DefaultLogger implements a simple console logger type DefaultLogger struct { Prefix string mu sync.Mutex } func (l *DefaultLogger) log(level, format string, args ...interface{}) { l.mu.Lock() defer l.mu.Unlock() // Use fmt.Printf with timestamp // Commented out to avoid import cycle, implemented in raft.go } func (l *DefaultLogger) Debug(format string, args ...interface{}) {} func (l *DefaultLogger) Info(format string, args ...interface{}) {} func (l *DefaultLogger) Warn(format string, args ...interface{}) {} func (l *DefaultLogger) Error(format string, args ...interface{}) {} // NoopLogger implements a no-op logger type NoopLogger struct{} func (l *NoopLogger) Debug(format string, args ...interface{}) {} func (l *NoopLogger) Info(format string, args ...interface{}) {} func (l *NoopLogger) Warn(format string, args ...interface{}) {} func (l *NoopLogger) Error(format string, args ...interface{}) {} // StateMachine interface that users must implement type StateMachine interface { // Apply applies a command to the state machine Apply(command []byte) (interface{}, error) // Snapshot returns a snapshot of the current state Snapshot() ([]byte, error) // Restore restores the state machine from a snapshot Restore(snapshot []byte) error } // ApplyMsg is sent to the application layer when a log entry is committed type ApplyMsg struct { CommandValid bool Command []byte CommandIndex uint64 CommandTerm uint64 // For snapshots SnapshotValid bool Snapshot []byte SnapshotIndex uint64 SnapshotTerm uint64 } // RequestVoteArgs is the arguments for RequestVote RPC type RequestVoteArgs struct { Term uint64 `json:"term"` // Candidate's term CandidateID string `json:"candidate_id"` // Candidate requesting vote LastLogIndex uint64 `json:"last_log_index"` // Index of candidate's last log entry LastLogTerm uint64 `json:"last_log_term"` // Term of candidate's last log entry PreVote bool `json:"pre_vote"` // True if this is a pre-vote request } // RequestVoteReply is the response for RequestVote RPC type RequestVoteReply struct { Term uint64 `json:"term"` // CurrentTerm, for candidate to update itself VoteGranted bool `json:"vote_granted"` // True means candidate received vote } // AppendEntriesArgs is the arguments for AppendEntries RPC type AppendEntriesArgs struct { Term uint64 `json:"term"` // Leader's term LeaderID string `json:"leader_id"` // So follower can redirect clients PrevLogIndex uint64 `json:"prev_log_index"` // Index of log entry immediately preceding new ones PrevLogTerm uint64 `json:"prev_log_term"` // Term of PrevLogIndex entry Entries []LogEntry `json:"entries"` // Log entries to store (empty for heartbeat) LeaderCommit uint64 `json:"leader_commit"` // Leader's commitIndex } // AppendEntriesReply is the response for AppendEntries RPC type AppendEntriesReply struct { Term uint64 `json:"term"` // CurrentTerm, for leader to update itself Success bool `json:"success"` // True if follower contained entry matching PrevLogIndex and PrevLogTerm // Optimization: help leader find correct NextIndex faster ConflictIndex uint64 `json:"conflict_index"` // Index of first entry with conflicting term ConflictTerm uint64 `json:"conflict_term"` // Term of conflicting entry } // InstallSnapshotArgs is the arguments for InstallSnapshot RPC type InstallSnapshotArgs struct { Term uint64 `json:"term"` // Leader's term LeaderID string `json:"leader_id"` // So follower can redirect clients LastIncludedIndex uint64 `json:"last_included_index"` // Snapshot replaces all entries up through this index LastIncludedTerm uint64 `json:"last_included_term"` // Term of LastIncludedIndex Offset uint64 `json:"offset"` // Byte offset for chunked transfer Data []byte `json:"data"` // Snapshot data (chunk) Done bool `json:"done"` // True if this is the last chunk } // InstallSnapshotReply is the response for InstallSnapshot RPC type InstallSnapshotReply struct { Term uint64 `json:"term"` // CurrentTerm, for leader to update itself Success bool `json:"success"` // True if chunk was accepted } // TimeoutNowArgs is the arguments for TimeoutNow RPC (leadership transfer) type TimeoutNowArgs struct { Term uint64 `json:"term"` LeaderID string `json:"leader_id"` } // TimeoutNowReply is the response for TimeoutNow RPC type TimeoutNowReply struct { Term uint64 `json:"term"` Success bool `json:"success"` } // ReadIndexArgs is the arguments for ReadIndex RPC type ReadIndexArgs struct { // Empty - just need to confirm leadership } // ReadIndexReply is the response for ReadIndex RPC type ReadIndexReply struct { ReadIndex uint64 `json:"read_index"` Success bool `json:"success"` Error string `json:"error,omitempty"` } // RPC message types for network communication type RPCType int const ( RPCRequestVote RPCType = iota RPCAppendEntries RPCInstallSnapshot RPCPropose // For request forwarding RPCAddNode // For AddNode forwarding RPCRemoveNode // For RemoveNode forwarding RPCTimeoutNow // For leadership transfer RPCReadIndex // For linearizable reads RPCGet // For remote KV reads ) // ProposeArgs is the arguments for Propose RPC (forwarding) type ProposeArgs struct { Command []byte `json:"command"` } // ProposeReply is the response for Propose RPC type ProposeReply struct { Success bool `json:"success"` Index uint64 `json:"index,omitempty"` Term uint64 `json:"term,omitempty"` Error string `json:"error,omitempty"` } // AddNodeArgs is the arguments for AddNode RPC (forwarding) type AddNodeArgs struct { NodeID string `json:"node_id"` Address string `json:"address"` } // AddNodeReply is the response for AddNode RPC type AddNodeReply struct { Success bool `json:"success"` Error string `json:"error,omitempty"` } // RemoveNodeArgs is the arguments for RemoveNode RPC (forwarding) type RemoveNodeArgs struct { NodeID string `json:"node_id"` } // RemoveNodeReply is the response for RemoveNode RPC type RemoveNodeReply struct { Success bool `json:"success"` Error string `json:"error,omitempty"` } // GetArgs is the arguments for Get RPC (for remote KV reads) type GetArgs struct { Key string `json:"key"` } // GetReply is the response for Get RPC type GetReply struct { Value string `json:"value,omitempty"` Found bool `json:"found"` Error string `json:"error,omitempty"` } // RPCMessage wraps all RPC types for network transmission type RPCMessage struct { Type RPCType `json:"type"` Payload interface{} `json:"payload"` } // RPCResponse wraps all RPC response types type RPCResponse struct { Type RPCType `json:"type"` Payload interface{} `json:"payload"` Error string `json:"error,omitempty"` }