types.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546
  1. package raft
  2. import (
  3. "errors"
  4. "fmt"
  5. "sync"
  6. "time"
  7. )
  8. // ==================== Custom Errors ====================
  9. var (
  10. // ErrNoLeader indicates no leader is available
  11. ErrNoLeader = errors.New("no leader available")
  12. // ErrNotLeader indicates this node is not the leader
  13. ErrNotLeader = errors.New("not leader")
  14. // ErrConfigInFlight indicates a configuration change is in progress
  15. ErrConfigInFlight = errors.New("configuration change in progress")
  16. // ErrTimeout indicates an operation timed out
  17. ErrTimeout = errors.New("operation timed out")
  18. // ErrShutdown indicates the raft node is shutting down
  19. ErrShutdown = errors.New("raft is shutting down")
  20. // ErrPersistFailed indicates persistent storage failed
  21. ErrPersistFailed = errors.New("failed to persist state")
  22. // ErrLeadershipLost indicates leadership was lost during operation
  23. ErrLeadershipLost = errors.New("leadership lost")
  24. )
  25. // RaftError wraps errors with additional context
  26. type RaftError struct {
  27. Err error
  28. LeaderID string // Known leader, if any
  29. RetryIn time.Duration // Suggested retry delay
  30. }
  31. func (e *RaftError) Error() string {
  32. if e.LeaderID != "" {
  33. return fmt.Sprintf("%s (leader: %s)", e.Err, e.LeaderID)
  34. }
  35. return e.Err.Error()
  36. }
  37. func (e *RaftError) Unwrap() error {
  38. return e.Err
  39. }
  40. // NewRaftError creates a new RaftError
  41. func NewRaftError(err error, leaderID string, retryIn time.Duration) *RaftError {
  42. return &RaftError{
  43. Err: err,
  44. LeaderID: leaderID,
  45. RetryIn: retryIn,
  46. }
  47. }
  48. // NodeState represents the current state of a Raft node
  49. type NodeState int
  50. const (
  51. Follower NodeState = iota
  52. Candidate
  53. Leader
  54. )
  55. func (s NodeState) String() string {
  56. switch s {
  57. case Follower:
  58. return "Follower"
  59. case Candidate:
  60. return "Candidate"
  61. case Leader:
  62. return "Leader"
  63. default:
  64. return "Unknown"
  65. }
  66. }
  67. // EntryType represents the type of a log entry
  68. type EntryType int
  69. const (
  70. EntryNormal EntryType = iota // Normal command entry
  71. EntryConfig // Configuration change entry
  72. EntryNoop // No-op entry (used by leader to commit previous term entries)
  73. )
  74. // LogEntry represents a single entry in the replicated log
  75. type LogEntry struct {
  76. Index uint64 `json:"index"` // Log index (1-based)
  77. Term uint64 `json:"term"` // Term when entry was received
  78. Type EntryType `json:"type,omitempty"` // Entry type (normal or config change)
  79. Command []byte `json:"command,omitempty"` // Command to be applied to state machine (for normal entries)
  80. Config *ClusterConfig `json:"config,omitempty"` // New cluster configuration (for config entries)
  81. }
  82. // ConfigChangeType represents the type of configuration change
  83. type ConfigChangeType int
  84. const (
  85. ConfigAddNode ConfigChangeType = iota
  86. ConfigRemoveNode
  87. )
  88. // ConfigChange represents a single node configuration change
  89. type ConfigChange struct {
  90. Type ConfigChangeType `json:"type"` // Add or remove
  91. NodeID string `json:"node_id"` // Node to add/remove
  92. Address string `json:"address"` // Node address (for add)
  93. }
  94. // ClusterConfig represents the cluster membership configuration
  95. type ClusterConfig struct {
  96. Nodes map[string]string `json:"nodes"` // NodeID -> Address mapping for all nodes including self
  97. }
  98. // PersistentState represents the persistent state on all servers
  99. // (Updated on stable storage before responding to RPCs)
  100. type PersistentState struct {
  101. CurrentTerm uint64 `json:"current_term"` // Latest term server has seen
  102. VotedFor string `json:"voted_for"` // CandidateId that received vote in current term
  103. }
  104. // VolatileState represents the volatile state on all servers
  105. type VolatileState struct {
  106. CommitIndex uint64 // Index of highest log entry known to be committed
  107. LastApplied uint64 // Index of highest log entry applied to state machine
  108. }
  109. // LeaderVolatileState represents volatile state on leaders
  110. // (Reinitialized after election)
  111. type LeaderVolatileState struct {
  112. NextIndex map[string]uint64 // For each server, index of the next log entry to send
  113. MatchIndex map[string]uint64 // For each server, index of highest log entry known to be replicated
  114. }
  115. // Config holds the configuration for a Raft node
  116. type Config struct {
  117. // NodeID is the unique identifier for this node
  118. NodeID string
  119. // Peers is the list of peer node addresses (excluding self)
  120. // Deprecated: Use ClusterNodes instead for dynamic membership
  121. Peers []string
  122. // PeerMap maps nodeID to address for all nodes (including self)
  123. // Used for request forwarding. Example: {"node1": "127.0.0.1:9001", "node2": "127.0.0.1:9002"}
  124. // Deprecated: Use ClusterNodes instead
  125. PeerMap map[string]string
  126. // ClusterNodes maps nodeID to address for all nodes (including self)
  127. // This is the canonical cluster membership configuration
  128. // Example: {"node1": "127.0.0.1:9001", "node2": "127.0.0.1:9002", "node3": "127.0.0.1:9003"}
  129. ClusterNodes map[string]string
  130. // ListenAddr is the address this node listens on
  131. ListenAddr string
  132. // DataDir is the directory for persistent storage
  133. DataDir string
  134. // ElectionTimeoutMin is the minimum election timeout
  135. ElectionTimeoutMin time.Duration
  136. // ElectionTimeoutMax is the maximum election timeout
  137. ElectionTimeoutMax time.Duration
  138. // HeartbeatInterval is the interval between heartbeats from leader
  139. HeartbeatInterval time.Duration
  140. // MaxLogEntriesPerRequest limits entries sent in a single AppendEntries
  141. MaxLogEntriesPerRequest int
  142. // MemoryLogCapacity is the maximum number of log entries to keep in memory
  143. MemoryLogCapacity int
  144. // SnapshotThreshold triggers snapshot when log grows beyond this
  145. SnapshotThreshold uint64
  146. // SnapshotMinRetention is the minimum number of log entries to retain after compaction
  147. // This ensures followers have enough entries for catch-up without needing a full snapshot
  148. // Default: 1000
  149. SnapshotMinRetention uint64
  150. // SnapshotProvider is a callback function that returns the current state machine snapshot
  151. // If set, automatic log compaction will be enabled
  152. // The function should return the serialized state that can restore the state machine
  153. SnapshotProvider func() ([]byte, error)
  154. // GetHandler is a callback function to handle remote Get requests
  155. // If set, clients can read values from this node via RPC
  156. GetHandler func(key string) (value string, found bool)
  157. // SnapshotChunkSize is the size of each chunk when transferring snapshots
  158. // Default: 1MB
  159. SnapshotChunkSize int
  160. // RPC Timeout configurations
  161. RPCTimeout time.Duration // Default: 500ms for normal RPCs
  162. SnapshotRPCTimeout time.Duration // Default: 30s for snapshot transfers
  163. ProposeTimeout time.Duration // Default: 3s for propose forwarding
  164. // Retry configurations
  165. MaxRetries int // Default: 3
  166. RetryBackoff time.Duration // Default: 100ms
  167. // Batching configurations
  168. BatchMinWait time.Duration // Minimum wait time for batching (Default: 1ms)
  169. BatchMaxWait time.Duration // Maximum wait time for batching (Default: 10ms)
  170. BatchMaxSize int // Maximum batch size before forcing flush (Default: 100)
  171. // Logger for debug output
  172. Logger Logger
  173. }
  174. // Clone creates a deep copy of the config
  175. func (c *Config) Clone() *Config {
  176. clone := *c
  177. if c.Peers != nil {
  178. clone.Peers = make([]string, len(c.Peers))
  179. copy(clone.Peers, c.Peers)
  180. }
  181. if c.PeerMap != nil {
  182. clone.PeerMap = make(map[string]string)
  183. for k, v := range c.PeerMap {
  184. clone.PeerMap[k] = v
  185. }
  186. }
  187. if c.ClusterNodes != nil {
  188. clone.ClusterNodes = make(map[string]string)
  189. for k, v := range c.ClusterNodes {
  190. clone.ClusterNodes[k] = v
  191. }
  192. }
  193. return &clone
  194. }
  195. // GetPeerAddresses returns the addresses of all peers (excluding self)
  196. func (c *Config) GetPeerAddresses() []string {
  197. if c.ClusterNodes != nil {
  198. peers := make([]string, 0, len(c.ClusterNodes)-1)
  199. for nodeID, addr := range c.ClusterNodes {
  200. if nodeID != c.NodeID {
  201. peers = append(peers, addr)
  202. }
  203. }
  204. return peers
  205. }
  206. return c.Peers
  207. }
  208. // GetClusterSize returns the total number of nodes in the cluster
  209. func (c *Config) GetClusterSize() int {
  210. if c.ClusterNodes != nil {
  211. return len(c.ClusterNodes)
  212. }
  213. return len(c.Peers) + 1
  214. }
  215. // DefaultConfig returns a configuration with sensible defaults
  216. func DefaultConfig() *Config {
  217. return &Config{
  218. ElectionTimeoutMin: 150 * time.Millisecond,
  219. ElectionTimeoutMax: 300 * time.Millisecond,
  220. HeartbeatInterval: 50 * time.Millisecond,
  221. MaxLogEntriesPerRequest: 5000,
  222. MemoryLogCapacity: 10000,
  223. SnapshotThreshold: 100000, // 10万条日志触发压缩
  224. SnapshotMinRetention: 10000, // 保留1万条用于 follower 追赶
  225. SnapshotChunkSize: 1024 * 1024, // 1MB chunks
  226. RPCTimeout: 500 * time.Millisecond,
  227. SnapshotRPCTimeout: 30 * time.Second,
  228. ProposeTimeout: 3 * time.Second,
  229. MaxRetries: 3,
  230. RetryBackoff: 100 * time.Millisecond,
  231. BatchMinWait: 1 * time.Millisecond,
  232. BatchMaxWait: 10 * time.Millisecond,
  233. BatchMaxSize: 100,
  234. }
  235. }
  236. // ==================== Metrics ====================
  237. // Metrics holds runtime metrics for monitoring
  238. type Metrics struct {
  239. // Term metrics
  240. Term uint64 `json:"term"`
  241. // Proposal metrics
  242. ProposalsTotal uint64 `json:"proposals_total"`
  243. ProposalsSuccess uint64 `json:"proposals_success"`
  244. ProposalsFailed uint64 `json:"proposals_failed"`
  245. ProposalsForwarded uint64 `json:"proposals_forwarded"`
  246. // Replication metrics
  247. AppendsSent uint64 `json:"appends_sent"`
  248. AppendsReceived uint64 `json:"appends_received"`
  249. AppendsSuccess uint64 `json:"appends_success"`
  250. AppendsFailed uint64 `json:"appends_failed"`
  251. // Election metrics
  252. ElectionsStarted uint64 `json:"elections_started"`
  253. ElectionsWon uint64 `json:"elections_won"`
  254. PreVotesStarted uint64 `json:"pre_votes_started"`
  255. PreVotesGranted uint64 `json:"pre_votes_granted"`
  256. // Snapshot metrics
  257. SnapshotsTaken uint64 `json:"snapshots_taken"`
  258. SnapshotsInstalled uint64 `json:"snapshots_installed"`
  259. SnapshotsSent uint64 `json:"snapshots_sent"`
  260. // Read metrics
  261. ReadIndexRequests uint64 `json:"read_index_requests"`
  262. ReadIndexSuccess uint64 `json:"read_index_success"`
  263. // Leadership transfer metrics
  264. LeadershipTransfers uint64 `json:"leadership_transfers"`
  265. LeadershipTransferSuccess uint64 `json:"leadership_transfer_success"`
  266. }
  267. // HealthStatus represents the health status of a Raft node
  268. type HealthStatus struct {
  269. NodeID string `json:"node_id"`
  270. State string `json:"state"`
  271. Term uint64 `json:"term"`
  272. LeaderID string `json:"leader_id"`
  273. ClusterSize int `json:"cluster_size"`
  274. ClusterNodes map[string]string `json:"cluster_nodes"`
  275. CommitIndex uint64 `json:"commit_index"`
  276. LastApplied uint64 `json:"last_applied"`
  277. LogBehind uint64 `json:"log_behind"` // commitIndex - lastApplied
  278. LastHeartbeat time.Time `json:"last_heartbeat"`
  279. IsHealthy bool `json:"is_healthy"`
  280. Uptime time.Duration `json:"uptime"`
  281. }
  282. // Logger interface for logging
  283. type Logger interface {
  284. Debug(format string, args ...interface{})
  285. Info(format string, args ...interface{})
  286. Warn(format string, args ...interface{})
  287. Error(format string, args ...interface{})
  288. }
  289. // DefaultLogger implements a simple console logger
  290. type DefaultLogger struct {
  291. Prefix string
  292. mu sync.Mutex
  293. }
  294. func (l *DefaultLogger) log(level, format string, args ...interface{}) {
  295. l.mu.Lock()
  296. defer l.mu.Unlock()
  297. // Use fmt.Printf with timestamp
  298. // Commented out to avoid import cycle, implemented in raft.go
  299. }
  300. func (l *DefaultLogger) Debug(format string, args ...interface{}) {}
  301. func (l *DefaultLogger) Info(format string, args ...interface{}) {}
  302. func (l *DefaultLogger) Warn(format string, args ...interface{}) {}
  303. func (l *DefaultLogger) Error(format string, args ...interface{}) {}
  304. // NoopLogger implements a no-op logger
  305. type NoopLogger struct{}
  306. func (l *NoopLogger) Debug(format string, args ...interface{}) {}
  307. func (l *NoopLogger) Info(format string, args ...interface{}) {}
  308. func (l *NoopLogger) Warn(format string, args ...interface{}) {}
  309. func (l *NoopLogger) Error(format string, args ...interface{}) {}
  310. // StateMachine interface that users must implement
  311. type StateMachine interface {
  312. // Apply applies a command to the state machine
  313. Apply(command []byte) (interface{}, error)
  314. // Snapshot returns a snapshot of the current state
  315. Snapshot() ([]byte, error)
  316. // Restore restores the state machine from a snapshot
  317. Restore(snapshot []byte) error
  318. }
  319. // ApplyMsg is sent to the application layer when a log entry is committed
  320. type ApplyMsg struct {
  321. CommandValid bool
  322. Command []byte
  323. CommandIndex uint64
  324. CommandTerm uint64
  325. // For snapshots
  326. SnapshotValid bool
  327. Snapshot []byte
  328. SnapshotIndex uint64
  329. SnapshotTerm uint64
  330. }
  331. // RequestVoteArgs is the arguments for RequestVote RPC
  332. type RequestVoteArgs struct {
  333. Term uint64 `json:"term"` // Candidate's term
  334. CandidateID string `json:"candidate_id"` // Candidate requesting vote
  335. LastLogIndex uint64 `json:"last_log_index"` // Index of candidate's last log entry
  336. LastLogTerm uint64 `json:"last_log_term"` // Term of candidate's last log entry
  337. PreVote bool `json:"pre_vote"` // True if this is a pre-vote request
  338. }
  339. // RequestVoteReply is the response for RequestVote RPC
  340. type RequestVoteReply struct {
  341. Term uint64 `json:"term"` // CurrentTerm, for candidate to update itself
  342. VoteGranted bool `json:"vote_granted"` // True means candidate received vote
  343. }
  344. // AppendEntriesArgs is the arguments for AppendEntries RPC
  345. type AppendEntriesArgs struct {
  346. Term uint64 `json:"term"` // Leader's term
  347. LeaderID string `json:"leader_id"` // So follower can redirect clients
  348. PrevLogIndex uint64 `json:"prev_log_index"` // Index of log entry immediately preceding new ones
  349. PrevLogTerm uint64 `json:"prev_log_term"` // Term of PrevLogIndex entry
  350. Entries []LogEntry `json:"entries"` // Log entries to store (empty for heartbeat)
  351. LeaderCommit uint64 `json:"leader_commit"` // Leader's commitIndex
  352. }
  353. // AppendEntriesReply is the response for AppendEntries RPC
  354. type AppendEntriesReply struct {
  355. Term uint64 `json:"term"` // CurrentTerm, for leader to update itself
  356. Success bool `json:"success"` // True if follower contained entry matching PrevLogIndex and PrevLogTerm
  357. // Optimization: help leader find correct NextIndex faster
  358. ConflictIndex uint64 `json:"conflict_index"` // Index of first entry with conflicting term
  359. ConflictTerm uint64 `json:"conflict_term"` // Term of conflicting entry
  360. }
  361. // InstallSnapshotArgs is the arguments for InstallSnapshot RPC
  362. type InstallSnapshotArgs struct {
  363. Term uint64 `json:"term"` // Leader's term
  364. LeaderID string `json:"leader_id"` // So follower can redirect clients
  365. LastIncludedIndex uint64 `json:"last_included_index"` // Snapshot replaces all entries up through this index
  366. LastIncludedTerm uint64 `json:"last_included_term"` // Term of LastIncludedIndex
  367. Offset uint64 `json:"offset"` // Byte offset for chunked transfer
  368. Data []byte `json:"data"` // Snapshot data (chunk)
  369. Done bool `json:"done"` // True if this is the last chunk
  370. }
  371. // InstallSnapshotReply is the response for InstallSnapshot RPC
  372. type InstallSnapshotReply struct {
  373. Term uint64 `json:"term"` // CurrentTerm, for leader to update itself
  374. Success bool `json:"success"` // True if chunk was accepted
  375. }
  376. // TimeoutNowArgs is the arguments for TimeoutNow RPC (leadership transfer)
  377. type TimeoutNowArgs struct {
  378. Term uint64 `json:"term"`
  379. LeaderID string `json:"leader_id"`
  380. }
  381. // TimeoutNowReply is the response for TimeoutNow RPC
  382. type TimeoutNowReply struct {
  383. Term uint64 `json:"term"`
  384. Success bool `json:"success"`
  385. }
  386. // ReadIndexArgs is the arguments for ReadIndex RPC
  387. type ReadIndexArgs struct {
  388. // Empty - just need to confirm leadership
  389. }
  390. // ReadIndexReply is the response for ReadIndex RPC
  391. type ReadIndexReply struct {
  392. ReadIndex uint64 `json:"read_index"`
  393. Success bool `json:"success"`
  394. Error string `json:"error,omitempty"`
  395. }
  396. // RPC message types for network communication
  397. type RPCType int
  398. const (
  399. RPCRequestVote RPCType = iota
  400. RPCAppendEntries
  401. RPCInstallSnapshot
  402. RPCPropose // For request forwarding
  403. RPCAddNode // For AddNode forwarding
  404. RPCRemoveNode // For RemoveNode forwarding
  405. RPCTimeoutNow // For leadership transfer
  406. RPCReadIndex // For linearizable reads
  407. RPCGet // For remote KV reads
  408. )
  409. // ProposeArgs is the arguments for Propose RPC (forwarding)
  410. type ProposeArgs struct {
  411. Command []byte `json:"command"`
  412. }
  413. // ProposeReply is the response for Propose RPC
  414. type ProposeReply struct {
  415. Success bool `json:"success"`
  416. Index uint64 `json:"index,omitempty"`
  417. Term uint64 `json:"term,omitempty"`
  418. Error string `json:"error,omitempty"`
  419. }
  420. // AddNodeArgs is the arguments for AddNode RPC (forwarding)
  421. type AddNodeArgs struct {
  422. NodeID string `json:"node_id"`
  423. Address string `json:"address"`
  424. }
  425. // AddNodeReply is the response for AddNode RPC
  426. type AddNodeReply struct {
  427. Success bool `json:"success"`
  428. Error string `json:"error,omitempty"`
  429. }
  430. // RemoveNodeArgs is the arguments for RemoveNode RPC (forwarding)
  431. type RemoveNodeArgs struct {
  432. NodeID string `json:"node_id"`
  433. }
  434. // RemoveNodeReply is the response for RemoveNode RPC
  435. type RemoveNodeReply struct {
  436. Success bool `json:"success"`
  437. Error string `json:"error,omitempty"`
  438. }
  439. // GetArgs is the arguments for Get RPC (for remote KV reads)
  440. type GetArgs struct {
  441. Key string `json:"key"`
  442. }
  443. // GetReply is the response for Get RPC
  444. type GetReply struct {
  445. Value string `json:"value,omitempty"`
  446. Found bool `json:"found"`
  447. Error string `json:"error,omitempty"`
  448. }
  449. // RPCMessage wraps all RPC types for network transmission
  450. type RPCMessage struct {
  451. Type RPCType `json:"type"`
  452. Payload interface{} `json:"payload"`
  453. }
  454. // RPCResponse wraps all RPC response types
  455. type RPCResponse struct {
  456. Type RPCType `json:"type"`
  457. Payload interface{} `json:"payload"`
  458. Error string `json:"error,omitempty"`
  459. }