types.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568
  1. package raft
  2. import (
  3. "errors"
  4. "fmt"
  5. "sync"
  6. "time"
  7. )
  8. // ==================== Custom Errors ====================
  9. var (
  10. // ErrNoLeader indicates no leader is available
  11. ErrNoLeader = errors.New("no leader available")
  12. // ErrNotLeader indicates this node is not the leader
  13. ErrNotLeader = errors.New("not leader")
  14. // ErrConfigInFlight indicates a configuration change is in progress
  15. ErrConfigInFlight = errors.New("configuration change in progress")
  16. // ErrTimeout indicates an operation timed out
  17. ErrTimeout = errors.New("operation timed out")
  18. // ErrShutdown indicates the raft node is shutting down
  19. ErrShutdown = errors.New("raft is shutting down")
  20. // ErrPersistFailed indicates persistent storage failed
  21. ErrPersistFailed = errors.New("failed to persist state")
  22. // ErrLeadershipLost indicates leadership was lost during operation
  23. ErrLeadershipLost = errors.New("leadership lost")
  24. )
  25. // RaftError wraps errors with additional context
  26. type RaftError struct {
  27. Err error
  28. LeaderID string // Known leader, if any
  29. RetryIn time.Duration // Suggested retry delay
  30. }
  31. func (e *RaftError) Error() string {
  32. if e.LeaderID != "" {
  33. return fmt.Sprintf("%s (leader: %s)", e.Err, e.LeaderID)
  34. }
  35. return e.Err.Error()
  36. }
  37. func (e *RaftError) Unwrap() error {
  38. return e.Err
  39. }
  40. // NewRaftError creates a new RaftError
  41. func NewRaftError(err error, leaderID string, retryIn time.Duration) *RaftError {
  42. return &RaftError{
  43. Err: err,
  44. LeaderID: leaderID,
  45. RetryIn: retryIn,
  46. }
  47. }
  48. // NodeState represents the current state of a Raft node
  49. type NodeState int
  50. const (
  51. Follower NodeState = iota
  52. Candidate
  53. Leader
  54. )
  55. func (s NodeState) String() string {
  56. switch s {
  57. case Follower:
  58. return "Follower"
  59. case Candidate:
  60. return "Candidate"
  61. case Leader:
  62. return "Leader"
  63. default:
  64. return "Unknown"
  65. }
  66. }
  67. // EntryType represents the type of a log entry
  68. type EntryType int
  69. const (
  70. EntryNormal EntryType = iota // Normal command entry
  71. EntryConfig // Configuration change entry
  72. EntryNoop // No-op entry (used by leader to commit previous term entries)
  73. )
// LogEntry represents a single entry in the replicated log.
//
// Which payload field is populated depends on Type: Command for EntryNormal
// entries and Config for EntryConfig entries.
// NOTE(review): EntryNoop entries presumably carry neither — confirm with the
// code that appends them.
//
// JSON notes (standard encoding/json behavior): Type is omitted when it is
// EntryNormal (0), and Command is encoded as a base64 string.
type LogEntry struct {
	Index   uint64         `json:"index"`             // Log index (1-based)
	Term    uint64         `json:"term"`              // Term when entry was received
	Type    EntryType      `json:"type,omitempty"`    // Entry type (normal, config change or no-op)
	Command []byte         `json:"command,omitempty"` // Command to be applied to state machine (for normal entries)
	Config  *ClusterConfig `json:"config,omitempty"`  // New cluster configuration (for config entries)
}
  82. // ConfigChangeType represents the type of configuration change
  83. type ConfigChangeType int
  84. const (
  85. ConfigAddNode ConfigChangeType = iota
  86. ConfigRemoveNode
  87. )
  88. // ConfigChange represents a single node configuration change
  89. type ConfigChange struct {
  90. Type ConfigChangeType `json:"type"` // Add or remove
  91. NodeID string `json:"node_id"` // Node to add/remove
  92. Address string `json:"address"` // Node address (for add)
  93. }
// ClusterConfig represents the cluster membership configuration. It is the
// payload of EntryConfig log entries (LogEntry.Config).
//
// Nodes is a reference type: copying a ClusterConfig value shares the same
// map, so callers that need an independent snapshot must copy Nodes.
type ClusterConfig struct {
	Nodes map[string]string `json:"nodes"` // NodeID -> Address mapping for all nodes including self
}
// PersistentState represents the persistent state on all servers
// (updated on stable storage before responding to RPCs). It is
// JSON-serializable via the field tags below.
type PersistentState struct {
	CurrentTerm uint64 `json:"current_term"` // Latest term server has seen
	VotedFor    string `json:"voted_for"`    // CandidateId that received vote in current term ("" if none — presumably; confirm)
}

// VolatileState represents the volatile (in-memory, not persisted) state on
// all servers.
type VolatileState struct {
	CommitIndex uint64 // Index of highest log entry known to be committed
	LastApplied uint64 // Index of highest log entry applied to state machine
}

// LeaderVolatileState represents volatile state on leaders
// (reinitialized after election). Both maps are keyed by server identifier.
// NOTE(review): confirm whether the key is the node ID or the address.
type LeaderVolatileState struct {
	NextIndex  map[string]uint64 // For each server, index of the next log entry to send
	MatchIndex map[string]uint64 // For each server, index of highest log entry known to be replicated
}
// Config holds the configuration for a Raft node.
//
// Start from DefaultConfig and override fields as needed. Where a field
// comment below mentions a default, it means the value DefaultConfig sets,
// not the Go zero value of the field.
type Config struct {
	// NodeID is the unique identifier for this node.
	NodeID string

	// Peers is the list of peer node addresses (excluding self).
	//
	// Deprecated: Use ClusterNodes instead for dynamic membership.
	Peers []string

	// PeerMap maps nodeID to address for all nodes (including self).
	// Used for request forwarding. Example: {"node1": "127.0.0.1:9001", "node2": "127.0.0.1:9002"}
	//
	// Deprecated: Use ClusterNodes instead.
	PeerMap map[string]string

	// ClusterNodes maps nodeID to address for all nodes (including self).
	// This is the canonical cluster membership configuration; when non-nil,
	// GetPeerAddresses and GetClusterSize use it instead of Peers.
	// Example: {"node1": "127.0.0.1:9001", "node2": "127.0.0.1:9002", "node3": "127.0.0.1:9003"}
	ClusterNodes map[string]string

	// ListenAddr is the address this node listens on.
	ListenAddr string

	// DataDir is the directory for persistent storage.
	DataDir string

	// ElectionTimeoutMin is the minimum election timeout. Default: 150ms.
	ElectionTimeoutMin time.Duration

	// ElectionTimeoutMax is the maximum election timeout. Default: 300ms.
	ElectionTimeoutMax time.Duration

	// HeartbeatInterval is the interval between heartbeats from the leader.
	// Default: 50ms.
	HeartbeatInterval time.Duration

	// MaxLogEntriesPerRequest limits entries sent in a single AppendEntries.
	// Default: 5000.
	MaxLogEntriesPerRequest int

	// MemoryLogCapacity is the maximum number of log entries to keep in memory.
	// Default: 10000.
	MemoryLogCapacity int

	// LogCompactionEnabled determines if log compaction is enabled.
	// If false, the log will grow indefinitely (safest for data, but consumes disk).
	// Default: false.
	LogCompactionEnabled bool

	// SnapshotThreshold triggers a snapshot when the log grows beyond this
	// many entries. Default: 100000. DefaultConfig's comments state it only
	// takes effect when LogCompactionEnabled is true.
	SnapshotThreshold uint64

	// SnapshotMinRetention is the minimum number of log entries to retain after compaction.
	// This ensures followers have enough entries for catch-up without needing a full snapshot.
	// Default: 10000 (DefaultConfig's value; an earlier comment here said 1000).
	SnapshotMinRetention uint64

	// SnapshotProvider is a callback that returns the current state machine snapshot.
	// The function should return the serialized state that can restore the state machine.
	// minIncludeIndex: the snapshot MUST include all entries up to at least this index.
	// NOTE(review): this field was documented as enabling automatic log compaction
	// when set, while LogCompactionEnabled/DefaultConfig describe compaction as
	// opt-in; confirm which of the two actually gates compaction.
	SnapshotProvider func(minIncludeIndex uint64) ([]byte, error)

	// GetHandler is a callback to handle remote Get requests.
	// If set, clients can read values from this node via RPC.
	GetHandler func(key string) (value string, found bool)

	// SnapshotChunkSize is the size in bytes of each chunk when transferring
	// snapshots. Default: 1MB (1024 * 1024).
	SnapshotChunkSize int

	// LastAppliedIndex is the index of the last log entry applied to the state machine.
	// Used for initialization to avoid re-applying entries.
	LastAppliedIndex uint64

	// RPC timeout configurations.
	RPCTimeout         time.Duration // Default: 500ms for normal RPCs
	SnapshotRPCTimeout time.Duration // Default: 30s for snapshot transfers
	ProposeTimeout     time.Duration // Default: 3s for propose forwarding

	// Retry configurations.
	MaxRetries   int           // Default: 3
	RetryBackoff time.Duration // Default: 100ms

	// Batching configurations.
	BatchMinWait time.Duration // Minimum wait time for batching (Default: 1ms)
	BatchMaxWait time.Duration // Maximum wait time for batching (Default: 10ms)

	// BatchMaxSize is the maximum batch size before forcing a flush (Default: 100).
	BatchMaxSize int

	// EnableCLI determines if the CLI should be started automatically.
	// Default: true (set by DefaultConfig; the Go zero value is false).
	EnableCLI bool

	// HTTPAddr is the address for the HTTP API server (e.g. ":8080").
	// If empty, the HTTP server is disabled. Default: "" (disabled).
	HTTPAddr string

	// Logger receives the node's log output. DefaultConfig leaves it nil.
	// NOTE(review): confirm what the node does when Logger is nil.
	Logger Logger
}
  189. // Clone creates a deep copy of the config
  190. func (c *Config) Clone() *Config {
  191. clone := *c
  192. if c.Peers != nil {
  193. clone.Peers = make([]string, len(c.Peers))
  194. copy(clone.Peers, c.Peers)
  195. }
  196. if c.PeerMap != nil {
  197. clone.PeerMap = make(map[string]string)
  198. for k, v := range c.PeerMap {
  199. clone.PeerMap[k] = v
  200. }
  201. }
  202. if c.ClusterNodes != nil {
  203. clone.ClusterNodes = make(map[string]string)
  204. for k, v := range c.ClusterNodes {
  205. clone.ClusterNodes[k] = v
  206. }
  207. }
  208. return &clone
  209. }
  210. // GetPeerAddresses returns the addresses of all peers (excluding self)
  211. func (c *Config) GetPeerAddresses() []string {
  212. if c.ClusterNodes != nil {
  213. peers := make([]string, 0, len(c.ClusterNodes)-1)
  214. for nodeID, addr := range c.ClusterNodes {
  215. if nodeID != c.NodeID {
  216. peers = append(peers, addr)
  217. }
  218. }
  219. return peers
  220. }
  221. return c.Peers
  222. }
  223. // GetClusterSize returns the total number of nodes in the cluster
  224. func (c *Config) GetClusterSize() int {
  225. if c.ClusterNodes != nil {
  226. return len(c.ClusterNodes)
  227. }
  228. return len(c.Peers) + 1
  229. }
  230. // DefaultConfig returns a configuration with sensible defaults
  231. func DefaultConfig() *Config {
  232. return &Config{
  233. ElectionTimeoutMin: 150 * time.Millisecond,
  234. ElectionTimeoutMax: 300 * time.Millisecond,
  235. HeartbeatInterval: 50 * time.Millisecond,
  236. MaxLogEntriesPerRequest: 5000,
  237. MemoryLogCapacity: 10000,
  238. LogCompactionEnabled: false, // 默认不启用压缩
  239. SnapshotThreshold: 100000, // 默认阈值(仅在 LogCompactionEnabled=true 时生效)
  240. SnapshotMinRetention: 10000, // 保留1万条用于 follower 追赶
  241. SnapshotChunkSize: 1024 * 1024, // 1MB chunks
  242. RPCTimeout: 500 * time.Millisecond,
  243. SnapshotRPCTimeout: 30 * time.Second,
  244. ProposeTimeout: 3 * time.Second,
  245. MaxRetries: 3,
  246. RetryBackoff: 100 * time.Millisecond,
  247. BatchMinWait: 1 * time.Millisecond,
  248. BatchMaxWait: 10 * time.Millisecond,
  249. BatchMaxSize: 100,
  250. EnableCLI: true,
  251. HTTPAddr: "",
  252. }
  253. }
// ==================== Metrics ====================

// Metrics holds runtime counters for monitoring, JSON-tagged for export.
// NOTE(review): this file does not show how the counters are updated. If they
// are incremented from several goroutines, access must go through sync/atomic
// or a lock; confirm at the call sites.
type Metrics struct {
	// Term metrics
	Term uint64 `json:"term"` // Current term at the time of sampling (presumably; confirm)

	// Proposal metrics
	ProposalsTotal     uint64 `json:"proposals_total"`
	ProposalsSuccess   uint64 `json:"proposals_success"`
	ProposalsFailed    uint64 `json:"proposals_failed"`
	ProposalsForwarded uint64 `json:"proposals_forwarded"` // Proposals forwarded to the leader (see RPCPropose)

	// Replication metrics
	AppendsSent     uint64 `json:"appends_sent"`
	AppendsReceived uint64 `json:"appends_received"`
	AppendsSuccess  uint64 `json:"appends_success"`
	AppendsFailed   uint64 `json:"appends_failed"`

	// Election metrics
	ElectionsStarted uint64 `json:"elections_started"`
	ElectionsWon     uint64 `json:"elections_won"`
	PreVotesStarted  uint64 `json:"pre_votes_started"`
	PreVotesGranted  uint64 `json:"pre_votes_granted"`

	// Snapshot metrics
	SnapshotsTaken     uint64 `json:"snapshots_taken"`
	SnapshotsInstalled uint64 `json:"snapshots_installed"`
	SnapshotsSent      uint64 `json:"snapshots_sent"`

	// Read metrics
	ReadIndexRequests uint64 `json:"read_index_requests"`
	ReadIndexSuccess  uint64 `json:"read_index_success"`

	// Leadership transfer metrics
	LeadershipTransfers       uint64 `json:"leadership_transfers"`
	LeadershipTransferSuccess uint64 `json:"leadership_transfer_success"`
}
// HealthStatus is a point-in-time health report for a Raft node, JSON-tagged
// for export.
//
// With encoding/json, Uptime (a time.Duration) is rendered as an integer
// number of nanoseconds, and LastHeartbeat as an RFC 3339 timestamp.
// NOTE(review): the rule that sets IsHealthy is not in this file; confirm it
// before documenting it.
type HealthStatus struct {
	NodeID        string            `json:"node_id"`
	State         string            `json:"state"` // Presumably NodeState.String(); confirm
	Term          uint64            `json:"term"`
	LeaderID      string            `json:"leader_id"`
	ClusterSize   int               `json:"cluster_size"`
	ClusterNodes  map[string]string `json:"cluster_nodes"`
	CommitIndex   uint64            `json:"commit_index"`
	LastApplied   uint64            `json:"last_applied"`
	LogBehind     uint64            `json:"log_behind"` // commitIndex - lastApplied
	LastHeartbeat time.Time         `json:"last_heartbeat"`
	IsHealthy     bool              `json:"is_healthy"`
	Uptime        time.Duration     `json:"uptime"`
}
// Logger is the logging interface used by the Raft node (see Config.Logger).
// Each method takes a printf-style format string followed by its arguments.
type Logger interface {
	Debug(format string, args ...interface{})
	Info(format string, args ...interface{})
	Warn(format string, args ...interface{})
	Error(format string, args ...interface{})
}
  307. // DefaultLogger implements a simple console logger
  308. type DefaultLogger struct {
  309. Prefix string
  310. mu sync.Mutex
  311. }
  312. func (l *DefaultLogger) log(level, format string, args ...interface{}) {
  313. l.mu.Lock()
  314. defer l.mu.Unlock()
  315. // Use fmt.Printf with timestamp
  316. // Commented out to avoid import cycle, implemented in raft.go
  317. }
  318. func (l *DefaultLogger) Debug(format string, args ...interface{}) {}
  319. func (l *DefaultLogger) Info(format string, args ...interface{}) {}
  320. func (l *DefaultLogger) Warn(format string, args ...interface{}) {}
  321. func (l *DefaultLogger) Error(format string, args ...interface{}) {}
  322. // NoopLogger implements a no-op logger
  323. type NoopLogger struct{}
  324. func (l *NoopLogger) Debug(format string, args ...interface{}) {}
  325. func (l *NoopLogger) Info(format string, args ...interface{}) {}
  326. func (l *NoopLogger) Warn(format string, args ...interface{}) {}
  327. func (l *NoopLogger) Error(format string, args ...interface{}) {}
// StateMachine is the application state that users must implement. The Raft
// node feeds it committed commands and uses it to produce or restore
// snapshots.
type StateMachine interface {
	// Apply applies a command to the state machine and returns an
	// application-defined result.
	Apply(command []byte) (interface{}, error)
	// Snapshot returns a serialized snapshot of the current state.
	Snapshot() ([]byte, error)
	// Restore restores the state machine from a snapshot produced by Snapshot.
	Restore(snapshot []byte) error
}
// ApplyMsg is sent to the application layer when a log entry is committed.
// It carries either a command (CommandValid set, Command* fields meaningful)
// or a snapshot (SnapshotValid set, Snapshot* fields meaningful).
// NOTE(review): presumably at most one of the two flags is set per message;
// confirm with the sender.
type ApplyMsg struct {
	CommandValid bool   // True when Command/CommandIndex/CommandTerm are set
	Command      []byte // Committed command bytes
	CommandIndex uint64 // Log index of the command
	CommandTerm  uint64 // Term of the command's log entry

	// For snapshots
	SnapshotValid bool   // True when the Snapshot* fields are set
	Snapshot      []byte // Serialized state machine snapshot
	SnapshotIndex uint64 // Last log index covered by the snapshot
	SnapshotTerm  uint64 // Term of SnapshotIndex
}
// RequestVoteArgs is the arguments for the RequestVote RPC (RPCRequestVote).
// The same message carries pre-vote requests when PreVote is true.
type RequestVoteArgs struct {
	Term         uint64 `json:"term"`           // Candidate's term
	CandidateID  string `json:"candidate_id"`   // Candidate requesting vote
	LastLogIndex uint64 `json:"last_log_index"` // Index of candidate's last log entry
	LastLogTerm  uint64 `json:"last_log_term"`  // Term of candidate's last log entry
	PreVote      bool   `json:"pre_vote"`       // True if this is a pre-vote request
}

// RequestVoteReply is the response for the RequestVote RPC.
type RequestVoteReply struct {
	Term        uint64 `json:"term"`         // CurrentTerm, for candidate to update itself
	VoteGranted bool   `json:"vote_granted"` // True means candidate received vote
}
// AppendEntriesArgs is the arguments for the AppendEntries RPC
// (RPCAppendEntries), used both for log replication and as a heartbeat.
//
// Entries has no omitempty tag, so a heartbeat with a nil slice encodes as
// "entries": null in JSON.
type AppendEntriesArgs struct {
	Term         uint64     `json:"term"`           // Leader's term
	LeaderID     string     `json:"leader_id"`      // So follower can redirect clients
	PrevLogIndex uint64     `json:"prev_log_index"` // Index of log entry immediately preceding new ones
	PrevLogTerm  uint64     `json:"prev_log_term"`  // Term of PrevLogIndex entry
	Entries      []LogEntry `json:"entries"`        // Log entries to store (empty for heartbeat)
	LeaderCommit uint64     `json:"leader_commit"`  // Leader's commitIndex
}

// AppendEntriesReply is the response for the AppendEntries RPC.
type AppendEntriesReply struct {
	Term    uint64 `json:"term"`    // CurrentTerm, for leader to update itself
	Success bool   `json:"success"` // True if follower contained entry matching PrevLogIndex and PrevLogTerm
	// Optimization: help the leader find the correct NextIndex faster after a
	// rejection instead of backing off one entry at a time.
	ConflictIndex uint64 `json:"conflict_index"` // Index of first entry with conflicting term
	ConflictTerm  uint64 `json:"conflict_term"`  // Term of conflicting entry
}
// InstallSnapshotArgs is the arguments for the InstallSnapshot RPC
// (RPCInstallSnapshot). A snapshot may be sent in several chunks. Offset
// locates Data within the full snapshot, and Done marks the final chunk.
// Chunk size is governed by Config.SnapshotChunkSize.
type InstallSnapshotArgs struct {
	Term              uint64 `json:"term"`                // Leader's term
	LeaderID          string `json:"leader_id"`           // So follower can redirect clients
	LastIncludedIndex uint64 `json:"last_included_index"` // Snapshot replaces all entries up through this index
	LastIncludedTerm  uint64 `json:"last_included_term"`  // Term of LastIncludedIndex
	Offset            uint64 `json:"offset"`              // Byte offset for chunked transfer
	Data              []byte `json:"data"`                // Snapshot data (chunk); base64 in JSON
	Done              bool   `json:"done"`                // True if this is the last chunk
}

// InstallSnapshotReply is the response for the InstallSnapshot RPC.
type InstallSnapshotReply struct {
	Term    uint64 `json:"term"`    // CurrentTerm, for leader to update itself
	Success bool   `json:"success"` // True if chunk was accepted
}
// TimeoutNowArgs is the arguments for the TimeoutNow RPC (RPCTimeoutNow),
// sent by the current leader during a leadership transfer.
type TimeoutNowArgs struct {
	Term     uint64 `json:"term"`      // Sender's (leader's) term
	LeaderID string `json:"leader_id"` // Leader initiating the transfer
}

// TimeoutNowReply is the response for the TimeoutNow RPC.
type TimeoutNowReply struct {
	Term    uint64 `json:"term"`    // Receiver's current term
	Success bool   `json:"success"` // True if the receiver accepted the request
}

// ReadIndexArgs is the arguments for the ReadIndex RPC (RPCReadIndex), used
// for linearizable reads.
type ReadIndexArgs struct {
	// Empty - just need to confirm leadership
}

// ReadIndexReply is the response for the ReadIndex RPC.
type ReadIndexReply struct {
	ReadIndex uint64 `json:"read_index"`      // Index the reader must wait to have applied (presumably the leader's commit index; confirm)
	Success   bool   `json:"success"`         // True if the read index was obtained
	Error     string `json:"error,omitempty"` // Failure reason when Success is false
}
  414. // RPC message types for network communication
  415. type RPCType int
  416. const (
  417. RPCRequestVote RPCType = iota
  418. RPCAppendEntries
  419. RPCInstallSnapshot
  420. RPCPropose // For request forwarding
  421. RPCAddNode // For AddNode forwarding
  422. RPCRemoveNode // For RemoveNode forwarding
  423. RPCTimeoutNow // For leadership transfer
  424. RPCReadIndex // For linearizable reads
  425. RPCGet // For remote KV reads
  426. )
// ProposeArgs is the arguments for the Propose RPC (RPCPropose), used to
// forward a client command to the leader.
type ProposeArgs struct {
	Command []byte `json:"command"` // Command bytes; base64 in JSON
}

// ProposeReply is the response for the Propose RPC.
type ProposeReply struct {
	Success bool   `json:"success"`
	Index   uint64 `json:"index,omitempty"` // Presumably the log index assigned to the command; confirm
	Term    uint64 `json:"term,omitempty"`  // Presumably the term of that entry; confirm
	Error   string `json:"error,omitempty"` // Failure reason when Success is false
}

// AddNodeArgs is the arguments for the AddNode RPC (RPCAddNode), used to
// forward a membership addition to the leader.
type AddNodeArgs struct {
	NodeID  string `json:"node_id"`
	Address string `json:"address"`
}

// AddNodeReply is the response for the AddNode RPC.
type AddNodeReply struct {
	Success bool   `json:"success"`
	Error   string `json:"error,omitempty"` // Failure reason when Success is false
}

// RemoveNodeArgs is the arguments for the RemoveNode RPC (RPCRemoveNode),
// used to forward a membership removal to the leader.
type RemoveNodeArgs struct {
	NodeID string `json:"node_id"`
}

// RemoveNodeReply is the response for the RemoveNode RPC.
type RemoveNodeReply struct {
	Success bool   `json:"success"`
	Error   string `json:"error,omitempty"` // Failure reason when Success is false
}

// GetArgs is the arguments for the Get RPC (RPCGet), used for remote KV reads
// served by Config.GetHandler.
type GetArgs struct {
	Key string `json:"key"`
}

// GetReply is the response for the Get RPC.
type GetReply struct {
	Value string `json:"value,omitempty"` // Empty when not found
	Found bool   `json:"found"`
	Error string `json:"error,omitempty"` // Set when the read could not be served
}
// RPCMessage wraps all RPC request types for network transmission. Type
// identifies which *Args struct Payload holds.
//
// Payload is an interface{}, so json.Unmarshal into an RPCMessage yields a
// map[string]interface{} rather than the concrete *Args type.
// NOTE(review): the receiver presumably re-decodes Payload based on Type;
// confirm in the transport code.
type RPCMessage struct {
	Type    RPCType     `json:"type"`
	Payload interface{} `json:"payload"`
}

// RPCResponse wraps all RPC response types. Type mirrors the request, Payload
// holds the matching *Reply struct (subject to the same JSON decoding caveat
// as RPCMessage), and Error carries a transport- or handler-level failure.
type RPCResponse struct {
	Type    RPCType     `json:"type"`
	Payload interface{} `json:"payload"`
	Error   string      `json:"error,omitempty"`
}