raft.go

package raft

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

// Raft represents a Raft consensus node
type Raft struct {
	mu sync.RWMutex

	// Node identity
	nodeID string
	peers  []string

	// Cluster membership - maps nodeID to address
	clusterNodes map[string]string

	// Current state
	state       NodeState
	currentTerm uint64
	votedFor    string
	leaderID    string

	// Log management
	log         *LogManager
	storage     Storage
	commitIndex uint64
	lastApplied uint64

	// Leader state
	nextIndex  map[string]uint64
	matchIndex map[string]uint64

	// Configuration
	config *Config

	// Communication
	transport Transport

	// Channels
	applyCh  chan ApplyMsg
	stopCh   chan struct{}
	commitCh chan struct{}

	// Election and heartbeat timers
	electionTimer  *time.Timer
	heartbeatTimer *time.Timer

	// Statistics (deprecated, use metrics instead)
	stats Stats

	// Metrics for monitoring
	metrics Metrics

	// Logger
	logger Logger

	// Running flag
	running int32

	// Replication trigger channel - used to batch replication requests
	replicationCh chan struct{}

	// Pending config change - only one config change can be pending at a time
	pendingConfigChange bool

	// Old cluster nodes - used for majority calculation during a config change.
	// This ensures we use the old cluster size until the config change is committed.
	oldClusterNodes map[string]string

	// Index of the pending config change entry
	configChangeIndex uint64

	// Joining-cluster state - set when a standalone node is being added to a cluster.
	// This prevents the node from starting elections while syncing.
	joiningCluster     bool
	joiningClusterTime time.Time

	// Compaction state - prevents concurrent compaction
	compacting int32

	// Dynamic compaction threshold - updated after each compaction.
	// The next compaction triggers when log size >= this value.
	// Formula: lastCompactionSize * 1.5 (or the initial SnapshotThreshold).
	nextCompactionThreshold uint64

	// WaitGroup to track goroutines for clean shutdown
	wg sync.WaitGroup

	// Leadership transfer state
	transferring     bool
	transferTarget   string
	transferDeadline time.Time

	// Last heartbeat received (for health checks)
	lastHeartbeat time.Time

	// Start time (for uptime calculation)
	startTime time.Time

	// ReadIndex waiting queue
	readIndexCh chan *readIndexRequest

	// Snapshot receiving state (for chunked transfer)
	pendingSnapshot *pendingSnapshotState
}

// readIndexRequest represents a pending read index request
type readIndexRequest struct {
	readIndex uint64
	done      chan error
}

// pendingSnapshotState tracks chunked snapshot reception
type pendingSnapshotState struct {
	lastIncludedIndex uint64
	lastIncludedTerm  uint64
	data              []byte
	receivedBytes     uint64
}

// Stats holds runtime statistics
type Stats struct {
	Term            uint64
	State           string
	LastLogIndex    uint64
	LastLogTerm     uint64
	CommitIndex     uint64
	LastApplied     uint64
	LeaderID        string
	VotesReceived   int
	AppendsSent     int64
	AppendsReceived int64
	ClusterSize     int               // Number of nodes in the cluster
	ClusterNodes    map[string]string // NodeID -> Address mapping
}

// ConsoleLogger implements Logger with console output
type ConsoleLogger struct {
	Prefix string
	Level  int // 0=debug, 1=info, 2=warn, 3=error
	mu     sync.Mutex
}

func NewConsoleLogger(prefix string, level int) *ConsoleLogger {
	return &ConsoleLogger{Prefix: prefix, Level: level}
}

func (l *ConsoleLogger) log(level int, levelStr, format string, args ...interface{}) {
	if level < l.Level {
		return
	}
	l.mu.Lock()
	defer l.mu.Unlock()
	msg := fmt.Sprintf(format, args...)
	fmt.Printf("[%s] %s [%s] %s\n", time.Now().Format("15:04:05.000"), l.Prefix, levelStr, msg)
}

func (l *ConsoleLogger) Debug(format string, args ...interface{}) {
	l.log(0, "DEBUG", format, args...)
}

func (l *ConsoleLogger) Info(format string, args ...interface{}) {
	l.log(1, "INFO", format, args...)
}

func (l *ConsoleLogger) Warn(format string, args ...interface{}) {
	l.log(2, "WARN", format, args...)
}

func (l *ConsoleLogger) Error(format string, args ...interface{}) {
	l.log(3, "ERROR", format, args...)
}

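// Example (illustrative): a logger for node "n1" that prints INFO and above.
//
//	logger := NewConsoleLogger("n1", 1)
//	logger.Info("started")       // printed
//	logger.Debug("vote details") // suppressed, since debug (0) < Level (1)
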
// NewRaft creates a new Raft node
func NewRaft(config *Config, transport Transport, applyCh chan ApplyMsg) (*Raft, error) {
	if config.Logger == nil {
		config.Logger = NewConsoleLogger(config.NodeID, 1)
	}

	// Create storage
	storage, err := NewHybridStorage(config.DataDir, config.MemoryLogCapacity, config.Logger)
	if err != nil {
		return nil, fmt.Errorf("failed to create storage: %w", err)
	}

	// Create log manager
	logMgr := NewLogManager(storage, config.Logger)

	// Load persistent state
	state, err := storage.GetState()
	if err != nil {
		return nil, fmt.Errorf("failed to load state: %w", err)
	}

	// Load or initialize the cluster configuration
	clusterNodes := make(map[string]string)

	// Try to load a saved cluster config first
	savedConfig, err := storage.GetClusterConfig()
	if err != nil {
		return nil, fmt.Errorf("failed to load cluster config: %w", err)
	}
	if savedConfig != nil && len(savedConfig.Nodes) > 0 {
		// Use the saved config
		clusterNodes = savedConfig.Nodes
		config.Logger.Info("Loaded cluster config with %d nodes", len(clusterNodes))
	} else {
		// Initialize from config
		if len(config.ClusterNodes) > 0 {
			for k, v := range config.ClusterNodes {
				clusterNodes[k] = v
			}
		} else {
			// Build from Peers + self (backward compatibility)
			clusterNodes[config.NodeID] = config.ListenAddr
			if config.PeerMap != nil {
				for k, v := range config.PeerMap {
					clusterNodes[k] = v
				}
			}
		}
		// Save the initial config
		if len(clusterNodes) > 0 {
			if err := storage.SaveClusterConfig(&ClusterConfig{Nodes: clusterNodes}); err != nil {
				config.Logger.Warn("Failed to save initial cluster config: %v", err)
			}
		}
	}

	// Build the peers list from cluster nodes (excluding self)
	var peers []string
	for nodeID, addr := range clusterNodes {
		if nodeID != config.NodeID {
			peers = append(peers, addr)
		}
	}

	r := &Raft{
		nodeID:        config.NodeID,
		peers:         peers,
		clusterNodes:  clusterNodes,
		state:         Follower,
		currentTerm:   state.CurrentTerm,
		votedFor:      state.VotedFor,
		log:           logMgr,
		storage:       storage,
		config:        config,
		transport:     transport,
		applyCh:       applyCh,
		stopCh:        make(chan struct{}),
		commitCh:      make(chan struct{}, 100), // Increased buffer for event-driven apply
		replicationCh: make(chan struct{}, 1),
		nextIndex:     make(map[string]uint64),
		matchIndex:    make(map[string]uint64),
		logger:        config.Logger,
		readIndexCh:   make(chan *readIndexRequest, 100),
		lastHeartbeat: time.Now(),
	}

	// Set the RPC handler
	transport.SetRPCHandler(r)
	return r, nil
}

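// Illustrative wiring sketch (field values are hypothetical; the Transport
// implementation and any remaining Config fields depend on the caller):
//
//	applyCh := make(chan ApplyMsg, 64)
//	cfg := &Config{
//		NodeID:     "n1",
//		ListenAddr: "127.0.0.1:7001",
//		DataDir:    "/var/lib/raft/n1",
//		ClusterNodes: map[string]string{
//			"n1": "127.0.0.1:7001",
//			"n2": "127.0.0.1:7002",
//			"n3": "127.0.0.1:7003",
//		},
//	}
//	node, err := NewRaft(cfg, transport, applyCh)
//	if err != nil {
//		// handle error
//	}
//	if err := node.Start(); err != nil {
//		// handle error
//	}
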
// Start starts the Raft node
func (r *Raft) Start() error {
	if !atomic.CompareAndSwapInt32(&r.running, 0, 1) {
		return fmt.Errorf("already running")
	}

	// Record start time
	r.startTime = time.Now()

	// Start transport
	if err := r.transport.Start(); err != nil {
		return fmt.Errorf("failed to start transport: %w", err)
	}

	// Restore the FSM from a snapshot if one exists.
	// This must happen before starting the apply loop to ensure FSM state is restored.
	if err := r.restoreFromSnapshot(); err != nil {
		r.logger.Warn("Failed to restore from snapshot: %v", err)
		// Continue anyway - the node can still function, just without historical state
	}

	// Start the election timer
	r.resetElectionTimer()

	// Start background goroutines with WaitGroup tracking
	r.wg.Add(4)
	go func() {
		defer r.wg.Done()
		r.applyLoop()
	}()
	go func() {
		defer r.wg.Done()
		r.replicationLoop()
	}()
	go func() {
		defer r.wg.Done()
		r.mainLoop()
	}()
	go func() {
		defer r.wg.Done()
		r.readIndexLoop()
	}()

	r.logger.Info("Raft node started")
	return nil
}

// Stop stops the Raft node
func (r *Raft) Stop() error {
	if !atomic.CompareAndSwapInt32(&r.running, 1, 0) {
		return fmt.Errorf("not running")
	}

	// Signal all goroutines to stop
	close(r.stopCh)

	// Stop timers first to prevent new operations
	r.mu.Lock()
	if r.electionTimer != nil {
		r.electionTimer.Stop()
	}
	if r.heartbeatTimer != nil {
		r.heartbeatTimer.Stop()
	}
	r.mu.Unlock()

	// Wait for all goroutines to finish, with a timeout
	done := make(chan struct{})
	go func() {
		r.wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		// All goroutines exited cleanly
	case <-time.After(3 * time.Second):
		r.logger.Warn("Timeout waiting for goroutines to stop")
	}

	// Stop the transport (has its own timeout)
	if err := r.transport.Stop(); err != nil {
		r.logger.Error("Failed to stop transport: %v", err)
	}
	if err := r.storage.Close(); err != nil {
		r.logger.Error("Failed to close storage: %v", err)
	}

	r.logger.Info("Raft node stopped")
	return nil
}

// mainLoop is the main event loop
func (r *Raft) mainLoop() {
	for {
		select {
		case <-r.stopCh:
			return
		default:
		}

		r.mu.RLock()
		state := r.state
		r.mu.RUnlock()

		switch state {
		case Follower:
			r.runFollower()
		case Candidate:
			r.runCandidate()
		case Leader:
			r.runLeader()
		}
	}
}

// runFollower handles follower behavior
func (r *Raft) runFollower() {
	r.logger.Debug("Running as follower in term %d", r.currentTerm)
	for {
		select {
		case <-r.stopCh:
			return
		case <-r.electionTimer.C:
			r.mu.Lock()
			if r.state == Follower {
				// If we're joining a cluster, don't start elections.
				// Give the leader time to sync us up.
				if r.joiningCluster {
					// Allow joining for up to 30 seconds
					if time.Since(r.joiningClusterTime) < 30*time.Second {
						r.logger.Debug("Suppressing election during cluster join")
						r.resetElectionTimer()
						r.mu.Unlock()
						continue
					}
					// Timeout - clear the joining state
					r.joiningCluster = false
					r.logger.Warn("Cluster join timeout, resuming normal election behavior")
				}
				r.logger.Debug("Election timeout, becoming candidate")
				r.state = Candidate
			}
			r.mu.Unlock()
			return
		}
	}
}

// runCandidate handles candidate behavior (leader election).
// It implements the Pre-Vote mechanism to prevent term explosion.
func (r *Raft) runCandidate() {
	// Phase 1: Pre-Vote (don't increment the term yet)
	if !r.runPreVote() {
		// Pre-vote failed, wait for the timeout before retrying
		r.mu.Lock()
		r.resetElectionTimer()
		r.mu.Unlock()
		select {
		case <-r.stopCh:
			return
		case <-r.electionTimer.C:
			// Timer expired, will retry the pre-vote
		}
		return
	}

	// Phase 2: Actual election (pre-vote succeeded, now increment the term)
	r.mu.Lock()
	r.currentTerm++
	r.votedFor = r.nodeID
	r.leaderID = ""
	currentTerm := r.currentTerm
	if err := r.persistState(); err != nil {
		r.logger.Error("Failed to persist state during election: %v", err)
		r.mu.Unlock()
		return // Cannot proceed without persisting state
	}
	atomic.AddUint64(&r.metrics.ElectionsStarted, 1)
	r.resetElectionTimer()
	r.mu.Unlock()

	r.logger.Debug("Starting election for term %d", currentTerm)

	// Get the current peers list
	r.mu.RLock()
	currentPeers := make([]string, len(r.peers))
	copy(currentPeers, r.peers)
	clusterSize := len(r.clusterNodes)
	if clusterSize == 0 {
		clusterSize = len(r.peers) + 1
	}
	r.mu.RUnlock()

	// Request actual votes from all peers
	votes := 1 // Vote for self
	voteCh := make(chan bool, len(currentPeers))
	lastLogIndex, lastLogTerm := r.log.LastIndexAndTerm()
	for _, peer := range currentPeers {
		go func(peer string) {
			args := &RequestVoteArgs{
				Term:         currentTerm,
				CandidateID:  r.nodeID,
				LastLogIndex: lastLogIndex,
				LastLogTerm:  lastLogTerm,
				PreVote:      false,
			}
			ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
			defer cancel()
			reply, err := r.transport.RequestVote(ctx, peer, args)
			if err != nil {
				r.logger.Debug("RequestVote to %s failed: %v", peer, err)
				voteCh <- false
				return
			}
			r.mu.Lock()
			defer r.mu.Unlock()
			if reply.Term > r.currentTerm {
				r.becomeFollower(reply.Term)
				voteCh <- false
				return
			}
			voteCh <- reply.VoteGranted
		}(peer)
	}

	// Wait for votes - majority is (clusterSize/2) + 1
	needed := clusterSize/2 + 1

	// If we already have enough votes (single-node cluster), become leader immediately
	if votes >= needed {
		r.mu.Lock()
		if r.state == Candidate && r.currentTerm == currentTerm {
			r.becomeLeader()
		}
		r.mu.Unlock()
		return
	}

	for i := 0; i < len(currentPeers); i++ {
		select {
		case <-r.stopCh:
			return
		case <-r.electionTimer.C:
			r.logger.Debug("Election timeout, will start new election")
			return
		case granted := <-voteCh:
			if granted {
				votes++
				if votes >= needed {
					r.mu.Lock()
					if r.state == Candidate && r.currentTerm == currentTerm {
						r.becomeLeader()
					}
					r.mu.Unlock()
					return
				}
			}
		}
		// Check if we're still a candidate
		r.mu.RLock()
		if r.state != Candidate {
			r.mu.RUnlock()
			return
		}
		r.mu.RUnlock()
	}

	// Election failed, wait for the timeout
	r.mu.RLock()
	stillCandidate := r.state == Candidate
	r.mu.RUnlock()
	if stillCandidate {
		r.logger.Debug("Election failed, waiting for timeout (got %d/%d votes)", votes, needed)
		select {
		case <-r.stopCh:
			return
		case <-r.electionTimer.C:
			// Timer expired, will start a new election
		}
	}
}

// runPreVote sends pre-vote requests to check whether we could win an election.
// It returns true if we got a majority of pre-votes, false otherwise.
func (r *Raft) runPreVote() bool {
	r.mu.RLock()
	currentTerm := r.currentTerm
	currentPeers := make([]string, len(r.peers))
	copy(currentPeers, r.peers)
	clusterSize := len(r.clusterNodes)
	if clusterSize == 0 {
		clusterSize = len(r.peers) + 1
	}
	leaderID := r.leaderID
	r.mu.RUnlock()

	// Pre-vote uses term+1 but doesn't actually increment it
	preVoteTerm := currentTerm + 1
	lastLogIndex, lastLogTerm := r.log.LastIndexAndTerm()
	r.logger.Debug("Starting pre-vote for term %d", preVoteTerm)

	// Per the Raft Pre-Vote optimization (§9.6): if we have a known leader,
	// don't vote for self. This prevents standalone nodes from constantly
	// electing themselves when they should be joining an existing cluster.
	preVotes := 0
	if leaderID == "" {
		preVotes = 1 // Vote for self only if we don't know of a leader
	} else {
		r.logger.Debug("Pre-vote: not self-voting because we have leader %s", leaderID)
	}

	preVoteCh := make(chan bool, len(currentPeers))
	for _, peer := range currentPeers {
		go func(peer string) {
			args := &RequestVoteArgs{
				Term:         preVoteTerm,
				CandidateID:  r.nodeID,
				LastLogIndex: lastLogIndex,
				LastLogTerm:  lastLogTerm,
				PreVote:      true,
			}
			ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
			defer cancel()
			reply, err := r.transport.RequestVote(ctx, peer, args)
			if err != nil {
				r.logger.Debug("PreVote to %s failed: %v", peer, err)
				preVoteCh <- false
				return
			}
			// For pre-vote, we don't step down even if reply.Term > currentTerm,
			// because the other node might also be doing a pre-vote.
			preVoteCh <- reply.VoteGranted
		}(peer)
	}

	// Wait for pre-votes with a shorter timeout - majority is (clusterSize/2) + 1
	needed := clusterSize/2 + 1

	// If we already have enough votes (single-node cluster with no known leader), succeed immediately
	if preVotes >= needed {
		r.logger.Debug("Pre-vote succeeded immediately (got %d/%d pre-votes)", preVotes, needed)
		return true
	}

	timeout := time.After(500 * time.Millisecond)
	for i := 0; i < len(currentPeers); i++ {
		select {
		case <-r.stopCh:
			return false
		case <-timeout:
			r.logger.Debug("Pre-vote timeout (got %d/%d pre-votes)", preVotes, needed)
			return false
		case granted := <-preVoteCh:
			if granted {
				preVotes++
				if preVotes >= needed {
					r.logger.Debug("Pre-vote succeeded (got %d/%d pre-votes)", preVotes, needed)
					return true
				}
			}
		}
		// Check if we're still a candidate
		r.mu.RLock()
		if r.state != Candidate {
			r.mu.RUnlock()
			return false
		}
		r.mu.RUnlock()
	}

	r.logger.Debug("Pre-vote failed (got %d/%d pre-votes)", preVotes, needed)
	return false
}

// runLeader handles leader behavior
func (r *Raft) runLeader() {
	r.logger.Debug("Running as leader in term %d", r.currentTerm)

	// Send the initial heartbeat
	r.sendHeartbeats()

	// Start the heartbeat timer
	r.mu.Lock()
	r.heartbeatTimer = time.NewTimer(r.config.HeartbeatInterval)
	r.mu.Unlock()

	for {
		select {
		case <-r.stopCh:
			return
		case <-r.heartbeatTimer.C:
			r.mu.RLock()
			if r.state != Leader {
				r.mu.RUnlock()
				return
			}
			r.mu.RUnlock()
			r.sendHeartbeats()
			r.heartbeatTimer.Reset(r.config.HeartbeatInterval)
		case <-r.commitCh:
			r.updateCommitIndex()
		}
	}
}

// becomeFollower transitions to the follower state
func (r *Raft) becomeFollower(term uint64) {
	oldState := r.state
	oldTerm := r.currentTerm
	r.state = Follower
	r.currentTerm = term
	r.votedFor = ""
	r.leaderID = ""

	// Must persist before responding - use mustPersistState for critical transitions
	r.mustPersistState()
	r.resetElectionTimer()

	// Clear leadership transfer state
	r.transferring = false
	r.transferTarget = ""

	// Only log significant state changes
	if oldState == Leader && term > oldTerm {
		// Stepping down from leader is notable
		r.logger.Debug("Stepped down from leader in term %d", term)
	}
}

// becomeLeader transitions to the leader state
func (r *Raft) becomeLeader() {
	r.state = Leader
	r.leaderID = r.nodeID

	// Update metrics
	atomic.AddUint64(&r.metrics.ElectionsWon, 1)

	// Initialize leader state for all peers
	lastIndex := r.log.LastIndex()
	for nodeID, addr := range r.clusterNodes {
		if nodeID != r.nodeID {
			r.nextIndex[addr] = lastIndex + 1
			r.matchIndex[addr] = 0
		}
	}
	// Also handle legacy peers
	for _, peer := range r.peers {
		if _, exists := r.nextIndex[peer]; !exists {
			r.nextIndex[peer] = lastIndex + 1
			r.matchIndex[peer] = 0
		}
	}

	r.logger.Info("Became leader in term %d with %d peers", r.currentTerm, len(r.clusterNodes)-1)

	// Append a no-op entry to commit entries from previous terms.
	// This is a standard Raft optimization - the leader appends a no-op
	// entry in its current term to quickly commit all pending entries.
	noopEntry := LogEntry{
		Index:   r.log.LastIndex() + 1,
		Term:    r.currentTerm,
		Type:    EntryNoop,
		Command: nil,
	}
	if err := r.log.Append(noopEntry); err != nil {
		r.logger.Error("Failed to append no-op entry: %v", err)
	} else {
		r.logger.Debug("Appended no-op entry at index %d for term %d", noopEntry.Index, noopEntry.Term)
	}
}

// sendHeartbeats sends AppendEntries RPCs to all peers
func (r *Raft) sendHeartbeats() {
	r.mu.RLock()
	if r.state != Leader {
		r.mu.RUnlock()
		return
	}
	currentTerm := r.currentTerm
	leaderCommit := r.commitIndex

	// Get all peer addresses
	peerAddrs := make([]string, 0, len(r.clusterNodes))
	for nodeID, addr := range r.clusterNodes {
		if nodeID != r.nodeID {
			peerAddrs = append(peerAddrs, addr)
		}
	}
	// Also include legacy peers not in clusterNodes
	for _, peer := range r.peers {
		found := false
		for _, addr := range peerAddrs {
			if addr == peer {
				found = true
				break
			}
		}
		if !found {
			peerAddrs = append(peerAddrs, peer)
		}
	}
	r.mu.RUnlock()

	for _, peer := range peerAddrs {
		go r.replicateToPeer(peer, currentTerm, leaderCommit)
	}
}

// replicateToPeer sends AppendEntries to a specific peer
func (r *Raft) replicateToPeer(peer string, term, leaderCommit uint64) {
	r.mu.RLock()
	if r.state != Leader || r.currentTerm != term {
		r.mu.RUnlock()
		return
	}
	nextIndex := r.nextIndex[peer]
	r.mu.RUnlock()

	// Get the entries to send
	entries, prevLogIndex, prevLogTerm, err := r.log.GetEntriesForFollower(nextIndex, r.config.MaxLogEntriesPerRequest)
	if err == ErrCompacted {
		// The entries are gone; send a snapshot instead
		r.sendSnapshot(peer)
		return
	}
	if err != nil {
		r.logger.Debug("Failed to get entries for %s: %v", peer, err)
		return
	}

	args := &AppendEntriesArgs{
		Term:         term,
		LeaderID:     r.nodeID,
		PrevLogIndex: prevLogIndex,
		PrevLogTerm:  prevLogTerm,
		Entries:      entries,
		LeaderCommit: leaderCommit,
	}
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()
	reply, err := r.transport.AppendEntries(ctx, peer, args)
	if err != nil {
		r.logger.Debug("AppendEntries to %s failed: %v", peer, err)
		return
	}
	atomic.AddInt64(&r.stats.AppendsSent, 1)

	r.mu.Lock()
	defer r.mu.Unlock()
	if reply.Term > r.currentTerm {
		r.becomeFollower(reply.Term)
		return
	}
	if r.state != Leader || r.currentTerm != term {
		return
	}

	if reply.Success {
		// Update nextIndex and matchIndex
		if len(entries) > 0 {
			newMatchIndex := entries[len(entries)-1].Index
			if newMatchIndex > r.matchIndex[peer] {
				r.matchIndex[peer] = newMatchIndex
				r.nextIndex[peer] = newMatchIndex + 1
				// Try to update the commit index
				select {
				case r.commitCh <- struct{}{}:
				default:
				}
			}
		}
	} else {
		// Decrement nextIndex and retry
		if reply.ConflictTerm > 0 {
			// Find the last entry of ConflictTerm in our log
			found := false
			for idx := r.log.LastIndex(); idx >= r.log.FirstIndex(); idx-- {
				t, err := r.log.GetTerm(idx)
				if err != nil {
					break
				}
				if t == reply.ConflictTerm {
					r.nextIndex[peer] = idx + 1
					found = true
					break
				}
				if t < reply.ConflictTerm {
					break
				}
			}
			if !found {
				r.nextIndex[peer] = reply.ConflictIndex
			}
		} else if reply.ConflictIndex > 0 {
			r.nextIndex[peer] = reply.ConflictIndex
		} else {
			r.nextIndex[peer] = max(1, r.nextIndex[peer]-1)
		}
	}
}

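// Worked example (illustrative): suppose a follower rejects AppendEntries with
// ConflictTerm=3 and ConflictIndex=7. If the leader's log still contains
// term-3 entries, say up to index 9, nextIndex[peer] becomes 10 (one past the
// leader's last term-3 entry); otherwise it falls back to ConflictIndex=7.
// This skips a whole conflicting term per round trip instead of backing up
// nextIndex one entry at a time.
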
// sendSnapshot sends a snapshot to a peer with chunked transfer support
func (r *Raft) sendSnapshot(peer string) {
	r.mu.RLock()
	if r.state != Leader {
		r.mu.RUnlock()
		return
	}
	term := r.currentTerm
	chunkSize := r.config.SnapshotChunkSize
	if chunkSize <= 0 {
		chunkSize = 1024 * 1024 // Default 1MB
	}
	r.mu.RUnlock()

	data, lastIndex, lastTerm, err := r.storage.GetSnapshot()
	if err != nil {
		r.logger.Error("Failed to get snapshot: %v", err)
		return
	}
	atomic.AddUint64(&r.metrics.SnapshotsSent, 1)
	r.logger.Info("Sending snapshot to %s: %d bytes, lastIndex=%d, lastTerm=%d",
		peer, len(data), lastIndex, lastTerm)

	// Send the snapshot in chunks
	totalSize := len(data)
	offset := 0
	for offset < totalSize {
		// Check that we're still leader
		r.mu.RLock()
		if r.state != Leader || r.currentTerm != term {
			r.mu.RUnlock()
			r.logger.Debug("Aborting snapshot send: no longer leader")
			return
		}
		r.mu.RUnlock()

		// Calculate the chunk boundaries
		end := offset + chunkSize
		if end > totalSize {
			end = totalSize
		}
		chunk := data[offset:end]
		done := end >= totalSize

		args := &InstallSnapshotArgs{
			Term:              term,
			LeaderID:          r.nodeID,
			LastIncludedIndex: lastIndex,
			LastIncludedTerm:  lastTerm,
			Offset:            uint64(offset),
			Data:              chunk,
			Done:              done,
		}
		ctx, cancel := context.WithTimeout(context.Background(), r.config.SnapshotRPCTimeout)
		reply, err := r.transport.InstallSnapshot(ctx, peer, args)
		cancel()
		if err != nil {
			r.logger.Error("InstallSnapshot chunk to %s failed at offset %d: %v", peer, offset, err)
			return
		}
		if reply.Term > r.currentTerm {
			r.mu.Lock()
			r.becomeFollower(reply.Term)
			r.mu.Unlock()
			return
		}
		if !reply.Success {
			r.logger.Error("InstallSnapshot chunk rejected by %s at offset %d", peer, offset)
			return
		}
		offset = end
	}

	// Snapshot fully sent and accepted
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.state != Leader || r.currentTerm != term {
		return
	}
	r.nextIndex[peer] = lastIndex + 1
	r.matchIndex[peer] = lastIndex
	r.logger.Info("Snapshot to %s completed, nextIndex=%d", peer, lastIndex+1)
}

// updateCommitIndex advances the commit index based on matchIndex
func (r *Raft) updateCommitIndex() {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.state != Leader {
		return
	}

	// For config change entries, use the OLD cluster for the majority calculation.
	// This ensures that adding a node doesn't require the new node's vote
	// until the config change itself is committed.
	votingNodes := r.clusterNodes
	if r.pendingConfigChange && r.oldClusterNodes != nil {
		votingNodes = r.oldClusterNodes
	}

	// Get the current cluster size from the voting nodes
	clusterSize := len(votingNodes)
	if clusterSize == 0 {
		clusterSize = len(r.peers) + 1
	}

	// Find the highest index replicated on a majority
	for n := r.log.LastIndex(); n > r.commitIndex; n-- {
		term, err := r.log.GetTerm(n)
		if err != nil {
			continue
		}
		// Only commit entries from the current term
		if term != r.currentTerm {
			continue
		}
		// Count replicas (including self)
		count := 1 // Self
		for nodeID, addr := range votingNodes {
			if nodeID != r.nodeID {
				if r.matchIndex[addr] >= n {
					count++
				}
			}
		}
		// Also check legacy peers
		for _, peer := range r.peers {
			// Avoid double counting if the peer is already in votingNodes
			alreadyCounted := false
			for _, addr := range votingNodes {
				if addr == peer {
					alreadyCounted = true
					break
				}
			}
			if !alreadyCounted && r.matchIndex[peer] >= n {
				count++
			}
		}
		// Majority is (clusterSize/2) + 1
		needed := clusterSize/2 + 1
		if count >= needed {
			r.commitIndex = n
			break
		}
	}
}

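// Worked example (illustrative): in a 5-node cluster, needed = 5/2 + 1 = 3.
// If the leader's last index is 7 and the followers' matchIndex values are
// {7, 7, 5, 4}, then index 7 is stored on 3 nodes (leader plus two followers),
// so commitIndex advances to 7, provided entry 7 is from the current term.
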
// applyLoop applies committed entries to the state machine.
// It uses an event-driven model with fallback polling for reliability.
func (r *Raft) applyLoop() {
	// Use a ticker as a fallback for missed signals (matches the original 10ms polling)
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-r.stopCh:
			return
		case <-r.commitCh:
			// Event-driven: triggered when commitIndex is updated
			r.applyCommitted()
		case <-ticker.C:
			// Fallback: check periodically in case we missed a signal
			r.applyCommitted()
		}
	}
}

// applyCommitted applies all committed but not yet applied entries
func (r *Raft) applyCommitted() {
	r.mu.Lock()
	commitIndex := r.commitIndex
	lastApplied := r.lastApplied
	firstIndex := r.log.FirstIndex()
	lastLogIndex := r.log.LastIndex()
	r.mu.Unlock()

	// Safety check: ensure lastApplied is within the valid range.
	// If lastApplied is below firstIndex (due to a snapshot), skip ahead to firstIndex.
	if lastApplied < firstIndex && firstIndex > 0 {
		r.mu.Lock()
		r.lastApplied = firstIndex
		lastApplied = firstIndex
		r.mu.Unlock()
		r.logger.Debug("Adjusted lastApplied to firstIndex %d after compaction", firstIndex)
	}

	// Safety check: don't try to apply beyond what we have in the log
	if commitIndex > lastLogIndex {
		commitIndex = lastLogIndex
	}

	for lastApplied < commitIndex {
		lastApplied++
		// Skip if the entry has been compacted
		if lastApplied < firstIndex {
			continue
		}
		entry, err := r.log.GetEntry(lastApplied)
		if err != nil {
			// If the entry is compacted, skip ahead
			if err == ErrCompacted {
				r.mu.Lock()
				newFirstIndex := r.log.FirstIndex()
				if lastApplied < newFirstIndex {
					r.lastApplied = newFirstIndex
					lastApplied = newFirstIndex
				}
				r.mu.Unlock()
				continue
			}
			r.logger.Error("Failed to get entry %d: %v (firstIndex=%d, lastIndex=%d)",
				lastApplied, err, r.log.FirstIndex(), r.log.LastIndex())
			break
		}

		// Handle config change entries
		if entry.Type == EntryConfig {
			r.applyConfigChange(entry)
			r.mu.Lock()
			r.lastApplied = lastApplied
			r.mu.Unlock()
			continue
		}

		// Handle no-op entries (just update lastApplied, don't send to the state machine)
		if entry.Type == EntryNoop {
			r.mu.Lock()
			r.lastApplied = lastApplied
			r.mu.Unlock()
			continue
		}

		// Normal command entry
		msg := ApplyMsg{
			CommandValid: true,
			Command:      entry.Command,
			CommandIndex: entry.Index,
			CommandTerm:  entry.Term,
		}
		select {
		case r.applyCh <- msg:
		case <-r.stopCh:
			return
		}
		r.mu.Lock()
		r.lastApplied = lastApplied
		r.mu.Unlock()
	}

	// Check whether log compaction is needed
	r.maybeCompactLog()
}

// maybeCompactLog checks whether automatic log compaction should be triggered.
// It uses a dynamic threshold to prevent "compaction thrashing":
//   - The first compaction triggers at SnapshotThreshold (default 100,000).
//   - After a compaction, the next threshold = current_log_size * 1.5.
//   - This prevents every new entry from triggering compaction if the log stays large.
func (r *Raft) maybeCompactLog() {
	// Skip if no snapshot provider is configured
	if r.config.SnapshotProvider == nil {
		return
	}

	// Check if compaction is already in progress (atomic check-and-set)
	if !atomic.CompareAndSwapInt32(&r.compacting, 0, 1) {
		return
	}
	// Ensure we release the compacting flag when done
	defer atomic.StoreInt32(&r.compacting, 0)

	r.mu.RLock()
	lastApplied := r.lastApplied
	firstIndex := r.log.FirstIndex()
	initialThreshold := r.config.SnapshotThreshold
	minRetention := r.config.SnapshotMinRetention
	isLeader := r.state == Leader
	dynamicThreshold := r.nextCompactionThreshold
	r.mu.RUnlock()

	// Guard against underflow: ensure lastApplied > firstIndex
	if lastApplied <= firstIndex {
		return
	}

	// Calculate the current log size
	logSize := lastApplied - firstIndex

	// Determine the effective threshold:
	// - Use the initial threshold if no compaction has occurred yet (dynamicThreshold == 0)
	// - Otherwise use the dynamic threshold
	effectiveThreshold := initialThreshold
	if dynamicThreshold > 0 {
		effectiveThreshold = dynamicThreshold
	}

	// Check if we have enough entries to warrant compaction
	if logSize <= effectiveThreshold {
		return
	}

	// Guard against underflow: ensure lastApplied > minRetention
	if lastApplied <= minRetention {
		return
	}

	// Calculate the safe compaction point.
	// We need to retain at least minRetention entries for follower catch-up.
	compactUpTo := lastApplied - minRetention
	if compactUpTo <= firstIndex {
		return // Not enough entries to compact while maintaining retention
	}

	// For the leader, also consider the minimum nextIndex of all followers
	// to avoid compacting entries that followers still need.
	if isLeader {
		r.mu.RLock()
		minNextIndex := lastApplied
		for _, nextIdx := range r.nextIndex {
			if nextIdx < minNextIndex {
				minNextIndex = nextIdx
			}
		}
		r.mu.RUnlock()
		// Don't compact entries that followers still need.
		// Keep a buffer of minRetention entries before the slowest follower.
		if minNextIndex > minRetention {
			followerSafePoint := minNextIndex - minRetention
			if followerSafePoint < compactUpTo {
				compactUpTo = followerSafePoint
			}
		} else {
			// The slowest follower is too far behind; don't compact.
			// It will need a full snapshot anyway.
			compactUpTo = firstIndex // Effectively skip compaction
		}
	}

	// Final check - make sure we're actually compacting something meaningful
	if compactUpTo <= firstIndex {
		return
	}

	// Get a snapshot from the application layer
	snapshotData, err := r.config.SnapshotProvider()
	if err != nil {
		r.logger.Error("Failed to get snapshot from provider: %v", err)
		return
	}

	// Get the term at the compaction point
	term, err := r.log.GetTerm(compactUpTo)
	if err != nil {
		r.logger.Error("Failed to get term for compaction index %d: %v", compactUpTo, err)
		return
	}

	// Save the snapshot
	if err := r.storage.SaveSnapshot(snapshotData, compactUpTo, term); err != nil {
		r.logger.Error("Failed to save snapshot: %v", err)
		return
	}

	// Compact the log
	if err := r.log.Compact(compactUpTo); err != nil {
		r.logger.Error("Failed to compact log: %v", err)
		return
	}

	// Calculate the new log size after compaction
	newLogSize := lastApplied - compactUpTo

	// Update the dynamic threshold for the next compaction: current size * 1.5.
	// This prevents "compaction thrashing" where every entry triggers compaction.
	nextThreshold := newLogSize + newLogSize/2 // newLogSize * 1.5
	// Ensure the threshold doesn't drop below the initial threshold
	if nextThreshold < initialThreshold {
		nextThreshold = initialThreshold
	}
	r.mu.Lock()
	r.nextCompactionThreshold = nextThreshold
	r.mu.Unlock()

	r.logger.Info("Auto compaction completed: compacted up to index %d (term %d), log size %d -> %d, next threshold: %d",
		compactUpTo, term, logSize, newLogSize, nextThreshold)
}

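// Worked example (illustrative, hypothetical values): with SnapshotThreshold =
// 100,000 and SnapshotMinRetention = 20,000, the first compaction fires once
// logSize exceeds 100,000 and trims the log back to roughly 20,000 retained
// entries. The candidate next threshold is 20,000 * 1.5 = 30,000, but since
// that is below the initial threshold it is clamped back up to 100,000; had
// 80,000 entries been retained instead, the next threshold would be 120,000.
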
// resetElectionTimer resets the election timeout to a new randomized value
func (r *Raft) resetElectionTimer() {
	timeout := r.config.ElectionTimeoutMin +
		time.Duration(rand.Int63n(int64(r.config.ElectionTimeoutMax-r.config.ElectionTimeoutMin)))
	if r.electionTimer == nil {
		r.electionTimer = time.NewTimer(timeout)
	} else {
		if !r.electionTimer.Stop() {
			select {
			case <-r.electionTimer.C:
			default:
			}
		}
		r.electionTimer.Reset(timeout)
	}
}

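// For example (illustrative, hypothetical config): with ElectionTimeoutMin =
// 150ms and ElectionTimeoutMax = 300ms, each reset picks a timeout uniformly
// in [150ms, 300ms), so followers rarely time out at the same instant and
// split votes become less likely.
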
// persistState saves the current term and vote to stable storage.
// It returns an error if persistence fails - callers MUST handle this for safety.
func (r *Raft) persistState() error {
	state := &PersistentState{
		CurrentTerm: r.currentTerm,
		VotedFor:    r.votedFor,
	}
	if err := r.storage.SaveState(state); err != nil {
		r.logger.Error("Failed to persist state: %v", err)
		return fmt.Errorf("%w: %v", ErrPersistFailed, err)
	}
	return nil
}

// mustPersistState saves state and panics on failure.
// Use this only in critical paths where failure is unrecoverable.
func (r *Raft) mustPersistState() {
	if err := r.persistState(); err != nil {
		// In production, you might want to trigger a graceful shutdown instead
		r.logger.Error("CRITICAL: Failed to persist state, node may be in an inconsistent state: %v", err)
		panic(err)
	}
}

// HandleRequestVote handles RequestVote RPCs (including pre-votes)
func (r *Raft) HandleRequestVote(args *RequestVoteArgs) *RequestVoteReply {
	r.mu.Lock()
	defer r.mu.Unlock()
	reply := &RequestVoteReply{
		Term:        r.currentTerm,
		VoteGranted: false,
	}

	// Handle pre-votes separately
	if args.PreVote {
		return r.handlePreVote(args)
	}

	// Reply false if term < currentTerm
	if args.Term < r.currentTerm {
		return reply
	}
	// If term > currentTerm, become follower
	if args.Term > r.currentTerm {
		r.becomeFollower(args.Term)
	}
	reply.Term = r.currentTerm

	// Check if we can vote for this candidate
	if (r.votedFor == "" || r.votedFor == args.CandidateID) &&
		r.log.IsUpToDate(args.LastLogIndex, args.LastLogTerm) {
		r.votedFor = args.CandidateID
		if err := r.persistState(); err != nil {
			// Cannot grant the vote if we can't persist the decision
			r.logger.Error("Failed to persist vote for %s: %v", args.CandidateID, err)
			return reply
		}
		r.resetElectionTimer()
		reply.VoteGranted = true
		r.logger.Debug("Granted vote to %s for term %d", args.CandidateID, args.Term)
	}
	return reply
}

// handlePreVote handles pre-vote requests.
// A pre-vote doesn't change our state; it only checks whether we would vote.
func (r *Raft) handlePreVote(args *RequestVoteArgs) *RequestVoteReply {
	reply := &RequestVoteReply{
		Term:        r.currentTerm,
		VoteGranted: false,
	}

	// For a pre-vote, we check that:
	// 1. The candidate's term is at least as high as ours.
	// 2. The candidate's log is at least as up-to-date as ours.
	// 3. We don't have a current leader (or the candidate's term is higher).
	if args.Term < r.currentTerm {
		return reply
	}

	// Per the Raft Pre-Vote optimization (§9.6): reject the pre-vote if we have a
	// current leader and the candidate's term is not higher than ours. This prevents
	// disruptive elections when a partitioned node tries to rejoin.
	if r.leaderID != "" && args.Term <= r.currentTerm {
		r.logger.Debug("Rejecting pre-vote from %s: have leader %s", args.CandidateID, r.leaderID)
		return reply
	}

	// Grant the pre-vote if the candidate's log is up-to-date.
	// Note: we don't check votedFor for a pre-vote, and we don't update any state.
	if r.log.IsUpToDate(args.LastLogIndex, args.LastLogTerm) {
		reply.VoteGranted = true
		r.logger.Debug("Granted pre-vote to %s for term %d", args.CandidateID, args.Term)
	}
	return reply
}

// HandleAppendEntries handles AppendEntries RPCs
func (r *Raft) HandleAppendEntries(args *AppendEntriesArgs) *AppendEntriesReply {
	r.mu.Lock()
	defer r.mu.Unlock()
	atomic.AddInt64(&r.stats.AppendsReceived, 1)
	reply := &AppendEntriesReply{
		Term:    r.currentTerm,
		Success: false,
	}

	// Check if we're a standalone node being added to a cluster.
	// A standalone node has only itself in clusterNodes.
	isStandalone := len(r.clusterNodes) == 1
	if _, hasSelf := r.clusterNodes[r.nodeID]; isStandalone && hasSelf {
		// We're standalone and receiving AppendEntries from an external leader.
		// This means we're being added to a cluster - suppress elections.
		if !r.joiningCluster {
			r.joiningCluster = true
			r.joiningClusterTime = time.Now()
			r.logger.Info("Detected cluster join in progress, suppressing elections")
		}
		// When joining, accept higher terms from the leader to sync up
		if args.Term > r.currentTerm {
			r.becomeFollower(args.Term)
		}
	}

	// Reply false if term < currentTerm
	if args.Term < r.currentTerm {
		// But still reset the timer if we're joining a cluster, to prevent elections
		if r.joiningCluster {
			r.resetElectionTimer()
		}
		return reply
	}

	// If term > currentTerm, or we're a candidate, or we're a leader receiving
	// AppendEntries from another leader (split-brain scenario during a cluster merge),
	// become follower. In Raft, there can be only one leader per term.
	if args.Term > r.currentTerm || r.state == Candidate || r.state == Leader {
		r.becomeFollower(args.Term)
	}

	// Update leader info and reset the election timer
	r.leaderID = args.LeaderID
	r.lastHeartbeat = time.Now()
	r.resetElectionTimer()
	reply.Term = r.currentTerm

	// Try to append the entries
	success, conflictIndex, conflictTerm := r.log.AppendEntriesFromLeader(
		args.PrevLogIndex, args.PrevLogTerm, args.Entries)
	if !success {
		reply.ConflictIndex = conflictIndex
		reply.ConflictTerm = conflictTerm
		return reply
	}
	reply.Success = true

	// Update the commit index safely
	if args.LeaderCommit > r.commitIndex {
		// Get our actual last log index
		lastLogIndex := r.log.LastIndex()
		// Calculate the index the appended entries would have reached
		lastNewEntry := args.PrevLogIndex
		if len(args.Entries) > 0 {
			lastNewEntry = args.Entries[len(args.Entries)-1].Index
		}
		// The commit index must not exceed what we actually have in the log
		newCommitIndex := args.LeaderCommit
		if newCommitIndex > lastNewEntry {
			newCommitIndex = lastNewEntry
		}
		if newCommitIndex > lastLogIndex {
			newCommitIndex = lastLogIndex
		}
		// Only ever advance the commit index
		if newCommitIndex > r.commitIndex {
			r.commitIndex = newCommitIndex
		}
	}
	return reply
}

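// Worked example (illustrative): if the leader reports LeaderCommit=10 and this
// AppendEntries carried entries 6..8 (PrevLogIndex=5), then lastNewEntry=8 and
// the follower advances commitIndex to min(10, 8) = 8 - never past what it has
// actually stored locally.
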
// HandleInstallSnapshot handles InstallSnapshot RPCs with chunked transfer support
func (r *Raft) HandleInstallSnapshot(args *InstallSnapshotArgs) *InstallSnapshotReply {
	r.mu.Lock()
	defer r.mu.Unlock()
	reply := &InstallSnapshotReply{
		Term:    r.currentTerm,
		Success: false,
	}
	if args.Term < r.currentTerm {
		return reply
	}
	if args.Term > r.currentTerm {
		r.becomeFollower(args.Term)
	}
	r.leaderID = args.LeaderID
	r.lastHeartbeat = time.Now()
	r.resetElectionTimer()
	reply.Term = r.currentTerm

	// Skip if we already have this or a newer snapshot applied
	if args.LastIncludedIndex <= r.lastApplied {
		r.logger.Debug("Ignoring snapshot at index %d, already applied up to %d",
			args.LastIncludedIndex, r.lastApplied)
		reply.Success = true // Still report success so the leader knows we don't need it
		return reply
	}

	// Handle chunked transfer
	if args.Offset == 0 {
		// First chunk - start a new pending snapshot
		r.pendingSnapshot = &pendingSnapshotState{
			lastIncludedIndex: args.LastIncludedIndex,
			lastIncludedTerm:  args.LastIncludedTerm,
			data:              make([]byte, 0),
			receivedBytes:     0,
		}
		r.logger.Info("Starting snapshot reception at index %d, term %d",
			args.LastIncludedIndex, args.LastIncludedTerm)
	}

	// Validate that we're receiving the expected snapshot
	if r.pendingSnapshot == nil {
		r.logger.Warn("Unexpected snapshot chunk for index %d: no snapshot reception in progress",
			args.LastIncludedIndex)
		return reply
	}
	if r.pendingSnapshot.lastIncludedIndex != args.LastIncludedIndex ||
		r.pendingSnapshot.lastIncludedTerm != args.LastIncludedTerm {
		r.logger.Warn("Unexpected snapshot chunk: expected index %d, got %d",
			r.pendingSnapshot.lastIncludedIndex, args.LastIncludedIndex)
		return reply
	}

	// Validate that the offset matches what we've received so far
	if args.Offset != r.pendingSnapshot.receivedBytes {
		r.logger.Warn("Unexpected chunk offset: expected %d, got %d",
			r.pendingSnapshot.receivedBytes, args.Offset)
		return reply
	}

	// Append the chunk data
	r.pendingSnapshot.data = append(r.pendingSnapshot.data, args.Data...)
	r.pendingSnapshot.receivedBytes += uint64(len(args.Data))
	reply.Success = true
	r.logger.Debug("Received snapshot chunk: offset=%d, size=%d, done=%v",
		args.Offset, len(args.Data), args.Done)

	// If not done, wait for more chunks
	if !args.Done {
		return reply
	}

	// All chunks received - apply the snapshot
	r.logger.Info("Installing complete snapshot: %d bytes at index %d, term %d",
		len(r.pendingSnapshot.data), args.LastIncludedIndex, args.LastIncludedTerm)
	atomic.AddUint64(&r.metrics.SnapshotsInstalled, 1)

	// Save the snapshot
	if err := r.storage.SaveSnapshot(r.pendingSnapshot.data, args.LastIncludedIndex, args.LastIncludedTerm); err != nil {
		r.logger.Error("Failed to save snapshot: %v", err)
		r.pendingSnapshot = nil
		reply.Success = false
		return reply
	}

	// Compact the log
	if err := r.log.Compact(args.LastIncludedIndex); err != nil {
		r.logger.Error("Failed to compact log: %v", err)
	}

	// Update state - both commitIndex and lastApplied must be advanced
	if args.LastIncludedIndex > r.commitIndex {
		r.commitIndex = args.LastIncludedIndex
	}
	// Always update lastApplied to the snapshot index to avoid trying to apply compacted entries
	r.lastApplied = args.LastIncludedIndex

	// Send the snapshot to the application (non-blocking, with a timeout).
	// Use the complete pendingSnapshot data, not just the last chunk.
	msg := ApplyMsg{
		SnapshotValid: true,
		Snapshot:      r.pendingSnapshot.data,
		SnapshotIndex: args.LastIncludedIndex,
		SnapshotTerm:  args.LastIncludedTerm,
	}
	// Clear the pending snapshot
	r.pendingSnapshot = nil

	// Try to send, but don't block indefinitely
	select {
	case r.applyCh <- msg:
		r.logger.Debug("Sent snapshot to application")
	case <-time.After(100 * time.Millisecond):
		r.logger.Warn("Timeout sending snapshot to application, will retry")
		// The application will still get correct state via the normal apply loop
	}
	return reply
}

// Propose proposes a new command to be replicated
func (r *Raft) Propose(command []byte) (uint64, uint64, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.state != Leader {
		return 0, 0, false
	}
	index, err := r.log.AppendCommand(r.currentTerm, command)
	if err != nil {
		r.logger.Error("Failed to append command: %v", err)
		return 0, 0, false
	}
	r.matchIndex[r.nodeID] = index
	// For single-node cluster, we are the only voter and can commit immediately
	// This fixes the issue where commitCh never gets triggered without other peers
	if len(r.clusterNodes) <= 1 && len(r.peers) == 0 {
		// Single node: self is majority, trigger commit immediately
		select {
		case r.commitCh <- struct{}{}:
		default:
		}
	} else {
		// Multi-node: trigger replication to other nodes
		r.triggerReplication()
	}
	return index, r.currentTerm, true
}
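// Usage sketch (illustrative only; the JSON command encoding is hypothetical
// application code, not part of this package): a caller proposes a command and
// remembers the returned index and term, then treats the command as committed once
// an ApplyMsg carrying that index (in the same term) arrives on the apply channel.
//
//	cmd, _ := json.Marshal(map[string]string{"op": "set", "key": "x", "value": "1"})
//	index, term, isLeader := r.Propose(cmd)
//	if !isLeader {
//		// retry via ProposeWithForward, or redirect the client to GetLeaderID()
//	}
//	// wait until the apply loop reports CommandIndex == index with Term == term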
// triggerReplication signals the replication loop to send heartbeats
// This uses a non-blocking send to batch replication requests
func (r *Raft) triggerReplication() {
	select {
	case r.replicationCh <- struct{}{}:
	default:
		// Replication already scheduled
	}
}
// replicationLoop handles batched replication
// Uses simple delay-based batching: flush immediately when signaled, then wait
// to allow more requests to accumulate before the next flush.
func (r *Raft) replicationLoop() {
	for {
		select {
		case <-r.stopCh:
			return
		case <-r.replicationCh:
			// Flush and replicate immediately
			r.flushAndReplicate()
			// Wait briefly to allow batching of subsequent requests
			// This gives time for more proposals to queue up before the next flush
			time.Sleep(10 * time.Millisecond)
		}
	}
}
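// Batching note (approximate behaviour): because triggerReplication performs a
// non-blocking send, all proposals arriving during the 10ms pause collapse into a
// single pending signal, so the next iteration does one Flush and one round of
// AppendEntries for the whole batch rather than one round per proposal.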
// flushAndReplicate flushes logs and sends heartbeats
func (r *Raft) flushAndReplicate() {
	// Ensure logs are flushed to OS cache before sending to followers
	// This implements Group Commit with Flush (fast) instead of Sync (slow)
	if err := r.log.Flush(); err != nil {
		r.logger.Error("Failed to flush log: %v", err)
	}
	r.sendHeartbeats()
}
// ProposeWithForward proposes a command, forwarding to leader if necessary
// This is the recommended method for applications to use
func (r *Raft) ProposeWithForward(command []byte) (index uint64, term uint64, err error) {
	// Try local propose first
	idx, t, isLeader := r.Propose(command)
	if isLeader {
		return idx, t, nil
	}
	// Not leader, forward to leader
	r.mu.RLock()
	leaderID := r.leaderID
	// Use clusterNodes (dynamically maintained) to find leader address
	leaderAddr := r.clusterNodes[leaderID]
	r.mu.RUnlock()
	if leaderID == "" {
		return 0, 0, fmt.Errorf("no leader available")
	}
	if leaderAddr == "" {
		return 0, 0, fmt.Errorf("leader %s address not found in cluster", leaderID)
	}
	// Forward to leader
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	args := &ProposeArgs{Command: command}
	reply, err := r.transport.ForwardPropose(ctx, leaderAddr, args)
	if err != nil {
		return 0, 0, fmt.Errorf("forward failed: %w", err)
	}
	if !reply.Success {
		return 0, 0, fmt.Errorf("leader rejected: %s", reply.Error)
	}
	return reply.Index, reply.Term, nil
}
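// Usage sketch (hypothetical HTTP handler, not part of this package): any node can
// accept a write and let ProposeWithForward route it to the current leader.
//
//	func handleSet(w http.ResponseWriter, r *Raft, cmd []byte) {
//		if _, _, err := r.ProposeWithForward(cmd); err != nil {
//			http.Error(w, err.Error(), http.StatusServiceUnavailable)
//			return
//		}
//		w.WriteHeader(http.StatusOK)
//	}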
// HandlePropose handles forwarded propose requests
func (r *Raft) HandlePropose(args *ProposeArgs) *ProposeReply {
	index, term, isLeader := r.Propose(args.Command)
	if !isLeader {
		return &ProposeReply{
			Success: false,
			Error:   "not leader",
		}
	}
	return &ProposeReply{
		Success: true,
		Index:   index,
		Term:    term,
	}
}
// HandleAddNode handles forwarded AddNode requests
func (r *Raft) HandleAddNode(args *AddNodeArgs) *AddNodeReply {
	err := r.AddNode(args.NodeID, args.Address)
	if err != nil {
		return &AddNodeReply{
			Success: false,
			Error:   err.Error(),
		}
	}
	return &AddNodeReply{
		Success: true,
	}
}
// HandleRemoveNode handles forwarded RemoveNode requests
func (r *Raft) HandleRemoveNode(args *RemoveNodeArgs) *RemoveNodeReply {
	err := r.RemoveNode(args.NodeID)
	if err != nil {
		return &RemoveNodeReply{
			Success: false,
			Error:   err.Error(),
		}
	}
	return &RemoveNodeReply{
		Success: true,
	}
}
// GetState returns the current term and whether this node is leader
func (r *Raft) GetState() (uint64, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.currentTerm, r.state == Leader
}
// GetLeaderID returns the current leader ID
func (r *Raft) GetLeaderID() string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.leaderID
}
// GetStats returns runtime statistics
func (r *Raft) GetStats() Stats {
	r.mu.RLock()
	defer r.mu.RUnlock()
	lastIndex, lastTerm := r.log.LastIndexAndTerm()
	// Copy cluster nodes
	nodes := make(map[string]string)
	for k, v := range r.clusterNodes {
		nodes[k] = v
	}
	clusterSize := len(r.clusterNodes)
	if clusterSize == 0 {
		clusterSize = len(r.peers) + 1
	}
	return Stats{
		Term:            r.currentTerm,
		State:           r.state.String(),
		LastLogIndex:    lastIndex,
		LastLogTerm:     lastTerm,
		CommitIndex:     r.commitIndex,
		LastApplied:     r.lastApplied,
		LeaderID:        r.leaderID,
		AppendsSent:     atomic.LoadInt64(&r.stats.AppendsSent),
		AppendsReceived: atomic.LoadInt64(&r.stats.AppendsReceived),
		ClusterSize:     clusterSize,
		ClusterNodes:    nodes,
	}
}
// restoreFromSnapshot restores the FSM from a snapshot at startup
// This is called during Start() to ensure the FSM has the correct state
// before processing any new commands
func (r *Raft) restoreFromSnapshot() error {
	// Get snapshot from storage
	data, lastIndex, lastTerm, err := r.storage.GetSnapshot()
	if err != nil {
		return fmt.Errorf("failed to get snapshot: %w", err)
	}
	// No snapshot exists
	if len(data) == 0 || lastIndex == 0 {
		return nil
	}
	r.logger.Info("Restoring FSM from snapshot at index %d, term %d (%d bytes)",
		lastIndex, lastTerm, len(data))
	// Update lastApplied to snapshot index to prevent re-applying compacted entries
	r.mu.Lock()
	if lastIndex > r.lastApplied {
		r.lastApplied = lastIndex
	}
	if lastIndex > r.commitIndex {
		r.commitIndex = lastIndex
	}
	r.mu.Unlock()
	// Send snapshot to FSM for restoration
	// Use a timed send so we don't block indefinitely if applyCh is full
	msg := ApplyMsg{
		SnapshotValid: true,
		Snapshot:      data,
		SnapshotIndex: lastIndex,
		SnapshotTerm:  lastTerm,
	}
	// Try to send with a timeout
	select {
	case r.applyCh <- msg:
		r.logger.Info("FSM restoration triggered from snapshot at index %d", lastIndex)
	case <-time.After(5 * time.Second):
		return fmt.Errorf("timeout sending snapshot to applyCh")
	}
	return nil
}
// TakeSnapshot takes a snapshot of the current state
func (r *Raft) TakeSnapshot(data []byte, index uint64) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if index > r.lastApplied {
		return fmt.Errorf("snapshot index %d exceeds lastApplied %d", index, r.lastApplied)
	}
	term, err := r.log.GetTerm(index)
	if err != nil {
		return fmt.Errorf("failed to get term for index %d: %w", index, err)
	}
	if err := r.storage.SaveSnapshot(data, index, term); err != nil {
		return fmt.Errorf("failed to save snapshot: %w", err)
	}
	if err := r.log.Compact(index); err != nil {
		return fmt.Errorf("failed to compact log: %w", err)
	}
	r.logger.Info("Took snapshot at index %d, term %d", index, term)
	return nil
}
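// Usage sketch (hypothetical application code): the FSM serializes its own state and
// hands it to TakeSnapshot at (or below) its last applied index. fsm.Serialize and
// fsm.LastAppliedIndex are illustrative names, not part of this package.
//
//	data, err := fsm.Serialize()
//	if err == nil {
//		if err := r.TakeSnapshot(data, fsm.LastAppliedIndex()); err != nil {
//			log.Printf("snapshot failed: %v", err)
//		}
//	}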
func max(a, b uint64) uint64 {
	if a > b {
		return a
	}
	return b
}
// ==================== Membership Change API ====================
//
// This implementation uses Single-Node Membership Change (also known as one-at-a-time changes)
// as described in the Raft dissertation (§4.3). This is safe because:
//
// 1. We only allow one configuration change at a time (pendingConfigChange flag)
// 2. For commits, we use the OLD cluster majority until the config change is committed
// 3. The new node starts receiving entries immediately but doesn't affect majority calculation
//
// This approach is simpler than Joint Consensus and is sufficient for most use cases.
// The invariant maintained is: any majority of the old configuration and any majority
// of the new configuration share at least one node.
//
// For adding a node: any quorum of the N-node config overlaps any quorum of the (N+1)-node config.
// For removing a node: any quorum of the N-node config overlaps any quorum of the (N-1)-node config (for N > 1).
//
// WARNING: Avoid adding/removing multiple nodes rapidly. Wait for each change to be committed.
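//
// Workflow sketch (hypothetical operator code, assuming calls are made on the leader):
// issue changes one at a time and retry while the previous change is still in flight.
//
//	if err := r.AddNode("node4", "10.0.0.4:7000"); err != nil {
//		log.Fatalf("add node4: %v", err)
//	}
//	// The next change is rejected with ErrConfigInFlight until the first one commits.
//	for errors.Is(r.AddNode("node5", "10.0.0.5:7000"), ErrConfigInFlight) {
//		time.Sleep(100 * time.Millisecond)
//	}
//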
// AddNode adds a new node to the cluster
// This can only be called on the leader
// The new node must already be running and reachable
//
// Safety guarantees:
// - Only one config change can be in progress at a time
// - The old cluster majority is used until the config change is committed
// - Returns error if leadership is lost during the operation
func (r *Raft) AddNode(nodeID, address string) error {
	r.mu.Lock()
	// Must be leader
	if r.state != Leader {
		leaderID := r.leaderID
		r.mu.Unlock()
		return NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff)
	}
	// Check if we're in the middle of a leadership transfer
	if r.transferring {
		r.mu.Unlock()
		return fmt.Errorf("leadership transfer in progress")
	}
	// Check if there's already a pending config change
	if r.pendingConfigChange {
		r.mu.Unlock()
		return ErrConfigInFlight
	}
	// Validate nodeID and address
	if nodeID == "" {
		r.mu.Unlock()
		return fmt.Errorf("nodeID cannot be empty")
	}
	if address == "" {
		r.mu.Unlock()
		return fmt.Errorf("address cannot be empty")
	}
	// Check if node already exists
	if _, exists := r.clusterNodes[nodeID]; exists {
		r.mu.Unlock()
		return fmt.Errorf("node %s already exists in cluster", nodeID)
	}
	// Check if address is already used by another node
	for existingID, existingAddr := range r.clusterNodes {
		if existingAddr == address {
			r.mu.Unlock()
			return fmt.Errorf("address %s is already used by node %s", address, existingID)
		}
	}
	// Save old cluster nodes for majority calculation during config change
	// This ensures we use the OLD cluster size until the config change is committed
	r.oldClusterNodes = make(map[string]string)
	for k, v := range r.clusterNodes {
		r.oldClusterNodes[k] = v
	}
	// Create new config with the added node
	newNodes := make(map[string]string)
	for k, v := range r.clusterNodes {
		newNodes[k] = v
	}
	newNodes[nodeID] = address
	// Create config change entry with ClusterConfig
	configIndex := r.log.LastIndex() + 1
	entry := LogEntry{
		Index:  configIndex,
		Term:   r.currentTerm,
		Type:   EntryConfig,
		Config: &ClusterConfig{Nodes: newNodes},
	}
	// Mark config change as pending and store the index
	r.pendingConfigChange = true
	r.configChangeIndex = configIndex
	// Immediately apply the new configuration (for single-node changes, this is safe)
	// The new node will start receiving AppendEntries immediately
	r.clusterNodes[nodeID] = address
	r.peers = append(r.peers, address)
	// Set nextIndex to 1 (or firstIndex) so the new node syncs from the beginning
	// This is crucial - the new node's log is empty, so we must start from index 1
	firstIndex := r.log.FirstIndex()
	if firstIndex == 0 {
		firstIndex = 1
	}
	r.nextIndex[address] = firstIndex
	r.matchIndex[address] = 0
	r.mu.Unlock()
	// Append the config change entry to log
	if err := r.log.Append(entry); err != nil {
		r.mu.Lock()
		r.pendingConfigChange = false
		r.oldClusterNodes = nil
		r.configChangeIndex = 0
		// Rollback
		delete(r.clusterNodes, nodeID)
		r.rebuildPeersList()
		r.mu.Unlock()
		return fmt.Errorf("failed to append config entry: %w", err)
	}
	r.logger.Info("Adding node %s (%s) to cluster", nodeID, address)
	// Trigger immediate replication
	r.triggerReplication()
	return nil
}
// RemoveNode removes a node from the cluster
// This can only be called on the leader
// The node being removed can be any node except the leader itself
//
// Safety guarantees:
// - Only one config change can be in progress at a time
// - Cannot remove the leader (transfer leadership first)
// - Cannot reduce cluster to 0 nodes
// - The old cluster majority is used until the config change is committed
func (r *Raft) RemoveNode(nodeID string) error {
	r.mu.Lock()
	// Must be leader
	if r.state != Leader {
		leaderID := r.leaderID
		r.mu.Unlock()
		return NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff)
	}
	// Check if we're in the middle of a leadership transfer
	if r.transferring {
		r.mu.Unlock()
		return fmt.Errorf("leadership transfer in progress")
	}
	// Cannot remove self
	if nodeID == r.nodeID {
		r.mu.Unlock()
		return fmt.Errorf("cannot remove self from cluster, use TransferLeadership first")
	}
	// Validate nodeID
	if nodeID == "" {
		r.mu.Unlock()
		return fmt.Errorf("nodeID cannot be empty")
	}
	// Check if there's already a pending config change
	if r.pendingConfigChange {
		r.mu.Unlock()
		return ErrConfigInFlight
	}
	// Check if node exists
	if _, exists := r.clusterNodes[nodeID]; !exists {
		r.mu.Unlock()
		return fmt.Errorf("node %s not found in cluster", nodeID)
	}
	// Cannot reduce cluster below 1 node
	if len(r.clusterNodes) <= 1 {
		r.mu.Unlock()
		return fmt.Errorf("cannot remove last node from cluster")
	}
	// Save old cluster nodes for majority calculation during config change
	r.oldClusterNodes = make(map[string]string)
	for k, v := range r.clusterNodes {
		r.oldClusterNodes[k] = v
	}
	// Create new config without the removed node
	newNodes := make(map[string]string)
	for k, v := range r.clusterNodes {
		if k != nodeID {
			newNodes[k] = v
		}
	}
	// Create config change entry with ClusterConfig
	configIndex := r.log.LastIndex() + 1
	entry := LogEntry{
		Index:  configIndex,
		Term:   r.currentTerm,
		Type:   EntryConfig,
		Config: &ClusterConfig{Nodes: newNodes},
	}
	// Mark config change as pending and store the index
	r.pendingConfigChange = true
	r.configChangeIndex = configIndex
	// Get the address of node being removed for cleanup
	removedAddr := r.clusterNodes[nodeID]
	// Immediately apply the new configuration
	delete(r.clusterNodes, nodeID)
	r.rebuildPeersList()
	delete(r.nextIndex, removedAddr)
	delete(r.matchIndex, removedAddr)
	r.mu.Unlock()
	// Append the config change entry to log
	if err := r.log.Append(entry); err != nil {
		r.mu.Lock()
		r.pendingConfigChange = false
		r.oldClusterNodes = nil
		r.configChangeIndex = 0
		// Rollback - this is tricky but we try our best
		r.clusterNodes[nodeID] = removedAddr
		r.rebuildPeersList()
		r.mu.Unlock()
		return fmt.Errorf("failed to append config entry: %w", err)
	}
	r.logger.Info("Removing node %s from cluster, config at index %d", nodeID, entry.Index)
	// Trigger replication
	go r.sendHeartbeats()
	return nil
}
// AddNodeWithForward adds a node, forwarding to leader if necessary
// This is the recommended method for applications to use
func (r *Raft) AddNodeWithForward(nodeID, address string) error {
	// Try local operation first
	err := r.AddNode(nodeID, address)
	if err == nil {
		return nil
	}
	// Check if we're not the leader
	r.mu.RLock()
	state := r.state
	leaderID := r.leaderID
	leaderAddr := r.clusterNodes[leaderID]
	r.mu.RUnlock()
	if state == Leader {
		// We are leader but AddNode failed for other reasons
		return err
	}
	// Not leader, forward to leader
	if leaderID == "" {
		return fmt.Errorf("no leader available")
	}
	if leaderAddr == "" {
		return fmt.Errorf("leader %s address not found in cluster", leaderID)
	}
	// Forward to leader
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	args := &AddNodeArgs{NodeID: nodeID, Address: address}
	reply, err := r.transport.ForwardAddNode(ctx, leaderAddr, args)
	if err != nil {
		return fmt.Errorf("forward failed: %w", err)
	}
	if !reply.Success {
		return fmt.Errorf("leader rejected: %s", reply.Error)
	}
	return nil
}
// RemoveNodeWithForward removes a node, forwarding to leader if necessary
// This is the recommended method for applications to use
func (r *Raft) RemoveNodeWithForward(nodeID string) error {
	// Try local operation first
	err := r.RemoveNode(nodeID)
	if err == nil {
		return nil
	}
	// Check if we're not the leader
	r.mu.RLock()
	state := r.state
	leaderID := r.leaderID
	leaderAddr := r.clusterNodes[leaderID]
	r.mu.RUnlock()
	if state == Leader {
		// We are leader but RemoveNode failed for other reasons
		return err
	}
	// Not leader, forward to leader
	if leaderID == "" {
		return fmt.Errorf("no leader available")
	}
	if leaderAddr == "" {
		return fmt.Errorf("leader %s address not found in cluster", leaderID)
	}
	// Forward to leader
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	args := &RemoveNodeArgs{NodeID: nodeID}
	reply, err := r.transport.ForwardRemoveNode(ctx, leaderAddr, args)
	if err != nil {
		return fmt.Errorf("forward failed: %w", err)
	}
	if !reply.Success {
		return fmt.Errorf("leader rejected: %s", reply.Error)
	}
	return nil
}
// rebuildPeersList rebuilds the peers slice from clusterNodes
func (r *Raft) rebuildPeersList() {
	r.peers = make([]string, 0, len(r.clusterNodes)-1)
	for nodeID, addr := range r.clusterNodes {
		if nodeID != r.nodeID {
			r.peers = append(r.peers, addr)
		}
	}
}
// GetClusterNodes returns a copy of the current cluster membership
func (r *Raft) GetClusterNodes() map[string]string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	nodes := make(map[string]string)
	for k, v := range r.clusterNodes {
		nodes[k] = v
	}
	return nodes
}
// applyConfigChange applies a configuration change entry
func (r *Raft) applyConfigChange(entry *LogEntry) {
	if entry.Config == nil || entry.Config.Nodes == nil {
		r.logger.Warn("Invalid config change entry at index %d", entry.Index)
		return
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	// Update cluster configuration
	r.clusterNodes = make(map[string]string)
	for k, v := range entry.Config.Nodes {
		r.clusterNodes[k] = v
	}
	r.rebuildPeersList()
	// Persist the new configuration
	if err := r.storage.SaveClusterConfig(&ClusterConfig{Nodes: r.clusterNodes}); err != nil {
		r.logger.Error("Failed to persist cluster config: %v", err)
	}
	// Clear pending flag and old cluster state
	r.pendingConfigChange = false
	r.oldClusterNodes = nil
	r.configChangeIndex = 0
	// If we were joining a cluster and now have multiple nodes, we've successfully joined
	if r.joiningCluster && len(r.clusterNodes) > 1 {
		r.joiningCluster = false
		r.logger.Info("Successfully joined cluster with %d nodes", len(r.clusterNodes))
	}
	r.logger.Info("Applied config change at index %d, cluster now has %d nodes", entry.Index, len(r.clusterNodes))
	// If we're the leader, update leader state
	if r.state == Leader {
		// Initialize nextIndex/matchIndex for any new nodes
		lastIndex := r.log.LastIndex()
		for nodeID, addr := range r.clusterNodes {
			if nodeID != r.nodeID {
				if _, exists := r.nextIndex[addr]; !exists {
					r.nextIndex[addr] = lastIndex + 1
					r.matchIndex[addr] = 0
				}
			}
		}
		// Clean up removed nodes
		validAddrs := make(map[string]bool)
		for nodeID, addr := range r.clusterNodes {
			if nodeID != r.nodeID {
				validAddrs[addr] = true
			}
		}
		for addr := range r.nextIndex {
			if !validAddrs[addr] {
				delete(r.nextIndex, addr)
				delete(r.matchIndex, addr)
			}
		}
	}
}
// ==================== ReadIndex (Linearizable Reads) ====================
// readIndexLoop handles read index requests
func (r *Raft) readIndexLoop() {
	for {
		select {
		case <-r.stopCh:
			return
		case req := <-r.readIndexCh:
			r.processReadIndexRequest(req)
		}
	}
}
// processReadIndexRequest processes a single read index request
func (r *Raft) processReadIndexRequest(req *readIndexRequest) {
	r.mu.RLock()
	if r.state != Leader {
		r.mu.RUnlock()
		req.done <- ErrNotLeader
		return
	}
	r.mu.RUnlock()
	// Confirm we are still leader before serving the read
	if !r.confirmLeadership() {
		req.done <- ErrLeadershipLost
		return
	}
	// Wait for apply to catch up to readIndex
	if err := r.waitApply(req.readIndex, r.config.ProposeTimeout); err != nil {
		req.done <- err
		return
	}
	req.done <- nil
}
// ReadIndex implements linearizable reads
// It ensures that the read sees all writes that were committed before the read started
func (r *Raft) ReadIndex() (uint64, error) {
	r.mu.RLock()
	if r.state != Leader {
		leaderID := r.leaderID
		r.mu.RUnlock()
		return 0, NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff)
	}
	readIndex := r.commitIndex
	r.mu.RUnlock()
	atomic.AddUint64(&r.metrics.ReadIndexRequests, 1)
	// Create request and send to processing loop
	req := &readIndexRequest{
		readIndex: readIndex,
		done:      make(chan error, 1),
	}
	select {
	case r.readIndexCh <- req:
	case <-r.stopCh:
		return 0, ErrShutdown
	case <-time.After(r.config.ProposeTimeout):
		return 0, ErrTimeout
	}
	// Wait for result
	select {
	case err := <-req.done:
		if err != nil {
			return 0, err
		}
		atomic.AddUint64(&r.metrics.ReadIndexSuccess, 1)
		return readIndex, nil
	case <-r.stopCh:
		return 0, ErrShutdown
	case <-time.After(r.config.ProposeTimeout):
		return 0, ErrTimeout
	}
}
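// Usage sketch (illustrative; kvStore is hypothetical application state, not part of
// this package): a linearizable read first obtains a read index and only then serves
// the value from local state. ReadIndex already waits until lastApplied has caught up
// to the read index, so the local read below cannot miss earlier committed writes.
//
//	if _, err := r.ReadIndex(); err != nil {
//		return err // not leader, leadership lost, or timed out
//	}
//	value, ok := kvStore.Get("x")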
// confirmLeadership heuristically confirms we are still leader: it sends a round of
// heartbeats, waits one heartbeat interval, and checks that we remain leader in the
// same term (discovering a higher term from any peer would have demoted us).
func (r *Raft) confirmLeadership() bool {
	r.mu.RLock()
	if r.state != Leader {
		r.mu.RUnlock()
		return false
	}
	currentTerm := r.currentTerm
	clusterSize := len(r.clusterNodes)
	if clusterSize == 0 {
		clusterSize = len(r.peers) + 1
	}
	r.mu.RUnlock()
	// Single node cluster - we're always the leader
	if clusterSize == 1 {
		return true
	}
	// Send a round of heartbeats
	r.sendHeartbeats()
	// Wait briefly and check if we're still leader in the same term
	time.Sleep(r.config.HeartbeatInterval)
	r.mu.RLock()
	stillLeader := r.state == Leader && r.currentTerm == currentTerm
	r.mu.RUnlock()
	return stillLeader
}
// waitApply waits until lastApplied >= index
func (r *Raft) waitApply(index uint64, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for {
		r.mu.RLock()
		lastApplied := r.lastApplied
		r.mu.RUnlock()
		if lastApplied >= index {
			return nil
		}
		if time.Now().After(deadline) {
			return ErrTimeout
		}
		time.Sleep(1 * time.Millisecond)
	}
}
// ==================== Health Check ====================
// HealthCheck returns the current health status of the node
func (r *Raft) HealthCheck() HealthStatus {
	r.mu.RLock()
	defer r.mu.RUnlock()
	clusterNodes := make(map[string]string)
	for k, v := range r.clusterNodes {
		clusterNodes[k] = v
	}
	clusterSize := len(r.clusterNodes)
	if clusterSize == 0 {
		clusterSize = len(r.peers) + 1
	}
	logBehind := uint64(0)
	if r.commitIndex > r.lastApplied {
		logBehind = r.commitIndex - r.lastApplied
	}
	// Consider healthy if we're leader or have a known leader
	isHealthy := r.state == Leader || r.leaderID != ""
	return HealthStatus{
		NodeID:        r.nodeID,
		State:         r.state.String(),
		Term:          r.currentTerm,
		LeaderID:      r.leaderID,
		ClusterSize:   clusterSize,
		ClusterNodes:  clusterNodes,
		CommitIndex:   r.commitIndex,
		LastApplied:   r.lastApplied,
		LogBehind:     logBehind,
		LastHeartbeat: r.lastHeartbeat,
		IsHealthy:     isHealthy,
		Uptime:        time.Since(r.startTime),
	}
}
// GetMetrics returns the current metrics
func (r *Raft) GetMetrics() Metrics {
	return Metrics{
		Term:                      atomic.LoadUint64(&r.metrics.Term),
		ProposalsTotal:            atomic.LoadUint64(&r.metrics.ProposalsTotal),
		ProposalsSuccess:          atomic.LoadUint64(&r.metrics.ProposalsSuccess),
		ProposalsFailed:           atomic.LoadUint64(&r.metrics.ProposalsFailed),
		ProposalsForwarded:        atomic.LoadUint64(&r.metrics.ProposalsForwarded),
		AppendsSent:               atomic.LoadUint64(&r.metrics.AppendsSent),
		AppendsReceived:           atomic.LoadUint64(&r.metrics.AppendsReceived),
		AppendsSuccess:            atomic.LoadUint64(&r.metrics.AppendsSuccess),
		AppendsFailed:             atomic.LoadUint64(&r.metrics.AppendsFailed),
		ElectionsStarted:          atomic.LoadUint64(&r.metrics.ElectionsStarted),
		ElectionsWon:              atomic.LoadUint64(&r.metrics.ElectionsWon),
		PreVotesStarted:           atomic.LoadUint64(&r.metrics.PreVotesStarted),
		PreVotesGranted:           atomic.LoadUint64(&r.metrics.PreVotesGranted),
		SnapshotsTaken:            atomic.LoadUint64(&r.metrics.SnapshotsTaken),
		SnapshotsInstalled:        atomic.LoadUint64(&r.metrics.SnapshotsInstalled),
		SnapshotsSent:             atomic.LoadUint64(&r.metrics.SnapshotsSent),
		ReadIndexRequests:         atomic.LoadUint64(&r.metrics.ReadIndexRequests),
		ReadIndexSuccess:          atomic.LoadUint64(&r.metrics.ReadIndexSuccess),
		LeadershipTransfers:       atomic.LoadUint64(&r.metrics.LeadershipTransfers),
		LeadershipTransferSuccess: atomic.LoadUint64(&r.metrics.LeadershipTransferSuccess),
	}
}
// ==================== Leadership Transfer ====================
// TransferLeadership transfers leadership to the specified node
func (r *Raft) TransferLeadership(targetID string) error {
	r.mu.Lock()
	if r.state != Leader {
		r.mu.Unlock()
		return ErrNotLeader
	}
	if targetID == r.nodeID {
		r.mu.Unlock()
		return fmt.Errorf("cannot transfer to self")
	}
	targetAddr, exists := r.clusterNodes[targetID]
	if !exists {
		r.mu.Unlock()
		return fmt.Errorf("target node %s not in cluster", targetID)
	}
	if r.transferring {
		r.mu.Unlock()
		return fmt.Errorf("leadership transfer already in progress")
	}
	r.transferring = true
	r.transferTarget = targetID
	r.transferDeadline = time.Now().Add(r.config.ElectionTimeoutMax * 2)
	currentTerm := r.currentTerm
	atomic.AddUint64(&r.metrics.LeadershipTransfers, 1)
	r.mu.Unlock()
	r.logger.Info("Starting leadership transfer to %s", targetID)
	// Step 1: Sync target to our log
	if err := r.syncFollowerToLatest(targetAddr); err != nil {
		r.mu.Lock()
		r.transferring = false
		r.transferTarget = ""
		r.mu.Unlock()
		return fmt.Errorf("failed to sync target: %w", err)
	}
	// Step 2: Send TimeoutNow RPC
	args := &TimeoutNowArgs{
		Term:     currentTerm,
		LeaderID: r.nodeID,
	}
	ctx, cancel := context.WithTimeout(context.Background(), r.config.RPCTimeout)
	defer cancel()
	reply, err := r.transport.TimeoutNow(ctx, targetAddr, args)
	if err != nil {
		r.mu.Lock()
		r.transferring = false
		r.transferTarget = ""
		r.mu.Unlock()
		return fmt.Errorf("TimeoutNow RPC failed: %w", err)
	}
	if !reply.Success {
		r.mu.Lock()
		r.transferring = false
		r.transferTarget = ""
		r.mu.Unlock()
		return fmt.Errorf("target rejected leadership transfer")
	}
	atomic.AddUint64(&r.metrics.LeadershipTransferSuccess, 1)
	r.logger.Info("Leadership transfer to %s initiated successfully", targetID)
	// Note: We don't immediately step down; we wait for the target to win election
	// and send us an AppendEntries with higher term
	return nil
}
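// Usage sketch (hypothetical operator flow): to retire the current leader, transfer
// leadership first and only then remove the old leader via the new one.
//
//	if err := r.TransferLeadership("node2"); err != nil {
//		log.Printf("transfer failed: %v", err)
//		return
//	}
//	// After node2 wins its election, issue RemoveNode("node1") against it,
//	// e.g. via RemoveNodeWithForward from any node.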
// syncFollowerToLatest ensures the follower is caught up to our log
func (r *Raft) syncFollowerToLatest(peerAddr string) error {
	r.mu.RLock()
	if r.state != Leader {
		r.mu.RUnlock()
		return ErrNotLeader
	}
	currentTerm := r.currentTerm
	leaderCommit := r.commitIndex
	lastIndex := r.log.LastIndex()
	r.mu.RUnlock()
	// Keep replicating until follower is caught up
	deadline := time.Now().Add(r.config.ElectionTimeoutMax * 2)
	for time.Now().Before(deadline) {
		r.mu.RLock()
		if r.state != Leader || r.currentTerm != currentTerm {
			r.mu.RUnlock()
			return ErrLeadershipLost
		}
		matchIndex := r.matchIndex[peerAddr]
		r.mu.RUnlock()
		if matchIndex >= lastIndex {
			return nil // Caught up
		}
		// Trigger replication
		r.replicateToPeer(peerAddr, currentTerm, leaderCommit)
		time.Sleep(10 * time.Millisecond)
	}
	return ErrTimeout
}
// HandleTimeoutNow handles TimeoutNow RPC (for leadership transfer)
func (r *Raft) HandleTimeoutNow(args *TimeoutNowArgs) *TimeoutNowReply {
	r.mu.Lock()
	defer r.mu.Unlock()
	reply := &TimeoutNowReply{
		Term:    r.currentTerm,
		Success: false,
	}
	// Only accept if we're a follower and the request's term is not stale
	if args.Term < r.currentTerm {
		return reply
	}
	if r.state != Follower {
		return reply
	}
	// Immediately start election
	r.logger.Info("Received TimeoutNow from %s, starting immediate election", args.LeaderID)
	r.state = Candidate
	reply.Success = true
	return reply
}
// HandleReadIndex handles ReadIndex RPC
func (r *Raft) HandleReadIndex(args *ReadIndexArgs) *ReadIndexReply {
	reply := &ReadIndexReply{
		Success: false,
	}
	readIndex, err := r.ReadIndex()
	if err != nil {
		reply.Error = err.Error()
		return reply
	}
	reply.ReadIndex = readIndex
	reply.Success = true
	return reply
}
// HandleGet handles Get RPC for remote KV reads
func (r *Raft) HandleGet(args *GetArgs) *GetReply {
	reply := &GetReply{
		Found: false,
	}
	if r.config.GetHandler == nil {
		reply.Error = "get handler not configured"
		return reply
	}
	value, found := r.config.GetHandler(args.Key)
	reply.Value = value
	reply.Found = found
	return reply
}