raft.go

  1. package raft
  2. import (
  3. "context"
  4. "fmt"
  5. "math/rand"
  6. "os"
  7. "strings"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. )
  12. // Raft represents a Raft consensus node
  13. type Raft struct {
  14. mu sync.RWMutex
  15. // Node identity
  16. nodeID string
  17. peers []string
  18. // Cluster membership - maps nodeID to address
  19. clusterNodes map[string]string
  20. // Current state
  21. state NodeState
  22. currentTerm uint64
  23. votedFor string
  24. leaderID string
  25. // Log management
  26. log *LogManager
  27. storage Storage
  28. commitIndex uint64
  29. lastApplied uint64
  30. // Leader state
  31. nextIndex map[string]uint64
  32. matchIndex map[string]uint64
  33. // Configuration
  34. config *Config
  35. // Communication
  36. transport Transport
  37. // Channels
  38. applyCh chan ApplyMsg
  39. stopCh chan struct{}
  40. commitCh chan struct{}
  41. // Election timer
  42. electionTimer *time.Timer
  43. heartbeatTimer *time.Timer
  44. // Statistics (deprecated, use metrics instead)
  45. stats Stats
  46. // Metrics for monitoring
  47. metrics Metrics
  48. // Logger
  49. logger Logger
  50. // Running flag
  51. running int32
  52. // Replication trigger channel - used to batch replication requests
  53. replicationCh chan struct{}
  54. // Pending config change - only one config change can be pending at a time
  55. pendingConfigChange bool
  56. // Old cluster nodes - used for majority calculation during config change
  57. // This ensures we use the old cluster size until the config change is committed
  58. oldClusterNodes map[string]string
  59. // Index of the pending config change entry
  60. configChangeIndex uint64
  61. // Joining cluster state - when a standalone node is being added to a cluster
  62. // This prevents the node from starting elections while syncing
  63. joiningCluster bool
  64. joiningClusterTime time.Time
  65. // Compaction state - prevents concurrent compaction
  66. compacting int32
  67. // Dynamic compaction threshold - updated after each compaction
  68. // Next compaction triggers when log size exceeds this value
  69. // Formula: post-compaction log size + SnapshotThreshold (or the initial SnapshotThreshold before the first compaction)
  70. nextCompactionThreshold uint64
  71. // WaitGroup to track goroutines for clean shutdown
  72. wg sync.WaitGroup
  73. // Leadership transfer state
  74. transferring bool
  75. transferTarget string
  76. transferDeadline time.Time
  77. // Last heartbeat received (for health check)
  78. lastHeartbeat time.Time
  79. // Start time (for uptime calculation)
  80. startTime time.Time
  81. // ReadIndex waiting queue
  82. readIndexCh chan *readIndexRequest
  83. // Snapshot receiving state (for chunked transfer)
  84. pendingSnapshot *pendingSnapshotState
  85. // Last contact time for each peer (for leader check)
  86. lastContact map[string]time.Time
  87. }
  88. // readIndexRequest represents a pending read index request
  89. type readIndexRequest struct {
  90. readIndex uint64
  91. done chan error
  92. }
  93. // pendingSnapshotState tracks chunked snapshot reception
  94. type pendingSnapshotState struct {
  95. lastIncludedIndex uint64
  96. lastIncludedTerm uint64
  97. data []byte
  98. receivedBytes uint64
  99. }
  100. // Stats holds runtime statistics
  101. type Stats struct {
  102. Term uint64
  103. State string
  104. FirstLogIndex uint64 // Added for monitoring compaction
  105. LastLogIndex uint64
  106. LastLogTerm uint64
  107. CommitIndex uint64
  108. LastApplied uint64
  109. LeaderID string
  110. VotesReceived int
  111. AppendsSent int64
  112. AppendsReceived int64
  113. ClusterSize int // Number of nodes in cluster
  114. ClusterNodes map[string]string // NodeID -> Address mapping
  115. }
  116. // ConsoleLogger implements Logger with console output
  117. type ConsoleLogger struct {
  118. Prefix string
  119. Level int // 0=debug, 1=info, 2=warn, 3=error
  120. mu sync.Mutex
  121. }
  122. func NewConsoleLogger(prefix string, level int) *ConsoleLogger {
  123. return &ConsoleLogger{Prefix: prefix, Level: level}
  124. }
  125. func (l *ConsoleLogger) log(level int, levelStr, format string, args ...interface{}) {
  126. if level < l.Level {
  127. return
  128. }
  129. l.mu.Lock()
  130. defer l.mu.Unlock()
  131. msg := fmt.Sprintf(format, args...)
  132. fmt.Printf("[%s] %s [%s] %s\n", time.Now().Format("15:04:05.000"), l.Prefix, levelStr, msg)
  133. }
  134. func (l *ConsoleLogger) Debug(format string, args ...interface{}) {
  135. l.log(0, "DEBUG", format, args...)
  136. }
  137. func (l *ConsoleLogger) Info(format string, args ...interface{}) {
  138. l.log(1, "INFO", format, args...)
  139. }
  140. func (l *ConsoleLogger) Warn(format string, args ...interface{}) {
  141. l.log(2, "WARN", format, args...)
  142. }
  143. func (l *ConsoleLogger) Error(format string, args ...interface{}) {
  144. l.log(3, "ERROR", format, args...)
  145. }
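// Example (illustrative values, not defaults): a logger constructed with
// Level=1 drops Debug output because 0 < 1, while Info and above are printed
// with a timestamp and the node prefix.
//
//	logger := NewConsoleLogger("node-1", 1)
//	logger.Debug("suppressed")             // level 0 < 1, not printed
//	logger.Info("started in term %d", 3)   // printed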
  146. // NewRaft creates a new Raft node
  147. func NewRaft(config *Config, transport Transport, applyCh chan ApplyMsg) (*Raft, error) {
  148. if config.Logger == nil {
  149. config.Logger = NewConsoleLogger(config.NodeID, 1)
  150. }
  151. // Create storage
  152. storage, err := NewHybridStorage(config.DataDir, config.MemoryLogCapacity, config.Logger)
  153. if err != nil {
  154. return nil, fmt.Errorf("failed to create storage: %w", err)
  155. }
  156. // Create log manager
  157. logMgr := NewLogManager(storage, config.Logger)
  158. // Load persistent state
  159. state, err := storage.GetState()
  160. if err != nil {
  161. return nil, fmt.Errorf("failed to load state: %w", err)
  162. }
  163. // Load or initialize cluster configuration
  164. clusterNodes := make(map[string]string)
  165. // Try to load saved cluster config first
  166. savedConfig, err := storage.GetClusterConfig()
  167. if err != nil {
  168. return nil, fmt.Errorf("failed to load cluster config: %w", err)
  169. }
  170. if savedConfig != nil && len(savedConfig.Nodes) > 0 {
  171. // Use saved config
  172. clusterNodes = savedConfig.Nodes
  173. config.Logger.Info("Loaded cluster config with %d nodes", len(clusterNodes))
  174. } else {
  175. // Initialize from config
  176. if len(config.ClusterNodes) > 0 {
  177. for k, v := range config.ClusterNodes {
  178. clusterNodes[k] = v
  179. }
  180. } else {
  181. // Build from Peers + self (backward compatibility)
  182. clusterNodes[config.NodeID] = config.ListenAddr
  183. if config.PeerMap != nil {
  184. for k, v := range config.PeerMap {
  185. clusterNodes[k] = v
  186. }
  187. }
  188. }
  189. // Save initial config
  190. if len(clusterNodes) > 0 {
  191. if err := storage.SaveClusterConfig(&ClusterConfig{Nodes: clusterNodes}); err != nil {
  192. config.Logger.Warn("Failed to save initial cluster config: %v", err)
  193. }
  194. }
  195. }
  196. // Build peers list from cluster nodes (excluding self)
  197. var peers []string
  198. for nodeID, addr := range clusterNodes {
  199. if nodeID != config.NodeID {
  200. peers = append(peers, addr)
  201. }
  202. }
  203. r := &Raft{
  204. nodeID: config.NodeID,
  205. peers: peers,
  206. clusterNodes: clusterNodes,
  207. state: Follower,
  208. currentTerm: state.CurrentTerm,
  209. votedFor: state.VotedFor,
  210. log: logMgr,
  211. storage: storage,
  212. config: config,
  213. transport: transport,
  214. applyCh: applyCh,
  215. stopCh: make(chan struct{}),
  216. commitCh: make(chan struct{}, 100), // Increased buffer for event-driven apply
  217. replicationCh: make(chan struct{}, 1),
  218. nextIndex: make(map[string]uint64),
  219. matchIndex: make(map[string]uint64),
  220. logger: config.Logger,
  221. readIndexCh: make(chan *readIndexRequest, 100),
  222. lastHeartbeat: time.Now(),
  223. lastContact: make(map[string]time.Time),
  224. }
  225. // Initialize metrics
  226. r.metrics.Term = state.CurrentTerm
  227. // Initialize lastApplied and commitIndex from config (if provided)
  228. if config.LastAppliedIndex > 0 {
  229. r.lastApplied = config.LastAppliedIndex
  230. r.commitIndex = config.LastAppliedIndex // Applied implies committed
  231. r.logger.Info("Initialized lastApplied and commitIndex to %d from config", config.LastAppliedIndex)
  232. }
  233. // Set RPC handler
  234. transport.SetRPCHandler(r)
  235. return r, nil
  236. }
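// Construction sketch (hedged): "tr" stands for any Transport implementation
// and the field values are illustrative, not defaults. NewRaft loads persisted
// state and cluster config, then registers itself as the transport's RPC
// handler; Start must still be called separately.
//
//	cfg := &Config{
//		NodeID:     "node-1",
//		ListenAddr: "127.0.0.1:7001",
//		DataDir:    "./data/node-1",
//		ClusterNodes: map[string]string{
//			"node-1": "127.0.0.1:7001",
//			"node-2": "127.0.0.1:7002",
//			"node-3": "127.0.0.1:7003",
//		},
//	}
//	applyCh := make(chan ApplyMsg, 64)
//	node, err := NewRaft(cfg, tr, applyCh)
//	if err != nil {
//		// handle storage/config load errors
//	}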
  237. // Start starts the Raft node
  238. func (r *Raft) Start() error {
  239. if !atomic.CompareAndSwapInt32(&r.running, 0, 1) {
  240. return fmt.Errorf("already running")
  241. }
  242. // Record start time
  243. r.startTime = time.Now()
  244. // Start transport
  245. if err := r.transport.Start(); err != nil {
  246. return fmt.Errorf("failed to start transport: %w", err)
  247. }
  248. // Restore FSM from snapshot if exists
  249. // This must happen before starting apply loop to ensure FSM state is restored
  250. if err := r.restoreFromSnapshot(); err != nil {
  251. // Suppress warning if it's just a missing file (first start)
  252. if !os.IsNotExist(err) && !strings.Contains(err.Error(), "no such file") {
  253. r.logger.Warn("Failed to restore from snapshot: %v", err)
  254. } else {
  255. r.logger.Info("No snapshot found, starting with empty state")
  256. }
  257. // Continue anyway - the node can still function, just without historical state
  258. }
  259. // Start election timer
  260. r.resetElectionTimer()
  261. // Start background goroutines with WaitGroup tracking
  262. r.wg.Add(4)
  263. go func() {
  264. defer r.wg.Done()
  265. r.applyLoop()
  266. }()
  267. go func() {
  268. defer r.wg.Done()
  269. r.replicationLoop()
  270. }()
  271. go func() {
  272. defer r.wg.Done()
  273. r.mainLoop()
  274. }()
  275. go func() {
  276. defer r.wg.Done()
  277. r.readIndexLoop()
  278. }()
  279. r.logger.Info("Raft node started")
  280. return nil
  281. }
  282. // Stop stops the Raft node
  283. func (r *Raft) Stop() error {
  284. if !atomic.CompareAndSwapInt32(&r.running, 1, 0) {
  285. return fmt.Errorf("not running")
  286. }
  287. // Signal all goroutines to stop
  288. close(r.stopCh)
  289. // Stop timers first to prevent new operations
  290. r.mu.Lock()
  291. if r.electionTimer != nil {
  292. r.electionTimer.Stop()
  293. }
  294. if r.heartbeatTimer != nil {
  295. r.heartbeatTimer.Stop()
  296. }
  297. r.mu.Unlock()
  298. // Wait for all goroutines to finish with timeout
  299. done := make(chan struct{})
  300. go func() {
  301. r.wg.Wait()
  302. close(done)
  303. }()
  304. select {
  305. case <-done:
  306. // All goroutines exited cleanly
  307. case <-time.After(3 * time.Second):
  308. r.logger.Warn("Timeout waiting for goroutines to stop")
  309. }
  310. // Stop transport (has its own timeout)
  311. if err := r.transport.Stop(); err != nil {
  312. r.logger.Error("Failed to stop transport: %v", err)
  313. }
  314. if err := r.storage.Close(); err != nil {
  315. r.logger.Error("Failed to close storage: %v", err)
  316. }
  317. r.logger.Info("Raft node stopped")
  318. return nil
  319. }
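// Lifecycle sketch (error handling elided): Start launches the four background
// goroutines; Stop closes stopCh, waits up to 3 seconds for them to exit, then
// shuts down the transport and storage.
//
//	if err := node.Start(); err != nil {
//		// handle startup failure
//	}
//	defer node.Stop()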
  320. // mainLoop is the main event loop
  321. func (r *Raft) mainLoop() {
  322. for {
  323. select {
  324. case <-r.stopCh:
  325. return
  326. default:
  327. }
  328. r.mu.RLock()
  329. state := r.state
  330. r.mu.RUnlock()
  331. switch state {
  332. case Follower:
  333. r.runFollower()
  334. case Candidate:
  335. r.runCandidate()
  336. case Leader:
  337. r.runLeader()
  338. }
  339. }
  340. }
  341. // runFollower handles follower behavior
  342. func (r *Raft) runFollower() {
  343. r.logger.Debug("Running as follower in term %d", r.currentTerm)
  344. for {
  345. select {
  346. case <-r.stopCh:
  347. return
  348. case <-r.electionTimer.C:
  349. r.mu.Lock()
  350. if r.state == Follower {
  351. // If we're joining a cluster, don't start elections
  352. // Give time for the leader to sync us up
  353. if r.joiningCluster {
  354. // Allow joining for up to 30 seconds
  355. if time.Since(r.joiningClusterTime) < 30*time.Second {
  356. r.logger.Debug("Suppressing election during cluster join")
  357. r.resetElectionTimer()
  358. r.mu.Unlock()
  359. continue
  360. }
  361. // Timeout - clear joining state
  362. r.joiningCluster = false
  363. r.logger.Warn("Cluster join timeout, resuming normal election behavior")
  364. }
  365. r.logger.Debug("Election timeout, becoming candidate")
  366. r.state = Candidate
  367. r.leaderID = "" // Clear leaderID so we can vote for self in Pre-Vote
  368. }
  369. r.mu.Unlock()
  370. return
  371. }
  372. }
  373. }
  374. // runCandidate handles candidate behavior (leader election)
  375. // Implements Pre-Vote mechanism to prevent term explosion
  376. func (r *Raft) runCandidate() {
  377. // Phase 1: Pre-Vote (don't increment term yet)
  378. if !r.runPreVote() {
  379. // Pre-vote failed, wait for timeout before retrying
  380. r.mu.Lock()
  381. r.resetElectionTimer()
  382. r.mu.Unlock()
  383. select {
  384. case <-r.stopCh:
  385. return
  386. case <-r.electionTimer.C:
  387. // Timer expired, will retry pre-vote
  388. }
  389. return
  390. }
  391. // Phase 2: Actual election (pre-vote succeeded, now increment term)
  392. r.mu.Lock()
  393. r.currentTerm++
  394. r.votedFor = r.nodeID
  395. r.leaderID = ""
  396. currentTerm := r.currentTerm
  397. // Update metrics
  398. atomic.StoreUint64(&r.metrics.Term, currentTerm)
  399. if err := r.persistState(); err != nil {
  400. r.logger.Error("Failed to persist state during election: %v", err)
  401. r.mu.Unlock()
  402. return // Cannot proceed without persisting state
  403. }
  404. atomic.AddUint64(&r.metrics.ElectionsStarted, 1)
  405. r.resetElectionTimer()
  406. r.mu.Unlock()
  407. r.logger.Debug("Starting election for term %d", currentTerm)
  408. // Get current peers list
  409. r.mu.RLock()
  410. currentPeers := make([]string, len(r.peers))
  411. copy(currentPeers, r.peers)
  412. clusterSize := len(r.clusterNodes)
  413. if clusterSize == 0 {
  414. clusterSize = len(r.peers) + 1
  415. }
  416. r.mu.RUnlock()
  417. // Request actual votes from all peers
  418. votes := 1 // Vote for self
  419. voteCh := make(chan bool, len(currentPeers))
  420. lastLogIndex, lastLogTerm := r.log.LastIndexAndTerm()
  421. for _, peer := range currentPeers {
  422. go func(peer string) {
  423. args := &RequestVoteArgs{
  424. Term: currentTerm,
  425. CandidateID: r.nodeID,
  426. LastLogIndex: lastLogIndex,
  427. LastLogTerm: lastLogTerm,
  428. PreVote: false,
  429. }
  430. ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
  431. defer cancel()
  432. reply, err := r.transport.RequestVote(ctx, peer, args)
  433. if err != nil {
  434. r.logger.Debug("RequestVote to %s failed: %v", peer, err)
  435. voteCh <- false
  436. return
  437. }
  438. r.mu.Lock()
  439. defer r.mu.Unlock()
  440. if reply.Term > r.currentTerm {
  441. r.becomeFollower(reply.Term)
  442. voteCh <- false
  443. return
  444. }
  445. voteCh <- reply.VoteGranted
  446. }(peer)
  447. }
  448. // Wait for votes - majority is (clusterSize/2) + 1
  449. needed := clusterSize/2 + 1
  450. // If we already have enough votes (single-node cluster), become leader immediately
  451. if votes >= needed {
  452. r.mu.Lock()
  453. if r.state == Candidate && r.currentTerm == currentTerm {
  454. r.becomeLeader()
  455. }
  456. r.mu.Unlock()
  457. return
  458. }
  459. for i := 0; i < len(currentPeers); i++ {
  460. select {
  461. case <-r.stopCh:
  462. return
  463. case <-r.electionTimer.C:
  464. r.logger.Debug("Election timeout, will start new election")
  465. return
  466. case granted := <-voteCh:
  467. if granted {
  468. votes++
  469. if votes >= needed {
  470. r.mu.Lock()
  471. if r.state == Candidate && r.currentTerm == currentTerm {
  472. r.becomeLeader()
  473. }
  474. r.mu.Unlock()
  475. return
  476. }
  477. }
  478. }
  479. // Check if we're still a candidate
  480. r.mu.RLock()
  481. if r.state != Candidate {
  482. r.mu.RUnlock()
  483. return
  484. }
  485. r.mu.RUnlock()
  486. }
  487. // Election failed, wait for timeout
  488. r.mu.RLock()
  489. stillCandidate := r.state == Candidate
  490. r.mu.RUnlock()
  491. if stillCandidate {
  492. r.logger.Debug("Election failed, waiting for timeout (got %d/%d votes)", votes, needed)
  493. select {
  494. case <-r.stopCh:
  495. return
  496. case <-r.electionTimer.C:
  497. // Timer expired, will start new election
  498. }
  499. }
  500. }
  501. // runPreVote sends pre-vote requests to check if we can win an election
  502. // Returns true if we got majority pre-votes, false otherwise
  503. func (r *Raft) runPreVote() bool {
  504. r.mu.RLock()
  505. currentTerm := r.currentTerm
  506. currentPeers := make([]string, len(r.peers))
  507. copy(currentPeers, r.peers)
  508. clusterSize := len(r.clusterNodes)
  509. if clusterSize == 0 {
  510. clusterSize = len(r.peers) + 1
  511. }
  512. leaderID := r.leaderID
  513. r.mu.RUnlock()
  514. // Pre-vote uses term+1 but doesn't actually increment it
  515. preVoteTerm := currentTerm + 1
  516. lastLogIndex, lastLogTerm := r.log.LastIndexAndTerm()
  517. r.logger.Debug("Starting pre-vote for term %d", preVoteTerm)
  518. // Per Raft Pre-Vote optimization (§9.6): if we have a known leader,
  519. // don't vote for self. This prevents standalone nodes from constantly
  520. // electing themselves when they should be joining an existing cluster.
  521. preVotes := 0
  522. if leaderID == "" {
  523. preVotes = 1 // Vote for self only if we don't know of a leader
  524. } else {
  525. r.logger.Debug("Pre-vote: not self-voting because we have leader %s", leaderID)
  526. }
  527. preVoteCh := make(chan bool, len(currentPeers))
  528. for _, peer := range currentPeers {
  529. go func(peer string) {
  530. args := &RequestVoteArgs{
  531. Term: preVoteTerm,
  532. CandidateID: r.nodeID,
  533. LastLogIndex: lastLogIndex,
  534. LastLogTerm: lastLogTerm,
  535. PreVote: true,
  536. }
  537. ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
  538. defer cancel()
  539. reply, err := r.transport.RequestVote(ctx, peer, args)
  540. if err != nil {
  541. r.logger.Debug("PreVote to %s failed: %v", peer, err)
  542. preVoteCh <- false
  543. return
  544. }
  545. // For pre-vote, we don't step down even if reply.Term > currentTerm
  546. // because the other node might also be doing pre-vote
  547. preVoteCh <- reply.VoteGranted
  548. }(peer)
  549. }
  550. // Wait for pre-votes with a shorter timeout - majority is (clusterSize/2) + 1
  551. needed := clusterSize/2 + 1
  552. // If we already have enough votes (single-node cluster with no known leader), succeed immediately
  553. if preVotes >= needed {
  554. r.logger.Debug("Pre-vote succeeded immediately (got %d/%d pre-votes)", preVotes, needed)
  555. return true
  556. }
  557. timeout := time.After(500 * time.Millisecond)
  558. for i := 0; i < len(currentPeers); i++ {
  559. select {
  560. case <-r.stopCh:
  561. return false
  562. case <-timeout:
  563. r.logger.Debug("Pre-vote timeout (got %d/%d pre-votes)", preVotes, needed)
  564. return false
  565. case granted := <-preVoteCh:
  566. if granted {
  567. preVotes++
  568. if preVotes >= needed {
  569. r.logger.Debug("Pre-vote succeeded (got %d/%d pre-votes)", preVotes, needed)
  570. return true
  571. }
  572. }
  573. }
  574. // Check if we're still a candidate
  575. r.mu.RLock()
  576. if r.state != Candidate {
  577. r.mu.RUnlock()
  578. return false
  579. }
  580. r.mu.RUnlock()
  581. }
  582. r.logger.Debug("Pre-vote failed (got %d/%d pre-votes)", preVotes, needed)
  583. return false
  584. }
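// Worked example: in a 5-node cluster, needed = 5/2 + 1 = 3. A candidate with
// no known leader starts at preVotes = 1 (itself) and must collect two more
// grants within the 500ms window; otherwise it reports failure and waits for
// the next election timeout before retrying.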
  585. // runLeader handles leader behavior
  586. func (r *Raft) runLeader() {
  587. r.logger.Debug("Running as leader in term %d", r.currentTerm)
  588. // Send initial heartbeat
  589. r.sendHeartbeats()
  590. // Start heartbeat timer
  591. r.mu.Lock()
  592. r.heartbeatTimer = time.NewTimer(r.config.HeartbeatInterval)
  593. r.mu.Unlock()
  594. for {
  595. select {
  596. case <-r.stopCh:
  597. return
  598. case <-r.heartbeatTimer.C:
  599. r.mu.RLock()
  600. if r.state != Leader {
  601. r.mu.RUnlock()
  602. return
  603. }
  604. r.mu.RUnlock()
  605. r.sendHeartbeats()
  606. r.heartbeatTimer.Reset(r.config.HeartbeatInterval)
  607. case <-r.commitCh:
  608. r.updateCommitIndex()
  609. }
  610. }
  611. }
  612. // becomeFollower transitions to follower state
  613. func (r *Raft) becomeFollower(term uint64) {
  614. oldState := r.state
  615. oldTerm := r.currentTerm
  616. r.state = Follower
  617. r.currentTerm = term
  618. // Update metrics
  619. atomic.StoreUint64(&r.metrics.Term, term)
  620. r.votedFor = ""
  621. r.leaderID = ""
  622. // Must persist before responding - use mustPersistState for critical transitions
  623. r.mustPersistState()
  624. r.resetElectionTimer()
  625. // Clear leadership transfer state
  626. r.transferring = false
  627. r.transferTarget = ""
  628. // Only log significant state changes
  629. if oldState == Leader && term > oldTerm {
  630. // Stepping down from leader is notable
  631. r.logger.Debug("Stepped down from leader in term %d", term)
  632. }
  633. }
  634. // becomeLeader transitions to leader state
  635. func (r *Raft) becomeLeader() {
  636. r.state = Leader
  637. r.leaderID = r.nodeID
  638. // Update metrics
  639. atomic.AddUint64(&r.metrics.ElectionsWon, 1)
  640. // Initialize leader state for all peers
  641. lastIndex := r.log.LastIndex()
  642. for nodeID, addr := range r.clusterNodes {
  643. if nodeID != r.nodeID {
  644. r.nextIndex[addr] = lastIndex + 1
  645. r.matchIndex[addr] = 0
  646. }
  647. }
  648. // Also handle legacy peers
  649. for _, peer := range r.peers {
  650. if _, exists := r.nextIndex[peer]; !exists {
  651. r.nextIndex[peer] = lastIndex + 1
  652. r.matchIndex[peer] = 0
  653. }
  654. }
  655. r.logger.Info("Became leader in term %d with %d peers", r.currentTerm, len(r.clusterNodes)-1)
  656. // Append a no-op entry to commit entries from previous terms
  657. // This is a standard Raft optimization - the leader appends a no-op
  658. // entry in its current term to quickly commit all pending entries
  659. noopEntry := LogEntry{
  660. Index: r.log.LastIndex() + 1,
  661. Term: r.currentTerm,
  662. Type: EntryNoop,
  663. Command: nil,
  664. }
  665. if err := r.log.Append(noopEntry); err != nil {
  666. r.logger.Error("Failed to append no-op entry: %v", err)
  667. } else {
  668. r.logger.Debug("Appended no-op entry at index %d for term %d", noopEntry.Index, noopEntry.Term)
  669. }
  670. }
  671. // sendHeartbeats sends AppendEntries RPCs to all peers
  672. func (r *Raft) sendHeartbeats() {
  673. r.mu.RLock()
  674. if r.state != Leader {
  675. r.mu.RUnlock()
  676. return
  677. }
  678. currentTerm := r.currentTerm
  679. leaderCommit := r.commitIndex
  680. // Get all peer addresses
  681. peerAddrs := make([]string, 0, len(r.clusterNodes))
  682. for nodeID, addr := range r.clusterNodes {
  683. if nodeID != r.nodeID {
  684. peerAddrs = append(peerAddrs, addr)
  685. }
  686. }
  687. // Also include legacy peers not in clusterNodes
  688. for _, peer := range r.peers {
  689. found := false
  690. for _, addr := range peerAddrs {
  691. if addr == peer {
  692. found = true
  693. break
  694. }
  695. }
  696. if !found {
  697. peerAddrs = append(peerAddrs, peer)
  698. }
  699. }
  700. r.mu.RUnlock()
  701. for _, peer := range peerAddrs {
  702. go r.replicateToPeer(peer, currentTerm, leaderCommit)
  703. }
  704. }
  705. // replicateToPeer sends AppendEntries to a specific peer
  706. func (r *Raft) replicateToPeer(peer string, term, leaderCommit uint64) {
  707. r.mu.RLock()
  708. if r.state != Leader || r.currentTerm != term {
  709. r.mu.RUnlock()
  710. return
  711. }
  712. nextIndex := r.nextIndex[peer]
  713. r.mu.RUnlock()
  714. // Get entries to send
  715. entries, prevLogIndex, prevLogTerm, err := r.log.GetEntriesForFollower(nextIndex, r.config.MaxLogEntriesPerRequest)
  716. if err == ErrCompacted {
  717. // Need to send snapshot
  718. r.sendSnapshot(peer)
  719. return
  720. }
  721. if err != nil {
  722. r.logger.Debug("Failed to get entries for %s: %v", peer, err)
  723. return
  724. }
  725. args := &AppendEntriesArgs{
  726. Term: term,
  727. LeaderID: r.nodeID,
  728. PrevLogIndex: prevLogIndex,
  729. PrevLogTerm: prevLogTerm,
  730. Entries: entries,
  731. LeaderCommit: leaderCommit,
  732. }
  733. ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
  734. defer cancel()
  735. reply, err := r.transport.AppendEntries(ctx, peer, args)
  736. if err != nil {
  737. r.logger.Debug("AppendEntries to %s failed: %v", peer, err)
  738. return
  739. }
  740. atomic.AddInt64(&r.stats.AppendsSent, 1)
  741. r.mu.Lock()
  742. defer r.mu.Unlock()
  743. if reply.Term > r.currentTerm {
  744. r.becomeFollower(reply.Term)
  745. return
  746. }
  747. if r.state != Leader || r.currentTerm != term {
  748. return
  749. }
  750. if reply.Success {
  751. // Update contact time
  752. r.lastContact[peer] = time.Now()
  753. // Update nextIndex and matchIndex
  754. if len(entries) > 0 {
  755. newMatchIndex := entries[len(entries)-1].Index
  756. if newMatchIndex > r.matchIndex[peer] {
  757. r.matchIndex[peer] = newMatchIndex
  758. r.nextIndex[peer] = newMatchIndex + 1
  759. // Try to update commit index
  760. select {
  761. case r.commitCh <- struct{}{}:
  762. default:
  763. }
  764. }
  765. }
  766. } else {
  767. // Even if failed (log conflict), we contacted the peer
  768. r.lastContact[peer] = time.Now()
  769. // Decrement nextIndex and retry
  770. if reply.ConflictTerm > 0 {
  771. // Find the last entry of ConflictTerm in our log
  772. found := false
  773. for idx := r.log.LastIndex(); idx >= r.log.FirstIndex(); idx-- {
  774. t, err := r.log.GetTerm(idx)
  775. if err != nil {
  776. break
  777. }
  778. if t == reply.ConflictTerm {
  779. r.nextIndex[peer] = idx + 1
  780. found = true
  781. break
  782. }
  783. if t < reply.ConflictTerm {
  784. break
  785. }
  786. }
  787. if !found {
  788. r.nextIndex[peer] = reply.ConflictIndex
  789. }
  790. } else if reply.ConflictIndex > 0 {
  791. r.nextIndex[peer] = reply.ConflictIndex
  792. } else {
  793. r.nextIndex[peer] = max(1, r.nextIndex[peer]-1)
  794. }
  795. }
  796. }
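// Back-off example (illustrative): if a follower rejects with ConflictTerm=3
// and the leader's last entry of term 3 is at index 41, the leader retries
// from nextIndex=42; if the leader has no entry of term 3 at all, it falls
// back to the follower-supplied ConflictIndex.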
  797. // sendSnapshot sends a snapshot to a peer with chunked transfer support
  798. func (r *Raft) sendSnapshot(peer string) {
  799. r.mu.RLock()
  800. if r.state != Leader {
  801. r.mu.RUnlock()
  802. return
  803. }
  804. term := r.currentTerm
  805. chunkSize := r.config.SnapshotChunkSize
  806. if chunkSize <= 0 {
  807. chunkSize = 1024 * 1024 // Default 1MB
  808. }
  809. r.mu.RUnlock()
  810. data, lastIndex, lastTerm, err := r.storage.GetSnapshot()
  811. if err != nil {
  812. r.logger.Error("Failed to get snapshot: %v", err)
  813. return
  814. }
  815. atomic.AddUint64(&r.metrics.SnapshotsSent, 1)
  816. r.logger.Info("Sending snapshot to %s: %d bytes, lastIndex=%d, lastTerm=%d",
  817. peer, len(data), lastIndex, lastTerm)
  818. // Send snapshot in chunks
  819. totalSize := len(data)
  820. offset := 0
  821. for offset < totalSize {
  822. // Check if we're still leader
  823. r.mu.RLock()
  824. if r.state != Leader || r.currentTerm != term {
  825. r.mu.RUnlock()
  826. r.logger.Debug("Aborting snapshot send: no longer leader")
  827. return
  828. }
  829. r.mu.RUnlock()
  830. // Calculate chunk size
  831. end := offset + chunkSize
  832. if end > totalSize {
  833. end = totalSize
  834. }
  835. chunk := data[offset:end]
  836. done := end >= totalSize
  837. args := &InstallSnapshotArgs{
  838. Term: term,
  839. LeaderID: r.nodeID,
  840. LastIncludedIndex: lastIndex,
  841. LastIncludedTerm: lastTerm,
  842. Offset: uint64(offset),
  843. Data: chunk,
  844. Done: done,
  845. }
  846. ctx, cancel := context.WithTimeout(context.Background(), r.config.SnapshotRPCTimeout)
  847. reply, err := r.transport.InstallSnapshot(ctx, peer, args)
  848. cancel()
  849. if err != nil {
  850. r.logger.Error("InstallSnapshot chunk to %s failed at offset %d: %v", peer, offset, err)
  851. return
  852. }
  853. if reply.Term > r.currentTerm {
  854. r.mu.Lock()
  855. r.becomeFollower(reply.Term)
  856. r.mu.Unlock()
  857. return
  858. }
  859. if !reply.Success {
  860. r.logger.Error("InstallSnapshot chunk rejected by %s at offset %d", peer, offset)
  861. return
  862. }
  863. offset = end
  864. }
  865. // Snapshot fully sent and accepted
  866. r.mu.Lock()
  867. defer r.mu.Unlock()
  868. if r.state != Leader || r.currentTerm != term {
  869. return
  870. }
  871. r.nextIndex[peer] = lastIndex + 1
  872. r.matchIndex[peer] = lastIndex
  873. r.logger.Info("Snapshot to %s completed, nextIndex=%d", peer, lastIndex+1)
  874. }
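// Chunking example (illustrative): a 2.5 MB snapshot with the default 1 MB
// chunk size is sent as three InstallSnapshot RPCs with Offset 0, 1048576 and
// 2097152; only the last carries Done=true, after which the peer's nextIndex
// is set to lastIncludedIndex+1.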
  875. // updateCommitIndex updates the commit index based on matchIndex
  876. func (r *Raft) updateCommitIndex() {
  877. r.mu.Lock()
  878. defer r.mu.Unlock()
  879. if r.state != Leader {
  880. return
  881. }
  882. // For config change entries, use the OLD cluster for majority calculation
  883. // This ensures that adding a node doesn't require the new node's vote
  884. // until the config change itself is committed
  885. votingNodes := r.clusterNodes
  886. if r.pendingConfigChange && r.oldClusterNodes != nil {
  887. votingNodes = r.oldClusterNodes
  888. }
  889. // Get current cluster size from voting nodes
  890. clusterSize := len(votingNodes)
  891. if clusterSize == 0 {
  892. clusterSize = len(r.peers) + 1
  893. }
  894. // Find the highest index replicated on a majority
  895. for n := r.log.LastIndex(); n > r.commitIndex; n-- {
  896. term, err := r.log.GetTerm(n)
  897. if err != nil {
  898. continue
  899. }
  900. // Only commit entries from current term
  901. if term != r.currentTerm {
  902. continue
  903. }
  904. // Count replicas (including self)
  905. count := 1 // Self
  906. for nodeID, addr := range votingNodes {
  907. if nodeID != r.nodeID {
  908. if r.matchIndex[addr] >= n {
  909. count++
  910. }
  911. }
  912. }
  913. // Also check legacy peers
  914. for _, peer := range r.peers {
  915. // Avoid double counting if peer is already in votingNodes
  916. alreadyCounted := false
  917. for _, addr := range votingNodes {
  918. if addr == peer {
  919. alreadyCounted = true
  920. break
  921. }
  922. }
  923. if !alreadyCounted && r.matchIndex[peer] >= n {
  924. count++
  925. }
  926. }
  927. // Majority is (clusterSize/2) + 1
  928. needed := clusterSize/2 + 1
  929. if count >= needed {
  930. r.commitIndex = n
  931. break
  932. }
  933. }
  934. }
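// Worked example: with 5 voting nodes and matchIndex values {A:7, B:7, C:5, D:4},
// the leader counts itself plus A and B at index 7, giving 3 >= 5/2+1, so
// commitIndex advances to 7 (provided entry 7 is from the leader's current
// term), even though C and D still lag.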
  935. // applyLoop applies committed entries to the state machine
  936. // Uses event-driven model with fallback polling for reliability
  937. func (r *Raft) applyLoop() {
  938. // Use a ticker as fallback for missed signals (matches original 10ms polling)
  939. ticker := time.NewTicker(10 * time.Millisecond)
  940. defer ticker.Stop()
  941. for {
  942. select {
  943. case <-r.stopCh:
  944. return
  945. case <-r.commitCh:
  946. // Event-driven: triggered when commitIndex is updated
  947. r.applyCommitted()
  948. case <-ticker.C:
  949. // Fallback: check periodically in case we missed a signal
  950. r.applyCommitted()
  951. }
  952. }
  953. }
  954. // applyCommitted applies all committed but not yet applied entries
  955. func (r *Raft) applyCommitted() {
  956. r.mu.Lock()
  957. commitIndex := r.commitIndex
  958. lastApplied := r.lastApplied
  959. firstIndex := r.log.FirstIndex()
  960. lastLogIndex := r.log.LastIndex()
  961. r.mu.Unlock()
  962. // Safety check: ensure lastApplied is within valid range
  963. // If lastApplied is below firstIndex (due to snapshot), skip to firstIndex
  964. if lastApplied < firstIndex && firstIndex > 0 {
  965. r.mu.Lock()
  966. r.lastApplied = firstIndex
  967. lastApplied = firstIndex
  968. r.mu.Unlock()
  969. r.logger.Debug("Adjusted lastApplied to firstIndex %d after compaction", firstIndex)
  970. }
  971. // Safety check: don't try to apply beyond what we have in log
  972. if commitIndex > lastLogIndex {
  973. commitIndex = lastLogIndex
  974. }
  975. for lastApplied < commitIndex {
  976. lastApplied++
  977. // Skip if entry has been compacted
  978. if lastApplied < firstIndex {
  979. continue
  980. }
  981. entry, err := r.log.GetEntry(lastApplied)
  982. if err != nil {
  983. // If entry is compacted, skip ahead
  984. if err == ErrCompacted {
  985. r.mu.Lock()
  986. newFirstIndex := r.log.FirstIndex()
  987. if lastApplied < newFirstIndex {
  988. r.lastApplied = newFirstIndex
  989. lastApplied = newFirstIndex
  990. }
  991. r.mu.Unlock()
  992. continue
  993. }
  994. r.logger.Error("Failed to get entry %d: %v (firstIndex=%d, lastIndex=%d)",
  995. lastApplied, err, r.log.FirstIndex(), r.log.LastIndex())
  996. break
  997. }
  998. // Handle config change entries
  999. if entry.Type == EntryConfig {
  1000. r.applyConfigChange(entry)
  1001. r.mu.Lock()
  1002. r.lastApplied = lastApplied
  1003. r.mu.Unlock()
  1004. continue
  1005. }
  1006. // Handle no-op entries (just update lastApplied, don't send to state machine)
  1007. if entry.Type == EntryNoop {
  1008. r.mu.Lock()
  1009. r.lastApplied = lastApplied
  1010. r.mu.Unlock()
  1011. continue
  1012. }
  1013. // Normal command entry
  1014. msg := ApplyMsg{
  1015. CommandValid: true,
  1016. Command: entry.Command,
  1017. CommandIndex: entry.Index,
  1018. CommandTerm: entry.Term,
  1019. }
  1020. select {
  1021. case r.applyCh <- msg:
  1022. case <-r.stopCh:
  1023. return
  1024. }
  1025. r.mu.Lock()
  1026. r.lastApplied = lastApplied
  1027. r.mu.Unlock()
  1028. }
  1029. // Check if log compaction is needed
  1030. // Run asynchronously to avoid blocking the apply loop if snapshotting is slow
  1031. go r.maybeCompactLog()
  1032. }
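// Application-side sketch (the fsm variable and its Apply method are
// assumptions, not part of this package): the state machine drains applyCh
// and applies each committed command in index order.
//
//	go func() {
//		for msg := range applyCh {
//			if msg.CommandValid {
//				fsm.Apply(msg.Command, msg.CommandIndex)
//			}
//		}
//	}()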
  1033. // maybeCompactLog checks if automatic log compaction should be triggered
  1034. // It uses a dynamic threshold to prevent "compaction thrashing":
  1035. // - First compaction triggers at SnapshotThreshold (default 100,000)
  1036. // - After compaction, next threshold = remaining log size + SnapshotThreshold
  1037. // - This prevents every new entry from triggering another compaction while the log stays large
  1038. func (r *Raft) maybeCompactLog() {
  1039. // Skip if no snapshot provider is configured
  1040. if r.config.SnapshotProvider == nil {
  1041. return
  1042. }
  1043. // Skip if log compaction is explicitly disabled
  1044. if !r.config.LogCompactionEnabled {
  1045. return
  1046. }
  1047. // Check if compaction is already in progress (atomic check-and-set)
  1048. if !atomic.CompareAndSwapInt32(&r.compacting, 0, 1) {
  1049. return
  1050. }
  1051. // Ensure we release the compacting flag when done
  1052. defer atomic.StoreInt32(&r.compacting, 0)
  1053. r.mu.RLock()
  1054. lastApplied := r.lastApplied
  1055. firstIndex := r.log.FirstIndex()
  1056. initialThreshold := r.config.SnapshotThreshold
  1057. minRetention := r.config.SnapshotMinRetention
  1058. isLeader := r.state == Leader
  1059. dynamicThreshold := r.nextCompactionThreshold
  1060. r.mu.RUnlock()
  1061. // Guard against underflow: ensure lastApplied > firstIndex
  1062. if lastApplied <= firstIndex {
  1063. return
  1064. }
  1065. // Calculate current log size
  1066. logSize := lastApplied - firstIndex
  1067. // Determine effective threshold:
  1068. // - Use initial threshold if no compaction has occurred yet (dynamicThreshold == 0)
  1069. // - Otherwise use the dynamic threshold
  1070. effectiveThreshold := initialThreshold
  1071. if dynamicThreshold > 0 {
  1072. effectiveThreshold = dynamicThreshold
  1073. }
  1074. // Check if we have enough entries to warrant compaction
  1075. if logSize <= effectiveThreshold {
  1076. return
  1077. }
  1078. // Guard against underflow: ensure lastApplied > minRetention
  1079. if lastApplied <= minRetention {
  1080. return
  1081. }
  1082. // Calculate the safe compaction point
  1083. // We need to retain at least minRetention entries for follower catch-up
  1084. compactUpTo := lastApplied - minRetention
  1085. if compactUpTo <= firstIndex {
  1086. return // Not enough entries to compact while maintaining retention
  1087. }
  1088. // For leader, also consider the minimum nextIndex of all followers
  1089. // to avoid compacting entries that followers still need
  1090. if isLeader {
  1091. r.mu.RLock()
  1092. minNextIndex := lastApplied
  1093. for _, nextIdx := range r.nextIndex {
  1094. if nextIdx < minNextIndex {
  1095. minNextIndex = nextIdx
  1096. }
  1097. }
  1098. r.mu.RUnlock()
  1099. // Don't compact entries that followers still need
  1100. // Keep a buffer of minRetention entries before the slowest follower
  1101. if minNextIndex > minRetention {
  1102. followerSafePoint := minNextIndex - minRetention
  1103. if followerSafePoint < compactUpTo {
  1104. compactUpTo = followerSafePoint
  1105. }
  1106. } else {
  1107. // Slowest follower is too far behind, don't compact
  1108. // They will need a full snapshot anyway
  1109. compactUpTo = firstIndex // Effectively skip compaction
  1110. }
  1111. }
  1112. // Final check - make sure we're actually compacting something meaningful
  1113. if compactUpTo <= firstIndex {
  1114. return
  1115. }
  1116. // Get snapshot from application layer
  1117. snapshotData, err := r.config.SnapshotProvider(compactUpTo)
  1118. if err != nil {
  1119. r.logger.Error("Failed to get snapshot from provider: %v", err)
  1120. return
  1121. }
  1122. // Get the term at the compaction point
  1123. term, err := r.log.GetTerm(compactUpTo)
  1124. if err != nil {
  1125. r.logger.Error("Failed to get term for compaction index %d: %v", compactUpTo, err)
  1126. return
  1127. }
  1128. // Save snapshot
  1129. if err := r.storage.SaveSnapshot(snapshotData, compactUpTo, term); err != nil {
  1130. r.logger.Error("Failed to save snapshot: %v", err)
  1131. return
  1132. }
  1133. // Compact the log
  1134. // CRITICAL: We must hold the write lock during compaction to prevent
  1135. // concurrent writes (AppendEntries) unless we can guarantee underlying
  1136. // file integrity with concurrent modification. The requirement is to
  1137. // pause writes during compaction.
  1138. r.mu.Lock()
  1139. if err := r.log.Compact(compactUpTo); err != nil {
  1140. r.mu.Unlock()
  1141. r.logger.Error("Failed to compact log: %v", err)
  1142. return
  1143. }
  1144. r.mu.Unlock()
  1145. // Calculate new log size after compaction
  1146. newLogSize := lastApplied - compactUpTo
  1147. // Update dynamic threshold for next compaction
  1148. // We use a linear growth model: next threshold = current size + SnapshotThreshold
  1149. // This ensures that we trigger compaction roughly every SnapshotThreshold entries.
  1150. r.mu.Lock()
  1151. r.nextCompactionThreshold = newLogSize + r.config.SnapshotThreshold
  1152. // Ensure threshold doesn't go below the initial threshold
  1153. if r.nextCompactionThreshold < initialThreshold {
  1154. r.nextCompactionThreshold = initialThreshold
  1155. }
  1156. r.mu.Unlock()
  1157. r.logger.Info("Auto compaction completed: compacted up to index %d (term %d), log size %d -> %d, next threshold: %d",
  1158. compactUpTo, term, logSize, newLogSize, r.nextCompactionThreshold)
  1159. }
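// Illustrative numbers: with SnapshotThreshold=100,000 and
// SnapshotMinRetention=10,000, the first compaction fires once
// lastApplied-firstIndex exceeds 100,000 and truncates up to
// lastApplied-10,000. About 10,000 entries remain, so the next threshold
// becomes 10,000+100,000=110,000 and compaction recurs roughly every
// SnapshotThreshold applied entries instead of on every new entry.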
  1160. // resetElectionTimer resets the election timeout
  1161. func (r *Raft) resetElectionTimer() {
  1162. timeout := r.config.ElectionTimeoutMin +
  1163. time.Duration(rand.Int63n(int64(r.config.ElectionTimeoutMax-r.config.ElectionTimeoutMin)))
  1164. if r.electionTimer == nil {
  1165. r.electionTimer = time.NewTimer(timeout)
  1166. } else {
  1167. if !r.electionTimer.Stop() {
  1168. select {
  1169. case <-r.electionTimer.C:
  1170. default:
  1171. }
  1172. }
  1173. r.electionTimer.Reset(timeout)
  1174. }
  1175. }
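// Example: with ElectionTimeoutMin=150ms and ElectionTimeoutMax=300ms each
// reset picks a uniform timeout in [150ms, 300ms), so followers rarely expire
// simultaneously. Note that rand.Int63n panics if the two bounds are equal,
// so the configured max must be strictly greater than the min.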
  1176. // persistState saves the current state to stable storage
  1177. // Returns error if persistence fails - caller MUST handle this for safety
  1178. func (r *Raft) persistState() error {
  1179. state := &PersistentState{
  1180. CurrentTerm: r.currentTerm,
  1181. VotedFor: r.votedFor,
  1182. }
  1183. if err := r.storage.SaveState(state); err != nil {
  1184. r.logger.Error("Failed to persist state: %v", err)
  1185. return fmt.Errorf("%w: %v", ErrPersistFailed, err)
  1186. }
  1187. return nil
  1188. }
  1189. // mustPersistState saves state and panics on failure
  1190. // Use this only in critical paths where failure is unrecoverable
  1191. func (r *Raft) mustPersistState() {
  1192. if err := r.persistState(); err != nil {
  1193. // In production, you might want to trigger a graceful shutdown instead
  1194. r.logger.Error("CRITICAL: Failed to persist state, node may be in inconsistent state: %v", err)
  1195. panic(err)
  1196. }
  1197. }
  1198. // HandleRequestVote handles RequestVote RPCs (including pre-vote)
  1199. func (r *Raft) HandleRequestVote(args *RequestVoteArgs) *RequestVoteReply {
  1200. r.mu.Lock()
  1201. defer r.mu.Unlock()
  1202. reply := &RequestVoteReply{
  1203. Term: r.currentTerm,
  1204. VoteGranted: false,
  1205. }
  1206. // Handle pre-vote separately
  1207. if args.PreVote {
  1208. return r.handlePreVote(args)
  1209. }
  1210. // Reply false if term < currentTerm
  1211. if args.Term < r.currentTerm {
  1212. return reply
  1213. }
  1214. // If term > currentTerm, become follower
  1215. if args.Term > r.currentTerm {
  1216. r.becomeFollower(args.Term)
  1217. }
  1218. reply.Term = r.currentTerm
  1219. // Check if we can vote for this candidate
  1220. if (r.votedFor == "" || r.votedFor == args.CandidateID) &&
  1221. r.log.IsUpToDate(args.LastLogIndex, args.LastLogTerm) {
  1222. r.votedFor = args.CandidateID
  1223. if err := r.persistState(); err != nil {
  1224. // Cannot grant vote if we can't persist the decision
  1225. r.logger.Error("Failed to persist vote for %s: %v", args.CandidateID, err)
  1226. return reply
  1227. }
  1228. r.resetElectionTimer()
  1229. reply.VoteGranted = true
  1230. r.logger.Debug("Granted vote to %s for term %d", args.CandidateID, args.Term)
  1231. }
  1232. return reply
  1233. }
  1234. // handlePreVote handles pre-vote requests
  1235. // Pre-vote doesn't change our state, just checks if we would vote
  1236. func (r *Raft) handlePreVote(args *RequestVoteArgs) *RequestVoteReply {
  1237. reply := &RequestVoteReply{
  1238. Term: r.currentTerm,
  1239. VoteGranted: false,
  1240. }
  1241. // For pre-vote, we check:
  1242. // 1. The candidate's term is at least as high as ours
  1243. // 2. The candidate's log is at least as up-to-date as ours
  1244. // 3. We don't have a current leader (or the candidate's term is higher)
  1245. if args.Term < r.currentTerm {
  1246. return reply
  1247. }
  1248. // Per Raft Pre-Vote optimization (§9.6): reject pre-vote if we have a current
  1249. // leader and the candidate's term is not higher than ours. This prevents
  1250. // disruptive elections when a partitioned node tries to rejoin.
  1251. if r.leaderID != "" && args.Term <= r.currentTerm {
  1252. r.logger.Debug("Rejecting pre-vote from %s: have leader %s", args.CandidateID, r.leaderID)
  1253. return reply
  1254. }
  1255. // Grant pre-vote if log is up-to-date
  1256. // Note: we don't check votedFor for pre-vote, and we don't update any state
  1257. if r.log.IsUpToDate(args.LastLogIndex, args.LastLogTerm) {
  1258. reply.VoteGranted = true
  1259. r.logger.Debug("Granted pre-vote to %s for term %d", args.CandidateID, args.Term)
  1260. }
  1261. return reply
  1262. }
// HandleAppendEntries handles AppendEntries RPCs
func (r *Raft) HandleAppendEntries(args *AppendEntriesArgs) *AppendEntriesReply {
	r.mu.Lock()
	defer r.mu.Unlock()
	atomic.AddInt64(&r.stats.AppendsReceived, 1)
	reply := &AppendEntriesReply{
		Term:    r.currentTerm,
		Success: false,
	}
	// Check if we're a standalone node being added to a cluster.
	// A standalone node has only itself in clusterNodes.
	isStandalone := len(r.clusterNodes) == 1
	if _, hasSelf := r.clusterNodes[r.nodeID]; isStandalone && hasSelf {
		// We're standalone and receiving AppendEntries from an external leader.
		// This means we're being added to a cluster - suppress elections.
		if !r.joiningCluster {
			r.joiningCluster = true
			r.joiningClusterTime = time.Now()
			r.logger.Info("Detected cluster join in progress, suppressing elections")
		}
		// When joining, accept higher terms from the leader to sync up
		if args.Term > r.currentTerm {
			r.becomeFollower(args.Term)
		}
	}
	// Reply false if term < currentTerm
	if args.Term < r.currentTerm {
		// But still reset timer if we're joining a cluster to prevent elections
		if r.joiningCluster {
			r.resetElectionTimer()
		}
		return reply
	}
	// If term > currentTerm, or we're a candidate, or we're a leader receiving
	// AppendEntries from another leader (split-brain scenario during cluster merge),
	// become follower. In Raft, there can only be one leader per term.
	if args.Term > r.currentTerm || r.state == Candidate || r.state == Leader {
		r.becomeFollower(args.Term)
	}
	// Update leader info and reset election timer
	r.leaderID = args.LeaderID
	r.lastHeartbeat = time.Now()
	r.resetElectionTimer()
	reply.Term = r.currentTerm
	// Try to append entries
	success, conflictIndex, conflictTerm := r.log.AppendEntriesFromLeader(
		args.PrevLogIndex, args.PrevLogTerm, args.Entries)
	if !success {
		reply.ConflictIndex = conflictIndex
		reply.ConflictTerm = conflictTerm
		return reply
	}
	reply.Success = true
	// Update commit index safely
	if args.LeaderCommit > r.commitIndex {
		// Get our actual last log index
		lastLogIndex := r.log.LastIndex()
		// Calculate what index the entries would have reached
		lastNewEntry := args.PrevLogIndex
		if len(args.Entries) > 0 {
			lastNewEntry = args.Entries[len(args.Entries)-1].Index
		}
		// Commit index should not exceed what we actually have in the log
		newCommitIndex := args.LeaderCommit
		if newCommitIndex > lastNewEntry {
			newCommitIndex = lastNewEntry
		}
		if newCommitIndex > lastLogIndex {
			newCommitIndex = lastLogIndex
		}
		// Only advance commit index
		if newCommitIndex > r.commitIndex {
			r.commitIndex = newCommitIndex
		}
	}
	return reply
}
// HandleInstallSnapshot handles InstallSnapshot RPCs with chunked transfer support
func (r *Raft) HandleInstallSnapshot(args *InstallSnapshotArgs) *InstallSnapshotReply {
	r.mu.Lock()
	defer r.mu.Unlock()
	reply := &InstallSnapshotReply{
		Term:    r.currentTerm,
		Success: false,
	}
	if args.Term < r.currentTerm {
		return reply
	}
	if args.Term > r.currentTerm {
		r.becomeFollower(args.Term)
	}
	r.leaderID = args.LeaderID
	r.lastHeartbeat = time.Now()
	r.resetElectionTimer()
	reply.Term = r.currentTerm
	// Skip if we already have this or a newer snapshot applied
	if args.LastIncludedIndex <= r.lastApplied {
		r.logger.Debug("Ignoring snapshot at index %d, already applied up to %d",
			args.LastIncludedIndex, r.lastApplied)
		reply.Success = true // Still success to let leader know we don't need it
		return reply
	}
	// Handle chunked transfer
	if args.Offset == 0 {
		// First chunk - start new pending snapshot
		r.pendingSnapshot = &pendingSnapshotState{
			lastIncludedIndex: args.LastIncludedIndex,
			lastIncludedTerm:  args.LastIncludedTerm,
			data:              make([]byte, 0),
			receivedBytes:     0,
		}
		r.logger.Info("Starting snapshot reception at index %d, term %d",
			args.LastIncludedIndex, args.LastIncludedTerm)
	}
	// Validate we're receiving the expected snapshot.
	// Check for nil first: a non-zero-offset chunk arriving with no reception in
	// progress would otherwise cause a nil pointer dereference in the log call below.
	if r.pendingSnapshot == nil {
		r.logger.Warn("Unexpected snapshot chunk at offset %d: no snapshot reception in progress", args.Offset)
		return reply
	}
	if r.pendingSnapshot.lastIncludedIndex != args.LastIncludedIndex ||
		r.pendingSnapshot.lastIncludedTerm != args.LastIncludedTerm {
		r.logger.Warn("Unexpected snapshot chunk: expected index %d, got %d",
			r.pendingSnapshot.lastIncludedIndex, args.LastIncludedIndex)
		return reply
	}
	// Validate offset matches what we've received
	if uint64(args.Offset) != r.pendingSnapshot.receivedBytes {
		r.logger.Warn("Unexpected chunk offset: expected %d, got %d",
			r.pendingSnapshot.receivedBytes, args.Offset)
		return reply
	}
	// Append chunk data
	r.pendingSnapshot.data = append(r.pendingSnapshot.data, args.Data...)
	r.pendingSnapshot.receivedBytes += uint64(len(args.Data))
	reply.Success = true
	r.logger.Debug("Received snapshot chunk: offset=%d, size=%d, done=%v",
		args.Offset, len(args.Data), args.Done)
	// If not done, wait for more chunks
	if !args.Done {
		return reply
	}
	// All chunks received - apply the snapshot
	r.logger.Info("Installing complete snapshot: %d bytes at index %d, term %d",
		len(r.pendingSnapshot.data), args.LastIncludedIndex, args.LastIncludedTerm)
	atomic.AddUint64(&r.metrics.SnapshotsInstalled, 1)
	// Save snapshot
	if err := r.storage.SaveSnapshot(r.pendingSnapshot.data, args.LastIncludedIndex, args.LastIncludedTerm); err != nil {
		r.logger.Error("Failed to save snapshot: %v", err)
		r.pendingSnapshot = nil
		reply.Success = false
		return reply
	}
	// Compact log
	if err := r.log.Compact(args.LastIncludedIndex); err != nil {
		r.logger.Error("Failed to compact log: %v", err)
	}
	// Update state - must update both commitIndex and lastApplied
	if args.LastIncludedIndex > r.commitIndex {
		r.commitIndex = args.LastIncludedIndex
	}
	// Always update lastApplied to snapshot index to prevent trying to apply compacted entries
	r.lastApplied = args.LastIncludedIndex
	// Send snapshot to application (non-blocking with timeout).
	// Use the complete pendingSnapshot data, not the last chunk.
	msg := ApplyMsg{
		SnapshotValid: true,
		Snapshot:      r.pendingSnapshot.data,
		SnapshotIndex: args.LastIncludedIndex,
		SnapshotTerm:  args.LastIncludedTerm,
	}
	// Clear pending snapshot
	r.pendingSnapshot = nil
	// Try to send, but don't block indefinitely
	select {
	case r.applyCh <- msg:
		r.logger.Debug("Sent snapshot to application")
	case <-time.After(100 * time.Millisecond):
		r.logger.Warn("Timeout sending snapshot to application, will retry")
		// The application will still get correct state via normal apply loop
	}
	return reply
}
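// The handler above assumes the leader streams a snapshot in order: Offset starts
// at 0, each chunk carries the next slice of bytes, and Done is set on the final
// chunk. A minimal leader-side sketch of that contract (hedged: chunkSize, the
// sendChunk helper, and the exact field types are illustrative assumptions, not
// part of this file):
//
//	const chunkSize = 64 * 1024
//	for offset := 0; offset < len(data); offset += chunkSize {
//		end := offset + chunkSize
//		if end > len(data) {
//			end = len(data)
//		}
//		args := &InstallSnapshotArgs{
//			Term: term, LeaderID: id,
//			LastIncludedIndex: lastIdx, LastIncludedTerm: lastTerm,
//			Offset: offset, Data: data[offset:end], Done: end == len(data),
//		}
//		// sendChunk is a hypothetical wrapper around the transport's
//		// InstallSnapshot RPC; retry or abort on failure as appropriate.
//		if ok := sendChunk(args); !ok {
//			break
//		}
//	}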
// Propose proposes a new command to be replicated
func (r *Raft) Propose(command []byte) (uint64, uint64, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.state != Leader {
		return 0, 0, false
	}
	// Check connectivity (Lease Check).
	// If we haven't heard from a majority of peers within ElectionTimeout,
	// we shouldn't accept new commands because we might be partitioned.
	if len(r.clusterNodes) > 1 {
		activePeers := 1 // Self
		now := time.Now()
		timeout := r.config.ElectionTimeoutMax
		for _, peer := range r.peers {
			if last, ok := r.lastContact[peer]; ok && now.Sub(last) < timeout {
				activePeers++
			}
		}
		// Check majority
		needed := len(r.clusterNodes)/2 + 1
		if activePeers < needed {
			r.logger.Warn("Rejecting Propose: lost contact with majority (active: %d, needed: %d)", activePeers, needed)
			return 0, 0, false
		}
	}
	index, err := r.log.AppendCommand(r.currentTerm, command)
	if err != nil {
		r.logger.Error("Failed to append command: %v", err)
		return 0, 0, false
	}
	r.matchIndex[r.nodeID] = index
	// For a single-node cluster, we are the only voter and can commit immediately.
	// This fixes the issue where commitCh never gets triggered without other peers.
	if len(r.clusterNodes) <= 1 && len(r.peers) == 0 {
		// Single node: self is majority, trigger commit immediately
		select {
		case r.commitCh <- struct{}{}:
		default:
		}
	} else {
		// Multi-node: trigger replication to other nodes
		r.triggerReplication()
	}
	return index, r.currentTerm, true
}
// triggerReplication signals the replication loop to send heartbeats.
// This uses a non-blocking send to batch replication requests.
func (r *Raft) triggerReplication() {
	select {
	case r.replicationCh <- struct{}{}:
	default:
		// Replication already scheduled
	}
}
// replicationLoop handles batched replication.
// Uses simple delay-based batching: flush immediately when signaled, then wait
// to allow more requests to accumulate before the next flush.
func (r *Raft) replicationLoop() {
	for {
		select {
		case <-r.stopCh:
			return
		case <-r.replicationCh:
			// Flush and replicate immediately
			r.flushAndReplicate()
			// Wait briefly to allow batching of subsequent requests.
			// This gives time for more proposals to queue up before the next flush.
			time.Sleep(10 * time.Millisecond)
		}
	}
}
// flushAndReplicate flushes logs and sends heartbeats
func (r *Raft) flushAndReplicate() {
	// Ensure logs are flushed to OS cache before sending to followers.
	// This implements Group Commit with Flush (fast) instead of Sync (slow).
	if err := r.log.Flush(); err != nil {
		r.logger.Error("Failed to flush log: %v", err)
	}
	r.sendHeartbeats()
}
// ProposeWithForward proposes a command, forwarding to the leader if necessary.
// This is the recommended method for applications to use.
func (r *Raft) ProposeWithForward(command []byte) (index uint64, term uint64, err error) {
	// Try local propose first
	idx, t, isLeader := r.Propose(command)
	if isLeader {
		return idx, t, nil
	}
	// Not leader, forward to leader
	r.mu.RLock()
	leaderID := r.leaderID
	// Use clusterNodes (dynamically maintained) to find the leader address
	leaderAddr := r.clusterNodes[leaderID]
	r.mu.RUnlock()
	if leaderID == "" {
		return 0, 0, fmt.Errorf("no leader available")
	}
	if leaderAddr == "" {
		return 0, 0, fmt.Errorf("leader %s address not found in cluster", leaderID)
	}
	// Forward to leader
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	args := &ProposeArgs{Command: command}
	reply, err := r.transport.ForwardPropose(ctx, leaderAddr, args)
	if err != nil {
		return 0, 0, fmt.Errorf("forward failed: %w", err)
	}
	if !reply.Success {
		return 0, 0, fmt.Errorf("leader rejected: %s", reply.Error)
	}
	return reply.Index, reply.Term, nil
}
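// A minimal usage sketch (hedged: the command encoding and the apply-wait step
// are illustrative application-side choices, not part of this package): a
// client-facing write path calls ProposeWithForward and should wait until the
// returned index has been applied before acknowledging the write.
//
//	cmd := []byte(`{"op":"set","key":"x","value":"1"}`)
//	index, _, err := node.ProposeWithForward(cmd)
//	if err != nil {
//		return err // no leader reachable, or the leader rejected the proposal
//	}
//	// Wait for the entry at `index` to arrive on the apply channel (or use
//	// whatever apply notification the application wires up) before replying OK.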
// ForwardGet forwards a get request to the leader
func (r *Raft) ForwardGet(key string) (string, bool, error) {
	// Read state and leader info under the lock to avoid a data race
	r.mu.RLock()
	isLeader := r.state == Leader
	leaderID := r.leaderID
	leaderAddr := r.clusterNodes[leaderID]
	r.mu.RUnlock()
	// If we are leader, serve the read locally
	if isLeader {
		if r.config.GetHandler != nil {
			val, found := r.config.GetHandler(key)
			return val, found, nil
		}
		return "", false, fmt.Errorf("get handler not configured")
	}
	if leaderID == "" {
		return "", false, ErrNoLeader
	}
	if leaderAddr == "" {
		return "", false, fmt.Errorf("leader %s address not found", leaderID)
	}
	// Forward to leader
	ctx, cancel := context.WithTimeout(context.Background(), r.config.RPCTimeout)
	defer cancel()
	args := &GetArgs{Key: key}
	reply, err := r.transport.ForwardGet(ctx, leaderAddr, args)
	if err != nil {
		return "", false, fmt.Errorf("forward failed: %w", err)
	}
	return reply.Value, reply.Found, nil
}
// HandlePropose handles forwarded propose requests
func (r *Raft) HandlePropose(args *ProposeArgs) *ProposeReply {
	index, term, isLeader := r.Propose(args.Command)
	if !isLeader {
		return &ProposeReply{
			Success: false,
			Error:   "not leader",
		}
	}
	return &ProposeReply{
		Success: true,
		Index:   index,
		Term:    term,
	}
}
// HandleAddNode handles forwarded AddNode requests
func (r *Raft) HandleAddNode(args *AddNodeArgs) *AddNodeReply {
	err := r.AddNode(args.NodeID, args.Address)
	if err != nil {
		return &AddNodeReply{
			Success: false,
			Error:   err.Error(),
		}
	}
	return &AddNodeReply{
		Success: true,
	}
}
// HandleRemoveNode handles forwarded RemoveNode requests
func (r *Raft) HandleRemoveNode(args *RemoveNodeArgs) *RemoveNodeReply {
	err := r.RemoveNode(args.NodeID)
	if err != nil {
		return &RemoveNodeReply{
			Success: false,
			Error:   err.Error(),
		}
	}
	return &RemoveNodeReply{
		Success: true,
	}
}
// GetState returns the current term and whether this node is leader
func (r *Raft) GetState() (uint64, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.currentTerm, r.state == Leader
}
// GetLeaderID returns the current leader ID
func (r *Raft) GetLeaderID() string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.leaderID
}
// GetStats returns runtime statistics
func (r *Raft) GetStats() Stats {
	r.mu.RLock()
	defer r.mu.RUnlock()
	lastIndex, lastTerm := r.log.LastIndexAndTerm()
	firstIndex := r.log.FirstIndex()
	// Copy cluster nodes
	nodes := make(map[string]string)
	for k, v := range r.clusterNodes {
		nodes[k] = v
	}
	clusterSize := len(r.clusterNodes)
	if clusterSize == 0 {
		clusterSize = len(r.peers) + 1
	}
	return Stats{
		Term:            r.currentTerm,
		State:           r.state.String(),
		FirstLogIndex:   firstIndex,
		LastLogIndex:    lastIndex,
		LastLogTerm:     lastTerm,
		CommitIndex:     r.commitIndex,
		LastApplied:     r.lastApplied,
		LeaderID:        r.leaderID,
		AppendsSent:     atomic.LoadInt64(&r.stats.AppendsSent),
		AppendsReceived: atomic.LoadInt64(&r.stats.AppendsReceived),
		ClusterSize:     clusterSize,
		ClusterNodes:    nodes,
	}
}
// restoreFromSnapshot restores the FSM from a snapshot at startup.
// This is called during Start() to ensure the FSM has the correct state
// before processing any new commands.
func (r *Raft) restoreFromSnapshot() error {
	// Get snapshot from storage
	data, lastIndex, lastTerm, err := r.storage.GetSnapshot()
	if err != nil {
		return fmt.Errorf("failed to get snapshot: %w", err)
	}
	// No snapshot exists
	if len(data) == 0 || lastIndex == 0 {
		return nil
	}
	r.logger.Info("Restoring FSM from snapshot at index %d, term %d (%d bytes)",
		lastIndex, lastTerm, len(data))
	// Update lastApplied to snapshot index to prevent re-applying compacted entries
	r.mu.Lock()
	if lastIndex > r.lastApplied {
		r.lastApplied = lastIndex
	}
	if lastIndex > r.commitIndex {
		r.commitIndex = lastIndex
	}
	r.mu.Unlock()
	// Send snapshot to FSM for restoration.
	// Use a send with a timeout to avoid blocking indefinitely if applyCh is full.
	msg := ApplyMsg{
		SnapshotValid: true,
		Snapshot:      data,
		SnapshotIndex: lastIndex,
		SnapshotTerm:  lastTerm,
	}
	select {
	case r.applyCh <- msg:
		r.logger.Info("FSM restoration triggered from snapshot at index %d", lastIndex)
	case <-time.After(5 * time.Second):
		return fmt.Errorf("timeout sending snapshot to applyCh")
	}
	return nil
}
// TakeSnapshot takes a snapshot of the current state
func (r *Raft) TakeSnapshot(data []byte, index uint64) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if index > r.lastApplied {
		return fmt.Errorf("snapshot index %d exceeds lastApplied %d", index, r.lastApplied)
	}
	term, err := r.log.GetTerm(index)
	if err != nil {
		return fmt.Errorf("failed to get term for index %d: %w", index, err)
	}
	if err := r.storage.SaveSnapshot(data, index, term); err != nil {
		return fmt.Errorf("failed to save snapshot: %w", err)
	}
	if err := r.log.Compact(index); err != nil {
		return fmt.Errorf("failed to compact log: %w", err)
	}
	r.logger.Info("Took snapshot at index %d, term %d", index, term)
	return nil
}
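// A minimal usage sketch (hedged: appliedIndex, lastSnapshotIndex, kv.Serialize,
// and the 10000-entry threshold are illustrative application-side names, not part
// of this package): the application periodically serializes its state machine and
// hands the bytes to TakeSnapshot at an index it has already applied, which also
// compacts the log.
//
//	if appliedIndex-lastSnapshotIndex > 10000 {
//		data, err := kv.Serialize()
//		if err == nil {
//			if err := node.TakeSnapshot(data, appliedIndex); err != nil {
//				log.Printf("snapshot failed: %v", err)
//			}
//		}
//	}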
func max(a, b uint64) uint64 {
	if a > b {
		return a
	}
	return b
}
// ==================== Membership Change API ====================
//
// This implementation uses Single-Node Membership Change (also known as one-at-a-time changes)
// as described in the Raft dissertation (§4.3). This is safe because:
//
// 1. We only allow one configuration change at a time (pendingConfigChange flag)
// 2. For commits, we use the OLD cluster majority until the config change is committed
// 3. The new node starts receiving entries immediately but doesn't affect majority calculation
//
// This approach is simpler than Joint Consensus and is sufficient for most use cases.
// The invariant maintained is: any two majorities (old or new) must overlap.
//
// When adding a node to an N-node cluster, any majority of the old cluster (⌊N/2⌋+1 nodes)
// and any majority of the new cluster (⌊(N+1)/2⌋+1 nodes) share at least one node; the same
// holds when removing a node, provided N > 1. See the worked example below.
//
// WARNING: Avoid adding/removing multiple nodes rapidly. Wait for each change to be committed.
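// Worked example (illustrative names): growing a 3-node cluster {A, B, C} to 4 nodes
// by adding D. Any old majority is 2 of {A, B, C}; any new majority is 3 of {A, B, C, D},
// which must include at least 2 of {A, B, C}. Two sets of size 2 drawn from a 3-element
// set always intersect, so every old majority overlaps every new majority and two
// conflicting leaders can never both be elected during the change. The same counting
// argument applies when shrinking from 3 nodes to 2.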
// AddNode adds a new node to the cluster.
// This can only be called on the leader.
// The new node must already be running and reachable.
//
// Safety guarantees:
// - Only one config change can be in progress at a time
// - The old cluster majority is used until the config change is committed
// - Returns an error if leadership is lost during the operation
func (r *Raft) AddNode(nodeID, address string) error {
	r.mu.Lock()
	// Must be leader
	if r.state != Leader {
		leaderID := r.leaderID
		r.mu.Unlock()
		return NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff)
	}
	// Check if we're in the middle of a leadership transfer
	if r.transferring {
		r.mu.Unlock()
		return fmt.Errorf("leadership transfer in progress")
	}
	// Check if there's already a pending config change
	if r.pendingConfigChange {
		r.mu.Unlock()
		return ErrConfigInFlight
	}
	// Validate nodeID and address
	if nodeID == "" {
		r.mu.Unlock()
		return fmt.Errorf("nodeID cannot be empty")
	}
	if address == "" {
		r.mu.Unlock()
		return fmt.Errorf("address cannot be empty")
	}
	// Check if node already exists
	if _, exists := r.clusterNodes[nodeID]; exists {
		r.mu.Unlock()
		return fmt.Errorf("node %s already exists in cluster", nodeID)
	}
	// Check if address is already used by another node
	for existingID, existingAddr := range r.clusterNodes {
		if existingAddr == address {
			r.mu.Unlock()
			return fmt.Errorf("address %s is already used by node %s", address, existingID)
		}
	}
	// Save old cluster nodes for majority calculation during config change.
	// This ensures we use the OLD cluster size until the config change is committed.
	r.oldClusterNodes = make(map[string]string)
	for k, v := range r.clusterNodes {
		r.oldClusterNodes[k] = v
	}
	// Create new config with the added node
	newNodes := make(map[string]string)
	for k, v := range r.clusterNodes {
		newNodes[k] = v
	}
	newNodes[nodeID] = address
	// Create config change entry with ClusterConfig
	configIndex := r.log.LastIndex() + 1
	entry := LogEntry{
		Index:  configIndex,
		Term:   r.currentTerm,
		Type:   EntryConfig,
		Config: &ClusterConfig{Nodes: newNodes},
	}
	// Mark config change as pending and store the index
	r.pendingConfigChange = true
	r.configChangeIndex = configIndex
	// Immediately apply the new configuration (for single-node changes, this is safe).
	// The new node will start receiving AppendEntries immediately.
	r.clusterNodes[nodeID] = address
	r.peers = append(r.peers, address)
	// Set nextIndex to 1 (or firstIndex) so the new node syncs from the beginning.
	// This is crucial - the new node's log is empty, so we must start from index 1.
	firstIndex := r.log.FirstIndex()
	if firstIndex == 0 {
		firstIndex = 1
	}
	r.nextIndex[address] = firstIndex
	r.matchIndex[address] = 0
	r.mu.Unlock()
	// Append the config change entry to the log
	if err := r.log.Append(entry); err != nil {
		r.mu.Lock()
		r.pendingConfigChange = false
		r.oldClusterNodes = nil
		r.configChangeIndex = 0
		// Rollback
		delete(r.clusterNodes, nodeID)
		r.rebuildPeersList()
		r.mu.Unlock()
		return fmt.Errorf("failed to append config entry: %w", err)
	}
	r.logger.Info("Adding node %s (%s) to cluster", nodeID, address)
	// Trigger immediate replication
	r.triggerReplication()
	return nil
}
// RemoveNode removes a node from the cluster.
// This can only be called on the leader.
// The node being removed can be any node except the leader itself.
//
// Safety guarantees:
// - Only one config change can be in progress at a time
// - Cannot remove the leader (transfer leadership first)
// - Cannot reduce the cluster to 0 nodes
// - The old cluster majority is used until the config change is committed
func (r *Raft) RemoveNode(nodeID string) error {
	r.mu.Lock()
	// Must be leader
	if r.state != Leader {
		leaderID := r.leaderID
		r.mu.Unlock()
		return NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff)
	}
	// Check if we're in the middle of a leadership transfer
	if r.transferring {
		r.mu.Unlock()
		return fmt.Errorf("leadership transfer in progress")
	}
	// Cannot remove self
	if nodeID == r.nodeID {
		r.mu.Unlock()
		return fmt.Errorf("cannot remove self from cluster, use TransferLeadership first")
	}
	// Validate nodeID
	if nodeID == "" {
		r.mu.Unlock()
		return fmt.Errorf("nodeID cannot be empty")
	}
	// Check if there's already a pending config change
	if r.pendingConfigChange {
		r.mu.Unlock()
		return ErrConfigInFlight
	}
	// Check if node exists
	if _, exists := r.clusterNodes[nodeID]; !exists {
		r.mu.Unlock()
		return fmt.Errorf("node %s not found in cluster", nodeID)
	}
	// Cannot reduce cluster below 1 node
	if len(r.clusterNodes) <= 1 {
		r.mu.Unlock()
		return fmt.Errorf("cannot remove last node from cluster")
	}
	// Save old cluster nodes for majority calculation during config change
	r.oldClusterNodes = make(map[string]string)
	for k, v := range r.clusterNodes {
		r.oldClusterNodes[k] = v
	}
	// Create new config without the removed node
	newNodes := make(map[string]string)
	for k, v := range r.clusterNodes {
		if k != nodeID {
			newNodes[k] = v
		}
	}
	// Create config change entry with ClusterConfig
	configIndex := r.log.LastIndex() + 1
	entry := LogEntry{
		Index:  configIndex,
		Term:   r.currentTerm,
		Type:   EntryConfig,
		Config: &ClusterConfig{Nodes: newNodes},
	}
	// Mark config change as pending and store the index
	r.pendingConfigChange = true
	r.configChangeIndex = configIndex
	// Get the address of the node being removed for cleanup
	removedAddr := r.clusterNodes[nodeID]
	// Immediately apply the new configuration
	delete(r.clusterNodes, nodeID)
	r.rebuildPeersList()
	delete(r.nextIndex, removedAddr)
	delete(r.matchIndex, removedAddr)
	r.mu.Unlock()
	// Append the config change entry to the log
	if err := r.log.Append(entry); err != nil {
		r.mu.Lock()
		r.pendingConfigChange = false
		r.oldClusterNodes = nil
		r.configChangeIndex = 0
		// Rollback - this is tricky but we try our best
		r.clusterNodes[nodeID] = removedAddr
		r.rebuildPeersList()
		r.mu.Unlock()
		return fmt.Errorf("failed to append config entry: %w", err)
	}
	r.logger.Info("Removing node %s from cluster, config at index %d", nodeID, entry.Index)
	// Trigger replication
	go r.sendHeartbeats()
	return nil
}
// AddNodeWithForward adds a node, forwarding to the leader if necessary.
// This is the recommended method for applications to use.
func (r *Raft) AddNodeWithForward(nodeID, address string) error {
	// Try local operation first
	err := r.AddNode(nodeID, address)
	if err == nil {
		return nil
	}
	// Check if we're not the leader
	r.mu.RLock()
	state := r.state
	leaderID := r.leaderID
	leaderAddr := r.clusterNodes[leaderID]
	r.mu.RUnlock()
	if state == Leader {
		// We are leader but AddNode failed for other reasons
		return err
	}
	// Not leader, forward to leader
	if leaderID == "" {
		return fmt.Errorf("no leader available")
	}
	if leaderAddr == "" {
		return fmt.Errorf("leader %s address not found in cluster", leaderID)
	}
	// Forward to leader
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	args := &AddNodeArgs{NodeID: nodeID, Address: address}
	reply, err := r.transport.ForwardAddNode(ctx, leaderAddr, args)
	if err != nil {
		return fmt.Errorf("forward failed: %w", err)
	}
	if !reply.Success {
		return fmt.Errorf("leader rejected: %s", reply.Error)
	}
	return nil
}
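// A minimal usage sketch (hedged: the node ID and address are illustrative): to grow
// the cluster, start the new node first (it will run standalone and suppress its own
// elections once the leader contacts it), then ask any existing node to add it; the
// call is forwarded to the leader automatically.
//
//	// on an admin endpoint or CLI, talking to any existing node:
//	if err := node.AddNodeWithForward("node-4", "10.0.0.4:7000"); err != nil {
//		log.Printf("add node failed: %v", err)
//	}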
// RemoveNodeWithForward removes a node, forwarding to the leader if necessary.
// This is the recommended method for applications to use.
func (r *Raft) RemoveNodeWithForward(nodeID string) error {
	// Try local operation first
	err := r.RemoveNode(nodeID)
	if err == nil {
		return nil
	}
	// Check if we're not the leader
	r.mu.RLock()
	state := r.state
	leaderID := r.leaderID
	leaderAddr := r.clusterNodes[leaderID]
	r.mu.RUnlock()
	if state == Leader {
		// We are leader but RemoveNode failed for other reasons
		return err
	}
	// Not leader, forward to leader
	if leaderID == "" {
		return fmt.Errorf("no leader available")
	}
	if leaderAddr == "" {
		return fmt.Errorf("leader %s address not found in cluster", leaderID)
	}
	// Forward to leader
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	args := &RemoveNodeArgs{NodeID: nodeID}
	reply, err := r.transport.ForwardRemoveNode(ctx, leaderAddr, args)
	if err != nil {
		return fmt.Errorf("forward failed: %w", err)
	}
	if !reply.Success {
		return fmt.Errorf("leader rejected: %s", reply.Error)
	}
	return nil
}
// rebuildPeersList rebuilds the peers slice from clusterNodes
func (r *Raft) rebuildPeersList() {
	r.peers = make([]string, 0, len(r.clusterNodes)-1)
	for nodeID, addr := range r.clusterNodes {
		if nodeID != r.nodeID {
			r.peers = append(r.peers, addr)
		}
	}
}
// GetClusterNodes returns a copy of the current cluster membership
func (r *Raft) GetClusterNodes() map[string]string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	nodes := make(map[string]string)
	for k, v := range r.clusterNodes {
		nodes[k] = v
	}
	return nodes
}
// applyConfigChange applies a configuration change entry
func (r *Raft) applyConfigChange(entry *LogEntry) {
	if entry.Config == nil || entry.Config.Nodes == nil {
		r.logger.Warn("Invalid config change entry at index %d", entry.Index)
		return
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	// Update cluster configuration
	r.clusterNodes = make(map[string]string)
	for k, v := range entry.Config.Nodes {
		r.clusterNodes[k] = v
	}
	r.rebuildPeersList()
	// Persist the new configuration
	if err := r.storage.SaveClusterConfig(&ClusterConfig{Nodes: r.clusterNodes}); err != nil {
		r.logger.Error("Failed to persist cluster config: %v", err)
	}
	// Clear pending flag and old cluster state
	r.pendingConfigChange = false
	r.oldClusterNodes = nil
	r.configChangeIndex = 0
	// If we were joining a cluster and now have multiple nodes, we've successfully joined
	if r.joiningCluster && len(r.clusterNodes) > 1 {
		r.joiningCluster = false
		r.logger.Info("Successfully joined cluster with %d nodes", len(r.clusterNodes))
	}
	r.logger.Info("Applied config change at index %d, cluster now has %d nodes", entry.Index, len(r.clusterNodes))
	// If we're the leader, update leader state
	if r.state == Leader {
		// Initialize nextIndex/matchIndex for any new nodes
		lastIndex := r.log.LastIndex()
		for nodeID, addr := range r.clusterNodes {
			if nodeID != r.nodeID {
				if _, exists := r.nextIndex[addr]; !exists {
					r.nextIndex[addr] = lastIndex + 1
					r.matchIndex[addr] = 0
				}
			}
		}
		// Clean up removed nodes
		validAddrs := make(map[string]bool)
		for nodeID, addr := range r.clusterNodes {
			if nodeID != r.nodeID {
				validAddrs[addr] = true
			}
		}
		for addr := range r.nextIndex {
			if !validAddrs[addr] {
				delete(r.nextIndex, addr)
				delete(r.matchIndex, addr)
			}
		}
	}
}
// ==================== ReadIndex (Linearizable Reads) ====================
// readIndexLoop handles read index requests
func (r *Raft) readIndexLoop() {
	for {
		select {
		case <-r.stopCh:
			return
		case req := <-r.readIndexCh:
			r.processReadIndexRequest(req)
		}
	}
}
// processReadIndexRequest processes a single read index request
func (r *Raft) processReadIndexRequest(req *readIndexRequest) {
	r.mu.RLock()
	if r.state != Leader {
		r.mu.RUnlock()
		req.done <- ErrNotLeader
		return
	}
	r.mu.RUnlock()
	// Confirm we still hold leadership before serving the read
	if !r.confirmLeadership() {
		req.done <- ErrLeadershipLost
		return
	}
	// Wait for apply to catch up to readIndex
	if err := r.waitApply(req.readIndex, r.config.ProposeTimeout); err != nil {
		req.done <- err
		return
	}
	req.done <- nil
}
// ReadIndex implements linearizable reads.
// It ensures that the read sees all writes that were committed before the read started.
func (r *Raft) ReadIndex() (uint64, error) {
	r.mu.RLock()
	if r.state != Leader {
		leaderID := r.leaderID
		r.mu.RUnlock()
		return 0, NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff)
	}
	readIndex := r.commitIndex
	r.mu.RUnlock()
	atomic.AddUint64(&r.metrics.ReadIndexRequests, 1)
	// Create request and send to the processing loop
	req := &readIndexRequest{
		readIndex: readIndex,
		done:      make(chan error, 1),
	}
	select {
	case r.readIndexCh <- req:
	case <-r.stopCh:
		return 0, ErrShutdown
	case <-time.After(r.config.ProposeTimeout):
		return 0, ErrTimeout
	}
	// Wait for result
	select {
	case err := <-req.done:
		if err != nil {
			return 0, err
		}
		atomic.AddUint64(&r.metrics.ReadIndexSuccess, 1)
		return readIndex, nil
	case <-r.stopCh:
		return 0, ErrShutdown
	case <-time.After(r.config.ProposeTimeout):
		return 0, ErrTimeout
	}
}
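// A minimal usage sketch (hedged: kv.Get is an illustrative application-side read,
// not part of this package): a linearizable read first obtains a read index from
// the leader, which also confirms leadership and waits for the state machine to
// catch up, and only then reads local state.
//
//	if _, err := node.ReadIndex(); err != nil {
//		return "", err // not leader, lost leadership, or timed out
//	}
//	value, ok := kv.Get(key)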
// confirmLeadership confirms we're still the leader for the current term by
// sending heartbeats and then verifying that we have not stepped down
func (r *Raft) confirmLeadership() bool {
	r.mu.RLock()
	if r.state != Leader {
		r.mu.RUnlock()
		return false
	}
	currentTerm := r.currentTerm
	clusterSize := len(r.clusterNodes)
	if clusterSize == 0 {
		clusterSize = len(r.peers) + 1
	}
	r.mu.RUnlock()
	// Single node cluster - we're always the leader
	if clusterSize == 1 {
		return true
	}
	// Send heartbeats; a response with a higher term would force us to step down
	r.sendHeartbeats()
	// Wait briefly and check that we're still leader in the same term
	time.Sleep(r.config.HeartbeatInterval)
	r.mu.RLock()
	stillLeader := r.state == Leader && r.currentTerm == currentTerm
	r.mu.RUnlock()
	return stillLeader
}
// waitApply waits until lastApplied >= index
func (r *Raft) waitApply(index uint64, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for {
		r.mu.RLock()
		lastApplied := r.lastApplied
		r.mu.RUnlock()
		if lastApplied >= index {
			return nil
		}
		if time.Now().After(deadline) {
			return ErrTimeout
		}
		time.Sleep(1 * time.Millisecond)
	}
}
// ==================== Health Check ====================
// HealthCheck returns the current health status of the node
func (r *Raft) HealthCheck() HealthStatus {
	r.mu.RLock()
	defer r.mu.RUnlock()
	clusterNodes := make(map[string]string)
	for k, v := range r.clusterNodes {
		clusterNodes[k] = v
	}
	clusterSize := len(r.clusterNodes)
	if clusterSize == 0 {
		clusterSize = len(r.peers) + 1
	}
	logBehind := uint64(0)
	if r.commitIndex > r.lastApplied {
		logBehind = r.commitIndex - r.lastApplied
	}
	// Consider healthy if we're the leader or have a known leader
	isHealthy := r.state == Leader || r.leaderID != ""
	return HealthStatus{
		NodeID:        r.nodeID,
		State:         r.state.String(),
		Term:          r.currentTerm,
		LeaderID:      r.leaderID,
		ClusterSize:   clusterSize,
		ClusterNodes:  clusterNodes,
		CommitIndex:   r.commitIndex,
		LastApplied:   r.lastApplied,
		LogBehind:     logBehind,
		LastHeartbeat: r.lastHeartbeat,
		IsHealthy:     isHealthy,
		Uptime:        time.Since(r.startTime),
	}
}
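// A minimal usage sketch (hedged: the HTTP wiring is illustrative and assumes the
// application imports net/http and encoding/json; it is not part of this package):
// HealthCheck can back a readiness endpoint that load balancers or orchestrators poll.
//
//	http.HandleFunc("/healthz", func(w http.ResponseWriter, req *http.Request) {
//		status := node.HealthCheck()
//		if !status.IsHealthy {
//			w.WriteHeader(http.StatusServiceUnavailable)
//		}
//		_ = json.NewEncoder(w).Encode(status)
//	})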
// GetMetrics returns the current metrics
func (r *Raft) GetMetrics() Metrics {
	return Metrics{
		Term:                      atomic.LoadUint64(&r.metrics.Term),
		ProposalsTotal:            atomic.LoadUint64(&r.metrics.ProposalsTotal),
		ProposalsSuccess:          atomic.LoadUint64(&r.metrics.ProposalsSuccess),
		ProposalsFailed:           atomic.LoadUint64(&r.metrics.ProposalsFailed),
		ProposalsForwarded:        atomic.LoadUint64(&r.metrics.ProposalsForwarded),
		AppendsSent:               atomic.LoadUint64(&r.metrics.AppendsSent),
		AppendsReceived:           atomic.LoadUint64(&r.metrics.AppendsReceived),
		AppendsSuccess:            atomic.LoadUint64(&r.metrics.AppendsSuccess),
		AppendsFailed:             atomic.LoadUint64(&r.metrics.AppendsFailed),
		ElectionsStarted:          atomic.LoadUint64(&r.metrics.ElectionsStarted),
		ElectionsWon:              atomic.LoadUint64(&r.metrics.ElectionsWon),
		PreVotesStarted:           atomic.LoadUint64(&r.metrics.PreVotesStarted),
		PreVotesGranted:           atomic.LoadUint64(&r.metrics.PreVotesGranted),
		SnapshotsTaken:            atomic.LoadUint64(&r.metrics.SnapshotsTaken),
		SnapshotsInstalled:        atomic.LoadUint64(&r.metrics.SnapshotsInstalled),
		SnapshotsSent:             atomic.LoadUint64(&r.metrics.SnapshotsSent),
		ReadIndexRequests:         atomic.LoadUint64(&r.metrics.ReadIndexRequests),
		ReadIndexSuccess:          atomic.LoadUint64(&r.metrics.ReadIndexSuccess),
		LeadershipTransfers:       atomic.LoadUint64(&r.metrics.LeadershipTransfers),
		LeadershipTransferSuccess: atomic.LoadUint64(&r.metrics.LeadershipTransferSuccess),
	}
}
// ==================== Leadership Transfer ====================
// TransferLeadership transfers leadership to the specified node
func (r *Raft) TransferLeadership(targetID string) error {
	r.mu.Lock()
	if r.state != Leader {
		r.mu.Unlock()
		return ErrNotLeader
	}
	if targetID == r.nodeID {
		r.mu.Unlock()
		return fmt.Errorf("cannot transfer to self")
	}
	targetAddr, exists := r.clusterNodes[targetID]
	if !exists {
		r.mu.Unlock()
		return fmt.Errorf("target node %s not in cluster", targetID)
	}
	if r.transferring {
		r.mu.Unlock()
		return fmt.Errorf("leadership transfer already in progress")
	}
	r.transferring = true
	r.transferTarget = targetID
	r.transferDeadline = time.Now().Add(r.config.ElectionTimeoutMax * 2)
	currentTerm := r.currentTerm
	atomic.AddUint64(&r.metrics.LeadershipTransfers, 1)
	r.mu.Unlock()
	r.logger.Info("Starting leadership transfer to %s", targetID)
	// Step 1: Sync target to our log
	if err := r.syncFollowerToLatest(targetAddr); err != nil {
		r.mu.Lock()
		r.transferring = false
		r.transferTarget = ""
		r.mu.Unlock()
		return fmt.Errorf("failed to sync target: %w", err)
	}
	// Step 2: Send TimeoutNow RPC
	args := &TimeoutNowArgs{
		Term:     currentTerm,
		LeaderID: r.nodeID,
	}
	ctx, cancel := context.WithTimeout(context.Background(), r.config.RPCTimeout)
	defer cancel()
	reply, err := r.transport.TimeoutNow(ctx, targetAddr, args)
	if err != nil {
		r.mu.Lock()
		r.transferring = false
		r.transferTarget = ""
		r.mu.Unlock()
		return fmt.Errorf("TimeoutNow RPC failed: %w", err)
	}
	if !reply.Success {
		r.mu.Lock()
		r.transferring = false
		r.transferTarget = ""
		r.mu.Unlock()
		return fmt.Errorf("target rejected leadership transfer")
	}
	atomic.AddUint64(&r.metrics.LeadershipTransferSuccess, 1)
	r.logger.Info("Leadership transfer to %s initiated successfully", targetID)
	// Note: We don't immediately step down; we wait for the target to win the
	// election and send us an AppendEntries with a higher term.
	return nil
}
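// A minimal usage sketch (hedged: the target ID is illustrative): an operator drains
// a leader before maintenance by transferring leadership and then polling until
// another node reports itself as leader.
//
//	if err := node.TransferLeadership("node-2"); err != nil {
//		log.Printf("transfer failed: %v", err)
//	}
//	// Wait until GetLeaderID() reports the new leader (or a timeout elapses)
//	// before taking this node down.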
// syncFollowerToLatest ensures the follower is caught up to our log
func (r *Raft) syncFollowerToLatest(peerAddr string) error {
	r.mu.RLock()
	if r.state != Leader {
		r.mu.RUnlock()
		return ErrNotLeader
	}
	currentTerm := r.currentTerm
	leaderCommit := r.commitIndex
	lastIndex := r.log.LastIndex()
	r.mu.RUnlock()
	// Keep replicating until the follower is caught up
	deadline := time.Now().Add(r.config.ElectionTimeoutMax * 2)
	for time.Now().Before(deadline) {
		r.mu.RLock()
		if r.state != Leader || r.currentTerm != currentTerm {
			r.mu.RUnlock()
			return ErrLeadershipLost
		}
		matchIndex := r.matchIndex[peerAddr]
		r.mu.RUnlock()
		if matchIndex >= lastIndex {
			return nil // Caught up
		}
		// Trigger replication
		r.replicateToPeer(peerAddr, currentTerm, leaderCommit)
		time.Sleep(10 * time.Millisecond)
	}
	return ErrTimeout
}
// HandleTimeoutNow handles TimeoutNow RPCs (for leadership transfer)
func (r *Raft) HandleTimeoutNow(args *TimeoutNowArgs) *TimeoutNowReply {
	r.mu.Lock()
	defer r.mu.Unlock()
	reply := &TimeoutNowReply{
		Term:    r.currentTerm,
		Success: false,
	}
	// Only accept if we're a follower and the term is at least our current term
	if args.Term < r.currentTerm {
		return reply
	}
	if r.state != Follower {
		return reply
	}
	// Become a candidate so an election starts immediately, without waiting
	// for the election timeout to fire
	r.logger.Info("Received TimeoutNow from %s, starting immediate election", args.LeaderID)
	r.state = Candidate
	reply.Success = true
	return reply
}
// HandleReadIndex handles ReadIndex RPCs
func (r *Raft) HandleReadIndex(args *ReadIndexArgs) *ReadIndexReply {
	reply := &ReadIndexReply{
		Success: false,
	}
	readIndex, err := r.ReadIndex()
	if err != nil {
		reply.Error = err.Error()
		return reply
	}
	reply.ReadIndex = readIndex
	reply.Success = true
	return reply
}
// HandleGet handles Get RPCs for remote KV reads
func (r *Raft) HandleGet(args *GetArgs) *GetReply {
	reply := &GetReply{
		Found: false,
	}
	if r.config.GetHandler == nil {
		reply.Error = "get handler not configured"
		return reply
	}
	value, found := r.config.GetHandler(args.Key)
	reply.Value = value
	reply.Found = found
	return reply
}