- package raft
- import (
- "context"
- "fmt"
- "math/rand"
- "os"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- )
- // Raft represents a Raft consensus node
- type Raft struct {
- mu sync.RWMutex
- // Node identity
- nodeID string
- peers []string
- // Cluster membership - maps nodeID to address
- clusterNodes map[string]string
- // Current state
- state NodeState
- currentTerm uint64
- votedFor string
- leaderID string
- // Log management
- log *LogManager
- storage Storage
- commitIndex uint64
- lastApplied uint64
- // Leader state
- nextIndex map[string]uint64
- matchIndex map[string]uint64
- // Configuration
- config *Config
- // Communication
- transport Transport
- // Channels
- applyCh chan ApplyMsg
- stopCh chan struct{}
- commitCh chan struct{}
- // Election timer
- electionTimer *time.Timer
- heartbeatTimer *time.Timer
- // Statistics (deprecated, use metrics instead)
- stats Stats
- // Metrics for monitoring
- metrics Metrics
- // Logger
- logger Logger
- // Running flag
- running int32
- // Replication trigger channel - used to batch replication requests
- replicationCh chan struct{}
- // Pending config change - only one config change can be pending at a time
- pendingConfigChange bool
- // Old cluster nodes - used for majority calculation during config change
- // This ensures we use the old cluster size until the config change is committed
- oldClusterNodes map[string]string
- // Index of the pending config change entry
- configChangeIndex uint64
- // Joining cluster state - when a standalone node is being added to a cluster
- // This prevents the node from starting elections while syncing
- joiningCluster bool
- joiningClusterTime time.Time
- // Compaction state - prevents concurrent compaction
- compacting int32
- // Dynamic compaction threshold - updated after each compaction
- // Next compaction triggers when log size exceeds this value
- // Formula: post-compaction log size + SnapshotThreshold (initially SnapshotThreshold)
- nextCompactionThreshold uint64
- // WaitGroup to track goroutines for clean shutdown
- wg sync.WaitGroup
- // Leadership transfer state
- transferring bool
- transferTarget string
- transferDeadline time.Time
- // Last heartbeat received (for health check)
- lastHeartbeat time.Time
- // Start time (for uptime calculation)
- startTime time.Time
- // ReadIndex waiting queue
- readIndexCh chan *readIndexRequest
- // Snapshot receiving state (for chunked transfer)
- pendingSnapshot *pendingSnapshotState
- // Last contact time for each peer (for leader check)
- lastContact map[string]time.Time
- }
- // readIndexRequest represents a pending read index request
- type readIndexRequest struct {
- readIndex uint64
- done chan error
- }
- // pendingSnapshotState tracks chunked snapshot reception
- type pendingSnapshotState struct {
- lastIncludedIndex uint64
- lastIncludedTerm uint64
- data []byte
- receivedBytes uint64
- }
- // Stats holds runtime statistics
- type Stats struct {
- Term uint64
- State string
- FirstLogIndex uint64 // Added for monitoring compaction
- LastLogIndex uint64
- LastLogTerm uint64
- CommitIndex uint64
- LastApplied uint64
- LeaderID string
- VotesReceived int
- AppendsSent int64
- AppendsReceived int64
- ClusterSize int // Number of nodes in cluster
- ClusterNodes map[string]string // NodeID -> Address mapping
- }
- // ConsoleLogger implements Logger with console output
- type ConsoleLogger struct {
- Prefix string
- Level int // 0=debug, 1=info, 2=warn, 3=error
- mu sync.Mutex
- }
- func NewConsoleLogger(prefix string, level int) *ConsoleLogger {
- return &ConsoleLogger{Prefix: prefix, Level: level}
- }
- func (l *ConsoleLogger) log(level int, levelStr, format string, args ...interface{}) {
- if level < l.Level {
- return
- }
- l.mu.Lock()
- defer l.mu.Unlock()
- var levelColor string
- // Define colors locally to avoid dependency issues if cli.go is excluded
- const (
- colorReset = "\033[0m"
- colorDim = "\033[90m"
- colorRed = "\033[31m"
- colorGreen = "\033[32m"
- colorYellow = "\033[33m"
- colorCyan = "\033[36m"
- )
- switch level {
- case 0: levelColor = colorDim
- case 1: levelColor = colorGreen
- case 2: levelColor = colorYellow
- case 3: levelColor = colorRed
- default: levelColor = colorReset
- }
- msg := fmt.Sprintf(format, args...)
-
- // Format: [Time] Node [LEVEL] Message
- // Aligned for better readability
- fmt.Printf("%s[%s]%s %s%-8s%s %s[%-5s]%s %s\n",
- colorDim, time.Now().Format("15:04:05.000"), colorReset,
- colorCyan, l.Prefix, colorReset,
- levelColor, levelStr, colorReset,
- msg)
- }
- func (l *ConsoleLogger) Debug(format string, args ...interface{}) {
- l.log(0, "DEBUG", format, args...)
- }
- func (l *ConsoleLogger) Info(format string, args ...interface{}) {
- l.log(1, "INFO", format, args...)
- }
- func (l *ConsoleLogger) Warn(format string, args ...interface{}) {
- l.log(2, "WARN", format, args...)
- }
- func (l *ConsoleLogger) Error(format string, args ...interface{}) {
- l.log(3, "ERROR", format, args...)
- }
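- // Sample ConsoleLogger output (ANSI colors omitted):
- //
- //	[15:04:05.000] node1    [INFO ] Raft node started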
- // NewRaft creates a new Raft node
- func NewRaft(config *Config, transport Transport, applyCh chan ApplyMsg) (*Raft, error) {
- if config.Logger == nil {
- config.Logger = NewConsoleLogger(config.NodeID, 1)
- }
- // Create storage
- storage, err := NewHybridStorage(config.DataDir, config.MemoryLogCapacity, config.Logger)
- if err != nil {
- return nil, fmt.Errorf("failed to create storage: %w", err)
- }
- // Create log manager
- logMgr := NewLogManager(storage, config.Logger)
- // Load persistent state
- state, err := storage.GetState()
- if err != nil {
- return nil, fmt.Errorf("failed to load state: %w", err)
- }
- // Load or initialize cluster configuration
- clusterNodes := make(map[string]string)
- // Try to load saved cluster config first
- savedConfig, err := storage.GetClusterConfig()
- if err != nil {
- return nil, fmt.Errorf("failed to load cluster config: %w", err)
- }
- if savedConfig != nil && len(savedConfig.Nodes) > 0 {
- // Use saved config
- clusterNodes = savedConfig.Nodes
- config.Logger.Info("Loaded cluster config with %d nodes", len(clusterNodes))
- } else {
- // Initialize from config
- if len(config.ClusterNodes) > 0 { // len of a nil map is 0
- for k, v := range config.ClusterNodes {
- clusterNodes[k] = v
- }
- } else {
- // Build from Peers + self (backward compatibility)
- clusterNodes[config.NodeID] = config.ListenAddr
- if config.PeerMap != nil {
- for k, v := range config.PeerMap {
- clusterNodes[k] = v
- }
- }
- }
- // Save initial config
- if len(clusterNodes) > 0 {
- if err := storage.SaveClusterConfig(&ClusterConfig{Nodes: clusterNodes}); err != nil {
- config.Logger.Warn("Failed to save initial cluster config: %v", err)
- }
- }
- }
- // Build peers list from cluster nodes (excluding self)
- var peers []string
- for nodeID, addr := range clusterNodes {
- if nodeID != config.NodeID {
- peers = append(peers, addr)
- }
- }
- r := &Raft{
- nodeID: config.NodeID,
- peers: peers,
- clusterNodes: clusterNodes,
- state: Follower,
- currentTerm: state.CurrentTerm,
- votedFor: state.VotedFor,
- log: logMgr,
- storage: storage,
- config: config,
- transport: transport,
- applyCh: applyCh,
- stopCh: make(chan struct{}),
- commitCh: make(chan struct{}, 100), // Increased buffer for event-driven apply
- replicationCh: make(chan struct{}, 1),
- nextIndex: make(map[string]uint64),
- matchIndex: make(map[string]uint64),
- logger: config.Logger,
- readIndexCh: make(chan *readIndexRequest, 100),
- lastHeartbeat: time.Now(),
- lastContact: make(map[string]time.Time),
- }
- // Initialize metrics
- r.metrics.Term = state.CurrentTerm
- // Initialize lastApplied and commitIndex from config (if provided)
- if config.LastAppliedIndex > 0 {
- r.lastApplied = config.LastAppliedIndex
- r.commitIndex = config.LastAppliedIndex // Applied implies committed
- r.logger.Info("Initialized lastApplied and commitIndex to %d from config", config.LastAppliedIndex)
- }
- // Set RPC handler
- transport.SetRPCHandler(r)
- return r, nil
- }
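- // A minimal usage sketch (illustrative only; the Transport value and any
- // extra Config fields depend on the application):
- //
- //	applyCh := make(chan ApplyMsg, 64)
- //	cfg := &Config{NodeID: "node1", DataDir: "/tmp/raft-node1"}
- //	node, err := NewRaft(cfg, transport, applyCh)
- //	if err != nil {
- //		return err
- //	}
- //	if err := node.Start(); err != nil {
- //		return err
- //	}
- //	defer node.Stop()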
- // Start starts the Raft node
- func (r *Raft) Start() error {
- if !atomic.CompareAndSwapInt32(&r.running, 0, 1) {
- return fmt.Errorf("already running")
- }
- // Record start time
- r.startTime = time.Now()
- // Start transport
- if err := r.transport.Start(); err != nil {
- return fmt.Errorf("failed to start transport: %w", err)
- }
- // Restore FSM from snapshot if exists
- // This must happen before starting apply loop to ensure FSM state is restored
- if err := r.restoreFromSnapshot(); err != nil {
- // Suppress warning if it's just a missing file (first start)
- if !os.IsNotExist(err) && !strings.Contains(err.Error(), "no such file") {
- r.logger.Warn("Failed to restore from snapshot: %v", err)
- } else {
- r.logger.Info("No snapshot found, starting with empty state")
- }
- // Continue anyway - the node can still function, just without historical state
- }
- // Start election timer (take the lock - the transport is already started,
- // so RPC handlers may touch the timer concurrently)
- r.mu.Lock()
- r.resetElectionTimer()
- r.mu.Unlock()
- // Start background goroutines with WaitGroup tracking
- r.wg.Add(4)
- go func() {
- defer r.wg.Done()
- r.applyLoop()
- }()
- go func() {
- defer r.wg.Done()
- r.replicationLoop()
- }()
- go func() {
- defer r.wg.Done()
- r.mainLoop()
- }()
- go func() {
- defer r.wg.Done()
- r.readIndexLoop()
- }()
- r.logger.Info("Raft node started")
- return nil
- }
- // Stop stops the Raft node
- func (r *Raft) Stop() error {
- if !atomic.CompareAndSwapInt32(&r.running, 1, 0) {
- return fmt.Errorf("not running")
- }
- // Signal all goroutines to stop
- close(r.stopCh)
- // Stop timers first to prevent new operations
- r.mu.Lock()
- if r.electionTimer != nil {
- r.electionTimer.Stop()
- }
- if r.heartbeatTimer != nil {
- r.heartbeatTimer.Stop()
- }
- r.mu.Unlock()
- // Wait for all goroutines to finish with timeout
- done := make(chan struct{})
- go func() {
- r.wg.Wait()
- close(done)
- }()
- select {
- case <-done:
- // All goroutines exited cleanly
- case <-time.After(3 * time.Second):
- r.logger.Warn("Timeout waiting for goroutines to stop")
- }
- // Stop transport (has its own timeout)
- if err := r.transport.Stop(); err != nil {
- r.logger.Error("Failed to stop transport: %v", err)
- }
- if err := r.storage.Close(); err != nil {
- r.logger.Error("Failed to close storage: %v", err)
- }
- r.logger.Info("Raft node stopped")
- return nil
- }
- // mainLoop is the main event loop
- func (r *Raft) mainLoop() {
- for {
- select {
- case <-r.stopCh:
- return
- default:
- }
- r.mu.RLock()
- state := r.state
- r.mu.RUnlock()
- switch state {
- case Follower:
- r.runFollower()
- case Candidate:
- r.runCandidate()
- case Leader:
- r.runLeader()
- }
- }
- }
- // runFollower handles follower behavior
- func (r *Raft) runFollower() {
- r.mu.RLock()
- term := r.currentTerm
- r.mu.RUnlock()
- r.logger.Debug("Running as follower in term %d", term)
- for {
- select {
- case <-r.stopCh:
- return
- case <-r.electionTimer.C:
- r.mu.Lock()
- if r.state == Follower {
- // If we're joining a cluster, don't start elections
- // Give time for the leader to sync us up
- if r.joiningCluster {
- // Allow joining for up to 30 seconds
- if time.Since(r.joiningClusterTime) < 30*time.Second {
- r.logger.Debug("Suppressing election during cluster join")
- r.resetElectionTimer()
- r.mu.Unlock()
- continue
- }
- // Timeout - clear joining state
- r.joiningCluster = false
- r.logger.Warn("Cluster join timeout, resuming normal election behavior")
- }
- r.logger.Debug("Election timeout, becoming candidate")
- r.state = Candidate
- r.leaderID = "" // Clear leaderID so we can vote for self in Pre-Vote
- }
- r.mu.Unlock()
- return
- }
- }
- }
- // runCandidate handles candidate behavior (leader election)
- // Implements Pre-Vote mechanism to prevent term explosion
- func (r *Raft) runCandidate() {
- // Phase 1: Pre-Vote (don't increment term yet)
- if !r.runPreVote() {
- // Pre-vote failed, wait for timeout before retrying
- r.mu.Lock()
- r.resetElectionTimer()
- r.mu.Unlock()
- select {
- case <-r.stopCh:
- return
- case <-r.electionTimer.C:
- // Timer expired, will retry pre-vote
- }
- return
- }
- // Phase 2: Actual election (pre-vote succeeded, now increment term)
- r.mu.Lock()
- r.currentTerm++
- r.votedFor = r.nodeID
- r.leaderID = ""
- currentTerm := r.currentTerm
- // Update metrics
- atomic.StoreUint64(&r.metrics.Term, currentTerm)
-
- if err := r.persistState(); err != nil {
- r.logger.Error("Failed to persist state during election: %v", err)
- r.mu.Unlock()
- return // Cannot proceed without persisting state
- }
- atomic.AddUint64(&r.metrics.ElectionsStarted, 1)
- r.resetElectionTimer()
- r.mu.Unlock()
- r.logger.Debug("Starting election for term %d", currentTerm)
- // Get current peers list
- r.mu.RLock()
- currentPeers := make([]string, len(r.peers))
- copy(currentPeers, r.peers)
- clusterSize := len(r.clusterNodes)
- if clusterSize == 0 {
- clusterSize = len(r.peers) + 1
- }
- r.mu.RUnlock()
- // Request actual votes from all peers
- votes := 1 // Vote for self
- voteCh := make(chan bool, len(currentPeers))
- lastLogIndex, lastLogTerm := r.log.LastIndexAndTerm()
- for _, peer := range currentPeers {
- go func(peer string) {
- args := &RequestVoteArgs{
- Term: currentTerm,
- CandidateID: r.nodeID,
- LastLogIndex: lastLogIndex,
- LastLogTerm: lastLogTerm,
- PreVote: false,
- }
- ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
- defer cancel()
- reply, err := r.transport.RequestVote(ctx, peer, args)
- if err != nil {
- r.logger.Debug("RequestVote to %s failed: %v", peer, err)
- voteCh <- false
- return
- }
- r.mu.Lock()
- defer r.mu.Unlock()
- if reply.Term > r.currentTerm {
- r.becomeFollower(reply.Term)
- voteCh <- false
- return
- }
- voteCh <- reply.VoteGranted
- }(peer)
- }
- // Wait for votes - majority is (clusterSize/2) + 1
- needed := clusterSize/2 + 1
- // If we already have enough votes (single-node cluster), become leader immediately
- if votes >= needed {
- r.mu.Lock()
- if r.state == Candidate && r.currentTerm == currentTerm {
- r.becomeLeader()
- }
- r.mu.Unlock()
- return
- }
- for i := 0; i < len(currentPeers); i++ {
- select {
- case <-r.stopCh:
- return
- case <-r.electionTimer.C:
- r.logger.Debug("Election timeout, will start new election")
- return
- case granted := <-voteCh:
- if granted {
- votes++
- if votes >= needed {
- r.mu.Lock()
- if r.state == Candidate && r.currentTerm == currentTerm {
- r.becomeLeader()
- }
- r.mu.Unlock()
- return
- }
- }
- }
- // Check if we're still a candidate
- r.mu.RLock()
- if r.state != Candidate {
- r.mu.RUnlock()
- return
- }
- r.mu.RUnlock()
- }
- // Election failed, wait for timeout
- r.mu.RLock()
- stillCandidate := r.state == Candidate
- r.mu.RUnlock()
- if stillCandidate {
- r.logger.Debug("Election failed, waiting for timeout (got %d/%d votes)", votes, needed)
- select {
- case <-r.stopCh:
- return
- case <-r.electionTimer.C:
- // Timer expired, will start new election
- }
- }
- }
- // runPreVote sends pre-vote requests to check if we can win an election
- // Returns true if we got majority pre-votes, false otherwise
- func (r *Raft) runPreVote() bool {
- r.mu.RLock()
- currentTerm := r.currentTerm
- currentPeers := make([]string, len(r.peers))
- copy(currentPeers, r.peers)
- clusterSize := len(r.clusterNodes)
- if clusterSize == 0 {
- clusterSize = len(r.peers) + 1
- }
- leaderID := r.leaderID
- r.mu.RUnlock()
- // Pre-vote uses term+1 but doesn't actually increment it
- preVoteTerm := currentTerm + 1
- lastLogIndex, lastLogTerm := r.log.LastIndexAndTerm()
- r.logger.Debug("Starting pre-vote for term %d", preVoteTerm)
- // Per Raft Pre-Vote optimization (§9.6): if we have a known leader,
- // don't vote for self. This prevents standalone nodes from constantly
- // electing themselves when they should be joining an existing cluster.
- preVotes := 0
- if leaderID == "" {
- preVotes = 1 // Vote for self only if we don't know of a leader
- } else {
- r.logger.Debug("Pre-vote: not self-voting because we have leader %s", leaderID)
- }
- preVoteCh := make(chan bool, len(currentPeers))
- for _, peer := range currentPeers {
- go func(peer string) {
- args := &RequestVoteArgs{
- Term: preVoteTerm,
- CandidateID: r.nodeID,
- LastLogIndex: lastLogIndex,
- LastLogTerm: lastLogTerm,
- PreVote: true,
- }
- ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
- defer cancel()
- reply, err := r.transport.RequestVote(ctx, peer, args)
- if err != nil {
- r.logger.Debug("PreVote to %s failed: %v", peer, err)
- preVoteCh <- false
- return
- }
- // For pre-vote, we don't step down even if reply.Term > currentTerm
- // because the other node might also be doing pre-vote
- preVoteCh <- reply.VoteGranted
- }(peer)
- }
- // Wait for pre-votes with a shorter timeout - majority is (clusterSize/2) + 1
- needed := clusterSize/2 + 1
- // If we already have enough votes (single-node cluster with no known leader), succeed immediately
- if preVotes >= needed {
- r.logger.Debug("Pre-vote succeeded immediately (got %d/%d pre-votes)", preVotes, needed)
- return true
- }
- timeout := time.After(500 * time.Millisecond)
- for i := 0; i < len(currentPeers); i++ {
- select {
- case <-r.stopCh:
- return false
- case <-timeout:
- r.logger.Debug("Pre-vote timeout (got %d/%d pre-votes)", preVotes, needed)
- return false
- case granted := <-preVoteCh:
- if granted {
- preVotes++
- if preVotes >= needed {
- r.logger.Debug("Pre-vote succeeded (got %d/%d pre-votes)", preVotes, needed)
- return true
- }
- }
- }
- // Check if we're still a candidate
- r.mu.RLock()
- if r.state != Candidate {
- r.mu.RUnlock()
- return false
- }
- r.mu.RUnlock()
- }
- r.logger.Debug("Pre-vote failed (got %d/%d pre-votes)", preVotes, needed)
- return false
- }
- // runLeader handles leader behavior
- func (r *Raft) runLeader() {
- // Snapshot the term under lock and start the heartbeat timer
- r.mu.Lock()
- term := r.currentTerm
- r.heartbeatTimer = time.NewTimer(r.config.HeartbeatInterval)
- r.mu.Unlock()
- r.logger.Debug("Running as leader in term %d", term)
- // Send initial heartbeat
- r.sendHeartbeats()
- for {
- select {
- case <-r.stopCh:
- return
- case <-r.heartbeatTimer.C:
- r.mu.RLock()
- if r.state != Leader {
- r.mu.RUnlock()
- return
- }
- r.mu.RUnlock()
- r.sendHeartbeats()
- r.heartbeatTimer.Reset(r.config.HeartbeatInterval)
- case <-r.commitCh:
- r.updateCommitIndex()
- }
- }
- }
- // becomeFollower transitions to follower state
- func (r *Raft) becomeFollower(term uint64) {
- oldState := r.state
- oldTerm := r.currentTerm
- r.state = Follower
- r.currentTerm = term
- // Update metrics
- atomic.StoreUint64(&r.metrics.Term, term)
- r.votedFor = ""
- r.leaderID = ""
- // Must persist before responding - use mustPersistState for critical transitions
- r.mustPersistState()
- r.resetElectionTimer()
- // Clear leadership transfer state
- r.transferring = false
- r.transferTarget = ""
- // Only log significant state changes
- if oldState == Leader && term > oldTerm {
- // Stepping down from leader is notable
- r.logger.Debug("Stepped down from leader in term %d", term)
- }
- }
- // becomeLeader transitions to leader state
- func (r *Raft) becomeLeader() {
- r.state = Leader
- r.leaderID = r.nodeID
- // Update metrics
- atomic.AddUint64(&r.metrics.ElectionsWon, 1)
- // Initialize leader state for all peers
- lastIndex := r.log.LastIndex()
- for nodeID, addr := range r.clusterNodes {
- if nodeID != r.nodeID {
- r.nextIndex[addr] = lastIndex + 1
- r.matchIndex[addr] = 0
- }
- }
- // Also handle legacy peers
- for _, peer := range r.peers {
- if _, exists := r.nextIndex[peer]; !exists {
- r.nextIndex[peer] = lastIndex + 1
- r.matchIndex[peer] = 0
- }
- }
- r.logger.Info("Became leader in term %d with %d peers", r.currentTerm, len(r.clusterNodes)-1)
- // Append a no-op entry to commit entries from previous terms
- // This is a standard Raft optimization - the leader appends a no-op
- // entry in its current term to quickly commit all pending entries
- noopEntry := LogEntry{
- Index: r.log.LastIndex() + 1,
- Term: r.currentTerm,
- Type: EntryNoop,
- Command: nil,
- }
- if err := r.log.Append(noopEntry); err != nil {
- r.logger.Error("Failed to append no-op entry: %v", err)
- } else {
- r.logger.Debug("Appended no-op entry at index %d for term %d", noopEntry.Index, noopEntry.Term)
- }
- }
- // sendHeartbeats sends AppendEntries RPCs to all peers
- func (r *Raft) sendHeartbeats() {
- r.mu.RLock()
- if r.state != Leader {
- r.mu.RUnlock()
- return
- }
- currentTerm := r.currentTerm
- leaderCommit := r.commitIndex
- // Get all peer addresses
- peerAddrs := make([]string, 0, len(r.clusterNodes))
- for nodeID, addr := range r.clusterNodes {
- if nodeID != r.nodeID {
- peerAddrs = append(peerAddrs, addr)
- }
- }
- // Also include legacy peers not in clusterNodes
- for _, peer := range r.peers {
- found := false
- for _, addr := range peerAddrs {
- if addr == peer {
- found = true
- break
- }
- }
- if !found {
- peerAddrs = append(peerAddrs, peer)
- }
- }
- r.mu.RUnlock()
- for _, peer := range peerAddrs {
- go r.replicateToPeer(peer, currentTerm, leaderCommit)
- }
- }
- // replicateToPeer sends AppendEntries to a specific peer
- func (r *Raft) replicateToPeer(peer string, term, leaderCommit uint64) {
- r.mu.RLock()
- if r.state != Leader || r.currentTerm != term {
- r.mu.RUnlock()
- return
- }
- nextIndex := r.nextIndex[peer]
- r.mu.RUnlock()
- // Get entries to send
- entries, prevLogIndex, prevLogTerm, err := r.log.GetEntriesForFollower(nextIndex, r.config.MaxLogEntriesPerRequest)
- if err == ErrCompacted {
- // Need to send snapshot
- r.sendSnapshot(peer)
- return
- }
- if err != nil {
- r.logger.Debug("Failed to get entries for %s: %v", peer, err)
- return
- }
- args := &AppendEntriesArgs{
- Term: term,
- LeaderID: r.nodeID,
- PrevLogIndex: prevLogIndex,
- PrevLogTerm: prevLogTerm,
- Entries: entries,
- LeaderCommit: leaderCommit,
- }
- ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
- defer cancel()
- reply, err := r.transport.AppendEntries(ctx, peer, args)
- if err != nil {
- r.logger.Debug("AppendEntries to %s failed: %v", peer, err)
- return
- }
- atomic.AddInt64(&r.stats.AppendsSent, 1)
- r.mu.Lock()
- defer r.mu.Unlock()
- if reply.Term > r.currentTerm {
- r.becomeFollower(reply.Term)
- return
- }
- if r.state != Leader || r.currentTerm != term {
- return
- }
- if reply.Success {
- // Update contact time
- r.lastContact[peer] = time.Now()
- // Update nextIndex and matchIndex
- if len(entries) > 0 {
- newMatchIndex := entries[len(entries)-1].Index
- if newMatchIndex > r.matchIndex[peer] {
- r.matchIndex[peer] = newMatchIndex
- r.nextIndex[peer] = newMatchIndex + 1
- // Try to update commit index
- select {
- case r.commitCh <- struct{}{}:
- default:
- }
- }
- }
- } else {
- // Even if failed (log conflict), we contacted the peer
- r.lastContact[peer] = time.Now()
- // Decrement nextIndex and retry
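- // Fast-backtracking example: if the follower reports ConflictTerm=3 and
- // ConflictIndex=7, and our log still contains term 3, we resume just past
- // our last term-3 entry; otherwise we jump nextIndex straight to 7 instead
- // of decrementing one entry per round trip.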
- if reply.ConflictTerm > 0 {
- // Find the last entry of ConflictTerm in our log
- found := false
- for idx := r.log.LastIndex(); idx >= r.log.FirstIndex(); idx-- {
- t, err := r.log.GetTerm(idx)
- if err != nil {
- break
- }
- if t == reply.ConflictTerm {
- r.nextIndex[peer] = idx + 1
- found = true
- break
- }
- if t < reply.ConflictTerm {
- break
- }
- }
- if !found {
- r.nextIndex[peer] = reply.ConflictIndex
- }
- } else if reply.ConflictIndex > 0 {
- r.nextIndex[peer] = reply.ConflictIndex
- } else {
- r.nextIndex[peer] = max(1, r.nextIndex[peer]-1)
- }
- }
- }
- // sendSnapshot sends a snapshot to a peer with chunked transfer support
- func (r *Raft) sendSnapshot(peer string) {
- r.mu.RLock()
- if r.state != Leader {
- r.mu.RUnlock()
- return
- }
- term := r.currentTerm
- chunkSize := r.config.SnapshotChunkSize
- if chunkSize <= 0 {
- chunkSize = 1024 * 1024 // Default 1MB
- }
- r.mu.RUnlock()
- data, lastIndex, lastTerm, err := r.storage.GetSnapshot()
- if err != nil {
- r.logger.Error("Failed to get snapshot: %v", err)
- return
- }
- atomic.AddUint64(&r.metrics.SnapshotsSent, 1)
- r.logger.Info("Sending snapshot to %s: %d bytes, lastIndex=%d, lastTerm=%d",
- peer, len(data), lastIndex, lastTerm)
- // Send snapshot in chunks
- totalSize := len(data)
- offset := 0
- for offset < totalSize {
- // Check if we're still leader
- r.mu.RLock()
- if r.state != Leader || r.currentTerm != term {
- r.mu.RUnlock()
- r.logger.Debug("Aborting snapshot send: no longer leader")
- return
- }
- r.mu.RUnlock()
- // Calculate chunk size
- end := offset + chunkSize
- if end > totalSize {
- end = totalSize
- }
- chunk := data[offset:end]
- done := end >= totalSize
- args := &InstallSnapshotArgs{
- Term: term,
- LeaderID: r.nodeID,
- LastIncludedIndex: lastIndex,
- LastIncludedTerm: lastTerm,
- Offset: uint64(offset),
- Data: chunk,
- Done: done,
- }
- ctx, cancel := context.WithTimeout(context.Background(), r.config.SnapshotRPCTimeout)
- reply, err := r.transport.InstallSnapshot(ctx, peer, args)
- cancel()
- if err != nil {
- r.logger.Error("InstallSnapshot chunk to %s failed at offset %d: %v", peer, offset, err)
- return
- }
- if reply.Term > r.currentTerm {
- r.mu.Lock()
- r.becomeFollower(reply.Term)
- r.mu.Unlock()
- return
- }
- if !reply.Success {
- r.logger.Error("InstallSnapshot chunk rejected by %s at offset %d", peer, offset)
- return
- }
- offset = end
- }
- // Snapshot fully sent and accepted
- r.mu.Lock()
- defer r.mu.Unlock()
- if r.state != Leader || r.currentTerm != term {
- return
- }
- r.nextIndex[peer] = lastIndex + 1
- r.matchIndex[peer] = lastIndex
- r.logger.Info("Snapshot to %s completed, nextIndex=%d", peer, lastIndex+1)
- }
- // updateCommitIndex updates the commit index based on matchIndex
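- // For example, with 5 voting nodes, self at index 10 and follower matchIndex
- // values {10, 9, 5, 5}, index 9 is held by 3 of 5 nodes (a majority), so the
- // commit index can advance to 9, provided the entry at 9 is from the current term.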
- func (r *Raft) updateCommitIndex() {
- r.mu.Lock()
- defer r.mu.Unlock()
- if r.state != Leader {
- return
- }
- // For config change entries, use the OLD cluster for majority calculation
- // This ensures that adding a node doesn't require the new node's vote
- // until the config change itself is committed
- votingNodes := r.clusterNodes
- if r.pendingConfigChange && r.oldClusterNodes != nil {
- votingNodes = r.oldClusterNodes
- }
- // Get current cluster size from voting nodes
- clusterSize := len(votingNodes)
- if clusterSize == 0 {
- clusterSize = len(r.peers) + 1
- }
- // Find the highest index replicated on a majority
- for n := r.log.LastIndex(); n > r.commitIndex; n-- {
- term, err := r.log.GetTerm(n)
- if err != nil {
- continue
- }
- // Only commit entries from current term
- if term != r.currentTerm {
- continue
- }
- // Count replicas (including self)
- count := 1 // Self
- for nodeID, addr := range votingNodes {
- if nodeID != r.nodeID {
- if r.matchIndex[addr] >= n {
- count++
- }
- }
- }
- // Also check legacy peers
- for _, peer := range r.peers {
- // Avoid double counting if peer is already in votingNodes
- alreadyCounted := false
- for _, addr := range votingNodes {
- if addr == peer {
- alreadyCounted = true
- break
- }
- }
- if !alreadyCounted && r.matchIndex[peer] >= n {
- count++
- }
- }
- // Majority is (n/2) + 1
- needed := clusterSize/2 + 1
- if count >= needed {
- r.commitIndex = n
- break
- }
- }
- }
- // applyLoop applies committed entries to the state machine
- // Uses event-driven model with fallback polling for reliability
- func (r *Raft) applyLoop() {
- // Use a ticker as fallback for missed signals (matches original 10ms polling)
- ticker := time.NewTicker(10 * time.Millisecond)
- defer ticker.Stop()
- for {
- select {
- case <-r.stopCh:
- return
- case <-r.commitCh:
- // Event-driven: triggered when commitIndex is updated
- r.applyCommitted()
- case <-ticker.C:
- // Fallback: check periodically in case we missed a signal
- r.applyCommitted()
- }
- }
- }
- // applyCommitted applies all committed but not yet applied entries
- func (r *Raft) applyCommitted() {
- r.mu.Lock()
- commitIndex := r.commitIndex
- lastApplied := r.lastApplied
- firstIndex := r.log.FirstIndex()
- lastLogIndex := r.log.LastIndex()
- r.mu.Unlock()
- // Safety check: ensure lastApplied is within valid range
- // If lastApplied is below firstIndex (due to snapshot), skip to firstIndex
- if lastApplied < firstIndex && firstIndex > 0 {
- r.mu.Lock()
- r.lastApplied = firstIndex
- lastApplied = firstIndex
- r.mu.Unlock()
- r.logger.Debug("Adjusted lastApplied to firstIndex %d after compaction", firstIndex)
- }
- // Safety check: don't try to apply beyond what we have in log
- if commitIndex > lastLogIndex {
- commitIndex = lastLogIndex
- }
- for lastApplied < commitIndex {
- lastApplied++
- // Skip if entry has been compacted
- if lastApplied < firstIndex {
- continue
- }
- entry, err := r.log.GetEntry(lastApplied)
- if err != nil {
- // If entry is compacted, skip ahead
- if err == ErrCompacted {
- r.mu.Lock()
- newFirstIndex := r.log.FirstIndex()
- if lastApplied < newFirstIndex {
- r.lastApplied = newFirstIndex
- lastApplied = newFirstIndex
- }
- r.mu.Unlock()
- continue
- }
- r.logger.Error("Failed to get entry %d: %v (firstIndex=%d, lastIndex=%d)",
- lastApplied, err, r.log.FirstIndex(), r.log.LastIndex())
- break
- }
- // Handle config change entries
- if entry.Type == EntryConfig {
- r.applyConfigChange(entry)
- r.mu.Lock()
- r.lastApplied = lastApplied
- r.mu.Unlock()
- continue
- }
- // Handle no-op entries (just update lastApplied, don't send to state machine)
- if entry.Type == EntryNoop {
- r.mu.Lock()
- r.lastApplied = lastApplied
- r.mu.Unlock()
- continue
- }
- // Normal command entry
- msg := ApplyMsg{
- CommandValid: true,
- Command: entry.Command,
- CommandIndex: entry.Index,
- CommandTerm: entry.Term,
- }
- select {
- case r.applyCh <- msg:
- case <-r.stopCh:
- return
- }
- r.mu.Lock()
- r.lastApplied = lastApplied
- r.mu.Unlock()
- }
- // Check if log compaction is needed
- // Run asynchronously to avoid blocking the apply loop if snapshotting is slow
- go r.maybeCompactLog()
- }
- // maybeCompactLog checks if automatic log compaction should be triggered
- // It uses a dynamic threshold to prevent "compaction thrashing":
- // - First compaction triggers at SnapshotThreshold (default 100,000)
- // - After compaction, next threshold = post-compaction log size + SnapshotThreshold
- // - This prevents every new entry from triggering compaction if log stays large
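- // Worked example (illustrative values): with SnapshotThreshold = 100,000 and
- // SnapshotMinRetention = 20,000, compaction first triggers once the applied log
- // exceeds 100,000 entries; it compacts up to lastApplied-20,000, leaving about
- // 20,000 entries, so the next threshold becomes 20,000 + 100,000 = 120,000.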
- func (r *Raft) maybeCompactLog() {
- // Skip if no snapshot provider is configured
- if r.config.SnapshotProvider == nil {
- return
- }
- // Skip if log compaction is explicitly disabled
- if !r.config.LogCompactionEnabled {
- return
- }
- // Check if compaction is already in progress (atomic check-and-set)
- if !atomic.CompareAndSwapInt32(&r.compacting, 0, 1) {
- return
- }
- // Ensure we release the compacting flag when done
- defer atomic.StoreInt32(&r.compacting, 0)
- r.mu.RLock()
- lastApplied := r.lastApplied
- firstIndex := r.log.FirstIndex()
- initialThreshold := r.config.SnapshotThreshold
- minRetention := r.config.SnapshotMinRetention
- isLeader := r.state == Leader
- dynamicThreshold := r.nextCompactionThreshold
- r.mu.RUnlock()
- // Guard against underflow: ensure lastApplied > firstIndex
- if lastApplied <= firstIndex {
- return
- }
- // Calculate current log size
- logSize := lastApplied - firstIndex
- // Determine effective threshold:
- // - Use initial threshold if no compaction has occurred yet (dynamicThreshold == 0)
- // - Otherwise use the dynamic threshold
- effectiveThreshold := initialThreshold
- if dynamicThreshold > 0 {
- effectiveThreshold = dynamicThreshold
- }
- // Check if we have enough entries to warrant compaction
- if logSize <= effectiveThreshold {
- return
- }
- // Guard against underflow: ensure lastApplied > minRetention
- if lastApplied <= minRetention {
- return
- }
- // Calculate the safe compaction point
- // We need to retain at least minRetention entries for follower catch-up
- compactUpTo := lastApplied - minRetention
- if compactUpTo <= firstIndex {
- return // Not enough entries to compact while maintaining retention
- }
- // For leader, also consider the minimum nextIndex of all followers
- // to avoid compacting entries that followers still need
- if isLeader {
- r.mu.RLock()
- minNextIndex := lastApplied
- for _, nextIdx := range r.nextIndex {
- if nextIdx < minNextIndex {
- minNextIndex = nextIdx
- }
- }
- r.mu.RUnlock()
- // Don't compact entries that followers still need
- // Keep a buffer of minRetention entries before the slowest follower
- if minNextIndex > minRetention {
- followerSafePoint := minNextIndex - minRetention
- if followerSafePoint < compactUpTo {
- compactUpTo = followerSafePoint
- }
- } else {
- // Slowest follower is too far behind, don't compact
- // They will need a full snapshot anyway
- compactUpTo = firstIndex // Effectively skip compaction
- }
- }
- // Final check - make sure we're actually compacting something meaningful
- if compactUpTo <= firstIndex {
- return
- }
- // Get snapshot from application layer
- snapshotData, err := r.config.SnapshotProvider(compactUpTo)
- if err != nil {
- r.logger.Error("Failed to get snapshot from provider: %v", err)
- return
- }
- // Get the term at the compaction point
- term, err := r.log.GetTerm(compactUpTo)
- if err != nil {
- r.logger.Error("Failed to get term for compaction index %d: %v", compactUpTo, err)
- return
- }
- // Save snapshot
- if err := r.storage.SaveSnapshot(snapshotData, compactUpTo, term); err != nil {
- r.logger.Error("Failed to save snapshot: %v", err)
- return
- }
- // Compact the log
- // CRITICAL: We must hold the write lock during compaction to prevent
- // concurrent writes (AppendEntries) unless we can guarantee underlying
- // file integrity with concurrent modification. The requirement is to
- // pause writes during compaction.
- r.mu.Lock()
- if err := r.log.Compact(compactUpTo); err != nil {
- r.mu.Unlock()
- r.logger.Error("Failed to compact log: %v", err)
- return
- }
- r.mu.Unlock()
- // Calculate new log size after compaction
- newLogSize := lastApplied - compactUpTo
- // Update dynamic threshold for next compaction
- // We use a linear growth model: next threshold = current size + SnapshotThreshold
- // This ensures that we trigger compaction roughly every SnapshotThreshold entries.
- r.mu.Lock()
- r.nextCompactionThreshold = newLogSize + r.config.SnapshotThreshold
- // Ensure threshold doesn't go below the initial threshold
- if r.nextCompactionThreshold < initialThreshold {
- r.nextCompactionThreshold = initialThreshold
- }
- r.mu.Unlock()
- r.logger.Info("Auto compaction completed: compacted up to index %d (term %d), log size %d -> %d, next threshold: %d",
- compactUpTo, term, logSize, newLogSize, r.nextCompactionThreshold)
- }
- // resetElectionTimer resets the election timeout
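- // For example, with ElectionTimeoutMin = 150ms and ElectionTimeoutMax = 300ms,
- // each reset picks a timeout uniformly at random in [150ms, 300ms), which keeps
- // nodes from timing out simultaneously and splitting the vote.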
- func (r *Raft) resetElectionTimer() {
- timeout := r.config.ElectionTimeoutMin
- // Guard against rand.Int63n panicking when Max == Min
- if delta := r.config.ElectionTimeoutMax - r.config.ElectionTimeoutMin; delta > 0 {
- timeout += time.Duration(rand.Int63n(int64(delta)))
- }
- if r.electionTimer == nil {
- r.electionTimer = time.NewTimer(timeout)
- } else {
- if !r.electionTimer.Stop() {
- select {
- case <-r.electionTimer.C:
- default:
- }
- }
- r.electionTimer.Reset(timeout)
- }
- }
- // persistState saves the current state to stable storage
- // Returns error if persistence fails - caller MUST handle this for safety
- func (r *Raft) persistState() error {
- state := &PersistentState{
- CurrentTerm: r.currentTerm,
- VotedFor: r.votedFor,
- }
- if err := r.storage.SaveState(state); err != nil {
- r.logger.Error("Failed to persist state: %v", err)
- return fmt.Errorf("%w: %v", ErrPersistFailed, err)
- }
- return nil
- }
- // mustPersistState saves state and panics on failure
- // Use this only in critical paths where failure is unrecoverable
- func (r *Raft) mustPersistState() {
- if err := r.persistState(); err != nil {
- // In production, you might want to trigger a graceful shutdown instead
- r.logger.Error("CRITICAL: Failed to persist state, node may be in inconsistent state: %v", err)
- panic(err)
- }
- }
- // HandleRequestVote handles RequestVote RPCs (including pre-vote)
- func (r *Raft) HandleRequestVote(args *RequestVoteArgs) *RequestVoteReply {
- r.mu.Lock()
- defer r.mu.Unlock()
- reply := &RequestVoteReply{
- Term: r.currentTerm,
- VoteGranted: false,
- }
- // Handle pre-vote separately
- if args.PreVote {
- return r.handlePreVote(args)
- }
- // Reply false if term < currentTerm
- if args.Term < r.currentTerm {
- return reply
- }
- // If term > currentTerm, become follower
- if args.Term > r.currentTerm {
- r.becomeFollower(args.Term)
- }
- reply.Term = r.currentTerm
- // Check if we can vote for this candidate
- if (r.votedFor == "" || r.votedFor == args.CandidateID) &&
- r.log.IsUpToDate(args.LastLogIndex, args.LastLogTerm) {
- r.votedFor = args.CandidateID
- if err := r.persistState(); err != nil {
- // Cannot grant vote if we can't persist the decision
- r.logger.Error("Failed to persist vote for %s: %v", args.CandidateID, err)
- return reply
- }
- r.resetElectionTimer()
- reply.VoteGranted = true
- r.logger.Debug("Granted vote to %s for term %d", args.CandidateID, args.Term)
- }
- return reply
- }
- // handlePreVote handles pre-vote requests
- // Pre-vote doesn't change our state, just checks if we would vote
- func (r *Raft) handlePreVote(args *RequestVoteArgs) *RequestVoteReply {
- reply := &RequestVoteReply{
- Term: r.currentTerm,
- VoteGranted: false,
- }
- // For pre-vote, we check:
- // 1. The candidate's term is at least as high as ours
- // 2. The candidate's log is at least as up-to-date as ours
- // 3. We don't have a current leader (or the candidate's term is higher)
- if args.Term < r.currentTerm {
- return reply
- }
- // Per Raft Pre-Vote optimization (§9.6): reject pre-vote if we have a current
- // leader and the candidate's term is not higher than ours. This prevents
- // disruptive elections when a partitioned node tries to rejoin.
- if r.leaderID != "" && args.Term <= r.currentTerm {
- r.logger.Debug("Rejecting pre-vote from %s: have leader %s", args.CandidateID, r.leaderID)
- return reply
- }
- // Grant pre-vote if log is up-to-date
- // Note: we don't check votedFor for pre-vote, and we don't update any state
- if r.log.IsUpToDate(args.LastLogIndex, args.LastLogTerm) {
- reply.VoteGranted = true
- r.logger.Debug("Granted pre-vote to %s for term %d", args.CandidateID, args.Term)
- }
- return reply
- }
- // HandleAppendEntries handles AppendEntries RPCs
- func (r *Raft) HandleAppendEntries(args *AppendEntriesArgs) *AppendEntriesReply {
- r.mu.Lock()
- defer r.mu.Unlock()
- atomic.AddInt64(&r.stats.AppendsReceived, 1)
- reply := &AppendEntriesReply{
- Term: r.currentTerm,
- Success: false,
- }
- // Check if we're a standalone node being added to a cluster
- // A standalone node has only itself in clusterNodes
- isStandalone := len(r.clusterNodes) == 1
- if _, hasSelf := r.clusterNodes[r.nodeID]; isStandalone && hasSelf {
- // We're standalone and receiving AppendEntries from an external leader
- // This means we're being added to a cluster - suppress elections
- if !r.joiningCluster {
- r.joiningCluster = true
- r.joiningClusterTime = time.Now()
- r.logger.Info("Detected cluster join in progress, suppressing elections")
- }
- // When joining, accept higher terms from the leader to sync up
- if args.Term > r.currentTerm {
- r.becomeFollower(args.Term)
- }
- }
- // Reply false if term < currentTerm
- if args.Term < r.currentTerm {
- // But still reset timer if we're joining a cluster to prevent elections
- if r.joiningCluster {
- r.resetElectionTimer()
- }
- return reply
- }
- // If term > currentTerm, or we're a candidate, or we're a leader receiving
- // AppendEntries from another leader (split-brain scenario during cluster merge),
- // become follower. In Raft, there can only be one leader per term.
- if args.Term > r.currentTerm || r.state == Candidate || r.state == Leader {
- r.becomeFollower(args.Term)
- }
- // Update leader info and reset election timer
- r.leaderID = args.LeaderID
- r.lastHeartbeat = time.Now()
- r.resetElectionTimer()
- reply.Term = r.currentTerm
- // Try to append entries
- success, conflictIndex, conflictTerm := r.log.AppendEntriesFromLeader(
- args.PrevLogIndex, args.PrevLogTerm, args.Entries)
- if !success {
- reply.ConflictIndex = conflictIndex
- reply.ConflictTerm = conflictTerm
- return reply
- }
- reply.Success = true
- // Update commit index safely
- if args.LeaderCommit > r.commitIndex {
- // Get our actual last log index
- lastLogIndex := r.log.LastIndex()
- // Calculate what index the entries would have reached
- lastNewEntry := args.PrevLogIndex
- if len(args.Entries) > 0 {
- lastNewEntry = args.Entries[len(args.Entries)-1].Index
- }
- // Commit index should not exceed what we actually have in log
- newCommitIndex := args.LeaderCommit
- if newCommitIndex > lastNewEntry {
- newCommitIndex = lastNewEntry
- }
- if newCommitIndex > lastLogIndex {
- newCommitIndex = lastLogIndex
- }
- // Only advance commit index
- if newCommitIndex > r.commitIndex {
- r.commitIndex = newCommitIndex
- }
- }
- return reply
- }
- // HandleInstallSnapshot handles InstallSnapshot RPCs with chunked transfer support
- func (r *Raft) HandleInstallSnapshot(args *InstallSnapshotArgs) *InstallSnapshotReply {
- r.mu.Lock()
- defer r.mu.Unlock()
- reply := &InstallSnapshotReply{
- Term: r.currentTerm,
- Success: false,
- }
- if args.Term < r.currentTerm {
- return reply
- }
- if args.Term > r.currentTerm {
- r.becomeFollower(args.Term)
- }
- r.leaderID = args.LeaderID
- r.lastHeartbeat = time.Now()
- r.resetElectionTimer()
- reply.Term = r.currentTerm
- // Skip if we already have this or a newer snapshot applied
- if args.LastIncludedIndex <= r.lastApplied {
- r.logger.Debug("Ignoring snapshot at index %d, already applied up to %d",
- args.LastIncludedIndex, r.lastApplied)
- reply.Success = true // Still success to let leader know we don't need it
- return reply
- }
- // Handle chunked transfer
- if args.Offset == 0 {
- // First chunk - start new pending snapshot
- r.pendingSnapshot = &pendingSnapshotState{
- lastIncludedIndex: args.LastIncludedIndex,
- lastIncludedTerm: args.LastIncludedTerm,
- data: make([]byte, 0),
- receivedBytes: 0,
- }
- r.logger.Info("Starting snapshot reception at index %d, term %d",
- args.LastIncludedIndex, args.LastIncludedTerm)
- }
- // Validate we're receiving the expected snapshot
- if r.pendingSnapshot == nil {
- r.logger.Warn("Snapshot chunk at offset %d arrived with no pending snapshot", args.Offset)
- return reply
- }
- if r.pendingSnapshot.lastIncludedIndex != args.LastIncludedIndex ||
- r.pendingSnapshot.lastIncludedTerm != args.LastIncludedTerm {
- r.logger.Warn("Unexpected snapshot chunk: expected index %d, got %d",
- r.pendingSnapshot.lastIncludedIndex, args.LastIncludedIndex)
- return reply
- }
- // Validate offset matches what we've received
- if args.Offset != r.pendingSnapshot.receivedBytes {
- r.logger.Warn("Unexpected chunk offset: expected %d, got %d",
- r.pendingSnapshot.receivedBytes, args.Offset)
- return reply
- }
- // Append chunk data
- r.pendingSnapshot.data = append(r.pendingSnapshot.data, args.Data...)
- r.pendingSnapshot.receivedBytes += uint64(len(args.Data))
- reply.Success = true
- r.logger.Debug("Received snapshot chunk: offset=%d, size=%d, done=%v",
- args.Offset, len(args.Data), args.Done)
- // If not done, wait for more chunks
- if !args.Done {
- return reply
- }
- // All chunks received - apply the snapshot
- r.logger.Info("Installing complete snapshot: %d bytes at index %d, term %d",
- len(r.pendingSnapshot.data), args.LastIncludedIndex, args.LastIncludedTerm)
- atomic.AddUint64(&r.metrics.SnapshotsInstalled, 1)
- // Save snapshot
- if err := r.storage.SaveSnapshot(r.pendingSnapshot.data, args.LastIncludedIndex, args.LastIncludedTerm); err != nil {
- r.logger.Error("Failed to save snapshot: %v", err)
- r.pendingSnapshot = nil
- reply.Success = false
- return reply
- }
- // Compact log
- if err := r.log.Compact(args.LastIncludedIndex); err != nil {
- r.logger.Error("Failed to compact log: %v", err)
- }
- // Update state - must update both commitIndex and lastApplied
- if args.LastIncludedIndex > r.commitIndex {
- r.commitIndex = args.LastIncludedIndex
- }
- // Always update lastApplied to snapshot index to prevent trying to apply compacted entries
- r.lastApplied = args.LastIncludedIndex
- // Send snapshot to application (non-blocking with timeout)
- // Use the complete pendingSnapshot data, not the last chunk
- msg := ApplyMsg{
- SnapshotValid: true,
- Snapshot: r.pendingSnapshot.data,
- SnapshotIndex: args.LastIncludedIndex,
- SnapshotTerm: args.LastIncludedTerm,
- }
- // Clear pending snapshot
- r.pendingSnapshot = nil
- // Try to send, but don't block indefinitely
- select {
- case r.applyCh <- msg:
- r.logger.Debug("Sent snapshot to application")
- case <-time.After(100 * time.Millisecond):
- r.logger.Warn("Timeout sending snapshot to application, will retry")
- // The application will still get correct state via normal apply loop
- }
- return reply
- }
- // Propose proposes a new command to be replicated
- func (r *Raft) Propose(command []byte) (uint64, uint64, bool) {
- r.mu.Lock()
- defer r.mu.Unlock()
- if r.state != Leader {
- return 0, 0, false
- }
- // Check connectivity (Lease Check)
- // If we haven't heard from a majority of peers within ElectionTimeout,
- // we shouldn't accept new commands because we might be partitioned.
- if len(r.clusterNodes) > 1 {
- activePeers := 1 // Self
- now := time.Now()
- timeout := r.config.ElectionTimeoutMax
- for _, peer := range r.peers {
- if last, ok := r.lastContact[peer]; ok && now.Sub(last) < timeout {
- activePeers++
- }
- }
- // Check majority
- needed := len(r.clusterNodes)/2 + 1
- if activePeers < needed {
- r.logger.Warn("Rejecting Propose: lost contact with majority (active: %d, needed: %d)", activePeers, needed)
- return 0, 0, false
- }
- }
- index, err := r.log.AppendCommand(r.currentTerm, command)
- if err != nil {
- r.logger.Error("Failed to append command: %v", err)
- return 0, 0, false
- }
- r.matchIndex[r.nodeID] = index
- // For single-node cluster, we are the only voter and can commit immediately
- // This fixes the issue where commitCh never gets triggered without other peers
- if len(r.clusterNodes) <= 1 && len(r.peers) == 0 {
- // Single node: self is majority, trigger commit immediately
- select {
- case r.commitCh <- struct{}{}:
- default:
- }
- } else {
- // Multi-node: trigger replication to other nodes
- r.triggerReplication()
- }
- return index, r.currentTerm, true
- }
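- // Illustrative caller pattern: check the leader flag, then wait for the
- // command to come back on applyCh at the returned index and term:
- //
- //	index, term, isLeader := node.Propose(cmd)
- //	if !isLeader {
- //		// retry via the leader, e.g. ProposeWithForward
- //	}
- //	for msg := range applyCh {
- //		if msg.CommandValid && msg.CommandIndex == index && msg.CommandTerm == term {
- //			break // committed and applied
- //		}
- //	}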
- // triggerReplication signals the replication loop to send heartbeats
- // This uses a non-blocking send to batch replication requests
- func (r *Raft) triggerReplication() {
- select {
- case r.replicationCh <- struct{}{}:
- default:
- // Replication already scheduled
- }
- }
- // replicationLoop handles batched replication
- // Uses simple delay-based batching: flush immediately when signaled, then wait
- // to allow more requests to accumulate before the next flush.
- func (r *Raft) replicationLoop() {
- for {
- select {
- case <-r.stopCh:
- return
- case <-r.replicationCh:
- // Flush and replicate immediately
- r.flushAndReplicate()
- // Wait briefly to allow batching of subsequent requests
- // This gives time for more proposals to queue up before the next flush
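- // For example (illustrative): proposals signaled at t=0ms, t=2ms and t=8ms
- // yield one flush at t=0 and one more at ~t=10ms that covers both later
- // proposals, i.e. two AppendEntries rounds instead of three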
- time.Sleep(10 * time.Millisecond)
- }
- }
- }
- // flushAndReplicate flushes logs and sends heartbeats
- func (r *Raft) flushAndReplicate() {
- // Ensure logs are flushed to OS cache before sending to followers
- // This implements Group Commit with Flush (fast) instead of Sync (slow)
- if err := r.log.Flush(); err != nil {
- r.logger.Error("Failed to flush log: %v", err)
- }
- r.sendHeartbeats()
- }
- // ProposeWithForward proposes a command, forwarding to leader if necessary
- // This is the recommended method for applications to use
- func (r *Raft) ProposeWithForward(command []byte) (index uint64, term uint64, err error) {
- r.mu.RLock()
- // Check if we are part of the cluster
- // If we are not in clusterNodes, we shouldn't accept data commands
- if _, exists := r.clusterNodes[r.nodeID]; !exists {
- r.mu.RUnlock()
- return 0, 0, fmt.Errorf("node %s is not part of the cluster", r.nodeID)
- }
- r.mu.RUnlock()
- // Try local propose first
- idx, t, isLeader := r.Propose(command)
- if isLeader {
- return idx, t, nil
- }
- // Not leader, forward to leader
- r.mu.RLock()
- leaderID := r.leaderID
- // Use clusterNodes (dynamically maintained) to find leader address
- leaderAddr := r.clusterNodes[leaderID]
- r.mu.RUnlock()
- if leaderID == "" {
- return 0, 0, fmt.Errorf("no leader available")
- }
- if leaderAddr == "" {
- return 0, 0, fmt.Errorf("leader %s address not found in cluster", leaderID)
- }
- // Forward to leader
- ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
- defer cancel()
- args := &ProposeArgs{Command: command}
- reply, err := r.transport.ForwardPropose(ctx, leaderAddr, args)
- if err != nil {
- return 0, 0, fmt.Errorf("forward failed: %w", err)
- }
- if !reply.Success {
- return 0, 0, fmt.Errorf("leader rejected: %s", reply.Error)
- }
- return reply.Index, reply.Term, nil
- }
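- // Illustrative sketch, not part of the core implementation: a client-side
- // retry wrapper around ProposeWithForward. The attempt count and fixed
- // backoff are assumptions chosen for the example; leadership changes are
- // the main transient failure this smooths over.
- func proposeWithRetry(r *Raft, command []byte, attempts int) (uint64, uint64, error) {
- if attempts < 1 {
- attempts = 1
- }
- var lastErr error
- for i := 0; i < attempts; i++ {
- index, term, err := r.ProposeWithForward(command)
- if err == nil {
- return index, term, nil
- }
- lastErr = err
- // Back off briefly; an election may be in progress
- time.Sleep(100 * time.Millisecond)
- }
- return 0, 0, fmt.Errorf("propose failed after %d attempts: %w", attempts, lastErr)
- }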
- // ForwardGet forwards a get request to the leader
- func (r *Raft) ForwardGet(key string) (string, bool, error) {
- r.mu.RLock()
- // Check if we are part of the cluster
- if _, exists := r.clusterNodes[r.nodeID]; !exists {
- r.mu.RUnlock()
- return "", false, fmt.Errorf("node %s is not part of the cluster", r.nodeID)
- }
- isLeader := r.state == Leader
- leaderID := r.leaderID
- leaderAddr := r.clusterNodes[leaderID]
- r.mu.RUnlock()
- // Check if we are leader (local read); state is read under the lock above
- if isLeader {
- if r.config.GetHandler != nil {
- val, found := r.config.GetHandler(key)
- return val, found, nil
- }
- return "", false, fmt.Errorf("get handler not configured")
- }
- if leaderID == "" {
- return "", false, ErrNoLeader
- }
- if leaderAddr == "" {
- return "", false, fmt.Errorf("leader %s address not found", leaderID)
- }
- // Forward to leader
- ctx, cancel := context.WithTimeout(context.Background(), r.config.RPCTimeout)
- defer cancel()
- args := &GetArgs{Key: key}
- reply, err := r.transport.ForwardGet(ctx, leaderAddr, args)
- if err != nil {
- return "", false, fmt.Errorf("forward failed: %w", err)
- }
- return reply.Value, reply.Found, nil
- }
- // HandlePropose handles forwarded propose requests
- func (r *Raft) HandlePropose(args *ProposeArgs) *ProposeReply {
- index, term, isLeader := r.Propose(args.Command)
- if !isLeader {
- return &ProposeReply{
- Success: false,
- Error: "not leader",
- }
- }
- return &ProposeReply{
- Success: true,
- Index: index,
- Term: term,
- }
- }
- // HandleAddNode handles forwarded AddNode requests
- func (r *Raft) HandleAddNode(args *AddNodeArgs) *AddNodeReply {
- err := r.AddNode(args.NodeID, args.Address)
- if err != nil {
- return &AddNodeReply{
- Success: false,
- Error: err.Error(),
- }
- }
- return &AddNodeReply{
- Success: true,
- }
- }
- // HandleRemoveNode handles forwarded RemoveNode requests
- func (r *Raft) HandleRemoveNode(args *RemoveNodeArgs) *RemoveNodeReply {
- err := r.RemoveNode(args.NodeID)
- if err != nil {
- return &RemoveNodeReply{
- Success: false,
- Error: err.Error(),
- }
- }
- return &RemoveNodeReply{
- Success: true,
- }
- }
- // GetState returns the current term and whether this node is leader
- func (r *Raft) GetState() (uint64, bool) {
- r.mu.RLock()
- defer r.mu.RUnlock()
- return r.currentTerm, r.state == Leader
- }
- // GetLeaderID returns the current leader ID
- func (r *Raft) GetLeaderID() string {
- r.mu.RLock()
- defer r.mu.RUnlock()
- return r.leaderID
- }
- // GetStats returns runtime statistics
- func (r *Raft) GetStats() Stats {
- r.mu.RLock()
- defer r.mu.RUnlock()
- lastIndex, lastTerm := r.log.LastIndexAndTerm()
- firstIndex := r.log.FirstIndex()
- // Copy cluster nodes
- nodes := make(map[string]string)
- for k, v := range r.clusterNodes {
- nodes[k] = v
- }
- clusterSize := len(r.clusterNodes)
- if clusterSize == 0 {
- clusterSize = len(r.peers) + 1
- }
- return Stats{
- Term: r.currentTerm,
- State: r.state.String(),
- FirstLogIndex: firstIndex,
- LastLogIndex: lastIndex,
- LastLogTerm: lastTerm,
- CommitIndex: r.commitIndex,
- LastApplied: r.lastApplied,
- LeaderID: r.leaderID,
- AppendsSent: atomic.LoadInt64(&r.stats.AppendsSent),
- AppendsReceived: atomic.LoadInt64(&r.stats.AppendsReceived),
- ClusterSize: clusterSize,
- ClusterNodes: nodes,
- }
- }
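- // Illustrative sketch, not part of the core implementation: a background
- // goroutine that periodically logs replication lag from GetStats. The 10s
- // interval is an assumption chosen for the example.
- func (r *Raft) statsLogger() {
- ticker := time.NewTicker(10 * time.Second)
- defer ticker.Stop()
- for {
- select {
- case <-r.stopCh:
- return
- case <-ticker.C:
- s := r.GetStats()
- r.logger.Info("stats: state=%s term=%d commit=%d applied=%d lag=%d",
- s.State, s.Term, s.CommitIndex, s.LastApplied, s.CommitIndex-s.LastApplied)
- }
- }
- }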
- // restoreFromSnapshot restores the FSM from a snapshot at startup
- // This is called during Start() to ensure the FSM has the correct state
- // before processing any new commands
- func (r *Raft) restoreFromSnapshot() error {
- // Get snapshot from storage
- data, lastIndex, lastTerm, err := r.storage.GetSnapshot()
- if err != nil {
- return fmt.Errorf("failed to get snapshot: %w", err)
- }
- // No snapshot exists
- if len(data) == 0 || lastIndex == 0 {
- return nil
- }
- r.logger.Info("Restoring FSM from snapshot at index %d, term %d (%d bytes)",
- lastIndex, lastTerm, len(data))
- // Update lastApplied to snapshot index to prevent re-applying compacted entries
- r.mu.Lock()
- if lastIndex > r.lastApplied {
- r.lastApplied = lastIndex
- }
- if lastIndex > r.commitIndex {
- r.commitIndex = lastIndex
- }
- r.mu.Unlock()
- // Send snapshot to FSM for restoration.
- // Use a select with a timeout to avoid blocking forever if applyCh is full.
- msg := ApplyMsg{
- SnapshotValid: true,
- Snapshot: data,
- SnapshotIndex: lastIndex,
- SnapshotTerm: lastTerm,
- }
- // Try to send with a timeout
- select {
- case r.applyCh <- msg:
- r.logger.Info("FSM restoration triggered from snapshot at index %d", lastIndex)
- case <-time.After(5 * time.Second):
- return fmt.Errorf("timeout sending snapshot to applyCh")
- }
- return nil
- }
- // TakeSnapshot takes a snapshot of the current state
- func (r *Raft) TakeSnapshot(data []byte, index uint64) error {
- r.mu.Lock()
- defer r.mu.Unlock()
- if index > r.lastApplied {
- return fmt.Errorf("snapshot index %d exceeds lastApplied %d", index, r.lastApplied)
- }
- term, err := r.log.GetTerm(index)
- if err != nil {
- return fmt.Errorf("failed to get term for index %d: %w", index, err)
- }
- if err := r.storage.SaveSnapshot(data, index, term); err != nil {
- return fmt.Errorf("failed to save snapshot: %w", err)
- }
- if err := r.log.Compact(index); err != nil {
- return fmt.Errorf("failed to compact log: %w", err)
- }
- r.logger.Info("Took snapshot at index %d, term %d", index, term)
- return nil
- }
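- // Illustrative sketch, not part of the core implementation: a snapshot
- // policy an application might run after each apply. The 10000-entry
- // threshold and the serialize callback are assumptions for the example.
- func maybeSnapshot(r *Raft, lastApplied uint64, serialize func() ([]byte, error)) error {
- stats := r.GetStats()
- // Skip while the retained log is still small
- if lastApplied < stats.FirstLogIndex || lastApplied-stats.FirstLogIndex < 10000 {
- return nil
- }
- data, err := serialize()
- if err != nil {
- return fmt.Errorf("serialize state: %w", err)
- }
- return r.TakeSnapshot(data, lastApplied)
- }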
- func max(a, b uint64) uint64 {
- if a > b {
- return a
- }
- return b
- }
- // ==================== Membership Change API ====================
- //
- // This implementation uses Single-Node Membership Change (also known as one-at-a-time changes)
- // as described in the Raft dissertation (§4.3). This is safe because:
- //
- // 1. We only allow one configuration change at a time (pendingConfigChange flag)
- // 2. For commits, we use the OLD cluster majority until the config change is committed
- // 3. The new node starts receiving entries immediately but doesn't affect majority calculation
- //
- // This approach is simpler than Joint Consensus and is sufficient for most use cases.
- // The invariant maintained is: any two majorities (old or new) must overlap.
- //
- // For adding a node: old majority = N/2+1 of N, new majority = (N+1)/2+1 of N+1;
- //   the two sizes sum to more than N+1, so the majorities must share a node
- // For removing a node: old majority = N/2+1 of N, new majority = (N-1)/2+1 of N-1;
- //   the sizes sum to more than N, so they overlap whenever N > 1
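- //
- // Worked example (illustrative): growing from N=3 {A,B,C} to N=4 {A,B,C,D},
- // the old majority size is 2 and the new majority size is 3. Any 2-node
- // subset of {A,B,C} and any 3-node subset of {A,B,C,D} total 5 picks from
- // only 4 nodes, so they must share a member, and that shared member
- // prevents two conflicting decisions from both reaching a majority.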
- //
- // WARNING: Avoid adding/removing multiple nodes rapidly. Wait for each change to be committed.
- // AddNode adds a new node to the cluster
- // This can only be called on the leader
- // The new node must already be running and reachable
- //
- // Safety guarantees:
- // - Only one config change can be in progress at a time
- // - The old cluster majority is used until the config change is committed
- // - Returns error if leadership is lost during the operation
- func (r *Raft) AddNode(nodeID, address string) error {
- r.mu.Lock()
- // Must be leader
- if r.state != Leader {
- leaderID := r.leaderID
- r.mu.Unlock()
- return NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff)
- }
- // Check if we're in the middle of a leadership transfer
- if r.transferring {
- r.mu.Unlock()
- return fmt.Errorf("leadership transfer in progress")
- }
- // Check if there's already a pending config change
- if r.pendingConfigChange {
- r.mu.Unlock()
- return ErrConfigInFlight
- }
- // Validate nodeID and address
- if nodeID == "" {
- r.mu.Unlock()
- return fmt.Errorf("nodeID cannot be empty")
- }
- if address == "" {
- r.mu.Unlock()
- return fmt.Errorf("address cannot be empty")
- }
- // Check if node already exists
- if _, exists := r.clusterNodes[nodeID]; exists {
- r.mu.Unlock()
- return fmt.Errorf("node %s already exists in cluster", nodeID)
- }
- // Check if address is already used by another node
- for existingID, existingAddr := range r.clusterNodes {
- if existingAddr == address {
- r.mu.Unlock()
- return fmt.Errorf("address %s is already used by node %s", address, existingID)
- }
- }
- // Save old cluster nodes for majority calculation during config change
- // This ensures we use the OLD cluster size until the config change is committed
- r.oldClusterNodes = make(map[string]string)
- for k, v := range r.clusterNodes {
- r.oldClusterNodes[k] = v
- }
- // Create new config with the added node
- newNodes := make(map[string]string)
- for k, v := range r.clusterNodes {
- newNodes[k] = v
- }
- newNodes[nodeID] = address
- // Create config change entry with ClusterConfig
- configIndex := r.log.LastIndex() + 1
- entry := LogEntry{
- Index: configIndex,
- Term: r.currentTerm,
- Type: EntryConfig,
- Config: &ClusterConfig{Nodes: newNodes},
- }
- // Mark config change as pending and store the index
- r.pendingConfigChange = true
- r.configChangeIndex = configIndex
- // Immediately apply the new configuration (for single-node changes, this is safe)
- // The new node will start receiving AppendEntries immediately
- r.clusterNodes[nodeID] = address
- r.peers = append(r.peers, address)
- // Set nextIndex to 1 (or firstIndex) so the new node syncs from the beginning
- // This is crucial - the new node's log is empty, so we must start from index 1
- firstIndex := r.log.FirstIndex()
- if firstIndex == 0 {
- firstIndex = 1
- }
- r.nextIndex[address] = firstIndex
- r.matchIndex[address] = 0
- // Append the config change entry while still holding the lock so that a
- // concurrent Propose cannot claim the same log index
- if err := r.log.Append(entry); err != nil {
- r.pendingConfigChange = false
- r.oldClusterNodes = nil
- r.configChangeIndex = 0
- // Rollback the membership change
- delete(r.clusterNodes, nodeID)
- delete(r.nextIndex, address)
- delete(r.matchIndex, address)
- r.rebuildPeersList()
- r.mu.Unlock()
- return fmt.Errorf("failed to append config entry: %w", err)
- }
- r.mu.Unlock()
- r.logger.Info("Adding node %s (%s) to cluster", nodeID, address)
- // Trigger immediate replication
- r.triggerReplication()
- return nil
- }
- // RemoveNode removes a node from the cluster
- // This can only be called on the leader
- // The node being removed can be any node except the leader itself
- //
- // Safety guarantees:
- // - Only one config change can be in progress at a time
- // - Cannot remove the leader (transfer leadership first)
- // - Cannot reduce cluster to 0 nodes
- // - The old cluster majority is used until the config change is committed
- func (r *Raft) RemoveNode(nodeID string) error {
- r.mu.Lock()
- // Must be leader
- if r.state != Leader {
- leaderID := r.leaderID
- r.mu.Unlock()
- return NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff)
- }
- // Check if we're in the middle of a leadership transfer
- if r.transferring {
- r.mu.Unlock()
- return fmt.Errorf("leadership transfer in progress")
- }
- // Cannot remove self
- if nodeID == r.nodeID {
- r.mu.Unlock()
- return fmt.Errorf("cannot remove self from cluster, use TransferLeadership first")
- }
- // Validate nodeID
- if nodeID == "" {
- r.mu.Unlock()
- return fmt.Errorf("nodeID cannot be empty")
- }
- // Check if there's already a pending config change
- if r.pendingConfigChange {
- r.mu.Unlock()
- return ErrConfigInFlight
- }
- // Check if node exists
- if _, exists := r.clusterNodes[nodeID]; !exists {
- r.mu.Unlock()
- return fmt.Errorf("node %s not found in cluster", nodeID)
- }
- // Forced removal check:
- // A node with an unreachable or malformed address will never answer RPCs,
- // so removal must not depend on contacting it. For Raft safety we still
- // commit a config change entry so the rest of the cluster learns about
- // the removal; the only thing relaxed here is the minimum-size check below.
-
- // Check validity of address (simple heuristic)
- nodeAddr := r.clusterNodes[nodeID]
- isValidNode := true
- if nodeAddr == "" || strings.HasPrefix(nodeAddr, ".") || !strings.Contains(nodeAddr, ":") {
- isValidNode = false
- r.logger.Warn("Removing invalid node %s with address %s", nodeID, nodeAddr)
- }
- // Cannot reduce cluster below 1 node, unless it's an invalid node cleanup
- if isValidNode && len(r.clusterNodes) <= 1 {
- r.mu.Unlock()
- return fmt.Errorf("cannot remove last node from cluster")
- }
- // Save old cluster nodes for majority calculation during config change
- r.oldClusterNodes = make(map[string]string)
- for k, v := range r.clusterNodes {
- r.oldClusterNodes[k] = v
- }
- // Create new config without the removed node
- newNodes := make(map[string]string)
- for k, v := range r.clusterNodes {
- if k != nodeID {
- newNodes[k] = v
- }
- }
- // Create config change entry with ClusterConfig
- configIndex := r.log.LastIndex() + 1
- entry := LogEntry{
- Index: configIndex,
- Term: r.currentTerm,
- Type: EntryConfig,
- Config: &ClusterConfig{Nodes: newNodes},
- }
- // Mark config change as pending and store the index
- r.pendingConfigChange = true
- r.configChangeIndex = configIndex
- // Get the address of node being removed for cleanup
- removedAddr := r.clusterNodes[nodeID]
- // Immediately apply the new configuration
- delete(r.clusterNodes, nodeID)
- r.rebuildPeersList()
- delete(r.nextIndex, removedAddr)
- delete(r.matchIndex, removedAddr)
- // Append the config change entry while still holding the lock so that a
- // concurrent Propose cannot claim the same log index
- if err := r.log.Append(entry); err != nil {
- r.pendingConfigChange = false
- r.oldClusterNodes = nil
- r.configChangeIndex = 0
- // Rollback: restore the removed node's membership and leader indices
- r.clusterNodes[nodeID] = removedAddr
- r.nextIndex[removedAddr] = r.log.LastIndex() + 1
- r.matchIndex[removedAddr] = 0
- r.rebuildPeersList()
- r.mu.Unlock()
- return fmt.Errorf("failed to append config entry: %w", err)
- }
- r.mu.Unlock()
- r.logger.Info("Removing node %s from cluster, config at index %d", nodeID, entry.Index)
- // Trigger replication through the batched path (flushes the log first)
- r.triggerReplication()
- return nil
- }
- // AddNodeWithForward adds a node, forwarding to leader if necessary
- // This is the recommended method for applications to use
- func (r *Raft) AddNodeWithForward(nodeID, address string) error {
- // Try local operation first
- err := r.AddNode(nodeID, address)
- if err == nil {
- return nil
- }
- // Check if we're not the leader
- r.mu.RLock()
- state := r.state
- leaderID := r.leaderID
- leaderAddr := r.clusterNodes[leaderID]
- r.mu.RUnlock()
- if state == Leader {
- // We are leader but AddNode failed for other reasons
- return err
- }
- // Not leader, forward to leader
- if leaderID == "" {
- return fmt.Errorf("no leader available")
- }
- if leaderAddr == "" {
- return fmt.Errorf("leader %s address not found in cluster", leaderID)
- }
- // Forward to leader
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- args := &AddNodeArgs{NodeID: nodeID, Address: address}
- reply, err := r.transport.ForwardAddNode(ctx, leaderAddr, args)
- if err != nil {
- return fmt.Errorf("forward failed: %w", err)
- }
- if !reply.Success {
- return fmt.Errorf("leader rejected: %s", reply.Error)
- }
- return nil
- }
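- // Illustrative sketch, not part of the core implementation: adding a node
- // and waiting until the membership change is visible locally before
- // proceeding, per the one-at-a-time warning above. The poll interval and
- // deadline are assumptions for the example.
- func addNodeAndWait(r *Raft, nodeID, address string, timeout time.Duration) error {
- if err := r.AddNodeWithForward(nodeID, address); err != nil {
- return err
- }
- deadline := time.Now().Add(timeout)
- for time.Now().Before(deadline) {
- if _, ok := r.GetClusterNodes()[nodeID]; ok {
- return nil
- }
- time.Sleep(50 * time.Millisecond)
- }
- return fmt.Errorf("timed out waiting for node %s to join", nodeID)
- }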
- // RemoveNodeWithForward removes a node, forwarding to leader if necessary
- // This is the recommended method for applications to use
- func (r *Raft) RemoveNodeWithForward(nodeID string) error {
- // Try local operation first
- err := r.RemoveNode(nodeID)
- if err == nil {
- return nil
- }
- // Check if we're not the leader
- r.mu.RLock()
- state := r.state
- leaderID := r.leaderID
- leaderAddr := r.clusterNodes[leaderID]
- r.mu.RUnlock()
- if state == Leader {
- // We are leader but RemoveNode failed for other reasons
- return err
- }
- // Not leader, forward to leader
- if leaderID == "" {
- return fmt.Errorf("no leader available")
- }
- if leaderAddr == "" {
- return fmt.Errorf("leader %s address not found in cluster", leaderID)
- }
- // Forward to leader
- // Short timeout for removal as it shouldn't block long
- ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
- defer cancel()
- args := &RemoveNodeArgs{NodeID: nodeID}
- reply, err := r.transport.ForwardRemoveNode(ctx, leaderAddr, args)
- if err != nil {
- return fmt.Errorf("forward failed: %w", err)
- }
- if !reply.Success {
- return fmt.Errorf("leader rejected: %s", reply.Error)
- }
- return nil
- }
- // rebuildPeersList rebuilds the peers slice from clusterNodes
- func (r *Raft) rebuildPeersList() {
- r.peers = make([]string, 0, len(r.clusterNodes)-1)
- for nodeID, addr := range r.clusterNodes {
- if nodeID != r.nodeID {
- r.peers = append(r.peers, addr)
- }
- }
- }
- // GetClusterNodes returns a copy of the current cluster membership
- func (r *Raft) GetClusterNodes() map[string]string {
- r.mu.RLock()
- defer r.mu.RUnlock()
- nodes := make(map[string]string)
- for k, v := range r.clusterNodes {
- nodes[k] = v
- }
- return nodes
- }
- // applyConfigChange applies a configuration change entry
- func (r *Raft) applyConfigChange(entry *LogEntry) {
- if entry.Config == nil || entry.Config.Nodes == nil {
- r.logger.Warn("Invalid config change entry at index %d", entry.Index)
- return
- }
- r.mu.Lock()
- defer r.mu.Unlock()
- // Update cluster configuration
- r.clusterNodes = make(map[string]string)
- for k, v := range entry.Config.Nodes {
- r.clusterNodes[k] = v
- }
- r.rebuildPeersList()
- // Persist the new configuration
- if err := r.storage.SaveClusterConfig(&ClusterConfig{Nodes: r.clusterNodes}); err != nil {
- r.logger.Error("Failed to persist cluster config: %v", err)
- }
- // Clear pending flag and old cluster state
- r.pendingConfigChange = false
- r.oldClusterNodes = nil
- r.configChangeIndex = 0
- // If we were joining a cluster and now have multiple nodes, we've successfully joined
- if r.joiningCluster && len(r.clusterNodes) > 1 {
- r.joiningCluster = false
- r.logger.Info("Successfully joined cluster with %d nodes", len(r.clusterNodes))
- }
- // Check if we have been removed from the cluster
- if _, exists := r.clusterNodes[r.nodeID]; !exists {
- r.logger.Warn("Node %s removed from cluster configuration", r.nodeID)
- // We could shut down here, or just stay as a listener.
- // For now, let's step down to Follower if we are not already.
- if r.state != Follower {
- r.becomeFollower(r.currentTerm)
- }
- }
- r.logger.Info("Applied config change at index %d, cluster now has %d nodes", entry.Index, len(r.clusterNodes))
- // If we're the leader, update leader state
- if r.state == Leader {
- // Check if we are still in the cluster
- if _, exists := r.clusterNodes[r.nodeID]; !exists {
- r.logger.Warn("Leader removed from cluster (self), stepping down")
- r.becomeFollower(r.currentTerm)
- return
- }
- // Initialize nextIndex/matchIndex for any new nodes
- lastIndex := r.log.LastIndex()
- for nodeID, addr := range r.clusterNodes {
- if nodeID != r.nodeID {
- if _, exists := r.nextIndex[addr]; !exists {
- r.nextIndex[addr] = lastIndex + 1
- r.matchIndex[addr] = 0
- }
- }
- }
- // Clean up removed nodes
- validAddrs := make(map[string]bool)
- for nodeID, addr := range r.clusterNodes {
- if nodeID != r.nodeID {
- validAddrs[addr] = true
- }
- }
- for addr := range r.nextIndex {
- if !validAddrs[addr] {
- delete(r.nextIndex, addr)
- delete(r.matchIndex, addr)
- }
- }
- }
- }
- // ==================== ReadIndex (Linearizable Reads) ====================
- // readIndexLoop handles read index requests
- func (r *Raft) readIndexLoop() {
- for {
- select {
- case <-r.stopCh:
- return
- case req := <-r.readIndexCh:
- r.processReadIndexRequest(req)
- }
- }
- }
- // processReadIndexRequest processes a single read index request
- func (r *Raft) processReadIndexRequest(req *readIndexRequest) {
- r.mu.RLock()
- if r.state != Leader {
- r.mu.RUnlock()
- req.done <- ErrNotLeader
- return
- }
- r.mu.RUnlock()
- // Confirm leadership by sending heartbeats and waiting for majority ack
- if !r.confirmLeadership() {
- req.done <- ErrLeadershipLost
- return
- }
- // Wait for apply to catch up to readIndex
- if err := r.waitApply(req.readIndex, r.config.ProposeTimeout); err != nil {
- req.done <- err
- return
- }
- req.done <- nil
- }
- // ReadIndex implements linearizable reads
- // It ensures that the read sees all writes that were committed before the read started
- func (r *Raft) ReadIndex() (uint64, error) {
- r.mu.RLock()
- if r.state != Leader {
- leaderID := r.leaderID
- r.mu.RUnlock()
- return 0, NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff)
- }
- readIndex := r.commitIndex
- r.mu.RUnlock()
- atomic.AddUint64(&r.metrics.ReadIndexRequests, 1)
- // Create request and send to processing loop
- req := &readIndexRequest{
- readIndex: readIndex,
- done: make(chan error, 1),
- }
- select {
- case r.readIndexCh <- req:
- case <-r.stopCh:
- return 0, ErrShutdown
- case <-time.After(r.config.ProposeTimeout):
- return 0, ErrTimeout
- }
- // Wait for result
- select {
- case err := <-req.done:
- if err != nil {
- return 0, err
- }
- atomic.AddUint64(&r.metrics.ReadIndexSuccess, 1)
- return readIndex, nil
- case <-r.stopCh:
- return 0, ErrShutdown
- case <-time.After(r.config.ProposeTimeout):
- return 0, ErrTimeout
- }
- }
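- // Illustrative sketch, not part of the core implementation: combining
- // ReadIndex with the configured GetHandler for a linearizable local read.
- // ReadIndex only returns once lastApplied has caught up to the commit
- // index observed at the start of the read, so the handler sees every
- // write committed before the read began.
- func linearizableGet(r *Raft, key string) (string, bool, error) {
- if _, err := r.ReadIndex(); err != nil {
- return "", false, err
- }
- if r.config.GetHandler == nil {
- return "", false, fmt.Errorf("get handler not configured")
- }
- value, found := r.config.GetHandler(key)
- return value, found, nil
- }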
- // confirmLeadership confirms we're still leader by getting acks from majority
- func (r *Raft) confirmLeadership() bool {
- r.mu.RLock()
- if r.state != Leader {
- r.mu.RUnlock()
- return false
- }
- currentTerm := r.currentTerm
- clusterSize := len(r.clusterNodes)
- if clusterSize == 0 {
- clusterSize = len(r.peers) + 1
- }
- r.mu.RUnlock()
- // Single node cluster - we're always the leader
- if clusterSize == 1 {
- return true
- }
- // Send heartbeats, wait one heartbeat interval, and verify we are still
- // leader in the same term. This is a heuristic: a stricter implementation
- // would wait for explicit acks from a majority before confirming.
- r.sendHeartbeats()
- time.Sleep(r.config.HeartbeatInterval)
- r.mu.RLock()
- stillLeader := r.state == Leader && r.currentTerm == currentTerm
- r.mu.RUnlock()
- return stillLeader
- }
- // waitApply waits until lastApplied >= index
- func (r *Raft) waitApply(index uint64, timeout time.Duration) error {
- deadline := time.Now().Add(timeout)
- for {
- r.mu.RLock()
- lastApplied := r.lastApplied
- r.mu.RUnlock()
- if lastApplied >= index {
- return nil
- }
- if time.Now().After(deadline) {
- return ErrTimeout
- }
- time.Sleep(1 * time.Millisecond)
- }
- }
- // ==================== Health Check ====================
- // HealthCheck returns the current health status of the node
- func (r *Raft) HealthCheck() HealthStatus {
- r.mu.RLock()
- defer r.mu.RUnlock()
- clusterNodes := make(map[string]string)
- for k, v := range r.clusterNodes {
- clusterNodes[k] = v
- }
- clusterSize := len(r.clusterNodes)
- if clusterSize == 0 {
- clusterSize = len(r.peers) + 1
- }
- logBehind := uint64(0)
- if r.commitIndex > r.lastApplied {
- logBehind = r.commitIndex - r.lastApplied
- }
- // Consider healthy if we're leader or have a known leader
- isHealthy := r.state == Leader || r.leaderID != ""
- return HealthStatus{
- NodeID: r.nodeID,
- State: r.state.String(),
- Term: r.currentTerm,
- LeaderID: r.leaderID,
- ClusterSize: clusterSize,
- ClusterNodes: clusterNodes,
- CommitIndex: r.commitIndex,
- LastApplied: r.lastApplied,
- LogBehind: logBehind,
- LastHeartbeat: r.lastHeartbeat,
- IsHealthy: isHealthy,
- Uptime: time.Since(r.startTime),
- }
- }
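- // A health endpoint wired to HealthCheck might look like the sketch below
- // (assumes the enclosing program imports net/http and encoding/json; the
- // route name and the node variable are placeholders):
- //
- // http.HandleFunc("/healthz", func(w http.ResponseWriter, req *http.Request) {
- // hs := node.HealthCheck()
- // if !hs.IsHealthy {
- // w.WriteHeader(http.StatusServiceUnavailable)
- // }
- // _ = json.NewEncoder(w).Encode(hs)
- // })
-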
- // GetMetrics returns the current metrics
- func (r *Raft) GetMetrics() Metrics {
- return Metrics{
- Term: atomic.LoadUint64(&r.metrics.Term),
- ProposalsTotal: atomic.LoadUint64(&r.metrics.ProposalsTotal),
- ProposalsSuccess: atomic.LoadUint64(&r.metrics.ProposalsSuccess),
- ProposalsFailed: atomic.LoadUint64(&r.metrics.ProposalsFailed),
- ProposalsForwarded: atomic.LoadUint64(&r.metrics.ProposalsForwarded),
- AppendsSent: atomic.LoadUint64(&r.metrics.AppendsSent),
- AppendsReceived: atomic.LoadUint64(&r.metrics.AppendsReceived),
- AppendsSuccess: atomic.LoadUint64(&r.metrics.AppendsSuccess),
- AppendsFailed: atomic.LoadUint64(&r.metrics.AppendsFailed),
- ElectionsStarted: atomic.LoadUint64(&r.metrics.ElectionsStarted),
- ElectionsWon: atomic.LoadUint64(&r.metrics.ElectionsWon),
- PreVotesStarted: atomic.LoadUint64(&r.metrics.PreVotesStarted),
- PreVotesGranted: atomic.LoadUint64(&r.metrics.PreVotesGranted),
- SnapshotsTaken: atomic.LoadUint64(&r.metrics.SnapshotsTaken),
- SnapshotsInstalled: atomic.LoadUint64(&r.metrics.SnapshotsInstalled),
- SnapshotsSent: atomic.LoadUint64(&r.metrics.SnapshotsSent),
- ReadIndexRequests: atomic.LoadUint64(&r.metrics.ReadIndexRequests),
- ReadIndexSuccess: atomic.LoadUint64(&r.metrics.ReadIndexSuccess),
- LeadershipTransfers: atomic.LoadUint64(&r.metrics.LeadershipTransfers),
- LeadershipTransferSuccess: atomic.LoadUint64(&r.metrics.LeadershipTransferSuccess),
- }
- }
- // ==================== Leadership Transfer ====================
- // TransferLeadership transfers leadership to the specified node
- func (r *Raft) TransferLeadership(targetID string) error {
- r.mu.Lock()
- if r.state != Leader {
- r.mu.Unlock()
- return ErrNotLeader
- }
- if targetID == r.nodeID {
- r.mu.Unlock()
- return fmt.Errorf("cannot transfer to self")
- }
- targetAddr, exists := r.clusterNodes[targetID]
- if !exists {
- r.mu.Unlock()
- return fmt.Errorf("target node %s not in cluster", targetID)
- }
- if r.transferring {
- r.mu.Unlock()
- return fmt.Errorf("leadership transfer already in progress")
- }
- r.transferring = true
- r.transferTarget = targetID
- r.transferDeadline = time.Now().Add(r.config.ElectionTimeoutMax * 2)
- currentTerm := r.currentTerm
- atomic.AddUint64(&r.metrics.LeadershipTransfers, 1)
- r.mu.Unlock()
- r.logger.Info("Starting leadership transfer to %s", targetID)
- // Step 1: Sync target to our log
- if err := r.syncFollowerToLatest(targetAddr); err != nil {
- r.mu.Lock()
- r.transferring = false
- r.transferTarget = ""
- r.mu.Unlock()
- return fmt.Errorf("failed to sync target: %w", err)
- }
- // Step 2: Send TimeoutNow RPC
- args := &TimeoutNowArgs{
- Term: currentTerm,
- LeaderID: r.nodeID,
- }
- ctx, cancel := context.WithTimeout(context.Background(), r.config.RPCTimeout)
- defer cancel()
- reply, err := r.transport.TimeoutNow(ctx, targetAddr, args)
- if err != nil {
- r.mu.Lock()
- r.transferring = false
- r.transferTarget = ""
- r.mu.Unlock()
- return fmt.Errorf("TimeoutNow RPC failed: %w", err)
- }
- if !reply.Success {
- r.mu.Lock()
- r.transferring = false
- r.transferTarget = ""
- r.mu.Unlock()
- return fmt.Errorf("target rejected leadership transfer")
- }
- atomic.AddUint64(&r.metrics.LeadershipTransferSuccess, 1)
- r.logger.Info("Leadership transfer to %s initiated successfully", targetID)
- // Note: We don't immediately step down; we wait for the target to win election
- // and send us an AppendEntries with higher term
- return nil
- }
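- // Illustrative sketch, not part of the core implementation: handing off
- // leadership before a planned shutdown so the cluster avoids waiting out
- // an election timeout. Picking the first other member is an assumption;
- // a real policy might prefer the most up-to-date follower.
- func transferBeforeShutdown(r *Raft) error {
- if _, isLeader := r.GetState(); !isLeader {
- return nil // followers can stop without a handoff
- }
- for nodeID := range r.GetClusterNodes() {
- if nodeID != r.nodeID {
- return r.TransferLeadership(nodeID)
- }
- }
- return fmt.Errorf("no transfer target available")
- }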
- // syncFollowerToLatest ensures the follower is caught up to our log
- func (r *Raft) syncFollowerToLatest(peerAddr string) error {
- r.mu.RLock()
- if r.state != Leader {
- r.mu.RUnlock()
- return ErrNotLeader
- }
- currentTerm := r.currentTerm
- leaderCommit := r.commitIndex
- lastIndex := r.log.LastIndex()
- r.mu.RUnlock()
- // Keep replicating until follower is caught up
- deadline := time.Now().Add(r.config.ElectionTimeoutMax * 2)
- for time.Now().Before(deadline) {
- r.mu.RLock()
- if r.state != Leader || r.currentTerm != currentTerm {
- r.mu.RUnlock()
- return ErrLeadershipLost
- }
- matchIndex := r.matchIndex[peerAddr]
- r.mu.RUnlock()
- if matchIndex >= lastIndex {
- return nil // Caught up
- }
- // Trigger replication
- r.replicateToPeer(peerAddr, currentTerm, leaderCommit)
- time.Sleep(10 * time.Millisecond)
- }
- return ErrTimeout
- }
- // HandleTimeoutNow handles TimeoutNow RPC (for leadership transfer)
- func (r *Raft) HandleTimeoutNow(args *TimeoutNowArgs) *TimeoutNowReply {
- r.mu.Lock()
- defer r.mu.Unlock()
- reply := &TimeoutNowReply{
- Term: r.currentTerm,
- Success: false,
- }
- // Only accept if we're a follower and the term matches
- if args.Term < r.currentTerm {
- return reply
- }
- if r.state != Follower {
- return reply
- }
- // Immediately start election
- r.logger.Info("Received TimeoutNow from %s, starting immediate election", args.LeaderID)
- r.state = Candidate
- reply.Success = true
- return reply
- }
- // HandleReadIndex handles ReadIndex RPC
- func (r *Raft) HandleReadIndex(args *ReadIndexArgs) *ReadIndexReply {
- reply := &ReadIndexReply{
- Success: false,
- }
- readIndex, err := r.ReadIndex()
- if err != nil {
- reply.Error = err.Error()
- return reply
- }
- reply.ReadIndex = readIndex
- reply.Success = true
- return reply
- }
- // HandleGet handles Get RPC for remote KV reads
- func (r *Raft) HandleGet(args *GetArgs) *GetReply {
- reply := &GetReply{
- Found: false,
- }
- if r.config.GetHandler == nil {
- reply.Error = "get handler not configured"
- return reply
- }
- value, found := r.config.GetHandler(args.Key)
- reply.Value = value
- reply.Found = found
- return reply
- }