types.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557
  1. package raft
  2. import (
  3. "errors"
  4. "fmt"
  5. "sync"
  6. "time"
  7. )
  8. // ==================== Custom Errors ====================
  9. var (
  10. // ErrNoLeader indicates no leader is available
  11. ErrNoLeader = errors.New("no leader available")
  12. // ErrNotLeader indicates this node is not the leader
  13. ErrNotLeader = errors.New("not leader")
  14. // ErrConfigInFlight indicates a configuration change is in progress
  15. ErrConfigInFlight = errors.New("configuration change in progress")
  16. // ErrTimeout indicates an operation timed out
  17. ErrTimeout = errors.New("operation timed out")
  18. // ErrShutdown indicates the raft node is shutting down
  19. ErrShutdown = errors.New("raft is shutting down")
  20. // ErrPersistFailed indicates persistent storage failed
  21. ErrPersistFailed = errors.New("failed to persist state")
  22. // ErrLeadershipLost indicates leadership was lost during operation
  23. ErrLeadershipLost = errors.New("leadership lost")
  24. )
  // RaftError wraps an underlying error with routing hints for the caller:
// the leader this node currently knows about (if any) and a suggested delay
// before retrying.
//
// Unwrap exposes the wrapped error, so errors.Is / errors.As can match the
// sentinel inside (for example ErrNotLeader).
type RaftError struct {
	Err      error         // Underlying cause; Error tolerates nil.
	LeaderID string        // Known leader, if any.
	RetryIn  time.Duration // Suggested retry delay.
}

// Error returns the wrapped error's message. When LeaderID is set, it appends
// " (leader: <id>)".
//
// It never panics. A nil receiver yields "<nil>", and a nil Err is reported
// as "raft error" instead of dereferencing a nil interface.
func (e *RaftError) Error() string {
	if e == nil {
		return "<nil>"
	}
	msg := "raft error"
	if e.Err != nil {
		msg = e.Err.Error()
	}
	if e.LeaderID != "" {
		return fmt.Sprintf("%s (leader: %s)", msg, e.LeaderID)
	}
	return msg
}

// Unwrap returns the wrapped error, or nil for a nil receiver. This lets the
// errors package see through a RaftError.
func (e *RaftError) Unwrap() error {
	if e == nil {
		return nil
	}
	return e.Err
}

// NewRaftError creates a RaftError.
//
// Parameters:
//   - err: the underlying cause.
//   - leaderID: the known leader, or "" if unknown.
//   - retryIn: the suggested delay before retrying.
func NewRaftError(err error, leaderID string, retryIn time.Duration) *RaftError {
	return &RaftError{
		Err:      err,
		LeaderID: leaderID,
		RetryIn:  retryIn,
	}
}
  // NodeState is the role a Raft node currently plays.
type NodeState int

// The Raft roles. Follower is zero, so a zero NodeState means Follower.
const (
	Follower NodeState = iota
	Candidate
	Leader
)

// String implements fmt.Stringer. Values outside the three defined roles
// render as "Unknown".
func (s NodeState) String() string {
	names := [...]string{
		Follower:  "Follower",
		Candidate: "Candidate",
		Leader:    "Leader",
	}
	if s < 0 || int(s) >= len(names) {
		return "Unknown"
	}
	return names[s]
}
  // EntryType distinguishes the kinds of records stored in the replicated log.
type EntryType int

const (
	// EntryNormal carries a command for the state machine.
	EntryNormal EntryType = iota
	// EntryConfig carries a cluster membership change.
	EntryConfig
	// EntryNoop carries no payload; a leader appends one to commit entries
	// from previous terms.
	EntryNoop
)

// LogEntry is a single record in the replicated log.
type LogEntry struct {
	Index   uint64         `json:"index"`             // Log position, 1-based.
	Term    uint64         `json:"term"`              // Term in which the entry was received.
	Type    EntryType      `json:"type,omitempty"`    // Entry kind; EntryNormal (0) is omitted from JSON.
	Command []byte         `json:"command,omitempty"` // State-machine command (EntryNormal).
	Config  *ClusterConfig `json:"config,omitempty"`  // New membership (EntryConfig).
}

// ConfigChangeType says whether a ConfigChange adds or removes a node.
type ConfigChangeType int

const (
	ConfigAddNode    ConfigChangeType = iota // Add a node to the cluster.
	ConfigRemoveNode                         // Remove a node from the cluster.
)

// ConfigChange describes a membership change affecting exactly one node.
type ConfigChange struct {
	Type    ConfigChangeType `json:"type"`    // Add or remove.
	NodeID  string           `json:"node_id"` // Node being added or removed.
	Address string           `json:"address"` // Node address (used when adding).
}

// ClusterConfig is the full cluster membership.
type ClusterConfig struct {
	// Nodes maps node ID to address for every member, including this node.
	Nodes map[string]string `json:"nodes"`
}
  // PersistentState is the per-server state that must reach stable storage
// before the server responds to RPCs.
type PersistentState struct {
	CurrentTerm uint64 `json:"current_term"` // Latest term this server has seen.
	VotedFor    string `json:"voted_for"`    // Candidate that received this server's vote in CurrentTerm.
}

// VolatileState is in-memory state kept by every server.
type VolatileState struct {
	CommitIndex uint64 // Highest log index known to be committed.
	LastApplied uint64 // Highest log index applied to the state machine.
}

// LeaderVolatileState is in-memory state kept only by the leader. It is
// re-initialized after each election.
type LeaderVolatileState struct {
	NextIndex  map[string]uint64 // Per server: index of the next log entry to send.
	MatchIndex map[string]uint64 // Per server: highest log index known to be replicated there.
}
  115. // Config holds the configuration for a Raft node
  116. type Config struct {
  117. // NodeID is the unique identifier for this node
  118. NodeID string
  119. // Peers is the list of peer node addresses (excluding self)
  120. // Deprecated: Use ClusterNodes instead for dynamic membership
  121. Peers []string
  122. // PeerMap maps nodeID to address for all nodes (including self)
  123. // Used for request forwarding. Example: {"node1": "127.0.0.1:9001", "node2": "127.0.0.1:9002"}
  124. // Deprecated: Use ClusterNodes instead
  125. PeerMap map[string]string
  126. // ClusterNodes maps nodeID to address for all nodes (including self)
  127. // This is the canonical cluster membership configuration
  128. // Example: {"node1": "127.0.0.1:9001", "node2": "127.0.0.1:9002", "node3": "127.0.0.1:9003"}
  129. ClusterNodes map[string]string
  130. // ListenAddr is the address this node listens on
  131. ListenAddr string
  132. // DataDir is the directory for persistent storage
  133. DataDir string
  134. // ElectionTimeoutMin is the minimum election timeout
  135. ElectionTimeoutMin time.Duration
  136. // ElectionTimeoutMax is the maximum election timeout
  137. ElectionTimeoutMax time.Duration
  138. // HeartbeatInterval is the interval between heartbeats from leader
  139. HeartbeatInterval time.Duration
  140. // MaxLogEntriesPerRequest limits entries sent in a single AppendEntries
  141. MaxLogEntriesPerRequest int
  142. // MemoryLogCapacity is the maximum number of log entries to keep in memory
  143. MemoryLogCapacity int
  144. // LogCompactionEnabled determines if log compaction is enabled
  145. // If false, the log will grow indefinitely (safest for data, but consumes disk)
  146. // Default: false
  147. LogCompactionEnabled bool
  148. // SnapshotThreshold triggers snapshot when log grows beyond this
  149. SnapshotThreshold uint64
  150. // SnapshotMinRetention is the minimum number of log entries to retain after compaction
  151. // This ensures followers have enough entries for catch-up without needing a full snapshot
  152. // Default: 1000
  153. SnapshotMinRetention uint64
  154. // SnapshotProvider is a callback function that returns the current state machine snapshot
  155. // If set, automatic log compaction will be enabled
  156. // The function should return the serialized state that can restore the state machine
  157. // minIncludeIndex: The snapshot MUST include all entries up to at least this index.
  158. SnapshotProvider func(minIncludeIndex uint64) ([]byte, error)
  159. // GetHandler is a callback function to handle remote Get requests
  160. // If set, clients can read values from this node via RPC
  161. GetHandler func(key string) (value string, found bool)
  162. // SnapshotChunkSize is the size of each chunk when transferring snapshots
  163. // Default: 1MB
  164. SnapshotChunkSize int
  165. // LastAppliedIndex is the index of the last log entry applied to the state machine
  166. // Used for initialization to avoid re-applying entries
  167. LastAppliedIndex uint64
  168. // RPC Timeout configurations
  169. RPCTimeout time.Duration // Default: 500ms for normal RPCs
  170. SnapshotRPCTimeout time.Duration // Default: 30s for snapshot transfers
  171. ProposeTimeout time.Duration // Default: 3s for propose forwarding
  172. // Retry configurations
  173. MaxRetries int // Default: 3
  174. RetryBackoff time.Duration // Default: 100ms
  175. // Batching configurations
  176. BatchMinWait time.Duration // Minimum wait time for batching (Default: 1ms)
  177. BatchMaxWait time.Duration // Maximum wait time for batching (Default: 10ms)
  178. BatchMaxSize int // Maximum batch size before forcing flush (Default: 100)
  179. // Logger for debug output
  180. Logger Logger
  181. }
  182. // Clone creates a deep copy of the config
  183. func (c *Config) Clone() *Config {
  184. clone := *c
  185. if c.Peers != nil {
  186. clone.Peers = make([]string, len(c.Peers))
  187. copy(clone.Peers, c.Peers)
  188. }
  189. if c.PeerMap != nil {
  190. clone.PeerMap = make(map[string]string)
  191. for k, v := range c.PeerMap {
  192. clone.PeerMap[k] = v
  193. }
  194. }
  195. if c.ClusterNodes != nil {
  196. clone.ClusterNodes = make(map[string]string)
  197. for k, v := range c.ClusterNodes {
  198. clone.ClusterNodes[k] = v
  199. }
  200. }
  201. return &clone
  202. }
  203. // GetPeerAddresses returns the addresses of all peers (excluding self)
  204. func (c *Config) GetPeerAddresses() []string {
  205. if c.ClusterNodes != nil {
  206. peers := make([]string, 0, len(c.ClusterNodes)-1)
  207. for nodeID, addr := range c.ClusterNodes {
  208. if nodeID != c.NodeID {
  209. peers = append(peers, addr)
  210. }
  211. }
  212. return peers
  213. }
  214. return c.Peers
  215. }
  216. // GetClusterSize returns the total number of nodes in the cluster
  217. func (c *Config) GetClusterSize() int {
  218. if c.ClusterNodes != nil {
  219. return len(c.ClusterNodes)
  220. }
  221. return len(c.Peers) + 1
  222. }
  223. // DefaultConfig returns a configuration with sensible defaults
  224. func DefaultConfig() *Config {
  225. return &Config{
  226. ElectionTimeoutMin: 150 * time.Millisecond,
  227. ElectionTimeoutMax: 300 * time.Millisecond,
  228. HeartbeatInterval: 50 * time.Millisecond,
  229. MaxLogEntriesPerRequest: 5000,
  230. MemoryLogCapacity: 10000,
  231. LogCompactionEnabled: false, // 默认不启用压缩
  232. SnapshotThreshold: 100000, // 默认阈值(仅在 LogCompactionEnabled=true 时生效)
  233. SnapshotMinRetention: 10000, // 保留1万条用于 follower 追赶
  234. SnapshotChunkSize: 1024 * 1024, // 1MB chunks
  235. RPCTimeout: 500 * time.Millisecond,
  236. SnapshotRPCTimeout: 30 * time.Second,
  237. ProposeTimeout: 3 * time.Second,
  238. MaxRetries: 3,
  239. RetryBackoff: 100 * time.Millisecond,
  240. BatchMinWait: 1 * time.Millisecond,
  241. BatchMaxWait: 10 * time.Millisecond,
  242. BatchMaxSize: 100,
  243. }
  244. }
  // ==================== Metrics ====================

// Metrics is a snapshot of runtime counters for monitoring, plus the current
// term.
//
// NOTE(review): the fields are plain uint64s. Confirm that writers use
// sync/atomic or a lock when updating them concurrently.
type Metrics struct {
	// Current term.
	Term uint64 `json:"term"`

	// Proposals: submitted, succeeded, failed, and forwarded to the leader.
	ProposalsTotal     uint64 `json:"proposals_total"`
	ProposalsSuccess   uint64 `json:"proposals_success"`
	ProposalsFailed    uint64 `json:"proposals_failed"`
	ProposalsForwarded uint64 `json:"proposals_forwarded"`

	// AppendEntries traffic.
	AppendsSent     uint64 `json:"appends_sent"`
	AppendsReceived uint64 `json:"appends_received"`
	AppendsSuccess  uint64 `json:"appends_success"`
	AppendsFailed   uint64 `json:"appends_failed"`

	// Elections and pre-votes.
	ElectionsStarted uint64 `json:"elections_started"`
	ElectionsWon     uint64 `json:"elections_won"`
	PreVotesStarted  uint64 `json:"pre_votes_started"`
	PreVotesGranted  uint64 `json:"pre_votes_granted"`

	// Snapshots.
	SnapshotsTaken     uint64 `json:"snapshots_taken"`
	SnapshotsInstalled uint64 `json:"snapshots_installed"`
	SnapshotsSent      uint64 `json:"snapshots_sent"`

	// ReadIndex (linearizable read) requests.
	ReadIndexRequests uint64 `json:"read_index_requests"`
	ReadIndexSuccess  uint64 `json:"read_index_success"`

	// Leadership transfers.
	LeadershipTransfers       uint64 `json:"leadership_transfers"`
	LeadershipTransferSuccess uint64 `json:"leadership_transfer_success"`
}
  // HealthStatus describes the health of one Raft node at a point in time.
//
// When JSON-encoded, Uptime (a time.Duration) appears as an integer number of
// nanoseconds, and LastHeartbeat as an RFC 3339 timestamp.
type HealthStatus struct {
	NodeID       string            `json:"node_id"`
	State        string            `json:"state"` // Role name; presumably NodeState.String() — confirm at the producer.
	Term         uint64            `json:"term"`
	LeaderID     string            `json:"leader_id"`
	ClusterSize  int               `json:"cluster_size"`
	ClusterNodes map[string]string `json:"cluster_nodes"`
	CommitIndex  uint64            `json:"commit_index"`
	LastApplied  uint64            `json:"last_applied"`
	LogBehind    uint64            `json:"log_behind"` // commitIndex - lastApplied
	LastHeartbeat time.Time        `json:"last_heartbeat"`
	IsHealthy    bool              `json:"is_healthy"`
	Uptime       time.Duration     `json:"uptime"`
}
  // Logger is the logging interface used by the raft package. Every method
// takes a printf-style format string followed by its arguments.
type Logger interface {
	Debug(format string, args ...interface{})
	Info(format string, args ...interface{})
	Warn(format string, args ...interface{})
	Error(format string, args ...interface{})
}

// Compile-time checks that the built-in loggers implement Logger.
var (
	_ Logger = (*DefaultLogger)(nil)
	_ Logger = (*NoopLogger)(nil)
)

// DefaultLogger is a placeholder logger. As written, it discards every
// message, just like NoopLogger. It does not print to the console.
//
// All four level methods funnel through log, which is the single place to
// add real output (for example a timestamp, Prefix, level and the formatted
// message). fmt is already imported by this file, so no import cycle stands
// in the way.
//
// DefaultLogger contains a sync.Mutex; use it via pointer and do not copy it
// after first use.
type DefaultLogger struct {
	Prefix string     // Not read by any method of this type yet.
	mu     sync.Mutex // Serializes calls to log.
}

// log is the shared sink for every level. It currently drops the message.
// It is safe to call on a nil receiver.
func (l *DefaultLogger) log(level, format string, args ...interface{}) {
	if l == nil {
		// Keep nil *DefaultLogger usable: the level methods were previously
		// empty and never dereferenced the receiver.
		return
	}
	l.mu.Lock()
	defer l.mu.Unlock()
	// TODO: emit output here. It is intentionally left unimplemented so that
	// DefaultLogger keeps its existing silent behavior.
}

// Debug logs at debug level (currently discarded).
func (l *DefaultLogger) Debug(format string, args ...interface{}) { l.log("DEBUG", format, args...) }

// Info logs at info level (currently discarded).
func (l *DefaultLogger) Info(format string, args ...interface{}) { l.log("INFO", format, args...) }

// Warn logs at warning level (currently discarded).
func (l *DefaultLogger) Warn(format string, args ...interface{}) { l.log("WARN", format, args...) }

// Error logs at error level (currently discarded).
func (l *DefaultLogger) Error(format string, args ...interface{}) { l.log("ERROR", format, args...) }

// NoopLogger implements Logger by discarding every message.
type NoopLogger struct{}

// Debug discards the message.
func (l *NoopLogger) Debug(format string, args ...interface{}) {}

// Info discards the message.
func (l *NoopLogger) Info(format string, args ...interface{}) {}

// Warn discards the message.
func (l *NoopLogger) Warn(format string, args ...interface{}) {}

// Error discards the message.
func (l *NoopLogger) Error(format string, args ...interface{}) {}
  // StateMachine is the application-side state that users implement and that
// Raft drives with committed commands.
type StateMachine interface {
	// Apply executes one command against the state machine and returns its
	// result.
	Apply(command []byte) (interface{}, error)

	// Snapshot serializes the current state.
	Snapshot() ([]byte, error)

	// Restore replaces the current state with one produced by Snapshot.
	Restore(snapshot []byte) error
}

// ApplyMsg is delivered to the application layer when a log entry commits.
// CommandValid marks a command delivery and SnapshotValid a snapshot
// delivery; the fields of the matching group are meaningful.
type ApplyMsg struct {
	// Command delivery.
	CommandValid bool
	Command      []byte
	CommandIndex uint64
	CommandTerm  uint64

	// Snapshot delivery.
	SnapshotValid bool
	Snapshot      []byte
	SnapshotIndex uint64
	SnapshotTerm  uint64
}
  // RequestVoteArgs is the request a candidate sends to solicit a vote. With
// PreVote set, it is a pre-vote probe instead of a real vote request.
type RequestVoteArgs struct {
	Term         uint64 `json:"term"`           // Candidate's term.
	CandidateID  string `json:"candidate_id"`   // Candidate requesting the vote.
	LastLogIndex uint64 `json:"last_log_index"` // Index of the candidate's last log entry.
	LastLogTerm  uint64 `json:"last_log_term"`  // Term of the candidate's last log entry.
	PreVote      bool   `json:"pre_vote"`       // True for a pre-vote request.
}

// RequestVoteReply answers a RequestVoteArgs.
type RequestVoteReply struct {
	Term        uint64 `json:"term"`         // Responder's current term, so the candidate can update itself.
	VoteGranted bool   `json:"vote_granted"` // True if the candidate received the vote.
}
  353. // AppendEntriesArgs is the arguments for AppendEntries RPC
  354. type AppendEntriesArgs struct {
  355. Term uint64 `json:"term"` // Leader's term
  356. LeaderID string `json:"leader_id"` // So follower can redirect clients
  357. PrevLogIndex uint64 `json:"prev_log_index"` // Index of log entry immediately preceding new ones
  358. PrevLogTerm uint64 `json:"prev_log_term"` // Term of PrevLogIndex entry
  359. Entries []LogEntry `json:"entries"` // Log entries to store (empty for heartbeat)
  360. LeaderCommit uint64 `json:"leader_commit"` // Leader's commitIndex
  361. }
  362. // AppendEntriesReply is the response for AppendEntries RPC
  363. type AppendEntriesReply struct {
  364. Term uint64 `json:"term"` // CurrentTerm, for leader to update itself
  365. Success bool `json:"success"` // True if follower contained entry matching PrevLogIndex and PrevLogTerm
  366. // Optimization: help leader find correct NextIndex faster
  367. ConflictIndex uint64 `json:"conflict_index"` // Index of first entry with conflicting term
  368. ConflictTerm uint64 `json:"conflict_term"` // Term of conflicting entry
  369. }
  // InstallSnapshotArgs carries one chunk of a snapshot from the leader.
// Snapshots are sent in chunks: Offset locates Data within the whole
// snapshot, and Done marks the final chunk.
type InstallSnapshotArgs struct {
	Term              uint64 `json:"term"`                // Leader's term.
	LeaderID          string `json:"leader_id"`           // Lets followers redirect clients.
	LastIncludedIndex uint64 `json:"last_included_index"` // Snapshot replaces all entries up to and including this index.
	LastIncludedTerm  uint64 `json:"last_included_term"`  // Term of LastIncludedIndex.
	Offset            uint64 `json:"offset"`              // Byte offset of this chunk.
	Data              []byte `json:"data"`                // Chunk contents.
	Done              bool   `json:"done"`                // True for the last chunk.
}

// InstallSnapshotReply answers one InstallSnapshotArgs chunk.
type InstallSnapshotReply struct {
	Term    uint64 `json:"term"`    // Responder's current term, so the leader can update itself.
	Success bool   `json:"success"` // True if the chunk was accepted.
}

// TimeoutNowArgs is the TimeoutNow request used for leadership transfer.
type TimeoutNowArgs struct {
	Term     uint64 `json:"term"`
	LeaderID string `json:"leader_id"`
}

// TimeoutNowReply answers a TimeoutNowArgs.
type TimeoutNowReply struct {
	Term    uint64 `json:"term"`
	Success bool   `json:"success"`
}

// ReadIndexArgs is the ReadIndex request for linearizable reads. It has no
// fields: the request only asks the receiver to confirm its leadership.
type ReadIndexArgs struct{}

// ReadIndexReply answers a ReadIndexArgs.
type ReadIndexReply struct {
	ReadIndex uint64 `json:"read_index"`
	Success   bool   `json:"success"`
	Error     string `json:"error,omitempty"`
}
  // RPCType identifies which request or response an RPCMessage/RPCResponse
// carries. The values are serialized as integers (json:"type"), so
// renumbering them would break compatibility between nodes running
// different builds.
type RPCType int

const (
	RPCRequestVote     RPCType = 0
	RPCAppendEntries   RPCType = 1
	RPCInstallSnapshot RPCType = 2
	RPCPropose         RPCType = 3 // Request forwarding.
	RPCAddNode         RPCType = 4 // AddNode forwarding.
	RPCRemoveNode      RPCType = 5 // RemoveNode forwarding.
	RPCTimeoutNow      RPCType = 6 // Leadership transfer.
	RPCReadIndex       RPCType = 7 // Linearizable reads.
	RPCGet             RPCType = 8 // Remote KV reads.
)

// ProposeArgs forwards a client command to the leader.
type ProposeArgs struct {
	Command []byte `json:"command"`
}

// ProposeReply answers a forwarded proposal.
type ProposeReply struct {
	Success bool   `json:"success"`
	Index   uint64 `json:"index,omitempty"`
	Term    uint64 `json:"term,omitempty"`
	Error   string `json:"error,omitempty"`
}

// AddNodeArgs forwards a request to add a node.
type AddNodeArgs struct {
	NodeID  string `json:"node_id"`
	Address string `json:"address"`
}

// AddNodeReply answers an AddNodeArgs.
type AddNodeReply struct {
	Success bool   `json:"success"`
	Error   string `json:"error,omitempty"`
}

// RemoveNodeArgs forwards a request to remove a node.
type RemoveNodeArgs struct {
	NodeID string `json:"node_id"`
}

// RemoveNodeReply answers a RemoveNodeArgs.
type RemoveNodeReply struct {
	Success bool   `json:"success"`
	Error   string `json:"error,omitempty"`
}

// GetArgs requests a key from a remote node's key-value store.
type GetArgs struct {
	Key string `json:"key"`
}

// GetReply answers a GetArgs.
type GetReply struct {
	Value string `json:"value,omitempty"`
	Found bool   `json:"found"`
	Error string `json:"error,omitempty"`
}

// RPCMessage is the network envelope for every request type; Type says how
// to interpret Payload.
type RPCMessage struct {
	Type    RPCType     `json:"type"`
	Payload interface{} `json:"payload"`
}

// RPCResponse is the network envelope for every response type.
type RPCResponse struct {
	Type    RPCType     `json:"type"`
	Payload interface{} `json:"payload"`
	Error   string      `json:"error,omitempty"`
}