- package raft
- import (
- "context"
- "fmt"
- "math/rand"
- "os"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- )
- // Raft represents a Raft consensus node
- type Raft struct {
- mu sync.RWMutex
- // Node identity
- nodeID string
- peers []string
- // Cluster membership - maps nodeID to address
- clusterNodes map[string]string
- // Current state
- state NodeState
- currentTerm uint64
- votedFor string
- leaderID string
- // Log management
- log *LogManager
- storage Storage
- commitIndex uint64
- lastApplied uint64
- // Leader state
- nextIndex map[string]uint64
- matchIndex map[string]uint64
- // Configuration
- config *Config
- // Communication
- transport Transport
- // Channels
- applyCh chan ApplyMsg
- stopCh chan struct{}
- commitCh chan struct{}
- // Election timer
- electionTimer *time.Timer
- heartbeatTimer *time.Timer
- // Statistics (deprecated, use metrics instead)
- stats Stats
- // Metrics for monitoring
- metrics Metrics
- // Logger
- logger Logger
- // Running flag
- running int32
- // Replication trigger channel - used to batch replication requests
- replicationCh chan struct{}
- // Pending config change - only one config change can be pending at a time
- pendingConfigChange bool
- // Old cluster nodes - used for majority calculation during config change
- // This ensures we use the old cluster size until the config change is committed
- oldClusterNodes map[string]string
- // Index of the pending config change entry
- configChangeIndex uint64
- // Joining cluster state - when a standalone node is being added to a cluster
- // This prevents the node from starting elections while syncing
- joiningCluster bool
- joiningClusterTime time.Time
- // Compaction state - prevents concurrent compaction
- compacting int32
- // Dynamic compaction threshold - updated after each compaction
- // Next compaction triggers when log size >= this value
- // Formula: lastCompactionSize * 1.5 (or initial SnapshotThreshold)
- nextCompactionThreshold uint64
- // WaitGroup to track goroutines for clean shutdown
- wg sync.WaitGroup
- // Leadership transfer state
- transferring bool
- transferTarget string
- transferDeadline time.Time
- // Last heartbeat received (for health check)
- lastHeartbeat time.Time
- // Start time (for uptime calculation)
- startTime time.Time
- // ReadIndex waiting queue
- readIndexCh chan *readIndexRequest
- // Snapshot receiving state (for chunked transfer)
- pendingSnapshot *pendingSnapshotState
- // Last contact time for each peer (for leader check)
- lastContact map[string]time.Time
- }
- // readIndexRequest represents a pending read index request
- type readIndexRequest struct {
- readIndex uint64
- done chan error
- }
- // pendingSnapshotState tracks chunked snapshot reception
- type pendingSnapshotState struct {
- lastIncludedIndex uint64
- lastIncludedTerm uint64
- data []byte
- receivedBytes uint64
- }
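- // Illustration of the chunked transfer this struct tracks (hypothetical sizes):
- // a 2.5 MB snapshot sent with a 1 MB chunk size arrives as three InstallSnapshot
- // calls with Offset 0, 1048576, and 2097152; receivedBytes advances by len(Data)
- // on each accepted chunk, and Done=true on the last chunk triggers installation.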
- // Stats holds runtime statistics
- type Stats struct {
- Term uint64
- State string
- LastLogIndex uint64
- LastLogTerm uint64
- CommitIndex uint64
- LastApplied uint64
- LeaderID string
- VotesReceived int
- AppendsSent int64
- AppendsReceived int64
- ClusterSize int // Number of nodes in cluster
- ClusterNodes map[string]string // NodeID -> Address mapping
- }
- // ConsoleLogger implements Logger with console output
- type ConsoleLogger struct {
- Prefix string
- Level int // 0=debug, 1=info, 2=warn, 3=error
- mu sync.Mutex
- }
- func NewConsoleLogger(prefix string, level int) *ConsoleLogger {
- return &ConsoleLogger{Prefix: prefix, Level: level}
- }
- func (l *ConsoleLogger) log(level int, levelStr, format string, args ...interface{}) {
- if level < l.Level {
- return
- }
- l.mu.Lock()
- defer l.mu.Unlock()
- msg := fmt.Sprintf(format, args...)
- fmt.Printf("[%s] %s [%s] %s\n", time.Now().Format("15:04:05.000"), l.Prefix, levelStr, msg)
- }
- func (l *ConsoleLogger) Debug(format string, args ...interface{}) {
- l.log(0, "DEBUG", format, args...)
- }
- func (l *ConsoleLogger) Info(format string, args ...interface{}) {
- l.log(1, "INFO", format, args...)
- }
- func (l *ConsoleLogger) Warn(format string, args ...interface{}) {
- l.log(2, "WARN", format, args...)
- }
- func (l *ConsoleLogger) Error(format string, args ...interface{}) {
- l.log(3, "ERROR", format, args...)
- }
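- // Example usage of ConsoleLogger (a sketch; prefix and level are arbitrary):
- //
- //	logger := NewConsoleLogger("node-1", 1) // level 1 = info and above
- //	logger.Debug("suppressed at this level")
- //	logger.Info("raft node %s starting", "node-1")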
- // NewRaft creates a new Raft node
- func NewRaft(config *Config, transport Transport, applyCh chan ApplyMsg) (*Raft, error) {
- if config.Logger == nil {
- config.Logger = NewConsoleLogger(config.NodeID, 1)
- }
- // Create storage
- storage, err := NewHybridStorage(config.DataDir, config.MemoryLogCapacity, config.Logger)
- if err != nil {
- return nil, fmt.Errorf("failed to create storage: %w", err)
- }
- // Create log manager
- logMgr := NewLogManager(storage, config.Logger)
- // Load persistent state
- state, err := storage.GetState()
- if err != nil {
- return nil, fmt.Errorf("failed to load state: %w", err)
- }
- // Load or initialize cluster configuration
- clusterNodes := make(map[string]string)
- // Try to load saved cluster config first
- savedConfig, err := storage.GetClusterConfig()
- if err != nil {
- return nil, fmt.Errorf("failed to load cluster config: %w", err)
- }
- if savedConfig != nil && len(savedConfig.Nodes) > 0 {
- // Use saved config
- clusterNodes = savedConfig.Nodes
- config.Logger.Info("Loaded cluster config with %d nodes", len(clusterNodes))
- } else {
- // Initialize from config
- if len(config.ClusterNodes) > 0 {
- for k, v := range config.ClusterNodes {
- clusterNodes[k] = v
- }
- } else {
- // Build from Peers + self (backward compatibility)
- clusterNodes[config.NodeID] = config.ListenAddr
- if config.PeerMap != nil {
- for k, v := range config.PeerMap {
- clusterNodes[k] = v
- }
- }
- }
- // Save initial config
- if len(clusterNodes) > 0 {
- if err := storage.SaveClusterConfig(&ClusterConfig{Nodes: clusterNodes}); err != nil {
- config.Logger.Warn("Failed to save initial cluster config: %v", err)
- }
- }
- }
- // Build peers list from cluster nodes (excluding self)
- var peers []string
- for nodeID, addr := range clusterNodes {
- if nodeID != config.NodeID {
- peers = append(peers, addr)
- }
- }
- r := &Raft{
- nodeID: config.NodeID,
- peers: peers,
- clusterNodes: clusterNodes,
- state: Follower,
- currentTerm: state.CurrentTerm,
- votedFor: state.VotedFor,
- log: logMgr,
- storage: storage,
- config: config,
- transport: transport,
- applyCh: applyCh,
- stopCh: make(chan struct{}),
- commitCh: make(chan struct{}, 100), // Increased buffer for event-driven apply
- replicationCh: make(chan struct{}, 1),
- nextIndex: make(map[string]uint64),
- matchIndex: make(map[string]uint64),
- logger: config.Logger,
- readIndexCh: make(chan *readIndexRequest, 100),
- lastHeartbeat: time.Now(),
- lastContact: make(map[string]time.Time),
- }
- // Initialize metrics
- r.metrics.Term = state.CurrentTerm
- // Set RPC handler
- transport.SetRPCHandler(r)
- return r, nil
- }
- // Start starts the Raft node
- func (r *Raft) Start() error {
- if !atomic.CompareAndSwapInt32(&r.running, 0, 1) {
- return fmt.Errorf("already running")
- }
- // Record start time
- r.startTime = time.Now()
- // Start transport
- if err := r.transport.Start(); err != nil {
- return fmt.Errorf("failed to start transport: %w", err)
- }
- // Restore FSM from snapshot if exists
- // This must happen before starting apply loop to ensure FSM state is restored
- if err := r.restoreFromSnapshot(); err != nil {
- // Suppress warning if it's just a missing file (first start)
- if !os.IsNotExist(err) && !strings.Contains(err.Error(), "no such file") {
- r.logger.Warn("Failed to restore from snapshot: %v", err)
- } else {
- r.logger.Info("No snapshot found, starting with empty state")
- }
- // Continue anyway - the node can still function, just without historical state
- }
- // Start election timer
- r.resetElectionTimer()
- // Start background goroutines with WaitGroup tracking
- r.wg.Add(4)
- go func() {
- defer r.wg.Done()
- r.applyLoop()
- }()
- go func() {
- defer r.wg.Done()
- r.replicationLoop()
- }()
- go func() {
- defer r.wg.Done()
- r.mainLoop()
- }()
- go func() {
- defer r.wg.Done()
- r.readIndexLoop()
- }()
- r.logger.Info("Raft node started")
- return nil
- }
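- // Putting NewRaft and Start together (a minimal sketch; the config values are
- // illustrative and `transport` stands for any Transport implementation - neither
- // is prescribed by this file):
- //
- //	applyCh := make(chan ApplyMsg, 256)
- //	cfg := &Config{
- //	    NodeID:     "node-1",
- //	    ListenAddr: "127.0.0.1:7001",
- //	    DataDir:    "/var/lib/raft/node-1",
- //	    ClusterNodes: map[string]string{
- //	        "node-1": "127.0.0.1:7001",
- //	        "node-2": "127.0.0.1:7002",
- //	        "node-3": "127.0.0.1:7003",
- //	    },
- //	}
- //	node, err := NewRaft(cfg, transport, applyCh)
- //	if err != nil { /* handle error */ }
- //	if err := node.Start(); err != nil { /* handle error */ }
- //	defer node.Stop()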
- // Stop stops the Raft node
- func (r *Raft) Stop() error {
- if !atomic.CompareAndSwapInt32(&r.running, 1, 0) {
- return fmt.Errorf("not running")
- }
- // Signal all goroutines to stop
- close(r.stopCh)
- // Stop timers first to prevent new operations
- r.mu.Lock()
- if r.electionTimer != nil {
- r.electionTimer.Stop()
- }
- if r.heartbeatTimer != nil {
- r.heartbeatTimer.Stop()
- }
- r.mu.Unlock()
- // Wait for all goroutines to finish with timeout
- done := make(chan struct{})
- go func() {
- r.wg.Wait()
- close(done)
- }()
- select {
- case <-done:
- // All goroutines exited cleanly
- case <-time.After(3 * time.Second):
- r.logger.Warn("Timeout waiting for goroutines to stop")
- }
- // Stop transport (has its own timeout)
- if err := r.transport.Stop(); err != nil {
- r.logger.Error("Failed to stop transport: %v", err)
- }
- if err := r.storage.Close(); err != nil {
- r.logger.Error("Failed to close storage: %v", err)
- }
- r.logger.Info("Raft node stopped")
- return nil
- }
- // mainLoop is the main event loop
- func (r *Raft) mainLoop() {
- for {
- select {
- case <-r.stopCh:
- return
- default:
- }
- r.mu.RLock()
- state := r.state
- r.mu.RUnlock()
- switch state {
- case Follower:
- r.runFollower()
- case Candidate:
- r.runCandidate()
- case Leader:
- r.runLeader()
- }
- }
- }
- // runFollower handles follower behavior
- func (r *Raft) runFollower() {
- r.logger.Debug("Running as follower in term %d", r.currentTerm)
- for {
- select {
- case <-r.stopCh:
- return
- case <-r.electionTimer.C:
- r.mu.Lock()
- if r.state == Follower {
- // If we're joining a cluster, don't start elections
- // Give time for the leader to sync us up
- if r.joiningCluster {
- // Allow joining for up to 30 seconds
- if time.Since(r.joiningClusterTime) < 30*time.Second {
- r.logger.Debug("Suppressing election during cluster join")
- r.resetElectionTimer()
- r.mu.Unlock()
- continue
- }
- // Timeout - clear joining state
- r.joiningCluster = false
- r.logger.Warn("Cluster join timeout, resuming normal election behavior")
- }
- r.logger.Debug("Election timeout, becoming candidate")
- r.state = Candidate
- r.leaderID = "" // Clear leaderID so we can vote for self in Pre-Vote
- }
- r.mu.Unlock()
- return
- }
- }
- }
- // runCandidate handles candidate behavior (leader election)
- // Implements Pre-Vote mechanism to prevent term explosion
- func (r *Raft) runCandidate() {
- // Phase 1: Pre-Vote (don't increment term yet)
- if !r.runPreVote() {
- // Pre-vote failed, wait for timeout before retrying
- r.mu.Lock()
- r.resetElectionTimer()
- r.mu.Unlock()
- select {
- case <-r.stopCh:
- return
- case <-r.electionTimer.C:
- // Timer expired, will retry pre-vote
- }
- return
- }
- // Phase 2: Actual election (pre-vote succeeded, now increment term)
- r.mu.Lock()
- r.currentTerm++
- r.votedFor = r.nodeID
- r.leaderID = ""
- currentTerm := r.currentTerm
- // Update metrics
- atomic.StoreUint64(&r.metrics.Term, currentTerm)
-
- if err := r.persistState(); err != nil {
- r.logger.Error("Failed to persist state during election: %v", err)
- r.mu.Unlock()
- return // Cannot proceed without persisting state
- }
- atomic.AddUint64(&r.metrics.ElectionsStarted, 1)
- r.resetElectionTimer()
- r.mu.Unlock()
- r.logger.Debug("Starting election for term %d", currentTerm)
- // Get current peers list
- r.mu.RLock()
- currentPeers := make([]string, len(r.peers))
- copy(currentPeers, r.peers)
- clusterSize := len(r.clusterNodes)
- if clusterSize == 0 {
- clusterSize = len(r.peers) + 1
- }
- r.mu.RUnlock()
- // Request actual votes from all peers
- votes := 1 // Vote for self
- voteCh := make(chan bool, len(currentPeers))
- lastLogIndex, lastLogTerm := r.log.LastIndexAndTerm()
- for _, peer := range currentPeers {
- go func(peer string) {
- args := &RequestVoteArgs{
- Term: currentTerm,
- CandidateID: r.nodeID,
- LastLogIndex: lastLogIndex,
- LastLogTerm: lastLogTerm,
- PreVote: false,
- }
- ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
- defer cancel()
- reply, err := r.transport.RequestVote(ctx, peer, args)
- if err != nil {
- r.logger.Debug("RequestVote to %s failed: %v", peer, err)
- voteCh <- false
- return
- }
- r.mu.Lock()
- defer r.mu.Unlock()
- if reply.Term > r.currentTerm {
- r.becomeFollower(reply.Term)
- voteCh <- false
- return
- }
- voteCh <- reply.VoteGranted
- }(peer)
- }
- // Wait for votes - majority is (clusterSize/2) + 1
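- // e.g. a 3-node cluster needs 2 votes (3/2+1), a 5-node cluster needs 3 (5/2+1)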
- needed := clusterSize/2 + 1
- // If we already have enough votes (single-node cluster), become leader immediately
- if votes >= needed {
- r.mu.Lock()
- if r.state == Candidate && r.currentTerm == currentTerm {
- r.becomeLeader()
- }
- r.mu.Unlock()
- return
- }
- for i := 0; i < len(currentPeers); i++ {
- select {
- case <-r.stopCh:
- return
- case <-r.electionTimer.C:
- r.logger.Debug("Election timeout, will start new election")
- return
- case granted := <-voteCh:
- if granted {
- votes++
- if votes >= needed {
- r.mu.Lock()
- if r.state == Candidate && r.currentTerm == currentTerm {
- r.becomeLeader()
- }
- r.mu.Unlock()
- return
- }
- }
- }
- // Check if we're still a candidate
- r.mu.RLock()
- if r.state != Candidate {
- r.mu.RUnlock()
- return
- }
- r.mu.RUnlock()
- }
- // Election failed, wait for timeout
- r.mu.RLock()
- stillCandidate := r.state == Candidate
- r.mu.RUnlock()
- if stillCandidate {
- r.logger.Debug("Election failed, waiting for timeout (got %d/%d votes)", votes, needed)
- select {
- case <-r.stopCh:
- return
- case <-r.electionTimer.C:
- // Timer expired, will start new election
- }
- }
- }
- // runPreVote sends pre-vote requests to check if we can win an election
- // Returns true if we got majority pre-votes, false otherwise
- func (r *Raft) runPreVote() bool {
- r.mu.RLock()
- currentTerm := r.currentTerm
- currentPeers := make([]string, len(r.peers))
- copy(currentPeers, r.peers)
- clusterSize := len(r.clusterNodes)
- if clusterSize == 0 {
- clusterSize = len(r.peers) + 1
- }
- leaderID := r.leaderID
- r.mu.RUnlock()
- // Pre-vote uses term+1 but doesn't actually increment it
- preVoteTerm := currentTerm + 1
- lastLogIndex, lastLogTerm := r.log.LastIndexAndTerm()
- r.logger.Debug("Starting pre-vote for term %d", preVoteTerm)
- // Per Raft Pre-Vote optimization (§9.6): if we have a known leader,
- // don't vote for self. This prevents standalone nodes from constantly
- // electing themselves when they should be joining an existing cluster.
- preVotes := 0
- if leaderID == "" {
- preVotes = 1 // Vote for self only if we don't know of a leader
- } else {
- r.logger.Debug("Pre-vote: not self-voting because we have leader %s", leaderID)
- }
- preVoteCh := make(chan bool, len(currentPeers))
- for _, peer := range currentPeers {
- go func(peer string) {
- args := &RequestVoteArgs{
- Term: preVoteTerm,
- CandidateID: r.nodeID,
- LastLogIndex: lastLogIndex,
- LastLogTerm: lastLogTerm,
- PreVote: true,
- }
- ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
- defer cancel()
- reply, err := r.transport.RequestVote(ctx, peer, args)
- if err != nil {
- r.logger.Debug("PreVote to %s failed: %v", peer, err)
- preVoteCh <- false
- return
- }
- // For pre-vote, we don't step down even if reply.Term > currentTerm
- // because the other node might also be doing pre-vote
- preVoteCh <- reply.VoteGranted
- }(peer)
- }
- // Wait for pre-votes with a shorter timeout - majority is (clusterSize/2) + 1
- needed := clusterSize/2 + 1
- // If we already have enough votes (single-node cluster with no known leader), succeed immediately
- if preVotes >= needed {
- r.logger.Debug("Pre-vote succeeded immediately (got %d/%d pre-votes)", preVotes, needed)
- return true
- }
- timeout := time.After(500 * time.Millisecond)
- for i := 0; i < len(currentPeers); i++ {
- select {
- case <-r.stopCh:
- return false
- case <-timeout:
- r.logger.Debug("Pre-vote timeout (got %d/%d pre-votes)", preVotes, needed)
- return false
- case granted := <-preVoteCh:
- if granted {
- preVotes++
- if preVotes >= needed {
- r.logger.Debug("Pre-vote succeeded (got %d/%d pre-votes)", preVotes, needed)
- return true
- }
- }
- }
- // Check if we're still a candidate
- r.mu.RLock()
- if r.state != Candidate {
- r.mu.RUnlock()
- return false
- }
- r.mu.RUnlock()
- }
- r.logger.Debug("Pre-vote failed (got %d/%d pre-votes)", preVotes, needed)
- return false
- }
- // runLeader handles leader behavior
- func (r *Raft) runLeader() {
- r.logger.Debug("Running as leader in term %d", r.currentTerm)
- // Send initial heartbeat
- r.sendHeartbeats()
- // Start heartbeat timer
- r.mu.Lock()
- r.heartbeatTimer = time.NewTimer(r.config.HeartbeatInterval)
- r.mu.Unlock()
- for {
- select {
- case <-r.stopCh:
- return
- case <-r.heartbeatTimer.C:
- r.mu.RLock()
- if r.state != Leader {
- r.mu.RUnlock()
- return
- }
- r.mu.RUnlock()
- r.sendHeartbeats()
- r.heartbeatTimer.Reset(r.config.HeartbeatInterval)
- case <-r.commitCh:
- r.updateCommitIndex()
- }
- }
- }
- // becomeFollower transitions to follower state
- func (r *Raft) becomeFollower(term uint64) {
- oldState := r.state
- oldTerm := r.currentTerm
- r.state = Follower
- r.currentTerm = term
- // Update metrics
- atomic.StoreUint64(&r.metrics.Term, term)
- r.votedFor = ""
- r.leaderID = ""
- // Must persist before responding - use mustPersistState for critical transitions
- r.mustPersistState()
- r.resetElectionTimer()
- // Clear leadership transfer state
- r.transferring = false
- r.transferTarget = ""
- // Only log significant state changes
- if oldState == Leader && term > oldTerm {
- // Stepping down from leader is notable
- r.logger.Debug("Stepped down from leader in term %d", term)
- }
- }
- // becomeLeader transitions to leader state
- func (r *Raft) becomeLeader() {
- r.state = Leader
- r.leaderID = r.nodeID
- // Update metrics
- atomic.AddUint64(&r.metrics.ElectionsWon, 1)
- // Initialize leader state for all peers
- lastIndex := r.log.LastIndex()
- for nodeID, addr := range r.clusterNodes {
- if nodeID != r.nodeID {
- r.nextIndex[addr] = lastIndex + 1
- r.matchIndex[addr] = 0
- }
- }
- // Also handle legacy peers
- for _, peer := range r.peers {
- if _, exists := r.nextIndex[peer]; !exists {
- r.nextIndex[peer] = lastIndex + 1
- r.matchIndex[peer] = 0
- }
- }
- r.logger.Info("Became leader in term %d with %d peers", r.currentTerm, len(r.clusterNodes)-1)
- // Append a no-op entry to commit entries from previous terms
- // This is a standard Raft optimization - the leader appends a no-op
- // entry in its current term to quickly commit all pending entries
- noopEntry := LogEntry{
- Index: r.log.LastIndex() + 1,
- Term: r.currentTerm,
- Type: EntryNoop,
- Command: nil,
- }
- if err := r.log.Append(noopEntry); err != nil {
- r.logger.Error("Failed to append no-op entry: %v", err)
- } else {
- r.logger.Debug("Appended no-op entry at index %d for term %d", noopEntry.Index, noopEntry.Term)
- }
- }
- // sendHeartbeats sends AppendEntries RPCs to all peers
- func (r *Raft) sendHeartbeats() {
- r.mu.RLock()
- if r.state != Leader {
- r.mu.RUnlock()
- return
- }
- currentTerm := r.currentTerm
- leaderCommit := r.commitIndex
- // Get all peer addresses
- peerAddrs := make([]string, 0, len(r.clusterNodes))
- for nodeID, addr := range r.clusterNodes {
- if nodeID != r.nodeID {
- peerAddrs = append(peerAddrs, addr)
- }
- }
- // Also include legacy peers not in clusterNodes
- for _, peer := range r.peers {
- found := false
- for _, addr := range peerAddrs {
- if addr == peer {
- found = true
- break
- }
- }
- if !found {
- peerAddrs = append(peerAddrs, peer)
- }
- }
- r.mu.RUnlock()
- for _, peer := range peerAddrs {
- go r.replicateToPeer(peer, currentTerm, leaderCommit)
- }
- }
- // replicateToPeer sends AppendEntries to a specific peer
- func (r *Raft) replicateToPeer(peer string, term, leaderCommit uint64) {
- r.mu.RLock()
- if r.state != Leader || r.currentTerm != term {
- r.mu.RUnlock()
- return
- }
- nextIndex := r.nextIndex[peer]
- r.mu.RUnlock()
- // Get entries to send
- entries, prevLogIndex, prevLogTerm, err := r.log.GetEntriesForFollower(nextIndex, r.config.MaxLogEntriesPerRequest)
- if err == ErrCompacted {
- // Need to send snapshot
- r.sendSnapshot(peer)
- return
- }
- if err != nil {
- r.logger.Debug("Failed to get entries for %s: %v", peer, err)
- return
- }
- args := &AppendEntriesArgs{
- Term: term,
- LeaderID: r.nodeID,
- PrevLogIndex: prevLogIndex,
- PrevLogTerm: prevLogTerm,
- Entries: entries,
- LeaderCommit: leaderCommit,
- }
- ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
- defer cancel()
- reply, err := r.transport.AppendEntries(ctx, peer, args)
- if err != nil {
- r.logger.Debug("AppendEntries to %s failed: %v", peer, err)
- return
- }
- atomic.AddInt64(&r.stats.AppendsSent, 1)
- r.mu.Lock()
- defer r.mu.Unlock()
- if reply.Term > r.currentTerm {
- r.becomeFollower(reply.Term)
- return
- }
- if r.state != Leader || r.currentTerm != term {
- return
- }
- if reply.Success {
- // Update contact time
- r.lastContact[peer] = time.Now()
- // Update nextIndex and matchIndex
- if len(entries) > 0 {
- newMatchIndex := entries[len(entries)-1].Index
- if newMatchIndex > r.matchIndex[peer] {
- r.matchIndex[peer] = newMatchIndex
- r.nextIndex[peer] = newMatchIndex + 1
- // Try to update commit index
- select {
- case r.commitCh <- struct{}{}:
- default:
- }
- }
- }
- } else {
- // Even if failed (log conflict), we contacted the peer
- r.lastContact[peer] = time.Now()
- // Decrement nextIndex and retry
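- // Example of this fast backtracking (hypothetical values): if the follower
- // rejects with ConflictTerm=3 and our last entry of term 3 is at index 7,
- // we retry from nextIndex=8; if we have no entry of term 3 at all, we fall
- // back to the follower-supplied ConflictIndex.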
- if reply.ConflictTerm > 0 {
- // Find the last entry of ConflictTerm in our log
- found := false
- for idx := r.log.LastIndex(); idx >= r.log.FirstIndex(); idx-- {
- t, err := r.log.GetTerm(idx)
- if err != nil {
- break
- }
- if t == reply.ConflictTerm {
- r.nextIndex[peer] = idx + 1
- found = true
- break
- }
- if t < reply.ConflictTerm {
- break
- }
- }
- if !found {
- r.nextIndex[peer] = reply.ConflictIndex
- }
- } else if reply.ConflictIndex > 0 {
- r.nextIndex[peer] = reply.ConflictIndex
- } else {
- r.nextIndex[peer] = max(1, r.nextIndex[peer]-1)
- }
- }
- }
- // sendSnapshot sends a snapshot to a peer with chunked transfer support
- func (r *Raft) sendSnapshot(peer string) {
- r.mu.RLock()
- if r.state != Leader {
- r.mu.RUnlock()
- return
- }
- term := r.currentTerm
- chunkSize := r.config.SnapshotChunkSize
- if chunkSize <= 0 {
- chunkSize = 1024 * 1024 // Default 1MB
- }
- r.mu.RUnlock()
- data, lastIndex, lastTerm, err := r.storage.GetSnapshot()
- if err != nil {
- r.logger.Error("Failed to get snapshot: %v", err)
- return
- }
- atomic.AddUint64(&r.metrics.SnapshotsSent, 1)
- r.logger.Info("Sending snapshot to %s: %d bytes, lastIndex=%d, lastTerm=%d",
- peer, len(data), lastIndex, lastTerm)
- // Send snapshot in chunks
- totalSize := len(data)
- offset := 0
- for offset < totalSize {
- // Check if we're still leader
- r.mu.RLock()
- if r.state != Leader || r.currentTerm != term {
- r.mu.RUnlock()
- r.logger.Debug("Aborting snapshot send: no longer leader")
- return
- }
- r.mu.RUnlock()
- // Calculate chunk size
- end := offset + chunkSize
- if end > totalSize {
- end = totalSize
- }
- chunk := data[offset:end]
- done := end >= totalSize
- args := &InstallSnapshotArgs{
- Term: term,
- LeaderID: r.nodeID,
- LastIncludedIndex: lastIndex,
- LastIncludedTerm: lastTerm,
- Offset: uint64(offset),
- Data: chunk,
- Done: done,
- }
- ctx, cancel := context.WithTimeout(context.Background(), r.config.SnapshotRPCTimeout)
- reply, err := r.transport.InstallSnapshot(ctx, peer, args)
- cancel()
- if err != nil {
- r.logger.Error("InstallSnapshot chunk to %s failed at offset %d: %v", peer, offset, err)
- return
- }
- if reply.Term > r.currentTerm {
- r.mu.Lock()
- r.becomeFollower(reply.Term)
- r.mu.Unlock()
- return
- }
- if !reply.Success {
- r.logger.Error("InstallSnapshot chunk rejected by %s at offset %d", peer, offset)
- return
- }
- offset = end
- }
- // Snapshot fully sent and accepted
- r.mu.Lock()
- defer r.mu.Unlock()
- if r.state != Leader || r.currentTerm != term {
- return
- }
- r.nextIndex[peer] = lastIndex + 1
- r.matchIndex[peer] = lastIndex
- r.logger.Info("Snapshot to %s completed, nextIndex=%d", peer, lastIndex+1)
- }
- // updateCommitIndex updates the commit index based on matchIndex
- func (r *Raft) updateCommitIndex() {
- r.mu.Lock()
- defer r.mu.Unlock()
- if r.state != Leader {
- return
- }
- // For config change entries, use the OLD cluster for majority calculation
- // This ensures that adding a node doesn't require the new node's vote
- // until the config change itself is committed
- votingNodes := r.clusterNodes
- if r.pendingConfigChange && r.oldClusterNodes != nil {
- votingNodes = r.oldClusterNodes
- }
- // Get current cluster size from voting nodes
- clusterSize := len(votingNodes)
- if clusterSize == 0 {
- clusterSize = len(r.peers) + 1
- }
- // Find the highest index replicated on a majority
- for n := r.log.LastIndex(); n > r.commitIndex; n-- {
- term, err := r.log.GetTerm(n)
- if err != nil {
- continue
- }
- // Only commit entries from current term
- if term != r.currentTerm {
- continue
- }
- // Count replicas (including self)
- count := 1 // Self
- for nodeID, addr := range votingNodes {
- if nodeID != r.nodeID {
- if r.matchIndex[addr] >= n {
- count++
- }
- }
- }
- // Also check legacy peers
- for _, peer := range r.peers {
- // Avoid double counting if peer is already in votingNodes
- alreadyCounted := false
- for _, addr := range votingNodes {
- if addr == peer {
- alreadyCounted = true
- break
- }
- }
- if !alreadyCounted && r.matchIndex[peer] >= n {
- count++
- }
- }
- // Majority is (clusterSize/2) + 1
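- // e.g. with 5 voting nodes and peer matchIndex values {8, 7, 5, 3}, index 7 is
- // replicated on 3 nodes (self plus two peers), so commitIndex can advance to 7
- // (assuming that entry is from the current term)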
- needed := clusterSize/2 + 1
- if count >= needed {
- r.commitIndex = n
- break
- }
- }
- }
- // applyLoop applies committed entries to the state machine
- // Uses event-driven model with fallback polling for reliability
- func (r *Raft) applyLoop() {
- // Use a ticker as fallback for missed signals (matches original 10ms polling)
- ticker := time.NewTicker(10 * time.Millisecond)
- defer ticker.Stop()
- for {
- select {
- case <-r.stopCh:
- return
- case <-r.commitCh:
- // Event-driven: triggered when commitIndex is updated
- r.applyCommitted()
- case <-ticker.C:
- // Fallback: check periodically in case we missed a signal
- r.applyCommitted()
- }
- }
- }
- // applyCommitted applies all committed but not yet applied entries
- func (r *Raft) applyCommitted() {
- r.mu.Lock()
- commitIndex := r.commitIndex
- lastApplied := r.lastApplied
- firstIndex := r.log.FirstIndex()
- lastLogIndex := r.log.LastIndex()
- r.mu.Unlock()
- // Safety check: ensure lastApplied is within valid range
- // If lastApplied is below firstIndex (due to snapshot), skip to firstIndex
- if lastApplied < firstIndex && firstIndex > 0 {
- r.mu.Lock()
- r.lastApplied = firstIndex
- lastApplied = firstIndex
- r.mu.Unlock()
- r.logger.Debug("Adjusted lastApplied to firstIndex %d after compaction", firstIndex)
- }
- // Safety check: don't try to apply beyond what we have in log
- if commitIndex > lastLogIndex {
- commitIndex = lastLogIndex
- }
- for lastApplied < commitIndex {
- lastApplied++
- // Skip if entry has been compacted
- if lastApplied < firstIndex {
- continue
- }
- entry, err := r.log.GetEntry(lastApplied)
- if err != nil {
- // If entry is compacted, skip ahead
- if err == ErrCompacted {
- r.mu.Lock()
- newFirstIndex := r.log.FirstIndex()
- if lastApplied < newFirstIndex {
- r.lastApplied = newFirstIndex
- lastApplied = newFirstIndex
- }
- r.mu.Unlock()
- continue
- }
- r.logger.Error("Failed to get entry %d: %v (firstIndex=%d, lastIndex=%d)",
- lastApplied, err, r.log.FirstIndex(), r.log.LastIndex())
- break
- }
- // Handle config change entries
- if entry.Type == EntryConfig {
- r.applyConfigChange(entry)
- r.mu.Lock()
- r.lastApplied = lastApplied
- r.mu.Unlock()
- continue
- }
- // Handle no-op entries (just update lastApplied, don't send to state machine)
- if entry.Type == EntryNoop {
- r.mu.Lock()
- r.lastApplied = lastApplied
- r.mu.Unlock()
- continue
- }
- // Normal command entry
- msg := ApplyMsg{
- CommandValid: true,
- Command: entry.Command,
- CommandIndex: entry.Index,
- CommandTerm: entry.Term,
- }
- select {
- case r.applyCh <- msg:
- case <-r.stopCh:
- return
- }
- r.mu.Lock()
- r.lastApplied = lastApplied
- r.mu.Unlock()
- }
- // Check if log compaction is needed
- r.maybeCompactLog()
- }
- // maybeCompactLog checks if automatic log compaction should be triggered
- // It uses a dynamic threshold to prevent "compaction thrashing":
- // - First compaction triggers at SnapshotThreshold (default 100,000)
- // - After compaction, next threshold = current_log_size * 1.5
- // - This prevents every new entry from triggering compaction if log stays large
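- // Worked example (assuming the default SnapshotThreshold of 100,000): if
- // compaction leaves 80,000 live entries, the next threshold is 120,000; if it
- // leaves only 20,000, 1.5x would be 30,000, so the threshold is clamped back
- // up to the initial 100,000 (see the clamp below).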
- func (r *Raft) maybeCompactLog() {
- // Skip if no snapshot provider is configured
- if r.config.SnapshotProvider == nil {
- return
- }
- // Check if compaction is already in progress (atomic check-and-set)
- if !atomic.CompareAndSwapInt32(&r.compacting, 0, 1) {
- return
- }
- // Ensure we release the compacting flag when done
- defer atomic.StoreInt32(&r.compacting, 0)
- r.mu.RLock()
- lastApplied := r.lastApplied
- firstIndex := r.log.FirstIndex()
- initialThreshold := r.config.SnapshotThreshold
- minRetention := r.config.SnapshotMinRetention
- isLeader := r.state == Leader
- dynamicThreshold := r.nextCompactionThreshold
- r.mu.RUnlock()
- // Guard against underflow: ensure lastApplied > firstIndex
- if lastApplied <= firstIndex {
- return
- }
- // Calculate current log size
- logSize := lastApplied - firstIndex
- // Determine effective threshold:
- // - Use initial threshold if no compaction has occurred yet (dynamicThreshold == 0)
- // - Otherwise use the dynamic threshold
- effectiveThreshold := initialThreshold
- if dynamicThreshold > 0 {
- effectiveThreshold = dynamicThreshold
- }
- // Check if we have enough entries to warrant compaction
- if logSize <= effectiveThreshold {
- return
- }
- // Guard against underflow: ensure lastApplied > minRetention
- if lastApplied <= minRetention {
- return
- }
- // Calculate the safe compaction point
- // We need to retain at least minRetention entries for follower catch-up
- compactUpTo := lastApplied - minRetention
- if compactUpTo <= firstIndex {
- return // Not enough entries to compact while maintaining retention
- }
- // For leader, also consider the minimum nextIndex of all followers
- // to avoid compacting entries that followers still need
- if isLeader {
- r.mu.RLock()
- minNextIndex := lastApplied
- for _, nextIdx := range r.nextIndex {
- if nextIdx < minNextIndex {
- minNextIndex = nextIdx
- }
- }
- r.mu.RUnlock()
- // Don't compact entries that followers still need
- // Keep a buffer of minRetention entries before the slowest follower
- if minNextIndex > minRetention {
- followerSafePoint := minNextIndex - minRetention
- if followerSafePoint < compactUpTo {
- compactUpTo = followerSafePoint
- }
- } else {
- // Slowest follower is too far behind, don't compact
- // They will need a full snapshot anyway
- compactUpTo = firstIndex // Effectively skip compaction
- }
- }
- // Final check - make sure we're actually compacting something meaningful
- if compactUpTo <= firstIndex {
- return
- }
- // Get snapshot from application layer
- snapshotData, err := r.config.SnapshotProvider()
- if err != nil {
- r.logger.Error("Failed to get snapshot from provider: %v", err)
- return
- }
- // Get the term at the compaction point
- term, err := r.log.GetTerm(compactUpTo)
- if err != nil {
- r.logger.Error("Failed to get term for compaction index %d: %v", compactUpTo, err)
- return
- }
- // Save snapshot
- if err := r.storage.SaveSnapshot(snapshotData, compactUpTo, term); err != nil {
- r.logger.Error("Failed to save snapshot: %v", err)
- return
- }
- // Compact the log
- if err := r.log.Compact(compactUpTo); err != nil {
- r.logger.Error("Failed to compact log: %v", err)
- return
- }
- // Calculate new log size after compaction
- newLogSize := lastApplied - compactUpTo
- // Update dynamic threshold for next compaction: current size * 1.5
- // This prevents "compaction thrashing" where every entry triggers compaction
- r.mu.Lock()
- r.nextCompactionThreshold = newLogSize + newLogSize/2 // newLogSize * 1.5
- // Ensure threshold doesn't go below the initial threshold
- if r.nextCompactionThreshold < initialThreshold {
- r.nextCompactionThreshold = initialThreshold
- }
- // Capture the value while holding the lock so the log line below doesn't
- // read the shared field unsynchronized
- nextThreshold := r.nextCompactionThreshold
- r.mu.Unlock()
- r.logger.Info("Auto compaction completed: compacted up to index %d (term %d), log size %d -> %d, next threshold: %d",
- compactUpTo, term, logSize, newLogSize, nextThreshold)
- }
- // resetElectionTimer resets the election timeout
- func (r *Raft) resetElectionTimer() {
- timeout := r.config.ElectionTimeoutMin +
- time.Duration(rand.Int63n(int64(r.config.ElectionTimeoutMax-r.config.ElectionTimeoutMin)))
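- // e.g. with ElectionTimeoutMin=150ms and ElectionTimeoutMax=300ms, each reset
- // picks a timeout in [150ms, 300ms), so followers rarely time out simultaneously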
- if r.electionTimer == nil {
- r.electionTimer = time.NewTimer(timeout)
- } else {
- if !r.electionTimer.Stop() {
- select {
- case <-r.electionTimer.C:
- default:
- }
- }
- r.electionTimer.Reset(timeout)
- }
- }
- // persistState saves the current state to stable storage
- // Returns error if persistence fails - caller MUST handle this for safety
- func (r *Raft) persistState() error {
- state := &PersistentState{
- CurrentTerm: r.currentTerm,
- VotedFor: r.votedFor,
- }
- if err := r.storage.SaveState(state); err != nil {
- r.logger.Error("Failed to persist state: %v", err)
- return fmt.Errorf("%w: %v", ErrPersistFailed, err)
- }
- return nil
- }
- // mustPersistState saves state and panics on failure
- // Use this only in critical paths where failure is unrecoverable
- func (r *Raft) mustPersistState() {
- if err := r.persistState(); err != nil {
- // In production, you might want to trigger a graceful shutdown instead
- r.logger.Error("CRITICAL: Failed to persist state, node may be in inconsistent state: %v", err)
- panic(err)
- }
- }
- // HandleRequestVote handles RequestVote RPCs (including pre-vote)
- func (r *Raft) HandleRequestVote(args *RequestVoteArgs) *RequestVoteReply {
- r.mu.Lock()
- defer r.mu.Unlock()
- reply := &RequestVoteReply{
- Term: r.currentTerm,
- VoteGranted: false,
- }
- // Handle pre-vote separately
- if args.PreVote {
- return r.handlePreVote(args)
- }
- // Reply false if term < currentTerm
- if args.Term < r.currentTerm {
- return reply
- }
- // If term > currentTerm, become follower
- if args.Term > r.currentTerm {
- r.becomeFollower(args.Term)
- }
- reply.Term = r.currentTerm
- // Check if we can vote for this candidate
- if (r.votedFor == "" || r.votedFor == args.CandidateID) &&
- r.log.IsUpToDate(args.LastLogIndex, args.LastLogTerm) {
- r.votedFor = args.CandidateID
- if err := r.persistState(); err != nil {
- // Cannot grant vote if we can't persist the decision
- r.logger.Error("Failed to persist vote for %s: %v", args.CandidateID, err)
- return reply
- }
- r.resetElectionTimer()
- reply.VoteGranted = true
- r.logger.Debug("Granted vote to %s for term %d", args.CandidateID, args.Term)
- }
- return reply
- }
- // handlePreVote handles pre-vote requests
- // Pre-vote doesn't change our state, just checks if we would vote
- func (r *Raft) handlePreVote(args *RequestVoteArgs) *RequestVoteReply {
- reply := &RequestVoteReply{
- Term: r.currentTerm,
- VoteGranted: false,
- }
- // For pre-vote, we check:
- // 1. The candidate's term is at least as high as ours
- // 2. The candidate's log is at least as up-to-date as ours
- // 3. We don't have a current leader (or the candidate's term is higher)
- if args.Term < r.currentTerm {
- return reply
- }
- // Per Raft Pre-Vote optimization (§9.6): reject pre-vote if we have a current
- // leader and the candidate's term is not higher than ours. This prevents
- // disruptive elections when a partitioned node tries to rejoin.
- if r.leaderID != "" && args.Term <= r.currentTerm {
- r.logger.Debug("Rejecting pre-vote from %s: have leader %s", args.CandidateID, r.leaderID)
- return reply
- }
- // Grant pre-vote if log is up-to-date
- // Note: we don't check votedFor for pre-vote, and we don't update any state
- if r.log.IsUpToDate(args.LastLogIndex, args.LastLogTerm) {
- reply.VoteGranted = true
- r.logger.Debug("Granted pre-vote to %s for term %d", args.CandidateID, args.Term)
- }
- return reply
- }
- // HandleAppendEntries handles AppendEntries RPCs
- func (r *Raft) HandleAppendEntries(args *AppendEntriesArgs) *AppendEntriesReply {
- r.mu.Lock()
- defer r.mu.Unlock()
- atomic.AddInt64(&r.stats.AppendsReceived, 1)
- reply := &AppendEntriesReply{
- Term: r.currentTerm,
- Success: false,
- }
- // Check if we're a standalone node being added to a cluster
- // A standalone node has only itself in clusterNodes
- isStandalone := len(r.clusterNodes) == 1
- if _, hasSelf := r.clusterNodes[r.nodeID]; isStandalone && hasSelf {
- // We're standalone and receiving AppendEntries from an external leader
- // This means we're being added to a cluster - suppress elections
- if !r.joiningCluster {
- r.joiningCluster = true
- r.joiningClusterTime = time.Now()
- r.logger.Info("Detected cluster join in progress, suppressing elections")
- }
- // When joining, accept higher terms from the leader to sync up
- if args.Term > r.currentTerm {
- r.becomeFollower(args.Term)
- }
- }
- // Reply false if term < currentTerm
- if args.Term < r.currentTerm {
- // But still reset timer if we're joining a cluster to prevent elections
- if r.joiningCluster {
- r.resetElectionTimer()
- }
- return reply
- }
- // If term > currentTerm, or we're a candidate, or we're a leader receiving
- // AppendEntries from another leader (split-brain scenario during cluster merge),
- // become follower. In Raft, there can only be one leader per term.
- if args.Term > r.currentTerm || r.state == Candidate || r.state == Leader {
- r.becomeFollower(args.Term)
- }
- // Update leader info and reset election timer
- r.leaderID = args.LeaderID
- r.lastHeartbeat = time.Now()
- r.resetElectionTimer()
- reply.Term = r.currentTerm
- // Try to append entries
- success, conflictIndex, conflictTerm := r.log.AppendEntriesFromLeader(
- args.PrevLogIndex, args.PrevLogTerm, args.Entries)
- if !success {
- reply.ConflictIndex = conflictIndex
- reply.ConflictTerm = conflictTerm
- return reply
- }
- reply.Success = true
- // Update commit index safely
- if args.LeaderCommit > r.commitIndex {
- // Get our actual last log index
- lastLogIndex := r.log.LastIndex()
- // Calculate what index the entries would have reached
- lastNewEntry := args.PrevLogIndex
- if len(args.Entries) > 0 {
- lastNewEntry = args.Entries[len(args.Entries)-1].Index
- }
- // Commit index should not exceed what we actually have in log
- newCommitIndex := args.LeaderCommit
- if newCommitIndex > lastNewEntry {
- newCommitIndex = lastNewEntry
- }
- if newCommitIndex > lastLogIndex {
- newCommitIndex = lastLogIndex
- }
- // Only advance commit index
- if newCommitIndex > r.commitIndex {
- r.commitIndex = newCommitIndex
- }
- }
- return reply
- }
- // HandleInstallSnapshot handles InstallSnapshot RPCs with chunked transfer support
- func (r *Raft) HandleInstallSnapshot(args *InstallSnapshotArgs) *InstallSnapshotReply {
- r.mu.Lock()
- defer r.mu.Unlock()
- reply := &InstallSnapshotReply{
- Term: r.currentTerm,
- Success: false,
- }
- if args.Term < r.currentTerm {
- return reply
- }
- if args.Term > r.currentTerm {
- r.becomeFollower(args.Term)
- }
- r.leaderID = args.LeaderID
- r.lastHeartbeat = time.Now()
- r.resetElectionTimer()
- reply.Term = r.currentTerm
- // Skip if we already have this or a newer snapshot applied
- if args.LastIncludedIndex <= r.lastApplied {
- r.logger.Debug("Ignoring snapshot at index %d, already applied up to %d",
- args.LastIncludedIndex, r.lastApplied)
- reply.Success = true // Still success to let leader know we don't need it
- return reply
- }
- // Handle chunked transfer
- if args.Offset == 0 {
- // First chunk - start new pending snapshot
- r.pendingSnapshot = &pendingSnapshotState{
- lastIncludedIndex: args.LastIncludedIndex,
- lastIncludedTerm: args.LastIncludedTerm,
- data: make([]byte, 0),
- receivedBytes: 0,
- }
- r.logger.Info("Starting snapshot reception at index %d, term %d",
- args.LastIncludedIndex, args.LastIncludedTerm)
- }
- // Validate we're receiving the expected snapshot
- if r.pendingSnapshot == nil ||
- r.pendingSnapshot.lastIncludedIndex != args.LastIncludedIndex ||
- r.pendingSnapshot.lastIncludedTerm != args.LastIncludedTerm {
- // Guard the log message: pendingSnapshot may be nil if the first chunk was missed
- if r.pendingSnapshot != nil {
- r.logger.Warn("Unexpected snapshot chunk: expected index %d, got %d",
- r.pendingSnapshot.lastIncludedIndex, args.LastIncludedIndex)
- } else {
- r.logger.Warn("Unexpected snapshot chunk at offset %d: no pending snapshot", args.Offset)
- }
- return reply
- }
- // Validate offset matches what we've received
- if uint64(args.Offset) != r.pendingSnapshot.receivedBytes {
- r.logger.Warn("Unexpected chunk offset: expected %d, got %d",
- r.pendingSnapshot.receivedBytes, args.Offset)
- return reply
- }
- // Append chunk data
- r.pendingSnapshot.data = append(r.pendingSnapshot.data, args.Data...)
- r.pendingSnapshot.receivedBytes += uint64(len(args.Data))
- reply.Success = true
- r.logger.Debug("Received snapshot chunk: offset=%d, size=%d, done=%v",
- args.Offset, len(args.Data), args.Done)
- // If not done, wait for more chunks
- if !args.Done {
- return reply
- }
- // All chunks received - apply the snapshot
- r.logger.Info("Installing complete snapshot: %d bytes at index %d, term %d",
- len(r.pendingSnapshot.data), args.LastIncludedIndex, args.LastIncludedTerm)
- atomic.AddUint64(&r.metrics.SnapshotsInstalled, 1)
- // Save snapshot
- if err := r.storage.SaveSnapshot(r.pendingSnapshot.data, args.LastIncludedIndex, args.LastIncludedTerm); err != nil {
- r.logger.Error("Failed to save snapshot: %v", err)
- r.pendingSnapshot = nil
- reply.Success = false
- return reply
- }
- // Compact log
- if err := r.log.Compact(args.LastIncludedIndex); err != nil {
- r.logger.Error("Failed to compact log: %v", err)
- }
- // Update state - must update both commitIndex and lastApplied
- if args.LastIncludedIndex > r.commitIndex {
- r.commitIndex = args.LastIncludedIndex
- }
- // Always update lastApplied to snapshot index to prevent trying to apply compacted entries
- r.lastApplied = args.LastIncludedIndex
- // Send snapshot to application (non-blocking with timeout)
- // Use the complete pendingSnapshot data, not the last chunk
- msg := ApplyMsg{
- SnapshotValid: true,
- Snapshot: r.pendingSnapshot.data,
- SnapshotIndex: args.LastIncludedIndex,
- SnapshotTerm: args.LastIncludedTerm,
- }
- // Clear pending snapshot
- r.pendingSnapshot = nil
- // Try to send, but don't block indefinitely
- select {
- case r.applyCh <- msg:
- r.logger.Debug("Sent snapshot to application")
- case <-time.After(100 * time.Millisecond):
- r.logger.Warn("Timeout sending snapshot to application, will retry")
- // The application will still get correct state via normal apply loop
- }
- return reply
- }
- // Propose proposes a new command to be replicated
- func (r *Raft) Propose(command []byte) (uint64, uint64, bool) {
- r.mu.Lock()
- defer r.mu.Unlock()
- if r.state != Leader {
- return 0, 0, false
- }
- // Check connectivity (Lease Check)
- // If we haven't heard from a majority of peers within ElectionTimeout,
- // we shouldn't accept new commands because we might be partitioned.
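- // e.g. in a 5-node cluster the leader must have heard from at least 2 peers
- // within ElectionTimeoutMax (itself plus 2 = 3 of 5) to keep accepting proposals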
- if len(r.clusterNodes) > 1 {
- activePeers := 1 // Self
- now := time.Now()
- timeout := r.config.ElectionTimeoutMax
- for _, peer := range r.peers {
- if last, ok := r.lastContact[peer]; ok && now.Sub(last) < timeout {
- activePeers++
- }
- }
- // Check majority
- needed := len(r.clusterNodes)/2 + 1
- if activePeers < needed {
- r.logger.Warn("Rejecting Propose: lost contact with majority (active: %d, needed: %d)", activePeers, needed)
- return 0, 0, false
- }
- }
- index, err := r.log.AppendCommand(r.currentTerm, command)
- if err != nil {
- r.logger.Error("Failed to append command: %v", err)
- return 0, 0, false
- }
- r.matchIndex[r.nodeID] = index
- // For single-node cluster, we are the only voter and can commit immediately
- // This fixes the issue where commitCh never gets triggered without other peers
- if len(r.clusterNodes) <= 1 && len(r.peers) == 0 {
- // Single node: self is majority, trigger commit immediately
- select {
- case r.commitCh <- struct{}{}:
- default:
- }
- } else {
- // Multi-node: trigger replication to other nodes
- r.triggerReplication()
- }
- return index, r.currentTerm, true
- }
- // triggerReplication signals the replication loop to send heartbeats
- // This uses a non-blocking send to batch replication requests
- func (r *Raft) triggerReplication() {
- select {
- case r.replicationCh <- struct{}{}:
- default:
- // Replication already scheduled
- }
- }
- // replicationLoop handles batched replication
- // Uses simple delay-based batching: flush immediately when signaled, then wait
- // to allow more requests to accumulate before the next flush.
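- // e.g. proposals that arrive while the loop sleeps coalesce into a single
- // pending signal and are flushed and replicated together on the next iteration,
- // rather than triggering one AppendEntries round per proposal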
- func (r *Raft) replicationLoop() {
- for {
- select {
- case <-r.stopCh:
- return
- case <-r.replicationCh:
- // Flush and replicate immediately
- r.flushAndReplicate()
- // Wait briefly to allow batching of subsequent requests
- // This gives time for more proposals to queue up before the next flush
- time.Sleep(10 * time.Millisecond)
- }
- }
- }
- // flushAndReplicate flushes logs and sends heartbeats
- func (r *Raft) flushAndReplicate() {
- // Ensure logs are flushed to OS cache before sending to followers
- // This implements Group Commit with Flush (fast) instead of Sync (slow)
- if err := r.log.Flush(); err != nil {
- r.logger.Error("Failed to flush log: %v", err)
- }
- r.sendHeartbeats()
- }
- // ProposeWithForward proposes a command, forwarding to leader if necessary
- // This is the recommended method for applications to use
- func (r *Raft) ProposeWithForward(command []byte) (index uint64, term uint64, err error) {
- // Try local propose first
- idx, t, isLeader := r.Propose(command)
- if isLeader {
- return idx, t, nil
- }
- // Not leader, forward to leader
- r.mu.RLock()
- leaderID := r.leaderID
- // Use clusterNodes (dynamically maintained) to find leader address
- leaderAddr := r.clusterNodes[leaderID]
- r.mu.RUnlock()
- if leaderID == "" {
- return 0, 0, fmt.Errorf("no leader available")
- }
- if leaderAddr == "" {
- return 0, 0, fmt.Errorf("leader %s address not found in cluster", leaderID)
- }
- // Forward to leader
- ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
- defer cancel()
- args := &ProposeArgs{Command: command}
- reply, err := r.transport.ForwardPropose(ctx, leaderAddr, args)
- if err != nil {
- return 0, 0, fmt.Errorf("forward failed: %w", err)
- }
- if !reply.Success {
- return 0, 0, fmt.Errorf("leader rejected: %s", reply.Error)
- }
- return reply.Index, reply.Term, nil
- }
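- // A hedged usage sketch (the helper name and command encoding are hypothetical):
- // applications can call ProposeWithForward from any node and treat the returned
- // (index, term) pair as the slot the command will occupy once committed.
- func exampleProposeSet(r *Raft, key, value string) error {
- cmd := []byte(fmt.Sprintf("SET %s %s", key, value)) // application-defined encoding
- index, term, err := r.ProposeWithForward(cmd)
- if err != nil {
- return err // no leader known, forwarding failed, or the leader rejected the command
- }
- _ = index // the apply notification arrives later via the FSM apply path
- _ = term
- return nil
- }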
- // ForwardGet forwards a get request to the leader
- func (r *Raft) ForwardGet(key string) (string, bool, error) {
- // Read leader state under the lock to avoid a data race on r.state
- r.mu.RLock()
- isLeader := r.state == Leader
- leaderID := r.leaderID
- leaderAddr := r.clusterNodes[leaderID]
- r.mu.RUnlock()
- // If we are the leader, serve the read locally
- if isLeader {
- if r.config.GetHandler != nil {
- val, found := r.config.GetHandler(key)
- return val, found, nil
- }
- return "", false, fmt.Errorf("get handler not configured")
- }
- if leaderID == "" {
- return "", false, ErrNoLeader
- }
- if leaderAddr == "" {
- return "", false, fmt.Errorf("leader %s address not found", leaderID)
- }
- // Forward to leader
- ctx, cancel := context.WithTimeout(context.Background(), r.config.RPCTimeout)
- defer cancel()
- args := &GetArgs{Key: key}
- reply, err := r.transport.ForwardGet(ctx, leaderAddr, args)
- if err != nil {
- return "", false, fmt.Errorf("forward failed: %w", err)
- }
- return reply.Value, reply.Found, nil
- }
- // HandlePropose handles forwarded propose requests
- func (r *Raft) HandlePropose(args *ProposeArgs) *ProposeReply {
- index, term, isLeader := r.Propose(args.Command)
- if !isLeader {
- return &ProposeReply{
- Success: false,
- Error: "not leader",
- }
- }
- return &ProposeReply{
- Success: true,
- Index: index,
- Term: term,
- }
- }
- // HandleAddNode handles forwarded AddNode requests
- func (r *Raft) HandleAddNode(args *AddNodeArgs) *AddNodeReply {
- err := r.AddNode(args.NodeID, args.Address)
- if err != nil {
- return &AddNodeReply{
- Success: false,
- Error: err.Error(),
- }
- }
- return &AddNodeReply{
- Success: true,
- }
- }
- // HandleRemoveNode handles forwarded RemoveNode requests
- func (r *Raft) HandleRemoveNode(args *RemoveNodeArgs) *RemoveNodeReply {
- err := r.RemoveNode(args.NodeID)
- if err != nil {
- return &RemoveNodeReply{
- Success: false,
- Error: err.Error(),
- }
- }
- return &RemoveNodeReply{
- Success: true,
- }
- }
- // GetState returns the current term and whether this node is leader
- func (r *Raft) GetState() (uint64, bool) {
- r.mu.RLock()
- defer r.mu.RUnlock()
- return r.currentTerm, r.state == Leader
- }
- // GetLeaderID returns the current leader ID
- func (r *Raft) GetLeaderID() string {
- r.mu.RLock()
- defer r.mu.RUnlock()
- return r.leaderID
- }
- // GetStats returns runtime statistics
- func (r *Raft) GetStats() Stats {
- r.mu.RLock()
- defer r.mu.RUnlock()
- lastIndex, lastTerm := r.log.LastIndexAndTerm()
- // Copy cluster nodes
- nodes := make(map[string]string)
- for k, v := range r.clusterNodes {
- nodes[k] = v
- }
- clusterSize := len(r.clusterNodes)
- if clusterSize == 0 {
- clusterSize = len(r.peers) + 1
- }
- return Stats{
- Term: r.currentTerm,
- State: r.state.String(),
- LastLogIndex: lastIndex,
- LastLogTerm: lastTerm,
- CommitIndex: r.commitIndex,
- LastApplied: r.lastApplied,
- LeaderID: r.leaderID,
- AppendsSent: atomic.LoadInt64(&r.stats.AppendsSent),
- AppendsReceived: atomic.LoadInt64(&r.stats.AppendsReceived),
- ClusterSize: clusterSize,
- ClusterNodes: nodes,
- }
- }
- // restoreFromSnapshot restores the FSM from a snapshot at startup
- // This is called during Start() to ensure the FSM has the correct state
- // before processing any new commands
- func (r *Raft) restoreFromSnapshot() error {
- // Get snapshot from storage
- data, lastIndex, lastTerm, err := r.storage.GetSnapshot()
- if err != nil {
- return fmt.Errorf("failed to get snapshot: %w", err)
- }
- // No snapshot exists
- if len(data) == 0 || lastIndex == 0 {
- return nil
- }
- r.logger.Info("Restoring FSM from snapshot at index %d, term %d (%d bytes)",
- lastIndex, lastTerm, len(data))
- // Update lastApplied to snapshot index to prevent re-applying compacted entries
- r.mu.Lock()
- if lastIndex > r.lastApplied {
- r.lastApplied = lastIndex
- }
- if lastIndex > r.commitIndex {
- r.commitIndex = lastIndex
- }
- r.mu.Unlock()
- // Send snapshot to FSM for restoration.
- // Use a select with a timeout so we don't block indefinitely if applyCh is full.
- msg := ApplyMsg{
- SnapshotValid: true,
- Snapshot: data,
- SnapshotIndex: lastIndex,
- SnapshotTerm: lastTerm,
- }
- // Try to send with a timeout
- select {
- case r.applyCh <- msg:
- r.logger.Info("FSM restoration triggered from snapshot at index %d", lastIndex)
- case <-time.After(5 * time.Second):
- return fmt.Errorf("timeout sending snapshot to applyCh")
- }
- return nil
- }
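- // A sketch of the FSM side of this contract (the helper and the restore callback are
- // hypothetical): whatever consumes applyCh must handle snapshot restoration before
- // ordinary committed entries.
- func exampleRestoreConsumer(applyCh <-chan ApplyMsg, restore func(snapshot []byte, index uint64)) {
- for msg := range applyCh {
- if msg.SnapshotValid {
- // Rebuild the FSM from the snapshot and remember the index so entries at or
- // below it are never re-applied.
- restore(msg.Snapshot, msg.SnapshotIndex)
- continue
- }
- // ... handle ordinary committed entries here (fields omitted in this sketch)
- }
- }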
- // TakeSnapshot takes a snapshot of the current state
- func (r *Raft) TakeSnapshot(data []byte, index uint64) error {
- r.mu.Lock()
- defer r.mu.Unlock()
- if index > r.lastApplied {
- return fmt.Errorf("snapshot index %d exceeds lastApplied %d", index, r.lastApplied)
- }
- term, err := r.log.GetTerm(index)
- if err != nil {
- return fmt.Errorf("failed to get term for index %d: %w", index, err)
- }
- if err := r.storage.SaveSnapshot(data, index, term); err != nil {
- return fmt.Errorf("failed to save snapshot: %w", err)
- }
- if err := r.log.Compact(index); err != nil {
- return fmt.Errorf("failed to compact log: %w", err)
- }
- r.logger.Info("Took snapshot at index %d, term %d", index, term)
- return nil
- }
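- // A hedged sketch of application-driven snapshotting (the serialize callback and the
- // interval are hypothetical; the package does not trigger snapshots on its own here).
- // The application must ensure the serialized state corresponds to the index it passes in.
- func exampleSnapshotLoop(r *Raft, serialize func() []byte, every time.Duration) {
- ticker := time.NewTicker(every)
- defer ticker.Stop()
- for range ticker.C {
- stats := r.GetStats()
- if stats.LastApplied == 0 {
- continue // nothing applied yet
- }
- if err := r.TakeSnapshot(serialize(), stats.LastApplied); err != nil {
- continue // non-fatal; retry on the next tick
- }
- }
- }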
- func max(a, b uint64) uint64 {
- if a > b {
- return a
- }
- return b
- }
- // ==================== Membership Change API ====================
- //
- // This implementation uses Single-Node Membership Change (also known as one-at-a-time changes)
- // as described in the Raft dissertation (§4.3). This is safe because:
- //
- // 1. We only allow one configuration change at a time (pendingConfigChange flag)
- // 2. For commits, we use the OLD cluster majority until the config change is committed
- // 3. The new node starts receiving entries immediately but doesn't affect majority calculation
- //
- // This approach is simpler than Joint Consensus and is sufficient for most use cases.
- // The invariant maintained is: any two majorities (old or new) must overlap.
- //
- // For adding a node: any majority of the old N-node cluster and any majority of the
- // new (N+1)-node cluster share at least one voter, so they cannot decide conflicting values.
- // For removing a node: the same overlap holds between the N-node and (N-1)-node
- // majorities as long as N > 1 (see the small sketch below this comment block).
- //
- // WARNING: Avoid adding/removing multiple nodes rapidly. Wait for each change to be committed.
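- // A small worked instance of the overlap argument above (hypothetical helper, not used
- // by the implementation): with N=4 voters the old majority is 3 and the new majority
- // after adding a node is also 3; since 3+3 > 5, the two majorities must share a voter.
- // In general exampleQuorum(n) + exampleQuorum(n+1) = n+2 > n+1 for every n >= 1.
- func exampleQuorum(n int) int { return n/2 + 1 }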
- // AddNode adds a new node to the cluster
- // This can only be called on the leader
- // The new node must already be running and reachable
- //
- // Safety guarantees:
- // - Only one config change can be in progress at a time
- // - The old cluster majority is used until the config change is committed
- // - Returns error if leadership is lost during the operation
- func (r *Raft) AddNode(nodeID, address string) error {
- r.mu.Lock()
- // Must be leader
- if r.state != Leader {
- leaderID := r.leaderID
- r.mu.Unlock()
- return NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff)
- }
- // Check if we're in the middle of a leadership transfer
- if r.transferring {
- r.mu.Unlock()
- return fmt.Errorf("leadership transfer in progress")
- }
- // Check if there's already a pending config change
- if r.pendingConfigChange {
- r.mu.Unlock()
- return ErrConfigInFlight
- }
- // Validate nodeID and address
- if nodeID == "" {
- r.mu.Unlock()
- return fmt.Errorf("nodeID cannot be empty")
- }
- if address == "" {
- r.mu.Unlock()
- return fmt.Errorf("address cannot be empty")
- }
- // Check if node already exists
- if _, exists := r.clusterNodes[nodeID]; exists {
- r.mu.Unlock()
- return fmt.Errorf("node %s already exists in cluster", nodeID)
- }
- // Check if address is already used by another node
- for existingID, existingAddr := range r.clusterNodes {
- if existingAddr == address {
- r.mu.Unlock()
- return fmt.Errorf("address %s is already used by node %s", address, existingID)
- }
- }
- // Save old cluster nodes for majority calculation during config change
- // This ensures we use the OLD cluster size until the config change is committed
- r.oldClusterNodes = make(map[string]string)
- for k, v := range r.clusterNodes {
- r.oldClusterNodes[k] = v
- }
- // Create new config with the added node
- newNodes := make(map[string]string)
- for k, v := range r.clusterNodes {
- newNodes[k] = v
- }
- newNodes[nodeID] = address
- // Create config change entry with ClusterConfig
- configIndex := r.log.LastIndex() + 1
- entry := LogEntry{
- Index: configIndex,
- Term: r.currentTerm,
- Type: EntryConfig,
- Config: &ClusterConfig{Nodes: newNodes},
- }
- // Mark config change as pending and store the index
- r.pendingConfigChange = true
- r.configChangeIndex = configIndex
- // Immediately apply the new configuration (for single-node changes, this is safe)
- // The new node will start receiving AppendEntries immediately
- r.clusterNodes[nodeID] = address
- r.peers = append(r.peers, address)
- // Set nextIndex to 1 (or firstIndex) so the new node syncs from the beginning
- // This is crucial - the new node's log is empty, so we must start from index 1
- firstIndex := r.log.FirstIndex()
- if firstIndex == 0 {
- firstIndex = 1
- }
- r.nextIndex[address] = firstIndex
- r.matchIndex[address] = 0
- r.mu.Unlock()
- // Append the config change entry to log
- if err := r.log.Append(entry); err != nil {
- r.mu.Lock()
- r.pendingConfigChange = false
- r.oldClusterNodes = nil
- r.configChangeIndex = 0
- // Rollback
- delete(r.clusterNodes, nodeID)
- r.rebuildPeersList()
- r.mu.Unlock()
- return fmt.Errorf("failed to append config entry: %w", err)
- }
- r.logger.Info("Adding node %s (%s) to cluster", nodeID, address)
- // Trigger immediate replication
- r.triggerReplication()
- return nil
- }
- // RemoveNode removes a node from the cluster
- // This can only be called on the leader
- // The node being removed can be any node except the leader itself
- //
- // Safety guarantees:
- // - Only one config change can be in progress at a time
- // - Cannot remove the leader (transfer leadership first)
- // - Cannot reduce cluster to 0 nodes
- // - The old cluster majority is used until the config change is committed
- func (r *Raft) RemoveNode(nodeID string) error {
- r.mu.Lock()
- // Must be leader
- if r.state != Leader {
- leaderID := r.leaderID
- r.mu.Unlock()
- return NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff)
- }
- // Check if we're in the middle of a leadership transfer
- if r.transferring {
- r.mu.Unlock()
- return fmt.Errorf("leadership transfer in progress")
- }
- // Cannot remove self
- if nodeID == r.nodeID {
- r.mu.Unlock()
- return fmt.Errorf("cannot remove self from cluster, use TransferLeadership first")
- }
- // Validate nodeID
- if nodeID == "" {
- r.mu.Unlock()
- return fmt.Errorf("nodeID cannot be empty")
- }
- // Check if there's already a pending config change
- if r.pendingConfigChange {
- r.mu.Unlock()
- return ErrConfigInFlight
- }
- // Check if node exists
- if _, exists := r.clusterNodes[nodeID]; !exists {
- r.mu.Unlock()
- return fmt.Errorf("node %s not found in cluster", nodeID)
- }
- // Cannot reduce cluster below 1 node
- if len(r.clusterNodes) <= 1 {
- r.mu.Unlock()
- return fmt.Errorf("cannot remove last node from cluster")
- }
- // Save old cluster nodes for majority calculation during config change
- r.oldClusterNodes = make(map[string]string)
- for k, v := range r.clusterNodes {
- r.oldClusterNodes[k] = v
- }
- // Create new config without the removed node
- newNodes := make(map[string]string)
- for k, v := range r.clusterNodes {
- if k != nodeID {
- newNodes[k] = v
- }
- }
- // Create config change entry with ClusterConfig
- configIndex := r.log.LastIndex() + 1
- entry := LogEntry{
- Index: configIndex,
- Term: r.currentTerm,
- Type: EntryConfig,
- Config: &ClusterConfig{Nodes: newNodes},
- }
- // Mark config change as pending and store the index
- r.pendingConfigChange = true
- r.configChangeIndex = configIndex
- // Get the address of node being removed for cleanup
- removedAddr := r.clusterNodes[nodeID]
- // Immediately apply the new configuration
- delete(r.clusterNodes, nodeID)
- r.rebuildPeersList()
- delete(r.nextIndex, removedAddr)
- delete(r.matchIndex, removedAddr)
- r.mu.Unlock()
- // Append the config change entry to log
- if err := r.log.Append(entry); err != nil {
- r.mu.Lock()
- r.pendingConfigChange = false
- r.oldClusterNodes = nil
- r.configChangeIndex = 0
- // Roll back the in-memory membership change (the config entry was never appended)
- r.clusterNodes[nodeID] = removedAddr
- r.rebuildPeersList()
- // Re-initialize replication state for the restored peer
- r.nextIndex[removedAddr] = r.log.LastIndex() + 1
- r.matchIndex[removedAddr] = 0
- r.mu.Unlock()
- return fmt.Errorf("failed to append config entry: %w", err)
- }
- r.logger.Info("Removing node %s from cluster, config at index %d", nodeID, entry.Index)
- // Trigger replication through the batched path (flush, then heartbeats), matching AddNode
- r.triggerReplication()
- return nil
- }
- // AddNodeWithForward adds a node, forwarding to leader if necessary
- // This is the recommended method for applications to use
- func (r *Raft) AddNodeWithForward(nodeID, address string) error {
- // Try local operation first
- err := r.AddNode(nodeID, address)
- if err == nil {
- return nil
- }
- // Check if we're not the leader
- r.mu.RLock()
- state := r.state
- leaderID := r.leaderID
- leaderAddr := r.clusterNodes[leaderID]
- r.mu.RUnlock()
- if state == Leader {
- // We are leader but AddNode failed for other reasons
- return err
- }
- // Not leader, forward to leader
- if leaderID == "" {
- return fmt.Errorf("no leader available")
- }
- if leaderAddr == "" {
- return fmt.Errorf("leader %s address not found in cluster", leaderID)
- }
- // Forward to leader
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- args := &AddNodeArgs{NodeID: nodeID, Address: address}
- reply, err := r.transport.ForwardAddNode(ctx, leaderAddr, args)
- if err != nil {
- return fmt.Errorf("forward failed: %w", err)
- }
- if !reply.Success {
- return fmt.Errorf("leader rejected: %s", reply.Error)
- }
- return nil
- }
- // RemoveNodeWithForward removes a node, forwarding to leader if necessary
- // This is the recommended method for applications to use
- func (r *Raft) RemoveNodeWithForward(nodeID string) error {
- // Try local operation first
- err := r.RemoveNode(nodeID)
- if err == nil {
- return nil
- }
- // Check if we're not the leader
- r.mu.RLock()
- state := r.state
- leaderID := r.leaderID
- leaderAddr := r.clusterNodes[leaderID]
- r.mu.RUnlock()
- if state == Leader {
- // We are leader but RemoveNode failed for other reasons
- return err
- }
- // Not leader, forward to leader
- if leaderID == "" {
- return fmt.Errorf("no leader available")
- }
- if leaderAddr == "" {
- return fmt.Errorf("leader %s address not found in cluster", leaderID)
- }
- // Forward to leader
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- args := &RemoveNodeArgs{NodeID: nodeID}
- reply, err := r.transport.ForwardRemoveNode(ctx, leaderAddr, args)
- if err != nil {
- return fmt.Errorf("forward failed: %w", err)
- }
- if !reply.Success {
- return fmt.Errorf("leader rejected: %s", reply.Error)
- }
- return nil
- }
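- // A hedged sketch of growing a cluster one voter at a time, as the membership-change
- // notes above recommend (hypothetical helper; the retry policy is the application's
- // choice). While the previous change is still uncommitted, AddNode fails with
- // ErrConfigInFlight, so we back off and retry.
- func exampleAddNodes(r *Raft, nodes map[string]string) error {
- for id, addr := range nodes {
- var err error
- for attempt := 0; attempt < 50; attempt++ {
- if err = r.AddNodeWithForward(id, addr); err == nil {
- break
- }
- time.Sleep(200 * time.Millisecond) // previous change may still be committing
- }
- if err != nil {
- return fmt.Errorf("adding %s: %w", id, err)
- }
- }
- return nil
- }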
- // rebuildPeersList rebuilds the peers slice from clusterNodes
- func (r *Raft) rebuildPeersList() {
- r.peers = make([]string, 0, len(r.clusterNodes)-1)
- for nodeID, addr := range r.clusterNodes {
- if nodeID != r.nodeID {
- r.peers = append(r.peers, addr)
- }
- }
- }
- // GetClusterNodes returns a copy of the current cluster membership
- func (r *Raft) GetClusterNodes() map[string]string {
- r.mu.RLock()
- defer r.mu.RUnlock()
- nodes := make(map[string]string)
- for k, v := range r.clusterNodes {
- nodes[k] = v
- }
- return nodes
- }
- // applyConfigChange applies a configuration change entry
- func (r *Raft) applyConfigChange(entry *LogEntry) {
- if entry.Config == nil || entry.Config.Nodes == nil {
- r.logger.Warn("Invalid config change entry at index %d", entry.Index)
- return
- }
- r.mu.Lock()
- defer r.mu.Unlock()
- // Update cluster configuration
- r.clusterNodes = make(map[string]string)
- for k, v := range entry.Config.Nodes {
- r.clusterNodes[k] = v
- }
- r.rebuildPeersList()
- // Persist the new configuration
- if err := r.storage.SaveClusterConfig(&ClusterConfig{Nodes: r.clusterNodes}); err != nil {
- r.logger.Error("Failed to persist cluster config: %v", err)
- }
- // Clear pending flag and old cluster state
- r.pendingConfigChange = false
- r.oldClusterNodes = nil
- r.configChangeIndex = 0
- // If we were joining a cluster and now have multiple nodes, we've successfully joined
- if r.joiningCluster && len(r.clusterNodes) > 1 {
- r.joiningCluster = false
- r.logger.Info("Successfully joined cluster with %d nodes", len(r.clusterNodes))
- }
- r.logger.Info("Applied config change at index %d, cluster now has %d nodes", entry.Index, len(r.clusterNodes))
- // If we're the leader, update leader state
- if r.state == Leader {
- // Initialize nextIndex/matchIndex for any new nodes
- lastIndex := r.log.LastIndex()
- for nodeID, addr := range r.clusterNodes {
- if nodeID != r.nodeID {
- if _, exists := r.nextIndex[addr]; !exists {
- r.nextIndex[addr] = lastIndex + 1
- r.matchIndex[addr] = 0
- }
- }
- }
- // Clean up removed nodes
- validAddrs := make(map[string]bool)
- for nodeID, addr := range r.clusterNodes {
- if nodeID != r.nodeID {
- validAddrs[addr] = true
- }
- }
- for addr := range r.nextIndex {
- if !validAddrs[addr] {
- delete(r.nextIndex, addr)
- delete(r.matchIndex, addr)
- }
- }
- }
- }
- // ==================== ReadIndex (Linearizable Reads) ====================
- // readIndexLoop handles read index requests
- func (r *Raft) readIndexLoop() {
- for {
- select {
- case <-r.stopCh:
- return
- case req := <-r.readIndexCh:
- r.processReadIndexRequest(req)
- }
- }
- }
- // processReadIndexRequest processes a single read index request
- func (r *Raft) processReadIndexRequest(req *readIndexRequest) {
- r.mu.RLock()
- if r.state != Leader {
- r.mu.RUnlock()
- req.done <- ErrNotLeader
- return
- }
- r.mu.RUnlock()
- // Confirm we still hold leadership before serving the read
- if !r.confirmLeadership() {
- req.done <- ErrLeadershipLost
- return
- }
- // Wait for apply to catch up to readIndex
- if err := r.waitApply(req.readIndex, r.config.ProposeTimeout); err != nil {
- req.done <- err
- return
- }
- req.done <- nil
- }
- // ReadIndex implements linearizable reads
- // It ensures that the read sees all writes that were committed before the read started
- func (r *Raft) ReadIndex() (uint64, error) {
- r.mu.RLock()
- if r.state != Leader {
- leaderID := r.leaderID
- r.mu.RUnlock()
- return 0, NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff)
- }
- readIndex := r.commitIndex
- r.mu.RUnlock()
- atomic.AddUint64(&r.metrics.ReadIndexRequests, 1)
- // Create request and send to processing loop
- req := &readIndexRequest{
- readIndex: readIndex,
- done: make(chan error, 1),
- }
- select {
- case r.readIndexCh <- req:
- case <-r.stopCh:
- return 0, ErrShutdown
- case <-time.After(r.config.ProposeTimeout):
- return 0, ErrTimeout
- }
- // Wait for result
- select {
- case err := <-req.done:
- if err != nil {
- return 0, err
- }
- atomic.AddUint64(&r.metrics.ReadIndexSuccess, 1)
- return readIndex, nil
- case <-r.stopCh:
- return 0, ErrShutdown
- case <-time.After(r.config.ProposeTimeout):
- return 0, ErrTimeout
- }
- }
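- // A hedged sketch of a linearizable read on the leader (the readLocal callback is
- // hypothetical; in this package the application's state is normally reached through
- // the configured GetHandler). ReadIndex blocks until leadership is confirmed and the
- // FSM has applied everything committed before the read started.
- func exampleLinearizableGet(r *Raft, key string, readLocal func(string) (string, bool)) (string, bool, error) {
- if _, err := r.ReadIndex(); err != nil {
- return "", false, err
- }
- val, ok := readLocal(key)
- return val, ok, nil
- }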
- // confirmLeadership checks that this node still appears to hold leadership before a
- // read is served: it broadcasts heartbeats, waits one heartbeat interval, and verifies
- // that the term has not changed. It does not explicitly count acknowledgements.
- func (r *Raft) confirmLeadership() bool {
- r.mu.RLock()
- if r.state != Leader {
- r.mu.RUnlock()
- return false
- }
- currentTerm := r.currentTerm
- clusterSize := len(r.clusterNodes)
- if clusterSize == 0 {
- clusterSize = len(r.peers) + 1
- }
- r.mu.RUnlock()
- // Single node cluster - we're always the leader
- if clusterSize == 1 {
- return true
- }
- // Broadcast heartbeats to assert leadership
- r.sendHeartbeats()
- // Wait briefly and check if we're still leader
- time.Sleep(r.config.HeartbeatInterval)
- r.mu.RLock()
- stillLeader := r.state == Leader && r.currentTerm == currentTerm
- r.mu.RUnlock()
- return stillLeader
- }
- // waitApply waits until lastApplied >= index
- func (r *Raft) waitApply(index uint64, timeout time.Duration) error {
- deadline := time.Now().Add(timeout)
- for {
- r.mu.RLock()
- lastApplied := r.lastApplied
- r.mu.RUnlock()
- if lastApplied >= index {
- return nil
- }
- if time.Now().After(deadline) {
- return ErrTimeout
- }
- time.Sleep(1 * time.Millisecond)
- }
- }
- // ==================== Health Check ====================
- // HealthCheck returns the current health status of the node
- func (r *Raft) HealthCheck() HealthStatus {
- r.mu.RLock()
- defer r.mu.RUnlock()
- clusterNodes := make(map[string]string)
- for k, v := range r.clusterNodes {
- clusterNodes[k] = v
- }
- clusterSize := len(r.clusterNodes)
- if clusterSize == 0 {
- clusterSize = len(r.peers) + 1
- }
- logBehind := uint64(0)
- if r.commitIndex > r.lastApplied {
- logBehind = r.commitIndex - r.lastApplied
- }
- // Consider healthy if we're leader or have a known leader
- isHealthy := r.state == Leader || r.leaderID != ""
- return HealthStatus{
- NodeID: r.nodeID,
- State: r.state.String(),
- Term: r.currentTerm,
- LeaderID: r.leaderID,
- ClusterSize: clusterSize,
- ClusterNodes: clusterNodes,
- CommitIndex: r.commitIndex,
- LastApplied: r.lastApplied,
- LogBehind: logBehind,
- LastHeartbeat: r.lastHeartbeat,
- IsHealthy: isHealthy,
- Uptime: time.Since(r.startTime),
- }
- }
- // GetMetrics returns the current metrics
- func (r *Raft) GetMetrics() Metrics {
- return Metrics{
- Term: atomic.LoadUint64(&r.metrics.Term),
- ProposalsTotal: atomic.LoadUint64(&r.metrics.ProposalsTotal),
- ProposalsSuccess: atomic.LoadUint64(&r.metrics.ProposalsSuccess),
- ProposalsFailed: atomic.LoadUint64(&r.metrics.ProposalsFailed),
- ProposalsForwarded: atomic.LoadUint64(&r.metrics.ProposalsForwarded),
- AppendsSent: atomic.LoadUint64(&r.metrics.AppendsSent),
- AppendsReceived: atomic.LoadUint64(&r.metrics.AppendsReceived),
- AppendsSuccess: atomic.LoadUint64(&r.metrics.AppendsSuccess),
- AppendsFailed: atomic.LoadUint64(&r.metrics.AppendsFailed),
- ElectionsStarted: atomic.LoadUint64(&r.metrics.ElectionsStarted),
- ElectionsWon: atomic.LoadUint64(&r.metrics.ElectionsWon),
- PreVotesStarted: atomic.LoadUint64(&r.metrics.PreVotesStarted),
- PreVotesGranted: atomic.LoadUint64(&r.metrics.PreVotesGranted),
- SnapshotsTaken: atomic.LoadUint64(&r.metrics.SnapshotsTaken),
- SnapshotsInstalled: atomic.LoadUint64(&r.metrics.SnapshotsInstalled),
- SnapshotsSent: atomic.LoadUint64(&r.metrics.SnapshotsSent),
- ReadIndexRequests: atomic.LoadUint64(&r.metrics.ReadIndexRequests),
- ReadIndexSuccess: atomic.LoadUint64(&r.metrics.ReadIndexSuccess),
- LeadershipTransfers: atomic.LoadUint64(&r.metrics.LeadershipTransfers),
- LeadershipTransferSuccess: atomic.LoadUint64(&r.metrics.LeadershipTransferSuccess),
- }
- }
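- // A sketch of periodic metrics logging (hypothetical helper; a real deployment would
- // more likely export these counters to a metrics system than log them).
- func exampleLogMetrics(r *Raft, logf func(format string, args ...interface{}), every time.Duration) {
- t := time.NewTicker(every)
- defer t.Stop()
- for range t.C {
- m := r.GetMetrics()
- logf("raft: term=%d proposals=%d/%d appends=%d/%d elections won/started=%d/%d",
- m.Term, m.ProposalsSuccess, m.ProposalsTotal,
- m.AppendsSuccess, m.AppendsSent,
- m.ElectionsWon, m.ElectionsStarted)
- }
- }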
- // ==================== Leadership Transfer ====================
- // TransferLeadership transfers leadership to the specified node
- func (r *Raft) TransferLeadership(targetID string) error {
- r.mu.Lock()
- if r.state != Leader {
- r.mu.Unlock()
- return ErrNotLeader
- }
- if targetID == r.nodeID {
- r.mu.Unlock()
- return fmt.Errorf("cannot transfer to self")
- }
- targetAddr, exists := r.clusterNodes[targetID]
- if !exists {
- r.mu.Unlock()
- return fmt.Errorf("target node %s not in cluster", targetID)
- }
- if r.transferring {
- r.mu.Unlock()
- return fmt.Errorf("leadership transfer already in progress")
- }
- r.transferring = true
- r.transferTarget = targetID
- r.transferDeadline = time.Now().Add(r.config.ElectionTimeoutMax * 2)
- currentTerm := r.currentTerm
- atomic.AddUint64(&r.metrics.LeadershipTransfers, 1)
- r.mu.Unlock()
- r.logger.Info("Starting leadership transfer to %s", targetID)
- // Step 1: Sync target to our log
- if err := r.syncFollowerToLatest(targetAddr); err != nil {
- r.mu.Lock()
- r.transferring = false
- r.transferTarget = ""
- r.mu.Unlock()
- return fmt.Errorf("failed to sync target: %w", err)
- }
- // Step 2: Send TimeoutNow RPC
- args := &TimeoutNowArgs{
- Term: currentTerm,
- LeaderID: r.nodeID,
- }
- ctx, cancel := context.WithTimeout(context.Background(), r.config.RPCTimeout)
- defer cancel()
- reply, err := r.transport.TimeoutNow(ctx, targetAddr, args)
- if err != nil {
- r.mu.Lock()
- r.transferring = false
- r.transferTarget = ""
- r.mu.Unlock()
- return fmt.Errorf("TimeoutNow RPC failed: %w", err)
- }
- if !reply.Success {
- r.mu.Lock()
- r.transferring = false
- r.transferTarget = ""
- r.mu.Unlock()
- return fmt.Errorf("target rejected leadership transfer")
- }
- atomic.AddUint64(&r.metrics.LeadershipTransferSuccess, 1)
- r.logger.Info("Leadership transfer to %s initiated successfully", targetID)
- // Note: We don't immediately step down; we wait for the target to win election
- // and send us an AppendEntries with higher term
- return nil
- }
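- // A hedged sketch of a graceful step-down before maintenance (hypothetical helper):
- // hand leadership to any other voter; the caller shuts this node down once the
- // transfer has taken effect (a new leader's AppendEntries demotes us).
- func exampleStepDown(r *Raft) error {
- if _, isLeader := r.GetState(); !isLeader {
- return nil // nothing to hand over
- }
- for nodeID := range r.GetClusterNodes() {
- if nodeID == r.nodeID {
- continue // skip ourselves
- }
- if err := r.TransferLeadership(nodeID); err != nil {
- continue // try the next candidate
- }
- return nil
- }
- return fmt.Errorf("no transfer target available")
- }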
- // syncFollowerToLatest ensures the follower is caught up to our log
- func (r *Raft) syncFollowerToLatest(peerAddr string) error {
- r.mu.RLock()
- if r.state != Leader {
- r.mu.RUnlock()
- return ErrNotLeader
- }
- currentTerm := r.currentTerm
- leaderCommit := r.commitIndex
- lastIndex := r.log.LastIndex()
- r.mu.RUnlock()
- // Keep replicating until follower is caught up
- deadline := time.Now().Add(r.config.ElectionTimeoutMax * 2)
- for time.Now().Before(deadline) {
- r.mu.RLock()
- if r.state != Leader || r.currentTerm != currentTerm {
- r.mu.RUnlock()
- return ErrLeadershipLost
- }
- matchIndex := r.matchIndex[peerAddr]
- r.mu.RUnlock()
- if matchIndex >= lastIndex {
- return nil // Caught up
- }
- // Trigger replication
- r.replicateToPeer(peerAddr, currentTerm, leaderCommit)
- time.Sleep(10 * time.Millisecond)
- }
- return ErrTimeout
- }
- // HandleTimeoutNow handles TimeoutNow RPC (for leadership transfer)
- func (r *Raft) HandleTimeoutNow(args *TimeoutNowArgs) *TimeoutNowReply {
- r.mu.Lock()
- defer r.mu.Unlock()
- reply := &TimeoutNowReply{
- Term: r.currentTerm,
- Success: false,
- }
- // Reject stale terms; only followers act on TimeoutNow
- if args.Term < r.currentTerm {
- return reply
- }
- if r.state != Follower {
- return reply
- }
- // Immediately start election
- r.logger.Info("Received TimeoutNow from %s, starting immediate election", args.LeaderID)
- r.state = Candidate
- reply.Success = true
- return reply
- }
- // HandleReadIndex handles ReadIndex RPC
- func (r *Raft) HandleReadIndex(args *ReadIndexArgs) *ReadIndexReply {
- reply := &ReadIndexReply{
- Success: false,
- }
- readIndex, err := r.ReadIndex()
- if err != nil {
- reply.Error = err.Error()
- return reply
- }
- reply.ReadIndex = readIndex
- reply.Success = true
- return reply
- }
- // HandleGet handles Get RPC for remote KV reads
- func (r *Raft) HandleGet(args *GetArgs) *GetReply {
- reply := &GetReply{
- Found: false,
- }
- if r.config.GetHandler == nil {
- reply.Error = "get handler not configured"
- return reply
- }
- value, found := r.config.GetHandler(args.Key)
- reply.Value = value
- reply.Found = found
- return reply
- }