types.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563
  1. package raft
  2. import (
  3. "errors"
  4. "fmt"
  5. "sync"
  6. "time"
  7. )
  8. // ==================== Custom Errors ====================
  9. var (
  10. // ErrNoLeader indicates no leader is available
  11. ErrNoLeader = errors.New("no leader available")
  12. // ErrNotLeader indicates this node is not the leader
  13. ErrNotLeader = errors.New("not leader")
  14. // ErrConfigInFlight indicates a configuration change is in progress
  15. ErrConfigInFlight = errors.New("configuration change in progress")
  16. // ErrTimeout indicates an operation timed out
  17. ErrTimeout = errors.New("operation timed out")
  18. // ErrShutdown indicates the raft node is shutting down
  19. ErrShutdown = errors.New("raft is shutting down")
  20. // ErrPersistFailed indicates persistent storage failed
  21. ErrPersistFailed = errors.New("failed to persist state")
  22. // ErrLeadershipLost indicates leadership was lost during operation
  23. ErrLeadershipLost = errors.New("leadership lost")
  24. )
  25. // RaftError wraps errors with additional context
  26. type RaftError struct {
  27. Err error
  28. LeaderID string // Known leader, if any
  29. RetryIn time.Duration // Suggested retry delay
  30. }
  31. func (e *RaftError) Error() string {
  32. if e.LeaderID != "" {
  33. return fmt.Sprintf("%s (leader: %s)", e.Err, e.LeaderID)
  34. }
  35. return e.Err.Error()
  36. }
  37. func (e *RaftError) Unwrap() error {
  38. return e.Err
  39. }
  40. // NewRaftError creates a new RaftError
  41. func NewRaftError(err error, leaderID string, retryIn time.Duration) *RaftError {
  42. return &RaftError{
  43. Err: err,
  44. LeaderID: leaderID,
  45. RetryIn: retryIn,
  46. }
  47. }
  48. // NodeState represents the current state of a Raft node
  49. type NodeState int
  50. const (
  51. Follower NodeState = iota
  52. Candidate
  53. Leader
  54. )
  55. func (s NodeState) String() string {
  56. switch s {
  57. case Follower:
  58. return "Follower"
  59. case Candidate:
  60. return "Candidate"
  61. case Leader:
  62. return "Leader"
  63. default:
  64. return "Unknown"
  65. }
  66. }
  67. // EntryType represents the type of a log entry
  68. type EntryType int
  69. const (
  70. EntryNormal EntryType = iota // Normal command entry
  71. EntryConfig // Configuration change entry
  72. EntryNoop // No-op entry (used by leader to commit previous term entries)
  73. )
// LogEntry represents a single entry in the replicated log.
//
// Type selects which payload field is meaningful: Command for normal
// entries, Config for configuration-change entries. Because Type is tagged
// "omitempty", an EntryNormal (zero) type is omitted from the JSON and
// decodes back to EntryNormal.
// NOTE(review): the payload of EntryNoop entries is not defined here —
// presumably both Command and Config are empty; confirm with the producer.
type LogEntry struct {
	Index   uint64         `json:"index"`             // Log index (1-based)
	Term    uint64         `json:"term"`              // Term when the entry was received by the leader
	Type    EntryType      `json:"type,omitempty"`    // Entry type (normal, config change or no-op)
	Command []byte         `json:"command,omitempty"` // Command to apply to the state machine (normal entries); JSON-encoded as base64
	Config  *ClusterConfig `json:"config,omitempty"`  // New cluster configuration (config entries)
}
  82. // ConfigChangeType represents the type of configuration change
  83. type ConfigChangeType int
  84. const (
  85. ConfigAddNode ConfigChangeType = iota
  86. ConfigRemoveNode
  87. )
// ConfigChange describes a single-node membership change: adding or
// removing the node identified by NodeID.
type ConfigChange struct {
	Type    ConfigChangeType `json:"type"`    // Add or remove
	NodeID  string           `json:"node_id"` // Node to add/remove
	Address string           `json:"address"` // Node address (only meaningful for add)
}

// ClusterConfig is the cluster membership configuration carried by
// configuration-change log entries (see LogEntry.Config).
type ClusterConfig struct {
	// Nodes maps NodeID -> Address for all members, including self.
	Nodes map[string]string `json:"nodes"`
}
// PersistentState is the Raft state that must survive restarts.
// Per the Raft protocol it is written to stable storage before the node
// responds to RPCs.
type PersistentState struct {
	CurrentTerm uint64 `json:"current_term"` // Latest term this server has seen
	VotedFor    string `json:"voted_for"`    // CandidateID that received this node's vote in CurrentTerm ("" if none)
}

// VolatileState is the in-memory state kept by every server; it is not
// JSON-tagged and is not part of PersistentState.
type VolatileState struct {
	CommitIndex uint64 // Index of the highest log entry known to be committed
	LastApplied uint64 // Index of the highest log entry applied to the state machine
}

// LeaderVolatileState is the in-memory state kept only by the leader.
// It is reinitialized after each election.
type LeaderVolatileState struct {
	NextIndex  map[string]uint64 // Per server: index of the next log entry to send
	MatchIndex map[string]uint64 // Per server: index of the highest log entry known to be replicated
}
// Config holds the configuration for a Raft node.
//
// DefaultConfig returns a Config pre-filled with the package's timing,
// sizing, retry and batching defaults. The node-specific fields (NodeID,
// ClusterNodes, ListenAddr, DataDir, ...) are left for the caller to set.
type Config struct {
	// NodeID is the unique identifier for this node.
	NodeID string

	// Peers is the list of peer node addresses (excluding self).
	// GetPeerAddresses and GetClusterSize only use it when ClusterNodes is nil.
	//
	// Deprecated: Use ClusterNodes instead for dynamic membership.
	Peers []string

	// PeerMap maps nodeID to address for all nodes (including self).
	// Used for request forwarding. Example:
	// {"node1": "127.0.0.1:9001", "node2": "127.0.0.1:9002"}
	//
	// Deprecated: Use ClusterNodes instead.
	PeerMap map[string]string

	// ClusterNodes maps nodeID to address for all nodes (including self).
	// This is the canonical cluster membership configuration. When it is
	// non-nil it takes precedence over Peers. Example:
	// {"node1": "127.0.0.1:9001", "node2": "127.0.0.1:9002", "node3": "127.0.0.1:9003"}
	ClusterNodes map[string]string

	// ListenAddr is the address this node listens on.
	ListenAddr string

	// DataDir is the directory for persistent storage.
	DataDir string

	// ElectionTimeoutMin is the minimum election timeout (DefaultConfig: 150ms).
	ElectionTimeoutMin time.Duration

	// ElectionTimeoutMax is the maximum election timeout (DefaultConfig: 300ms).
	ElectionTimeoutMax time.Duration

	// HeartbeatInterval is the interval between heartbeats sent by the
	// leader (DefaultConfig: 50ms).
	HeartbeatInterval time.Duration

	// MaxLogEntriesPerRequest limits the number of entries sent in a single
	// AppendEntries RPC (DefaultConfig: 5000).
	MaxLogEntriesPerRequest int

	// MemoryLogCapacity is the maximum number of log entries to keep in
	// memory (DefaultConfig: 10000).
	MemoryLogCapacity int

	// LogCompactionEnabled determines whether log compaction is enabled.
	// If false, the log grows indefinitely (safest for data, but consumes
	// disk). Default: false.
	LogCompactionEnabled bool

	// SnapshotThreshold triggers a snapshot when the log grows beyond this
	// size (DefaultConfig: 100000).
	// NOTE(review): the DefaultConfig comments say this only takes effect
	// when LogCompactionEnabled is true — confirm against the compaction code.
	SnapshotThreshold uint64

	// SnapshotMinRetention is the minimum number of log entries to retain
	// after compaction. This lets followers catch up from the log instead of
	// needing a full snapshot. DefaultConfig sets 10000.
	// NOTE(review): an earlier comment here claimed a default of 1000; confirm
	// whether a zero value is replaced with 1000 somewhere else.
	SnapshotMinRetention uint64

	// SnapshotProvider is a callback that returns the current state-machine
	// snapshot, serialized so that it can restore the state machine.
	// minIncludeIndex: the snapshot MUST include all entries up to at least
	// this index.
	// The original comment states that setting this enables automatic log
	// compaction. NOTE(review): confirm how that interacts with
	// LogCompactionEnabled, which defaults to false.
	SnapshotProvider func(minIncludeIndex uint64) ([]byte, error)

	// GetHandler is a callback that handles remote Get requests.
	// If set, clients can read values from this node via RPC.
	GetHandler func(key string) (value string, found bool)

	// SnapshotChunkSize is the size of each chunk when transferring
	// snapshots. Default: 1MB.
	SnapshotChunkSize int

	// LastAppliedIndex is the index of the last log entry already applied
	// to the state machine. It is used during initialization to avoid
	// re-applying entries.
	LastAppliedIndex uint64

	// RPC timeout configuration.
	RPCTimeout         time.Duration // Default: 500ms for normal RPCs
	SnapshotRPCTimeout time.Duration // Default: 30s for snapshot transfers
	ProposeTimeout     time.Duration // Default: 3s for propose forwarding

	// Retry configuration.
	MaxRetries   int           // Default: 3
	RetryBackoff time.Duration // Default: 100ms

	// Batching configuration.
	BatchMinWait time.Duration // Minimum wait time for batching (Default: 1ms)
	BatchMaxWait time.Duration // Maximum wait time for batching (Default: 10ms)

	// BatchMaxSize is the maximum batch size before a flush is forced
	// (Default: 100).
	BatchMaxSize int

	// EnableCLI determines whether the CLI is started automatically.
	// The zero value is false, but DefaultConfig sets it to true.
	EnableCLI bool

	// Logger receives debug output.
	Logger Logger
}
  186. // Clone creates a deep copy of the config
  187. func (c *Config) Clone() *Config {
  188. clone := *c
  189. if c.Peers != nil {
  190. clone.Peers = make([]string, len(c.Peers))
  191. copy(clone.Peers, c.Peers)
  192. }
  193. if c.PeerMap != nil {
  194. clone.PeerMap = make(map[string]string)
  195. for k, v := range c.PeerMap {
  196. clone.PeerMap[k] = v
  197. }
  198. }
  199. if c.ClusterNodes != nil {
  200. clone.ClusterNodes = make(map[string]string)
  201. for k, v := range c.ClusterNodes {
  202. clone.ClusterNodes[k] = v
  203. }
  204. }
  205. return &clone
  206. }
  207. // GetPeerAddresses returns the addresses of all peers (excluding self)
  208. func (c *Config) GetPeerAddresses() []string {
  209. if c.ClusterNodes != nil {
  210. peers := make([]string, 0, len(c.ClusterNodes)-1)
  211. for nodeID, addr := range c.ClusterNodes {
  212. if nodeID != c.NodeID {
  213. peers = append(peers, addr)
  214. }
  215. }
  216. return peers
  217. }
  218. return c.Peers
  219. }
  220. // GetClusterSize returns the total number of nodes in the cluster
  221. func (c *Config) GetClusterSize() int {
  222. if c.ClusterNodes != nil {
  223. return len(c.ClusterNodes)
  224. }
  225. return len(c.Peers) + 1
  226. }
  227. // DefaultConfig returns a configuration with sensible defaults
  228. func DefaultConfig() *Config {
  229. return &Config{
  230. ElectionTimeoutMin: 150 * time.Millisecond,
  231. ElectionTimeoutMax: 300 * time.Millisecond,
  232. HeartbeatInterval: 50 * time.Millisecond,
  233. MaxLogEntriesPerRequest: 5000,
  234. MemoryLogCapacity: 10000,
  235. LogCompactionEnabled: false, // 默认不启用压缩
  236. SnapshotThreshold: 100000, // 默认阈值(仅在 LogCompactionEnabled=true 时生效)
  237. SnapshotMinRetention: 10000, // 保留1万条用于 follower 追赶
  238. SnapshotChunkSize: 1024 * 1024, // 1MB chunks
  239. RPCTimeout: 500 * time.Millisecond,
  240. SnapshotRPCTimeout: 30 * time.Second,
  241. ProposeTimeout: 3 * time.Second,
  242. MaxRetries: 3,
  243. RetryBackoff: 100 * time.Millisecond,
  244. BatchMinWait: 1 * time.Millisecond,
  245. BatchMaxWait: 10 * time.Millisecond,
  246. BatchMaxSize: 100,
  247. EnableCLI: true,
  248. }
  249. }
// ==================== Metrics ====================

// Metrics holds runtime counters for monitoring, serialized to JSON with the
// field names given in the struct tags.
//
// NOTE(review): the fields are plain uint64 values. How they are updated
// concurrently (sync/atomic or a lock) is decided outside this type — confirm
// before reading a live Metrics value from another goroutine.
type Metrics struct {
	// Term is the Raft term reported alongside these counters.
	Term uint64 `json:"term"`

	// Proposal metrics: client proposals seen, their outcome, and how many
	// were forwarded to another node.
	ProposalsTotal     uint64 `json:"proposals_total"`
	ProposalsSuccess   uint64 `json:"proposals_success"`
	ProposalsFailed    uint64 `json:"proposals_failed"`
	ProposalsForwarded uint64 `json:"proposals_forwarded"`

	// Replication metrics: AppendEntries RPCs sent/received and their outcome.
	AppendsSent     uint64 `json:"appends_sent"`
	AppendsReceived uint64 `json:"appends_received"`
	AppendsSuccess  uint64 `json:"appends_success"`
	AppendsFailed   uint64 `json:"appends_failed"`

	// Election metrics, including pre-vote rounds.
	ElectionsStarted uint64 `json:"elections_started"`
	ElectionsWon     uint64 `json:"elections_won"`
	PreVotesStarted  uint64 `json:"pre_votes_started"`
	PreVotesGranted  uint64 `json:"pre_votes_granted"`

	// Snapshot metrics.
	SnapshotsTaken     uint64 `json:"snapshots_taken"`
	SnapshotsInstalled uint64 `json:"snapshots_installed"`
	SnapshotsSent      uint64 `json:"snapshots_sent"`

	// Read metrics (ReadIndex-based reads).
	ReadIndexRequests uint64 `json:"read_index_requests"`
	ReadIndexSuccess  uint64 `json:"read_index_success"`

	// Leadership transfer metrics.
	LeadershipTransfers       uint64 `json:"leadership_transfers"`
	LeadershipTransferSuccess uint64 `json:"leadership_transfer_success"`
}
// HealthStatus is a point-in-time health report for a Raft node, intended for
// JSON encoding.
//
// Encoding notes: encoding/json writes LastHeartbeat (a time.Time) as an
// RFC 3339 string. Uptime (a time.Duration) is written as an integer number
// of nanoseconds, not a human-readable duration.
type HealthStatus struct {
	NodeID        string            `json:"node_id"`
	State         string            `json:"state"` // presumably a NodeState.String() value — confirm with the producer
	Term          uint64            `json:"term"`
	LeaderID      string            `json:"leader_id"`
	ClusterSize   int               `json:"cluster_size"`
	ClusterNodes  map[string]string `json:"cluster_nodes"` // NodeID -> address
	CommitIndex   uint64            `json:"commit_index"`
	LastApplied   uint64            `json:"last_applied"`
	LogBehind     uint64            `json:"log_behind"` // commitIndex - lastApplied
	LastHeartbeat time.Time         `json:"last_heartbeat"`
	IsHealthy     bool              `json:"is_healthy"`
	Uptime        time.Duration     `json:"uptime"`
}
// Logger is the logging interface accepted by Config.Logger.
//
// Each method receives a format string and its arguments, one method per
// severity level (presumably fmt.Printf-style formatting — confirm with the
// implementations in use).
type Logger interface {
	Debug(format string, args ...interface{})
	Info(format string, args ...interface{})
	Warn(format string, args ...interface{})
	Error(format string, args ...interface{})
}
  303. // DefaultLogger implements a simple console logger
  304. type DefaultLogger struct {
  305. Prefix string
  306. mu sync.Mutex
  307. }
  308. func (l *DefaultLogger) log(level, format string, args ...interface{}) {
  309. l.mu.Lock()
  310. defer l.mu.Unlock()
  311. // Use fmt.Printf with timestamp
  312. // Commented out to avoid import cycle, implemented in raft.go
  313. }
  314. func (l *DefaultLogger) Debug(format string, args ...interface{}) {}
  315. func (l *DefaultLogger) Info(format string, args ...interface{}) {}
  316. func (l *DefaultLogger) Warn(format string, args ...interface{}) {}
  317. func (l *DefaultLogger) Error(format string, args ...interface{}) {}
  318. // NoopLogger implements a no-op logger
  319. type NoopLogger struct{}
  320. func (l *NoopLogger) Debug(format string, args ...interface{}) {}
  321. func (l *NoopLogger) Info(format string, args ...interface{}) {}
  322. func (l *NoopLogger) Warn(format string, args ...interface{}) {}
  323. func (l *NoopLogger) Error(format string, args ...interface{}) {}
// StateMachine is the interface that users must implement to hold the
// application state replicated by Raft.
type StateMachine interface {
	// Apply applies a committed command to the state machine and returns
	// an application-defined result or an error.
	Apply(command []byte) (interface{}, error)
	// Snapshot returns a serialized snapshot of the current state.
	Snapshot() ([]byte, error)
	// Restore restores the state machine from a snapshot previously
	// produced by Snapshot (presumably — confirm the expected format).
	Restore(snapshot []byte) error
}
// ApplyMsg is sent to the application layer when a log entry is committed.
//
// It carries either a committed command (CommandValid and the Command*
// fields) or a snapshot (SnapshotValid and the Snapshot* fields).
// NOTE(review): presumably at most one of CommandValid / SnapshotValid is true
// per message — confirm with the code that produces ApplyMsg values.
type ApplyMsg struct {
	CommandValid bool   // True if Command/CommandIndex/CommandTerm are set
	Command      []byte // The committed command
	CommandIndex uint64 // Log index of the command
	CommandTerm  uint64 // Term of the command's log entry

	// For snapshots.
	SnapshotValid bool   // True if the Snapshot* fields are set
	Snapshot      []byte // Serialized snapshot data
	SnapshotIndex uint64 // Last log index covered by the snapshot
	SnapshotTerm  uint64 // Term of SnapshotIndex
}
// RequestVoteArgs holds the arguments of the RequestVote RPC, used both for
// real elections and, when PreVote is true, for pre-vote rounds.
type RequestVoteArgs struct {
	Term         uint64 `json:"term"`           // Candidate's term
	CandidateID  string `json:"candidate_id"`   // Candidate requesting the vote
	LastLogIndex uint64 `json:"last_log_index"` // Index of candidate's last log entry
	LastLogTerm  uint64 `json:"last_log_term"`  // Term of candidate's last log entry
	PreVote      bool   `json:"pre_vote"`       // True if this is a pre-vote request
}

// RequestVoteReply is the response to the RequestVote RPC.
type RequestVoteReply struct {
	Term        uint64 `json:"term"`         // Responder's currentTerm, for the candidate to update itself
	VoteGranted bool   `json:"vote_granted"` // True means the candidate received the vote
}
// AppendEntriesArgs holds the arguments of the AppendEntries RPC. It is used
// both to replicate log entries and, with no entries, as a heartbeat.
type AppendEntriesArgs struct {
	Term         uint64     `json:"term"`           // Leader's term
	LeaderID     string     `json:"leader_id"`      // So the follower can redirect clients
	PrevLogIndex uint64     `json:"prev_log_index"` // Index of the log entry immediately preceding the new ones
	PrevLogTerm  uint64     `json:"prev_log_term"`  // Term of the PrevLogIndex entry
	Entries      []LogEntry `json:"entries"`        // Log entries to store (empty for heartbeat)
	LeaderCommit uint64     `json:"leader_commit"`  // Leader's commitIndex
}

// AppendEntriesReply is the response to the AppendEntries RPC.
type AppendEntriesReply struct {
	Term    uint64 `json:"term"`    // Responder's currentTerm, for the leader to update itself
	Success bool   `json:"success"` // True if the follower had an entry matching PrevLogIndex and PrevLogTerm

	// Optimization: on failure these help the leader find the correct
	// NextIndex in fewer round trips.
	ConflictIndex uint64 `json:"conflict_index"` // Index of the first entry with the conflicting term
	ConflictTerm  uint64 `json:"conflict_term"`  // Term of the conflicting entry
}
// InstallSnapshotArgs holds the arguments of the InstallSnapshot RPC. A
// snapshot may be sent in several chunks: Offset gives each chunk's byte
// position and Done marks the final chunk.
type InstallSnapshotArgs struct {
	Term              uint64 `json:"term"`                // Leader's term
	LeaderID          string `json:"leader_id"`           // So the follower can redirect clients
	LastIncludedIndex uint64 `json:"last_included_index"` // The snapshot replaces all entries up through this index
	LastIncludedTerm  uint64 `json:"last_included_term"`  // Term of LastIncludedIndex
	Offset            uint64 `json:"offset"`              // Byte offset of this chunk within the snapshot
	Data              []byte `json:"data"`                // Snapshot data for this chunk
	Done              bool   `json:"done"`                // True if this is the last chunk
}

// InstallSnapshotReply is the response to the InstallSnapshot RPC.
type InstallSnapshotReply struct {
	Term    uint64 `json:"term"`    // Responder's currentTerm, for the leader to update itself
	Success bool   `json:"success"` // True if the chunk was accepted
}
// TimeoutNowArgs holds the arguments of the TimeoutNow RPC, which the leader
// sends as part of a leadership transfer.
type TimeoutNowArgs struct {
	Term     uint64 `json:"term"`      // Leader's term
	LeaderID string `json:"leader_id"` // Leader initiating the transfer
}

// TimeoutNowReply is the response to the TimeoutNow RPC.
type TimeoutNowReply struct {
	Term    uint64 `json:"term"`    // Responder's current term
	Success bool   `json:"success"` // True if the request was accepted
}

// ReadIndexArgs holds the arguments of the ReadIndex RPC used for
// linearizable reads.
type ReadIndexArgs struct {
	// Empty - just need to confirm leadership
}

// ReadIndexReply is the response to the ReadIndex RPC.
type ReadIndexReply struct {
	ReadIndex uint64 `json:"read_index"`      // Index the reader must wait to see applied before reading (presumably the leader's commit index — confirm)
	Success   bool   `json:"success"`         // True if a read index was obtained
	Error     string `json:"error,omitempty"` // Failure reason when Success is false
}
  410. // RPC message types for network communication
  411. type RPCType int
  412. const (
  413. RPCRequestVote RPCType = iota
  414. RPCAppendEntries
  415. RPCInstallSnapshot
  416. RPCPropose // For request forwarding
  417. RPCAddNode // For AddNode forwarding
  418. RPCRemoveNode // For RemoveNode forwarding
  419. RPCTimeoutNow // For leadership transfer
  420. RPCReadIndex // For linearizable reads
  421. RPCGet // For remote KV reads
  422. )
// ProposeArgs holds the arguments of the Propose RPC, used to forward a client
// command from another node (presumably to the leader). Command is
// JSON-encoded as base64 because it is a []byte.
type ProposeArgs struct {
	Command []byte `json:"command"`
}

// ProposeReply is the response to the Propose RPC.
// NOTE(review): on success, Index and Term presumably identify the log
// position assigned to the command — confirm with the handler.
type ProposeReply struct {
	Success bool   `json:"success"`
	Index   uint64 `json:"index,omitempty"`
	Term    uint64 `json:"term,omitempty"`
	Error   string `json:"error,omitempty"` // Failure reason when Success is false
}

// AddNodeArgs holds the arguments of the forwarded AddNode RPC.
type AddNodeArgs struct {
	NodeID  string `json:"node_id"` // Node to add
	Address string `json:"address"` // Address of the new node
}

// AddNodeReply is the response to the AddNode RPC.
type AddNodeReply struct {
	Success bool   `json:"success"`
	Error   string `json:"error,omitempty"` // Failure reason when Success is false
}

// RemoveNodeArgs holds the arguments of the forwarded RemoveNode RPC.
type RemoveNodeArgs struct {
	NodeID string `json:"node_id"` // Node to remove
}

// RemoveNodeReply is the response to the RemoveNode RPC.
type RemoveNodeReply struct {
	Success bool   `json:"success"`
	Error   string `json:"error,omitempty"` // Failure reason when Success is false
}

// GetArgs holds the arguments of the Get RPC used for remote KV reads
// (served by Config.GetHandler when it is set).
type GetArgs struct {
	Key string `json:"key"`
}

// GetReply is the response to the Get RPC. Found reports whether the key
// exists; Value is only meaningful when Found is true.
type GetReply struct {
	Value string `json:"value,omitempty"`
	Found bool   `json:"found"`
	Error string `json:"error,omitempty"`
}
// RPCMessage wraps any RPC request for network transmission. Type selects
// which *Args struct Payload holds.
//
// NOTE(review): Payload is interface{}. If messages are decoded with
// encoding/json, Payload comes back as a generic map[string]interface{} and
// must be re-decoded into the concrete type selected by Type — confirm how the
// transport handles this.
type RPCMessage struct {
	Type    RPCType     `json:"type"`
	Payload interface{} `json:"payload"`
}

// RPCResponse wraps any RPC response. Type mirrors the request's RPCType.
// Payload holds the matching *Reply struct. Error carries a transport- or
// handler-level failure message.
type RPCResponse struct {
	Type    RPCType     `json:"type"`
	Payload interface{} `json:"payload"`
	Error   string      `json:"error,omitempty"`
}