package raft

import (
	"context"
	"fmt"
	"math/rand"
	"os"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

// Raft represents a Raft consensus node
type Raft struct {
	mu sync.RWMutex

	// Node identity
	nodeID string
	peers  []string

	// Cluster membership - maps nodeID to address
	clusterNodes map[string]string

	// Current state
	state       NodeState
	currentTerm uint64
	votedFor    string
	leaderID    string

	// Log management
	log         *LogManager
	storage     Storage
	commitIndex uint64
	lastApplied uint64

	// Leader state
	nextIndex  map[string]uint64
	matchIndex map[string]uint64

	// Configuration
	config *Config

	// Communication
	transport Transport

	// Channels
	applyCh  chan ApplyMsg
	stopCh   chan struct{}
	commitCh chan struct{}

	// Election and heartbeat timers
	electionTimer  *time.Timer
	heartbeatTimer *time.Timer

	// Statistics (deprecated, use metrics instead)
	stats Stats

	// Metrics for monitoring
	metrics Metrics

	// Logger
	logger Logger

	// Running flag
	running int32

	// Replication trigger channel - used to batch replication requests
	replicationCh chan struct{}

	// Pending config change - only one config change can be pending at a time
	pendingConfigChange bool

	// Old cluster nodes - used for majority calculation during a config change.
	// This ensures we use the old cluster size until the config change is committed.
	oldClusterNodes map[string]string

	// Index of the pending config change entry
	configChangeIndex uint64

	// Joining-cluster state - set when a standalone node is being added to a
	// cluster. This prevents the node from starting elections while syncing.
	joiningCluster     bool
	joiningClusterTime time.Time

	// Compaction state - prevents concurrent compaction
	compacting int32

	// Dynamic compaction threshold - updated after each compaction.
	// The next compaction triggers when log size >= this value.
	// Formula: lastCompactionSize * 1.5 (or the initial SnapshotThreshold)
	nextCompactionThreshold uint64

	// WaitGroup to track goroutines for clean shutdown
	wg sync.WaitGroup

	// Leadership transfer state
	transferring     bool
	transferTarget   string
	transferDeadline time.Time

	// Last heartbeat received (for health check)
	lastHeartbeat time.Time

	// Start time (for uptime calculation)
	startTime time.Time

	// ReadIndex waiting queue
	readIndexCh chan *readIndexRequest

	// Snapshot receiving state (for chunked transfer)
	pendingSnapshot *pendingSnapshotState

	// Last contact time for each peer (for leader check)
	lastContact map[string]time.Time
}

// readIndexRequest represents a pending read index request
type readIndexRequest struct {
	readIndex uint64
	done      chan error
}

// pendingSnapshotState tracks chunked snapshot reception
type pendingSnapshotState struct {
	lastIncludedIndex uint64
	lastIncludedTerm  uint64
	data              []byte
	receivedBytes     uint64
}

// Stats holds runtime statistics
type Stats struct {
	Term            uint64
	State           string
	LastLogIndex    uint64
	LastLogTerm     uint64
	CommitIndex     uint64
	LastApplied     uint64
	LeaderID        string
	VotesReceived   int
	AppendsSent     int64
	AppendsReceived int64
	ClusterSize     int               // Number of nodes in cluster
	ClusterNodes    map[string]string // NodeID -> Address mapping
}

// ConsoleLogger implements Logger with console output
type ConsoleLogger struct {
	Prefix string
	Level  int // 0=debug, 1=info, 2=warn, 3=error
	mu     sync.Mutex
}

func NewConsoleLogger(prefix string, level int) *ConsoleLogger {
	return &ConsoleLogger{Prefix: prefix, Level: level}
}

func (l *ConsoleLogger) log(level int, levelStr, format string, args ...interface{}) {
	if level < l.Level {
		return
	}
	l.mu.Lock()
	defer l.mu.Unlock()
	msg := fmt.Sprintf(format, args...)
	fmt.Printf("[%s] %s [%s] %s\n", time.Now().Format("15:04:05.000"), l.Prefix, levelStr, msg)
}

func (l *ConsoleLogger) Debug(format string, args ...interface{}) {
	l.log(0, "DEBUG", format, args...)
}

func (l *ConsoleLogger) Info(format string, args ...interface{}) {
	l.log(1, "INFO", format, args...)
}

func (l *ConsoleLogger) Warn(format string, args ...interface{}) {
	l.log(2, "WARN", format, args...)
}

func (l *ConsoleLogger) Error(format string, args ...interface{}) {
	l.log(3, "ERROR", format, args...)
}

// NewRaft creates a new Raft node
func NewRaft(config *Config, transport Transport, applyCh chan ApplyMsg) (*Raft, error) {
	if config.Logger == nil {
		config.Logger = NewConsoleLogger(config.NodeID, 1)
	}
	// Create storage
	storage, err := NewHybridStorage(config.DataDir, config.MemoryLogCapacity, config.Logger)
	if err != nil {
		return nil, fmt.Errorf("failed to create storage: %w", err)
	}
	// Create log manager
	logMgr := NewLogManager(storage, config.Logger)
	// Load persistent state
	state, err := storage.GetState()
	if err != nil {
		return nil, fmt.Errorf("failed to load state: %w", err)
	}
	// Load or initialize cluster configuration
	clusterNodes := make(map[string]string)
	// Try to load saved cluster config first
	savedConfig, err := storage.GetClusterConfig()
	if err != nil {
		return nil, fmt.Errorf("failed to load cluster config: %w", err)
	}
	if savedConfig != nil && len(savedConfig.Nodes) > 0 {
		// Use saved config
		clusterNodes = savedConfig.Nodes
		config.Logger.Info("Loaded cluster config with %d nodes", len(clusterNodes))
	} else {
		// Initialize from config
		if len(config.ClusterNodes) > 0 {
			for k, v := range config.ClusterNodes {
				clusterNodes[k] = v
			}
		} else {
			// Build from Peers + self (backward compatibility)
			clusterNodes[config.NodeID] = config.ListenAddr
			if config.PeerMap != nil {
				for k, v := range config.PeerMap {
					clusterNodes[k] = v
				}
			}
		}
		// Save initial config
		if len(clusterNodes) > 0 {
			if err := storage.SaveClusterConfig(&ClusterConfig{Nodes: clusterNodes}); err != nil {
				config.Logger.Warn("Failed to save initial cluster config: %v", err)
			}
		}
	}
	// Build peers list from cluster nodes (excluding self)
	var peers []string
	for nodeID, addr := range clusterNodes {
		if nodeID != config.NodeID {
			peers = append(peers, addr)
		}
	}
	r := &Raft{
		nodeID:        config.NodeID,
		peers:         peers,
		clusterNodes:  clusterNodes,
		state:         Follower,
		currentTerm:   state.CurrentTerm,
		votedFor:      state.VotedFor,
		log:           logMgr,
		storage:       storage,
		config:        config,
		transport:     transport,
		applyCh:       applyCh,
		stopCh:        make(chan struct{}),
		commitCh:      make(chan struct{}, 100), // Increased buffer for event-driven apply
		replicationCh: make(chan struct{}, 1),
		nextIndex:     make(map[string]uint64),
		matchIndex:    make(map[string]uint64),
		logger:        config.Logger,
		readIndexCh:   make(chan *readIndexRequest, 100),
		lastHeartbeat: time.Now(),
		lastContact:   make(map[string]time.Time),
	}
	// Initialize metrics
	r.metrics.Term = state.CurrentTerm
	// Set RPC handler
	transport.SetRPCHandler(r)
	return r, nil
}
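
// Example wiring (a hypothetical usage sketch - the config values and the
// transport/state-machine setup shown here are assumptions, not part of
// this file):
//
//	applyCh := make(chan ApplyMsg, 256)
//	node, err := NewRaft(cfg, transport, applyCh)
//	if err != nil {
//		log.Fatal(err)
//	}
//	if err := node.Start(); err != nil {
//		log.Fatal(err)
//	}
//	defer node.Stop()
//	go func() {
//		for msg := range applyCh {
//			// apply msg.Command to the application state machine
//		}
//	}()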

// Start starts the Raft node
func (r *Raft) Start() error {
	if !atomic.CompareAndSwapInt32(&r.running, 0, 1) {
		return fmt.Errorf("already running")
	}
	// Record start time
	r.startTime = time.Now()
	// Start transport
	if err := r.transport.Start(); err != nil {
		return fmt.Errorf("failed to start transport: %w", err)
	}
	// Restore FSM from snapshot if one exists.
	// This must happen before starting the apply loop to ensure FSM state is restored.
	if err := r.restoreFromSnapshot(); err != nil {
		// Suppress the warning if it's just a missing file (first start)
		if !os.IsNotExist(err) && !strings.Contains(err.Error(), "no such file") {
			r.logger.Warn("Failed to restore from snapshot: %v", err)
		} else {
			r.logger.Info("No snapshot found, starting with empty state")
		}
		// Continue anyway - the node can still function, just without historical state
	}
	// Start election timer
	r.resetElectionTimer()
	// Start background goroutines with WaitGroup tracking
	r.wg.Add(4)
	go func() {
		defer r.wg.Done()
		r.applyLoop()
	}()
	go func() {
		defer r.wg.Done()
		r.replicationLoop()
	}()
	go func() {
		defer r.wg.Done()
		r.mainLoop()
	}()
	go func() {
		defer r.wg.Done()
		r.readIndexLoop()
	}()
	r.logger.Info("Raft node started")
	return nil
}

// Stop stops the Raft node
func (r *Raft) Stop() error {
	if !atomic.CompareAndSwapInt32(&r.running, 1, 0) {
		return fmt.Errorf("not running")
	}
	// Signal all goroutines to stop
	close(r.stopCh)
	// Stop timers first to prevent new operations
	r.mu.Lock()
	if r.electionTimer != nil {
		r.electionTimer.Stop()
	}
	if r.heartbeatTimer != nil {
		r.heartbeatTimer.Stop()
	}
	r.mu.Unlock()
	// Wait for all goroutines to finish with timeout
	done := make(chan struct{})
	go func() {
		r.wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		// All goroutines exited cleanly
	case <-time.After(3 * time.Second):
		r.logger.Warn("Timeout waiting for goroutines to stop")
	}
	// Stop transport (has its own timeout)
	if err := r.transport.Stop(); err != nil {
		r.logger.Error("Failed to stop transport: %v", err)
	}
	if err := r.storage.Close(); err != nil {
		r.logger.Error("Failed to close storage: %v", err)
	}
	r.logger.Info("Raft node stopped")
	return nil
}

// mainLoop is the main event loop
func (r *Raft) mainLoop() {
	for {
		select {
		case <-r.stopCh:
			return
		default:
		}
		r.mu.RLock()
		state := r.state
		r.mu.RUnlock()
		switch state {
		case Follower:
			r.runFollower()
		case Candidate:
			r.runCandidate()
		case Leader:
			r.runLeader()
		}
	}
}

// runFollower handles follower behavior
func (r *Raft) runFollower() {
	r.mu.RLock()
	term := r.currentTerm
	r.mu.RUnlock()
	r.logger.Debug("Running as follower in term %d", term)
	for {
		select {
		case <-r.stopCh:
			return
		case <-r.electionTimer.C:
			r.mu.Lock()
			if r.state == Follower {
				// If we're joining a cluster, don't start elections.
				// Give the leader time to sync us up.
				if r.joiningCluster {
					// Allow joining for up to 30 seconds
					if time.Since(r.joiningClusterTime) < 30*time.Second {
						r.logger.Debug("Suppressing election during cluster join")
						r.resetElectionTimer()
						r.mu.Unlock()
						continue
					}
					// Timeout - clear joining state
					r.joiningCluster = false
					r.logger.Warn("Cluster join timeout, resuming normal election behavior")
				}
				r.logger.Debug("Election timeout, becoming candidate")
				r.state = Candidate
				r.leaderID = "" // Clear leaderID so we can vote for self in Pre-Vote
			}
			r.mu.Unlock()
			return
		}
	}
}

// runCandidate handles candidate behavior (leader election)
// Implements Pre-Vote mechanism to prevent term explosion
func (r *Raft) runCandidate() {
	// Phase 1: Pre-Vote (don't increment term yet)
	if !r.runPreVote() {
		// Pre-vote failed, wait for timeout before retrying
		r.mu.Lock()
		r.resetElectionTimer()
		r.mu.Unlock()
		select {
		case <-r.stopCh:
			return
		case <-r.electionTimer.C:
			// Timer expired, will retry pre-vote
		}
		return
	}
	// Phase 2: Actual election (pre-vote succeeded, now increment term)
	r.mu.Lock()
	r.currentTerm++
	r.votedFor = r.nodeID
	r.leaderID = ""
	currentTerm := r.currentTerm
	// Update metrics
	atomic.StoreUint64(&r.metrics.Term, currentTerm)
	if err := r.persistState(); err != nil {
		r.logger.Error("Failed to persist state during election: %v", err)
		r.mu.Unlock()
		return // Cannot proceed without persisting state
	}
	atomic.AddUint64(&r.metrics.ElectionsStarted, 1)
	r.resetElectionTimer()
	r.mu.Unlock()
	r.logger.Debug("Starting election for term %d", currentTerm)
	// Get current peers list
	r.mu.RLock()
	currentPeers := make([]string, len(r.peers))
	copy(currentPeers, r.peers)
	clusterSize := len(r.clusterNodes)
	if clusterSize == 0 {
		clusterSize = len(r.peers) + 1
	}
	r.mu.RUnlock()
	// Request actual votes from all peers
	votes := 1 // Vote for self
	voteCh := make(chan bool, len(currentPeers))
	lastLogIndex, lastLogTerm := r.log.LastIndexAndTerm()
	for _, peer := range currentPeers {
		go func(peer string) {
			args := &RequestVoteArgs{
				Term:         currentTerm,
				CandidateID:  r.nodeID,
				LastLogIndex: lastLogIndex,
				LastLogTerm:  lastLogTerm,
				PreVote:      false,
			}
			ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
			defer cancel()
			reply, err := r.transport.RequestVote(ctx, peer, args)
			if err != nil {
				r.logger.Debug("RequestVote to %s failed: %v", peer, err)
				voteCh <- false
				return
			}
			r.mu.Lock()
			defer r.mu.Unlock()
			if reply.Term > r.currentTerm {
				r.becomeFollower(reply.Term)
				voteCh <- false
				return
			}
			voteCh <- reply.VoteGranted
		}(peer)
	}
	// Wait for votes - majority is (clusterSize/2) + 1
	needed := clusterSize/2 + 1
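	// For example, a 3-node cluster needs 2 votes, a 4-node cluster
	// needs 3, and a 5-node cluster needs 3.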
	// If we already have enough votes (single-node cluster), become leader immediately
	if votes >= needed {
		r.mu.Lock()
		if r.state == Candidate && r.currentTerm == currentTerm {
			r.becomeLeader()
		}
		r.mu.Unlock()
		return
	}
	for i := 0; i < len(currentPeers); i++ {
		select {
		case <-r.stopCh:
			return
		case <-r.electionTimer.C:
			r.logger.Debug("Election timeout, will start new election")
			return
		case granted := <-voteCh:
			if granted {
				votes++
				if votes >= needed {
					r.mu.Lock()
					if r.state == Candidate && r.currentTerm == currentTerm {
						r.becomeLeader()
					}
					r.mu.Unlock()
					return
				}
			}
		}
		// Check if we're still a candidate
		r.mu.RLock()
		if r.state != Candidate {
			r.mu.RUnlock()
			return
		}
		r.mu.RUnlock()
	}
	// Election failed, wait for timeout
	r.mu.RLock()
	stillCandidate := r.state == Candidate
	r.mu.RUnlock()
	if stillCandidate {
		r.logger.Debug("Election failed, waiting for timeout (got %d/%d votes)", votes, needed)
		select {
		case <-r.stopCh:
			return
		case <-r.electionTimer.C:
			// Timer expired, will start new election
		}
	}
}

// runPreVote sends pre-vote requests to check if we can win an election
// Returns true if we got majority pre-votes, false otherwise
func (r *Raft) runPreVote() bool {
	r.mu.RLock()
	currentTerm := r.currentTerm
	currentPeers := make([]string, len(r.peers))
	copy(currentPeers, r.peers)
	clusterSize := len(r.clusterNodes)
	if clusterSize == 0 {
		clusterSize = len(r.peers) + 1
	}
	leaderID := r.leaderID
	r.mu.RUnlock()
	// Pre-vote uses term+1 but doesn't actually increment it
	preVoteTerm := currentTerm + 1
	lastLogIndex, lastLogTerm := r.log.LastIndexAndTerm()
	r.logger.Debug("Starting pre-vote for term %d", preVoteTerm)
	// Per the Raft Pre-Vote optimization (§9.6): if we have a known leader,
	// don't vote for self. This prevents standalone nodes from constantly
	// electing themselves when they should be joining an existing cluster.
	preVotes := 0
	if leaderID == "" {
		preVotes = 1 // Vote for self only if we don't know of a leader
	} else {
		r.logger.Debug("Pre-vote: not self-voting because we have leader %s", leaderID)
	}
	preVoteCh := make(chan bool, len(currentPeers))
	for _, peer := range currentPeers {
		go func(peer string) {
			args := &RequestVoteArgs{
				Term:         preVoteTerm,
				CandidateID:  r.nodeID,
				LastLogIndex: lastLogIndex,
				LastLogTerm:  lastLogTerm,
				PreVote:      true,
			}
			ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
			defer cancel()
			reply, err := r.transport.RequestVote(ctx, peer, args)
			if err != nil {
				r.logger.Debug("PreVote to %s failed: %v", peer, err)
				preVoteCh <- false
				return
			}
			// For pre-vote, we don't step down even if reply.Term > currentTerm,
			// because the other node might also be doing pre-vote
			preVoteCh <- reply.VoteGranted
		}(peer)
	}
	// Wait for pre-votes with a shorter timeout - majority is (clusterSize/2) + 1
	needed := clusterSize/2 + 1
	// If we already have enough votes (single-node cluster with no known leader), succeed immediately
	if preVotes >= needed {
		r.logger.Debug("Pre-vote succeeded immediately (got %d/%d pre-votes)", preVotes, needed)
		return true
	}
	timeout := time.After(500 * time.Millisecond)
	for i := 0; i < len(currentPeers); i++ {
		select {
		case <-r.stopCh:
			return false
		case <-timeout:
			r.logger.Debug("Pre-vote timeout (got %d/%d pre-votes)", preVotes, needed)
			return false
		case granted := <-preVoteCh:
			if granted {
				preVotes++
				if preVotes >= needed {
					r.logger.Debug("Pre-vote succeeded (got %d/%d pre-votes)", preVotes, needed)
					return true
				}
			}
		}
		// Check if we're still a candidate
		r.mu.RLock()
		if r.state != Candidate {
			r.mu.RUnlock()
			return false
		}
		r.mu.RUnlock()
	}
	r.logger.Debug("Pre-vote failed (got %d/%d pre-votes)", preVotes, needed)
	return false
}

// runLeader handles leader behavior
func (r *Raft) runLeader() {
	r.mu.RLock()
	term := r.currentTerm
	r.mu.RUnlock()
	r.logger.Debug("Running as leader in term %d", term)
	// Send initial heartbeat
	r.sendHeartbeats()
	// Start heartbeat timer
	r.mu.Lock()
	r.heartbeatTimer = time.NewTimer(r.config.HeartbeatInterval)
	r.mu.Unlock()
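	// HeartbeatInterval should sit well below the election timeout range
	// so followers keep hearing from the leader between timeouts (e.g.,
	// 50ms heartbeats against the 150-300ms election timeouts suggested
	// in the Raft paper).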
	for {
		select {
		case <-r.stopCh:
			return
		case <-r.heartbeatTimer.C:
			r.mu.RLock()
			if r.state != Leader {
				r.mu.RUnlock()
				return
			}
			r.mu.RUnlock()
			r.sendHeartbeats()
			r.heartbeatTimer.Reset(r.config.HeartbeatInterval)
		case <-r.commitCh:
			r.updateCommitIndex()
		}
	}
}

// becomeFollower transitions to follower state
func (r *Raft) becomeFollower(term uint64) {
	oldState := r.state
	oldTerm := r.currentTerm
	r.state = Follower
	r.currentTerm = term
	// Update metrics
	atomic.StoreUint64(&r.metrics.Term, term)
	r.votedFor = ""
	r.leaderID = ""
	// Must persist before responding - use mustPersistState for critical transitions
	r.mustPersistState()
	r.resetElectionTimer()
	// Clear leadership transfer state
	r.transferring = false
	r.transferTarget = ""
	// Only log significant state changes
	if oldState == Leader && term > oldTerm {
		// Stepping down from leader is notable
		r.logger.Debug("Stepped down from leader in term %d", term)
	}
}

// becomeLeader transitions to leader state
func (r *Raft) becomeLeader() {
	r.state = Leader
	r.leaderID = r.nodeID
	// Update metrics
	atomic.AddUint64(&r.metrics.ElectionsWon, 1)
	// Initialize leader state for all peers
	lastIndex := r.log.LastIndex()
	for nodeID, addr := range r.clusterNodes {
		if nodeID != r.nodeID {
			r.nextIndex[addr] = lastIndex + 1
			r.matchIndex[addr] = 0
		}
	}
	// Also handle legacy peers
	for _, peer := range r.peers {
		if _, exists := r.nextIndex[peer]; !exists {
			r.nextIndex[peer] = lastIndex + 1
			r.matchIndex[peer] = 0
		}
	}
	r.logger.Info("Became leader in term %d with %d peers", r.currentTerm, len(r.clusterNodes)-1)
	// Append a no-op entry to commit entries from previous terms.
	// This is a standard Raft optimization - the leader appends a no-op
	// entry in its current term to quickly commit all pending entries
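	// (a leader may only advance commitIndex for entries from its own
	// term, so without the no-op, entries from earlier terms would stay
	// uncommitted until the first client command of the new term arrived).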
	noopEntry := LogEntry{
		Index:   r.log.LastIndex() + 1,
		Term:    r.currentTerm,
		Type:    EntryNoop,
		Command: nil,
	}
	if err := r.log.Append(noopEntry); err != nil {
		r.logger.Error("Failed to append no-op entry: %v", err)
	} else {
		r.logger.Debug("Appended no-op entry at index %d for term %d", noopEntry.Index, noopEntry.Term)
	}
}

// sendHeartbeats sends AppendEntries RPCs to all peers
func (r *Raft) sendHeartbeats() {
	r.mu.RLock()
	if r.state != Leader {
		r.mu.RUnlock()
		return
	}
	currentTerm := r.currentTerm
	leaderCommit := r.commitIndex
	// Get all peer addresses
	peerAddrs := make([]string, 0, len(r.clusterNodes))
	for nodeID, addr := range r.clusterNodes {
		if nodeID != r.nodeID {
			peerAddrs = append(peerAddrs, addr)
		}
	}
	// Also include legacy peers not in clusterNodes
	for _, peer := range r.peers {
		found := false
		for _, addr := range peerAddrs {
			if addr == peer {
				found = true
				break
			}
		}
		if !found {
			peerAddrs = append(peerAddrs, peer)
		}
	}
	r.mu.RUnlock()
	for _, peer := range peerAddrs {
		go r.replicateToPeer(peer, currentTerm, leaderCommit)
	}
}

// replicateToPeer sends AppendEntries to a specific peer
func (r *Raft) replicateToPeer(peer string, term, leaderCommit uint64) {
	r.mu.RLock()
	if r.state != Leader || r.currentTerm != term {
		r.mu.RUnlock()
		return
	}
	nextIndex := r.nextIndex[peer]
	r.mu.RUnlock()
	// Get entries to send
	entries, prevLogIndex, prevLogTerm, err := r.log.GetEntriesForFollower(nextIndex, r.config.MaxLogEntriesPerRequest)
	if err == ErrCompacted {
		// Need to send snapshot
		r.sendSnapshot(peer)
		return
	}
	if err != nil {
		r.logger.Debug("Failed to get entries for %s: %v", peer, err)
		return
	}
	args := &AppendEntriesArgs{
		Term:         term,
		LeaderID:     r.nodeID,
		PrevLogIndex: prevLogIndex,
		PrevLogTerm:  prevLogTerm,
		Entries:      entries,
		LeaderCommit: leaderCommit,
	}
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()
	reply, err := r.transport.AppendEntries(ctx, peer, args)
	if err != nil {
		r.logger.Debug("AppendEntries to %s failed: %v", peer, err)
		return
	}
	atomic.AddInt64(&r.stats.AppendsSent, 1)
	r.mu.Lock()
	defer r.mu.Unlock()
	if reply.Term > r.currentTerm {
		r.becomeFollower(reply.Term)
		return
	}
	if r.state != Leader || r.currentTerm != term {
		return
	}
	if reply.Success {
		// Update contact time
		r.lastContact[peer] = time.Now()
		// Update nextIndex and matchIndex
		if len(entries) > 0 {
			newMatchIndex := entries[len(entries)-1].Index
			if newMatchIndex > r.matchIndex[peer] {
				r.matchIndex[peer] = newMatchIndex
				r.nextIndex[peer] = newMatchIndex + 1
				// Try to update commit index
				select {
				case r.commitCh <- struct{}{}:
				default:
				}
			}
		}
	} else {
		// Even if failed (log conflict), we contacted the peer
		r.lastContact[peer] = time.Now()
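		// Fast log backtracking: rather than walking nextIndex back one
		// entry per round trip, use the follower's ConflictTerm and
		// ConflictIndex hints to skip over whole conflicting terms. For
		// example, if the follower reports ConflictTerm=3 and our last
		// entry of term 3 sits at index 10, we retry from index 11.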
		// Back up nextIndex using the conflict hints and retry
		if reply.ConflictTerm > 0 {
			// Find the last entry of ConflictTerm in our log
			found := false
			for idx := r.log.LastIndex(); idx >= r.log.FirstIndex(); idx-- {
				t, err := r.log.GetTerm(idx)
				if err != nil {
					break
				}
				if t == reply.ConflictTerm {
					r.nextIndex[peer] = idx + 1
					found = true
					break
				}
				if t < reply.ConflictTerm {
					break
				}
			}
			if !found {
				r.nextIndex[peer] = reply.ConflictIndex
			}
		} else if reply.ConflictIndex > 0 {
			r.nextIndex[peer] = reply.ConflictIndex
		} else {
			r.nextIndex[peer] = max(1, r.nextIndex[peer]-1)
		}
	}
}

// sendSnapshot sends a snapshot to a peer with chunked transfer support
func (r *Raft) sendSnapshot(peer string) {
	r.mu.RLock()
	if r.state != Leader {
		r.mu.RUnlock()
		return
	}
	term := r.currentTerm
	chunkSize := r.config.SnapshotChunkSize
	if chunkSize <= 0 {
		chunkSize = 1024 * 1024 // Default 1MB
	}
	r.mu.RUnlock()
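	// Chunked transfer: e.g., a 5 MB snapshot with the default 1 MB chunk
	// size goes out as five InstallSnapshot RPCs, with Done=true on the
	// final chunk telling the follower to apply the assembled snapshot.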
	data, lastIndex, lastTerm, err := r.storage.GetSnapshot()
	if err != nil {
		r.logger.Error("Failed to get snapshot: %v", err)
		return
	}
	atomic.AddUint64(&r.metrics.SnapshotsSent, 1)
	r.logger.Info("Sending snapshot to %s: %d bytes, lastIndex=%d, lastTerm=%d",
		peer, len(data), lastIndex, lastTerm)
	// Send snapshot in chunks
	totalSize := len(data)
	offset := 0
	for offset < totalSize {
		// Check if we're still leader
		r.mu.RLock()
		if r.state != Leader || r.currentTerm != term {
			r.mu.RUnlock()
			r.logger.Debug("Aborting snapshot send: no longer leader")
			return
		}
		r.mu.RUnlock()
		// Calculate chunk bounds
		end := offset + chunkSize
		if end > totalSize {
			end = totalSize
		}
		chunk := data[offset:end]
		done := end >= totalSize
		args := &InstallSnapshotArgs{
			Term:              term,
			LeaderID:          r.nodeID,
			LastIncludedIndex: lastIndex,
			LastIncludedTerm:  lastTerm,
			Offset:            uint64(offset),
			Data:              chunk,
			Done:              done,
		}
		ctx, cancel := context.WithTimeout(context.Background(), r.config.SnapshotRPCTimeout)
		reply, err := r.transport.InstallSnapshot(ctx, peer, args)
		cancel()
		if err != nil {
			r.logger.Error("InstallSnapshot chunk to %s failed at offset %d: %v", peer, offset, err)
			return
		}
		// Read currentTerm under the lock to avoid a data race
		r.mu.Lock()
		if reply.Term > r.currentTerm {
			r.becomeFollower(reply.Term)
			r.mu.Unlock()
			return
		}
		r.mu.Unlock()
		if !reply.Success {
			r.logger.Error("InstallSnapshot chunk rejected by %s at offset %d", peer, offset)
			return
		}
		offset = end
	}
	// Snapshot fully sent and accepted
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.state != Leader || r.currentTerm != term {
		return
	}
	r.nextIndex[peer] = lastIndex + 1
	r.matchIndex[peer] = lastIndex
	r.logger.Info("Snapshot to %s completed, nextIndex=%d", peer, lastIndex+1)
}

// updateCommitIndex updates the commit index based on matchIndex
func (r *Raft) updateCommitIndex() {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.state != Leader {
		return
	}
	// For config change entries, use the OLD cluster for majority calculation.
	// This ensures that adding a node doesn't require the new node's vote
	// until the config change itself is committed.
	votingNodes := r.clusterNodes
	if r.pendingConfigChange && r.oldClusterNodes != nil {
		votingNodes = r.oldClusterNodes
	}
	// Get current cluster size from voting nodes
	clusterSize := len(votingNodes)
	if clusterSize == 0 {
		clusterSize = len(r.peers) + 1
	}
	// Find the highest index replicated on a majority
	for n := r.log.LastIndex(); n > r.commitIndex; n-- {
		term, err := r.log.GetTerm(n)
		if err != nil {
			continue
		}
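		// Restricting commitment to current-term entries avoids the
		// "Figure 8" scenario from the Raft paper: an entry replicated
		// on a majority in an earlier term can still be overwritten, so
		// it may only be committed indirectly, once a current-term entry
		// above it commits.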
		// Only commit entries from current term
		if term != r.currentTerm {
			continue
		}
		// Count replicas (including self)
		count := 1 // Self
		for nodeID, addr := range votingNodes {
			if nodeID != r.nodeID {
				if r.matchIndex[addr] >= n {
					count++
				}
			}
		}
		// Also check legacy peers
		for _, peer := range r.peers {
			// Avoid double counting if peer is already in votingNodes
			alreadyCounted := false
			for _, addr := range votingNodes {
				if addr == peer {
					alreadyCounted = true
					break
				}
			}
			if !alreadyCounted && r.matchIndex[peer] >= n {
				count++
			}
		}
		// Majority is (clusterSize/2) + 1
		needed := clusterSize/2 + 1
		if count >= needed {
			r.commitIndex = n
			break
		}
	}
}

// applyLoop applies committed entries to the state machine.
// Uses an event-driven model with fallback polling for reliability
func (r *Raft) applyLoop() {
	// Use a ticker as fallback for missed signals (matches original 10ms polling)
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-r.stopCh:
			return
		case <-r.commitCh:
			// Event-driven: triggered when commitIndex is updated
			r.applyCommitted()
		case <-ticker.C:
			// Fallback: check periodically in case we missed a signal
			r.applyCommitted()
		}
	}
}

// applyCommitted applies all committed but not yet applied entries
func (r *Raft) applyCommitted() {
	r.mu.Lock()
	commitIndex := r.commitIndex
	lastApplied := r.lastApplied
	firstIndex := r.log.FirstIndex()
	lastLogIndex := r.log.LastIndex()
	r.mu.Unlock()
	// Safety check: ensure lastApplied is within the valid range.
	// If lastApplied is below firstIndex (due to a snapshot), skip to firstIndex
	if lastApplied < firstIndex && firstIndex > 0 {
		r.mu.Lock()
		r.lastApplied = firstIndex
		lastApplied = firstIndex
		r.mu.Unlock()
		r.logger.Debug("Adjusted lastApplied to firstIndex %d after compaction", firstIndex)
	}
	// Safety check: don't try to apply beyond what we have in the log
	if commitIndex > lastLogIndex {
		commitIndex = lastLogIndex
	}
	for lastApplied < commitIndex {
		lastApplied++
		// Skip if the entry has been compacted
		if lastApplied < firstIndex {
			continue
		}
		entry, err := r.log.GetEntry(lastApplied)
		if err != nil {
			// If the entry is compacted, skip ahead
			if err == ErrCompacted {
				r.mu.Lock()
				newFirstIndex := r.log.FirstIndex()
				if lastApplied < newFirstIndex {
					r.lastApplied = newFirstIndex
					lastApplied = newFirstIndex
				}
				r.mu.Unlock()
				continue
			}
			r.logger.Error("Failed to get entry %d: %v (firstIndex=%d, lastIndex=%d)",
				lastApplied, err, r.log.FirstIndex(), r.log.LastIndex())
			break
		}
		// Handle config change entries
		if entry.Type == EntryConfig {
			r.applyConfigChange(entry)
			r.mu.Lock()
			r.lastApplied = lastApplied
			r.mu.Unlock()
			continue
		}
		// Handle no-op entries (just update lastApplied, don't send to the state machine)
		if entry.Type == EntryNoop {
			r.mu.Lock()
			r.lastApplied = lastApplied
			r.mu.Unlock()
			continue
		}
		// Normal command entry
		msg := ApplyMsg{
			CommandValid: true,
			Command:      entry.Command,
			CommandIndex: entry.Index,
			CommandTerm:  entry.Term,
		}
		select {
		case r.applyCh <- msg:
		case <-r.stopCh:
			return
		}
		r.mu.Lock()
		r.lastApplied = lastApplied
		r.mu.Unlock()
	}
	// Check if log compaction is needed
	r.maybeCompactLog()
}

// maybeCompactLog checks if automatic log compaction should be triggered.
// It uses a dynamic threshold to prevent "compaction thrashing":
//   - First compaction triggers at SnapshotThreshold (default 100,000)
//   - After compaction, next threshold = current_log_size * 1.5
//   - This prevents every new entry from triggering compaction if the log stays large
func (r *Raft) maybeCompactLog() {
	// Skip if no snapshot provider is configured
	if r.config.SnapshotProvider == nil {
		return
	}
	// Check if compaction is already in progress (atomic check-and-set)
	if !atomic.CompareAndSwapInt32(&r.compacting, 0, 1) {
		return
	}
	// Ensure we release the compacting flag when done
	defer atomic.StoreInt32(&r.compacting, 0)
	r.mu.RLock()
	lastApplied := r.lastApplied
	firstIndex := r.log.FirstIndex()
	initialThreshold := r.config.SnapshotThreshold
	minRetention := r.config.SnapshotMinRetention
	isLeader := r.state == Leader
	dynamicThreshold := r.nextCompactionThreshold
	r.mu.RUnlock()
	// Guard against underflow: ensure lastApplied > firstIndex
	if lastApplied <= firstIndex {
		return
	}
	// Calculate current log size
	logSize := lastApplied - firstIndex
	// Determine effective threshold:
	//   - Use the initial threshold if no compaction has occurred yet (dynamicThreshold == 0)
	//   - Otherwise use the dynamic threshold
	effectiveThreshold := initialThreshold
	if dynamicThreshold > 0 {
		effectiveThreshold = dynamicThreshold
	}
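	// Worked example (illustrative numbers): with SnapshotThreshold =
	// 100,000 and no prior compaction, compaction first triggers once
	// logSize exceeds 100,000. If compaction leaves 80,000 entries, the
	// next threshold becomes 120,000 (80,000 * 1.5); if it leaves only
	// 40,000, the computed 60,000 is floored back up to the initial
	// 100,000 later in this function.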
	// Check if we have enough entries to warrant compaction
	if logSize <= effectiveThreshold {
		return
	}
	// Guard against underflow: ensure lastApplied > minRetention
	if lastApplied <= minRetention {
		return
	}
	// Calculate the safe compaction point.
	// We need to retain at least minRetention entries for follower catch-up
	compactUpTo := lastApplied - minRetention
	if compactUpTo <= firstIndex {
		return // Not enough entries to compact while maintaining retention
	}
	// For the leader, also consider the minimum nextIndex of all followers
	// to avoid compacting entries that followers still need
	if isLeader {
		r.mu.RLock()
		minNextIndex := lastApplied
		for _, nextIdx := range r.nextIndex {
			if nextIdx < minNextIndex {
				minNextIndex = nextIdx
			}
		}
		r.mu.RUnlock()
		// Don't compact entries that followers still need.
		// Keep a buffer of minRetention entries before the slowest follower
		if minNextIndex > minRetention {
			followerSafePoint := minNextIndex - minRetention
			if followerSafePoint < compactUpTo {
				compactUpTo = followerSafePoint
			}
		} else {
			// The slowest follower is too far behind; don't compact.
			// It will need a full snapshot anyway
			compactUpTo = firstIndex // Effectively skip compaction
		}
	}
	// Final check - make sure we're actually compacting something meaningful
	if compactUpTo <= firstIndex {
		return
	}
	// Get snapshot from the application layer
	snapshotData, err := r.config.SnapshotProvider()
	if err != nil {
		r.logger.Error("Failed to get snapshot from provider: %v", err)
		return
	}
	// Get the term at the compaction point
	term, err := r.log.GetTerm(compactUpTo)
	if err != nil {
		r.logger.Error("Failed to get term for compaction index %d: %v", compactUpTo, err)
		return
	}
	// Save snapshot
	if err := r.storage.SaveSnapshot(snapshotData, compactUpTo, term); err != nil {
		r.logger.Error("Failed to save snapshot: %v", err)
		return
	}
	// Compact the log
	if err := r.log.Compact(compactUpTo); err != nil {
		r.logger.Error("Failed to compact log: %v", err)
		return
	}
	// Calculate new log size after compaction
	newLogSize := lastApplied - compactUpTo
	// Update the dynamic threshold for the next compaction: current size * 1.5.
	// This prevents "compaction thrashing" where every entry triggers compaction
	newThreshold := newLogSize + newLogSize/2 // newLogSize * 1.5
	// Ensure the threshold doesn't go below the initial threshold
	if newThreshold < initialThreshold {
		newThreshold = initialThreshold
	}
	r.mu.Lock()
	r.nextCompactionThreshold = newThreshold
	r.mu.Unlock()
	r.logger.Info("Auto compaction completed: compacted up to index %d (term %d), log size %d -> %d, next threshold: %d",
		compactUpTo, term, logSize, newLogSize, newThreshold)
}

// resetElectionTimer resets the election timeout
func (r *Raft) resetElectionTimer() {
	// Guard against a zero or negative range, which would make Int63n panic
	timeout := r.config.ElectionTimeoutMin
	if delta := r.config.ElectionTimeoutMax - r.config.ElectionTimeoutMin; delta > 0 {
		timeout += time.Duration(rand.Int63n(int64(delta)))
	}
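	// Randomizing the timeout within [ElectionTimeoutMin, ElectionTimeoutMax)
	// keeps followers from timing out in lockstep and splitting the vote.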
	if r.electionTimer == nil {
		r.electionTimer = time.NewTimer(timeout)
	} else {
		if !r.electionTimer.Stop() {
			select {
			case <-r.electionTimer.C:
			default:
			}
		}
		r.electionTimer.Reset(timeout)
	}
}

// persistState saves the current state to stable storage.
// Returns an error if persistence fails - the caller MUST handle this for safety
func (r *Raft) persistState() error {
	state := &PersistentState{
		CurrentTerm: r.currentTerm,
		VotedFor:    r.votedFor,
	}
	if err := r.storage.SaveState(state); err != nil {
		r.logger.Error("Failed to persist state: %v", err)
		return fmt.Errorf("%w: %v", ErrPersistFailed, err)
	}
	return nil
}

// mustPersistState saves state and panics on failure.
// Use this only in critical paths where failure is unrecoverable
func (r *Raft) mustPersistState() {
	if err := r.persistState(); err != nil {
		// In production, you might want to trigger a graceful shutdown instead
		r.logger.Error("CRITICAL: Failed to persist state, node may be in inconsistent state: %v", err)
		panic(err)
	}
}

// HandleRequestVote handles RequestVote RPCs (including pre-vote)
func (r *Raft) HandleRequestVote(args *RequestVoteArgs) *RequestVoteReply {
	r.mu.Lock()
	defer r.mu.Unlock()
	reply := &RequestVoteReply{
		Term:        r.currentTerm,
		VoteGranted: false,
	}
	// Handle pre-vote separately
	if args.PreVote {
		return r.handlePreVote(args)
	}
	// Reply false if term < currentTerm
	if args.Term < r.currentTerm {
		return reply
	}
	// If term > currentTerm, become follower
	if args.Term > r.currentTerm {
		r.becomeFollower(args.Term)
	}
	reply.Term = r.currentTerm
	// Check if we can vote for this candidate
	if (r.votedFor == "" || r.votedFor == args.CandidateID) &&
		r.log.IsUpToDate(args.LastLogIndex, args.LastLogTerm) {
		r.votedFor = args.CandidateID
		if err := r.persistState(); err != nil {
			// Cannot grant the vote if we can't persist the decision
			r.logger.Error("Failed to persist vote for %s: %v", args.CandidateID, err)
			return reply
		}
		r.resetElectionTimer()
		reply.VoteGranted = true
		r.logger.Debug("Granted vote to %s for term %d", args.CandidateID, args.Term)
	}
	return reply
}

// handlePreVote handles pre-vote requests.
// Pre-vote doesn't change our state, it just checks whether we would vote
func (r *Raft) handlePreVote(args *RequestVoteArgs) *RequestVoteReply {
	reply := &RequestVoteReply{
		Term:        r.currentTerm,
		VoteGranted: false,
	}
	// For pre-vote, we check:
	// 1. The candidate's term is at least as high as ours
	// 2. The candidate's log is at least as up-to-date as ours
	// 3. We don't have a current leader (or the candidate's term is higher)
	if args.Term < r.currentTerm {
		return reply
	}
	// Per the Raft Pre-Vote optimization (§9.6): reject the pre-vote if we have
	// a current leader and the candidate's term is not higher than ours. This
	// prevents disruptive elections when a partitioned node tries to rejoin.
	if r.leaderID != "" && args.Term <= r.currentTerm {
		r.logger.Debug("Rejecting pre-vote from %s: have leader %s", args.CandidateID, r.leaderID)
		return reply
	}
	// Grant the pre-vote if the log is up-to-date.
	// Note: we don't check votedFor for pre-vote, and we don't update any state
	if r.log.IsUpToDate(args.LastLogIndex, args.LastLogTerm) {
		reply.VoteGranted = true
		r.logger.Debug("Granted pre-vote to %s for term %d", args.CandidateID, args.Term)
	}
	return reply
}

// HandleAppendEntries handles AppendEntries RPCs
func (r *Raft) HandleAppendEntries(args *AppendEntriesArgs) *AppendEntriesReply {
	r.mu.Lock()
	defer r.mu.Unlock()
	atomic.AddInt64(&r.stats.AppendsReceived, 1)
	reply := &AppendEntriesReply{
		Term:    r.currentTerm,
		Success: false,
	}
	// Check if we're a standalone node being added to a cluster.
	// A standalone node has only itself in clusterNodes
	isStandalone := len(r.clusterNodes) == 1
	if _, hasSelf := r.clusterNodes[r.nodeID]; isStandalone && hasSelf {
		// We're standalone and receiving AppendEntries from an external leader.
		// This means we're being added to a cluster - suppress elections
		if !r.joiningCluster {
			r.joiningCluster = true
			r.joiningClusterTime = time.Now()
			r.logger.Info("Detected cluster join in progress, suppressing elections")
		}
		// When joining, accept higher terms from the leader to sync up
		if args.Term > r.currentTerm {
			r.becomeFollower(args.Term)
		}
	}
	// Reply false if term < currentTerm
	if args.Term < r.currentTerm {
		// But still reset the timer if we're joining a cluster, to prevent elections
		if r.joiningCluster {
			r.resetElectionTimer()
		}
		return reply
	}
	// If term > currentTerm, or we're a candidate, or we're a leader receiving
	// AppendEntries from another leader (split-brain scenario during cluster merge),
	// become follower. In Raft, there can only be one leader per term.
	if args.Term > r.currentTerm || r.state == Candidate || r.state == Leader {
		r.becomeFollower(args.Term)
	}
	// Update leader info and reset election timer
	r.leaderID = args.LeaderID
	r.lastHeartbeat = time.Now()
	r.resetElectionTimer()
	reply.Term = r.currentTerm
	// Try to append entries
	success, conflictIndex, conflictTerm := r.log.AppendEntriesFromLeader(
		args.PrevLogIndex, args.PrevLogTerm, args.Entries)
	if !success {
		reply.ConflictIndex = conflictIndex
		reply.ConflictTerm = conflictTerm
		return reply
	}
	reply.Success = true
	// Update commit index safely
	if args.LeaderCommit > r.commitIndex {
		// Get our actual last log index
		lastLogIndex := r.log.LastIndex()
		// Calculate what index the entries would have reached
		lastNewEntry := args.PrevLogIndex
		if len(args.Entries) > 0 {
			lastNewEntry = args.Entries[len(args.Entries)-1].Index
		}
		// The commit index must not exceed what we actually have in the log
		newCommitIndex := args.LeaderCommit
		if newCommitIndex > lastNewEntry {
			newCommitIndex = lastNewEntry
		}
		if newCommitIndex > lastLogIndex {
			newCommitIndex = lastLogIndex
		}
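		// Example: if leaderCommit=100 but this batch only brings our log
		// to index 95, we commit up to 95 now and advance the remainder
		// when the missing entries arrive.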
		// Only advance the commit index
		if newCommitIndex > r.commitIndex {
			r.commitIndex = newCommitIndex
		}
	}
	return reply
}

// HandleInstallSnapshot handles InstallSnapshot RPCs with chunked transfer support
func (r *Raft) HandleInstallSnapshot(args *InstallSnapshotArgs) *InstallSnapshotReply {
	r.mu.Lock()
	defer r.mu.Unlock()
	reply := &InstallSnapshotReply{
		Term:    r.currentTerm,
		Success: false,
	}
	if args.Term < r.currentTerm {
		return reply
	}
	if args.Term > r.currentTerm {
		r.becomeFollower(args.Term)
	}
	r.leaderID = args.LeaderID
	r.lastHeartbeat = time.Now()
	r.resetElectionTimer()
	reply.Term = r.currentTerm
	// Skip if we already have this or a newer snapshot applied
	if args.LastIncludedIndex <= r.lastApplied {
		r.logger.Debug("Ignoring snapshot at index %d, already applied up to %d",
			args.LastIncludedIndex, r.lastApplied)
		reply.Success = true // Still success to let the leader know we don't need it
		return reply
	}
	// Handle chunked transfer
	if args.Offset == 0 {
		// First chunk - start a new pending snapshot
		r.pendingSnapshot = &pendingSnapshotState{
			lastIncludedIndex: args.LastIncludedIndex,
			lastIncludedTerm:  args.LastIncludedTerm,
			data:              make([]byte, 0),
			receivedBytes:     0,
		}
		r.logger.Info("Starting snapshot reception at index %d, term %d",
			args.LastIncludedIndex, args.LastIncludedTerm)
	}
  1357. // Validate we're receiving the expected snapshot
  1358. if r.pendingSnapshot == nil ||
  1359. r.pendingSnapshot.lastIncludedIndex != args.LastIncludedIndex ||
  1360. r.pendingSnapshot.lastIncludedTerm != args.LastIncludedTerm {
  1361. r.logger.Warn("Unexpected snapshot chunk: expected index %d, got %d",
  1362. r.pendingSnapshot.lastIncludedIndex, args.LastIncludedIndex)
  1363. return reply
  1364. }
  1365. // Validate offset matches what we've received
  1366. if uint64(args.Offset) != r.pendingSnapshot.receivedBytes {
  1367. r.logger.Warn("Unexpected chunk offset: expected %d, got %d",
  1368. r.pendingSnapshot.receivedBytes, args.Offset)
  1369. return reply
  1370. }
  1371. // Append chunk data
  1372. r.pendingSnapshot.data = append(r.pendingSnapshot.data, args.Data...)
  1373. r.pendingSnapshot.receivedBytes += uint64(len(args.Data))
  1374. reply.Success = true
  1375. r.logger.Debug("Received snapshot chunk: offset=%d, size=%d, done=%v",
  1376. args.Offset, len(args.Data), args.Done)
  1377. // If not done, wait for more chunks
  1378. if !args.Done {
  1379. return reply
  1380. }
  1381. // All chunks received - apply the snapshot
  1382. r.logger.Info("Installing complete snapshot: %d bytes at index %d, term %d",
  1383. len(r.pendingSnapshot.data), args.LastIncludedIndex, args.LastIncludedTerm)
  1384. atomic.AddUint64(&r.metrics.SnapshotsInstalled, 1)
  1385. // Save snapshot
  1386. if err := r.storage.SaveSnapshot(r.pendingSnapshot.data, args.LastIncludedIndex, args.LastIncludedTerm); err != nil {
  1387. r.logger.Error("Failed to save snapshot: %v", err)
  1388. r.pendingSnapshot = nil
  1389. reply.Success = false
  1390. return reply
  1391. }
  1392. // Compact log
  1393. if err := r.log.Compact(args.LastIncludedIndex); err != nil {
  1394. r.logger.Error("Failed to compact log: %v", err)
  1395. }
  1396. // Update state - must update both commitIndex and lastApplied
  1397. if args.LastIncludedIndex > r.commitIndex {
  1398. r.commitIndex = args.LastIncludedIndex
  1399. }
  1400. // Always update lastApplied to snapshot index to prevent trying to apply compacted entries
  1401. r.lastApplied = args.LastIncludedIndex
  1402. // Send snapshot to application (non-blocking with timeout)
  1403. // Use the complete pendingSnapshot data, not the last chunk
  1404. msg := ApplyMsg{
  1405. SnapshotValid: true,
  1406. Snapshot: r.pendingSnapshot.data,
  1407. SnapshotIndex: args.LastIncludedIndex,
  1408. SnapshotTerm: args.LastIncludedTerm,
  1409. }
  1410. // Clear pending snapshot
  1411. r.pendingSnapshot = nil
  1412. // Try to send, but don't block indefinitely
  1413. select {
  1414. case r.applyCh <- msg:
  1415. r.logger.Debug("Sent snapshot to application")
  1416. case <-time.After(100 * time.Millisecond):
  1417. r.logger.Warn("Timeout sending snapshot to application, will retry")
  1418. // The application will still get correct state via normal apply loop
  1419. }
  1420. return reply
  1421. }
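
// The handler above expects chunks in order: Offset must equal the bytes
// received so far, and Done marks the final chunk. A leader-side sender could
// look like the following sketch (chunkSize and the transport send call are
// placeholders, not names defined in this file; types abbreviated):
//
//	const chunkSize = 64 * 1024
//	for offset := 0; offset < len(snapshot); offset += chunkSize {
//		end := offset + chunkSize
//		if end > len(snapshot) {
//			end = len(snapshot)
//		}
//		args := &InstallSnapshotArgs{
//			Term:              term,
//			LeaderID:          r.nodeID,
//			LastIncludedIndex: lastIndex,
//			LastIncludedTerm:  lastTerm,
//			Offset:            offset,
//			Data:              snapshot[offset:end],
//			Done:              end == len(snapshot),
//		}
//		// send args to the follower; stop and retry on !reply.Success
//	}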

// Propose proposes a new command to be replicated
func (r *Raft) Propose(command []byte) (uint64, uint64, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.state != Leader {
		return 0, 0, false
	}
	// Check connectivity (lease check).
	// If we haven't heard from a majority of peers within ElectionTimeout,
	// we shouldn't accept new commands because we might be partitioned.
	if len(r.clusterNodes) > 1 {
		activePeers := 1 // Self
		now := time.Now()
		timeout := r.config.ElectionTimeoutMax
		for _, peer := range r.peers {
			if last, ok := r.lastContact[peer]; ok && now.Sub(last) < timeout {
				activePeers++
			}
		}
		// Check majority
		needed := len(r.clusterNodes)/2 + 1
		if activePeers < needed {
			r.logger.Warn("Rejecting Propose: lost contact with majority (active: %d, needed: %d)", activePeers, needed)
			return 0, 0, false
		}
	}
	index, err := r.log.AppendCommand(r.currentTerm, command)
	if err != nil {
		r.logger.Error("Failed to append command: %v", err)
		return 0, 0, false
	}
	r.matchIndex[r.nodeID] = index
	// For a single-node cluster, we are the only voter and can commit immediately.
	// This fixes the issue where commitCh never gets triggered without other peers.
	if len(r.clusterNodes) <= 1 && len(r.peers) == 0 {
		// Single node: self is the majority, trigger commit immediately
		select {
		case r.commitCh <- struct{}{}:
		default:
		}
	} else {
		// Multi-node: trigger replication to the other nodes
		r.triggerReplication()
	}
	return index, r.currentTerm, true
}
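
// Example application usage (a sketch; the command encoding is up to the
// caller):
//
//	index, term, isLeader := r.Propose(cmd)
//	if !isLeader {
//		// retry against the leader, or use ProposeWithForward below
//	}
//	// The command is applied once an ApplyMsg for (index, term) arrives
//	// on applyCh.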

// triggerReplication signals the replication loop to send heartbeats.
// This uses a non-blocking send to batch replication requests.
func (r *Raft) triggerReplication() {
	select {
	case r.replicationCh <- struct{}{}:
	default:
		// Replication already scheduled
	}
}

// replicationLoop handles batched replication.
// Uses simple delay-based batching: flush immediately when signaled, then wait
// to allow more requests to accumulate before the next flush.
func (r *Raft) replicationLoop() {
	for {
		select {
		case <-r.stopCh:
			return
		case <-r.replicationCh:
			// Flush and replicate immediately
			r.flushAndReplicate()
			// Wait briefly to allow batching of subsequent requests.
			// This gives time for more proposals to queue up before the next flush.
			time.Sleep(10 * time.Millisecond)
		}
	}
}
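
// The effect of the pause above: because replicationCh is signaled with a
// non-blocking send, a burst of proposals arriving within one window collapses
// to the immediate flush plus at most a few queued follow-ups, rather than one
// RPC round per proposal (illustrative behavior, assuming a small channel
// buffer).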

// flushAndReplicate flushes logs and sends heartbeats
func (r *Raft) flushAndReplicate() {
	// Ensure logs are flushed to the OS cache before sending to followers.
	// This implements Group Commit with Flush (fast) instead of Sync (slow).
	if err := r.log.Flush(); err != nil {
		r.logger.Error("Failed to flush log: %v", err)
	}
	r.sendHeartbeats()
}

// ProposeWithForward proposes a command, forwarding to the leader if necessary.
// This is the recommended method for applications to use.
func (r *Raft) ProposeWithForward(command []byte) (index uint64, term uint64, err error) {
	// Try a local propose first
	idx, t, isLeader := r.Propose(command)
	if isLeader {
		return idx, t, nil
	}
	// Not leader, forward to the leader
	r.mu.RLock()
	leaderID := r.leaderID
	// Use clusterNodes (dynamically maintained) to find the leader address
	leaderAddr := r.clusterNodes[leaderID]
	r.mu.RUnlock()
	if leaderID == "" {
		return 0, 0, fmt.Errorf("no leader available")
	}
	if leaderAddr == "" {
		return 0, 0, fmt.Errorf("leader %s address not found in cluster", leaderID)
	}
	// Forward to the leader
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	args := &ProposeArgs{Command: command}
	reply, err := r.transport.ForwardPropose(ctx, leaderAddr, args)
	if err != nil {
		return 0, 0, fmt.Errorf("forward failed: %w", err)
	}
	if !reply.Success {
		return 0, 0, fmt.Errorf("leader rejected: %s", reply.Error)
	}
	return reply.Index, reply.Term, nil
}
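
// Example application usage (a sketch): callers need not track the leader
// themselves.
//
//	index, term, err := r.ProposeWithForward(cmd)
//	if err != nil {
//		// no leader elected yet, forwarding failed, or the leader
//		// rejected the proposal; back off and retry
//	}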

// ForwardGet forwards a get request to the leader
func (r *Raft) ForwardGet(key string) (string, bool, error) {
	// Snapshot leadership state under the read lock to avoid racing with
	// state transitions
	r.mu.RLock()
	isLeader := r.state == Leader
	leaderID := r.leaderID
	leaderAddr := r.clusterNodes[leaderID]
	r.mu.RUnlock()
	// If we are the leader, serve the read locally
	if isLeader {
		if r.config.GetHandler != nil {
			val, found := r.config.GetHandler(key)
			return val, found, nil
		}
		return "", false, fmt.Errorf("get handler not configured")
	}
	if leaderID == "" {
		return "", false, ErrNoLeader
	}
	if leaderAddr == "" {
		return "", false, fmt.Errorf("leader %s address not found", leaderID)
	}
	// Forward to the leader
	ctx, cancel := context.WithTimeout(context.Background(), r.config.RPCTimeout)
	defer cancel()
	args := &GetArgs{Key: key}
	reply, err := r.transport.ForwardGet(ctx, leaderAddr, args)
	if err != nil {
		return "", false, fmt.Errorf("forward failed: %w", err)
	}
	return reply.Value, reply.Found, nil
}

// HandlePropose handles forwarded propose requests
func (r *Raft) HandlePropose(args *ProposeArgs) *ProposeReply {
	index, term, isLeader := r.Propose(args.Command)
	if !isLeader {
		return &ProposeReply{
			Success: false,
			Error:   "not leader",
		}
	}
	return &ProposeReply{
		Success: true,
		Index:   index,
		Term:    term,
	}
}

// HandleAddNode handles forwarded AddNode requests
func (r *Raft) HandleAddNode(args *AddNodeArgs) *AddNodeReply {
	err := r.AddNode(args.NodeID, args.Address)
	if err != nil {
		return &AddNodeReply{
			Success: false,
			Error:   err.Error(),
		}
	}
	return &AddNodeReply{
		Success: true,
	}
}

// HandleRemoveNode handles forwarded RemoveNode requests
func (r *Raft) HandleRemoveNode(args *RemoveNodeArgs) *RemoveNodeReply {
	err := r.RemoveNode(args.NodeID)
	if err != nil {
		return &RemoveNodeReply{
			Success: false,
			Error:   err.Error(),
		}
	}
	return &RemoveNodeReply{
		Success: true,
	}
}

// GetState returns the current term and whether this node is leader
func (r *Raft) GetState() (uint64, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.currentTerm, r.state == Leader
}

// GetLeaderID returns the current leader ID
func (r *Raft) GetLeaderID() string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.leaderID
}

// GetStats returns runtime statistics
func (r *Raft) GetStats() Stats {
	r.mu.RLock()
	defer r.mu.RUnlock()
	lastIndex, lastTerm := r.log.LastIndexAndTerm()
	// Copy cluster nodes
	nodes := make(map[string]string)
	for k, v := range r.clusterNodes {
		nodes[k] = v
	}
	clusterSize := len(r.clusterNodes)
	if clusterSize == 0 {
		clusterSize = len(r.peers) + 1
	}
	return Stats{
		Term:            r.currentTerm,
		State:           r.state.String(),
		LastLogIndex:    lastIndex,
		LastLogTerm:     lastTerm,
		CommitIndex:     r.commitIndex,
		LastApplied:     r.lastApplied,
		LeaderID:        r.leaderID,
		AppendsSent:     atomic.LoadInt64(&r.stats.AppendsSent),
		AppendsReceived: atomic.LoadInt64(&r.stats.AppendsReceived),
		ClusterSize:     clusterSize,
		ClusterNodes:    nodes,
	}
}
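
// Example usage (a sketch, e.g. from a debug endpoint):
//
//	s := r.GetStats()
//	fmt.Printf("term=%d state=%s commit=%d applied=%d cluster=%d\n",
//		s.Term, s.State, s.CommitIndex, s.LastApplied, s.ClusterSize)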

// restoreFromSnapshot restores the FSM from a snapshot at startup.
// This is called during Start() to ensure the FSM has the correct state
// before processing any new commands.
func (r *Raft) restoreFromSnapshot() error {
	// Get snapshot from storage
	data, lastIndex, lastTerm, err := r.storage.GetSnapshot()
	if err != nil {
		return fmt.Errorf("failed to get snapshot: %w", err)
	}
	// No snapshot exists
	if len(data) == 0 || lastIndex == 0 {
		return nil
	}
	r.logger.Info("Restoring FSM from snapshot at index %d, term %d (%d bytes)",
		lastIndex, lastTerm, len(data))
	// Update lastApplied to the snapshot index to prevent re-applying compacted entries
	r.mu.Lock()
	if lastIndex > r.lastApplied {
		r.lastApplied = lastIndex
	}
	if lastIndex > r.commitIndex {
		r.commitIndex = lastIndex
	}
	r.mu.Unlock()
	// Send the snapshot to the FSM for restoration,
	// with a timeout to avoid blocking forever if applyCh is full
	msg := ApplyMsg{
		SnapshotValid: true,
		Snapshot:      data,
		SnapshotIndex: lastIndex,
		SnapshotTerm:  lastTerm,
	}
	// Try to send with a timeout
	select {
	case r.applyCh <- msg:
		r.logger.Info("FSM restoration triggered from snapshot at index %d", lastIndex)
	case <-time.After(5 * time.Second):
		return fmt.Errorf("timeout sending snapshot to applyCh")
	}
	return nil
}

// TakeSnapshot takes a snapshot of the current state
func (r *Raft) TakeSnapshot(data []byte, index uint64) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if index > r.lastApplied {
		return fmt.Errorf("snapshot index %d exceeds lastApplied %d", index, r.lastApplied)
	}
	term, err := r.log.GetTerm(index)
	if err != nil {
		return fmt.Errorf("failed to get term for index %d: %w", index, err)
	}
	if err := r.storage.SaveSnapshot(data, index, term); err != nil {
		return fmt.Errorf("failed to save snapshot: %w", err)
	}
	if err := r.log.Compact(index); err != nil {
		return fmt.Errorf("failed to compact log: %w", err)
	}
	r.logger.Info("Took snapshot at index %d, term %d", index, term)
	return nil
}
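
// Example usage (a sketch; fsm.Serialize and appliedIndex are hypothetical
// application-side names for the state machine's serialized state and the
// index of the last command it applied):
//
//	data, err := fsm.Serialize()
//	if err == nil {
//		if err := r.TakeSnapshot(data, appliedIndex); err != nil {
//			// index too new, or persistence failed
//		}
//	}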

func max(a, b uint64) uint64 {
	if a > b {
		return a
	}
	return b
}

// ==================== Membership Change API ====================
//
// This implementation uses Single-Node Membership Change (also known as
// one-at-a-time changes) as described in the Raft dissertation (§4.3).
// This is safe because:
//
// 1. We only allow one configuration change at a time (pendingConfigChange flag)
// 2. For commits, we use the OLD cluster majority until the config change is committed
// 3. The new node starts receiving entries immediately but doesn't affect majority calculation
//
// This approach is simpler than Joint Consensus and is sufficient for most use cases.
// The invariant maintained is: any two majorities (old or new) must overlap.
//
// For adding a node: the old majority is N/2+1 of N nodes and the new majority is
// (N+1)/2+1 of N+1 nodes; their sizes sum to N+2 > N+1, so they always share a node.
// For removing a node: the old majority is N/2+1 of N and the new majority is
// (N-1)/2+1 of N-1; even if the old majority includes the removed node, its
// remaining members still overlap the new majority (for N > 1).
//
// WARNING: Avoid adding/removing multiple nodes rapidly. Wait for each change to be committed.
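//
// Worked example: growing a 3-node cluster {A,B,C} to 4 nodes. The old
// majority is any 2 of {A,B,C}; the new majority is any 3 of {A,B,C,D}.
// Since 2+3 > 4, every old majority intersects every new majority, so two
// leaders can never be elected for the same term across the two
// configurations.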

// AddNode adds a new node to the cluster.
// This can only be called on the leader.
// The new node must already be running and reachable.
//
// Safety guarantees:
//   - Only one config change can be in progress at a time
//   - The old cluster majority is used until the config change is committed
//   - Returns an error if leadership is lost during the operation
func (r *Raft) AddNode(nodeID, address string) error {
	r.mu.Lock()
	// Must be leader
	if r.state != Leader {
		leaderID := r.leaderID
		r.mu.Unlock()
		return NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff)
	}
	// Check if we're in the middle of a leadership transfer
	if r.transferring {
		r.mu.Unlock()
		return fmt.Errorf("leadership transfer in progress")
	}
	// Check if there's already a pending config change
	if r.pendingConfigChange {
		r.mu.Unlock()
		return ErrConfigInFlight
	}
	// Validate nodeID and address
	if nodeID == "" {
		r.mu.Unlock()
		return fmt.Errorf("nodeID cannot be empty")
	}
	if address == "" {
		r.mu.Unlock()
		return fmt.Errorf("address cannot be empty")
	}
	// Check if the node already exists
	if _, exists := r.clusterNodes[nodeID]; exists {
		r.mu.Unlock()
		return fmt.Errorf("node %s already exists in cluster", nodeID)
	}
	// Check if the address is already used by another node
	for existingID, existingAddr := range r.clusterNodes {
		if existingAddr == address {
			r.mu.Unlock()
			return fmt.Errorf("address %s is already used by node %s", address, existingID)
		}
	}
	// Save the old cluster nodes for majority calculation during the config change.
	// This ensures we use the OLD cluster size until the config change is committed.
	r.oldClusterNodes = make(map[string]string)
	for k, v := range r.clusterNodes {
		r.oldClusterNodes[k] = v
	}
	// Create the new config with the added node
	newNodes := make(map[string]string)
	for k, v := range r.clusterNodes {
		newNodes[k] = v
	}
	newNodes[nodeID] = address
	// Create a config change entry with the new ClusterConfig
	configIndex := r.log.LastIndex() + 1
	entry := LogEntry{
		Index:  configIndex,
		Term:   r.currentTerm,
		Type:   EntryConfig,
		Config: &ClusterConfig{Nodes: newNodes},
	}
	// Mark the config change as pending and store the index
	r.pendingConfigChange = true
	r.configChangeIndex = configIndex
	// Immediately apply the new configuration (for single-node changes, this is safe).
	// The new node will start receiving AppendEntries immediately.
	r.clusterNodes[nodeID] = address
	r.peers = append(r.peers, address)
	// Set nextIndex to 1 (or firstIndex) so the new node syncs from the beginning.
	// This is crucial - the new node's log is empty, so we must start from index 1.
	firstIndex := r.log.FirstIndex()
	if firstIndex == 0 {
		firstIndex = 1
	}
	r.nextIndex[address] = firstIndex
	r.matchIndex[address] = 0
	r.mu.Unlock()
	// Append the config change entry to the log
	if err := r.log.Append(entry); err != nil {
		r.mu.Lock()
		r.pendingConfigChange = false
		r.oldClusterNodes = nil
		r.configChangeIndex = 0
		// Rollback
		delete(r.clusterNodes, nodeID)
		r.rebuildPeersList()
		r.mu.Unlock()
		return fmt.Errorf("failed to append config entry: %w", err)
	}
	r.logger.Info("Adding node %s (%s) to cluster", nodeID, address)
	// Trigger immediate replication
	r.triggerReplication()
	return nil
}
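
// Example usage (a sketch; the node ID and address are illustrative):
//
//	if err := r.AddNode("node4", "10.0.0.4:7000"); err != nil {
//		// ErrConfigInFlight: the previous change has not committed yet;
//		// ErrNotLeader: retry via AddNodeWithForward
//	}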

// RemoveNode removes a node from the cluster.
// This can only be called on the leader.
// The node being removed can be any node except the leader itself.
//
// Safety guarantees:
//   - Only one config change can be in progress at a time
//   - Cannot remove the leader (transfer leadership first)
//   - Cannot reduce the cluster to 0 nodes
//   - The old cluster majority is used until the config change is committed
func (r *Raft) RemoveNode(nodeID string) error {
	r.mu.Lock()
	// Must be leader
	if r.state != Leader {
		leaderID := r.leaderID
		r.mu.Unlock()
		return NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff)
	}
	// Check if we're in the middle of a leadership transfer
	if r.transferring {
		r.mu.Unlock()
		return fmt.Errorf("leadership transfer in progress")
	}
	// Cannot remove self
	if nodeID == r.nodeID {
		r.mu.Unlock()
		return fmt.Errorf("cannot remove self from cluster, use TransferLeadership first")
	}
	// Validate nodeID
	if nodeID == "" {
		r.mu.Unlock()
		return fmt.Errorf("nodeID cannot be empty")
	}
	// Check if there's already a pending config change
	if r.pendingConfigChange {
		r.mu.Unlock()
		return ErrConfigInFlight
	}
	// Check if the node exists
	if _, exists := r.clusterNodes[nodeID]; !exists {
		r.mu.Unlock()
		return fmt.Errorf("node %s not found in cluster", nodeID)
	}
	// Cannot reduce the cluster below 1 node
	if len(r.clusterNodes) <= 1 {
		r.mu.Unlock()
		return fmt.Errorf("cannot remove last node from cluster")
	}
	// Save old cluster nodes for majority calculation during the config change
	r.oldClusterNodes = make(map[string]string)
	for k, v := range r.clusterNodes {
		r.oldClusterNodes[k] = v
	}
	// Create the new config without the removed node
	newNodes := make(map[string]string)
	for k, v := range r.clusterNodes {
		if k != nodeID {
			newNodes[k] = v
		}
	}
	// Create a config change entry with the new ClusterConfig
	configIndex := r.log.LastIndex() + 1
	entry := LogEntry{
		Index:  configIndex,
		Term:   r.currentTerm,
		Type:   EntryConfig,
		Config: &ClusterConfig{Nodes: newNodes},
	}
	// Mark the config change as pending and store the index
	r.pendingConfigChange = true
	r.configChangeIndex = configIndex
	// Get the address of the node being removed, for cleanup
	removedAddr := r.clusterNodes[nodeID]
	// Immediately apply the new configuration
	delete(r.clusterNodes, nodeID)
	r.rebuildPeersList()
	delete(r.nextIndex, removedAddr)
	delete(r.matchIndex, removedAddr)
	r.mu.Unlock()
	// Append the config change entry to the log
	if err := r.log.Append(entry); err != nil {
		r.mu.Lock()
		r.pendingConfigChange = false
		r.oldClusterNodes = nil
		r.configChangeIndex = 0
		// Roll back the in-memory membership change
		r.clusterNodes[nodeID] = removedAddr
		r.rebuildPeersList()
		r.mu.Unlock()
		return fmt.Errorf("failed to append config entry: %w", err)
	}
	r.logger.Info("Removing node %s from cluster, config at index %d", nodeID, entry.Index)
	// Trigger replication
	go r.sendHeartbeats()
	return nil
}

// AddNodeWithForward adds a node, forwarding to the leader if necessary.
// This is the recommended method for applications to use.
func (r *Raft) AddNodeWithForward(nodeID, address string) error {
	// Try the local operation first
	err := r.AddNode(nodeID, address)
	if err == nil {
		return nil
	}
	// Determine whether we are the leader
	r.mu.RLock()
	state := r.state
	leaderID := r.leaderID
	leaderAddr := r.clusterNodes[leaderID]
	r.mu.RUnlock()
	if state == Leader {
		// We are the leader but AddNode failed for other reasons
		return err
	}
	// Not leader, forward to the leader
	if leaderID == "" {
		return fmt.Errorf("no leader available")
	}
	if leaderAddr == "" {
		return fmt.Errorf("leader %s address not found in cluster", leaderID)
	}
	// Forward to the leader
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	args := &AddNodeArgs{NodeID: nodeID, Address: address}
	reply, err := r.transport.ForwardAddNode(ctx, leaderAddr, args)
	if err != nil {
		return fmt.Errorf("forward failed: %w", err)
	}
	if !reply.Success {
		return fmt.Errorf("leader rejected: %s", reply.Error)
	}
	return nil
}

// RemoveNodeWithForward removes a node, forwarding to the leader if necessary.
// This is the recommended method for applications to use.
func (r *Raft) RemoveNodeWithForward(nodeID string) error {
	// Try the local operation first
	err := r.RemoveNode(nodeID)
	if err == nil {
		return nil
	}
	// Determine whether we are the leader
	r.mu.RLock()
	state := r.state
	leaderID := r.leaderID
	leaderAddr := r.clusterNodes[leaderID]
	r.mu.RUnlock()
	if state == Leader {
		// We are the leader but RemoveNode failed for other reasons
		return err
	}
	// Not leader, forward to the leader
	if leaderID == "" {
		return fmt.Errorf("no leader available")
	}
	if leaderAddr == "" {
		return fmt.Errorf("leader %s address not found in cluster", leaderID)
	}
	// Forward to the leader
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	args := &RemoveNodeArgs{NodeID: nodeID}
	reply, err := r.transport.ForwardRemoveNode(ctx, leaderAddr, args)
	if err != nil {
		return fmt.Errorf("forward failed: %w", err)
	}
	if !reply.Success {
		return fmt.Errorf("leader rejected: %s", reply.Error)
	}
	return nil
}

// rebuildPeersList rebuilds the peers slice from clusterNodes
func (r *Raft) rebuildPeersList() {
	r.peers = make([]string, 0, len(r.clusterNodes)-1)
	for nodeID, addr := range r.clusterNodes {
		if nodeID != r.nodeID {
			r.peers = append(r.peers, addr)
		}
	}
}

// GetClusterNodes returns a copy of the current cluster membership
func (r *Raft) GetClusterNodes() map[string]string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	nodes := make(map[string]string)
	for k, v := range r.clusterNodes {
		nodes[k] = v
	}
	return nodes
}

// applyConfigChange applies a configuration change entry
func (r *Raft) applyConfigChange(entry *LogEntry) {
	if entry.Config == nil || entry.Config.Nodes == nil {
		r.logger.Warn("Invalid config change entry at index %d", entry.Index)
		return
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	// Update cluster configuration
	r.clusterNodes = make(map[string]string)
	for k, v := range entry.Config.Nodes {
		r.clusterNodes[k] = v
	}
	r.rebuildPeersList()
	// Persist the new configuration
	if err := r.storage.SaveClusterConfig(&ClusterConfig{Nodes: r.clusterNodes}); err != nil {
		r.logger.Error("Failed to persist cluster config: %v", err)
	}
	// Clear the pending flag and old cluster state
	r.pendingConfigChange = false
	r.oldClusterNodes = nil
	r.configChangeIndex = 0
	// If we were joining a cluster and now have multiple nodes, we've successfully joined
	if r.joiningCluster && len(r.clusterNodes) > 1 {
		r.joiningCluster = false
		r.logger.Info("Successfully joined cluster with %d nodes", len(r.clusterNodes))
	}
	r.logger.Info("Applied config change at index %d, cluster now has %d nodes", entry.Index, len(r.clusterNodes))
	// If we're the leader, update leader state
	if r.state == Leader {
		// Initialize nextIndex/matchIndex for any new nodes
		lastIndex := r.log.LastIndex()
		for nodeID, addr := range r.clusterNodes {
			if nodeID != r.nodeID {
				if _, exists := r.nextIndex[addr]; !exists {
					r.nextIndex[addr] = lastIndex + 1
					r.matchIndex[addr] = 0
				}
			}
		}
		// Clean up removed nodes
		validAddrs := make(map[string]bool)
		for nodeID, addr := range r.clusterNodes {
			if nodeID != r.nodeID {
				validAddrs[addr] = true
			}
		}
		for addr := range r.nextIndex {
			if !validAddrs[addr] {
				delete(r.nextIndex, addr)
				delete(r.matchIndex, addr)
			}
		}
	}
}

// ==================== ReadIndex (Linearizable Reads) ====================

// readIndexLoop handles read index requests
func (r *Raft) readIndexLoop() {
	for {
		select {
		case <-r.stopCh:
			return
		case req := <-r.readIndexCh:
			r.processReadIndexRequest(req)
		}
	}
}

// processReadIndexRequest processes a single read index request
func (r *Raft) processReadIndexRequest(req *readIndexRequest) {
	r.mu.RLock()
	if r.state != Leader {
		r.mu.RUnlock()
		req.done <- ErrNotLeader
		return
	}
	r.mu.RUnlock()
	// Confirm we still hold leadership (see confirmLeadership below)
	if !r.confirmLeadership() {
		req.done <- ErrLeadershipLost
		return
	}
	// Wait for apply to catch up to readIndex
	if err := r.waitApply(req.readIndex, r.config.ProposeTimeout); err != nil {
		req.done <- err
		return
	}
	req.done <- nil
}

// ReadIndex implements linearizable reads.
// It ensures that the read sees all writes that were committed before the read started.
func (r *Raft) ReadIndex() (uint64, error) {
	r.mu.RLock()
	if r.state != Leader {
		leaderID := r.leaderID
		r.mu.RUnlock()
		return 0, NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff)
	}
	readIndex := r.commitIndex
	r.mu.RUnlock()
	atomic.AddUint64(&r.metrics.ReadIndexRequests, 1)
	// Create the request and send it to the processing loop
	req := &readIndexRequest{
		readIndex: readIndex,
		done:      make(chan error, 1),
	}
	select {
	case r.readIndexCh <- req:
	case <-r.stopCh:
		return 0, ErrShutdown
	case <-time.After(r.config.ProposeTimeout):
		return 0, ErrTimeout
	}
	// Wait for the result
	select {
	case err := <-req.done:
		if err != nil {
			return 0, err
		}
		atomic.AddUint64(&r.metrics.ReadIndexSuccess, 1)
		return readIndex, nil
	case <-r.stopCh:
		return 0, ErrShutdown
	case <-time.After(r.config.ProposeTimeout):
		return 0, ErrTimeout
	}
}
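
// Example of a linearizable read (a sketch; store.Get is a hypothetical
// application-side lookup): once ReadIndex returns, the local state machine
// has applied everything committed before the read began.
//
//	if _, err := r.ReadIndex(); err == nil {
//		value, found := store.Get(key)
//		// value reflects all writes committed before ReadIndex was called
//		_, _ = value, found
//	}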

// confirmLeadership checks that we are still the leader before serving a read.
// Rather than counting explicit acks, it sends a round of heartbeats, waits one
// heartbeat interval, and verifies that we have not been deposed in the
// meantime (a leader that sees a higher term steps down to follower).
func (r *Raft) confirmLeadership() bool {
	r.mu.RLock()
	if r.state != Leader {
		r.mu.RUnlock()
		return false
	}
	currentTerm := r.currentTerm
	clusterSize := len(r.clusterNodes)
	if clusterSize == 0 {
		clusterSize = len(r.peers) + 1
	}
	r.mu.RUnlock()
	// Single-node cluster - we're always the leader
	if clusterSize == 1 {
		return true
	}
	// Send heartbeats to all peers
	r.sendHeartbeats()
	// Wait briefly, then check that we're still leader in the same term
	time.Sleep(r.config.HeartbeatInterval)
	r.mu.RLock()
	stillLeader := r.state == Leader && r.currentTerm == currentTerm
	r.mu.RUnlock()
	return stillLeader
}

// waitApply waits until lastApplied >= index, polling once per millisecond
func (r *Raft) waitApply(index uint64, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for {
		r.mu.RLock()
		lastApplied := r.lastApplied
		r.mu.RUnlock()
		if lastApplied >= index {
			return nil
		}
		if time.Now().After(deadline) {
			return ErrTimeout
		}
		time.Sleep(1 * time.Millisecond)
	}
}

// ==================== Health Check ====================

// HealthCheck returns the current health status of the node
func (r *Raft) HealthCheck() HealthStatus {
	r.mu.RLock()
	defer r.mu.RUnlock()
	clusterNodes := make(map[string]string)
	for k, v := range r.clusterNodes {
		clusterNodes[k] = v
	}
	clusterSize := len(r.clusterNodes)
	if clusterSize == 0 {
		clusterSize = len(r.peers) + 1
	}
	logBehind := uint64(0)
	if r.commitIndex > r.lastApplied {
		logBehind = r.commitIndex - r.lastApplied
	}
	// Consider healthy if we're the leader or have a known leader
	isHealthy := r.state == Leader || r.leaderID != ""
	return HealthStatus{
		NodeID:        r.nodeID,
		State:         r.state.String(),
		Term:          r.currentTerm,
		LeaderID:      r.leaderID,
		ClusterSize:   clusterSize,
		ClusterNodes:  clusterNodes,
		CommitIndex:   r.commitIndex,
		LastApplied:   r.lastApplied,
		LogBehind:     logBehind,
		LastHeartbeat: r.lastHeartbeat,
		IsHealthy:     isHealthy,
		Uptime:        time.Since(r.startTime),
	}
}
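
// Example usage (a sketch, e.g. behind a liveness endpoint; the lag threshold
// is illustrative):
//
//	h := r.HealthCheck()
//	if !h.IsHealthy || h.LogBehind > 1000 {
//		// degraded: no known leader, or the apply loop is lagging
//	}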

// GetMetrics returns the current metrics
func (r *Raft) GetMetrics() Metrics {
	return Metrics{
		Term:                      atomic.LoadUint64(&r.metrics.Term),
		ProposalsTotal:            atomic.LoadUint64(&r.metrics.ProposalsTotal),
		ProposalsSuccess:          atomic.LoadUint64(&r.metrics.ProposalsSuccess),
		ProposalsFailed:           atomic.LoadUint64(&r.metrics.ProposalsFailed),
		ProposalsForwarded:        atomic.LoadUint64(&r.metrics.ProposalsForwarded),
		AppendsSent:               atomic.LoadUint64(&r.metrics.AppendsSent),
		AppendsReceived:           atomic.LoadUint64(&r.metrics.AppendsReceived),
		AppendsSuccess:            atomic.LoadUint64(&r.metrics.AppendsSuccess),
		AppendsFailed:             atomic.LoadUint64(&r.metrics.AppendsFailed),
		ElectionsStarted:          atomic.LoadUint64(&r.metrics.ElectionsStarted),
		ElectionsWon:              atomic.LoadUint64(&r.metrics.ElectionsWon),
		PreVotesStarted:           atomic.LoadUint64(&r.metrics.PreVotesStarted),
		PreVotesGranted:           atomic.LoadUint64(&r.metrics.PreVotesGranted),
		SnapshotsTaken:            atomic.LoadUint64(&r.metrics.SnapshotsTaken),
		SnapshotsInstalled:        atomic.LoadUint64(&r.metrics.SnapshotsInstalled),
		SnapshotsSent:             atomic.LoadUint64(&r.metrics.SnapshotsSent),
		ReadIndexRequests:         atomic.LoadUint64(&r.metrics.ReadIndexRequests),
		ReadIndexSuccess:          atomic.LoadUint64(&r.metrics.ReadIndexSuccess),
		LeadershipTransfers:       atomic.LoadUint64(&r.metrics.LeadershipTransfers),
		LeadershipTransferSuccess: atomic.LoadUint64(&r.metrics.LeadershipTransferSuccess),
	}
}

// ==================== Leadership Transfer ====================

// TransferLeadership transfers leadership to the specified node
func (r *Raft) TransferLeadership(targetID string) error {
	r.mu.Lock()
	if r.state != Leader {
		r.mu.Unlock()
		return ErrNotLeader
	}
	if targetID == r.nodeID {
		r.mu.Unlock()
		return fmt.Errorf("cannot transfer to self")
	}
	targetAddr, exists := r.clusterNodes[targetID]
	if !exists {
		r.mu.Unlock()
		return fmt.Errorf("target node %s not in cluster", targetID)
	}
	if r.transferring {
		r.mu.Unlock()
		return fmt.Errorf("leadership transfer already in progress")
	}
	r.transferring = true
	r.transferTarget = targetID
	r.transferDeadline = time.Now().Add(r.config.ElectionTimeoutMax * 2)
	currentTerm := r.currentTerm
	atomic.AddUint64(&r.metrics.LeadershipTransfers, 1)
	r.mu.Unlock()
	r.logger.Info("Starting leadership transfer to %s", targetID)
	// Step 1: Sync the target to our log
	if err := r.syncFollowerToLatest(targetAddr); err != nil {
		r.mu.Lock()
		r.transferring = false
		r.transferTarget = ""
		r.mu.Unlock()
		return fmt.Errorf("failed to sync target: %w", err)
	}
	// Step 2: Send the TimeoutNow RPC
	args := &TimeoutNowArgs{
		Term:     currentTerm,
		LeaderID: r.nodeID,
	}
	ctx, cancel := context.WithTimeout(context.Background(), r.config.RPCTimeout)
	defer cancel()
	reply, err := r.transport.TimeoutNow(ctx, targetAddr, args)
	if err != nil {
		r.mu.Lock()
		r.transferring = false
		r.transferTarget = ""
		r.mu.Unlock()
		return fmt.Errorf("TimeoutNow RPC failed: %w", err)
	}
	if !reply.Success {
		r.mu.Lock()
		r.transferring = false
		r.transferTarget = ""
		r.mu.Unlock()
		return fmt.Errorf("target rejected leadership transfer")
	}
	atomic.AddUint64(&r.metrics.LeadershipTransferSuccess, 1)
	r.logger.Info("Leadership transfer to %s initiated successfully", targetID)
	// Note: we don't step down immediately; we wait for the target to win its
	// election and send us an AppendEntries with a higher term.
	return nil
}
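
// Example decommission flow (a sketch; node IDs are illustrative): move
// leadership off a node before asking the new leader to remove it.
//
//	if err := r.TransferLeadership("node2"); err == nil {
//		// after node2 wins its election, request our own removal;
//		// the call forwards to the new leader
//		err = r.RemoveNodeWithForward("node1")
//	}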

// syncFollowerToLatest ensures the follower is caught up to our log
func (r *Raft) syncFollowerToLatest(peerAddr string) error {
	r.mu.RLock()
	if r.state != Leader {
		r.mu.RUnlock()
		return ErrNotLeader
	}
	currentTerm := r.currentTerm
	leaderCommit := r.commitIndex
	lastIndex := r.log.LastIndex()
	r.mu.RUnlock()
	// Keep replicating until the follower is caught up
	deadline := time.Now().Add(r.config.ElectionTimeoutMax * 2)
	for time.Now().Before(deadline) {
		r.mu.RLock()
		if r.state != Leader || r.currentTerm != currentTerm {
			r.mu.RUnlock()
			return ErrLeadershipLost
		}
		matchIndex := r.matchIndex[peerAddr]
		r.mu.RUnlock()
		if matchIndex >= lastIndex {
			return nil // Caught up
		}
		// Trigger replication
		r.replicateToPeer(peerAddr, currentTerm, leaderCommit)
		time.Sleep(10 * time.Millisecond)
	}
	return ErrTimeout
}

// HandleTimeoutNow handles the TimeoutNow RPC (for leadership transfer)
func (r *Raft) HandleTimeoutNow(args *TimeoutNowArgs) *TimeoutNowReply {
	r.mu.Lock()
	defer r.mu.Unlock()
	reply := &TimeoutNowReply{
		Term:    r.currentTerm,
		Success: false,
	}
	// Only accept if we're a follower and the sender's term is current
	if args.Term < r.currentTerm {
		return reply
	}
	if r.state != Follower {
		return reply
	}
	// Immediately start an election
	r.logger.Info("Received TimeoutNow from %s, starting immediate election", args.LeaderID)
	r.state = Candidate
	reply.Success = true
	return reply
}

// HandleReadIndex handles the ReadIndex RPC
func (r *Raft) HandleReadIndex(args *ReadIndexArgs) *ReadIndexReply {
	reply := &ReadIndexReply{
		Success: false,
	}
	readIndex, err := r.ReadIndex()
	if err != nil {
		reply.Error = err.Error()
		return reply
	}
	reply.ReadIndex = readIndex
	reply.Success = true
	return reply
}

// HandleGet handles the Get RPC for remote KV reads
func (r *Raft) HandleGet(args *GetArgs) *GetReply {
	reply := &GetReply{
		Found: false,
	}
	if r.config.GetHandler == nil {
		reply.Error = "get handler not configured"
		return reply
	}
	value, found := r.config.GetHandler(args.Key)
	reply.Value = value
	reply.Found = found
	return reply
}