package raft

import (
    "context"
    "fmt"
    "math/rand"
    "os"
    "strings"
    "sync"
    "sync/atomic"
    "time"
)
// Raft represents a Raft consensus node.
type Raft struct {
    mu sync.RWMutex

    // Node identity
    nodeID string
    peers  []string

    // Cluster membership - maps nodeID to address
    clusterNodes map[string]string

    // Current state
    state       NodeState
    currentTerm uint64
    votedFor    string
    leaderID    string

    // Log management
    log         *LogManager
    storage     Storage
    commitIndex uint64
    lastApplied uint64

    // Leader state
    nextIndex  map[string]uint64
    matchIndex map[string]uint64

    // Configuration
    config *Config

    // Communication
    transport Transport

    // Channels
    applyCh  chan ApplyMsg
    stopCh   chan struct{}
    commitCh chan struct{}

    // Election and heartbeat timers
    electionTimer  *time.Timer
    heartbeatTimer *time.Timer

    // Statistics (deprecated, use metrics instead)
    stats Stats

    // Metrics for monitoring
    metrics Metrics

    // Logger
    logger Logger

    // Running flag
    running int32

    // Replication trigger channel - used to batch replication requests
    replicationCh chan struct{}

    // Pending config change - only one config change can be pending at a time
    pendingConfigChange bool

    // Old cluster nodes - used for majority calculation during a config change.
    // This ensures we use the old cluster size until the config change is committed.
    oldClusterNodes map[string]string

    // Index of the pending config change entry
    configChangeIndex uint64

    // Joining-cluster state - set when a standalone node is being added to a
    // cluster. This prevents the node from starting elections while syncing.
    joiningCluster     bool
    joiningClusterTime time.Time

    // Compaction state - prevents concurrent compaction
    compacting int32

    // Dynamic compaction threshold - updated after each compaction.
    // The next compaction triggers when log size >= this value.
    // Formula: sizeAfterCompaction + SnapshotThreshold (or the initial
    // SnapshotThreshold before the first compaction); see maybeCompactLog.
    nextCompactionThreshold uint64

    // WaitGroup to track goroutines for clean shutdown
    wg sync.WaitGroup

    // Leadership transfer state
    transferring     bool
    transferTarget   string
    transferDeadline time.Time

    // Last heartbeat received (for health checks)
    lastHeartbeat time.Time

    // Start time (for uptime calculation)
    startTime time.Time

    // ReadIndex waiting queue
    readIndexCh chan *readIndexRequest

    // Snapshot receiving state (for chunked transfer)
    pendingSnapshot *pendingSnapshotState

    // Last contact time for each peer (for leader health checks)
    lastContact map[string]time.Time
}
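
// Concurrency model (commentary): r.mu guards the mutable fields above. The
// running and compacting flags and the metrics counters are updated with
// sync/atomic, so they can be read or flipped without taking the lock.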

// readIndexRequest represents a pending read index request.
type readIndexRequest struct {
    readIndex uint64
    done      chan error
}

// pendingSnapshotState tracks chunked snapshot reception.
type pendingSnapshotState struct {
    lastIncludedIndex uint64
    lastIncludedTerm  uint64
    data              []byte
    receivedBytes     uint64
}

// Stats holds runtime statistics.
type Stats struct {
    Term            uint64
    State           string
    FirstLogIndex   uint64 // Added for monitoring compaction
    LastLogIndex    uint64
    LastLogTerm     uint64
    CommitIndex     uint64
    LastApplied     uint64
    LeaderID        string
    VotesReceived   int
    AppendsSent     int64
    AppendsReceived int64
    ClusterSize     int               // Number of nodes in the cluster
    ClusterNodes    map[string]string // NodeID -> address mapping
}

// ConsoleLogger implements Logger with console output.
type ConsoleLogger struct {
    Prefix string
    Level  int // 0=debug, 1=info, 2=warn, 3=error
    mu     sync.Mutex
}

func NewConsoleLogger(prefix string, level int) *ConsoleLogger {
    return &ConsoleLogger{Prefix: prefix, Level: level}
}

func (l *ConsoleLogger) log(level int, levelStr, format string, args ...interface{}) {
    if level < l.Level {
        return
    }
    l.mu.Lock()
    defer l.mu.Unlock()
    // Define colors locally to avoid dependency issues if cli.go is excluded
    const (
        colorReset  = "\033[0m"
        colorDim    = "\033[90m"
        colorRed    = "\033[31m"
        colorGreen  = "\033[32m"
        colorYellow = "\033[33m"
        colorCyan   = "\033[36m"
    )
    var levelColor string
    switch level {
    case 0:
        levelColor = colorDim
    case 1:
        levelColor = colorGreen
    case 2:
        levelColor = colorYellow
    case 3:
        levelColor = colorRed
    default:
        levelColor = colorReset
    }
    msg := fmt.Sprintf(format, args...)
    // Format: [Time] Node [LEVEL] Message, aligned for readability
    fmt.Printf("%s[%s]%s %s%-8s%s %s[%-5s]%s %s\n",
        colorDim, time.Now().Format("15:04:05.000"), colorReset,
        colorCyan, l.Prefix, colorReset,
        levelColor, levelStr, colorReset,
        msg)
}

func (l *ConsoleLogger) Debug(format string, args ...interface{}) {
    l.log(0, "DEBUG", format, args...)
}

func (l *ConsoleLogger) Info(format string, args ...interface{}) {
    l.log(1, "INFO", format, args...)
}

func (l *ConsoleLogger) Warn(format string, args ...interface{}) {
    l.log(2, "WARN", format, args...)
}

func (l *ConsoleLogger) Error(format string, args ...interface{}) {
    l.log(3, "ERROR", format, args...)
}
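
// Usage sketch (illustrative): a logger that prints INFO and above.
//
//    logger := NewConsoleLogger("node1", 1)
//    logger.Info("started")  // printed at level 1
//    logger.Debug("verbose") // suppressed at level 1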

// NewRaft creates a new Raft node.
func NewRaft(config *Config, transport Transport, applyCh chan ApplyMsg) (*Raft, error) {
    if config.Logger == nil {
        config.Logger = NewConsoleLogger(config.NodeID, 1)
    }
    // Create storage
    storage, err := NewHybridStorage(config.DataDir, config.MemoryLogCapacity, config.Logger)
    if err != nil {
        return nil, fmt.Errorf("failed to create storage: %w", err)
    }
    // Create the log manager
    logMgr := NewLogManager(storage, config.Logger)
    // Load persistent state
    state, err := storage.GetState()
    if err != nil {
        return nil, fmt.Errorf("failed to load state: %w", err)
    }
    // Load or initialize the cluster configuration
    clusterNodes := make(map[string]string)
    // Try the saved cluster config first
    savedConfig, err := storage.GetClusterConfig()
    if err != nil {
        return nil, fmt.Errorf("failed to load cluster config: %w", err)
    }
    if savedConfig != nil && len(savedConfig.Nodes) > 0 {
        // Use the saved config
        clusterNodes = savedConfig.Nodes
        config.Logger.Info("Loaded cluster config with %d nodes", len(clusterNodes))
    } else {
        // Initialize from config
        if len(config.ClusterNodes) > 0 {
            for k, v := range config.ClusterNodes {
                clusterNodes[k] = v
            }
        } else {
            // Build from Peers + self (backward compatibility)
            clusterNodes[config.NodeID] = config.ListenAddr
            for k, v := range config.PeerMap {
                clusterNodes[k] = v
            }
        }
        // Save the initial config
        if len(clusterNodes) > 0 {
            if err := storage.SaveClusterConfig(&ClusterConfig{Nodes: clusterNodes}); err != nil {
                config.Logger.Warn("Failed to save initial cluster config: %v", err)
            }
        }
    }
    // Build the peers list from cluster nodes (excluding self)
    var peers []string
    for nodeID, addr := range clusterNodes {
        if nodeID != config.NodeID {
            peers = append(peers, addr)
        }
    }
    r := &Raft{
        nodeID:        config.NodeID,
        peers:         peers,
        clusterNodes:  clusterNodes,
        state:         Follower,
        currentTerm:   state.CurrentTerm,
        votedFor:      state.VotedFor,
        log:           logMgr,
        storage:       storage,
        config:        config,
        transport:     transport,
        applyCh:       applyCh,
        stopCh:        make(chan struct{}),
        commitCh:      make(chan struct{}, 100), // Buffered for event-driven apply
        replicationCh: make(chan struct{}, 1),
        nextIndex:     make(map[string]uint64),
        matchIndex:    make(map[string]uint64),
        logger:        config.Logger,
        readIndexCh:   make(chan *readIndexRequest, 100),
        lastHeartbeat: time.Now(),
        lastContact:   make(map[string]time.Time),
    }
    // Initialize metrics
    r.metrics.Term = state.CurrentTerm
    // Initialize lastApplied and commitIndex from config (if provided)
    if config.LastAppliedIndex > 0 {
        r.lastApplied = config.LastAppliedIndex
        r.commitIndex = config.LastAppliedIndex // Applied implies committed
        r.logger.Info("Initialized lastApplied and commitIndex to %d from config", config.LastAppliedIndex)
    }
    // Set the RPC handler
    transport.SetRPCHandler(r)
    return r, nil
}
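
// Usage sketch (illustrative, not part of the API surface): wiring a node
// together. How cfg and transport are built is an assumption about the
// surrounding package; only the calls shown above are taken from this file.
//
//    applyCh := make(chan ApplyMsg, 64)
//    node, err := NewRaft(cfg, transport, applyCh)
//    if err != nil {
//        log.Fatal(err)
//    }
//    if err := node.Start(); err != nil {
//        log.Fatal(err)
//    }
//    defer node.Stop()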

// Start starts the Raft node.
func (r *Raft) Start() error {
    if !atomic.CompareAndSwapInt32(&r.running, 0, 1) {
        return fmt.Errorf("already running")
    }
    // Record the start time
    r.startTime = time.Now()
    // Start the transport
    if err := r.transport.Start(); err != nil {
        return fmt.Errorf("failed to start transport: %w", err)
    }
    // Restore the FSM from a snapshot if one exists. This must happen before
    // the apply loop starts, so the FSM state is in place first.
    if err := r.restoreFromSnapshot(); err != nil {
        // Suppress the warning if the file is simply missing (first start)
        if !os.IsNotExist(err) && !strings.Contains(err.Error(), "no such file") {
            r.logger.Warn("Failed to restore from snapshot: %v", err)
        } else {
            r.logger.Info("No snapshot found, starting with empty state")
        }
        // Continue anyway - the node can still function, just without historical state
    }
    // Start the election timer
    r.resetElectionTimer()
    // Start background goroutines with WaitGroup tracking
    r.wg.Add(4)
    go func() {
        defer r.wg.Done()
        r.applyLoop()
    }()
    go func() {
        defer r.wg.Done()
        r.replicationLoop()
    }()
    go func() {
        defer r.wg.Done()
        r.mainLoop()
    }()
    go func() {
        defer r.wg.Done()
        r.readIndexLoop()
    }()
    r.logger.Info("Raft node started")
    return nil
}

// Stop stops the Raft node.
func (r *Raft) Stop() error {
    if !atomic.CompareAndSwapInt32(&r.running, 1, 0) {
        return fmt.Errorf("not running")
    }
    // Signal all goroutines to stop
    close(r.stopCh)
    // Stop the timers first to prevent new operations
    r.mu.Lock()
    if r.electionTimer != nil {
        r.electionTimer.Stop()
    }
    if r.heartbeatTimer != nil {
        r.heartbeatTimer.Stop()
    }
    r.mu.Unlock()
    // Wait for all goroutines to finish, with a timeout
    done := make(chan struct{})
    go func() {
        r.wg.Wait()
        close(done)
    }()
    select {
    case <-done:
        // All goroutines exited cleanly
    case <-time.After(3 * time.Second):
        r.logger.Warn("Timeout waiting for goroutines to stop")
    }
    // Stop the transport (it has its own timeout)
    if err := r.transport.Stop(); err != nil {
        r.logger.Error("Failed to stop transport: %v", err)
    }
    if err := r.storage.Close(); err != nil {
        r.logger.Error("Failed to close storage: %v", err)
    }
    r.logger.Info("Raft node stopped")
    return nil
}

// mainLoop is the main event loop.
func (r *Raft) mainLoop() {
    for {
        select {
        case <-r.stopCh:
            return
        default:
        }
        r.mu.RLock()
        state := r.state
        r.mu.RUnlock()
        switch state {
        case Follower:
            r.runFollower()
        case Candidate:
            r.runCandidate()
        case Leader:
            r.runLeader()
        }
    }
}

// runFollower handles follower behavior.
func (r *Raft) runFollower() {
    r.mu.RLock()
    term := r.currentTerm
    r.mu.RUnlock()
    r.logger.Debug("Running as follower in term %d", term)
    for {
        select {
        case <-r.stopCh:
            return
        case <-r.electionTimer.C:
            r.mu.Lock()
            if r.state == Follower {
                // If we're joining a cluster, don't start elections;
                // give the leader time to sync us up.
                if r.joiningCluster {
                    // Allow joining for up to 30 seconds
                    if time.Since(r.joiningClusterTime) < 30*time.Second {
                        r.logger.Debug("Suppressing election during cluster join")
                        r.resetElectionTimer()
                        r.mu.Unlock()
                        continue
                    }
                    // Timed out - clear the joining state
                    r.joiningCluster = false
                    r.logger.Warn("Cluster join timeout, resuming normal election behavior")
                }
                r.logger.Debug("Election timeout, becoming candidate")
                r.state = Candidate
                r.leaderID = "" // Clear leaderID so we can vote for self in Pre-Vote
            }
            r.mu.Unlock()
            return
        }
    }
}

// runCandidate handles candidate behavior (leader election).
// It implements the Pre-Vote mechanism to prevent term explosion.
func (r *Raft) runCandidate() {
    // Phase 1: Pre-Vote (don't increment the term yet)
    if !r.runPreVote() {
        // Pre-vote failed; wait for the timeout before retrying
        r.mu.Lock()
        r.resetElectionTimer()
        r.mu.Unlock()
        select {
        case <-r.stopCh:
            return
        case <-r.electionTimer.C:
            // Timer expired, will retry the pre-vote
        }
        return
    }
    // Phase 2: Actual election (pre-vote succeeded, now increment the term)
    r.mu.Lock()
    r.currentTerm++
    r.votedFor = r.nodeID
    r.leaderID = ""
    currentTerm := r.currentTerm
    // Update metrics
    atomic.StoreUint64(&r.metrics.Term, currentTerm)
    if err := r.persistState(); err != nil {
        r.logger.Error("Failed to persist state during election: %v", err)
        r.mu.Unlock()
        return // Cannot proceed without persisting state
    }
    atomic.AddUint64(&r.metrics.ElectionsStarted, 1)
    r.resetElectionTimer()
    r.mu.Unlock()
    r.logger.Debug("Starting election for term %d", currentTerm)
    // Get the current peers list
    r.mu.RLock()
    currentPeers := make([]string, len(r.peers))
    copy(currentPeers, r.peers)
    clusterSize := len(r.clusterNodes)
    if clusterSize == 0 {
        clusterSize = len(r.peers) + 1
    }
    r.mu.RUnlock()
    // Request actual votes from all peers
    votes := 1 // Vote for self
    voteCh := make(chan bool, len(currentPeers))
    lastLogIndex, lastLogTerm := r.log.LastIndexAndTerm()
    for _, peer := range currentPeers {
        go func(peer string) {
            args := &RequestVoteArgs{
                Term:         currentTerm,
                CandidateID:  r.nodeID,
                LastLogIndex: lastLogIndex,
                LastLogTerm:  lastLogTerm,
                PreVote:      false,
            }
            ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
            defer cancel()
            reply, err := r.transport.RequestVote(ctx, peer, args)
            if err != nil {
                r.logger.Debug("RequestVote to %s failed: %v", peer, err)
                voteCh <- false
                return
            }
            r.mu.Lock()
            defer r.mu.Unlock()
            if reply.Term > r.currentTerm {
                r.becomeFollower(reply.Term)
                voteCh <- false
                return
            }
            voteCh <- reply.VoteGranted
        }(peer)
    }
    // Wait for votes - a majority is (clusterSize/2) + 1
    needed := clusterSize/2 + 1
    // If we already have enough votes (single-node cluster), become leader immediately
    if votes >= needed {
        r.mu.Lock()
        if r.state == Candidate && r.currentTerm == currentTerm {
            r.becomeLeader()
        }
        r.mu.Unlock()
        return
    }
    for i := 0; i < len(currentPeers); i++ {
        select {
        case <-r.stopCh:
            return
        case <-r.electionTimer.C:
            r.logger.Debug("Election timeout, will start new election")
            return
        case granted := <-voteCh:
            if granted {
                votes++
                if votes >= needed {
                    r.mu.Lock()
                    if r.state == Candidate && r.currentTerm == currentTerm {
                        r.becomeLeader()
                    }
                    r.mu.Unlock()
                    return
                }
            }
        }
        // Check that we're still a candidate
        r.mu.RLock()
        if r.state != Candidate {
            r.mu.RUnlock()
            return
        }
        r.mu.RUnlock()
    }
    // Election failed; wait for the timeout
    r.mu.RLock()
    stillCandidate := r.state == Candidate
    r.mu.RUnlock()
    if stillCandidate {
        r.logger.Debug("Election failed, waiting for timeout (got %d/%d votes)", votes, needed)
        select {
        case <-r.stopCh:
            return
        case <-r.electionTimer.C:
            // Timer expired, will start a new election
        }
    }
}
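
// Quorum arithmetic (worked example): with clusterSize = 5, needed = 5/2+1 = 3,
// so a candidate wins with its own vote plus two grants. With clusterSize = 4,
// needed is also 3 - an even-sized cluster tolerates only one failure.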

// runPreVote sends pre-vote requests to check whether we could win an election.
// It returns true if we received a majority of pre-votes, false otherwise.
func (r *Raft) runPreVote() bool {
    r.mu.RLock()
    currentTerm := r.currentTerm
    currentPeers := make([]string, len(r.peers))
    copy(currentPeers, r.peers)
    clusterSize := len(r.clusterNodes)
    if clusterSize == 0 {
        clusterSize = len(r.peers) + 1
    }
    leaderID := r.leaderID
    r.mu.RUnlock()
    // A pre-vote advertises term+1 but doesn't actually increment it
    preVoteTerm := currentTerm + 1
    lastLogIndex, lastLogTerm := r.log.LastIndexAndTerm()
    r.logger.Debug("Starting pre-vote for term %d", preVoteTerm)
    // Per the Raft Pre-Vote optimization (§9.6): if we have a known leader,
    // don't vote for self. This prevents standalone nodes from constantly
    // electing themselves when they should be joining an existing cluster.
    preVotes := 0
    if leaderID == "" {
        preVotes = 1 // Vote for self only if we don't know of a leader
    } else {
        r.logger.Debug("Pre-vote: not self-voting because we have leader %s", leaderID)
    }
    preVoteCh := make(chan bool, len(currentPeers))
    for _, peer := range currentPeers {
        go func(peer string) {
            args := &RequestVoteArgs{
                Term:         preVoteTerm,
                CandidateID:  r.nodeID,
                LastLogIndex: lastLogIndex,
                LastLogTerm:  lastLogTerm,
                PreVote:      true,
            }
            ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
            defer cancel()
            reply, err := r.transport.RequestVote(ctx, peer, args)
            if err != nil {
                r.logger.Debug("PreVote to %s failed: %v", peer, err)
                preVoteCh <- false
                return
            }
            // For a pre-vote we don't step down even if reply.Term > currentTerm,
            // because the other node might also be doing a pre-vote.
            preVoteCh <- reply.VoteGranted
        }(peer)
    }
    // Wait for pre-votes with a shorter timeout - a majority is (clusterSize/2) + 1
    needed := clusterSize/2 + 1
    // If we already have enough votes (single-node cluster with no known leader), succeed immediately
    if preVotes >= needed {
        r.logger.Debug("Pre-vote succeeded immediately (got %d/%d pre-votes)", preVotes, needed)
        return true
    }
    timeout := time.After(500 * time.Millisecond)
    for i := 0; i < len(currentPeers); i++ {
        select {
        case <-r.stopCh:
            return false
        case <-timeout:
            r.logger.Debug("Pre-vote timeout (got %d/%d pre-votes)", preVotes, needed)
            return false
        case granted := <-preVoteCh:
            if granted {
                preVotes++
                if preVotes >= needed {
                    r.logger.Debug("Pre-vote succeeded (got %d/%d pre-votes)", preVotes, needed)
                    return true
                }
            }
        }
        // Check that we're still a candidate
        r.mu.RLock()
        if r.state != Candidate {
            r.mu.RUnlock()
            return false
        }
        r.mu.RUnlock()
    }
    r.logger.Debug("Pre-vote failed (got %d/%d pre-votes)", preVotes, needed)
    return false
}
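
// Why Pre-Vote matters (commentary): without it, a partitioned node keeps
// incrementing its term on every election timeout. When it rejoins, its
// inflated term forces the current leader to step down even though the
// node's log is stale. With Pre-Vote the term is only incremented after a
// majority confirms the candidate's log could actually win, so a flapping
// node stays harmless.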

// runLeader handles leader behavior.
func (r *Raft) runLeader() {
    r.mu.RLock()
    term := r.currentTerm
    r.mu.RUnlock()
    r.logger.Debug("Running as leader in term %d", term)
    // Send the initial heartbeat
    r.sendHeartbeats()
    // Start the heartbeat timer
    r.mu.Lock()
    r.heartbeatTimer = time.NewTimer(r.config.HeartbeatInterval)
    r.mu.Unlock()
    for {
        select {
        case <-r.stopCh:
            return
        case <-r.heartbeatTimer.C:
            r.mu.RLock()
            if r.state != Leader {
                r.mu.RUnlock()
                return
            }
            r.mu.RUnlock()
            r.sendHeartbeats()
            r.heartbeatTimer.Reset(r.config.HeartbeatInterval)
        case <-r.commitCh:
            r.updateCommitIndex()
        }
    }
}

// becomeFollower transitions to the follower state.
// Callers must hold r.mu.
func (r *Raft) becomeFollower(term uint64) {
    oldState := r.state
    oldTerm := r.currentTerm
    r.state = Follower
    r.currentTerm = term
    // Update metrics
    atomic.StoreUint64(&r.metrics.Term, term)
    r.votedFor = ""
    r.leaderID = ""
    // Must persist before responding - use mustPersistState for critical transitions
    r.mustPersistState()
    r.resetElectionTimer()
    // Clear leadership transfer state
    r.transferring = false
    r.transferTarget = ""
    // Only log significant state changes
    if oldState == Leader && term > oldTerm {
        // Stepping down from leader is notable
        r.logger.Debug("Stepped down from leader in term %d", term)
    }
}

// becomeLeader transitions to the leader state.
// Callers must hold r.mu.
func (r *Raft) becomeLeader() {
    r.state = Leader
    r.leaderID = r.nodeID
    // Update metrics
    atomic.AddUint64(&r.metrics.ElectionsWon, 1)
    // Initialize leader state for all peers
    lastIndex := r.log.LastIndex()
    for nodeID, addr := range r.clusterNodes {
        if nodeID != r.nodeID {
            r.nextIndex[addr] = lastIndex + 1
            r.matchIndex[addr] = 0
        }
    }
    // Also handle legacy peers
    for _, peer := range r.peers {
        if _, exists := r.nextIndex[peer]; !exists {
            r.nextIndex[peer] = lastIndex + 1
            r.matchIndex[peer] = 0
        }
    }
    r.logger.Info("Became leader in term %d with %d peers", r.currentTerm, len(r.clusterNodes)-1)
    // Append a no-op entry to commit entries from previous terms.
    // This is a standard Raft optimization: a leader may only count replicas
    // for entries of its own term, so it appends a no-op in the current term
    // to quickly commit all pending entries from earlier terms.
    noopEntry := LogEntry{
        Index:   r.log.LastIndex() + 1,
        Term:    r.currentTerm,
        Type:    EntryNoop,
        Command: nil,
    }
    if err := r.log.Append(noopEntry); err != nil {
        r.logger.Error("Failed to append no-op entry: %v", err)
    } else {
        r.logger.Debug("Appended no-op entry at index %d for term %d", noopEntry.Index, noopEntry.Term)
    }
}

// sendHeartbeats sends AppendEntries RPCs to all peers.
func (r *Raft) sendHeartbeats() {
    r.mu.RLock()
    if r.state != Leader {
        r.mu.RUnlock()
        return
    }
    currentTerm := r.currentTerm
    leaderCommit := r.commitIndex
    // Collect all peer addresses
    peerAddrs := make([]string, 0, len(r.clusterNodes))
    for nodeID, addr := range r.clusterNodes {
        if nodeID != r.nodeID {
            peerAddrs = append(peerAddrs, addr)
        }
    }
    // Also include legacy peers not in clusterNodes
    for _, peer := range r.peers {
        found := false
        for _, addr := range peerAddrs {
            if addr == peer {
                found = true
                break
            }
        }
        if !found {
            peerAddrs = append(peerAddrs, peer)
        }
    }
    r.mu.RUnlock()
    for _, peer := range peerAddrs {
        go r.replicateToPeer(peer, currentTerm, leaderCommit)
    }
}

// replicateToPeer sends AppendEntries to a specific peer.
func (r *Raft) replicateToPeer(peer string, term, leaderCommit uint64) {
    r.mu.RLock()
    if r.state != Leader || r.currentTerm != term {
        r.mu.RUnlock()
        return
    }
    nextIndex := r.nextIndex[peer]
    r.mu.RUnlock()
    // Get the entries to send
    entries, prevLogIndex, prevLogTerm, err := r.log.GetEntriesForFollower(nextIndex, r.config.MaxLogEntriesPerRequest)
    if err == ErrCompacted {
        // The follower is too far behind; send a snapshot instead
        r.sendSnapshot(peer)
        return
    }
    if err != nil {
        r.logger.Debug("Failed to get entries for %s: %v", peer, err)
        return
    }
    args := &AppendEntriesArgs{
        Term:         term,
        LeaderID:     r.nodeID,
        PrevLogIndex: prevLogIndex,
        PrevLogTerm:  prevLogTerm,
        Entries:      entries,
        LeaderCommit: leaderCommit,
    }
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()
    reply, err := r.transport.AppendEntries(ctx, peer, args)
    if err != nil {
        r.logger.Debug("AppendEntries to %s failed: %v", peer, err)
        return
    }
    atomic.AddInt64(&r.stats.AppendsSent, 1)
    r.mu.Lock()
    defer r.mu.Unlock()
    if reply.Term > r.currentTerm {
        r.becomeFollower(reply.Term)
        return
    }
    if r.state != Leader || r.currentTerm != term {
        return
    }
    if reply.Success {
        // Update the contact time
        r.lastContact[peer] = time.Now()
        // Update nextIndex and matchIndex
        if len(entries) > 0 {
            newMatchIndex := entries[len(entries)-1].Index
            if newMatchIndex > r.matchIndex[peer] {
                r.matchIndex[peer] = newMatchIndex
                r.nextIndex[peer] = newMatchIndex + 1
                // Try to advance the commit index
                select {
                case r.commitCh <- struct{}{}:
                default:
                }
            }
        }
    } else {
        // Even on failure (log conflict), we did contact the peer
        r.lastContact[peer] = time.Now()
        // Back up nextIndex and retry
        if reply.ConflictTerm > 0 {
            // Find the last entry of ConflictTerm in our log
            found := false
            for idx := r.log.LastIndex(); idx >= r.log.FirstIndex(); idx-- {
                t, err := r.log.GetTerm(idx)
                if err != nil {
                    break
                }
                if t == reply.ConflictTerm {
                    r.nextIndex[peer] = idx + 1
                    found = true
                    break
                }
                if t < reply.ConflictTerm {
                    break
                }
            }
            if !found {
                r.nextIndex[peer] = reply.ConflictIndex
            }
        } else if reply.ConflictIndex > 0 {
            r.nextIndex[peer] = reply.ConflictIndex
        } else {
            r.nextIndex[peer] = max(1, r.nextIndex[peer]-1)
        }
    }
}
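
// Conflict backtracking (worked example): suppose the leader's log holds
// terms [1 1 4 4 5] at indexes 1-5 and the follower replies with
// ConflictTerm=2, ConflictIndex=3. The scan above finds no entry with term 2
// (it stops once terms drop below 2), so nextIndex falls back to
// ConflictIndex=3 and the next AppendEntries retries from there - one round
// trip instead of decrementing nextIndex one step at a time.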

// sendSnapshot sends a snapshot to a peer with chunked transfer support.
func (r *Raft) sendSnapshot(peer string) {
    r.mu.RLock()
    if r.state != Leader {
        r.mu.RUnlock()
        return
    }
    term := r.currentTerm
    chunkSize := r.config.SnapshotChunkSize
    if chunkSize <= 0 {
        chunkSize = 1024 * 1024 // Default 1MB
    }
    r.mu.RUnlock()
    data, lastIndex, lastTerm, err := r.storage.GetSnapshot()
    if err != nil {
        r.logger.Error("Failed to get snapshot: %v", err)
        return
    }
    atomic.AddUint64(&r.metrics.SnapshotsSent, 1)
    r.logger.Info("Sending snapshot to %s: %d bytes, lastIndex=%d, lastTerm=%d",
        peer, len(data), lastIndex, lastTerm)
    // Send the snapshot in chunks
    totalSize := len(data)
    offset := 0
    for offset < totalSize {
        // Check that we're still the leader
        r.mu.RLock()
        if r.state != Leader || r.currentTerm != term {
            r.mu.RUnlock()
            r.logger.Debug("Aborting snapshot send: no longer leader")
            return
        }
        r.mu.RUnlock()
        // Slice out the next chunk
        end := offset + chunkSize
        if end > totalSize {
            end = totalSize
        }
        chunk := data[offset:end]
        done := end >= totalSize
        args := &InstallSnapshotArgs{
            Term:              term,
            LeaderID:          r.nodeID,
            LastIncludedIndex: lastIndex,
            LastIncludedTerm:  lastTerm,
            Offset:            uint64(offset),
            Data:              chunk,
            Done:              done,
        }
        ctx, cancel := context.WithTimeout(context.Background(), r.config.SnapshotRPCTimeout)
        reply, err := r.transport.InstallSnapshot(ctx, peer, args)
        cancel()
        if err != nil {
            r.logger.Error("InstallSnapshot chunk to %s failed at offset %d: %v", peer, offset, err)
            return
        }
        // Step down if the follower reports a higher term; read currentTerm
        // under the lock before comparing.
        r.mu.Lock()
        if reply.Term > r.currentTerm {
            r.becomeFollower(reply.Term)
            r.mu.Unlock()
            return
        }
        r.mu.Unlock()
        if !reply.Success {
            r.logger.Error("InstallSnapshot chunk rejected by %s at offset %d", peer, offset)
            return
        }
        offset = end
    }
    // Snapshot fully sent and accepted
    r.mu.Lock()
    defer r.mu.Unlock()
    if r.state != Leader || r.currentTerm != term {
        return
    }
    r.nextIndex[peer] = lastIndex + 1
    r.matchIndex[peer] = lastIndex
    r.logger.Info("Snapshot to %s completed, nextIndex=%d", peer, lastIndex+1)
}
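
// Chunking (worked example): a 2.5 MB snapshot with the default 1 MB chunk
// size goes out as three InstallSnapshot RPCs with Offset 0, 1 MiB, and
// 2 MiB; only the last carries Done=true, which signals the end of the
// transfer to the receiver.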

// updateCommitIndex advances the commit index based on matchIndex.
func (r *Raft) updateCommitIndex() {
    r.mu.Lock()
    defer r.mu.Unlock()
    if r.state != Leader {
        return
    }
    // For config change entries, use the OLD cluster for the majority
    // calculation. This ensures that adding a node doesn't require the new
    // node's vote until the config change itself is committed.
    votingNodes := r.clusterNodes
    if r.pendingConfigChange && r.oldClusterNodes != nil {
        votingNodes = r.oldClusterNodes
    }
    // Get the current cluster size from the voting nodes
    clusterSize := len(votingNodes)
    if clusterSize == 0 {
        clusterSize = len(r.peers) + 1
    }
    // Find the highest index replicated on a majority
    for n := r.log.LastIndex(); n > r.commitIndex; n-- {
        term, err := r.log.GetTerm(n)
        if err != nil {
            continue
        }
        // Only commit entries from the current term (Raft §5.4.2)
        if term != r.currentTerm {
            continue
        }
        // Count replicas (including self)
        count := 1 // Self
        for nodeID, addr := range votingNodes {
            if nodeID != r.nodeID {
                if r.matchIndex[addr] >= n {
                    count++
                }
            }
        }
        // Also check legacy peers
        for _, peer := range r.peers {
            // Avoid double counting if the peer is already in votingNodes
            alreadyCounted := false
            for _, addr := range votingNodes {
                if addr == peer {
                    alreadyCounted = true
                    break
                }
            }
            if !alreadyCounted && r.matchIndex[peer] >= n {
                count++
            }
        }
        // A majority is (clusterSize/2) + 1
        needed := clusterSize/2 + 1
        if count >= needed {
            r.commitIndex = n
            break
        }
    }
}
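
// Worked example: 3 nodes, lastIndex = 8, commitIndex = 5, and
// matchIndex = {B: 7, C: 5}, all entries in the current term. At n = 8 only
// the leader itself has the entry (count = 1); at n = 7 the leader and B have
// it (count = 2), which meets needed = 2, so commitIndex advances to 7 and
// indexes 6-7 are committed together.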

// applyLoop applies committed entries to the state machine.
// It uses an event-driven model with fallback polling for reliability.
func (r *Raft) applyLoop() {
    // A ticker serves as a fallback for missed signals (matches the original 10ms polling)
    ticker := time.NewTicker(10 * time.Millisecond)
    defer ticker.Stop()
    for {
        select {
        case <-r.stopCh:
            return
        case <-r.commitCh:
            // Event-driven: triggered when commitIndex is updated
            r.applyCommitted()
        case <-ticker.C:
            // Fallback: check periodically in case we missed a signal
            r.applyCommitted()
        }
    }
}
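
// Note that commitCh has two consumers: this loop and the leader's runLeader
// loop (which calls updateCommitIndex). A given signal wakes only one of
// them, so each has a backstop: the 10ms ticker here, and on the leader side
// every successful AppendEntries reply re-signals the channel.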

// applyCommitted applies all committed but not-yet-applied entries.
func (r *Raft) applyCommitted() {
    r.mu.Lock()
    commitIndex := r.commitIndex
    lastApplied := r.lastApplied
    firstIndex := r.log.FirstIndex()
    lastLogIndex := r.log.LastIndex()
    r.mu.Unlock()
    // Safety check: ensure lastApplied is within the valid range.
    // If lastApplied is below firstIndex (due to a snapshot), skip to firstIndex.
    if lastApplied < firstIndex && firstIndex > 0 {
        r.mu.Lock()
        r.lastApplied = firstIndex
        lastApplied = firstIndex
        r.mu.Unlock()
        r.logger.Debug("Adjusted lastApplied to firstIndex %d after compaction", firstIndex)
    }
    // Safety check: don't try to apply beyond what we have in the log
    if commitIndex > lastLogIndex {
        commitIndex = lastLogIndex
    }
    for lastApplied < commitIndex {
        lastApplied++
        // Skip entries that have been compacted away
        if lastApplied < firstIndex {
            continue
        }
        entry, err := r.log.GetEntry(lastApplied)
        if err != nil {
            // If the entry was compacted, skip ahead
            if err == ErrCompacted {
                r.mu.Lock()
                newFirstIndex := r.log.FirstIndex()
                if lastApplied < newFirstIndex {
                    r.lastApplied = newFirstIndex
                    lastApplied = newFirstIndex
                }
                r.mu.Unlock()
                continue
            }
            r.logger.Error("Failed to get entry %d: %v (firstIndex=%d, lastIndex=%d)",
                lastApplied, err, r.log.FirstIndex(), r.log.LastIndex())
            break
        }
        // Handle config change entries
        if entry.Type == EntryConfig {
            r.applyConfigChange(entry)
            r.mu.Lock()
            r.lastApplied = lastApplied
            r.mu.Unlock()
            continue
        }
        // Handle no-op entries (just update lastApplied; don't send them to the state machine)
        if entry.Type == EntryNoop {
            r.mu.Lock()
            r.lastApplied = lastApplied
            r.mu.Unlock()
            continue
        }
        // Normal command entry
        msg := ApplyMsg{
            CommandValid: true,
            Command:      entry.Command,
            CommandIndex: entry.Index,
            CommandTerm:  entry.Term,
        }
        select {
        case r.applyCh <- msg:
        case <-r.stopCh:
            return
        }
        r.mu.Lock()
        r.lastApplied = lastApplied
        r.mu.Unlock()
    }
    // Check whether log compaction is needed. Run it asynchronously so a
    // slow snapshot doesn't block the apply loop.
    go r.maybeCompactLog()
}

// maybeCompactLog checks whether automatic log compaction should be triggered.
// It uses a dynamic threshold to prevent "compaction thrashing":
// - The first compaction triggers at SnapshotThreshold (default 100,000).
// - After a compaction, the next threshold = size_after_compaction + SnapshotThreshold,
//   so compaction fires roughly every SnapshotThreshold entries.
// - This prevents every new entry from triggering compaction if the log stays large.
func (r *Raft) maybeCompactLog() {
    // Skip if no snapshot provider is configured
    if r.config.SnapshotProvider == nil {
        return
    }
    // Skip if log compaction is explicitly disabled
    if !r.config.LogCompactionEnabled {
        return
    }
    // Check whether compaction is already in progress (atomic check-and-set)
    if !atomic.CompareAndSwapInt32(&r.compacting, 0, 1) {
        return
    }
    // Release the compacting flag when done
    defer atomic.StoreInt32(&r.compacting, 0)
    r.mu.RLock()
    lastApplied := r.lastApplied
    firstIndex := r.log.FirstIndex()
    initialThreshold := r.config.SnapshotThreshold
    minRetention := r.config.SnapshotMinRetention
    isLeader := r.state == Leader
    dynamicThreshold := r.nextCompactionThreshold
    r.mu.RUnlock()
    // Guard against underflow: ensure lastApplied > firstIndex
    if lastApplied <= firstIndex {
        return
    }
    // Calculate the current log size
    logSize := lastApplied - firstIndex
    // Determine the effective threshold:
    // - Use the initial threshold if no compaction has occurred yet (dynamicThreshold == 0)
    // - Otherwise use the dynamic threshold
    effectiveThreshold := initialThreshold
    if dynamicThreshold > 0 {
        effectiveThreshold = dynamicThreshold
    }
    // Check that we have enough entries to warrant compaction
    if logSize <= effectiveThreshold {
        return
    }
    // Guard against underflow: ensure lastApplied > minRetention
    if lastApplied <= minRetention {
        return
    }
    // Calculate the safe compaction point.
    // We must retain at least minRetention entries for follower catch-up.
    compactUpTo := lastApplied - minRetention
    if compactUpTo <= firstIndex {
        return // Not enough entries to compact while maintaining retention
    }
    // For a leader, also consider the minimum nextIndex of all followers
    // to avoid compacting entries that followers still need.
    if isLeader {
        r.mu.RLock()
        minNextIndex := lastApplied
        for _, nextIdx := range r.nextIndex {
            if nextIdx < minNextIndex {
                minNextIndex = nextIdx
            }
        }
        r.mu.RUnlock()
        // Don't compact entries that followers still need.
        // Keep a buffer of minRetention entries before the slowest follower.
        if minNextIndex > minRetention {
            followerSafePoint := minNextIndex - minRetention
            if followerSafePoint < compactUpTo {
                compactUpTo = followerSafePoint
            }
        } else {
            // The slowest follower is too far behind; don't compact.
            // It will need a full snapshot anyway.
            compactUpTo = firstIndex // Effectively skip compaction
        }
    }
    // Final check - make sure we're actually compacting something meaningful
    if compactUpTo <= firstIndex {
        return
    }
    // Get a snapshot from the application layer
    snapshotData, err := r.config.SnapshotProvider(compactUpTo)
    if err != nil {
        r.logger.Error("Failed to get snapshot from provider: %v", err)
        return
    }
    // Get the term at the compaction point
    term, err := r.log.GetTerm(compactUpTo)
    if err != nil {
        r.logger.Error("Failed to get term for compaction index %d: %v", compactUpTo, err)
        return
    }
    // Save the snapshot
    if err := r.storage.SaveSnapshot(snapshotData, compactUpTo, term); err != nil {
        r.logger.Error("Failed to save snapshot: %v", err)
        return
    }
    // Compact the log.
    // CRITICAL: we must hold the write lock during compaction to prevent
    // concurrent writes (AppendEntries), since the underlying files cannot
    // tolerate concurrent modification; writes must pause while compacting.
    r.mu.Lock()
    if err := r.log.Compact(compactUpTo); err != nil {
        r.mu.Unlock()
        r.logger.Error("Failed to compact log: %v", err)
        return
    }
    r.mu.Unlock()
    // Calculate the new log size after compaction
    newLogSize := lastApplied - compactUpTo
    // Update the dynamic threshold for the next compaction.
    // We use a linear growth model: next threshold = current size + SnapshotThreshold,
    // which triggers compaction roughly every SnapshotThreshold entries.
    r.mu.Lock()
    r.nextCompactionThreshold = newLogSize + r.config.SnapshotThreshold
    // Ensure the threshold doesn't drop below the initial threshold
    if r.nextCompactionThreshold < initialThreshold {
        r.nextCompactionThreshold = initialThreshold
    }
    newThreshold := r.nextCompactionThreshold
    r.mu.Unlock()
    r.logger.Info("Auto compaction completed: compacted up to index %d (term %d), log size %d -> %d, next threshold: %d",
        compactUpTo, term, logSize, newLogSize, newThreshold)
}
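
// Threshold growth (worked example): with SnapshotThreshold = 100,000 and
// SnapshotMinRetention = 20,000, the first compaction fires once
// lastApplied - firstIndex exceeds 100,000 and truncates up to
// lastApplied - 20,000, leaving ~20,000 entries. The next threshold becomes
// 20,000 + 100,000 = 120,000, so roughly another 100,000 entries must be
// applied before compaction runs again.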

// resetElectionTimer resets the election timeout to a randomized duration.
func (r *Raft) resetElectionTimer() {
    timeout := r.config.ElectionTimeoutMin +
        time.Duration(rand.Int63n(int64(r.config.ElectionTimeoutMax-r.config.ElectionTimeoutMin)))
    if r.electionTimer == nil {
        r.electionTimer = time.NewTimer(timeout)
    } else {
        if !r.electionTimer.Stop() {
            // Drain a stale fire so Reset doesn't deliver an old expiry
            select {
            case <-r.electionTimer.C:
            default:
            }
        }
        r.electionTimer.Reset(timeout)
    }
}
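
// Randomizing the timeout in [ElectionTimeoutMin, ElectionTimeoutMax) is what
// breaks ties: if every follower used the same timeout they would all become
// candidates simultaneously and split the vote indefinitely (Raft §5.2).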

// persistState saves the current term and vote to stable storage.
// It returns an error if persistence fails - callers MUST handle this for safety.
func (r *Raft) persistState() error {
    state := &PersistentState{
        CurrentTerm: r.currentTerm,
        VotedFor:    r.votedFor,
    }
    if err := r.storage.SaveState(state); err != nil {
        r.logger.Error("Failed to persist state: %v", err)
        return fmt.Errorf("%w: %v", ErrPersistFailed, err)
    }
    return nil
}

// mustPersistState saves state and panics on failure.
// Use it only in critical paths where failure is unrecoverable.
func (r *Raft) mustPersistState() {
    if err := r.persistState(); err != nil {
        // In production you might want to trigger a graceful shutdown instead
        r.logger.Error("CRITICAL: Failed to persist state, node may be in inconsistent state: %v", err)
        panic(err)
    }
}

// HandleRequestVote handles RequestVote RPCs (including pre-votes).
func (r *Raft) HandleRequestVote(args *RequestVoteArgs) *RequestVoteReply {
    r.mu.Lock()
    defer r.mu.Unlock()
    reply := &RequestVoteReply{
        Term:        r.currentTerm,
        VoteGranted: false,
    }
    // Handle pre-votes separately
    if args.PreVote {
        return r.handlePreVote(args)
    }
    // Reply false if term < currentTerm
    if args.Term < r.currentTerm {
        return reply
    }
    // If term > currentTerm, become a follower
    if args.Term > r.currentTerm {
        r.becomeFollower(args.Term)
    }
    reply.Term = r.currentTerm
    // Check whether we can vote for this candidate
    if (r.votedFor == "" || r.votedFor == args.CandidateID) &&
        r.log.IsUpToDate(args.LastLogIndex, args.LastLogTerm) {
        r.votedFor = args.CandidateID
        if err := r.persistState(); err != nil {
            // We cannot grant the vote if we can't persist the decision
            r.logger.Error("Failed to persist vote for %s: %v", args.CandidateID, err)
            return reply
        }
        r.resetElectionTimer()
        reply.VoteGranted = true
        r.logger.Debug("Granted vote to %s for term %d", args.CandidateID, args.Term)
    }
    return reply
}

// handlePreVote handles pre-vote requests.
// A pre-vote doesn't change our state; it just checks whether we would vote.
// Callers must hold r.mu.
func (r *Raft) handlePreVote(args *RequestVoteArgs) *RequestVoteReply {
    reply := &RequestVoteReply{
        Term:        r.currentTerm,
        VoteGranted: false,
    }
    // For a pre-vote we check:
    // 1. The candidate's term is at least as high as ours.
    // 2. The candidate's log is at least as up-to-date as ours.
    // 3. We don't have a current leader (or the candidate's term is higher).
    if args.Term < r.currentTerm {
        return reply
    }
    // Per the Raft Pre-Vote optimization (§9.6): reject the pre-vote if we have
    // a current leader and the candidate's term is not higher than ours. This
    // prevents disruptive elections when a partitioned node tries to rejoin.
    if r.leaderID != "" && args.Term <= r.currentTerm {
        r.logger.Debug("Rejecting pre-vote from %s: have leader %s", args.CandidateID, r.leaderID)
        return reply
    }
    // Grant the pre-vote if the candidate's log is up-to-date.
    // Note: we don't check votedFor for a pre-vote, and we don't update any state.
    if r.log.IsUpToDate(args.LastLogIndex, args.LastLogTerm) {
        reply.VoteGranted = true
        r.logger.Debug("Granted pre-vote to %s for term %d", args.CandidateID, args.Term)
    }
    return reply
}

// HandleAppendEntries handles AppendEntries RPCs.
func (r *Raft) HandleAppendEntries(args *AppendEntriesArgs) *AppendEntriesReply {
    r.mu.Lock()
    defer r.mu.Unlock()
    atomic.AddInt64(&r.stats.AppendsReceived, 1)
    reply := &AppendEntriesReply{
        Term:    r.currentTerm,
        Success: false,
    }
    // Check whether we're a standalone node being added to a cluster.
    // A standalone node has only itself in clusterNodes.
    isStandalone := len(r.clusterNodes) == 1
    if _, hasSelf := r.clusterNodes[r.nodeID]; isStandalone && hasSelf {
        // We're standalone and receiving AppendEntries from an external leader.
        // This means we're being added to a cluster - suppress elections.
        if !r.joiningCluster {
            r.joiningCluster = true
            r.joiningClusterTime = time.Now()
            r.logger.Info("Detected cluster join in progress, suppressing elections")
        }
        // While joining, accept higher terms from the leader to sync up
        if args.Term > r.currentTerm {
            r.becomeFollower(args.Term)
        }
    }
    // Reply false if term < currentTerm
    if args.Term < r.currentTerm {
        // But still reset the timer if we're joining a cluster, to prevent elections
        if r.joiningCluster {
            r.resetElectionTimer()
        }
        return reply
    }
    // If term > currentTerm, or we're a candidate, or we're a leader receiving
    // AppendEntries from another leader (split-brain scenario during a cluster
    // merge), become a follower. In Raft there can be only one leader per term.
    if args.Term > r.currentTerm || r.state == Candidate || r.state == Leader {
        r.becomeFollower(args.Term)
    }
    // Update the leader info and reset the election timer
    r.leaderID = args.LeaderID
    r.lastHeartbeat = time.Now()
    r.resetElectionTimer()
    reply.Term = r.currentTerm
    // Try to append the entries
    success, conflictIndex, conflictTerm := r.log.AppendEntriesFromLeader(
        args.PrevLogIndex, args.PrevLogTerm, args.Entries)
    if !success {
        reply.ConflictIndex = conflictIndex
        reply.ConflictTerm = conflictTerm
        return reply
    }
    reply.Success = true
    // Update the commit index safely
    if args.LeaderCommit > r.commitIndex {
        // Our actual last log index
        lastLogIndex := r.log.LastIndex()
        // The index the appended entries reached
        lastNewEntry := args.PrevLogIndex
        if len(args.Entries) > 0 {
            lastNewEntry = args.Entries[len(args.Entries)-1].Index
        }
        // The commit index must not exceed what we actually have in the log
        newCommitIndex := args.LeaderCommit
        if newCommitIndex > lastNewEntry {
            newCommitIndex = lastNewEntry
        }
        if newCommitIndex > lastLogIndex {
            newCommitIndex = lastLogIndex
        }
        // Only ever advance the commit index
        if newCommitIndex > r.commitIndex {
            r.commitIndex = newCommitIndex
        }
    }
    return reply
}
// HandleInstallSnapshot handles InstallSnapshot RPCs with chunked transfer support
func (r *Raft) HandleInstallSnapshot(args *InstallSnapshotArgs) *InstallSnapshotReply {
	r.mu.Lock()
	defer r.mu.Unlock()
	reply := &InstallSnapshotReply{
		Term:    r.currentTerm,
		Success: false,
	}
	if args.Term < r.currentTerm {
		return reply
	}
	if args.Term > r.currentTerm {
		r.becomeFollower(args.Term)
	}
	r.leaderID = args.LeaderID
	r.lastHeartbeat = time.Now()
	r.resetElectionTimer()
	reply.Term = r.currentTerm
	// Skip if we already have this or a newer snapshot applied
	if args.LastIncludedIndex <= r.lastApplied {
		r.logger.Debug("Ignoring snapshot at index %d, already applied up to %d",
			args.LastIncludedIndex, r.lastApplied)
		reply.Success = true // Still report success so the leader knows we don't need it
		return reply
	}
	// Handle chunked transfer
	if args.Offset == 0 {
		// First chunk - start a new pending snapshot
		r.pendingSnapshot = &pendingSnapshotState{
			lastIncludedIndex: args.LastIncludedIndex,
			lastIncludedTerm:  args.LastIncludedTerm,
			data:              make([]byte, 0),
			receivedBytes:     0,
		}
		r.logger.Info("Starting snapshot reception at index %d, term %d",
			args.LastIncludedIndex, args.LastIncludedTerm)
	}
	// Validate that we're receiving the expected snapshot. Guard the nil case
	// separately so the log line below cannot dereference a nil pendingSnapshot
	// (e.g. a non-zero offset arriving when no transfer has been started).
	if r.pendingSnapshot == nil {
		r.logger.Warn("Unexpected snapshot chunk at offset %d: no snapshot transfer in progress", args.Offset)
		return reply
	}
	if r.pendingSnapshot.lastIncludedIndex != args.LastIncludedIndex ||
		r.pendingSnapshot.lastIncludedTerm != args.LastIncludedTerm {
		r.logger.Warn("Unexpected snapshot chunk: expected index %d, got %d",
			r.pendingSnapshot.lastIncludedIndex, args.LastIncludedIndex)
		return reply
	}
	// Validate that the offset matches what we've received so far
	if uint64(args.Offset) != r.pendingSnapshot.receivedBytes {
		r.logger.Warn("Unexpected chunk offset: expected %d, got %d",
			r.pendingSnapshot.receivedBytes, args.Offset)
		return reply
	}
	// Append the chunk data
	r.pendingSnapshot.data = append(r.pendingSnapshot.data, args.Data...)
	r.pendingSnapshot.receivedBytes += uint64(len(args.Data))
	reply.Success = true
	r.logger.Debug("Received snapshot chunk: offset=%d, size=%d, done=%v",
		args.Offset, len(args.Data), args.Done)
	// If not done, wait for more chunks
	if !args.Done {
		return reply
	}
	// All chunks received - apply the snapshot
	r.logger.Info("Installing complete snapshot: %d bytes at index %d, term %d",
		len(r.pendingSnapshot.data), args.LastIncludedIndex, args.LastIncludedTerm)
	atomic.AddUint64(&r.metrics.SnapshotsInstalled, 1)
	// Save the snapshot
	if err := r.storage.SaveSnapshot(r.pendingSnapshot.data, args.LastIncludedIndex, args.LastIncludedTerm); err != nil {
		r.logger.Error("Failed to save snapshot: %v", err)
		r.pendingSnapshot = nil
		reply.Success = false
		return reply
	}
	// Compact the log
	if err := r.log.Compact(args.LastIncludedIndex); err != nil {
		r.logger.Error("Failed to compact log: %v", err)
	}
	// Update state - both commitIndex and lastApplied must advance
	if args.LastIncludedIndex > r.commitIndex {
		r.commitIndex = args.LastIncludedIndex
	}
	// Always advance lastApplied to the snapshot index so we never try to
	// apply entries that were compacted away
	r.lastApplied = args.LastIncludedIndex
	// Send the snapshot to the application (non-blocking, with a timeout).
	// Use the complete pendingSnapshot data, not just the last chunk.
	msg := ApplyMsg{
		SnapshotValid: true,
		Snapshot:      r.pendingSnapshot.data,
		SnapshotIndex: args.LastIncludedIndex,
		SnapshotTerm:  args.LastIncludedTerm,
	}
	// Clear the pending snapshot
	r.pendingSnapshot = nil
	// Try to send, but don't block indefinitely
	select {
	case r.applyCh <- msg:
		r.logger.Debug("Sent snapshot to application")
	case <-time.After(100 * time.Millisecond):
		r.logger.Warn("Timeout sending snapshot to application, will retry")
		// The application will still converge to the correct state via the normal apply loop
	}
	return reply
}
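
// The handler above expects chunks with monotonically increasing offsets and
// Done set on the final chunk. The sketch below is illustrative only, not part
// of the original sender code: it shows one way to split snapshot data into
// such chunks, with the emit callback standing in for the actual
// InstallSnapshot RPC (an assumption, not an API defined in this file).
func exampleChunkSnapshot(data []byte, chunkSize int, emit func(offset int, chunk []byte, done bool) error) error {
	if chunkSize <= 0 {
		return fmt.Errorf("chunk size must be positive")
	}
	for off := 0; ; off += chunkSize {
		end := off + chunkSize
		if end > len(data) {
			end = len(data)
		}
		done := end == len(data)
		// Each call mirrors one InstallSnapshot RPC: same snapshot metadata,
		// increasing offset, Done only on the last chunk.
		if err := emit(off, data[off:end], done); err != nil {
			return err
		}
		if done {
			return nil
		}
	}
}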
// Propose proposes a new command to be replicated.
// It returns the log index, the term, and whether this node accepted the
// command as leader.
func (r *Raft) Propose(command []byte) (uint64, uint64, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.state != Leader {
		return 0, 0, false
	}
	// Connectivity (lease) check: if we haven't heard from a majority of peers
	// within the election timeout, reject new commands because we might be
	// partitioned from the rest of the cluster.
	if len(r.clusterNodes) > 1 {
		activePeers := 1 // Self
		now := time.Now()
		timeout := r.config.ElectionTimeoutMax
		for _, peer := range r.peers {
			if last, ok := r.lastContact[peer]; ok && now.Sub(last) < timeout {
				activePeers++
			}
		}
		// Check majority
		needed := len(r.clusterNodes)/2 + 1
		if activePeers < needed {
			r.logger.Warn("Rejecting Propose: lost contact with majority (active: %d, needed: %d)", activePeers, needed)
			return 0, 0, false
		}
	}
	index, err := r.log.AppendCommand(r.currentTerm, command)
	if err != nil {
		r.logger.Error("Failed to append command: %v", err)
		return 0, 0, false
	}
	r.matchIndex[r.nodeID] = index
	// A single-node cluster is its own majority, so commit immediately;
	// otherwise commitCh would never be triggered without other peers.
	if len(r.clusterNodes) <= 1 && len(r.peers) == 0 {
		select {
		case r.commitCh <- struct{}{}:
		default:
		}
	} else {
		// Multi-node: trigger replication to the other nodes
		r.triggerReplication()
	}
	return index, r.currentTerm, true
}
// triggerReplication signals the replication loop to send heartbeats.
// It uses a non-blocking send so replication requests are batched.
func (r *Raft) triggerReplication() {
	select {
	case r.replicationCh <- struct{}{}:
	default:
		// Replication already scheduled
	}
}

// replicationLoop handles batched replication.
// It uses simple delay-based batching: flush immediately when signaled, then
// wait briefly to let more requests accumulate before the next flush.
func (r *Raft) replicationLoop() {
	for {
		select {
		case <-r.stopCh:
			return
		case <-r.replicationCh:
			// Flush and replicate immediately
			r.flushAndReplicate()
			// Wait briefly so subsequent proposals can batch into the next flush
			time.Sleep(10 * time.Millisecond)
		}
	}
}

// flushAndReplicate flushes logs and sends heartbeats
func (r *Raft) flushAndReplicate() {
	// Ensure logs are flushed to the OS cache before sending to followers.
	// This implements group commit with Flush (fast) rather than Sync (slow).
	if err := r.log.Flush(); err != nil {
		r.logger.Error("Failed to flush log: %v", err)
	}
	r.sendHeartbeats()
}
// ProposeWithForward proposes a command, forwarding it to the leader if
// necessary. This is the recommended method for applications to use.
func (r *Raft) ProposeWithForward(command []byte) (uint64, uint64, error) {
	r.mu.RLock()
	// A node that is not part of the cluster shouldn't accept data commands
	if _, exists := r.clusterNodes[r.nodeID]; !exists {
		r.mu.RUnlock()
		return 0, 0, fmt.Errorf("node %s is not part of the cluster", r.nodeID)
	}
	r.mu.RUnlock()
	// Try a local propose first
	idx, t, isLeader := r.Propose(command)
	if isLeader {
		return idx, t, nil
	}
	// Not the leader: forward to the leader
	r.mu.RLock()
	leaderID := r.leaderID
	// Use clusterNodes (dynamically maintained) to find the leader's address
	leaderAddr := r.clusterNodes[leaderID]
	r.mu.RUnlock()
	if leaderID == "" {
		return 0, 0, fmt.Errorf("no leader available")
	}
	if leaderAddr == "" {
		return 0, 0, fmt.Errorf("leader %s address not found in cluster", leaderID)
	}
	// Forward to the leader
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	args := &ProposeArgs{Command: command}
	reply, err := r.transport.ForwardPropose(ctx, leaderAddr, args)
	if err != nil {
		return 0, 0, fmt.Errorf("forward failed: %w", err)
	}
	if !reply.Success {
		return 0, 0, fmt.Errorf("leader rejected: %s", reply.Error)
	}
	return reply.Index, reply.Term, nil
}
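
// Usage sketch (illustrative, not part of the original file): an application
// submitting a command from any node in the cluster. The key=value command
// encoding is an assumption; real applications define their own format.
func exampleProposeSet(r *Raft, key, value string) error {
	cmd := []byte(key + "=" + value) // hypothetical command encoding
	index, term, err := r.ProposeWithForward(cmd)
	if err != nil {
		return fmt.Errorf("propose %q failed: %w", key, err)
	}
	// The command was accepted (locally or by the forwarded-to leader); it is
	// applied once the entry commits and reaches the apply loop.
	fmt.Printf("accepted at index=%d term=%d\n", index, term)
	return nil
}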
// ForwardGet forwards a get request to the leader
func (r *Raft) ForwardGet(key string) (string, bool, error) {
	r.mu.RLock()
	// A node that is not part of the cluster shouldn't serve reads
	if _, exists := r.clusterNodes[r.nodeID]; !exists {
		r.mu.RUnlock()
		return "", false, fmt.Errorf("node %s is not part of the cluster", r.nodeID)
	}
	// Snapshot the state and leader info while still holding the lock
	isLeader := r.state == Leader
	leaderID := r.leaderID
	leaderAddr := r.clusterNodes[leaderID]
	r.mu.RUnlock()
	// If we are the leader, serve the read locally
	if isLeader {
		if r.config.GetHandler != nil {
			val, found := r.config.GetHandler(key)
			return val, found, nil
		}
		return "", false, fmt.Errorf("get handler not configured")
	}
	if leaderID == "" {
		return "", false, ErrNoLeader
	}
	if leaderAddr == "" {
		return "", false, fmt.Errorf("leader %s address not found", leaderID)
	}
	// Forward to the leader
	ctx, cancel := context.WithTimeout(context.Background(), r.config.RPCTimeout)
	defer cancel()
	args := &GetArgs{Key: key}
	reply, err := r.transport.ForwardGet(ctx, leaderAddr, args)
	if err != nil {
		return "", false, fmt.Errorf("forward failed: %w", err)
	}
	return reply.Value, reply.Found, nil
}
// HandlePropose handles forwarded propose requests
func (r *Raft) HandlePropose(args *ProposeArgs) *ProposeReply {
	index, term, isLeader := r.Propose(args.Command)
	if !isLeader {
		return &ProposeReply{
			Success: false,
			Error:   "not leader",
		}
	}
	return &ProposeReply{
		Success: true,
		Index:   index,
		Term:    term,
	}
}

// HandleAddNode handles forwarded AddNode requests
func (r *Raft) HandleAddNode(args *AddNodeArgs) *AddNodeReply {
	if err := r.AddNode(args.NodeID, args.Address); err != nil {
		return &AddNodeReply{
			Success: false,
			Error:   err.Error(),
		}
	}
	return &AddNodeReply{
		Success: true,
	}
}

// HandleRemoveNode handles forwarded RemoveNode requests
func (r *Raft) HandleRemoveNode(args *RemoveNodeArgs) *RemoveNodeReply {
	if err := r.RemoveNode(args.NodeID); err != nil {
		return &RemoveNodeReply{
			Success: false,
			Error:   err.Error(),
		}
	}
	return &RemoveNodeReply{
		Success: true,
	}
}
// GetState returns the current term and whether this node is the leader
func (r *Raft) GetState() (uint64, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.currentTerm, r.state == Leader
}

// GetLeaderID returns the current leader's ID
func (r *Raft) GetLeaderID() string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.leaderID
}

// GetStats returns runtime statistics
func (r *Raft) GetStats() Stats {
	r.mu.RLock()
	defer r.mu.RUnlock()
	lastIndex, lastTerm := r.log.LastIndexAndTerm()
	firstIndex := r.log.FirstIndex()
	// Copy the cluster nodes
	nodes := make(map[string]string)
	for k, v := range r.clusterNodes {
		nodes[k] = v
	}
	clusterSize := len(r.clusterNodes)
	if clusterSize == 0 {
		clusterSize = len(r.peers) + 1
	}
	return Stats{
		Term:            r.currentTerm,
		State:           r.state.String(),
		FirstLogIndex:   firstIndex,
		LastLogIndex:    lastIndex,
		LastLogTerm:     lastTerm,
		CommitIndex:     r.commitIndex,
		LastApplied:     r.lastApplied,
		LeaderID:        r.leaderID,
		AppendsSent:     atomic.LoadInt64(&r.stats.AppendsSent),
		AppendsReceived: atomic.LoadInt64(&r.stats.AppendsReceived),
		ClusterSize:     clusterSize,
		ClusterNodes:    nodes,
	}
}
// restoreFromSnapshot restores the FSM from a snapshot at startup.
// This is called during Start() so the FSM has the correct state before it
// processes any new commands.
func (r *Raft) restoreFromSnapshot() error {
	// Get the snapshot from storage
	data, lastIndex, lastTerm, err := r.storage.GetSnapshot()
	if err != nil {
		return fmt.Errorf("failed to get snapshot: %w", err)
	}
	// No snapshot exists
	if len(data) == 0 || lastIndex == 0 {
		return nil
	}
	r.logger.Info("Restoring FSM from snapshot at index %d, term %d (%d bytes)",
		lastIndex, lastTerm, len(data))
	// Advance lastApplied to the snapshot index so compacted entries are not re-applied
	r.mu.Lock()
	if lastIndex > r.lastApplied {
		r.lastApplied = lastIndex
	}
	if lastIndex > r.commitIndex {
		r.commitIndex = lastIndex
	}
	r.mu.Unlock()
	// Send the snapshot to the FSM for restoration, with a timeout so we
	// don't block startup indefinitely if applyCh is full
	msg := ApplyMsg{
		SnapshotValid: true,
		Snapshot:      data,
		SnapshotIndex: lastIndex,
		SnapshotTerm:  lastTerm,
	}
	select {
	case r.applyCh <- msg:
		r.logger.Info("FSM restoration triggered from snapshot at index %d", lastIndex)
	case <-time.After(5 * time.Second):
		return fmt.Errorf("timeout sending snapshot to applyCh")
	}
	return nil
}
// TakeSnapshot takes a snapshot of the current state
func (r *Raft) TakeSnapshot(data []byte, index uint64) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if index > r.lastApplied {
		return fmt.Errorf("snapshot index %d exceeds lastApplied %d", index, r.lastApplied)
	}
	term, err := r.log.GetTerm(index)
	if err != nil {
		return fmt.Errorf("failed to get term for index %d: %w", index, err)
	}
	if err := r.storage.SaveSnapshot(data, index, term); err != nil {
		return fmt.Errorf("failed to save snapshot: %w", err)
	}
	if err := r.log.Compact(index); err != nil {
		return fmt.Errorf("failed to compact log: %w", err)
	}
	r.logger.Info("Took snapshot at index %d, term %d", index, term)
	return nil
}
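
// Usage sketch (illustrative, not part of the original file): an application
// snapshotting its FSM once the applied portion of the log passes a threshold.
// The serialize callback and the threshold policy are assumptions about the
// embedding application.
func exampleMaybeSnapshot(r *Raft, serialize func() ([]byte, error), threshold uint64) error {
	stats := r.GetStats()
	// Nothing to do while the retained log is still small
	if stats.LastApplied < stats.FirstLogIndex ||
		stats.LastApplied-stats.FirstLogIndex < threshold {
		return nil
	}
	data, err := serialize()
	if err != nil {
		return fmt.Errorf("serialize FSM: %w", err)
	}
	// Snapshot everything applied so far; TakeSnapshot rejects indexes
	// beyond lastApplied.
	return r.TakeSnapshot(data, stats.LastApplied)
}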
func max(a, b uint64) uint64 {
	if a > b {
		return a
	}
	return b
}
// ==================== Membership Change API ====================
//
// This implementation uses Single-Node Membership Change (also known as
// one-at-a-time changes) as described in the Raft dissertation (§4.3). This is
// safe because:
//
// 1. We only allow one configuration change at a time (pendingConfigChange flag)
// 2. For commits, we use the OLD cluster majority until the config change is committed
// 3. The new node starts receiving entries immediately but doesn't affect majority calculation
//
// This approach is simpler than Joint Consensus and is sufficient for most use
// cases. The invariant maintained is: any majority of the old configuration and
// any majority of the new configuration must overlap.
//
// Adding a node grows the majority from N/2+1 to (N+1)/2+1 (integer division);
// removing one shrinks it to (N-1)/2+1. Because the two configurations differ
// by exactly one node, the two majority sizes always sum to more than the size
// of the larger configuration, so any two quorums share at least one member
// (see exampleMajorityOverlap below for the arithmetic).
//
// WARNING: Avoid adding/removing multiple nodes rapidly. Wait for each change to be committed.
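
// Illustrative check of the overlap argument above (not used by the
// implementation): the majorities of two configurations that differ by one
// node always intersect, because their sizes sum to more than the size of the
// larger configuration.
func exampleMajorityOverlap(oldSize int) bool {
	oldMajority := oldSize/2 + 1     // majority of the old configuration
	newMajority := (oldSize+1)/2 + 1 // majority after adding one node
	// Pigeonhole: oldMajority + newMajority == oldSize + 2 > oldSize + 1,
	// so any old quorum and any new quorum must share a member.
	return oldMajority+newMajority > oldSize+1
}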
// AddNode adds a new node to the cluster.
// This can only be called on the leader.
// The new node must already be running and reachable.
//
// Safety guarantees:
// - Only one config change can be in progress at a time
// - The old cluster majority is used until the config change is committed
// - Returns an error if leadership is lost during the operation
func (r *Raft) AddNode(nodeID, address string) error {
	r.mu.Lock()
	// Must be leader
	if r.state != Leader {
		leaderID := r.leaderID
		r.mu.Unlock()
		return NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff)
	}
	// Reject while a leadership transfer is in progress
	if r.transferring {
		r.mu.Unlock()
		return fmt.Errorf("leadership transfer in progress")
	}
	// Only one config change may be in flight at a time
	if r.pendingConfigChange {
		r.mu.Unlock()
		return ErrConfigInFlight
	}
	// Validate nodeID and address
	if nodeID == "" {
		r.mu.Unlock()
		return fmt.Errorf("nodeID cannot be empty")
	}
	if address == "" {
		r.mu.Unlock()
		return fmt.Errorf("address cannot be empty")
	}
	// Check if the node already exists
	if _, exists := r.clusterNodes[nodeID]; exists {
		r.mu.Unlock()
		return fmt.Errorf("node %s already exists in cluster", nodeID)
	}
	// Check if the address is already used by another node
	for existingID, existingAddr := range r.clusterNodes {
		if existingAddr == address {
			r.mu.Unlock()
			return fmt.Errorf("address %s is already used by node %s", address, existingID)
		}
	}
	// Save the old cluster nodes for majority calculation during the change.
	// This ensures we use the OLD cluster size until the config change commits.
	r.oldClusterNodes = make(map[string]string)
	for k, v := range r.clusterNodes {
		r.oldClusterNodes[k] = v
	}
	// Create the new config with the added node
	newNodes := make(map[string]string)
	for k, v := range r.clusterNodes {
		newNodes[k] = v
	}
	newNodes[nodeID] = address
	// Create the config change entry with the new ClusterConfig
	configIndex := r.log.LastIndex() + 1
	entry := LogEntry{
		Index:  configIndex,
		Term:   r.currentTerm,
		Type:   EntryConfig,
		Config: &ClusterConfig{Nodes: newNodes},
	}
	// Mark the config change as pending and store its index
	r.pendingConfigChange = true
	r.configChangeIndex = configIndex
	// Immediately apply the new configuration (safe for single-node changes).
	// The new node will start receiving AppendEntries immediately.
	r.clusterNodes[nodeID] = address
	r.peers = append(r.peers, address)
	// Start the new node from the first index we still have: its log is empty,
	// so it must sync from the beginning (or from a snapshot)
	firstIndex := r.log.FirstIndex()
	if firstIndex == 0 {
		firstIndex = 1
	}
	r.nextIndex[address] = firstIndex
	r.matchIndex[address] = 0
	// Append the config change entry while still holding the lock, so a
	// concurrent Propose cannot claim the same log index
	if err := r.log.Append(entry); err != nil {
		r.pendingConfigChange = false
		r.oldClusterNodes = nil
		r.configChangeIndex = 0
		// Rollback
		delete(r.clusterNodes, nodeID)
		r.rebuildPeersList()
		r.mu.Unlock()
		return fmt.Errorf("failed to append config entry: %w", err)
	}
	r.mu.Unlock()
	r.logger.Info("Adding node %s (%s) to cluster", nodeID, address)
	// Trigger immediate replication
	r.triggerReplication()
	return nil
}
// RemoveNode removes a node from the cluster.
// This can only be called on the leader.
// The node being removed can be any node except the leader itself.
//
// Safety guarantees:
// - Only one config change can be in progress at a time
// - Cannot remove the leader (transfer leadership first)
// - Cannot reduce the cluster to 0 nodes
// - The old cluster majority is used until the config change is committed
func (r *Raft) RemoveNode(nodeID string) error {
	r.mu.Lock()
	// Must be leader
	if r.state != Leader {
		leaderID := r.leaderID
		r.mu.Unlock()
		return NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff)
	}
	// Reject while a leadership transfer is in progress
	if r.transferring {
		r.mu.Unlock()
		return fmt.Errorf("leadership transfer in progress")
	}
	// Cannot remove self
	if nodeID == r.nodeID {
		r.mu.Unlock()
		return fmt.Errorf("cannot remove self from cluster, use TransferLeadership first")
	}
	// Validate nodeID
	if nodeID == "" {
		r.mu.Unlock()
		return fmt.Errorf("nodeID cannot be empty")
	}
	// Only one config change may be in flight at a time
	if r.pendingConfigChange {
		r.mu.Unlock()
		return ErrConfigInFlight
	}
	// Check that the node exists
	if _, exists := r.clusterNodes[nodeID]; !exists {
		r.mu.Unlock()
		return fmt.Errorf("node %s not found in cluster", nodeID)
	}
	// Force-removal check: for Raft safety we still commit a config change
	// entry so the other nodes learn about the removal, but a node with an
	// invalid address will never answer RPCs, so removal must be allowed even
	// when we cannot contact it. Use a simple heuristic to spot bad addresses.
	nodeAddr := r.clusterNodes[nodeID]
	isValidNode := true
	if nodeAddr == "" || strings.HasPrefix(nodeAddr, ".") || !strings.Contains(nodeAddr, ":") {
		isValidNode = false
		r.logger.Warn("Removing invalid node %s with address %s", nodeID, nodeAddr)
	}
	// Cannot reduce the cluster below 1 node, unless this is invalid-node cleanup
	if isValidNode && len(r.clusterNodes) <= 1 {
		r.mu.Unlock()
		return fmt.Errorf("cannot remove last node from cluster")
	}
	// Save the old cluster nodes for majority calculation during the change
	r.oldClusterNodes = make(map[string]string)
	for k, v := range r.clusterNodes {
		r.oldClusterNodes[k] = v
	}
	// Create the new config without the removed node
	newNodes := make(map[string]string)
	for k, v := range r.clusterNodes {
		if k != nodeID {
			newNodes[k] = v
		}
	}
	// Create the config change entry with the new ClusterConfig
	configIndex := r.log.LastIndex() + 1
	entry := LogEntry{
		Index:  configIndex,
		Term:   r.currentTerm,
		Type:   EntryConfig,
		Config: &ClusterConfig{Nodes: newNodes},
	}
	// Mark the config change as pending and store its index
	r.pendingConfigChange = true
	r.configChangeIndex = configIndex
	// Remember the removed node's address for cleanup (and possible rollback)
	removedAddr := r.clusterNodes[nodeID]
	// Immediately apply the new configuration
	delete(r.clusterNodes, nodeID)
	r.rebuildPeersList()
	delete(r.nextIndex, removedAddr)
	delete(r.matchIndex, removedAddr)
	// Append the config change entry while still holding the lock, so a
	// concurrent Propose cannot claim the same log index
	if err := r.log.Append(entry); err != nil {
		r.pendingConfigChange = false
		r.oldClusterNodes = nil
		r.configChangeIndex = 0
		// Rollback - this is tricky, but we try our best
		r.clusterNodes[nodeID] = removedAddr
		r.rebuildPeersList()
		r.mu.Unlock()
		return fmt.Errorf("failed to append config entry: %w", err)
	}
	r.mu.Unlock()
	r.logger.Info("Removing node %s from cluster, config at index %d", nodeID, entry.Index)
	// Trigger replication
	go r.sendHeartbeats()
	return nil
}
// AddNodeWithForward adds a node, forwarding to the leader if necessary.
// This is the recommended method for applications to use.
func (r *Raft) AddNodeWithForward(nodeID, address string) error {
	// Try the local operation first
	err := r.AddNode(nodeID, address)
	if err == nil {
		return nil
	}
	// Check whether we are the leader
	r.mu.RLock()
	state := r.state
	leaderID := r.leaderID
	leaderAddr := r.clusterNodes[leaderID]
	r.mu.RUnlock()
	if state == Leader {
		// We are the leader but AddNode failed for some other reason
		return err
	}
	// Not the leader: forward to the leader
	if leaderID == "" {
		return fmt.Errorf("no leader available")
	}
	if leaderAddr == "" {
		return fmt.Errorf("leader %s address not found in cluster", leaderID)
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	args := &AddNodeArgs{NodeID: nodeID, Address: address}
	reply, err := r.transport.ForwardAddNode(ctx, leaderAddr, args)
	if err != nil {
		return fmt.Errorf("forward failed: %w", err)
	}
	if !reply.Success {
		return fmt.Errorf("leader rejected: %s", reply.Error)
	}
	return nil
}
// RemoveNodeWithForward removes a node, forwarding to the leader if necessary.
// This is the recommended method for applications to use.
func (r *Raft) RemoveNodeWithForward(nodeID string) error {
	// Try the local operation first
	err := r.RemoveNode(nodeID)
	if err == nil {
		return nil
	}
	// Check whether we are the leader
	r.mu.RLock()
	state := r.state
	leaderID := r.leaderID
	leaderAddr := r.clusterNodes[leaderID]
	r.mu.RUnlock()
	if state == Leader {
		// We are the leader but RemoveNode failed for some other reason
		return err
	}
	// Not the leader: forward to the leader
	if leaderID == "" {
		return fmt.Errorf("no leader available")
	}
	if leaderAddr == "" {
		return fmt.Errorf("leader %s address not found in cluster", leaderID)
	}
	// Use a short timeout for removal, as it shouldn't block for long
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	args := &RemoveNodeArgs{NodeID: nodeID}
	reply, err := r.transport.ForwardRemoveNode(ctx, leaderAddr, args)
	if err != nil {
		return fmt.Errorf("forward failed: %w", err)
	}
	if !reply.Success {
		return fmt.Errorf("leader rejected: %s", reply.Error)
	}
	return nil
}
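
// Usage sketch (illustrative, not part of the original file): growing a
// cluster one node at a time, as the membership-change notes above require.
// The retry-until-accepted loop and the ten-second budget are assumptions;
// a change still in flight is surfaced as an error such as ErrConfigInFlight.
func exampleGrowCluster(r *Raft, newNodes map[string]string) error {
	for id, addr := range newNodes {
		deadline := time.Now().Add(10 * time.Second)
		for {
			err := r.AddNodeWithForward(id, addr)
			if err == nil {
				break // accepted; move on to the next node
			}
			if time.Now().After(deadline) {
				return fmt.Errorf("add node %s: %w", id, err)
			}
			// Back off while the previous config change commits
			time.Sleep(100 * time.Millisecond)
		}
	}
	return nil
}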
// rebuildPeersList rebuilds the peers slice from clusterNodes
func (r *Raft) rebuildPeersList() {
	// Guard the capacity hint: clusterNodes may be empty, and a negative
	// capacity would make make() panic
	capacity := len(r.clusterNodes)
	if capacity > 0 {
		capacity--
	}
	r.peers = make([]string, 0, capacity)
	for nodeID, addr := range r.clusterNodes {
		if nodeID != r.nodeID {
			r.peers = append(r.peers, addr)
		}
	}
}

// GetClusterNodes returns a copy of the current cluster membership
func (r *Raft) GetClusterNodes() map[string]string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	nodes := make(map[string]string)
	for k, v := range r.clusterNodes {
		nodes[k] = v
	}
	return nodes
}
// applyConfigChange applies a configuration change entry
func (r *Raft) applyConfigChange(entry *LogEntry) {
	if entry.Config == nil || entry.Config.Nodes == nil {
		r.logger.Warn("Invalid config change entry at index %d", entry.Index)
		return
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	// Update the cluster configuration
	r.clusterNodes = make(map[string]string)
	for k, v := range entry.Config.Nodes {
		r.clusterNodes[k] = v
	}
	r.rebuildPeersList()
	// Persist the new configuration
	if err := r.storage.SaveClusterConfig(&ClusterConfig{Nodes: r.clusterNodes}); err != nil {
		r.logger.Error("Failed to persist cluster config: %v", err)
	}
	// Clear the pending flag and old cluster state
	r.pendingConfigChange = false
	r.oldClusterNodes = nil
	r.configChangeIndex = 0
	// If we were joining a cluster and now see multiple nodes, the join succeeded
	if r.joiningCluster && len(r.clusterNodes) > 1 {
		r.joiningCluster = false
		r.logger.Info("Successfully joined cluster with %d nodes", len(r.clusterNodes))
	}
	// Check whether we have been removed from the cluster
	if _, exists := r.clusterNodes[r.nodeID]; !exists {
		r.logger.Warn("Node %s removed from cluster configuration", r.nodeID)
		// We could shut down here, or stay as a listener.
		// For now, step down to Follower if we aren't one already.
		if r.state != Follower {
			r.becomeFollower(r.currentTerm)
		}
	}
	r.logger.Info("Applied config change at index %d, cluster now has %d nodes", entry.Index, len(r.clusterNodes))
	// If we're the leader, update leader replication state
	if r.state == Leader {
		// Check that we are still in the cluster
		if _, exists := r.clusterNodes[r.nodeID]; !exists {
			r.logger.Warn("Leader removed from cluster (self), stepping down")
			r.becomeFollower(r.currentTerm)
			return
		}
		// Initialize nextIndex/matchIndex for any new nodes
		lastIndex := r.log.LastIndex()
		for nodeID, addr := range r.clusterNodes {
			if nodeID != r.nodeID {
				if _, exists := r.nextIndex[addr]; !exists {
					r.nextIndex[addr] = lastIndex + 1
					r.matchIndex[addr] = 0
				}
			}
		}
		// Clean up state for removed nodes
		validAddrs := make(map[string]bool)
		for nodeID, addr := range r.clusterNodes {
			if nodeID != r.nodeID {
				validAddrs[addr] = true
			}
		}
		for addr := range r.nextIndex {
			if !validAddrs[addr] {
				delete(r.nextIndex, addr)
				delete(r.matchIndex, addr)
			}
		}
	}
}
// ==================== ReadIndex (Linearizable Reads) ====================

// readIndexLoop handles read index requests
func (r *Raft) readIndexLoop() {
	for {
		select {
		case <-r.stopCh:
			return
		case req := <-r.readIndexCh:
			r.processReadIndexRequest(req)
		}
	}
}

// processReadIndexRequest processes a single read index request
func (r *Raft) processReadIndexRequest(req *readIndexRequest) {
	r.mu.RLock()
	if r.state != Leader {
		r.mu.RUnlock()
		req.done <- ErrNotLeader
		return
	}
	r.mu.RUnlock()
	// Confirm we still hold leadership before serving the read
	if !r.confirmLeadership() {
		req.done <- ErrLeadershipLost
		return
	}
	// Wait for the apply loop to catch up to the read index
	if err := r.waitApply(req.readIndex, r.config.ProposeTimeout); err != nil {
		req.done <- err
		return
	}
	req.done <- nil
}
// ReadIndex implements linearizable reads.
// It ensures the read observes every write committed before the read started.
func (r *Raft) ReadIndex() (uint64, error) {
	r.mu.RLock()
	if r.state != Leader {
		leaderID := r.leaderID
		r.mu.RUnlock()
		return 0, NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff)
	}
	readIndex := r.commitIndex
	r.mu.RUnlock()
	atomic.AddUint64(&r.metrics.ReadIndexRequests, 1)
	// Create the request and hand it to the processing loop
	req := &readIndexRequest{
		readIndex: readIndex,
		done:      make(chan error, 1),
	}
	select {
	case r.readIndexCh <- req:
	case <-r.stopCh:
		return 0, ErrShutdown
	case <-time.After(r.config.ProposeTimeout):
		return 0, ErrTimeout
	}
	// Wait for the result
	select {
	case err := <-req.done:
		if err != nil {
			return 0, err
		}
		atomic.AddUint64(&r.metrics.ReadIndexSuccess, 1)
		return readIndex, nil
	case <-r.stopCh:
		return 0, ErrShutdown
	case <-time.After(r.config.ProposeTimeout):
		return 0, ErrTimeout
	}
}
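
// Usage sketch (illustrative, not part of the original file): a linearizable
// read on the leader. ReadIndex returns once the local state machine has
// applied everything committed before the read began; the GetHandler-backed
// store is an assumption about the embedding application.
func exampleLinearizableGet(r *Raft, key string) (string, bool, error) {
	if _, err := r.ReadIndex(); err != nil {
		return "", false, err // not leader, leadership lost, or timed out
	}
	if r.config.GetHandler == nil {
		return "", false, fmt.Errorf("get handler not configured")
	}
	// Safe to serve locally now: the state machine reflects all writes
	// committed before this read started.
	value, found := r.config.GetHandler(key)
	return value, found, nil
}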
// confirmLeadership checks that we are still the leader before serving a read.
// Note: rather than counting explicit quorum acks, this approximation sends
// heartbeats, waits one heartbeat interval, and verifies we are still leader
// in the same term (a higher term learned from any peer would have demoted us).
func (r *Raft) confirmLeadership() bool {
	r.mu.RLock()
	if r.state != Leader {
		r.mu.RUnlock()
		return false
	}
	currentTerm := r.currentTerm
	clusterSize := len(r.clusterNodes)
	if clusterSize == 0 {
		clusterSize = len(r.peers) + 1
	}
	r.mu.RUnlock()
	// A single-node cluster is always its own leader
	if clusterSize == 1 {
		return true
	}
	// Send heartbeats to all peers
	r.sendHeartbeats()
	// Wait briefly, then check that leadership was not lost in the meantime
	time.Sleep(r.config.HeartbeatInterval)
	r.mu.RLock()
	stillLeader := r.state == Leader && r.currentTerm == currentTerm
	r.mu.RUnlock()
	return stillLeader
}
// waitApply waits until lastApplied >= index, polling with a short sleep
func (r *Raft) waitApply(index uint64, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for {
		r.mu.RLock()
		lastApplied := r.lastApplied
		r.mu.RUnlock()
		if lastApplied >= index {
			return nil
		}
		if time.Now().After(deadline) {
			return ErrTimeout
		}
		time.Sleep(1 * time.Millisecond)
	}
}
// ==================== Health Check ====================

// HealthCheck returns the current health status of the node
func (r *Raft) HealthCheck() HealthStatus {
	r.mu.RLock()
	defer r.mu.RUnlock()
	clusterNodes := make(map[string]string)
	for k, v := range r.clusterNodes {
		clusterNodes[k] = v
	}
	clusterSize := len(r.clusterNodes)
	if clusterSize == 0 {
		clusterSize = len(r.peers) + 1
	}
	logBehind := uint64(0)
	if r.commitIndex > r.lastApplied {
		logBehind = r.commitIndex - r.lastApplied
	}
	// Consider the node healthy if it is the leader or knows a leader
	isHealthy := r.state == Leader || r.leaderID != ""
	return HealthStatus{
		NodeID:        r.nodeID,
		State:         r.state.String(),
		Term:          r.currentTerm,
		LeaderID:      r.leaderID,
		ClusterSize:   clusterSize,
		ClusterNodes:  clusterNodes,
		CommitIndex:   r.commitIndex,
		LastApplied:   r.lastApplied,
		LogBehind:     logBehind,
		LastHeartbeat: r.lastHeartbeat,
		IsHealthy:     isHealthy,
		Uptime:        time.Since(r.startTime),
	}
}
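
// Usage sketch (illustrative, not part of the original file): mapping
// HealthCheck onto a readiness probe. The "healthy and not too far behind"
// policy is an assumption, not something this package prescribes.
func exampleReady(r *Raft, maxLogBehind uint64) bool {
	hs := r.HealthCheck()
	return hs.IsHealthy && hs.LogBehind <= maxLogBehind
}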
// GetMetrics returns a snapshot of the current metrics counters
func (r *Raft) GetMetrics() Metrics {
	return Metrics{
		Term:                      atomic.LoadUint64(&r.metrics.Term),
		ProposalsTotal:            atomic.LoadUint64(&r.metrics.ProposalsTotal),
		ProposalsSuccess:          atomic.LoadUint64(&r.metrics.ProposalsSuccess),
		ProposalsFailed:           atomic.LoadUint64(&r.metrics.ProposalsFailed),
		ProposalsForwarded:        atomic.LoadUint64(&r.metrics.ProposalsForwarded),
		AppendsSent:               atomic.LoadUint64(&r.metrics.AppendsSent),
		AppendsReceived:           atomic.LoadUint64(&r.metrics.AppendsReceived),
		AppendsSuccess:            atomic.LoadUint64(&r.metrics.AppendsSuccess),
		AppendsFailed:             atomic.LoadUint64(&r.metrics.AppendsFailed),
		ElectionsStarted:          atomic.LoadUint64(&r.metrics.ElectionsStarted),
		ElectionsWon:              atomic.LoadUint64(&r.metrics.ElectionsWon),
		PreVotesStarted:           atomic.LoadUint64(&r.metrics.PreVotesStarted),
		PreVotesGranted:           atomic.LoadUint64(&r.metrics.PreVotesGranted),
		SnapshotsTaken:            atomic.LoadUint64(&r.metrics.SnapshotsTaken),
		SnapshotsInstalled:        atomic.LoadUint64(&r.metrics.SnapshotsInstalled),
		SnapshotsSent:             atomic.LoadUint64(&r.metrics.SnapshotsSent),
		ReadIndexRequests:         atomic.LoadUint64(&r.metrics.ReadIndexRequests),
		ReadIndexSuccess:          atomic.LoadUint64(&r.metrics.ReadIndexSuccess),
		LeadershipTransfers:       atomic.LoadUint64(&r.metrics.LeadershipTransfers),
		LeadershipTransferSuccess: atomic.LoadUint64(&r.metrics.LeadershipTransferSuccess),
	}
}
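
// Usage sketch (illustrative, not part of the original file): periodically
// sampling the counters for monitoring. The sampling interval and the plain
// Printf destination are assumptions.
func exampleLogMetrics(r *Raft, interval time.Duration, stopCh <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-stopCh:
			return
		case <-ticker.C:
			m := r.GetMetrics()
			fmt.Printf("raft: term=%d proposals=%d/%d appends=%d/%d elections=%d won=%d\n",
				m.Term, m.ProposalsSuccess, m.ProposalsTotal,
				m.AppendsSuccess, m.AppendsSent,
				m.ElectionsStarted, m.ElectionsWon)
		}
	}
}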
// ==================== Leadership Transfer ====================

// TransferLeadership transfers leadership to the specified node
func (r *Raft) TransferLeadership(targetID string) error {
	r.mu.Lock()
	if r.state != Leader {
		r.mu.Unlock()
		return ErrNotLeader
	}
	if targetID == r.nodeID {
		r.mu.Unlock()
		return fmt.Errorf("cannot transfer to self")
	}
	targetAddr, exists := r.clusterNodes[targetID]
	if !exists {
		r.mu.Unlock()
		return fmt.Errorf("target node %s not in cluster", targetID)
	}
	if r.transferring {
		r.mu.Unlock()
		return fmt.Errorf("leadership transfer already in progress")
	}
	r.transferring = true
	r.transferTarget = targetID
	r.transferDeadline = time.Now().Add(r.config.ElectionTimeoutMax * 2)
	currentTerm := r.currentTerm
	atomic.AddUint64(&r.metrics.LeadershipTransfers, 1)
	r.mu.Unlock()
	r.logger.Info("Starting leadership transfer to %s", targetID)
	// Step 1: Sync the target up to our log
	if err := r.syncFollowerToLatest(targetAddr); err != nil {
		r.mu.Lock()
		r.transferring = false
		r.transferTarget = ""
		r.mu.Unlock()
		return fmt.Errorf("failed to sync target: %w", err)
	}
	// Step 2: Send the TimeoutNow RPC
	args := &TimeoutNowArgs{
		Term:     currentTerm,
		LeaderID: r.nodeID,
	}
	ctx, cancel := context.WithTimeout(context.Background(), r.config.RPCTimeout)
	defer cancel()
	reply, err := r.transport.TimeoutNow(ctx, targetAddr, args)
	if err != nil {
		r.mu.Lock()
		r.transferring = false
		r.transferTarget = ""
		r.mu.Unlock()
		return fmt.Errorf("TimeoutNow RPC failed: %w", err)
	}
	if !reply.Success {
		r.mu.Lock()
		r.transferring = false
		r.transferTarget = ""
		r.mu.Unlock()
		return fmt.Errorf("target rejected leadership transfer")
	}
	atomic.AddUint64(&r.metrics.LeadershipTransferSuccess, 1)
	r.logger.Info("Leadership transfer to %s initiated successfully", targetID)
	// Note: we don't step down immediately; we wait for the target to win its
	// election and send us an AppendEntries with a higher term
	return nil
}
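
// Usage sketch (illustrative, not part of the original file): decommissioning
// the current leader. RemoveNode refuses to remove self, so leadership is
// handed off first; the polling loop and five-second budget are assumptions.
func exampleDecommissionLeader(r *Raft, successorID string) error {
	if err := r.TransferLeadership(successorID); err != nil {
		return fmt.Errorf("transfer to %s: %w", successorID, err)
	}
	// Wait for the successor to win its election and demote us via a
	// higher-term AppendEntries.
	deadline := time.Now().Add(5 * time.Second)
	for time.Now().Before(deadline) {
		if _, isLeader := r.GetState(); !isLeader {
			// Stepped down; the new leader can now remove this node.
			return nil
		}
		time.Sleep(50 * time.Millisecond)
	}
	return fmt.Errorf("still leader after transfer to %s", successorID)
}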
// syncFollowerToLatest ensures the follower has caught up to our log
func (r *Raft) syncFollowerToLatest(peerAddr string) error {
	r.mu.RLock()
	if r.state != Leader {
		r.mu.RUnlock()
		return ErrNotLeader
	}
	currentTerm := r.currentTerm
	leaderCommit := r.commitIndex
	lastIndex := r.log.LastIndex()
	r.mu.RUnlock()
	// Keep replicating until the follower is caught up or we run out of time
	deadline := time.Now().Add(r.config.ElectionTimeoutMax * 2)
	for time.Now().Before(deadline) {
		r.mu.RLock()
		if r.state != Leader || r.currentTerm != currentTerm {
			r.mu.RUnlock()
			return ErrLeadershipLost
		}
		matchIndex := r.matchIndex[peerAddr]
		r.mu.RUnlock()
		if matchIndex >= lastIndex {
			return nil // Caught up
		}
		// Trigger replication to this peer
		r.replicateToPeer(peerAddr, currentTerm, leaderCommit)
		time.Sleep(10 * time.Millisecond)
	}
	return ErrTimeout
}
// HandleTimeoutNow handles the TimeoutNow RPC (used for leadership transfer)
func (r *Raft) HandleTimeoutNow(args *TimeoutNowArgs) *TimeoutNowReply {
	r.mu.Lock()
	defer r.mu.Unlock()
	reply := &TimeoutNowReply{
		Term:    r.currentTerm,
		Success: false,
	}
	// Only accept from a current-or-newer term, and only while we're a follower
	if args.Term < r.currentTerm {
		return reply
	}
	if r.state != Follower {
		return reply
	}
	// Become a candidate; the election loop starts the election immediately
	r.logger.Info("Received TimeoutNow from %s, starting immediate election", args.LeaderID)
	r.state = Candidate
	reply.Success = true
	return reply
}
// HandleReadIndex handles the ReadIndex RPC
func (r *Raft) HandleReadIndex(args *ReadIndexArgs) *ReadIndexReply {
	reply := &ReadIndexReply{
		Success: false,
	}
	readIndex, err := r.ReadIndex()
	if err != nil {
		reply.Error = err.Error()
		return reply
	}
	reply.ReadIndex = readIndex
	reply.Success = true
	return reply
}

// HandleGet handles the Get RPC for remote KV reads
func (r *Raft) HandleGet(args *GetArgs) *GetReply {
	reply := &GetReply{
		Found: false,
	}
	if r.config.GetHandler == nil {
		reply.Error = "get handler not configured"
		return reply
	}
	value, found := r.config.GetHandler(args.Key)
	reply.Value = value
	reply.Found = found
	return reply
}