raft.go

package raft

import (
    "context"
    "fmt"
    "math/rand"
    "os"
    "strings"
    "sync"
    "sync/atomic"
    "time"
)
// Raft represents a Raft consensus node
type Raft struct {
    mu sync.RWMutex
    // Node identity
    nodeID string
    peers  []string
    // Cluster membership - maps nodeID to address
    clusterNodes map[string]string
    // Current state
    state       NodeState
    currentTerm uint64
    votedFor    string
    leaderID    string
    // Log management
    log         *LogManager
    storage     Storage
    commitIndex uint64
    lastApplied uint64
    // Leader state
    nextIndex  map[string]uint64
    matchIndex map[string]uint64
    // Configuration
    config *Config
    // Communication
    transport Transport
    // Channels
    applyCh       chan ApplyMsg
    stopCh        chan struct{}
    commitCh      chan struct{}
    applyNotifyCh chan struct{}
    // Election timer
    electionTimer  *time.Timer
    heartbeatTimer *time.Timer
    // Statistics (deprecated, use metrics instead)
    stats Stats
    // Metrics for monitoring
    metrics Metrics
    // Logger
    logger Logger
    // Running flag
    running int32
    // Replication trigger channel - used to batch replication requests
    replicationCh chan struct{}
    // Pending config change - only one config change can be pending at a time
    pendingConfigChange bool
    // Old cluster nodes - used for majority calculation during config change
    // This ensures we use the old cluster size until the config change is committed
    oldClusterNodes map[string]string
    // Index of the pending config change entry
    configChangeIndex uint64
    // Joining cluster state - when a standalone node is being added to a cluster
    // This prevents the node from starting elections while syncing
    joiningCluster     bool
    joiningClusterTime time.Time
    // Compaction state - prevents concurrent compaction
    compacting int32
    // Dynamic compaction threshold - updated after each compaction
    // Next compaction triggers when log size >= this value
    // Formula: post-compaction log size + SnapshotThreshold (never below the initial SnapshotThreshold)
    nextCompactionThreshold uint64
    // WaitGroup to track goroutines for clean shutdown
    wg sync.WaitGroup
    // Leadership transfer state
    transferring     bool
    transferTarget   string
    transferDeadline time.Time
    // Last heartbeat received (for health check)
    lastHeartbeat time.Time
    // Start time (for uptime calculation)
    startTime time.Time
    // ReadIndex waiting queue
    readIndexCh chan *readIndexRequest
    // Snapshot receiving state (for chunked transfer)
    pendingSnapshot *pendingSnapshotState
    // Last contact time for each peer (for leader check)
    lastContact map[string]time.Time
}
// readIndexRequest represents a pending read index request
type readIndexRequest struct {
    readIndex uint64
    done      chan error
}
// pendingSnapshotState tracks chunked snapshot reception
type pendingSnapshotState struct {
    lastIncludedIndex uint64
    lastIncludedTerm  uint64
    data              []byte
    receivedBytes     uint64
}
// Stats holds runtime statistics
type Stats struct {
    Term            uint64
    State           string
    FirstLogIndex   uint64 // Added for monitoring compaction
    LastLogIndex    uint64
    LastLogTerm     uint64
    CommitIndex     uint64
    LastApplied     uint64
    LeaderID        string
    VotesReceived   int
    AppendsSent     int64
    AppendsReceived int64
    ClusterSize     int               // Number of nodes in cluster
    ClusterNodes    map[string]string // NodeID -> Address mapping
}
// ConsoleLogger implements Logger with console output
type ConsoleLogger struct {
    Prefix string
    Level  int // 0=debug, 1=info, 2=warn, 3=error
    mu     sync.Mutex
}
func NewConsoleLogger(prefix string, level int) *ConsoleLogger {
    return &ConsoleLogger{Prefix: prefix, Level: level}
}
func (l *ConsoleLogger) log(level int, levelStr, format string, args ...interface{}) {
    if level < l.Level {
        return
    }
    l.mu.Lock()
    defer l.mu.Unlock()
    var levelColor string
    // Define colors locally to avoid dependency issues if cli.go is excluded
    const (
        colorReset  = "\033[0m"
        colorDim    = "\033[90m"
        colorRed    = "\033[31m"
        colorGreen  = "\033[32m"
        colorYellow = "\033[33m"
        colorCyan   = "\033[36m"
    )
    switch level {
    case 0:
        levelColor = colorDim
    case 1:
        levelColor = colorGreen
    case 2:
        levelColor = colorYellow
    case 3:
        levelColor = colorRed
    default:
        levelColor = colorReset
    }
    msg := fmt.Sprintf(format, args...)
    // Format: [Time] Node [LEVEL] Message
    // Aligned for better readability
    fmt.Printf("%s[%s]%s %s%-8s%s %s[%-5s]%s %s\n",
        colorDim, time.Now().Format("15:04:05.000"), colorReset,
        colorCyan, l.Prefix, colorReset,
        levelColor, levelStr, colorReset,
        msg)
}
func (l *ConsoleLogger) Debug(format string, args ...interface{}) {
    l.log(0, "DEBUG", format, args...)
}
func (l *ConsoleLogger) Info(format string, args ...interface{}) {
    l.log(1, "INFO", format, args...)
}
func (l *ConsoleLogger) Warn(format string, args ...interface{}) {
    l.log(2, "WARN", format, args...)
}
func (l *ConsoleLogger) Error(format string, args ...interface{}) {
    l.log(3, "ERROR", format, args...)
}
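// Illustrative output of the format string above (ANSI colors omitted),
// assuming a logger created as NewConsoleLogger("node-1", 1):
//
//	[15:04:05.123] node-1   [INFO ] Raft node started
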
// NewRaft creates a new Raft node
func NewRaft(config *Config, transport Transport, applyCh chan ApplyMsg) (*Raft, error) {
    if config.Logger == nil {
        config.Logger = NewConsoleLogger(config.NodeID, 1)
    }
    // Create storage
    storage, err := NewHybridStorage(config.DataDir, config.MemoryLogCapacity, config.Logger)
    if err != nil {
        return nil, fmt.Errorf("failed to create storage: %w", err)
    }
    // Create log manager
    logMgr := NewLogManager(storage, config.Logger)
    // Load persistent state
    state, err := storage.GetState()
    if err != nil {
        return nil, fmt.Errorf("failed to load state: %w", err)
    }
    // Load or initialize cluster configuration
    clusterNodes := make(map[string]string)
    // Try to load saved cluster config first
    savedConfig, err := storage.GetClusterConfig()
    if err != nil {
        return nil, fmt.Errorf("failed to load cluster config: %w", err)
    }
    if savedConfig != nil && len(savedConfig.Nodes) > 0 {
        // Use saved config
        clusterNodes = savedConfig.Nodes
        config.Logger.Info("Loaded cluster config with %d nodes", len(clusterNodes))
    } else {
        // Initialize from config
        if len(config.ClusterNodes) > 0 {
            for k, v := range config.ClusterNodes {
                clusterNodes[k] = v
            }
        } else {
            // Build from Peers + self (backward compatibility)
            clusterNodes[config.NodeID] = config.ListenAddr
            if config.PeerMap != nil {
                for k, v := range config.PeerMap {
                    clusterNodes[k] = v
                }
            }
        }
        // Save initial config
        if len(clusterNodes) > 0 {
            if err := storage.SaveClusterConfig(&ClusterConfig{Nodes: clusterNodes}); err != nil {
                config.Logger.Warn("Failed to save initial cluster config: %v", err)
            }
        }
    }
    // Build peers list from cluster nodes (excluding self)
    var peers []string
    for nodeID, addr := range clusterNodes {
        if nodeID != config.NodeID {
            peers = append(peers, addr)
        }
    }
    r := &Raft{
        nodeID:        config.NodeID,
        peers:         peers,
        clusterNodes:  clusterNodes,
        state:         Follower,
        currentTerm:   state.CurrentTerm,
        votedFor:      state.VotedFor,
        log:           logMgr,
        storage:       storage,
        config:        config,
        transport:     transport,
        applyCh:       applyCh,
        stopCh:        make(chan struct{}),
        commitCh:      make(chan struct{}, 100), // Increased buffer for event-driven apply
        applyNotifyCh: make(chan struct{}, 100),
        replicationCh: make(chan struct{}, 1),
        nextIndex:     make(map[string]uint64),
        matchIndex:    make(map[string]uint64),
        logger:        config.Logger,
        readIndexCh:   make(chan *readIndexRequest, 100),
        lastHeartbeat: time.Now(),
        lastContact:   make(map[string]time.Time),
    }
    // Initialize metrics
    r.metrics.Term = state.CurrentTerm
    // Initialize lastApplied and commitIndex from config (if provided)
    if config.LastAppliedIndex > 0 {
        r.lastApplied = config.LastAppliedIndex
        r.commitIndex = config.LastAppliedIndex // Applied implies committed
        r.logger.Info("Initialized lastApplied and commitIndex to %d from config", config.LastAppliedIndex)
    }
    // Set RPC handler
    transport.SetRPCHandler(r)
    return r, nil
}
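// Example usage of the constructor above - an illustrative sketch only.
// The concrete addresses, directory, and the transport value are assumptions;
// any Transport implementation wired to these addresses would do:
//
//	cfg := &Config{
//		NodeID:     "node-1",
//		ListenAddr: "127.0.0.1:7001",
//		DataDir:    "/var/lib/raft/node-1",
//		ClusterNodes: map[string]string{
//			"node-1": "127.0.0.1:7001",
//			"node-2": "127.0.0.1:7002",
//			"node-3": "127.0.0.1:7003",
//		},
//	}
//	applyCh := make(chan ApplyMsg, 64)
//	node, err := NewRaft(cfg, someTransport, applyCh) // someTransport implements Transport
//	if err != nil {
//		// handle error
//	}
//	if err := node.Start(); err != nil {
//		// handle error
//	}
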
// Start starts the Raft node
func (r *Raft) Start() error {
    if !atomic.CompareAndSwapInt32(&r.running, 0, 1) {
        return fmt.Errorf("already running")
    }
    // Record start time
    r.startTime = time.Now()
    // Start transport
    if err := r.transport.Start(); err != nil {
        return fmt.Errorf("failed to start transport: %w", err)
    }
    // Restore FSM from snapshot if exists
    // This must happen before starting apply loop to ensure FSM state is restored
    if err := r.restoreFromSnapshot(); err != nil {
        // Suppress warning if it's just a missing file (first start)
        if !os.IsNotExist(err) && !strings.Contains(err.Error(), "no such file") {
            r.logger.Warn("Failed to restore from snapshot: %v", err)
        } else {
            r.logger.Info("No snapshot found, starting with empty state")
        }
        // Continue anyway - the node can still function, just without historical state
    }
    // Start election timer
    r.resetElectionTimer()
    // Start background goroutines with WaitGroup tracking
    r.wg.Add(4)
    go func() {
        defer r.wg.Done()
        r.applyLoop()
    }()
    go func() {
        defer r.wg.Done()
        r.replicationLoop()
    }()
    go func() {
        defer r.wg.Done()
        r.mainLoop()
    }()
    go func() {
        defer r.wg.Done()
        r.readIndexLoop()
    }()
    r.logger.Info("Raft node started")
    return nil
}
// Stop stops the Raft node
func (r *Raft) Stop() error {
    if !atomic.CompareAndSwapInt32(&r.running, 1, 0) {
        return fmt.Errorf("not running")
    }
    // Signal all goroutines to stop
    close(r.stopCh)
    // Stop timers first to prevent new operations
    r.mu.Lock()
    if r.electionTimer != nil {
        r.electionTimer.Stop()
    }
    if r.heartbeatTimer != nil {
        r.heartbeatTimer.Stop()
    }
    r.mu.Unlock()
    // Wait for all goroutines to finish with timeout
    done := make(chan struct{})
    go func() {
        r.wg.Wait()
        close(done)
    }()
    select {
    case <-done:
        // All goroutines exited cleanly
    case <-time.After(3 * time.Second):
        r.logger.Warn("Timeout waiting for goroutines to stop")
    }
    // Stop transport (has its own timeout)
    if err := r.transport.Stop(); err != nil {
        r.logger.Error("Failed to stop transport: %v", err)
    }
    if err := r.storage.Close(); err != nil {
        r.logger.Error("Failed to close storage: %v", err)
    }
    r.logger.Info("Raft node stopped")
    return nil
}
// mainLoop is the main event loop
func (r *Raft) mainLoop() {
    for {
        select {
        case <-r.stopCh:
            return
        default:
        }
        r.mu.RLock()
        state := r.state
        r.mu.RUnlock()
        switch state {
        case Follower:
            r.runFollower()
        case Candidate:
            r.runCandidate()
        case Leader:
            r.runLeader()
        }
    }
}
// runFollower handles follower behavior
func (r *Raft) runFollower() {
    r.logger.Debug("Running as follower in term %d", r.currentTerm)
    for {
        select {
        case <-r.stopCh:
            return
        case <-r.electionTimer.C:
            r.mu.Lock()
            if r.state == Follower {
                // If we're joining a cluster, don't start elections
                // Give time for the leader to sync us up
                if r.joiningCluster {
                    // Allow joining for up to 30 seconds
                    if time.Since(r.joiningClusterTime) < 30*time.Second {
                        r.logger.Debug("Suppressing election during cluster join")
                        r.resetElectionTimer()
                        r.mu.Unlock()
                        continue
                    }
                    // Timeout - clear joining state
                    r.joiningCluster = false
                    r.logger.Warn("Cluster join timeout, resuming normal election behavior")
                }
                r.logger.Debug("Election timeout, becoming candidate")
                r.state = Candidate
                r.leaderID = "" // Clear leaderID so we can vote for self in Pre-Vote
            }
            r.mu.Unlock()
            return
        }
    }
}
// runCandidate handles candidate behavior (leader election)
// Implements Pre-Vote mechanism to prevent term explosion
func (r *Raft) runCandidate() {
    // Phase 1: Pre-Vote (don't increment term yet)
    if !r.runPreVote() {
        // Pre-vote failed, wait for timeout before retrying
        r.mu.Lock()
        r.resetElectionTimer()
        r.mu.Unlock()
        select {
        case <-r.stopCh:
            return
        case <-r.electionTimer.C:
            // Timer expired, will retry pre-vote
        }
        return
    }
    // Phase 2: Actual election (pre-vote succeeded, now increment term)
    r.mu.Lock()
    r.currentTerm++
    r.votedFor = r.nodeID
    r.leaderID = ""
    currentTerm := r.currentTerm
    // Update metrics
    atomic.StoreUint64(&r.metrics.Term, currentTerm)
    if err := r.persistState(); err != nil {
        r.logger.Error("Failed to persist state during election: %v", err)
        r.mu.Unlock()
        return // Cannot proceed without persisting state
    }
    atomic.AddUint64(&r.metrics.ElectionsStarted, 1)
    r.resetElectionTimer()
    r.mu.Unlock()
    r.logger.Debug("Starting election for term %d", currentTerm)
    // Get current peers list
    r.mu.RLock()
    currentPeers := make([]string, len(r.peers))
    copy(currentPeers, r.peers)
    clusterSize := len(r.clusterNodes)
    if clusterSize == 0 {
        clusterSize = len(r.peers) + 1
    }
    r.mu.RUnlock()
    // Request actual votes from all peers
    votes := 1 // Vote for self
    voteCh := make(chan bool, len(currentPeers))
    lastLogIndex, lastLogTerm := r.log.LastIndexAndTerm()
    for _, peer := range currentPeers {
        go func(peer string) {
            args := &RequestVoteArgs{
                Term:         currentTerm,
                CandidateID:  r.nodeID,
                LastLogIndex: lastLogIndex,
                LastLogTerm:  lastLogTerm,
                PreVote:      false,
            }
            ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
            defer cancel()
            reply, err := r.transport.RequestVote(ctx, peer, args)
            if err != nil {
                r.logger.Debug("RequestVote to %s failed: %v", peer, err)
                voteCh <- false
                return
            }
            r.mu.Lock()
            defer r.mu.Unlock()
            if reply.Term > r.currentTerm {
                r.becomeFollower(reply.Term)
                voteCh <- false
                return
            }
            voteCh <- reply.VoteGranted
        }(peer)
    }
    // Wait for votes - majority is (clusterSize/2) + 1
    needed := clusterSize/2 + 1
    // If we already have enough votes (single-node cluster), become leader immediately
    if votes >= needed {
        r.mu.Lock()
        if r.state == Candidate && r.currentTerm == currentTerm {
            r.becomeLeader()
        }
        r.mu.Unlock()
        return
    }
    for i := 0; i < len(currentPeers); i++ {
        select {
        case <-r.stopCh:
            return
        case <-r.electionTimer.C:
            r.logger.Debug("Election timeout, will start new election")
            return
        case granted := <-voteCh:
            if granted {
                votes++
                if votes >= needed {
                    r.mu.Lock()
                    if r.state == Candidate && r.currentTerm == currentTerm {
                        r.becomeLeader()
                    }
                    r.mu.Unlock()
                    return
                }
            }
        }
        // Check if we're still a candidate
        r.mu.RLock()
        if r.state != Candidate {
            r.mu.RUnlock()
            return
        }
        r.mu.RUnlock()
    }
    // Election failed, wait for timeout
    r.mu.RLock()
    stillCandidate := r.state == Candidate
    r.mu.RUnlock()
    if stillCandidate {
        r.logger.Debug("Election failed, waiting for timeout (got %d/%d votes)", votes, needed)
        select {
        case <-r.stopCh:
            return
        case <-r.electionTimer.C:
            // Timer expired, will start new election
        }
    }
}
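// Quorum arithmetic used above, shown with concrete numbers: with
// clusterSize = 3, needed = 3/2 + 1 = 2; with clusterSize = 5, needed = 3.
// An even-sized cluster gains no extra fault tolerance: clusterSize = 4
// still needs 4/2 + 1 = 3 votes, so it tolerates only one failure,
// the same as a three-node cluster.
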
// runPreVote sends pre-vote requests to check if we can win an election
// Returns true if we got majority pre-votes, false otherwise
func (r *Raft) runPreVote() bool {
    r.mu.RLock()
    currentTerm := r.currentTerm
    currentPeers := make([]string, len(r.peers))
    copy(currentPeers, r.peers)
    clusterSize := len(r.clusterNodes)
    if clusterSize == 0 {
        clusterSize = len(r.peers) + 1
    }
    leaderID := r.leaderID
    r.mu.RUnlock()
    // Pre-vote uses term+1 but doesn't actually increment it
    preVoteTerm := currentTerm + 1
    lastLogIndex, lastLogTerm := r.log.LastIndexAndTerm()
    r.logger.Debug("Starting pre-vote for term %d", preVoteTerm)
    // Per the Raft Pre-Vote optimization (Raft dissertation §9.6): if we have a
    // known leader, don't vote for self. This prevents standalone nodes from
    // constantly electing themselves when they should be joining an existing cluster.
    preVotes := 0
    if leaderID == "" {
        preVotes = 1 // Vote for self only if we don't know of a leader
    } else {
        r.logger.Debug("Pre-vote: not self-voting because we have leader %s", leaderID)
    }
    preVoteCh := make(chan bool, len(currentPeers))
    for _, peer := range currentPeers {
        go func(peer string) {
            args := &RequestVoteArgs{
                Term:         preVoteTerm,
                CandidateID:  r.nodeID,
                LastLogIndex: lastLogIndex,
                LastLogTerm:  lastLogTerm,
                PreVote:      true,
            }
            ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
            defer cancel()
            reply, err := r.transport.RequestVote(ctx, peer, args)
            if err != nil {
                r.logger.Debug("PreVote to %s failed: %v", peer, err)
                preVoteCh <- false
                return
            }
            // For pre-vote, we don't step down even if reply.Term > currentTerm
            // because the other node might also be doing pre-vote
            preVoteCh <- reply.VoteGranted
        }(peer)
    }
    // Wait for pre-votes with a shorter timeout - majority is (clusterSize/2) + 1
    needed := clusterSize/2 + 1
    // If we already have enough votes (single-node cluster with no known leader), succeed immediately
    if preVotes >= needed {
        r.logger.Debug("Pre-vote succeeded immediately (got %d/%d pre-votes)", preVotes, needed)
        return true
    }
    timeout := time.After(500 * time.Millisecond)
    for i := 0; i < len(currentPeers); i++ {
        select {
        case <-r.stopCh:
            return false
        case <-timeout:
            r.logger.Debug("Pre-vote timeout (got %d/%d pre-votes)", preVotes, needed)
            return false
        case granted := <-preVoteCh:
            if granted {
                preVotes++
                if preVotes >= needed {
                    r.logger.Debug("Pre-vote succeeded (got %d/%d pre-votes)", preVotes, needed)
                    return true
                }
            }
        }
        // Check if we're still a candidate
        r.mu.RLock()
        if r.state != Candidate {
            r.mu.RUnlock()
            return false
        }
        r.mu.RUnlock()
    }
    r.logger.Debug("Pre-vote failed (got %d/%d pre-votes)", preVotes, needed)
    return false
}
// runLeader handles leader behavior
func (r *Raft) runLeader() {
    r.logger.Debug("Running as leader in term %d", r.currentTerm)
    // Send initial heartbeat
    r.sendHeartbeats()
    // Start heartbeat timer
    r.mu.Lock()
    r.heartbeatTimer = time.NewTimer(r.config.HeartbeatInterval)
    r.mu.Unlock()
    for {
        select {
        case <-r.stopCh:
            return
        case <-r.heartbeatTimer.C:
            r.mu.RLock()
            if r.state != Leader {
                r.mu.RUnlock()
                return
            }
            r.mu.RUnlock()
            r.sendHeartbeats()
            r.heartbeatTimer.Reset(r.config.HeartbeatInterval)
        case <-r.commitCh:
            r.updateCommitIndex()
        }
    }
}
// becomeFollower transitions to follower state
func (r *Raft) becomeFollower(term uint64) {
    oldState := r.state
    oldTerm := r.currentTerm
    r.state = Follower
    r.currentTerm = term
    // Update metrics
    atomic.StoreUint64(&r.metrics.Term, term)
    r.votedFor = ""
    r.leaderID = ""
    // Must persist before responding - use mustPersistState for critical transitions
    r.mustPersistState()
    r.resetElectionTimer()
    // Clear leadership transfer state
    r.transferring = false
    r.transferTarget = ""
    // Only log significant state changes
    if oldState == Leader && term > oldTerm {
        // Stepping down from leader is notable
        r.logger.Debug("Stepped down from leader in term %d", term)
    }
}
// becomeLeader transitions to leader state
func (r *Raft) becomeLeader() {
    r.state = Leader
    r.leaderID = r.nodeID
    // Update metrics
    atomic.AddUint64(&r.metrics.ElectionsWon, 1)
    // Initialize leader state for all peers
    lastIndex := r.log.LastIndex()
    for nodeID, addr := range r.clusterNodes {
        if nodeID != r.nodeID {
            r.nextIndex[addr] = lastIndex + 1
            r.matchIndex[addr] = 0
        }
    }
    // Also handle legacy peers
    for _, peer := range r.peers {
        if _, exists := r.nextIndex[peer]; !exists {
            r.nextIndex[peer] = lastIndex + 1
            r.matchIndex[peer] = 0
        }
    }
    r.logger.Info("Became leader in term %d with %d peers", r.currentTerm, len(r.clusterNodes)-1)
    // Append a no-op entry to commit entries from previous terms
    // This is a standard Raft optimization - the leader appends a no-op
    // entry in its current term to quickly commit all pending entries
    noopEntry := LogEntry{
        Index:   r.log.LastIndex() + 1,
        Term:    r.currentTerm,
        Type:    EntryNoop,
        Command: nil,
    }
    if err := r.log.Append(noopEntry); err != nil {
        r.logger.Error("Failed to append no-op entry: %v", err)
    } else {
        r.logger.Debug("Appended no-op entry at index %d for term %d", noopEntry.Index, noopEntry.Term)
    }
}
// sendHeartbeats sends AppendEntries RPCs to all peers
func (r *Raft) sendHeartbeats() {
    r.mu.RLock()
    if r.state != Leader {
        r.mu.RUnlock()
        return
    }
    currentTerm := r.currentTerm
    leaderCommit := r.commitIndex
    // Get all peer addresses
    peerAddrs := make([]string, 0, len(r.clusterNodes))
    for nodeID, addr := range r.clusterNodes {
        if nodeID != r.nodeID {
            peerAddrs = append(peerAddrs, addr)
        }
    }
    // Also include legacy peers not in clusterNodes
    for _, peer := range r.peers {
        found := false
        for _, addr := range peerAddrs {
            if addr == peer {
                found = true
                break
            }
        }
        if !found {
            peerAddrs = append(peerAddrs, peer)
        }
    }
    r.mu.RUnlock()
    for _, peer := range peerAddrs {
        go r.replicateToPeer(peer, currentTerm, leaderCommit)
    }
}
// replicateToPeer sends AppendEntries to a specific peer
func (r *Raft) replicateToPeer(peer string, term, leaderCommit uint64) {
    r.mu.RLock()
    if r.state != Leader || r.currentTerm != term {
        r.mu.RUnlock()
        return
    }
    nextIndex := r.nextIndex[peer]
    r.mu.RUnlock()
    // Get entries to send
    entries, prevLogIndex, prevLogTerm, err := r.log.GetEntriesForFollower(nextIndex, r.config.MaxLogEntriesPerRequest)
    if err == ErrCompacted {
        // Need to send snapshot
        r.sendSnapshot(peer)
        return
    }
    if err != nil {
        r.logger.Debug("Failed to get entries for %s: %v", peer, err)
        return
    }
    args := &AppendEntriesArgs{
        Term:         term,
        LeaderID:     r.nodeID,
        PrevLogIndex: prevLogIndex,
        PrevLogTerm:  prevLogTerm,
        Entries:      entries,
        LeaderCommit: leaderCommit,
    }
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()
    reply, err := r.transport.AppendEntries(ctx, peer, args)
    if err != nil {
        r.logger.Debug("AppendEntries to %s failed: %v", peer, err)
        return
    }
    atomic.AddInt64(&r.stats.AppendsSent, 1)
    r.mu.Lock()
    defer r.mu.Unlock()
    if reply.Term > r.currentTerm {
        r.becomeFollower(reply.Term)
        return
    }
    if r.state != Leader || r.currentTerm != term {
        return
    }
    if reply.Success {
        // Update contact time
        r.lastContact[peer] = time.Now()
        // Update nextIndex and matchIndex
        if len(entries) > 0 {
            newMatchIndex := entries[len(entries)-1].Index
            if newMatchIndex > r.matchIndex[peer] {
                r.matchIndex[peer] = newMatchIndex
                r.nextIndex[peer] = newMatchIndex + 1
                // Try to update commit index
                select {
                case r.commitCh <- struct{}{}:
                default:
                }
            }
        }
    } else {
        // Even if failed (log conflict), we contacted the peer
        r.lastContact[peer] = time.Now()
        // Decrement nextIndex and retry
        if reply.ConflictTerm > 0 {
            // Find the last entry of ConflictTerm in our log
            found := false
            for idx := r.log.LastIndex(); idx >= r.log.FirstIndex(); idx-- {
                t, err := r.log.GetTerm(idx)
                if err != nil {
                    break
                }
                if t == reply.ConflictTerm {
                    r.nextIndex[peer] = idx + 1
                    found = true
                    break
                }
                if t < reply.ConflictTerm {
                    break
                }
            }
            if !found {
                r.nextIndex[peer] = reply.ConflictIndex
            }
        } else if reply.ConflictIndex > 0 {
            r.nextIndex[peer] = reply.ConflictIndex
        } else {
            r.nextIndex[peer] = max(1, r.nextIndex[peer]-1)
        }
    }
}
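// Conflict backtracking above, illustrated: if a follower rejects with
// ConflictTerm=3 and ConflictIndex=9, and the leader's log contains entries
// of term 3 ending at index 11, nextIndex for that peer becomes 12 so the
// next AppendEntries resumes right after the leader's last term-3 entry.
// If the leader has no term-3 entries at all, nextIndex falls back to 9.
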
// sendSnapshot sends a snapshot to a peer with chunked transfer support
func (r *Raft) sendSnapshot(peer string) {
    r.mu.RLock()
    if r.state != Leader {
        r.mu.RUnlock()
        return
    }
    term := r.currentTerm
    chunkSize := r.config.SnapshotChunkSize
    if chunkSize <= 0 {
        chunkSize = 1024 * 1024 // Default 1MB
    }
    r.mu.RUnlock()
    data, lastIndex, lastTerm, err := r.storage.GetSnapshot()
    if err != nil {
        r.logger.Error("Failed to get snapshot: %v", err)
        return
    }
    atomic.AddUint64(&r.metrics.SnapshotsSent, 1)
    r.logger.Info("Sending snapshot to %s: %d bytes, lastIndex=%d, lastTerm=%d",
        peer, len(data), lastIndex, lastTerm)
    // Send snapshot in chunks
    totalSize := len(data)
    offset := 0
    for offset < totalSize {
        // Check if we're still leader
        r.mu.RLock()
        if r.state != Leader || r.currentTerm != term {
            r.mu.RUnlock()
            r.logger.Debug("Aborting snapshot send: no longer leader")
            return
        }
        r.mu.RUnlock()
        // Calculate chunk size
        end := offset + chunkSize
        if end > totalSize {
            end = totalSize
        }
        chunk := data[offset:end]
        done := end >= totalSize
        args := &InstallSnapshotArgs{
            Term:              term,
            LeaderID:          r.nodeID,
            LastIncludedIndex: lastIndex,
            LastIncludedTerm:  lastTerm,
            Offset:            uint64(offset),
            Data:              chunk,
            Done:              done,
        }
        ctx, cancel := context.WithTimeout(context.Background(), r.config.SnapshotRPCTimeout)
        reply, err := r.transport.InstallSnapshot(ctx, peer, args)
        cancel()
        if err != nil {
            r.logger.Error("InstallSnapshot chunk to %s failed at offset %d: %v", peer, offset, err)
            return
        }
        if reply.Term > r.currentTerm {
            r.mu.Lock()
            r.becomeFollower(reply.Term)
            r.mu.Unlock()
            return
        }
        if !reply.Success {
            r.logger.Error("InstallSnapshot chunk rejected by %s at offset %d", peer, offset)
            return
        }
        offset = end
    }
    // Snapshot fully sent and accepted
    r.mu.Lock()
    defer r.mu.Unlock()
    if r.state != Leader || r.currentTerm != term {
        return
    }
    r.nextIndex[peer] = lastIndex + 1
    r.matchIndex[peer] = lastIndex
    r.logger.Info("Snapshot to %s completed, nextIndex=%d", peer, lastIndex+1)
}
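// Chunked transfer above, with concrete numbers: a 2.5 MB snapshot and the
// default 1 MB chunk size produce three InstallSnapshot calls with
// Offset = 0, 1048576, and 2097152; only the last carries Done = true,
// after which the leader sets nextIndex for that peer to lastIndex+1.
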
// updateCommitIndex updates the commit index based on matchIndex
func (r *Raft) updateCommitIndex() {
    r.mu.Lock()
    defer r.mu.Unlock()
    if r.state != Leader {
        return
    }
    // For config change entries, use the OLD cluster for majority calculation
    // This ensures that adding a node doesn't require the new node's vote
    // until the config change itself is committed
    votingNodes := r.clusterNodes
    if r.pendingConfigChange && r.oldClusterNodes != nil {
        votingNodes = r.oldClusterNodes
    }
    // Get current cluster size from voting nodes
    clusterSize := len(votingNodes)
    if clusterSize == 0 {
        clusterSize = len(r.peers) + 1
    }
    // Find the highest index replicated on a majority
    for n := r.log.LastIndex(); n > r.commitIndex; n-- {
        term, err := r.log.GetTerm(n)
        if err != nil {
            continue
        }
        // Only commit entries from current term
        if term != r.currentTerm {
            continue
        }
        // Count replicas (including self)
        count := 1 // Self
        for nodeID, addr := range votingNodes {
            if nodeID != r.nodeID {
                if r.matchIndex[addr] >= n {
                    count++
                }
            }
        }
        // Also check legacy peers
        for _, peer := range r.peers {
            // Avoid double counting if peer is already in votingNodes
            alreadyCounted := false
            for _, addr := range votingNodes {
                if addr == peer {
                    alreadyCounted = true
                    break
                }
            }
            if !alreadyCounted && r.matchIndex[peer] >= n {
                count++
            }
        }
        // Majority is (clusterSize/2) + 1
        needed := clusterSize/2 + 1
        if count >= needed {
            // If commitIndex advances, notify apply loop
            if n > r.commitIndex {
                r.commitIndex = n
                select {
                case r.applyNotifyCh <- struct{}{}:
                default:
                }
            }
            break
        }
    }
}
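// Commit advancement above, illustrated: in a 3-node cluster with the leader's
// last index at 7 and matchIndex = {peerA: 7, peerB: 5}, index 7 is counted on
// 2 nodes (self + peerA), which meets needed = 3/2 + 1 = 2, so commitIndex
// advances to 7 - provided the entry at index 7 is from the current term.
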
// applyLoop applies committed entries to the state machine
// Uses event-driven model with fallback polling for reliability
func (r *Raft) applyLoop() {
    // Use a ticker as fallback for missed signals (matches original 10ms polling)
    ticker := time.NewTicker(10 * time.Millisecond)
    defer ticker.Stop()
    for {
        select {
        case <-r.stopCh:
            return
        case <-r.applyNotifyCh:
            // Event-driven: triggered when commitIndex is updated
            r.applyCommitted()
        case <-ticker.C:
            // Fallback: check periodically in case we missed a signal
            r.applyCommitted()
        }
    }
}
// applyCommitted applies all committed but not yet applied entries
func (r *Raft) applyCommitted() {
    r.mu.Lock()
    commitIndex := r.commitIndex
    lastApplied := r.lastApplied
    firstIndex := r.log.FirstIndex()
    lastLogIndex := r.log.LastIndex()
    r.mu.Unlock()
    // Safety check: ensure lastApplied is within valid range
    // If lastApplied is below firstIndex (due to snapshot), skip to firstIndex
    if lastApplied < firstIndex && firstIndex > 0 {
        r.mu.Lock()
        r.lastApplied = firstIndex
        lastApplied = firstIndex
        r.mu.Unlock()
        r.logger.Debug("Adjusted lastApplied to firstIndex %d after compaction", firstIndex)
    }
    // Safety check: don't try to apply beyond what we have in log
    if commitIndex > lastLogIndex {
        commitIndex = lastLogIndex
    }
    for lastApplied < commitIndex {
        lastApplied++
        // Skip if entry has been compacted
        if lastApplied < firstIndex {
            continue
        }
        entry, err := r.log.GetEntry(lastApplied)
        if err != nil {
            // If entry is compacted, skip ahead
            if err == ErrCompacted {
                r.mu.Lock()
                newFirstIndex := r.log.FirstIndex()
                if lastApplied < newFirstIndex {
                    r.lastApplied = newFirstIndex
                    lastApplied = newFirstIndex
                }
                r.mu.Unlock()
                continue
            }
            r.logger.Error("Failed to get entry %d: %v (firstIndex=%d, lastIndex=%d)",
                lastApplied, err, r.log.FirstIndex(), r.log.LastIndex())
            break
        }
        // Handle config change entries
        if entry.Type == EntryConfig {
            r.applyConfigChange(entry)
            r.mu.Lock()
            r.lastApplied = lastApplied
            r.mu.Unlock()
            continue
        }
        // Handle no-op entries (just update lastApplied, don't send to state machine)
        if entry.Type == EntryNoop {
            r.mu.Lock()
            r.lastApplied = lastApplied
            r.mu.Unlock()
            continue
        }
        // Normal command entry
        msg := ApplyMsg{
            CommandValid: true,
            Command:      entry.Command,
            CommandIndex: entry.Index,
            CommandTerm:  entry.Term,
        }
        select {
        case r.applyCh <- msg:
        case <-r.stopCh:
            return
        }
        r.mu.Lock()
        r.lastApplied = lastApplied
        r.mu.Unlock()
    }
    // Check if log compaction is needed
    // Run asynchronously to avoid blocking the apply loop if snapshotting is slow
    go r.maybeCompactLog()
}
// maybeCompactLog checks if automatic log compaction should be triggered
// It uses a dynamic threshold to prevent "compaction thrashing":
// - First compaction triggers at SnapshotThreshold (default 100,000)
// - After compaction, next threshold = post-compaction log size + SnapshotThreshold
// - This prevents every new entry from triggering compaction if log stays large
func (r *Raft) maybeCompactLog() {
    // Skip if no snapshot provider is configured
    if r.config.SnapshotProvider == nil {
        return
    }
    // Skip if log compaction is explicitly disabled
    if !r.config.LogCompactionEnabled {
        return
    }
    // Check if compaction is already in progress (atomic check-and-set)
    if !atomic.CompareAndSwapInt32(&r.compacting, 0, 1) {
        return
    }
    // Ensure we release the compacting flag when done
    defer atomic.StoreInt32(&r.compacting, 0)
    r.mu.RLock()
    lastApplied := r.lastApplied
    firstIndex := r.log.FirstIndex()
    initialThreshold := r.config.SnapshotThreshold
    minRetention := r.config.SnapshotMinRetention
    isLeader := r.state == Leader
    dynamicThreshold := r.nextCompactionThreshold
    r.mu.RUnlock()
    // Guard against underflow: ensure lastApplied > firstIndex
    if lastApplied <= firstIndex {
        return
    }
    // Calculate current log size
    logSize := lastApplied - firstIndex
    // Determine effective threshold:
    // - Use initial threshold if no compaction has occurred yet (dynamicThreshold == 0)
    // - Otherwise use the dynamic threshold
    effectiveThreshold := initialThreshold
    if dynamicThreshold > 0 {
        effectiveThreshold = dynamicThreshold
    }
    // Check if we have enough entries to warrant compaction
    if logSize <= effectiveThreshold {
        return
    }
    // Guard against underflow: ensure lastApplied > minRetention
    if lastApplied <= minRetention {
        return
    }
    // Calculate the safe compaction point
    // We need to retain at least minRetention entries for follower catch-up
    compactUpTo := lastApplied - minRetention
    if compactUpTo <= firstIndex {
        return // Not enough entries to compact while maintaining retention
    }
    // For leader, also consider the minimum nextIndex of all followers
    // to avoid compacting entries that followers still need
    if isLeader {
        r.mu.RLock()
        minNextIndex := lastApplied
        for _, nextIdx := range r.nextIndex {
            if nextIdx < minNextIndex {
                minNextIndex = nextIdx
            }
        }
        r.mu.RUnlock()
        // Don't compact entries that followers still need
        // Keep a buffer of minRetention entries before the slowest follower
        if minNextIndex > minRetention {
            followerSafePoint := minNextIndex - minRetention
            if followerSafePoint < compactUpTo {
                compactUpTo = followerSafePoint
            }
        } else {
            // Slowest follower is too far behind, don't compact
            // They will need a full snapshot anyway
            compactUpTo = firstIndex // Effectively skip compaction
        }
    }
    // Final check - make sure we're actually compacting something meaningful
    if compactUpTo <= firstIndex {
        return
    }
    // Get snapshot from application layer
    snapshotData, err := r.config.SnapshotProvider(compactUpTo)
    if err != nil {
        r.logger.Error("Failed to get snapshot from provider: %v", err)
        return
    }
    // Get the term at the compaction point
    term, err := r.log.GetTerm(compactUpTo)
    if err != nil {
        r.logger.Error("Failed to get term for compaction index %d: %v", compactUpTo, err)
        return
    }
    // Save snapshot
    if err := r.storage.SaveSnapshot(snapshotData, compactUpTo, term); err != nil {
        r.logger.Error("Failed to save snapshot: %v", err)
        return
    }
    // Compact the log
    // CRITICAL: We must hold the write lock during compaction to prevent
    // concurrent writes (AppendEntries); we cannot guarantee the integrity
    // of the underlying files under concurrent modification, so writes are
    // paused for the duration of the compaction.
    r.mu.Lock()
    if err := r.log.Compact(compactUpTo); err != nil {
        r.mu.Unlock()
        r.logger.Error("Failed to compact log: %v", err)
        return
    }
    r.mu.Unlock()
    // Calculate new log size after compaction
    newLogSize := lastApplied - compactUpTo
    // Update dynamic threshold for next compaction
    // We use a linear growth model: next threshold = current size + SnapshotThreshold
    // This ensures that we trigger compaction roughly every SnapshotThreshold entries.
    r.mu.Lock()
    r.nextCompactionThreshold = newLogSize + r.config.SnapshotThreshold
    // Ensure threshold doesn't go below the initial threshold
    if r.nextCompactionThreshold < initialThreshold {
        r.nextCompactionThreshold = initialThreshold
    }
    r.mu.Unlock()
    r.logger.Info("Auto compaction completed: compacted up to index %d (term %d), log size %d -> %d, next threshold: %d",
        compactUpTo, term, logSize, newLogSize, r.nextCompactionThreshold)
}
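// Dynamic threshold above, with concrete (assumed) numbers: with
// SnapshotThreshold = 100,000 and SnapshotMinRetention = 20,000, the first
// compaction fires once the applied-but-uncompacted span exceeds 100,000
// entries; it compacts up to lastApplied-20,000, leaving about 20,000 entries,
// and sets the next threshold to roughly 20,000 + 100,000 = 120,000, so the
// next compaction happens about 100,000 entries later instead of on every
// newly applied entry.
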
// resetElectionTimer resets the election timeout
func (r *Raft) resetElectionTimer() {
    timeout := r.config.ElectionTimeoutMin +
        time.Duration(rand.Int63n(int64(r.config.ElectionTimeoutMax-r.config.ElectionTimeoutMin)))
    if r.electionTimer == nil {
        r.electionTimer = time.NewTimer(timeout)
    } else {
        if !r.electionTimer.Stop() {
            select {
            case <-r.electionTimer.C:
            default:
            }
        }
        r.electionTimer.Reset(timeout)
    }
}
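// Randomized timeout above, illustrated: with ElectionTimeoutMin = 150ms and
// ElectionTimeoutMax = 300ms (values assumed for the example), each reset picks
// a timeout uniformly in [150ms, 300ms), so two followers are unlikely to time
// out at the same moment and split the vote.
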
// persistState saves the current state to stable storage
// Returns error if persistence fails - caller MUST handle this for safety
func (r *Raft) persistState() error {
    state := &PersistentState{
        CurrentTerm: r.currentTerm,
        VotedFor:    r.votedFor,
    }
    if err := r.storage.SaveState(state); err != nil {
        r.logger.Error("Failed to persist state: %v", err)
        return fmt.Errorf("%w: %v", ErrPersistFailed, err)
    }
    return nil
}
// mustPersistState saves state and panics on failure
// Use this only in critical paths where failure is unrecoverable
func (r *Raft) mustPersistState() {
    if err := r.persistState(); err != nil {
        // In production, you might want to trigger a graceful shutdown instead
        r.logger.Error("CRITICAL: Failed to persist state, node may be in inconsistent state: %v", err)
        panic(err)
    }
}
// HandleRequestVote handles RequestVote RPCs (including pre-vote)
func (r *Raft) HandleRequestVote(args *RequestVoteArgs) *RequestVoteReply {
    r.mu.Lock()
    defer r.mu.Unlock()
    reply := &RequestVoteReply{
        Term:        r.currentTerm,
        VoteGranted: false,
    }
    // Handle pre-vote separately
    if args.PreVote {
        return r.handlePreVote(args)
    }
    // Reply false if term < currentTerm
    if args.Term < r.currentTerm {
        return reply
    }
    // If term > currentTerm, become follower
    if args.Term > r.currentTerm {
        r.becomeFollower(args.Term)
    }
    reply.Term = r.currentTerm
    // Check if we can vote for this candidate
    if (r.votedFor == "" || r.votedFor == args.CandidateID) &&
        r.log.IsUpToDate(args.LastLogIndex, args.LastLogTerm) {
        r.votedFor = args.CandidateID
        if err := r.persistState(); err != nil {
            // Cannot grant vote if we can't persist the decision
            r.logger.Error("Failed to persist vote for %s: %v", args.CandidateID, err)
            return reply
        }
        r.resetElectionTimer()
        reply.VoteGranted = true
        r.logger.Debug("Granted vote to %s for term %d", args.CandidateID, args.Term)
    }
    return reply
}
// handlePreVote handles pre-vote requests
// Pre-vote doesn't change our state, just checks if we would vote
func (r *Raft) handlePreVote(args *RequestVoteArgs) *RequestVoteReply {
    reply := &RequestVoteReply{
        Term:        r.currentTerm,
        VoteGranted: false,
    }
    // For pre-vote, we check:
    // 1. The candidate's term is at least as high as ours
    // 2. The candidate's log is at least as up-to-date as ours
    // 3. We don't have a current leader (or the candidate's term is higher)
    if args.Term < r.currentTerm {
        return reply
    }
    // Per the Raft Pre-Vote optimization (Raft dissertation §9.6): reject pre-vote
    // if we have a current leader and the candidate's term is not higher than ours.
    // This prevents disruptive elections when a partitioned node tries to rejoin.
    if r.leaderID != "" && args.Term <= r.currentTerm {
        r.logger.Debug("Rejecting pre-vote from %s: have leader %s", args.CandidateID, r.leaderID)
        return reply
    }
    // Grant pre-vote if log is up-to-date
    // Note: we don't check votedFor for pre-vote, and we don't update any state
    if r.log.IsUpToDate(args.LastLogIndex, args.LastLogTerm) {
        reply.VoteGranted = true
        r.logger.Debug("Granted pre-vote to %s for term %d", args.CandidateID, args.Term)
    }
    return reply
}
  1295. // HandleAppendEntries handles AppendEntries RPCs
  1296. func (r *Raft) HandleAppendEntries(args *AppendEntriesArgs) *AppendEntriesReply {
  1297. r.mu.Lock()
  1298. defer r.mu.Unlock()
  1299. atomic.AddInt64(&r.stats.AppendsReceived, 1)
  1300. reply := &AppendEntriesReply{
  1301. Term: r.currentTerm,
  1302. Success: false,
  1303. }
  1304. // Check if we're a standalone node being added to a cluster
  1305. // A standalone node has only itself in clusterNodes
  1306. isStandalone := len(r.clusterNodes) == 1
  1307. if _, hasSelf := r.clusterNodes[r.nodeID]; isStandalone && hasSelf {
  1308. // We're standalone and receiving AppendEntries from an external leader
  1309. // This means we're being added to a cluster - suppress elections
  1310. if !r.joiningCluster {
  1311. r.joiningCluster = true
  1312. r.joiningClusterTime = time.Now()
  1313. r.logger.Info("Detected cluster join in progress, suppressing elections")
  1314. }
  1315. // When joining, accept higher terms from the leader to sync up
  1316. if args.Term > r.currentTerm {
  1317. r.becomeFollower(args.Term)
  1318. }
  1319. }
  1320. // Reply false if term < currentTerm
  1321. if args.Term < r.currentTerm {
  1322. // But still reset timer if we're joining a cluster to prevent elections
  1323. if r.joiningCluster {
  1324. r.resetElectionTimer()
  1325. }
  1326. return reply
  1327. }
  1328. // If term > currentTerm, or we're a candidate, or we're a leader receiving
  1329. // AppendEntries from another leader (split-brain scenario during cluster merge),
  1330. // become follower. In Raft, there can only be one leader per term.
  1331. if args.Term > r.currentTerm || r.state == Candidate || r.state == Leader {
  1332. r.becomeFollower(args.Term)
  1333. }
  1334. // Update leader info and reset election timer
  1335. r.leaderID = args.LeaderID
  1336. r.lastHeartbeat = time.Now()
  1337. r.resetElectionTimer()
  1338. reply.Term = r.currentTerm
  1339. // Try to append entries
  1340. success, conflictIndex, conflictTerm := r.log.AppendEntriesFromLeader(
  1341. args.PrevLogIndex, args.PrevLogTerm, args.Entries)
  1342. if !success {
  1343. reply.ConflictIndex = conflictIndex
  1344. reply.ConflictTerm = conflictTerm
  1345. return reply
  1346. }
  1347. reply.Success = true
  1348. // Update commit index safely
  1349. if args.LeaderCommit > r.commitIndex {
  1350. // Get our actual last log index
  1351. lastLogIndex := r.log.LastIndex()
  1352. // Calculate what index the entries would have reached
  1353. lastNewEntry := args.PrevLogIndex
  1354. if len(args.Entries) > 0 {
  1355. lastNewEntry = args.Entries[len(args.Entries)-1].Index
  1356. }
  1357. // Commit index should not exceed what we actually have in log
  1358. newCommitIndex := args.LeaderCommit
  1359. if newCommitIndex > lastNewEntry {
  1360. newCommitIndex = lastNewEntry
  1361. }
  1362. if newCommitIndex > lastLogIndex {
  1363. newCommitIndex = lastLogIndex
  1364. }
  1365. // Only advance commit index
  1366. if newCommitIndex > r.commitIndex {
  1367. r.commitIndex = newCommitIndex
  1368. // Notify apply loop
  1369. select {
  1370. case r.applyNotifyCh <- struct{}{}:
  1371. default:
  1372. }
  1373. }
  1374. }
  1375. return reply
  1376. }
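// clampCommitIndexSketch is an illustrative helper, not called by HandleAppendEntries
// above; it restates the rule the handler applies when advancing commitIndex: the
// leader's commit index is clamped to the last entry covered by this RPC and to the
// last entry actually present in our log. For example leaderCommit=10, lastNewEntry=7,
// lastLogIndex=7 yields 7.
func clampCommitIndexSketch(leaderCommit, lastNewEntry, lastLogIndex uint64) uint64 {
	c := leaderCommit
	if c > lastNewEntry {
		c = lastNewEntry // never commit past the entries this RPC accounted for
	}
	if c > lastLogIndex {
		c = lastLogIndex // never commit past what we actually hold in the log
	}
	return c
}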
  1377. // HandleInstallSnapshot handles InstallSnapshot RPCs with chunked transfer support
  1378. func (r *Raft) HandleInstallSnapshot(args *InstallSnapshotArgs) *InstallSnapshotReply {
  1379. r.mu.Lock()
  1380. defer r.mu.Unlock()
  1381. reply := &InstallSnapshotReply{
  1382. Term: r.currentTerm,
  1383. Success: false,
  1384. }
  1385. if args.Term < r.currentTerm {
  1386. return reply
  1387. }
  1388. if args.Term > r.currentTerm {
  1389. r.becomeFollower(args.Term)
  1390. }
  1391. r.leaderID = args.LeaderID
  1392. r.lastHeartbeat = time.Now()
  1393. r.resetElectionTimer()
  1394. reply.Term = r.currentTerm
  1395. // Skip if we already have this or a newer snapshot applied
  1396. if args.LastIncludedIndex <= r.lastApplied {
  1397. r.logger.Debug("Ignoring snapshot at index %d, already applied up to %d",
  1398. args.LastIncludedIndex, r.lastApplied)
reply.Success = true // Report success so the leader knows we already have this data
  1400. return reply
  1401. }
  1402. // Handle chunked transfer
  1403. if args.Offset == 0 {
  1404. // First chunk - start new pending snapshot
  1405. r.pendingSnapshot = &pendingSnapshotState{
  1406. lastIncludedIndex: args.LastIncludedIndex,
  1407. lastIncludedTerm: args.LastIncludedTerm,
  1408. data: make([]byte, 0),
  1409. receivedBytes: 0,
  1410. }
  1411. r.logger.Info("Starting snapshot reception at index %d, term %d",
  1412. args.LastIncludedIndex, args.LastIncludedTerm)
  1413. }
  1414. // Validate we're receiving the expected snapshot
if r.pendingSnapshot == nil {
r.logger.Warn("Received snapshot chunk at offset %d with no pending snapshot", args.Offset)
return reply
}
if r.pendingSnapshot.lastIncludedIndex != args.LastIncludedIndex ||
r.pendingSnapshot.lastIncludedTerm != args.LastIncludedTerm {
r.logger.Warn("Unexpected snapshot chunk: expected index %d, got %d",
r.pendingSnapshot.lastIncludedIndex, args.LastIncludedIndex)
return reply
}
  1422. // Validate offset matches what we've received
  1423. if uint64(args.Offset) != r.pendingSnapshot.receivedBytes {
  1424. r.logger.Warn("Unexpected chunk offset: expected %d, got %d",
  1425. r.pendingSnapshot.receivedBytes, args.Offset)
  1426. return reply
  1427. }
  1428. // Append chunk data
  1429. r.pendingSnapshot.data = append(r.pendingSnapshot.data, args.Data...)
  1430. r.pendingSnapshot.receivedBytes += uint64(len(args.Data))
  1431. reply.Success = true
  1432. r.logger.Debug("Received snapshot chunk: offset=%d, size=%d, done=%v",
  1433. args.Offset, len(args.Data), args.Done)
  1434. // If not done, wait for more chunks
  1435. if !args.Done {
  1436. return reply
  1437. }
  1438. // All chunks received - apply the snapshot
  1439. r.logger.Info("Installing complete snapshot: %d bytes at index %d, term %d",
  1440. len(r.pendingSnapshot.data), args.LastIncludedIndex, args.LastIncludedTerm)
  1441. atomic.AddUint64(&r.metrics.SnapshotsInstalled, 1)
  1442. // Save snapshot
  1443. if err := r.storage.SaveSnapshot(r.pendingSnapshot.data, args.LastIncludedIndex, args.LastIncludedTerm); err != nil {
  1444. r.logger.Error("Failed to save snapshot: %v", err)
  1445. r.pendingSnapshot = nil
  1446. reply.Success = false
  1447. return reply
  1448. }
  1449. // Compact log
  1450. if err := r.log.Compact(args.LastIncludedIndex); err != nil {
  1451. r.logger.Error("Failed to compact log: %v", err)
  1452. }
  1453. // Update state - must update both commitIndex and lastApplied
  1454. if args.LastIncludedIndex > r.commitIndex {
  1455. r.commitIndex = args.LastIncludedIndex
  1456. // Notify apply loop
  1457. select {
  1458. case r.applyNotifyCh <- struct{}{}:
  1459. default:
  1460. }
  1461. }
  1462. // Always update lastApplied to snapshot index to prevent trying to apply compacted entries
  1463. r.lastApplied = args.LastIncludedIndex
  1464. // Send snapshot to application (non-blocking with timeout)
  1465. // Use the complete pendingSnapshot data, not the last chunk
  1466. msg := ApplyMsg{
  1467. SnapshotValid: true,
  1468. Snapshot: r.pendingSnapshot.data,
  1469. SnapshotIndex: args.LastIncludedIndex,
  1470. SnapshotTerm: args.LastIncludedTerm,
  1471. }
  1472. // Clear pending snapshot
  1473. r.pendingSnapshot = nil
  1474. // Try to send, but don't block indefinitely
  1475. select {
  1476. case r.applyCh <- msg:
  1477. r.logger.Debug("Sent snapshot to application")
  1478. case <-time.After(100 * time.Millisecond):
  1479. r.logger.Warn("Timeout sending snapshot to application, will retry")
  1480. // The application will still get correct state via normal apply loop
  1481. }
  1482. return reply
  1483. }
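// The handler above expects chunks to arrive in order: each chunk's Offset equals the
// number of bytes already delivered, and Done is set only on the final chunk. The type
// and helper below are hypothetical, documentation-only sketches (not part of this
// package's wire format) showing how a sender could produce such a sequence; chunkSize
// is assumed to be greater than zero.
type snapshotChunkSketch struct {
	Offset int    // byte offset of this chunk within the full snapshot
	Data   []byte // chunk payload
	Done   bool   // true only on the final chunk
}

func chunkSnapshotSketch(data []byte, chunkSize int) []snapshotChunkSketch {
	var chunks []snapshotChunkSketch
	for offset := 0; ; offset += chunkSize {
		end := offset + chunkSize
		if end > len(data) {
			end = len(data)
		}
		chunks = append(chunks, snapshotChunkSketch{
			Offset: offset,
			Data:   data[offset:end],
			Done:   end == len(data),
		})
		if end == len(data) {
			return chunks
		}
	}
}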
  1484. // Propose proposes a new command to be replicated
  1485. func (r *Raft) Propose(command []byte) (uint64, uint64, bool) {
  1486. r.mu.Lock()
  1487. defer r.mu.Unlock()
  1488. if r.state != Leader {
  1489. return 0, 0, false
  1490. }
  1491. // Check connectivity (Lease Check)
  1492. // If we haven't heard from a majority of peers within ElectionTimeout,
  1493. // we shouldn't accept new commands because we might be partitioned.
  1494. if len(r.clusterNodes) > 1 {
  1495. activePeers := 1 // Self
  1496. now := time.Now()
  1497. timeout := r.config.ElectionTimeoutMax
  1498. for _, peer := range r.peers {
  1499. if last, ok := r.lastContact[peer]; ok && now.Sub(last) < timeout {
  1500. activePeers++
  1501. }
  1502. }
  1503. // Check majority
  1504. needed := len(r.clusterNodes)/2 + 1
  1505. if activePeers < needed {
  1506. r.logger.Warn("Rejecting Propose: lost contact with majority (active: %d, needed: %d)", activePeers, needed)
  1507. return 0, 0, false
  1508. }
  1509. }
  1510. index, err := r.log.AppendCommand(r.currentTerm, command)
  1511. if err != nil {
  1512. r.logger.Error("Failed to append command: %v", err)
  1513. return 0, 0, false
  1514. }
  1515. r.matchIndex[r.nodeID] = index
  1516. // For single-node cluster, we are the only voter and can commit immediately
  1517. // This fixes the issue where commitCh never gets triggered without other peers
  1518. if len(r.clusterNodes) <= 1 && len(r.peers) == 0 {
  1519. // Single node: self is majority, trigger commit immediately
  1520. select {
  1521. case r.commitCh <- struct{}{}:
  1522. default:
  1523. }
  1524. } else {
  1525. // Multi-node: trigger replication to other nodes
  1526. r.triggerReplication()
  1527. }
  1528. return index, r.currentTerm, true
  1529. }
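// quorumSizeSketch is an illustrative helper, not used by Propose above, that makes
// the majority rule in the lease check explicit: a cluster of n voters needs n/2+1
// active members (2 of 3, 3 of 4, 3 of 5) before the leader accepts new commands.
func quorumSizeSketch(n int) int {
	return n/2 + 1
}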
  1530. // triggerReplication signals the replication loop to send heartbeats
  1531. // This uses a non-blocking send to batch replication requests
  1532. func (r *Raft) triggerReplication() {
  1533. select {
  1534. case r.replicationCh <- struct{}{}:
  1535. default:
  1536. // Replication already scheduled
  1537. }
  1538. }
  1539. // replicationLoop handles batched replication
  1540. // Uses simple delay-based batching: flush immediately when signaled, then wait
  1541. // to allow more requests to accumulate before the next flush.
  1542. func (r *Raft) replicationLoop() {
  1543. for {
  1544. select {
  1545. case <-r.stopCh:
  1546. return
  1547. case <-r.replicationCh:
  1548. // Flush and replicate immediately
  1549. r.flushAndReplicate()
  1550. // Wait briefly to allow batching of subsequent requests
  1551. // This gives time for more proposals to queue up before the next flush
  1552. time.Sleep(1 * time.Millisecond)
  1553. }
  1554. }
  1555. }
  1556. // flushAndReplicate flushes logs and sends heartbeats
  1557. func (r *Raft) flushAndReplicate() {
  1558. // Ensure logs are flushed to OS cache before sending to followers
  1559. // This implements Group Commit with Flush (fast) instead of Sync (slow)
  1560. if err := r.log.Flush(); err != nil {
  1561. r.logger.Error("Failed to flush log: %v", err)
  1562. }
  1563. r.sendHeartbeats()
  1564. }
  1565. // ProposeWithForward proposes a command, forwarding to leader if necessary
  1566. // This is the recommended method for applications to use
  1567. func (r *Raft) ProposeWithForward(command []byte) (index uint64, term uint64, err error) {
  1568. r.mu.RLock()
  1569. // Check if we are part of the cluster
  1570. // If we are not in clusterNodes, we shouldn't accept data commands
  1571. if _, exists := r.clusterNodes[r.nodeID]; !exists {
  1572. r.mu.RUnlock()
  1573. return 0, 0, fmt.Errorf("node %s is not part of the cluster", r.nodeID)
  1574. }
  1575. r.mu.RUnlock()
  1576. // Try local propose first
  1577. idx, t, isLeader := r.Propose(command)
  1578. if isLeader {
  1579. return idx, t, nil
  1580. }
  1581. // Not leader, forward to leader
  1582. r.mu.RLock()
  1583. leaderID := r.leaderID
  1584. // Use clusterNodes (dynamically maintained) to find leader address
  1585. leaderAddr := r.clusterNodes[leaderID]
  1586. r.mu.RUnlock()
  1587. if leaderID == "" {
  1588. return 0, 0, fmt.Errorf("no leader available")
  1589. }
  1590. if leaderAddr == "" {
  1591. return 0, 0, fmt.Errorf("leader %s address not found in cluster", leaderID)
  1592. }
  1593. // Forward to leader
  1594. ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
  1595. defer cancel()
  1596. args := &ProposeArgs{Command: command}
  1597. reply, err := r.transport.ForwardPropose(ctx, leaderAddr, args)
  1598. if err != nil {
  1599. return 0, 0, fmt.Errorf("forward failed: %w", err)
  1600. }
  1601. if !reply.Success {
  1602. return 0, 0, fmt.Errorf("leader rejected: %s", reply.Error)
  1603. }
  1604. return reply.Index, reply.Term, nil
  1605. }
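// proposeExampleSketch is a hedged usage sketch, not part of this package's API: it
// shows how an application might submit a command through ProposeWithForward, which
// may be called on any node because followers forward to the current leader. The
// "key=value" byte encoding is an assumption for illustration only; applications
// define their own command serialization.
func proposeExampleSketch(r *Raft, key, value string) (uint64, error) {
	cmd := []byte(key + "=" + value) // hypothetical encoding
	index, term, err := r.ProposeWithForward(cmd)
	if err != nil {
		return 0, fmt.Errorf("propose %q failed: %w", key, err)
	}
	r.logger.Debug("Proposed %q at index %d, term %d", key, index, term)
	return index, nil
}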
  1606. // ForwardGet forwards a get request to the leader
  1607. func (r *Raft) ForwardGet(key string) (string, bool, error) {
  1608. r.mu.RLock()
  1609. // Check if we are part of the cluster
  1610. if _, exists := r.clusterNodes[r.nodeID]; !exists {
  1611. r.mu.RUnlock()
  1612. return "", false, fmt.Errorf("node %s is not part of the cluster", r.nodeID)
  1613. }
// Check if we are leader (local read) while still holding the read lock to avoid a data race on r.state
isLeader := r.state == Leader
r.mu.RUnlock()
if isLeader {
if r.config.GetHandler != nil {
val, found := r.config.GetHandler(key)
return val, found, nil
}
return "", false, fmt.Errorf("get handler not configured")
}
  1623. r.mu.RLock()
  1624. leaderID := r.leaderID
  1625. leaderAddr := r.clusterNodes[leaderID]
  1626. r.mu.RUnlock()
  1627. if leaderID == "" {
  1628. return "", false, ErrNoLeader
  1629. }
  1630. if leaderAddr == "" {
  1631. return "", false, fmt.Errorf("leader %s address not found", leaderID)
  1632. }
  1633. // Forward to leader
  1634. ctx, cancel := context.WithTimeout(context.Background(), r.config.RPCTimeout)
  1635. defer cancel()
  1636. args := &GetArgs{Key: key}
  1637. reply, err := r.transport.ForwardGet(ctx, leaderAddr, args)
  1638. if err != nil {
  1639. return "", false, fmt.Errorf("forward failed: %w", err)
  1640. }
  1641. return reply.Value, reply.Found, nil
  1642. }
  1643. // HandlePropose handles forwarded propose requests
  1644. func (r *Raft) HandlePropose(args *ProposeArgs) *ProposeReply {
  1645. index, term, isLeader := r.Propose(args.Command)
  1646. if !isLeader {
  1647. return &ProposeReply{
  1648. Success: false,
  1649. Error: "not leader",
  1650. }
  1651. }
  1652. return &ProposeReply{
  1653. Success: true,
  1654. Index: index,
  1655. Term: term,
  1656. }
  1657. }
  1658. // HandleAddNode handles forwarded AddNode requests
  1659. func (r *Raft) HandleAddNode(args *AddNodeArgs) *AddNodeReply {
  1660. err := r.AddNode(args.NodeID, args.Address)
  1661. if err != nil {
  1662. return &AddNodeReply{
  1663. Success: false,
  1664. Error: err.Error(),
  1665. }
  1666. }
  1667. return &AddNodeReply{
  1668. Success: true,
  1669. }
  1670. }
  1671. // HandleRemoveNode handles forwarded RemoveNode requests
  1672. func (r *Raft) HandleRemoveNode(args *RemoveNodeArgs) *RemoveNodeReply {
  1673. err := r.RemoveNode(args.NodeID)
  1674. if err != nil {
  1675. return &RemoveNodeReply{
  1676. Success: false,
  1677. Error: err.Error(),
  1678. }
  1679. }
  1680. return &RemoveNodeReply{
  1681. Success: true,
  1682. }
  1683. }
  1684. // GetState returns the current term and whether this node is leader
  1685. func (r *Raft) GetState() (uint64, bool) {
  1686. r.mu.RLock()
  1687. defer r.mu.RUnlock()
  1688. return r.currentTerm, r.state == Leader
  1689. }
  1690. // GetLeaderID returns the current leader ID
  1691. func (r *Raft) GetLeaderID() string {
  1692. r.mu.RLock()
  1693. defer r.mu.RUnlock()
  1694. return r.leaderID
  1695. }
  1696. // GetStats returns runtime statistics
  1697. func (r *Raft) GetStats() Stats {
  1698. r.mu.RLock()
  1699. defer r.mu.RUnlock()
  1700. lastIndex, lastTerm := r.log.LastIndexAndTerm()
  1701. firstIndex := r.log.FirstIndex()
  1702. // Copy cluster nodes
  1703. nodes := make(map[string]string)
  1704. for k, v := range r.clusterNodes {
  1705. nodes[k] = v
  1706. }
  1707. clusterSize := len(r.clusterNodes)
  1708. if clusterSize == 0 {
  1709. clusterSize = len(r.peers) + 1
  1710. }
  1711. return Stats{
  1712. Term: r.currentTerm,
  1713. State: r.state.String(),
  1714. FirstLogIndex: firstIndex,
  1715. LastLogIndex: lastIndex,
  1716. LastLogTerm: lastTerm,
  1717. CommitIndex: r.commitIndex,
  1718. LastApplied: r.lastApplied,
  1719. LeaderID: r.leaderID,
  1720. AppendsSent: atomic.LoadInt64(&r.stats.AppendsSent),
  1721. AppendsReceived: atomic.LoadInt64(&r.stats.AppendsReceived),
  1722. ClusterSize: clusterSize,
  1723. ClusterNodes: nodes,
  1724. }
  1725. }
  1726. // restoreFromSnapshot restores the FSM from a snapshot at startup
  1727. // This is called during Start() to ensure the FSM has the correct state
  1728. // before processing any new commands
  1729. func (r *Raft) restoreFromSnapshot() error {
  1730. // Get snapshot from storage
  1731. data, lastIndex, lastTerm, err := r.storage.GetSnapshot()
  1732. if err != nil {
  1733. return fmt.Errorf("failed to get snapshot: %w", err)
  1734. }
  1735. // No snapshot exists
  1736. if len(data) == 0 || lastIndex == 0 {
  1737. return nil
  1738. }
  1739. r.logger.Info("Restoring FSM from snapshot at index %d, term %d (%d bytes)",
  1740. lastIndex, lastTerm, len(data))
  1741. // Update lastApplied to snapshot index to prevent re-applying compacted entries
  1742. r.mu.Lock()
  1743. if lastIndex > r.lastApplied {
  1744. r.lastApplied = lastIndex
  1745. }
  1746. if lastIndex > r.commitIndex {
  1747. r.commitIndex = lastIndex
  1748. }
  1749. r.mu.Unlock()
  1750. // Send snapshot to FSM for restoration
  1751. // Use a goroutine with timeout to avoid blocking if applyCh is full
  1752. msg := ApplyMsg{
  1753. SnapshotValid: true,
  1754. Snapshot: data,
  1755. SnapshotIndex: lastIndex,
  1756. SnapshotTerm: lastTerm,
  1757. }
  1758. // Try to send with a timeout
  1759. select {
  1760. case r.applyCh <- msg:
  1761. r.logger.Info("FSM restoration triggered from snapshot at index %d", lastIndex)
  1762. case <-time.After(5 * time.Second):
  1763. return fmt.Errorf("timeout sending snapshot to applyCh")
  1764. }
  1765. return nil
  1766. }
  1767. // TakeSnapshot takes a snapshot of the current state
  1768. func (r *Raft) TakeSnapshot(data []byte, index uint64) error {
  1769. r.mu.Lock()
  1770. defer r.mu.Unlock()
  1771. if index > r.lastApplied {
  1772. return fmt.Errorf("snapshot index %d exceeds lastApplied %d", index, r.lastApplied)
  1773. }
  1774. term, err := r.log.GetTerm(index)
  1775. if err != nil {
  1776. return fmt.Errorf("failed to get term for index %d: %w", index, err)
  1777. }
  1778. if err := r.storage.SaveSnapshot(data, index, term); err != nil {
  1779. return fmt.Errorf("failed to save snapshot: %w", err)
  1780. }
  1781. if err := r.log.Compact(index); err != nil {
  1782. return fmt.Errorf("failed to compact log: %w", err)
  1783. }
  1784. r.logger.Info("Took snapshot at index %d, term %d", index, term)
  1785. return nil
  1786. }
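// takeSnapshotExampleSketch is an illustrative caller of TakeSnapshot, not invoked
// anywhere in this file: the application serializes its own state machine (fsmBytes
// and appliedIndex are hypothetical names supplied by the caller), and the index must
// not exceed what the FSM has already applied or TakeSnapshot rejects it.
func takeSnapshotExampleSketch(r *Raft, fsmBytes []byte, appliedIndex uint64) error {
	if err := r.TakeSnapshot(fsmBytes, appliedIndex); err != nil {
		return fmt.Errorf("snapshot at index %d failed: %w", appliedIndex, err)
	}
	return nil
}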
  1787. func max(a, b uint64) uint64 {
  1788. if a > b {
  1789. return a
  1790. }
  1791. return b
  1792. }
  1793. // ==================== Membership Change API ====================
  1794. //
  1795. // This implementation uses Single-Node Membership Change (also known as one-at-a-time changes)
  1796. // as described in the Raft dissertation (§4.3). This is safe because:
  1797. //
  1798. // 1. We only allow one configuration change at a time (pendingConfigChange flag)
  1799. // 2. For commits, we use the OLD cluster majority until the config change is committed
  1800. // 3. The new node starts receiving entries immediately but doesn't affect majority calculation
  1801. //
  1802. // This approach is simpler than Joint Consensus and is sufficient for most use cases.
  1803. // The invariant maintained is: any two majorities (old or new) must overlap.
  1804. //
// For adding a node: old majority = N/2+1 of N nodes, new majority = (N+1)/2+1 of N+1 nodes; any old majority and any new majority share at least one node.
// For removing a node: old majority = N/2+1 of N nodes, new majority = (N-1)/2+1 of N-1 nodes; these also overlap for any N > 1.
  1807. //
  1808. // WARNING: Avoid adding/removing multiple nodes rapidly. Wait for each change to be committed.
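// majoritiesOverlapSketch is a documentation-only helper illustrating the invariant
// above: when membership changes by one node, the smaller configuration is a subset
// of the larger, so any old majority and any new majority intersect whenever
// oldQuorum+newQuorum exceeds the larger cluster size (e.g. 3 -> 4 nodes: 2+3 > 4).
func majoritiesOverlapSketch(oldSize, newSize int) bool {
	oldQuorum := oldSize/2 + 1
	newQuorum := newSize/2 + 1
	larger := oldSize
	if newSize > larger {
		larger = newSize
	}
	return oldQuorum+newQuorum > larger
}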
  1809. // AddNode adds a new node to the cluster
  1810. // This can only be called on the leader
  1811. // The new node must already be running and reachable
  1812. //
  1813. // Safety guarantees:
  1814. // - Only one config change can be in progress at a time
  1815. // - The old cluster majority is used until the config change is committed
  1816. // - Returns error if leadership is lost during the operation
  1817. func (r *Raft) AddNode(nodeID, address string) error {
  1818. r.mu.Lock()
  1819. // Must be leader
  1820. if r.state != Leader {
  1821. leaderID := r.leaderID
  1822. r.mu.Unlock()
  1823. return NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff)
  1824. }
  1825. // Check if we're in the middle of a leadership transfer
  1826. if r.transferring {
  1827. r.mu.Unlock()
  1828. return fmt.Errorf("leadership transfer in progress")
  1829. }
  1830. // Check if there's already a pending config change
  1831. if r.pendingConfigChange {
  1832. r.mu.Unlock()
  1833. return ErrConfigInFlight
  1834. }
  1835. // Validate nodeID and address
  1836. if nodeID == "" {
  1837. r.mu.Unlock()
  1838. return fmt.Errorf("nodeID cannot be empty")
  1839. }
  1840. if address == "" {
  1841. r.mu.Unlock()
  1842. return fmt.Errorf("address cannot be empty")
  1843. }
  1844. // Check if node already exists
  1845. if _, exists := r.clusterNodes[nodeID]; exists {
  1846. r.mu.Unlock()
  1847. return fmt.Errorf("node %s already exists in cluster", nodeID)
  1848. }
  1849. // Check if address is already used by another node
  1850. for existingID, existingAddr := range r.clusterNodes {
  1851. if existingAddr == address {
  1852. r.mu.Unlock()
  1853. return fmt.Errorf("address %s is already used by node %s", address, existingID)
  1854. }
  1855. }
  1856. // Save old cluster nodes for majority calculation during config change
  1857. // This ensures we use the OLD cluster size until the config change is committed
  1858. r.oldClusterNodes = make(map[string]string)
  1859. for k, v := range r.clusterNodes {
  1860. r.oldClusterNodes[k] = v
  1861. }
  1862. // Create new config with the added node
  1863. newNodes := make(map[string]string)
  1864. for k, v := range r.clusterNodes {
  1865. newNodes[k] = v
  1866. }
  1867. newNodes[nodeID] = address
  1868. // Create config change entry with ClusterConfig
  1869. configIndex := r.log.LastIndex() + 1
  1870. entry := LogEntry{
  1871. Index: configIndex,
  1872. Term: r.currentTerm,
  1873. Type: EntryConfig,
  1874. Config: &ClusterConfig{Nodes: newNodes},
  1875. }
  1876. // Mark config change as pending and store the index
  1877. r.pendingConfigChange = true
  1878. r.configChangeIndex = configIndex
  1879. // Immediately apply the new configuration (for single-node changes, this is safe)
  1880. // The new node will start receiving AppendEntries immediately
  1881. r.clusterNodes[nodeID] = address
  1882. r.peers = append(r.peers, address)
  1883. // Set nextIndex to 1 (or firstIndex) so the new node syncs from the beginning
  1884. // This is crucial - the new node's log is empty, so we must start from index 1
  1885. firstIndex := r.log.FirstIndex()
  1886. if firstIndex == 0 {
  1887. firstIndex = 1
  1888. }
  1889. r.nextIndex[address] = firstIndex
  1890. r.matchIndex[address] = 0
  1891. r.mu.Unlock()
  1892. // Append the config change entry to log
  1893. if err := r.log.Append(entry); err != nil {
  1894. r.mu.Lock()
  1895. r.pendingConfigChange = false
  1896. r.oldClusterNodes = nil
  1897. r.configChangeIndex = 0
  1898. // Rollback
  1899. delete(r.clusterNodes, nodeID)
  1900. r.rebuildPeersList()
  1901. r.mu.Unlock()
  1902. return fmt.Errorf("failed to append config entry: %w", err)
  1903. }
  1904. r.logger.Info("Adding node %s (%s) to cluster", nodeID, address)
  1905. // Trigger immediate replication
  1906. r.triggerReplication()
  1907. return nil
  1908. }
  1909. // RemoveNode removes a node from the cluster
  1910. // This can only be called on the leader
  1911. // The node being removed can be any node except the leader itself
  1912. //
  1913. // Safety guarantees:
  1914. // - Only one config change can be in progress at a time
  1915. // - Cannot remove the leader (transfer leadership first)
  1916. // - Cannot reduce cluster to 0 nodes
  1917. // - The old cluster majority is used until the config change is committed
  1918. func (r *Raft) RemoveNode(nodeID string) error {
  1919. r.mu.Lock()
  1920. // Must be leader
  1921. if r.state != Leader {
  1922. leaderID := r.leaderID
  1923. r.mu.Unlock()
  1924. return NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff)
  1925. }
  1926. // Check if we're in the middle of a leadership transfer
  1927. if r.transferring {
  1928. r.mu.Unlock()
  1929. return fmt.Errorf("leadership transfer in progress")
  1930. }
  1931. // Cannot remove self - return error immediately WITHOUT logging or appending entry
  1932. if nodeID == r.nodeID {
  1933. r.mu.Unlock()
  1934. return fmt.Errorf("cannot remove self from cluster, use TransferLeadership first")
  1935. }
  1936. // Validate nodeID
  1937. if nodeID == "" {
  1938. r.mu.Unlock()
  1939. return fmt.Errorf("nodeID cannot be empty")
  1940. }
  1941. // Check if there's already a pending config change
  1942. if r.pendingConfigChange {
  1943. r.mu.Unlock()
  1944. return ErrConfigInFlight
  1945. }
  1946. // Check if node exists
  1947. if _, exists := r.clusterNodes[nodeID]; !exists {
  1948. r.mu.Unlock()
  1949. return fmt.Errorf("node %s not found in cluster", nodeID)
  1950. }
// Forced-removal check: the node being removed may be unreachable or have an
// invalid address, so removal must not require contacting it. For Raft safety we
// still commit a config change entry so the rest of the cluster learns about the
// removal; we only relax the local sanity checks for clearly invalid nodes.
  1958. // Check validity of address (simple heuristic)
  1959. nodeAddr := r.clusterNodes[nodeID]
  1960. isValidNode := true
  1961. if nodeAddr == "" || strings.HasPrefix(nodeAddr, ".") || !strings.Contains(nodeAddr, ":") {
  1962. isValidNode = false
  1963. r.logger.Warn("Removing invalid node %s with address %s", nodeID, nodeAddr)
  1964. }
  1965. // Cannot reduce cluster below 1 node, unless it's an invalid node cleanup
  1966. if isValidNode && len(r.clusterNodes) <= 1 {
  1967. r.mu.Unlock()
  1968. return fmt.Errorf("cannot remove last node from cluster")
  1969. }
  1970. // Save old cluster nodes for majority calculation during config change
  1971. r.oldClusterNodes = make(map[string]string)
  1972. for k, v := range r.clusterNodes {
  1973. r.oldClusterNodes[k] = v
  1974. }
  1975. // Create new config without the removed node
  1976. newNodes := make(map[string]string)
  1977. for k, v := range r.clusterNodes {
  1978. if k != nodeID {
  1979. newNodes[k] = v
  1980. }
  1981. }
  1982. // Create config change entry with ClusterConfig
  1983. configIndex := r.log.LastIndex() + 1
  1984. entry := LogEntry{
  1985. Index: configIndex,
  1986. Term: r.currentTerm,
  1987. Type: EntryConfig,
  1988. Config: &ClusterConfig{Nodes: newNodes},
  1989. }
  1990. // Mark config change as pending and store the index
  1991. r.pendingConfigChange = true
  1992. r.configChangeIndex = configIndex
  1993. // Get the address of node being removed for cleanup
  1994. removedAddr := r.clusterNodes[nodeID]
  1995. // Immediately apply the new configuration
  1996. delete(r.clusterNodes, nodeID)
  1997. r.rebuildPeersList()
  1998. delete(r.nextIndex, removedAddr)
  1999. delete(r.matchIndex, removedAddr)
  2000. r.mu.Unlock()
  2001. // Append the config change entry to log
  2002. if err := r.log.Append(entry); err != nil {
  2003. r.mu.Lock()
  2004. r.pendingConfigChange = false
  2005. r.oldClusterNodes = nil
  2006. r.configChangeIndex = 0
// Roll back the in-memory membership change; the config entry was never appended, so no other node has seen it
  2008. r.clusterNodes[nodeID] = removedAddr
  2009. r.rebuildPeersList()
  2010. r.mu.Unlock()
  2011. return fmt.Errorf("failed to append config entry: %w", err)
  2012. }
  2013. r.logger.Info("Removing node %s from cluster, config at index %d", nodeID, entry.Index)
  2014. // Trigger replication
  2015. go r.sendHeartbeats()
  2016. return nil
  2017. }
  2018. // AddNodeWithForward adds a node, forwarding to leader if necessary
  2019. // This is the recommended method for applications to use
  2020. func (r *Raft) AddNodeWithForward(nodeID, address string) error {
  2021. // Try local operation first
  2022. err := r.AddNode(nodeID, address)
  2023. if err == nil {
  2024. return nil
  2025. }
  2026. // Check if we're not the leader
  2027. r.mu.RLock()
  2028. state := r.state
  2029. leaderID := r.leaderID
  2030. leaderAddr := r.clusterNodes[leaderID]
  2031. r.mu.RUnlock()
  2032. if state == Leader {
  2033. // We are leader but AddNode failed for other reasons
  2034. return err
  2035. }
  2036. // Not leader, forward to leader
  2037. if leaderID == "" {
  2038. return fmt.Errorf("no leader available")
  2039. }
  2040. if leaderAddr == "" {
  2041. return fmt.Errorf("leader %s address not found in cluster", leaderID)
  2042. }
  2043. // Forward to leader
  2044. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  2045. defer cancel()
  2046. args := &AddNodeArgs{NodeID: nodeID, Address: address}
  2047. reply, err := r.transport.ForwardAddNode(ctx, leaderAddr, args)
  2048. if err != nil {
  2049. return fmt.Errorf("forward failed: %w", err)
  2050. }
  2051. if !reply.Success {
  2052. return fmt.Errorf("leader rejected: %s", reply.Error)
  2053. }
  2054. return nil
  2055. }
  2056. // RemoveNodeWithForward removes a node, forwarding to leader if necessary
  2057. // This is the recommended method for applications to use
  2058. func (r *Raft) RemoveNodeWithForward(nodeID string) error {
  2059. // Try local operation first
  2060. err := r.RemoveNode(nodeID)
  2061. if err == nil {
  2062. return nil
  2063. }
  2064. // Check if we're not the leader
  2065. r.mu.RLock()
  2066. state := r.state
  2067. leaderID := r.leaderID
  2068. leaderAddr := r.clusterNodes[leaderID]
  2069. r.mu.RUnlock()
  2070. if state == Leader {
  2071. // We are leader but RemoveNode failed for other reasons
  2072. return err
  2073. }
  2074. // Not leader, forward to leader
  2075. if leaderID == "" {
  2076. return fmt.Errorf("no leader available")
  2077. }
  2078. if leaderAddr == "" {
  2079. return fmt.Errorf("leader %s address not found in cluster", leaderID)
  2080. }
  2081. // Forward to leader
  2082. // Short timeout for removal as it shouldn't block long
  2083. ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
  2084. defer cancel()
  2085. args := &RemoveNodeArgs{NodeID: nodeID}
  2086. reply, err := r.transport.ForwardRemoveNode(ctx, leaderAddr, args)
  2087. if err != nil {
  2088. return fmt.Errorf("forward failed: %w", err)
  2089. }
  2090. if !reply.Success {
  2091. return fmt.Errorf("leader rejected: %s", reply.Error)
  2092. }
  2093. return nil
  2094. }
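// membershipChangeExampleSketch illustrates the one-change-at-a-time rule from the
// membership notes above: issue a single AddNodeWithForward, then poll
// GetClusterNodes until the change is visible locally before issuing the next one.
// The retry count and sleep interval are illustrative assumptions, not tuned values.
func membershipChangeExampleSketch(r *Raft, nodeID, address string) error {
	if err := r.AddNodeWithForward(nodeID, address); err != nil {
		return err
	}
	for i := 0; i < 50; i++ {
		if _, ok := r.GetClusterNodes()[nodeID]; ok {
			return nil
		}
		time.Sleep(100 * time.Millisecond)
	}
	return fmt.Errorf("node %s not yet visible in the cluster configuration", nodeID)
}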
  2095. // rebuildPeersList rebuilds the peers slice from clusterNodes
func (r *Raft) rebuildPeersList() {
// Capacity is only a hint; guard against an empty map so it never goes negative
capHint := len(r.clusterNodes)
if capHint > 0 {
capHint--
}
r.peers = make([]string, 0, capHint)
for nodeID, addr := range r.clusterNodes {
if nodeID != r.nodeID {
r.peers = append(r.peers, addr)
}
}
}
  2104. // GetClusterNodes returns a copy of the current cluster membership
  2105. func (r *Raft) GetClusterNodes() map[string]string {
  2106. r.mu.RLock()
  2107. defer r.mu.RUnlock()
  2108. nodes := make(map[string]string)
  2109. for k, v := range r.clusterNodes {
  2110. nodes[k] = v
  2111. }
  2112. return nodes
  2113. }
  2114. // applyConfigChange applies a configuration change entry
  2115. func (r *Raft) applyConfigChange(entry *LogEntry) {
  2116. if entry.Config == nil || entry.Config.Nodes == nil {
  2117. r.logger.Warn("Invalid config change entry at index %d", entry.Index)
  2118. return
  2119. }
  2120. r.mu.Lock()
  2121. defer r.mu.Unlock()
  2122. // Update cluster configuration
  2123. r.clusterNodes = make(map[string]string)
  2124. for k, v := range entry.Config.Nodes {
  2125. r.clusterNodes[k] = v
  2126. }
  2127. r.rebuildPeersList()
  2128. // Persist the new configuration
  2129. if err := r.storage.SaveClusterConfig(&ClusterConfig{Nodes: r.clusterNodes}); err != nil {
  2130. r.logger.Error("Failed to persist cluster config: %v", err)
  2131. }
  2132. // Clear pending flag and old cluster state
  2133. r.pendingConfigChange = false
  2134. r.oldClusterNodes = nil
  2135. r.configChangeIndex = 0
  2136. // If we were joining a cluster and now have multiple nodes, we've successfully joined
  2137. if r.joiningCluster && len(r.clusterNodes) > 1 {
  2138. r.joiningCluster = false
  2139. r.logger.Info("Successfully joined cluster with %d nodes", len(r.clusterNodes))
  2140. }
  2141. // Check if we have been removed from the cluster
  2142. if _, exists := r.clusterNodes[r.nodeID]; !exists {
  2143. r.logger.Warn("Node %s removed from cluster configuration", r.nodeID)
  2144. // We could shut down here, or just stay as a listener.
  2145. // For now, let's step down to Follower if we are not already.
  2146. if r.state != Follower {
  2147. r.becomeFollower(r.currentTerm)
  2148. }
  2149. }
  2150. r.logger.Info("Applied config change at index %d, cluster now has %d nodes", entry.Index, len(r.clusterNodes))
  2151. // If we're the leader, update leader state
  2152. if r.state == Leader {
  2153. // Check if we are still in the cluster
  2154. if _, exists := r.clusterNodes[r.nodeID]; !exists {
  2155. r.logger.Warn("Leader removed from cluster (self), stepping down")
  2156. r.becomeFollower(r.currentTerm)
  2157. return
  2158. }
  2159. // Initialize nextIndex/matchIndex for any new nodes
  2160. lastIndex := r.log.LastIndex()
  2161. for nodeID, addr := range r.clusterNodes {
  2162. if nodeID != r.nodeID {
  2163. if _, exists := r.nextIndex[addr]; !exists {
  2164. r.nextIndex[addr] = lastIndex + 1
  2165. r.matchIndex[addr] = 0
  2166. }
  2167. }
  2168. }
  2169. // Clean up removed nodes
  2170. validAddrs := make(map[string]bool)
  2171. for nodeID, addr := range r.clusterNodes {
  2172. if nodeID != r.nodeID {
  2173. validAddrs[addr] = true
  2174. }
  2175. }
  2176. for addr := range r.nextIndex {
  2177. if !validAddrs[addr] {
  2178. delete(r.nextIndex, addr)
  2179. delete(r.matchIndex, addr)
  2180. }
  2181. }
  2182. }
  2183. }
  2184. // ==================== ReadIndex (Linearizable Reads) ====================
  2185. // readIndexLoop handles read index requests
  2186. func (r *Raft) readIndexLoop() {
  2187. for {
  2188. select {
  2189. case <-r.stopCh:
  2190. return
  2191. case req := <-r.readIndexCh:
  2192. r.processReadIndexRequest(req)
  2193. }
  2194. }
  2195. }
  2196. // processReadIndexRequest processes a single read index request
  2197. func (r *Raft) processReadIndexRequest(req *readIndexRequest) {
  2198. r.mu.RLock()
  2199. if r.state != Leader {
  2200. r.mu.RUnlock()
  2201. req.done <- ErrNotLeader
  2202. return
  2203. }
  2204. r.mu.RUnlock()
  2205. // Confirm leadership by sending heartbeats and waiting for majority ack
  2206. if !r.confirmLeadership() {
  2207. req.done <- ErrLeadershipLost
  2208. return
  2209. }
  2210. // Wait for apply to catch up to readIndex
  2211. if err := r.waitApply(req.readIndex, r.config.ProposeTimeout); err != nil {
  2212. req.done <- err
  2213. return
  2214. }
  2215. req.done <- nil
  2216. }
  2217. // ReadIndex implements linearizable reads
  2218. // It ensures that the read sees all writes that were committed before the read started
  2219. func (r *Raft) ReadIndex() (uint64, error) {
  2220. r.mu.RLock()
  2221. if r.state != Leader {
  2222. leaderID := r.leaderID
  2223. r.mu.RUnlock()
  2224. return 0, NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff)
  2225. }
  2226. readIndex := r.commitIndex
  2227. r.mu.RUnlock()
  2228. atomic.AddUint64(&r.metrics.ReadIndexRequests, 1)
  2229. // Create request and send to processing loop
  2230. req := &readIndexRequest{
  2231. readIndex: readIndex,
  2232. done: make(chan error, 1),
  2233. }
  2234. select {
  2235. case r.readIndexCh <- req:
  2236. case <-r.stopCh:
  2237. return 0, ErrShutdown
  2238. case <-time.After(r.config.ProposeTimeout):
  2239. return 0, ErrTimeout
  2240. }
  2241. // Wait for result
  2242. select {
  2243. case err := <-req.done:
  2244. if err != nil {
  2245. return 0, err
  2246. }
  2247. atomic.AddUint64(&r.metrics.ReadIndexSuccess, 1)
  2248. return readIndex, nil
  2249. case <-r.stopCh:
  2250. return 0, ErrShutdown
  2251. case <-time.After(r.config.ProposeTimeout):
  2252. return 0, ErrTimeout
  2253. }
  2254. }
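// linearizableGetSketch is an illustrative read path, not wired up anywhere in this
// file: obtain a read index first, then serve the value from the local state machine
// via config.GetHandler (assumed to be configured by the application). ReadIndex
// already waits for lastApplied to reach the read index before returning.
func linearizableGetSketch(r *Raft, key string) (string, bool, error) {
	if _, err := r.ReadIndex(); err != nil {
		return "", false, err
	}
	if r.config.GetHandler == nil {
		return "", false, fmt.Errorf("get handler not configured")
	}
	val, found := r.config.GetHandler(key)
	return val, found, nil
}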
  2255. // confirmLeadership confirms we're still leader by getting acks from majority
  2256. func (r *Raft) confirmLeadership() bool {
  2257. r.mu.RLock()
  2258. if r.state != Leader {
  2259. r.mu.RUnlock()
  2260. return false
  2261. }
  2262. currentTerm := r.currentTerm
  2263. clusterSize := len(r.clusterNodes)
  2264. if clusterSize == 0 {
  2265. clusterSize = len(r.peers) + 1
  2266. }
  2267. r.mu.RUnlock()
  2268. // Single node cluster - we're always the leader
  2269. if clusterSize == 1 {
  2270. return true
  2271. }
// Send a round of heartbeats; a competing leader with a higher term will force us to step down via the replies
r.sendHeartbeats()
// Wait one heartbeat interval, then verify we are still leader in the same term
time.Sleep(r.config.HeartbeatInterval)
  2276. r.mu.RLock()
  2277. stillLeader := r.state == Leader && r.currentTerm == currentTerm
  2278. r.mu.RUnlock()
  2279. return stillLeader
  2280. }
  2281. // waitApply waits until lastApplied >= index
  2282. func (r *Raft) waitApply(index uint64, timeout time.Duration) error {
  2283. deadline := time.Now().Add(timeout)
  2284. for {
  2285. r.mu.RLock()
  2286. lastApplied := r.lastApplied
  2287. r.mu.RUnlock()
  2288. if lastApplied >= index {
  2289. return nil
  2290. }
  2291. if time.Now().After(deadline) {
  2292. return ErrTimeout
  2293. }
  2294. time.Sleep(1 * time.Millisecond)
  2295. }
  2296. }
  2297. // ==================== Health Check ====================
  2298. // HealthCheck returns the current health status of the node
  2299. func (r *Raft) HealthCheck() HealthStatus {
  2300. r.mu.RLock()
  2301. defer r.mu.RUnlock()
  2302. clusterNodes := make(map[string]string)
  2303. for k, v := range r.clusterNodes {
  2304. clusterNodes[k] = v
  2305. }
  2306. clusterSize := len(r.clusterNodes)
  2307. if clusterSize == 0 {
  2308. clusterSize = len(r.peers) + 1
  2309. }
  2310. logBehind := uint64(0)
  2311. if r.commitIndex > r.lastApplied {
  2312. logBehind = r.commitIndex - r.lastApplied
  2313. }
  2314. // Consider healthy if we're leader or have a known leader
  2315. isHealthy := r.state == Leader || r.leaderID != ""
  2316. return HealthStatus{
  2317. NodeID: r.nodeID,
  2318. State: r.state.String(),
  2319. Term: r.currentTerm,
  2320. LeaderID: r.leaderID,
  2321. ClusterSize: clusterSize,
  2322. ClusterNodes: clusterNodes,
  2323. CommitIndex: r.commitIndex,
  2324. LastApplied: r.lastApplied,
  2325. LogBehind: logBehind,
  2326. LastHeartbeat: r.lastHeartbeat,
  2327. IsHealthy: isHealthy,
  2328. Uptime: time.Since(r.startTime),
  2329. }
  2330. }
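// healthLogSketch is an illustrative monitor built on HealthCheck; it is not started
// anywhere in this file and only shows how the health fields fit together
// (IsHealthy, LeaderID, LogBehind).
func healthLogSketch(r *Raft) {
	hs := r.HealthCheck()
	if !hs.IsHealthy {
		r.logger.Warn("Node %s unhealthy: state=%s term=%d, no known leader", hs.NodeID, hs.State, hs.Term)
		return
	}
	r.logger.Debug("Node %s healthy: leader=%s, %d entries behind", hs.NodeID, hs.LeaderID, hs.LogBehind)
}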
  2331. // GetMetrics returns the current metrics
  2332. func (r *Raft) GetMetrics() Metrics {
  2333. return Metrics{
  2334. Term: atomic.LoadUint64(&r.metrics.Term),
  2335. ProposalsTotal: atomic.LoadUint64(&r.metrics.ProposalsTotal),
  2336. ProposalsSuccess: atomic.LoadUint64(&r.metrics.ProposalsSuccess),
  2337. ProposalsFailed: atomic.LoadUint64(&r.metrics.ProposalsFailed),
  2338. ProposalsForwarded: atomic.LoadUint64(&r.metrics.ProposalsForwarded),
  2339. AppendsSent: atomic.LoadUint64(&r.metrics.AppendsSent),
  2340. AppendsReceived: atomic.LoadUint64(&r.metrics.AppendsReceived),
  2341. AppendsSuccess: atomic.LoadUint64(&r.metrics.AppendsSuccess),
  2342. AppendsFailed: atomic.LoadUint64(&r.metrics.AppendsFailed),
  2343. ElectionsStarted: atomic.LoadUint64(&r.metrics.ElectionsStarted),
  2344. ElectionsWon: atomic.LoadUint64(&r.metrics.ElectionsWon),
  2345. PreVotesStarted: atomic.LoadUint64(&r.metrics.PreVotesStarted),
  2346. PreVotesGranted: atomic.LoadUint64(&r.metrics.PreVotesGranted),
  2347. SnapshotsTaken: atomic.LoadUint64(&r.metrics.SnapshotsTaken),
  2348. SnapshotsInstalled: atomic.LoadUint64(&r.metrics.SnapshotsInstalled),
  2349. SnapshotsSent: atomic.LoadUint64(&r.metrics.SnapshotsSent),
  2350. ReadIndexRequests: atomic.LoadUint64(&r.metrics.ReadIndexRequests),
  2351. ReadIndexSuccess: atomic.LoadUint64(&r.metrics.ReadIndexSuccess),
  2352. LeadershipTransfers: atomic.LoadUint64(&r.metrics.LeadershipTransfers),
  2353. LeadershipTransferSuccess: atomic.LoadUint64(&r.metrics.LeadershipTransferSuccess),
  2354. }
  2355. }
  2356. // ==================== Leadership Transfer ====================
  2357. // TransferLeadership transfers leadership to the specified node
  2358. func (r *Raft) TransferLeadership(targetID string) error {
  2359. r.mu.Lock()
  2360. if r.state != Leader {
  2361. r.mu.Unlock()
  2362. return ErrNotLeader
  2363. }
  2364. if targetID == r.nodeID {
  2365. r.mu.Unlock()
  2366. return fmt.Errorf("cannot transfer to self")
  2367. }
  2368. targetAddr, exists := r.clusterNodes[targetID]
  2369. if !exists {
  2370. r.mu.Unlock()
  2371. return fmt.Errorf("target node %s not in cluster", targetID)
  2372. }
  2373. if r.transferring {
  2374. r.mu.Unlock()
  2375. return fmt.Errorf("leadership transfer already in progress")
  2376. }
  2377. r.transferring = true
  2378. r.transferTarget = targetID
  2379. r.transferDeadline = time.Now().Add(r.config.ElectionTimeoutMax * 2)
  2380. currentTerm := r.currentTerm
  2381. atomic.AddUint64(&r.metrics.LeadershipTransfers, 1)
  2382. r.mu.Unlock()
  2383. r.logger.Info("Starting leadership transfer to %s", targetID)
  2384. // Step 1: Sync target to our log
  2385. if err := r.syncFollowerToLatest(targetAddr); err != nil {
  2386. r.mu.Lock()
  2387. r.transferring = false
  2388. r.transferTarget = ""
  2389. r.mu.Unlock()
  2390. return fmt.Errorf("failed to sync target: %w", err)
  2391. }
  2392. // Step 2: Send TimeoutNow RPC
  2393. args := &TimeoutNowArgs{
  2394. Term: currentTerm,
  2395. LeaderID: r.nodeID,
  2396. }
  2397. ctx, cancel := context.WithTimeout(context.Background(), r.config.RPCTimeout)
  2398. defer cancel()
  2399. reply, err := r.transport.TimeoutNow(ctx, targetAddr, args)
  2400. if err != nil {
  2401. r.mu.Lock()
  2402. r.transferring = false
  2403. r.transferTarget = ""
  2404. r.mu.Unlock()
  2405. return fmt.Errorf("TimeoutNow RPC failed: %w", err)
  2406. }
  2407. if !reply.Success {
  2408. r.mu.Lock()
  2409. r.transferring = false
  2410. r.transferTarget = ""
  2411. r.mu.Unlock()
  2412. return fmt.Errorf("target rejected leadership transfer")
  2413. }
  2414. atomic.AddUint64(&r.metrics.LeadershipTransferSuccess, 1)
  2415. r.logger.Info("Leadership transfer to %s initiated successfully", targetID)
  2416. // Note: We don't immediately step down; we wait for the target to win election
  2417. // and send us an AppendEntries with higher term
  2418. return nil
  2419. }
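// stepDownExampleSketch shows one way an operator flow might use TransferLeadership
// before decommissioning the current leader: pick any other cluster member and hand
// off leadership, then let the new leader remove this node. The naive target
// selection is an illustrative assumption; real deployments would pick a healthy,
// up-to-date follower.
func stepDownExampleSketch(r *Raft) error {
	if _, isLeader := r.GetState(); !isLeader {
		return nil // nothing to hand off
	}
	for nodeID := range r.GetClusterNodes() {
		if nodeID != r.nodeID {
			return r.TransferLeadership(nodeID)
		}
	}
	return fmt.Errorf("no other node available as a transfer target")
}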
  2420. // syncFollowerToLatest ensures the follower is caught up to our log
  2421. func (r *Raft) syncFollowerToLatest(peerAddr string) error {
  2422. r.mu.RLock()
  2423. if r.state != Leader {
  2424. r.mu.RUnlock()
  2425. return ErrNotLeader
  2426. }
  2427. currentTerm := r.currentTerm
  2428. leaderCommit := r.commitIndex
  2429. lastIndex := r.log.LastIndex()
  2430. r.mu.RUnlock()
  2431. // Keep replicating until follower is caught up
  2432. deadline := time.Now().Add(r.config.ElectionTimeoutMax * 2)
  2433. for time.Now().Before(deadline) {
  2434. r.mu.RLock()
  2435. if r.state != Leader || r.currentTerm != currentTerm {
  2436. r.mu.RUnlock()
  2437. return ErrLeadershipLost
  2438. }
  2439. matchIndex := r.matchIndex[peerAddr]
  2440. r.mu.RUnlock()
  2441. if matchIndex >= lastIndex {
  2442. return nil // Caught up
  2443. }
  2444. // Trigger replication
  2445. r.replicateToPeer(peerAddr, currentTerm, leaderCommit)
  2446. time.Sleep(10 * time.Millisecond)
  2447. }
  2448. return ErrTimeout
  2449. }
  2450. // HandleTimeoutNow handles TimeoutNow RPC (for leadership transfer)
  2451. func (r *Raft) HandleTimeoutNow(args *TimeoutNowArgs) *TimeoutNowReply {
  2452. r.mu.Lock()
  2453. defer r.mu.Unlock()
  2454. reply := &TimeoutNowReply{
  2455. Term: r.currentTerm,
  2456. Success: false,
  2457. }
  2458. // Only accept if we're a follower and the term matches
  2459. if args.Term < r.currentTerm {
  2460. return reply
  2461. }
  2462. if r.state != Follower {
  2463. return reply
  2464. }
// Become a candidate immediately rather than waiting for the election timeout
  2466. r.logger.Info("Received TimeoutNow from %s, starting immediate election", args.LeaderID)
  2467. r.state = Candidate
  2468. reply.Success = true
  2469. return reply
  2470. }
  2471. // HandleReadIndex handles ReadIndex RPC
  2472. func (r *Raft) HandleReadIndex(args *ReadIndexArgs) *ReadIndexReply {
  2473. reply := &ReadIndexReply{
  2474. Success: false,
  2475. }
  2476. readIndex, err := r.ReadIndex()
  2477. if err != nil {
  2478. reply.Error = err.Error()
  2479. return reply
  2480. }
  2481. reply.ReadIndex = readIndex
  2482. reply.Success = true
  2483. return reply
  2484. }
  2485. // HandleGet handles Get RPC for remote KV reads
  2486. func (r *Raft) HandleGet(args *GetArgs) *GetReply {
  2487. reply := &GetReply{
  2488. Found: false,
  2489. }
  2490. if r.config.GetHandler == nil {
  2491. reply.Error = "get handler not configured"
  2492. return reply
  2493. }
  2494. value, found := r.config.GetHandler(args.Key)
  2495. reply.Value = value
  2496. reply.Found = found
  2497. return reply
  2498. }