types.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551
  1. package raft
  2. import (
  3. "errors"
  4. "fmt"
  5. "sync"
  6. "time"
  7. )
  8. // ==================== Custom Errors ====================
// Sentinel errors defined by this package. RaftError (declared in this file)
// can wrap any of them and exposes the wrapped value through Unwrap, so
// callers should compare with errors.Is rather than ==.
var (
	// ErrNoLeader indicates no leader is available.
	ErrNoLeader = errors.New("no leader available")
	// ErrNotLeader indicates this node is not the leader.
	ErrNotLeader = errors.New("not leader")
	// ErrConfigInFlight indicates a configuration change is in progress.
	ErrConfigInFlight = errors.New("configuration change in progress")
	// ErrTimeout indicates an operation timed out.
	ErrTimeout = errors.New("operation timed out")
	// ErrShutdown indicates the raft node is shutting down.
	ErrShutdown = errors.New("raft is shutting down")
	// ErrPersistFailed indicates persistent storage failed.
	ErrPersistFailed = errors.New("failed to persist state")
	// ErrLeadershipLost indicates leadership was lost during operation.
	ErrLeadershipLost = errors.New("leadership lost")
)
  25. // RaftError wraps errors with additional context
  26. type RaftError struct {
  27. Err error
  28. LeaderID string // Known leader, if any
  29. RetryIn time.Duration // Suggested retry delay
  30. }
  31. func (e *RaftError) Error() string {
  32. if e.LeaderID != "" {
  33. return fmt.Sprintf("%s (leader: %s)", e.Err, e.LeaderID)
  34. }
  35. return e.Err.Error()
  36. }
  37. func (e *RaftError) Unwrap() error {
  38. return e.Err
  39. }
  40. // NewRaftError creates a new RaftError
  41. func NewRaftError(err error, leaderID string, retryIn time.Duration) *RaftError {
  42. return &RaftError{
  43. Err: err,
  44. LeaderID: leaderID,
  45. RetryIn: retryIn,
  46. }
  47. }
  48. // NodeState represents the current state of a Raft node
  49. type NodeState int
  50. const (
  51. Follower NodeState = iota
  52. Candidate
  53. Leader
  54. )
  55. func (s NodeState) String() string {
  56. switch s {
  57. case Follower:
  58. return "Follower"
  59. case Candidate:
  60. return "Candidate"
  61. case Leader:
  62. return "Leader"
  63. default:
  64. return "Unknown"
  65. }
  66. }
  67. // EntryType represents the type of a log entry
  68. type EntryType int
  69. const (
  70. EntryNormal EntryType = iota // Normal command entry
  71. EntryConfig // Configuration change entry
  72. EntryNoop // No-op entry (used by leader to commit previous term entries)
  73. )
// LogEntry represents a single entry in the replicated log.
// Type says which payload field applies: Command for normal entries,
// Config for configuration-change entries.
type LogEntry struct {
	Index   uint64         `json:"index"`             // Log index (1-based)
	Term    uint64         `json:"term"`              // Term when entry was received
	Type    EntryType      `json:"type,omitempty"`    // Entry type; left out of the JSON when it is the zero value (EntryNormal)
	Command []byte         `json:"command,omitempty"` // Command to be applied to state machine (for normal entries)
	Config  *ClusterConfig `json:"config,omitempty"`  // New cluster configuration (for config entries)
}
  82. // ConfigChangeType represents the type of configuration change
  83. type ConfigChangeType int
  84. const (
  85. ConfigAddNode ConfigChangeType = iota
  86. ConfigRemoveNode
  87. )
// ConfigChange represents a single node configuration change: one node
// being added to or removed from the cluster.
type ConfigChange struct {
	Type    ConfigChangeType `json:"type"`    // Add or remove
	NodeID  string           `json:"node_id"` // Node to add/remove
	Address string           `json:"address"` // Node address (for add)
}
// ClusterConfig represents the cluster membership configuration.
// A *ClusterConfig is what a configuration-change LogEntry carries in its
// Config field.
type ClusterConfig struct {
	Nodes map[string]string `json:"nodes"` // NodeID -> Address mapping for all nodes including self
}
// PersistentState represents the persistent state on all servers.
// (Updated on stable storage before responding to RPCs.)
type PersistentState struct {
	CurrentTerm uint64 `json:"current_term"` // Latest term server has seen
	VotedFor    string `json:"voted_for"`    // CandidateId that received vote in current term
}
// VolatileState represents the volatile state on all servers.
// Unlike PersistentState it carries no JSON tags.
type VolatileState struct {
	CommitIndex uint64 // Index of highest log entry known to be committed
	LastApplied uint64 // Index of highest log entry applied to state machine
}
// LeaderVolatileState represents volatile state on leaders.
// (Reinitialized after election.)
// Both maps are keyed by server; NOTE(review): presumably the key is the
// node ID used elsewhere in this file — confirm against the caller.
type LeaderVolatileState struct {
	NextIndex  map[string]uint64 // For each server, index of the next log entry to send
	MatchIndex map[string]uint64 // For each server, index of highest log entry known to be replicated
}
// Config holds the configuration for a Raft node.
//
// DefaultConfig returns a Config with the timing, batching, retry and
// snapshot fields pre-populated; the "Default" values quoted below are the
// ones DefaultConfig assigns. Identity and membership fields (NodeID,
// ClusterNodes, ListenAddr, DataDir), the callbacks and the Logger are left
// at their zero values by DefaultConfig.
type Config struct {
	// NodeID is the unique identifier for this node
	NodeID string
	// Peers is the list of peer node addresses (excluding self)
	//
	// Deprecated: Use ClusterNodes instead for dynamic membership
	Peers []string
	// PeerMap maps nodeID to address for all nodes (including self)
	// Used for request forwarding. Example: {"node1": "127.0.0.1:9001", "node2": "127.0.0.1:9002"}
	//
	// Deprecated: Use ClusterNodes instead
	PeerMap map[string]string
	// ClusterNodes maps nodeID to address for all nodes (including self)
	// This is the canonical cluster membership configuration
	// Example: {"node1": "127.0.0.1:9001", "node2": "127.0.0.1:9002", "node3": "127.0.0.1:9003"}
	// When non-nil it takes precedence over Peers in GetPeerAddresses and
	// GetClusterSize.
	ClusterNodes map[string]string
	// ListenAddr is the address this node listens on
	ListenAddr string
	// DataDir is the directory for persistent storage
	DataDir string
	// ElectionTimeoutMin is the minimum election timeout (Default: 150ms)
	ElectionTimeoutMin time.Duration
	// ElectionTimeoutMax is the maximum election timeout (Default: 300ms)
	ElectionTimeoutMax time.Duration
	// HeartbeatInterval is the interval between heartbeats from leader (Default: 50ms)
	HeartbeatInterval time.Duration
	// MaxLogEntriesPerRequest limits entries sent in a single AppendEntries (Default: 5000)
	MaxLogEntriesPerRequest int
	// MemoryLogCapacity is the maximum number of log entries to keep in memory (Default: 10000)
	MemoryLogCapacity int
	// SnapshotThreshold triggers snapshot when log grows beyond this
	// Default: the maximum uint64 value
	SnapshotThreshold uint64
	// SnapshotMinRetention is the minimum number of log entries to retain after compaction
	// This ensures followers have enough entries for catch-up without needing a full snapshot
	// Default: 10000
	SnapshotMinRetention uint64
	// SnapshotProvider is a callback function that returns the current state machine snapshot
	// If set, automatic log compaction will be enabled
	// The function should return the serialized state that can restore the state machine
	// minIncludeIndex: The snapshot MUST include all entries up to at least this index.
	SnapshotProvider func(minIncludeIndex uint64) ([]byte, error)
	// GetHandler is a callback function to handle remote Get requests
	// If set, clients can read values from this node via RPC
	GetHandler func(key string) (value string, found bool)
	// SnapshotChunkSize is the size of each chunk when transferring snapshots
	// Default: 1MB
	SnapshotChunkSize int
	// LastAppliedIndex is the index of the last log entry applied to the state machine
	// Used for initialization to avoid re-applying entries
	LastAppliedIndex uint64
	// RPC Timeout configurations
	RPCTimeout         time.Duration // Default: 500ms for normal RPCs
	SnapshotRPCTimeout time.Duration // Default: 30s for snapshot transfers
	ProposeTimeout     time.Duration // Default: 3s for propose forwarding
	// Retry configurations
	MaxRetries   int           // Default: 3
	RetryBackoff time.Duration // Default: 100ms
	// Batching configurations
	BatchMinWait time.Duration // Minimum wait time for batching (Default: 1ms)
	BatchMaxWait time.Duration // Maximum wait time for batching (Default: 10ms)
	BatchMaxSize int           // Maximum batch size before forcing flush (Default: 100)
	// Logger for debug output
	Logger Logger
}
  178. // Clone creates a deep copy of the config
  179. func (c *Config) Clone() *Config {
  180. clone := *c
  181. if c.Peers != nil {
  182. clone.Peers = make([]string, len(c.Peers))
  183. copy(clone.Peers, c.Peers)
  184. }
  185. if c.PeerMap != nil {
  186. clone.PeerMap = make(map[string]string)
  187. for k, v := range c.PeerMap {
  188. clone.PeerMap[k] = v
  189. }
  190. }
  191. if c.ClusterNodes != nil {
  192. clone.ClusterNodes = make(map[string]string)
  193. for k, v := range c.ClusterNodes {
  194. clone.ClusterNodes[k] = v
  195. }
  196. }
  197. return &clone
  198. }
  199. // GetPeerAddresses returns the addresses of all peers (excluding self)
  200. func (c *Config) GetPeerAddresses() []string {
  201. if c.ClusterNodes != nil {
  202. peers := make([]string, 0, len(c.ClusterNodes)-1)
  203. for nodeID, addr := range c.ClusterNodes {
  204. if nodeID != c.NodeID {
  205. peers = append(peers, addr)
  206. }
  207. }
  208. return peers
  209. }
  210. return c.Peers
  211. }
  212. // GetClusterSize returns the total number of nodes in the cluster
  213. func (c *Config) GetClusterSize() int {
  214. if c.ClusterNodes != nil {
  215. return len(c.ClusterNodes)
  216. }
  217. return len(c.Peers) + 1
  218. }
  219. // DefaultConfig returns a configuration with sensible defaults
  220. func DefaultConfig() *Config {
  221. return &Config{
  222. ElectionTimeoutMin: 150 * time.Millisecond,
  223. ElectionTimeoutMax: 300 * time.Millisecond,
  224. HeartbeatInterval: 50 * time.Millisecond,
  225. MaxLogEntriesPerRequest: 5000,
  226. MemoryLogCapacity: 10000,
  227. SnapshotThreshold: ^uint64(0), // 默认不启用压缩 (MaxUint64)
  228. SnapshotMinRetention: 10000, // 保留1万条用于 follower 追赶
  229. SnapshotChunkSize: 1024 * 1024, // 1MB chunks
  230. RPCTimeout: 500 * time.Millisecond,
  231. SnapshotRPCTimeout: 30 * time.Second,
  232. ProposeTimeout: 3 * time.Second,
  233. MaxRetries: 3,
  234. RetryBackoff: 100 * time.Millisecond,
  235. BatchMinWait: 1 * time.Millisecond,
  236. BatchMaxWait: 10 * time.Millisecond,
  237. BatchMaxSize: 100,
  238. }
  239. }
  240. // ==================== Metrics ====================
// Metrics holds runtime metrics for monitoring.
// Every field is a plain uint64 with a JSON tag; the struct carries no
// lock of its own.
// NOTE(review): how concurrent updates are synchronized is not visible in
// this file — confirm at the update sites.
type Metrics struct {
	// Term metrics
	Term uint64 `json:"term"`
	// Proposal metrics
	ProposalsTotal     uint64 `json:"proposals_total"`
	ProposalsSuccess   uint64 `json:"proposals_success"`
	ProposalsFailed    uint64 `json:"proposals_failed"`
	ProposalsForwarded uint64 `json:"proposals_forwarded"`
	// Replication metrics
	AppendsSent     uint64 `json:"appends_sent"`
	AppendsReceived uint64 `json:"appends_received"`
	AppendsSuccess  uint64 `json:"appends_success"`
	AppendsFailed   uint64 `json:"appends_failed"`
	// Election metrics
	ElectionsStarted uint64 `json:"elections_started"`
	ElectionsWon     uint64 `json:"elections_won"`
	PreVotesStarted  uint64 `json:"pre_votes_started"`
	PreVotesGranted  uint64 `json:"pre_votes_granted"`
	// Snapshot metrics
	SnapshotsTaken     uint64 `json:"snapshots_taken"`
	SnapshotsInstalled uint64 `json:"snapshots_installed"`
	SnapshotsSent      uint64 `json:"snapshots_sent"`
	// Read metrics
	ReadIndexRequests uint64 `json:"read_index_requests"`
	ReadIndexSuccess  uint64 `json:"read_index_success"`
	// Leadership transfer metrics
	LeadershipTransfers       uint64 `json:"leadership_transfers"`
	LeadershipTransferSuccess uint64 `json:"leadership_transfer_success"`
}
// HealthStatus represents the health status of a Raft node as a
// JSON-serializable snapshot.
type HealthStatus struct {
	NodeID        string            `json:"node_id"`
	State         string            `json:"state"` // State as text; NOTE(review): presumably NodeState.String() — confirm at the producer
	Term          uint64            `json:"term"`
	LeaderID      string            `json:"leader_id"`
	ClusterSize   int               `json:"cluster_size"`
	ClusterNodes  map[string]string `json:"cluster_nodes"`
	CommitIndex   uint64            `json:"commit_index"`
	LastApplied   uint64            `json:"last_applied"`
	LogBehind     uint64            `json:"log_behind"` // commitIndex - lastApplied
	LastHeartbeat time.Time         `json:"last_heartbeat"`
	IsHealthy     bool              `json:"is_healthy"`
	Uptime        time.Duration     `json:"uptime"` // encoding/json writes a time.Duration as its integer nanosecond count
}
// Logger is the logging interface accepted in Config.Logger.
// Each method takes a printf-style format string and its arguments, one
// method per severity level. DefaultLogger and NoopLogger in this file
// both implement it (through pointer receivers).
type Logger interface {
	Debug(format string, args ...interface{})
	Info(format string, args ...interface{})
	Warn(format string, args ...interface{})
	Error(format string, args ...interface{})
}
// DefaultLogger is described as a simple console logger, but every method
// defined in this file is a no-op: nothing is formatted or printed.
// NOTE(review): the comment inside log says the real output lives in
// raft.go; confirm whether these empty stubs are still intended, because a
// caller who picks DefaultLogger gets no log output from this code.
type DefaultLogger struct {
	Prefix string     // Not read by any method in this file
	mu     sync.Mutex // Taken by log; the struct must therefore not be copied
}

// log acquires the mutex and releases it on return. It currently ignores
// level, format and args and writes nothing.
func (l *DefaultLogger) log(level, format string, args ...interface{}) {
	l.mu.Lock()
	defer l.mu.Unlock()
	// Use fmt.Printf with timestamp
	// Commented out to avoid import cycle, implemented in raft.go
}

// Debug is a no-op; it does not call log.
func (l *DefaultLogger) Debug(format string, args ...interface{}) {}

// Info is a no-op; it does not call log.
func (l *DefaultLogger) Info(format string, args ...interface{}) {}

// Warn is a no-op; it does not call log.
func (l *DefaultLogger) Warn(format string, args ...interface{}) {}

// Error is a no-op; it does not call log.
func (l *DefaultLogger) Error(format string, args ...interface{}) {}
// NoopLogger implements a no-op logger: every method discards its
// arguments. It holds no state, so the zero value is ready to use.
type NoopLogger struct{}

// Debug discards the message.
func (l *NoopLogger) Debug(format string, args ...interface{}) {}

// Info discards the message.
func (l *NoopLogger) Info(format string, args ...interface{}) {}

// Warn discards the message.
func (l *NoopLogger) Warn(format string, args ...interface{}) {}

// Error discards the message.
func (l *NoopLogger) Error(format string, args ...interface{}) {}
// StateMachine is the interface that users must implement.
// Commands and snapshots are opaque byte slices to this package.
type StateMachine interface {
	// Apply applies a command to the state machine and returns an
	// implementation-defined result together with any error.
	Apply(command []byte) (interface{}, error)
	// Snapshot returns a snapshot of the current state.
	Snapshot() ([]byte, error)
	// Restore restores the state machine from a snapshot.
	Restore(snapshot []byte) error
}
// ApplyMsg is sent to the application layer when a log entry is committed.
// It has two groups of fields, one for commands and one for snapshots, each
// with its own validity flag.
// NOTE(review): presumably exactly one of CommandValid / SnapshotValid is
// set per message — confirm at the send site.
type ApplyMsg struct {
	CommandValid bool   // True when the Command* fields carry a command
	Command      []byte // Command payload
	CommandIndex uint64 // Log index associated with the command
	CommandTerm  uint64 // Term associated with the command
	// For snapshots
	SnapshotValid bool   // True when the Snapshot* fields carry a snapshot
	Snapshot      []byte // Snapshot payload
	SnapshotIndex uint64 // Log index associated with the snapshot
	SnapshotTerm  uint64 // Term associated with the snapshot
}
// RequestVoteArgs is the arguments for RequestVote RPC.
// The same message is used for pre-vote requests, distinguished by PreVote.
type RequestVoteArgs struct {
	Term         uint64 `json:"term"`           // Candidate's term
	CandidateID  string `json:"candidate_id"`   // Candidate requesting vote
	LastLogIndex uint64 `json:"last_log_index"` // Index of candidate's last log entry
	LastLogTerm  uint64 `json:"last_log_term"`  // Term of candidate's last log entry
	PreVote      bool   `json:"pre_vote"`       // True if this is a pre-vote request
}
// RequestVoteReply is the response for RequestVote RPC.
type RequestVoteReply struct {
	Term        uint64 `json:"term"`         // CurrentTerm, for candidate to update itself
	VoteGranted bool   `json:"vote_granted"` // True means candidate received vote
}
// AppendEntriesArgs is the arguments for AppendEntries RPC.
// A request with no Entries serves as a heartbeat.
type AppendEntriesArgs struct {
	Term         uint64     `json:"term"`           // Leader's term
	LeaderID     string     `json:"leader_id"`      // So follower can redirect clients
	PrevLogIndex uint64     `json:"prev_log_index"` // Index of log entry immediately preceding new ones
	PrevLogTerm  uint64     `json:"prev_log_term"`  // Term of PrevLogIndex entry
	Entries      []LogEntry `json:"entries"`        // Log entries to store (empty for heartbeat)
	LeaderCommit uint64     `json:"leader_commit"`  // Leader's commitIndex
}
// AppendEntriesReply is the response for AppendEntries RPC.
// The Conflict* fields are hints that let the leader pick a better
// NextIndex after a rejection.
type AppendEntriesReply struct {
	Term    uint64 `json:"term"`    // CurrentTerm, for leader to update itself
	Success bool   `json:"success"` // True if follower contained entry matching PrevLogIndex and PrevLogTerm
	// Optimization: help leader find correct NextIndex faster
	ConflictIndex uint64 `json:"conflict_index"` // Index of first entry with conflicting term
	ConflictTerm  uint64 `json:"conflict_term"`  // Term of conflicting entry
}
// InstallSnapshotArgs is the arguments for InstallSnapshot RPC.
// A snapshot is transferred in chunks: each request carries one chunk in
// Data at byte position Offset, and Done marks the final chunk.
type InstallSnapshotArgs struct {
	Term              uint64 `json:"term"`                // Leader's term
	LeaderID          string `json:"leader_id"`           // So follower can redirect clients
	LastIncludedIndex uint64 `json:"last_included_index"` // Snapshot replaces all entries up through this index
	LastIncludedTerm  uint64 `json:"last_included_term"`  // Term of LastIncludedIndex
	Offset            uint64 `json:"offset"`              // Byte offset for chunked transfer
	Data              []byte `json:"data"`                // Snapshot data (chunk)
	Done              bool   `json:"done"`                // True if this is the last chunk
}
// InstallSnapshotReply is the response for InstallSnapshot RPC.
type InstallSnapshotReply struct {
	Term    uint64 `json:"term"`    // CurrentTerm, for leader to update itself
	Success bool   `json:"success"` // True if chunk was accepted
}
// TimeoutNowArgs is the arguments for TimeoutNow RPC (leadership transfer).
type TimeoutNowArgs struct {
	Term     uint64 `json:"term"`      // Sender's term
	LeaderID string `json:"leader_id"` // ID of the leader sending the request
}
// TimeoutNowReply is the response for TimeoutNow RPC.
type TimeoutNowReply struct {
	Term    uint64 `json:"term"`    // Responder's term
	Success bool   `json:"success"` // Whether the request was accepted
}
// ReadIndexArgs is the arguments for ReadIndex RPC.
// It deliberately has no fields: the request itself is the whole message.
type ReadIndexArgs struct {
	// Empty - just need to confirm leadership
}
// ReadIndexReply is the response for ReadIndex RPC.
type ReadIndexReply struct {
	ReadIndex uint64 `json:"read_index"`      // Index returned for the read
	Success   bool   `json:"success"`         // Whether the request succeeded
	Error     string `json:"error,omitempty"` // Error text; omitted from JSON when empty
}
  400. // RPC message types for network communication
  401. type RPCType int
  402. const (
  403. RPCRequestVote RPCType = iota
  404. RPCAppendEntries
  405. RPCInstallSnapshot
  406. RPCPropose // For request forwarding
  407. RPCAddNode // For AddNode forwarding
  408. RPCRemoveNode // For RemoveNode forwarding
  409. RPCTimeoutNow // For leadership transfer
  410. RPCReadIndex // For linearizable reads
  411. RPCGet // For remote KV reads
  412. )
// ProposeArgs is the arguments for Propose RPC (forwarding).
type ProposeArgs struct {
	Command []byte `json:"command"` // Command being proposed
}
// ProposeReply is the response for Propose RPC.
// Index, Term and Error are omitted from the JSON when they are zero/empty.
type ProposeReply struct {
	Success bool   `json:"success"`
	Index   uint64 `json:"index,omitempty"`
	Term    uint64 `json:"term,omitempty"`
	Error   string `json:"error,omitempty"`
}
// AddNodeArgs is the arguments for AddNode RPC (forwarding).
type AddNodeArgs struct {
	NodeID  string `json:"node_id"` // ID of the node to add
	Address string `json:"address"` // Address of the node to add
}
// AddNodeReply is the response for AddNode RPC.
type AddNodeReply struct {
	Success bool   `json:"success"`
	Error   string `json:"error,omitempty"` // Omitted from JSON when empty
}
// RemoveNodeArgs is the arguments for RemoveNode RPC (forwarding).
type RemoveNodeArgs struct {
	NodeID string `json:"node_id"` // ID of the node to remove
}
// RemoveNodeReply is the response for RemoveNode RPC.
type RemoveNodeReply struct {
	Success bool   `json:"success"`
	Error   string `json:"error,omitempty"` // Omitted from JSON when empty
}
// GetArgs is the arguments for Get RPC (for remote KV reads).
type GetArgs struct {
	Key string `json:"key"` // Key to look up
}
// GetReply is the response for Get RPC.
// Value and Found have the same types as the results of Config.GetHandler.
type GetReply struct {
	Value string `json:"value,omitempty"` // Omitted from JSON when empty, so consult Found to tell "missing" from "empty"
	Found bool   `json:"found"`
	Error string `json:"error,omitempty"` // Omitted from JSON when empty
}
// RPCMessage wraps all RPC types for network transmission.
// Type tells the receiver which argument struct Payload holds.
// NOTE(review): Payload is an empty interface, so after a generic JSON
// decode it will not be the concrete args struct — confirm how the
// transport re-decodes it based on Type.
type RPCMessage struct {
	Type    RPCType     `json:"type"`
	Payload interface{} `json:"payload"`
}
// RPCResponse wraps all RPC response types.
// Type identifies the RPC kind, Payload holds the reply, and Error is a
// text field that is omitted from the JSON when empty.
type RPCResponse struct {
	Type    RPCType     `json:"type"`
	Payload interface{} `json:"payload"`
	Error   string      `json:"error,omitempty"`
}
  463. }