- package raft
- import (
- "context"
- "fmt"
- "math/rand"
- "os"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- )
- // Raft represents a Raft consensus node
- type Raft struct {
- mu sync.RWMutex
- // Node identity
- nodeID string
- peers []string
- // Cluster membership - maps nodeID to address
- clusterNodes map[string]string
- // Current state
- state NodeState
- currentTerm uint64
- votedFor string
- leaderID string
- // Log management
- log *LogManager
- storage Storage
- commitIndex uint64
- lastApplied uint64
- // Leader state
- nextIndex map[string]uint64
- matchIndex map[string]uint64
- // Configuration
- config *Config
- // Communication
- transport Transport
- // Channels
- applyCh chan ApplyMsg
- stopCh chan struct{}
- commitCh chan struct{}
- // Election timer
- electionTimer *time.Timer
- heartbeatTimer *time.Timer
- // Statistics (deprecated, use metrics instead)
- stats Stats
- // Metrics for monitoring
- metrics Metrics
- // Logger
- logger Logger
- // Running flag
- running int32
- // Replication trigger channel - used to batch replication requests
- replicationCh chan struct{}
- // Pending config change - only one config change can be pending at a time
- pendingConfigChange bool
- // Old cluster nodes - used for majority calculation during config change
- // This ensures we use the old cluster size until the config change is committed
- oldClusterNodes map[string]string
- // Index of the pending config change entry
- configChangeIndex uint64
- // Joining cluster state - when a standalone node is being added to a cluster
- // This prevents the node from starting elections while syncing
- joiningCluster bool
- joiningClusterTime time.Time
- // Compaction state - prevents concurrent compaction
- compacting int32
- // Dynamic compaction threshold - updated after each compaction
- // Next compaction triggers when the applied log size exceeds this value
- // Formula: post-compaction log size + SnapshotThreshold (0 until the first compaction)
- nextCompactionThreshold uint64
- // WaitGroup to track goroutines for clean shutdown
- wg sync.WaitGroup
- // Leadership transfer state
- transferring bool
- transferTarget string
- transferDeadline time.Time
- // Last heartbeat received (for health check)
- lastHeartbeat time.Time
- // Start time (for uptime calculation)
- startTime time.Time
- // ReadIndex waiting queue
- readIndexCh chan *readIndexRequest
- // Snapshot receiving state (for chunked transfer)
- pendingSnapshot *pendingSnapshotState
- // Last contact time for each peer (for leader check)
- lastContact map[string]time.Time
- }
- // readIndexRequest represents a pending read index request
- type readIndexRequest struct {
- readIndex uint64
- done chan error
- }
- // pendingSnapshotState tracks chunked snapshot reception
- type pendingSnapshotState struct {
- lastIncludedIndex uint64
- lastIncludedTerm uint64
- data []byte
- receivedBytes uint64
- }
- // Stats holds runtime statistics
- type Stats struct {
- Term uint64
- State string
- FirstLogIndex uint64 // Added for monitoring compaction
- LastLogIndex uint64
- LastLogTerm uint64
- CommitIndex uint64
- LastApplied uint64
- LeaderID string
- VotesReceived int
- AppendsSent int64
- AppendsReceived int64
- ClusterSize int // Number of nodes in cluster
- ClusterNodes map[string]string // NodeID -> Address mapping
- }
- // ConsoleLogger implements Logger with console output
- type ConsoleLogger struct {
- Prefix string
- Level int // 0=debug, 1=info, 2=warn, 3=error
- mu sync.Mutex
- }
- func NewConsoleLogger(prefix string, level int) *ConsoleLogger {
- return &ConsoleLogger{Prefix: prefix, Level: level}
- }
- func (l *ConsoleLogger) log(level int, levelStr, format string, args ...interface{}) {
- if level < l.Level {
- return
- }
- l.mu.Lock()
- defer l.mu.Unlock()
- msg := fmt.Sprintf(format, args...)
- fmt.Printf("[%s] %s [%s] %s\n", time.Now().Format("15:04:05.000"), l.Prefix, levelStr, msg)
- }
- func (l *ConsoleLogger) Debug(format string, args ...interface{}) {
- l.log(0, "DEBUG", format, args...)
- }
- func (l *ConsoleLogger) Info(format string, args ...interface{}) {
- l.log(1, "INFO", format, args...)
- }
- func (l *ConsoleLogger) Warn(format string, args ...interface{}) {
- l.log(2, "WARN", format, args...)
- }
- func (l *ConsoleLogger) Error(format string, args ...interface{}) {
- l.log(3, "ERROR", format, args...)
- }
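- // Example (illustrative sketch, not part of any API contract): a quieter logger
- // for a node named "node1" that only prints WARN and ERROR messages. The prefix
- // and level values are illustrative.
- //
- //   logger := NewConsoleLogger("node1", 2)
- //   logger.Info("suppressed")              // level 1 < 2, not printed
- //   logger.Warn("disk usage at %d%%", 90)  // printed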
- // NewRaft creates a new Raft node
- func NewRaft(config *Config, transport Transport, applyCh chan ApplyMsg) (*Raft, error) {
- if config.Logger == nil {
- config.Logger = NewConsoleLogger(config.NodeID, 1)
- }
- // Create storage
- storage, err := NewHybridStorage(config.DataDir, config.MemoryLogCapacity, config.Logger)
- if err != nil {
- return nil, fmt.Errorf("failed to create storage: %w", err)
- }
- // Create log manager
- logMgr := NewLogManager(storage, config.Logger)
- // Load persistent state
- state, err := storage.GetState()
- if err != nil {
- return nil, fmt.Errorf("failed to load state: %w", err)
- }
- // Load or initialize cluster configuration
- clusterNodes := make(map[string]string)
- // Try to load saved cluster config first
- savedConfig, err := storage.GetClusterConfig()
- if err != nil {
- return nil, fmt.Errorf("failed to load cluster config: %w", err)
- }
- if savedConfig != nil && len(savedConfig.Nodes) > 0 {
- // Use saved config
- clusterNodes = savedConfig.Nodes
- config.Logger.Info("Loaded cluster config with %d nodes", len(clusterNodes))
- } else {
- // Initialize from config
- if len(config.ClusterNodes) > 0 {
- for k, v := range config.ClusterNodes {
- clusterNodes[k] = v
- }
- } else {
- // Build from Peers + self (backward compatibility)
- clusterNodes[config.NodeID] = config.ListenAddr
- if config.PeerMap != nil {
- for k, v := range config.PeerMap {
- clusterNodes[k] = v
- }
- }
- }
- // Save initial config
- if len(clusterNodes) > 0 {
- if err := storage.SaveClusterConfig(&ClusterConfig{Nodes: clusterNodes}); err != nil {
- config.Logger.Warn("Failed to save initial cluster config: %v", err)
- }
- }
- }
- // Build peers list from cluster nodes (excluding self)
- var peers []string
- for nodeID, addr := range clusterNodes {
- if nodeID != config.NodeID {
- peers = append(peers, addr)
- }
- }
- r := &Raft{
- nodeID: config.NodeID,
- peers: peers,
- clusterNodes: clusterNodes,
- state: Follower,
- currentTerm: state.CurrentTerm,
- votedFor: state.VotedFor,
- log: logMgr,
- storage: storage,
- config: config,
- transport: transport,
- applyCh: applyCh,
- stopCh: make(chan struct{}),
- commitCh: make(chan struct{}, 100), // Increased buffer for event-driven apply
- replicationCh: make(chan struct{}, 1),
- nextIndex: make(map[string]uint64),
- matchIndex: make(map[string]uint64),
- logger: config.Logger,
- readIndexCh: make(chan *readIndexRequest, 100),
- lastHeartbeat: time.Now(),
- lastContact: make(map[string]time.Time),
- }
- // Initialize metrics
- r.metrics.Term = state.CurrentTerm
- // Initialize lastApplied and commitIndex from config (if provided)
- if config.LastAppliedIndex > 0 {
- r.lastApplied = config.LastAppliedIndex
- r.commitIndex = config.LastAppliedIndex // Applied implies committed
- r.logger.Info("Initialized lastApplied and commitIndex to %d from config", config.LastAppliedIndex)
- }
- // Set RPC handler
- transport.SetRPCHandler(r)
- return r, nil
- }
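- // Example (illustrative sketch): typical wiring of a node. Only Config fields
- // that appear elsewhere in this file are used; the transport constructor
- // newMyTransport is a hypothetical placeholder for whatever Transport
- // implementation the application provides.
- //
- //   applyCh := make(chan ApplyMsg, 256)
- //   cfg := &Config{
- //       NodeID:     "node1",
- //       DataDir:    "/var/lib/raft/node1",
- //       ListenAddr: "127.0.0.1:7001",
- //       ClusterNodes: map[string]string{
- //           "node1": "127.0.0.1:7001",
- //           "node2": "127.0.0.1:7002",
- //           "node3": "127.0.0.1:7003",
- //       },
- //   }
- //   node, err := NewRaft(cfg, newMyTransport(cfg.ListenAddr), applyCh) // newMyTransport is hypothetical
- //   if err != nil { /* handle error */ }
- //   _ = node.Start()
- //   defer node.Stop()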
- // Start starts the Raft node
- func (r *Raft) Start() error {
- if !atomic.CompareAndSwapInt32(&r.running, 0, 1) {
- return fmt.Errorf("already running")
- }
- // Record start time
- r.startTime = time.Now()
- // Start transport
- if err := r.transport.Start(); err != nil {
- return fmt.Errorf("failed to start transport: %w", err)
- }
- // Restore FSM from snapshot if exists
- // This must happen before starting apply loop to ensure FSM state is restored
- if err := r.restoreFromSnapshot(); err != nil {
- // Suppress warning if it's just a missing file (first start)
- if !os.IsNotExist(err) && !strings.Contains(err.Error(), "no such file") {
- r.logger.Warn("Failed to restore from snapshot: %v", err)
- } else {
- r.logger.Info("No snapshot found, starting with empty state")
- }
- // Continue anyway - the node can still function, just without historical state
- }
- // Start election timer
- r.resetElectionTimer()
- // Start background goroutines with WaitGroup tracking
- r.wg.Add(4)
- go func() {
- defer r.wg.Done()
- r.applyLoop()
- }()
- go func() {
- defer r.wg.Done()
- r.replicationLoop()
- }()
- go func() {
- defer r.wg.Done()
- r.mainLoop()
- }()
- go func() {
- defer r.wg.Done()
- r.readIndexLoop()
- }()
- r.logger.Info("Raft node started")
- return nil
- }
- // Stop stops the Raft node
- func (r *Raft) Stop() error {
- if !atomic.CompareAndSwapInt32(&r.running, 1, 0) {
- return fmt.Errorf("not running")
- }
- // Signal all goroutines to stop
- close(r.stopCh)
- // Stop timers first to prevent new operations
- r.mu.Lock()
- if r.electionTimer != nil {
- r.electionTimer.Stop()
- }
- if r.heartbeatTimer != nil {
- r.heartbeatTimer.Stop()
- }
- r.mu.Unlock()
- // Wait for all goroutines to finish with timeout
- done := make(chan struct{})
- go func() {
- r.wg.Wait()
- close(done)
- }()
- select {
- case <-done:
- // All goroutines exited cleanly
- case <-time.After(3 * time.Second):
- r.logger.Warn("Timeout waiting for goroutines to stop")
- }
- // Stop transport (has its own timeout)
- if err := r.transport.Stop(); err != nil {
- r.logger.Error("Failed to stop transport: %v", err)
- }
- if err := r.storage.Close(); err != nil {
- r.logger.Error("Failed to close storage: %v", err)
- }
- r.logger.Info("Raft node stopped")
- return nil
- }
- // mainLoop is the main event loop
- func (r *Raft) mainLoop() {
- for {
- select {
- case <-r.stopCh:
- return
- default:
- }
- r.mu.RLock()
- state := r.state
- r.mu.RUnlock()
- switch state {
- case Follower:
- r.runFollower()
- case Candidate:
- r.runCandidate()
- case Leader:
- r.runLeader()
- }
- }
- }
- // runFollower handles follower behavior
- func (r *Raft) runFollower() {
- r.logger.Debug("Running as follower in term %d", r.currentTerm)
- for {
- select {
- case <-r.stopCh:
- return
- case <-r.electionTimer.C:
- r.mu.Lock()
- if r.state == Follower {
- // If we're joining a cluster, don't start elections
- // Give time for the leader to sync us up
- if r.joiningCluster {
- // Allow joining for up to 30 seconds
- if time.Since(r.joiningClusterTime) < 30*time.Second {
- r.logger.Debug("Suppressing election during cluster join")
- r.resetElectionTimer()
- r.mu.Unlock()
- continue
- }
- // Timeout - clear joining state
- r.joiningCluster = false
- r.logger.Warn("Cluster join timeout, resuming normal election behavior")
- }
- r.logger.Debug("Election timeout, becoming candidate")
- r.state = Candidate
- r.leaderID = "" // Clear leaderID so we can vote for self in Pre-Vote
- }
- r.mu.Unlock()
- return
- }
- }
- }
- // runCandidate handles candidate behavior (leader election)
- // Implements Pre-Vote mechanism to prevent term explosion
- func (r *Raft) runCandidate() {
- // Phase 1: Pre-Vote (don't increment term yet)
- if !r.runPreVote() {
- // Pre-vote failed, wait for timeout before retrying
- r.mu.Lock()
- r.resetElectionTimer()
- r.mu.Unlock()
- select {
- case <-r.stopCh:
- return
- case <-r.electionTimer.C:
- // Timer expired, will retry pre-vote
- }
- return
- }
- // Phase 2: Actual election (pre-vote succeeded, now increment term)
- r.mu.Lock()
- r.currentTerm++
- r.votedFor = r.nodeID
- r.leaderID = ""
- currentTerm := r.currentTerm
- // Update metrics
- atomic.StoreUint64(&r.metrics.Term, currentTerm)
-
- if err := r.persistState(); err != nil {
- r.logger.Error("Failed to persist state during election: %v", err)
- r.mu.Unlock()
- return // Cannot proceed without persisting state
- }
- atomic.AddUint64(&r.metrics.ElectionsStarted, 1)
- r.resetElectionTimer()
- r.mu.Unlock()
- r.logger.Debug("Starting election for term %d", currentTerm)
- // Get current peers list
- r.mu.RLock()
- currentPeers := make([]string, len(r.peers))
- copy(currentPeers, r.peers)
- clusterSize := len(r.clusterNodes)
- if clusterSize == 0 {
- clusterSize = len(r.peers) + 1
- }
- r.mu.RUnlock()
- // Request actual votes from all peers
- votes := 1 // Vote for self
- voteCh := make(chan bool, len(currentPeers))
- lastLogIndex, lastLogTerm := r.log.LastIndexAndTerm()
- for _, peer := range currentPeers {
- go func(peer string) {
- args := &RequestVoteArgs{
- Term: currentTerm,
- CandidateID: r.nodeID,
- LastLogIndex: lastLogIndex,
- LastLogTerm: lastLogTerm,
- PreVote: false,
- }
- ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
- defer cancel()
- reply, err := r.transport.RequestVote(ctx, peer, args)
- if err != nil {
- r.logger.Debug("RequestVote to %s failed: %v", peer, err)
- voteCh <- false
- return
- }
- r.mu.Lock()
- defer r.mu.Unlock()
- if reply.Term > r.currentTerm {
- r.becomeFollower(reply.Term)
- voteCh <- false
- return
- }
- voteCh <- reply.VoteGranted
- }(peer)
- }
- // Wait for votes - majority is (clusterSize/2) + 1
- needed := clusterSize/2 + 1
- // If we already have enough votes (single-node cluster), become leader immediately
- if votes >= needed {
- r.mu.Lock()
- if r.state == Candidate && r.currentTerm == currentTerm {
- r.becomeLeader()
- }
- r.mu.Unlock()
- return
- }
- for i := 0; i < len(currentPeers); i++ {
- select {
- case <-r.stopCh:
- return
- case <-r.electionTimer.C:
- r.logger.Debug("Election timeout, will start new election")
- return
- case granted := <-voteCh:
- if granted {
- votes++
- if votes >= needed {
- r.mu.Lock()
- if r.state == Candidate && r.currentTerm == currentTerm {
- r.becomeLeader()
- }
- r.mu.Unlock()
- return
- }
- }
- }
- // Check if we're still a candidate
- r.mu.RLock()
- if r.state != Candidate {
- r.mu.RUnlock()
- return
- }
- r.mu.RUnlock()
- }
- // Election failed, wait for timeout
- r.mu.RLock()
- stillCandidate := r.state == Candidate
- r.mu.RUnlock()
- if stillCandidate {
- r.logger.Debug("Election failed, waiting for timeout (got %d/%d votes)", votes, needed)
- select {
- case <-r.stopCh:
- return
- case <-r.electionTimer.C:
- // Timer expired, will start new election
- }
- }
- }
- // runPreVote sends pre-vote requests to check if we can win an election
- // Returns true if we got majority pre-votes, false otherwise
- func (r *Raft) runPreVote() bool {
- r.mu.RLock()
- currentTerm := r.currentTerm
- currentPeers := make([]string, len(r.peers))
- copy(currentPeers, r.peers)
- clusterSize := len(r.clusterNodes)
- if clusterSize == 0 {
- clusterSize = len(r.peers) + 1
- }
- leaderID := r.leaderID
- r.mu.RUnlock()
- // Pre-vote uses term+1 but doesn't actually increment it
- preVoteTerm := currentTerm + 1
- lastLogIndex, lastLogTerm := r.log.LastIndexAndTerm()
- r.logger.Debug("Starting pre-vote for term %d", preVoteTerm)
- // Per Raft Pre-Vote optimization (§9.6): if we have a known leader,
- // don't vote for self. This prevents standalone nodes from constantly
- // electing themselves when they should be joining an existing cluster.
- preVotes := 0
- if leaderID == "" {
- preVotes = 1 // Vote for self only if we don't know of a leader
- } else {
- r.logger.Debug("Pre-vote: not self-voting because we have leader %s", leaderID)
- }
- preVoteCh := make(chan bool, len(currentPeers))
- for _, peer := range currentPeers {
- go func(peer string) {
- args := &RequestVoteArgs{
- Term: preVoteTerm,
- CandidateID: r.nodeID,
- LastLogIndex: lastLogIndex,
- LastLogTerm: lastLogTerm,
- PreVote: true,
- }
- ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
- defer cancel()
- reply, err := r.transport.RequestVote(ctx, peer, args)
- if err != nil {
- r.logger.Debug("PreVote to %s failed: %v", peer, err)
- preVoteCh <- false
- return
- }
- // For pre-vote, we don't step down even if reply.Term > currentTerm
- // because the other node might also be doing pre-vote
- preVoteCh <- reply.VoteGranted
- }(peer)
- }
- // Wait for pre-votes with a shorter timeout - majority is (clusterSize/2) + 1
- needed := clusterSize/2 + 1
- // If we already have enough votes (single-node cluster with no known leader), succeed immediately
- if preVotes >= needed {
- r.logger.Debug("Pre-vote succeeded immediately (got %d/%d pre-votes)", preVotes, needed)
- return true
- }
- timeout := time.After(500 * time.Millisecond)
- for i := 0; i < len(currentPeers); i++ {
- select {
- case <-r.stopCh:
- return false
- case <-timeout:
- r.logger.Debug("Pre-vote timeout (got %d/%d pre-votes)", preVotes, needed)
- return false
- case granted := <-preVoteCh:
- if granted {
- preVotes++
- if preVotes >= needed {
- r.logger.Debug("Pre-vote succeeded (got %d/%d pre-votes)", preVotes, needed)
- return true
- }
- }
- }
- // Check if we're still a candidate
- r.mu.RLock()
- if r.state != Candidate {
- r.mu.RUnlock()
- return false
- }
- r.mu.RUnlock()
- }
- r.logger.Debug("Pre-vote failed (got %d/%d pre-votes)", preVotes, needed)
- return false
- }
- // runLeader handles leader behavior
- func (r *Raft) runLeader() {
- r.logger.Debug("Running as leader in term %d", r.currentTerm)
- // Send initial heartbeat
- r.sendHeartbeats()
- // Start heartbeat timer
- r.mu.Lock()
- r.heartbeatTimer = time.NewTimer(r.config.HeartbeatInterval)
- r.mu.Unlock()
- for {
- select {
- case <-r.stopCh:
- return
- case <-r.heartbeatTimer.C:
- r.mu.RLock()
- if r.state != Leader {
- r.mu.RUnlock()
- return
- }
- r.mu.RUnlock()
- r.sendHeartbeats()
- r.heartbeatTimer.Reset(r.config.HeartbeatInterval)
- case <-r.commitCh:
- r.updateCommitIndex()
- }
- }
- }
- // becomeFollower transitions to follower state
- func (r *Raft) becomeFollower(term uint64) {
- oldState := r.state
- oldTerm := r.currentTerm
- r.state = Follower
- r.currentTerm = term
- // Update metrics
- atomic.StoreUint64(&r.metrics.Term, term)
- r.votedFor = ""
- r.leaderID = ""
- // Must persist before responding - use mustPersistState for critical transitions
- r.mustPersistState()
- r.resetElectionTimer()
- // Clear leadership transfer state
- r.transferring = false
- r.transferTarget = ""
- // Only log significant state changes
- if oldState == Leader && term > oldTerm {
- // Stepping down from leader is notable
- r.logger.Debug("Stepped down from leader in term %d", term)
- }
- }
- // becomeLeader transitions to leader state
- func (r *Raft) becomeLeader() {
- r.state = Leader
- r.leaderID = r.nodeID
- // Update metrics
- atomic.AddUint64(&r.metrics.ElectionsWon, 1)
- // Initialize leader state for all peers
- lastIndex := r.log.LastIndex()
- for nodeID, addr := range r.clusterNodes {
- if nodeID != r.nodeID {
- r.nextIndex[addr] = lastIndex + 1
- r.matchIndex[addr] = 0
- }
- }
- // Also handle legacy peers
- for _, peer := range r.peers {
- if _, exists := r.nextIndex[peer]; !exists {
- r.nextIndex[peer] = lastIndex + 1
- r.matchIndex[peer] = 0
- }
- }
- r.logger.Info("Became leader in term %d with %d peers", r.currentTerm, len(r.clusterNodes)-1)
- // Append a no-op entry to commit entries from previous terms
- // This is a standard Raft optimization - the leader appends a no-op
- // entry in its current term to quickly commit all pending entries
- noopEntry := LogEntry{
- Index: r.log.LastIndex() + 1,
- Term: r.currentTerm,
- Type: EntryNoop,
- Command: nil,
- }
- if err := r.log.Append(noopEntry); err != nil {
- r.logger.Error("Failed to append no-op entry: %v", err)
- } else {
- r.logger.Debug("Appended no-op entry at index %d for term %d", noopEntry.Index, noopEntry.Term)
- }
- }
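- // Worked example (illustrative): suppose the new leader's log ends at index 10
- // with entries from term 2 and it wins the election in term 3. Raft forbids
- // committing index 10 by counting replicas of a term-2 entry alone, so the
- // leader appends a no-op at index 11 in term 3; once index 11 reaches a
- // majority, indexes 1-11 (including the term-2 entries) become committed.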
- // sendHeartbeats sends AppendEntries RPCs to all peers
- func (r *Raft) sendHeartbeats() {
- r.mu.RLock()
- if r.state != Leader {
- r.mu.RUnlock()
- return
- }
- currentTerm := r.currentTerm
- leaderCommit := r.commitIndex
- // Get all peer addresses
- peerAddrs := make([]string, 0, len(r.clusterNodes))
- for nodeID, addr := range r.clusterNodes {
- if nodeID != r.nodeID {
- peerAddrs = append(peerAddrs, addr)
- }
- }
- // Also include legacy peers not in clusterNodes
- for _, peer := range r.peers {
- found := false
- for _, addr := range peerAddrs {
- if addr == peer {
- found = true
- break
- }
- }
- if !found {
- peerAddrs = append(peerAddrs, peer)
- }
- }
- r.mu.RUnlock()
- for _, peer := range peerAddrs {
- go r.replicateToPeer(peer, currentTerm, leaderCommit)
- }
- }
- // replicateToPeer sends AppendEntries to a specific peer
- func (r *Raft) replicateToPeer(peer string, term, leaderCommit uint64) {
- r.mu.RLock()
- if r.state != Leader || r.currentTerm != term {
- r.mu.RUnlock()
- return
- }
- nextIndex := r.nextIndex[peer]
- r.mu.RUnlock()
- // Get entries to send
- entries, prevLogIndex, prevLogTerm, err := r.log.GetEntriesForFollower(nextIndex, r.config.MaxLogEntriesPerRequest)
- if err == ErrCompacted {
- // Need to send snapshot
- r.sendSnapshot(peer)
- return
- }
- if err != nil {
- r.logger.Debug("Failed to get entries for %s: %v", peer, err)
- return
- }
- args := &AppendEntriesArgs{
- Term: term,
- LeaderID: r.nodeID,
- PrevLogIndex: prevLogIndex,
- PrevLogTerm: prevLogTerm,
- Entries: entries,
- LeaderCommit: leaderCommit,
- }
- ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
- defer cancel()
- reply, err := r.transport.AppendEntries(ctx, peer, args)
- if err != nil {
- r.logger.Debug("AppendEntries to %s failed: %v", peer, err)
- return
- }
- atomic.AddInt64(&r.stats.AppendsSent, 1)
- r.mu.Lock()
- defer r.mu.Unlock()
- if reply.Term > r.currentTerm {
- r.becomeFollower(reply.Term)
- return
- }
- if r.state != Leader || r.currentTerm != term {
- return
- }
- if reply.Success {
- // Update contact time
- r.lastContact[peer] = time.Now()
- // Update nextIndex and matchIndex
- if len(entries) > 0 {
- newMatchIndex := entries[len(entries)-1].Index
- if newMatchIndex > r.matchIndex[peer] {
- r.matchIndex[peer] = newMatchIndex
- r.nextIndex[peer] = newMatchIndex + 1
- // Try to update commit index
- select {
- case r.commitCh <- struct{}{}:
- default:
- }
- }
- }
- } else {
- // Even if failed (log conflict), we contacted the peer
- r.lastContact[peer] = time.Now()
- // Decrement nextIndex and retry
- if reply.ConflictTerm > 0 {
- // Find the last entry of ConflictTerm in our log
- found := false
- for idx := r.log.LastIndex(); idx >= r.log.FirstIndex(); idx-- {
- t, err := r.log.GetTerm(idx)
- if err != nil {
- break
- }
- if t == reply.ConflictTerm {
- r.nextIndex[peer] = idx + 1
- found = true
- break
- }
- if t < reply.ConflictTerm {
- break
- }
- }
- if !found {
- r.nextIndex[peer] = reply.ConflictIndex
- }
- } else if reply.ConflictIndex > 0 {
- r.nextIndex[peer] = reply.ConflictIndex
- } else {
- r.nextIndex[peer] = max(1, r.nextIndex[peer]-1)
- }
- }
- }
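- // Worked example (illustrative) of the fast-backtrack rules above: if a
- // follower rejects with ConflictTerm=3 and ConflictIndex=7, the leader first
- // scans its own log for the last entry of term 3; if it finds one at index 9
- // it retries from nextIndex=10, otherwise it falls back to nextIndex=7. With
- // no conflict hints it simply decrements nextIndex by one (never below 1).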
- // sendSnapshot sends a snapshot to a peer with chunked transfer support
- func (r *Raft) sendSnapshot(peer string) {
- r.mu.RLock()
- if r.state != Leader {
- r.mu.RUnlock()
- return
- }
- term := r.currentTerm
- chunkSize := r.config.SnapshotChunkSize
- if chunkSize <= 0 {
- chunkSize = 1024 * 1024 // Default 1MB
- }
- r.mu.RUnlock()
- data, lastIndex, lastTerm, err := r.storage.GetSnapshot()
- if err != nil {
- r.logger.Error("Failed to get snapshot: %v", err)
- return
- }
- atomic.AddUint64(&r.metrics.SnapshotsSent, 1)
- r.logger.Info("Sending snapshot to %s: %d bytes, lastIndex=%d, lastTerm=%d",
- peer, len(data), lastIndex, lastTerm)
- // Send snapshot in chunks
- totalSize := len(data)
- offset := 0
- for offset < totalSize {
- // Check if we're still leader
- r.mu.RLock()
- if r.state != Leader || r.currentTerm != term {
- r.mu.RUnlock()
- r.logger.Debug("Aborting snapshot send: no longer leader")
- return
- }
- r.mu.RUnlock()
- // Calculate chunk size
- end := offset + chunkSize
- if end > totalSize {
- end = totalSize
- }
- chunk := data[offset:end]
- done := end >= totalSize
- args := &InstallSnapshotArgs{
- Term: term,
- LeaderID: r.nodeID,
- LastIncludedIndex: lastIndex,
- LastIncludedTerm: lastTerm,
- Offset: uint64(offset),
- Data: chunk,
- Done: done,
- }
- ctx, cancel := context.WithTimeout(context.Background(), r.config.SnapshotRPCTimeout)
- reply, err := r.transport.InstallSnapshot(ctx, peer, args)
- cancel()
- if err != nil {
- r.logger.Error("InstallSnapshot chunk to %s failed at offset %d: %v", peer, offset, err)
- return
- }
- // Compare terms under the lock so a stale read cannot regress currentTerm
- r.mu.Lock()
- if reply.Term > r.currentTerm {
- r.becomeFollower(reply.Term)
- r.mu.Unlock()
- return
- }
- r.mu.Unlock()
- if !reply.Success {
- r.logger.Error("InstallSnapshot chunk rejected by %s at offset %d", peer, offset)
- return
- }
- offset = end
- }
- // Snapshot fully sent and accepted
- r.mu.Lock()
- defer r.mu.Unlock()
- if r.state != Leader || r.currentTerm != term {
- return
- }
- r.nextIndex[peer] = lastIndex + 1
- r.matchIndex[peer] = lastIndex
- r.logger.Info("Snapshot to %s completed, nextIndex=%d", peer, lastIndex+1)
- }
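- // Worked example (illustrative): with a hypothetical 2.5 MB snapshot and the
- // default 1 MB SnapshotChunkSize, three InstallSnapshot RPCs are sent at
- // offsets 0, 1 MB and 2 MB; only the last chunk (0.5 MB) carries Done=true,
- // and the follower installs the snapshot once that chunk is accepted.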
- // updateCommitIndex updates the commit index based on matchIndex
- func (r *Raft) updateCommitIndex() {
- r.mu.Lock()
- defer r.mu.Unlock()
- if r.state != Leader {
- return
- }
- // For config change entries, use the OLD cluster for majority calculation
- // This ensures that adding a node doesn't require the new node's vote
- // until the config change itself is committed
- votingNodes := r.clusterNodes
- if r.pendingConfigChange && r.oldClusterNodes != nil {
- votingNodes = r.oldClusterNodes
- }
- // Get current cluster size from voting nodes
- clusterSize := len(votingNodes)
- if clusterSize == 0 {
- clusterSize = len(r.peers) + 1
- }
- // Find the highest index replicated on a majority
- for n := r.log.LastIndex(); n > r.commitIndex; n-- {
- term, err := r.log.GetTerm(n)
- if err != nil {
- continue
- }
- // Only commit entries from current term
- if term != r.currentTerm {
- continue
- }
- // Count replicas (including self)
- count := 1 // Self
- for nodeID, addr := range votingNodes {
- if nodeID != r.nodeID {
- if r.matchIndex[addr] >= n {
- count++
- }
- }
- }
- // Also check legacy peers
- for _, peer := range r.peers {
- // Avoid double counting if peer is already in votingNodes
- alreadyCounted := false
- for _, addr := range votingNodes {
- if addr == peer {
- alreadyCounted = true
- break
- }
- }
- if !alreadyCounted && r.matchIndex[peer] >= n {
- count++
- }
- }
- // Majority is (clusterSize/2) + 1
- needed := clusterSize/2 + 1
- if count >= needed {
- r.commitIndex = n
- break
- }
- }
- }
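- // Worked example (illustrative): in a 5-node cluster, needed = 5/2 + 1 = 3.
- // If the four peers report matchIndex {12, 12, 9, 7} and the leader's last
- // index is 12, then index 12 is held by the leader plus two peers = 3
- // replicas, so commitIndex advances to 12, provided the entry at index 12 is
- // from the current term.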
- // applyLoop applies committed entries to the state machine
- // Uses event-driven model with fallback polling for reliability
- func (r *Raft) applyLoop() {
- // Use a ticker as fallback for missed signals (matches original 10ms polling)
- ticker := time.NewTicker(10 * time.Millisecond)
- defer ticker.Stop()
- for {
- select {
- case <-r.stopCh:
- return
- case <-r.commitCh:
- // Event-driven: triggered when commitIndex is updated
- r.applyCommitted()
- case <-ticker.C:
- // Fallback: check periodically in case we missed a signal
- r.applyCommitted()
- }
- }
- }
- // applyCommitted applies all committed but not yet applied entries
- func (r *Raft) applyCommitted() {
- r.mu.Lock()
- commitIndex := r.commitIndex
- lastApplied := r.lastApplied
- firstIndex := r.log.FirstIndex()
- lastLogIndex := r.log.LastIndex()
- r.mu.Unlock()
- // Safety check: ensure lastApplied is within valid range
- // If lastApplied is below firstIndex (due to snapshot), skip to firstIndex
- if lastApplied < firstIndex && firstIndex > 0 {
- r.mu.Lock()
- r.lastApplied = firstIndex
- lastApplied = firstIndex
- r.mu.Unlock()
- r.logger.Debug("Adjusted lastApplied to firstIndex %d after compaction", firstIndex)
- }
- // Safety check: don't try to apply beyond what we have in log
- if commitIndex > lastLogIndex {
- commitIndex = lastLogIndex
- }
- for lastApplied < commitIndex {
- lastApplied++
- // Skip if entry has been compacted
- if lastApplied < firstIndex {
- continue
- }
- entry, err := r.log.GetEntry(lastApplied)
- if err != nil {
- // If entry is compacted, skip ahead
- if err == ErrCompacted {
- r.mu.Lock()
- newFirstIndex := r.log.FirstIndex()
- if lastApplied < newFirstIndex {
- r.lastApplied = newFirstIndex
- lastApplied = newFirstIndex
- }
- r.mu.Unlock()
- continue
- }
- r.logger.Error("Failed to get entry %d: %v (firstIndex=%d, lastIndex=%d)",
- lastApplied, err, r.log.FirstIndex(), r.log.LastIndex())
- break
- }
- // Handle config change entries
- if entry.Type == EntryConfig {
- r.applyConfigChange(entry)
- r.mu.Lock()
- r.lastApplied = lastApplied
- r.mu.Unlock()
- continue
- }
- // Handle no-op entries (just update lastApplied, don't send to state machine)
- if entry.Type == EntryNoop {
- r.mu.Lock()
- r.lastApplied = lastApplied
- r.mu.Unlock()
- continue
- }
- // Normal command entry
- msg := ApplyMsg{
- CommandValid: true,
- Command: entry.Command,
- CommandIndex: entry.Index,
- CommandTerm: entry.Term,
- }
- select {
- case r.applyCh <- msg:
- case <-r.stopCh:
- return
- }
- r.mu.Lock()
- r.lastApplied = lastApplied
- r.mu.Unlock()
- }
- // Check if log compaction is needed
- // Run asynchronously to avoid blocking the apply loop if snapshotting is slow
- go r.maybeCompactLog()
- }
- // maybeCompactLog checks whether automatic log compaction should be triggered.
- // It uses a dynamic threshold to prevent "compaction thrashing":
- // - The first compaction triggers at SnapshotThreshold (default 100,000)
- // - After compaction, next threshold = post-compaction log size + SnapshotThreshold
- // - This prevents every new entry from triggering compaction if the log stays large
- func (r *Raft) maybeCompactLog() {
- // Skip if no snapshot provider is configured
- if r.config.SnapshotProvider == nil {
- return
- }
- // Check if compaction is already in progress (atomic check-and-set)
- if !atomic.CompareAndSwapInt32(&r.compacting, 0, 1) {
- return
- }
- // Ensure we release the compacting flag when done
- defer atomic.StoreInt32(&r.compacting, 0)
- r.mu.RLock()
- lastApplied := r.lastApplied
- firstIndex := r.log.FirstIndex()
- initialThreshold := r.config.SnapshotThreshold
- minRetention := r.config.SnapshotMinRetention
- isLeader := r.state == Leader
- dynamicThreshold := r.nextCompactionThreshold
- r.mu.RUnlock()
- // Guard against underflow: ensure lastApplied > firstIndex
- if lastApplied <= firstIndex {
- return
- }
- // Calculate current log size
- logSize := lastApplied - firstIndex
- // Determine effective threshold:
- // - Use initial threshold if no compaction has occurred yet (dynamicThreshold == 0)
- // - Otherwise use the dynamic threshold
- effectiveThreshold := initialThreshold
- if dynamicThreshold > 0 {
- effectiveThreshold = dynamicThreshold
- }
- // Check if we have enough entries to warrant compaction
- if logSize <= effectiveThreshold {
- return
- }
- // Guard against underflow: ensure lastApplied > minRetention
- if lastApplied <= minRetention {
- return
- }
- // Calculate the safe compaction point
- // We need to retain at least minRetention entries for follower catch-up
- compactUpTo := lastApplied - minRetention
- if compactUpTo <= firstIndex {
- return // Not enough entries to compact while maintaining retention
- }
- // For leader, also consider the minimum nextIndex of all followers
- // to avoid compacting entries that followers still need
- if isLeader {
- r.mu.RLock()
- minNextIndex := lastApplied
- for _, nextIdx := range r.nextIndex {
- if nextIdx < minNextIndex {
- minNextIndex = nextIdx
- }
- }
- r.mu.RUnlock()
- // Don't compact entries that followers still need
- // Keep a buffer of minRetention entries before the slowest follower
- if minNextIndex > minRetention {
- followerSafePoint := minNextIndex - minRetention
- if followerSafePoint < compactUpTo {
- compactUpTo = followerSafePoint
- }
- } else {
- // Slowest follower is too far behind, don't compact
- // They will need a full snapshot anyway
- compactUpTo = firstIndex // Effectively skip compaction
- }
- }
- // Final check - make sure we're actually compacting something meaningful
- if compactUpTo <= firstIndex {
- return
- }
- // Get snapshot from application layer
- snapshotData, err := r.config.SnapshotProvider(compactUpTo)
- if err != nil {
- r.logger.Error("Failed to get snapshot from provider: %v", err)
- return
- }
- // Get the term at the compaction point
- term, err := r.log.GetTerm(compactUpTo)
- if err != nil {
- r.logger.Error("Failed to get term for compaction index %d: %v", compactUpTo, err)
- return
- }
- // Save snapshot
- if err := r.storage.SaveSnapshot(snapshotData, compactUpTo, term); err != nil {
- r.logger.Error("Failed to save snapshot: %v", err)
- return
- }
- // Compact the log
- // CRITICAL: We must hold the write lock during compaction to prevent
- // concurrent writes (AppendEntries) unless we can guarantee underlying
- // file integrity with concurrent modification. The requirement is to
- // pause writes during compaction.
- r.mu.Lock()
- if err := r.log.Compact(compactUpTo); err != nil {
- r.mu.Unlock()
- r.logger.Error("Failed to compact log: %v", err)
- return
- }
- r.mu.Unlock()
- // Calculate new log size after compaction
- newLogSize := lastApplied - compactUpTo
- // Update dynamic threshold for next compaction
- // We use a linear growth model: next threshold = current size + SnapshotThreshold
- // This ensures that we trigger compaction roughly every SnapshotThreshold entries.
- r.mu.Lock()
- r.nextCompactionThreshold = newLogSize + r.config.SnapshotThreshold
- // Ensure threshold doesn't go below the initial threshold
- if r.nextCompactionThreshold < initialThreshold {
- r.nextCompactionThreshold = initialThreshold
- }
- r.mu.Unlock()
- r.logger.Info("Auto compaction completed: compacted up to index %d (term %d), log size %d -> %d, next threshold: %d",
- compactUpTo, term, logSize, newLogSize, r.nextCompactionThreshold)
- }
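- // Worked example (illustrative, assuming no follower lags): with
- // SnapshotThreshold=100,000 (the default mentioned above) and, say,
- // SnapshotMinRetention=10,000, the first compaction fires once the applied log
- // exceeds 100,000 entries and trims it to roughly the last 10,000. The next
- // threshold then becomes 10,000 + 100,000 = 110,000, so roughly another
- // 100,000 entries must be applied before compaction runs again.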
- // resetElectionTimer resets the election timeout
- func (r *Raft) resetElectionTimer() {
- // Randomize within [ElectionTimeoutMin, ElectionTimeoutMax); guard the spread
- // so a zero or misconfigured range cannot make rand.Int63n panic
- timeout := r.config.ElectionTimeoutMin
- if spread := int64(r.config.ElectionTimeoutMax - r.config.ElectionTimeoutMin); spread > 0 {
- timeout += time.Duration(rand.Int63n(spread))
- }
- if r.electionTimer == nil {
- r.electionTimer = time.NewTimer(timeout)
- } else {
- if !r.electionTimer.Stop() {
- select {
- case <-r.electionTimer.C:
- default:
- }
- }
- r.electionTimer.Reset(timeout)
- }
- }
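- // Example (illustrative values): with ElectionTimeoutMin=150ms and
- // ElectionTimeoutMax=300ms, each reset picks a uniform duration in
- // [150ms, 300ms), so two followers rarely time out at the same instant and
- // split votes stay uncommon. The numbers are assumptions; only Min < Max
- // matters here.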
- // persistState saves the current state to stable storage
- // Returns error if persistence fails - caller MUST handle this for safety
- func (r *Raft) persistState() error {
- state := &PersistentState{
- CurrentTerm: r.currentTerm,
- VotedFor: r.votedFor,
- }
- if err := r.storage.SaveState(state); err != nil {
- r.logger.Error("Failed to persist state: %v", err)
- return fmt.Errorf("%w: %v", ErrPersistFailed, err)
- }
- return nil
- }
- // mustPersistState saves state and panics on failure
- // Use this only in critical paths where failure is unrecoverable
- func (r *Raft) mustPersistState() {
- if err := r.persistState(); err != nil {
- // In production, you might want to trigger a graceful shutdown instead
- r.logger.Error("CRITICAL: Failed to persist state, node may be in inconsistent state: %v", err)
- panic(err)
- }
- }
- // HandleRequestVote handles RequestVote RPCs (including pre-vote)
- func (r *Raft) HandleRequestVote(args *RequestVoteArgs) *RequestVoteReply {
- r.mu.Lock()
- defer r.mu.Unlock()
- reply := &RequestVoteReply{
- Term: r.currentTerm,
- VoteGranted: false,
- }
- // Handle pre-vote separately
- if args.PreVote {
- return r.handlePreVote(args)
- }
- // Reply false if term < currentTerm
- if args.Term < r.currentTerm {
- return reply
- }
- // If term > currentTerm, become follower
- if args.Term > r.currentTerm {
- r.becomeFollower(args.Term)
- }
- reply.Term = r.currentTerm
- // Check if we can vote for this candidate
- if (r.votedFor == "" || r.votedFor == args.CandidateID) &&
- r.log.IsUpToDate(args.LastLogIndex, args.LastLogTerm) {
- r.votedFor = args.CandidateID
- if err := r.persistState(); err != nil {
- // Cannot grant vote if we can't persist the decision
- r.logger.Error("Failed to persist vote for %s: %v", args.CandidateID, err)
- return reply
- }
- r.resetElectionTimer()
- reply.VoteGranted = true
- r.logger.Debug("Granted vote to %s for term %d", args.CandidateID, args.Term)
- }
- return reply
- }
- // handlePreVote handles pre-vote requests
- // Pre-vote doesn't change our state, just checks if we would vote
- func (r *Raft) handlePreVote(args *RequestVoteArgs) *RequestVoteReply {
- reply := &RequestVoteReply{
- Term: r.currentTerm,
- VoteGranted: false,
- }
- // For pre-vote, we check:
- // 1. The candidate's term is at least as high as ours
- // 2. The candidate's log is at least as up-to-date as ours
- // 3. We don't have a current leader (or the candidate's term is higher)
- if args.Term < r.currentTerm {
- return reply
- }
- // Per Raft Pre-Vote optimization (§9.6): reject pre-vote if we have a current
- // leader and the candidate's term is not higher than ours. This prevents
- // disruptive elections when a partitioned node tries to rejoin.
- if r.leaderID != "" && args.Term <= r.currentTerm {
- r.logger.Debug("Rejecting pre-vote from %s: have leader %s", args.CandidateID, r.leaderID)
- return reply
- }
- // Grant pre-vote if log is up-to-date
- // Note: we don't check votedFor for pre-vote, and we don't update any state
- if r.log.IsUpToDate(args.LastLogIndex, args.LastLogTerm) {
- reply.VoteGranted = true
- r.logger.Debug("Granted pre-vote to %s for term %d", args.CandidateID, args.Term)
- }
- return reply
- }
- // HandleAppendEntries handles AppendEntries RPCs
- func (r *Raft) HandleAppendEntries(args *AppendEntriesArgs) *AppendEntriesReply {
- r.mu.Lock()
- defer r.mu.Unlock()
- atomic.AddInt64(&r.stats.AppendsReceived, 1)
- reply := &AppendEntriesReply{
- Term: r.currentTerm,
- Success: false,
- }
- // Check if we're a standalone node being added to a cluster
- // A standalone node has only itself in clusterNodes
- isStandalone := len(r.clusterNodes) == 1
- if _, hasSelf := r.clusterNodes[r.nodeID]; isStandalone && hasSelf {
- // We're standalone and receiving AppendEntries from an external leader
- // This means we're being added to a cluster - suppress elections
- if !r.joiningCluster {
- r.joiningCluster = true
- r.joiningClusterTime = time.Now()
- r.logger.Info("Detected cluster join in progress, suppressing elections")
- }
- // When joining, accept higher terms from the leader to sync up
- if args.Term > r.currentTerm {
- r.becomeFollower(args.Term)
- }
- }
- // Reply false if term < currentTerm
- if args.Term < r.currentTerm {
- // But still reset timer if we're joining a cluster to prevent elections
- if r.joiningCluster {
- r.resetElectionTimer()
- }
- return reply
- }
- // If term > currentTerm, or we're a candidate, or we're a leader receiving
- // AppendEntries from another leader (split-brain scenario during cluster merge),
- // become follower. In Raft, there can only be one leader per term.
- if args.Term > r.currentTerm || r.state == Candidate || r.state == Leader {
- r.becomeFollower(args.Term)
- }
- // Update leader info and reset election timer
- r.leaderID = args.LeaderID
- r.lastHeartbeat = time.Now()
- r.resetElectionTimer()
- reply.Term = r.currentTerm
- // Try to append entries
- success, conflictIndex, conflictTerm := r.log.AppendEntriesFromLeader(
- args.PrevLogIndex, args.PrevLogTerm, args.Entries)
- if !success {
- reply.ConflictIndex = conflictIndex
- reply.ConflictTerm = conflictTerm
- return reply
- }
- reply.Success = true
- // Update commit index safely
- if args.LeaderCommit > r.commitIndex {
- // Get our actual last log index
- lastLogIndex := r.log.LastIndex()
- // Calculate what index the entries would have reached
- lastNewEntry := args.PrevLogIndex
- if len(args.Entries) > 0 {
- lastNewEntry = args.Entries[len(args.Entries)-1].Index
- }
- // Commit index should not exceed what we actually have in log
- newCommitIndex := args.LeaderCommit
- if newCommitIndex > lastNewEntry {
- newCommitIndex = lastNewEntry
- }
- if newCommitIndex > lastLogIndex {
- newCommitIndex = lastLogIndex
- }
- // Only advance commit index
- if newCommitIndex > r.commitIndex {
- r.commitIndex = newCommitIndex
- }
- }
- return reply
- }
- // HandleInstallSnapshot handles InstallSnapshot RPCs with chunked transfer support
- func (r *Raft) HandleInstallSnapshot(args *InstallSnapshotArgs) *InstallSnapshotReply {
- r.mu.Lock()
- defer r.mu.Unlock()
- reply := &InstallSnapshotReply{
- Term: r.currentTerm,
- Success: false,
- }
- if args.Term < r.currentTerm {
- return reply
- }
- if args.Term > r.currentTerm {
- r.becomeFollower(args.Term)
- }
- r.leaderID = args.LeaderID
- r.lastHeartbeat = time.Now()
- r.resetElectionTimer()
- reply.Term = r.currentTerm
- // Skip if we already have this or a newer snapshot applied
- if args.LastIncludedIndex <= r.lastApplied {
- r.logger.Debug("Ignoring snapshot at index %d, already applied up to %d",
- args.LastIncludedIndex, r.lastApplied)
- reply.Success = true // Still success to let leader know we don't need it
- return reply
- }
- // Handle chunked transfer
- if args.Offset == 0 {
- // First chunk - start new pending snapshot
- r.pendingSnapshot = &pendingSnapshotState{
- lastIncludedIndex: args.LastIncludedIndex,
- lastIncludedTerm: args.LastIncludedTerm,
- data: make([]byte, 0),
- receivedBytes: 0,
- }
- r.logger.Info("Starting snapshot reception at index %d, term %d",
- args.LastIncludedIndex, args.LastIncludedTerm)
- }
- // Validate we're receiving the expected snapshot
- if r.pendingSnapshot == nil {
- // First chunk was never received (or was discarded) - reject so the leader restarts the transfer
- r.logger.Warn("Unexpected snapshot chunk at offset %d: no pending snapshot", args.Offset)
- return reply
- }
- if r.pendingSnapshot.lastIncludedIndex != args.LastIncludedIndex ||
- r.pendingSnapshot.lastIncludedTerm != args.LastIncludedTerm {
- r.logger.Warn("Unexpected snapshot chunk: expected index %d, got %d",
- r.pendingSnapshot.lastIncludedIndex, args.LastIncludedIndex)
- return reply
- }
- // Validate offset matches what we've received
- if uint64(args.Offset) != r.pendingSnapshot.receivedBytes {
- r.logger.Warn("Unexpected chunk offset: expected %d, got %d",
- r.pendingSnapshot.receivedBytes, args.Offset)
- return reply
- }
- // Append chunk data
- r.pendingSnapshot.data = append(r.pendingSnapshot.data, args.Data...)
- r.pendingSnapshot.receivedBytes += uint64(len(args.Data))
- reply.Success = true
- r.logger.Debug("Received snapshot chunk: offset=%d, size=%d, done=%v",
- args.Offset, len(args.Data), args.Done)
- // If not done, wait for more chunks
- if !args.Done {
- return reply
- }
- // All chunks received - apply the snapshot
- r.logger.Info("Installing complete snapshot: %d bytes at index %d, term %d",
- len(r.pendingSnapshot.data), args.LastIncludedIndex, args.LastIncludedTerm)
- atomic.AddUint64(&r.metrics.SnapshotsInstalled, 1)
- // Save snapshot
- if err := r.storage.SaveSnapshot(r.pendingSnapshot.data, args.LastIncludedIndex, args.LastIncludedTerm); err != nil {
- r.logger.Error("Failed to save snapshot: %v", err)
- r.pendingSnapshot = nil
- reply.Success = false
- return reply
- }
- // Compact log
- if err := r.log.Compact(args.LastIncludedIndex); err != nil {
- r.logger.Error("Failed to compact log: %v", err)
- }
- // Update state - must update both commitIndex and lastApplied
- if args.LastIncludedIndex > r.commitIndex {
- r.commitIndex = args.LastIncludedIndex
- }
- // Always update lastApplied to snapshot index to prevent trying to apply compacted entries
- r.lastApplied = args.LastIncludedIndex
- // Send snapshot to application (non-blocking with timeout)
- // Use the complete pendingSnapshot data, not the last chunk
- msg := ApplyMsg{
- SnapshotValid: true,
- Snapshot: r.pendingSnapshot.data,
- SnapshotIndex: args.LastIncludedIndex,
- SnapshotTerm: args.LastIncludedTerm,
- }
- // Clear pending snapshot
- r.pendingSnapshot = nil
- // Try to send, but don't block indefinitely
- select {
- case r.applyCh <- msg:
- r.logger.Debug("Sent snapshot to application")
- case <-time.After(100 * time.Millisecond):
- r.logger.Warn("Timeout sending snapshot to application, will retry")
- // The application will still get correct state via normal apply loop
- }
- return reply
- }
- // Propose proposes a new command to be replicated
- func (r *Raft) Propose(command []byte) (uint64, uint64, bool) {
- r.mu.Lock()
- defer r.mu.Unlock()
- if r.state != Leader {
- return 0, 0, false
- }
- // Check connectivity (Lease Check)
- // If we haven't heard from a majority of peers within ElectionTimeout,
- // we shouldn't accept new commands because we might be partitioned.
- if len(r.clusterNodes) > 1 {
- activePeers := 1 // Self
- now := time.Now()
- timeout := r.config.ElectionTimeoutMax
- for _, peer := range r.peers {
- if last, ok := r.lastContact[peer]; ok && now.Sub(last) < timeout {
- activePeers++
- }
- }
- // Check majority
- needed := len(r.clusterNodes)/2 + 1
- if activePeers < needed {
- r.logger.Warn("Rejecting Propose: lost contact with majority (active: %d, needed: %d)", activePeers, needed)
- return 0, 0, false
- }
- }
- index, err := r.log.AppendCommand(r.currentTerm, command)
- if err != nil {
- r.logger.Error("Failed to append command: %v", err)
- return 0, 0, false
- }
- r.matchIndex[r.nodeID] = index
- // For single-node cluster, we are the only voter and can commit immediately
- // This fixes the issue where commitCh never gets triggered without other peers
- if len(r.clusterNodes) <= 1 && len(r.peers) == 0 {
- // Single node: self is majority, trigger commit immediately
- select {
- case r.commitCh <- struct{}{}:
- default:
- }
- } else {
- // Multi-node: trigger replication to other nodes
- r.triggerReplication()
- }
- return index, r.currentTerm, true
- }
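- // Example (illustrative sketch): proposing a command on the leader and waiting
- // for it to come back on applyCh. The command encoding is up to the
- // application; the JSON shown here is purely illustrative.
- //
- //   index, term, ok := node.Propose([]byte(`{"op":"set","key":"x","val":"1"}`))
- //   if !ok {
- //       // not the leader - retry elsewhere or use ProposeWithForward
- //   }
- //   for msg := range applyCh {
- //       if msg.CommandValid && msg.CommandIndex == index && msg.CommandTerm == term {
- //           break // the command is committed and has been handed to the state machine
- //       }
- //   }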
- // triggerReplication signals the replication loop to send heartbeats
- // This uses a non-blocking send to batch replication requests
- func (r *Raft) triggerReplication() {
- select {
- case r.replicationCh <- struct{}{}:
- default:
- // Replication already scheduled
- }
- }
- // replicationLoop handles batched replication
- // Uses simple delay-based batching: flush immediately when signaled, then wait
- // to allow more requests to accumulate before the next flush.
- func (r *Raft) replicationLoop() {
- for {
- select {
- case <-r.stopCh:
- return
- case <-r.replicationCh:
- // Flush and replicate immediately
- r.flushAndReplicate()
- // Wait briefly to allow batching of subsequent requests
- // This gives time for more proposals to queue up before the next flush
- time.Sleep(10 * time.Millisecond)
- }
- }
- }
- // flushAndReplicate flushes logs and sends heartbeats
- func (r *Raft) flushAndReplicate() {
- // Ensure logs are flushed to OS cache before sending to followers
- // This implements Group Commit with Flush (fast) instead of Sync (slow)
- if err := r.log.Flush(); err != nil {
- r.logger.Error("Failed to flush log: %v", err)
- }
- r.sendHeartbeats()
- }
- // ProposeWithForward proposes a command, forwarding to leader if necessary
- // This is the recommended method for applications to use
- func (r *Raft) ProposeWithForward(command []byte) (index uint64, term uint64, err error) {
- // Try local propose first
- idx, t, isLeader := r.Propose(command)
- if isLeader {
- return idx, t, nil
- }
- // Not leader, forward to leader
- r.mu.RLock()
- leaderID := r.leaderID
- // Use clusterNodes (dynamically maintained) to find leader address
- leaderAddr := r.clusterNodes[leaderID]
- r.mu.RUnlock()
- if leaderID == "" {
- return 0, 0, fmt.Errorf("no leader available")
- }
- if leaderAddr == "" {
- return 0, 0, fmt.Errorf("leader %s address not found in cluster", leaderID)
- }
- // Forward to leader
- ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
- defer cancel()
- args := &ProposeArgs{Command: command}
- reply, err := r.transport.ForwardPropose(ctx, leaderAddr, args)
- if err != nil {
- return 0, 0, fmt.Errorf("forward failed: %w", err)
- }
- if !reply.Success {
- return 0, 0, fmt.Errorf("leader rejected: %s", reply.Error)
- }
- return reply.Index, reply.Term, nil
- }
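- // Illustrative sketch (not part of the original source): how an application might
- // submit a command through ProposeWithForward. The encodeSet helper and its wire
- // format are assumptions for the example.
- func submitSet(r *Raft, key, value string, encodeSet func(key, value string) []byte) error {
- 	cmd := encodeSet(key, value)
- 	index, term, err := r.ProposeWithForward(cmd)
- 	if err != nil {
- 		// Typical causes: no leader elected yet, forwarding failed, or the leader rejected the command
- 		return fmt.Errorf("propose failed: %w", err)
- 	}
- 	// The command was accepted at (index, term); it is applied once committed and delivered on the apply channel
- 	_ = index
- 	_ = term
- 	return nil
- }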
- // ForwardGet forwards a get request to the leader
- func (r *Raft) ForwardGet(key string) (string, bool, error) {
- // Read state and leader info under the lock to avoid a data race with the main loop
- r.mu.RLock()
- isLeader := r.state == Leader
- leaderID := r.leaderID
- leaderAddr := r.clusterNodes[leaderID]
- r.mu.RUnlock()
- // If we are the leader, serve the read locally
- if isLeader {
- if r.config.GetHandler != nil {
- val, found := r.config.GetHandler(key)
- return val, found, nil
- }
- return "", false, fmt.Errorf("get handler not configured")
- }
- if leaderID == "" {
- return "", false, ErrNoLeader
- }
- if leaderAddr == "" {
- return "", false, fmt.Errorf("leader %s address not found", leaderID)
- }
- // Forward to leader
- ctx, cancel := context.WithTimeout(context.Background(), r.config.RPCTimeout)
- defer cancel()
- args := &GetArgs{Key: key}
- reply, err := r.transport.ForwardGet(ctx, leaderAddr, args)
- if err != nil {
- return "", false, fmt.Errorf("forward failed: %w", err)
- }
- return reply.Value, reply.Found, nil
- }
- // HandlePropose handles forwarded propose requests
- func (r *Raft) HandlePropose(args *ProposeArgs) *ProposeReply {
- index, term, isLeader := r.Propose(args.Command)
- if !isLeader {
- return &ProposeReply{
- Success: false,
- Error: "not leader",
- }
- }
- return &ProposeReply{
- Success: true,
- Index: index,
- Term: term,
- }
- }
- // HandleAddNode handles forwarded AddNode requests
- func (r *Raft) HandleAddNode(args *AddNodeArgs) *AddNodeReply {
- err := r.AddNode(args.NodeID, args.Address)
- if err != nil {
- return &AddNodeReply{
- Success: false,
- Error: err.Error(),
- }
- }
- return &AddNodeReply{
- Success: true,
- }
- }
- // HandleRemoveNode handles forwarded RemoveNode requests
- func (r *Raft) HandleRemoveNode(args *RemoveNodeArgs) *RemoveNodeReply {
- err := r.RemoveNode(args.NodeID)
- if err != nil {
- return &RemoveNodeReply{
- Success: false,
- Error: err.Error(),
- }
- }
- return &RemoveNodeReply{
- Success: true,
- }
- }
- // GetState returns the current term and whether this node is leader
- func (r *Raft) GetState() (uint64, bool) {
- r.mu.RLock()
- defer r.mu.RUnlock()
- return r.currentTerm, r.state == Leader
- }
- // GetLeaderID returns the current leader ID
- func (r *Raft) GetLeaderID() string {
- r.mu.RLock()
- defer r.mu.RUnlock()
- return r.leaderID
- }
- // GetStats returns runtime statistics
- func (r *Raft) GetStats() Stats {
- r.mu.RLock()
- defer r.mu.RUnlock()
- lastIndex, lastTerm := r.log.LastIndexAndTerm()
- firstIndex := r.log.FirstIndex()
- // Copy cluster nodes
- nodes := make(map[string]string)
- for k, v := range r.clusterNodes {
- nodes[k] = v
- }
- clusterSize := len(r.clusterNodes)
- if clusterSize == 0 {
- clusterSize = len(r.peers) + 1
- }
- return Stats{
- Term: r.currentTerm,
- State: r.state.String(),
- FirstLogIndex: firstIndex,
- LastLogIndex: lastIndex,
- LastLogTerm: lastTerm,
- CommitIndex: r.commitIndex,
- LastApplied: r.lastApplied,
- LeaderID: r.leaderID,
- AppendsSent: atomic.LoadInt64(&r.stats.AppendsSent),
- AppendsReceived: atomic.LoadInt64(&r.stats.AppendsReceived),
- ClusterSize: clusterSize,
- ClusterNodes: nodes,
- }
- }
- // restoreFromSnapshot restores the FSM from a snapshot at startup
- // This is called during Start() to ensure the FSM has the correct state
- // before processing any new commands
- func (r *Raft) restoreFromSnapshot() error {
- // Get snapshot from storage
- data, lastIndex, lastTerm, err := r.storage.GetSnapshot()
- if err != nil {
- return fmt.Errorf("failed to get snapshot: %w", err)
- }
- // No snapshot exists
- if len(data) == 0 || lastIndex == 0 {
- return nil
- }
- r.logger.Info("Restoring FSM from snapshot at index %d, term %d (%d bytes)",
- lastIndex, lastTerm, len(data))
- // Update lastApplied to snapshot index to prevent re-applying compacted entries
- r.mu.Lock()
- if lastIndex > r.lastApplied {
- r.lastApplied = lastIndex
- }
- if lastIndex > r.commitIndex {
- r.commitIndex = lastIndex
- }
- r.mu.Unlock()
- // Send snapshot to FSM for restoration
- // Use a goroutine with timeout to avoid blocking if applyCh is full
- msg := ApplyMsg{
- SnapshotValid: true,
- Snapshot: data,
- SnapshotIndex: lastIndex,
- SnapshotTerm: lastTerm,
- }
- // Try to send with a timeout
- select {
- case r.applyCh <- msg:
- r.logger.Info("FSM restoration triggered from snapshot at index %d", lastIndex)
- case <-time.After(5 * time.Second):
- return fmt.Errorf("timeout sending snapshot to applyCh")
- }
- return nil
- }
- // TakeSnapshot takes a snapshot of the current state
- func (r *Raft) TakeSnapshot(data []byte, index uint64) error {
- r.mu.Lock()
- defer r.mu.Unlock()
- if index > r.lastApplied {
- return fmt.Errorf("snapshot index %d exceeds lastApplied %d", index, r.lastApplied)
- }
- term, err := r.log.GetTerm(index)
- if err != nil {
- return fmt.Errorf("failed to get term for index %d: %w", index, err)
- }
- if err := r.storage.SaveSnapshot(data, index, term); err != nil {
- return fmt.Errorf("failed to save snapshot: %w", err)
- }
- if err := r.log.Compact(index); err != nil {
- return fmt.Errorf("failed to compact log: %w", err)
- }
- r.logger.Info("Took snapshot at index %d, term %d", index, term)
- return nil
- }
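- // Illustrative sketch (not part of the original source): an application goroutine that
- // periodically snapshots its FSM. serializeFSM and appliedIndex are assumptions: the
- // application must serialize its own state and track the highest log index it has
- // applied, since TakeSnapshot rejects indexes beyond lastApplied.
- func snapshotLoop(r *Raft, stopCh <-chan struct{}, serializeFSM func() []byte, appliedIndex func() uint64) {
- 	ticker := time.NewTicker(30 * time.Second)
- 	defer ticker.Stop()
- 	for {
- 		select {
- 		case <-stopCh:
- 			return
- 		case <-ticker.C:
- 			data := serializeFSM()
- 			if err := r.TakeSnapshot(data, appliedIndex()); err != nil {
- 				// Non-fatal: a later tick will retry with a newer index
- 				continue
- 			}
- 		}
- 	}
- }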
- func max(a, b uint64) uint64 {
- if a > b {
- return a
- }
- return b
- }
- // ==================== Membership Change API ====================
- //
- // This implementation uses Single-Node Membership Change (also known as one-at-a-time changes)
- // as described in the Raft dissertation (§4.3). This is safe because:
- //
- // 1. We only allow one configuration change at a time (pendingConfigChange flag)
- // 2. For commits, we use the OLD cluster majority until the config change is committed
- // 3. The new node starts receiving entries immediately but doesn't affect majority calculation
- //
- // This approach is simpler than Joint Consensus and is sufficient for most use cases.
- // The invariant maintained is: any two majorities (old or new) must overlap.
- //
- // For adding a node: old majority = floor(N/2)+1, new majority = floor((N+1)/2)+1; their sum
- // exceeds N+1, so any old-cluster majority and any new-cluster majority share at least one node.
- // For removing a node: old majority = floor(N/2)+1, new majority = floor((N-1)/2)+1; their sum
- // exceeds N, so they overlap as long as N > 1.
- //
- // WARNING: Avoid adding/removing multiple nodes rapidly. Wait for each change to be committed.
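- // Illustrative sketch (not part of the original source): driving one-at-a-time
- // membership changes from an operator tool. waitForMember is an assumed helper that
- // polls GetClusterNodes until the new node is visible locally, so a second change is
- // never issued while one is still in flight (ErrConfigInFlight); a stricter check
- // would wait for the config entry to actually commit.
- func growCluster(r *Raft, newNodes map[string]string) error {
- 	for id, addr := range newNodes {
- 		if err := r.AddNodeWithForward(id, addr); err != nil {
- 			return fmt.Errorf("add %s: %w", id, err)
- 		}
- 		if err := waitForMember(r, id, 10*time.Second); err != nil {
- 			return err
- 		}
- 	}
- 	return nil
- }
- func waitForMember(r *Raft, nodeID string, timeout time.Duration) error {
- 	deadline := time.Now().Add(timeout)
- 	for time.Now().Before(deadline) {
- 		if _, ok := r.GetClusterNodes()[nodeID]; ok {
- 			return nil
- 		}
- 		time.Sleep(100 * time.Millisecond)
- 	}
- 	return fmt.Errorf("node %s did not join within %s", nodeID, timeout)
- }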
- // AddNode adds a new node to the cluster
- // This can only be called on the leader
- // The new node must already be running and reachable
- //
- // Safety guarantees:
- // - Only one config change can be in progress at a time
- // - The old cluster majority is used until the config change is committed
- // - Returns error if leadership is lost during the operation
- func (r *Raft) AddNode(nodeID, address string) error {
- r.mu.Lock()
- // Must be leader
- if r.state != Leader {
- leaderID := r.leaderID
- r.mu.Unlock()
- return NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff)
- }
- // Check if we're in the middle of a leadership transfer
- if r.transferring {
- r.mu.Unlock()
- return fmt.Errorf("leadership transfer in progress")
- }
- // Check if there's already a pending config change
- if r.pendingConfigChange {
- r.mu.Unlock()
- return ErrConfigInFlight
- }
- // Validate nodeID and address
- if nodeID == "" {
- r.mu.Unlock()
- return fmt.Errorf("nodeID cannot be empty")
- }
- if address == "" {
- r.mu.Unlock()
- return fmt.Errorf("address cannot be empty")
- }
- // Check if node already exists
- if _, exists := r.clusterNodes[nodeID]; exists {
- r.mu.Unlock()
- return fmt.Errorf("node %s already exists in cluster", nodeID)
- }
- // Check if address is already used by another node
- for existingID, existingAddr := range r.clusterNodes {
- if existingAddr == address {
- r.mu.Unlock()
- return fmt.Errorf("address %s is already used by node %s", address, existingID)
- }
- }
- // Save old cluster nodes for majority calculation during config change
- // This ensures we use the OLD cluster size until the config change is committed
- r.oldClusterNodes = make(map[string]string)
- for k, v := range r.clusterNodes {
- r.oldClusterNodes[k] = v
- }
- // Create new config with the added node
- newNodes := make(map[string]string)
- for k, v := range r.clusterNodes {
- newNodes[k] = v
- }
- newNodes[nodeID] = address
- // Create config change entry with ClusterConfig
- configIndex := r.log.LastIndex() + 1
- entry := LogEntry{
- Index: configIndex,
- Term: r.currentTerm,
- Type: EntryConfig,
- Config: &ClusterConfig{Nodes: newNodes},
- }
- // Mark config change as pending and store the index
- r.pendingConfigChange = true
- r.configChangeIndex = configIndex
- // Immediately apply the new configuration (for single-node changes, this is safe)
- // The new node will start receiving AppendEntries immediately
- r.clusterNodes[nodeID] = address
- r.peers = append(r.peers, address)
- // Set nextIndex to 1 (or firstIndex) so the new node syncs from the beginning
- // This is crucial - the new node's log is empty, so we must start from index 1
- firstIndex := r.log.FirstIndex()
- if firstIndex == 0 {
- firstIndex = 1
- }
- r.nextIndex[address] = firstIndex
- r.matchIndex[address] = 0
- r.mu.Unlock()
- // Append the config change entry to log
- if err := r.log.Append(entry); err != nil {
- r.mu.Lock()
- r.pendingConfigChange = false
- r.oldClusterNodes = nil
- r.configChangeIndex = 0
- // Rollback
- delete(r.clusterNodes, nodeID)
- r.rebuildPeersList()
- r.mu.Unlock()
- return fmt.Errorf("failed to append config entry: %w", err)
- }
- r.logger.Info("Adding node %s (%s) to cluster", nodeID, address)
- // Trigger immediate replication
- r.triggerReplication()
- return nil
- }
- // RemoveNode removes a node from the cluster
- // This can only be called on the leader
- // The node being removed can be any node except the leader itself
- //
- // Safety guarantees:
- // - Only one config change can be in progress at a time
- // - Cannot remove the leader (transfer leadership first)
- // - Cannot reduce cluster to 0 nodes
- // - The old cluster majority is used until the config change is committed
- func (r *Raft) RemoveNode(nodeID string) error {
- r.mu.Lock()
- // Must be leader
- if r.state != Leader {
- leaderID := r.leaderID
- r.mu.Unlock()
- return NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff)
- }
- // Check if we're in the middle of a leadership transfer
- if r.transferring {
- r.mu.Unlock()
- return fmt.Errorf("leadership transfer in progress")
- }
- // Cannot remove self
- if nodeID == r.nodeID {
- r.mu.Unlock()
- return fmt.Errorf("cannot remove self from cluster, use TransferLeadership first")
- }
- // Validate nodeID
- if nodeID == "" {
- r.mu.Unlock()
- return fmt.Errorf("nodeID cannot be empty")
- }
- // Check if there's already a pending config change
- if r.pendingConfigChange {
- r.mu.Unlock()
- return ErrConfigInFlight
- }
- // Check if node exists
- if _, exists := r.clusterNodes[nodeID]; !exists {
- r.mu.Unlock()
- return fmt.Errorf("node %s not found in cluster", nodeID)
- }
- // Cannot reduce cluster below 1 node
- if len(r.clusterNodes) <= 1 {
- r.mu.Unlock()
- return fmt.Errorf("cannot remove last node from cluster")
- }
- // Save old cluster nodes for majority calculation during config change
- r.oldClusterNodes = make(map[string]string)
- for k, v := range r.clusterNodes {
- r.oldClusterNodes[k] = v
- }
- // Create new config without the removed node
- newNodes := make(map[string]string)
- for k, v := range r.clusterNodes {
- if k != nodeID {
- newNodes[k] = v
- }
- }
- // Create config change entry with ClusterConfig
- configIndex := r.log.LastIndex() + 1
- entry := LogEntry{
- Index: configIndex,
- Term: r.currentTerm,
- Type: EntryConfig,
- Config: &ClusterConfig{Nodes: newNodes},
- }
- // Mark config change as pending and store the index
- r.pendingConfigChange = true
- r.configChangeIndex = configIndex
- // Get the address of node being removed for cleanup
- removedAddr := r.clusterNodes[nodeID]
- // Immediately apply the new configuration
- delete(r.clusterNodes, nodeID)
- r.rebuildPeersList()
- delete(r.nextIndex, removedAddr)
- delete(r.matchIndex, removedAddr)
- r.mu.Unlock()
- // Append the config change entry to log
- if err := r.log.Append(entry); err != nil {
- r.mu.Lock()
- r.pendingConfigChange = false
- r.oldClusterNodes = nil
- r.configChangeIndex = 0
- // Rollback - this is tricky but we try our best
- r.clusterNodes[nodeID] = removedAddr
- r.rebuildPeersList()
- r.mu.Unlock()
- return fmt.Errorf("failed to append config entry: %w", err)
- }
- r.logger.Info("Removing node %s from cluster, config at index %d", nodeID, entry.Index)
- // Trigger replication
- go r.sendHeartbeats()
- return nil
- }
- // AddNodeWithForward adds a node, forwarding to leader if necessary
- // This is the recommended method for applications to use
- func (r *Raft) AddNodeWithForward(nodeID, address string) error {
- // Try local operation first
- err := r.AddNode(nodeID, address)
- if err == nil {
- return nil
- }
- // Check if we're not the leader
- r.mu.RLock()
- state := r.state
- leaderID := r.leaderID
- leaderAddr := r.clusterNodes[leaderID]
- r.mu.RUnlock()
- if state == Leader {
- // We are leader but AddNode failed for other reasons
- return err
- }
- // Not leader, forward to leader
- if leaderID == "" {
- return fmt.Errorf("no leader available")
- }
- if leaderAddr == "" {
- return fmt.Errorf("leader %s address not found in cluster", leaderID)
- }
- // Forward to leader
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- args := &AddNodeArgs{NodeID: nodeID, Address: address}
- reply, err := r.transport.ForwardAddNode(ctx, leaderAddr, args)
- if err != nil {
- return fmt.Errorf("forward failed: %w", err)
- }
- if !reply.Success {
- return fmt.Errorf("leader rejected: %s", reply.Error)
- }
- return nil
- }
- // RemoveNodeWithForward removes a node, forwarding to leader if necessary
- // This is the recommended method for applications to use
- func (r *Raft) RemoveNodeWithForward(nodeID string) error {
- // Try local operation first
- err := r.RemoveNode(nodeID)
- if err == nil {
- return nil
- }
- // Check if we're not the leader
- r.mu.RLock()
- state := r.state
- leaderID := r.leaderID
- leaderAddr := r.clusterNodes[leaderID]
- r.mu.RUnlock()
- if state == Leader {
- // We are leader but RemoveNode failed for other reasons
- return err
- }
- // Not leader, forward to leader
- if leaderID == "" {
- return fmt.Errorf("no leader available")
- }
- if leaderAddr == "" {
- return fmt.Errorf("leader %s address not found in cluster", leaderID)
- }
- // Forward to leader
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- args := &RemoveNodeArgs{NodeID: nodeID}
- reply, err := r.transport.ForwardRemoveNode(ctx, leaderAddr, args)
- if err != nil {
- return fmt.Errorf("forward failed: %w", err)
- }
- if !reply.Success {
- return fmt.Errorf("leader rejected: %s", reply.Error)
- }
- return nil
- }
- // rebuildPeersList rebuilds the peers slice from clusterNodes
- func (r *Raft) rebuildPeersList() {
- // Capacity is cluster size minus self; guard against an empty map to avoid a negative cap
- capacity := len(r.clusterNodes)
- if capacity > 0 {
- capacity--
- }
- r.peers = make([]string, 0, capacity)
- for nodeID, addr := range r.clusterNodes {
- if nodeID != r.nodeID {
- r.peers = append(r.peers, addr)
- }
- }
- }
- // GetClusterNodes returns a copy of the current cluster membership
- func (r *Raft) GetClusterNodes() map[string]string {
- r.mu.RLock()
- defer r.mu.RUnlock()
- nodes := make(map[string]string)
- for k, v := range r.clusterNodes {
- nodes[k] = v
- }
- return nodes
- }
- // applyConfigChange applies a configuration change entry
- func (r *Raft) applyConfigChange(entry *LogEntry) {
- if entry.Config == nil || entry.Config.Nodes == nil {
- r.logger.Warn("Invalid config change entry at index %d", entry.Index)
- return
- }
- r.mu.Lock()
- defer r.mu.Unlock()
- // Update cluster configuration
- r.clusterNodes = make(map[string]string)
- for k, v := range entry.Config.Nodes {
- r.clusterNodes[k] = v
- }
- r.rebuildPeersList()
- // Persist the new configuration
- if err := r.storage.SaveClusterConfig(&ClusterConfig{Nodes: r.clusterNodes}); err != nil {
- r.logger.Error("Failed to persist cluster config: %v", err)
- }
- // Clear pending flag and old cluster state
- r.pendingConfigChange = false
- r.oldClusterNodes = nil
- r.configChangeIndex = 0
- // If we were joining a cluster and now have multiple nodes, we've successfully joined
- if r.joiningCluster && len(r.clusterNodes) > 1 {
- r.joiningCluster = false
- r.logger.Info("Successfully joined cluster with %d nodes", len(r.clusterNodes))
- }
- r.logger.Info("Applied config change at index %d, cluster now has %d nodes", entry.Index, len(r.clusterNodes))
- // If we're the leader, update leader state
- if r.state == Leader {
- // Initialize nextIndex/matchIndex for any new nodes
- lastIndex := r.log.LastIndex()
- for nodeID, addr := range r.clusterNodes {
- if nodeID != r.nodeID {
- if _, exists := r.nextIndex[addr]; !exists {
- r.nextIndex[addr] = lastIndex + 1
- r.matchIndex[addr] = 0
- }
- }
- }
- // Clean up removed nodes
- validAddrs := make(map[string]bool)
- for nodeID, addr := range r.clusterNodes {
- if nodeID != r.nodeID {
- validAddrs[addr] = true
- }
- }
- for addr := range r.nextIndex {
- if !validAddrs[addr] {
- delete(r.nextIndex, addr)
- delete(r.matchIndex, addr)
- }
- }
- }
- }
- // ==================== ReadIndex (Linearizable Reads) ====================
- // readIndexLoop handles read index requests
- func (r *Raft) readIndexLoop() {
- for {
- select {
- case <-r.stopCh:
- return
- case req := <-r.readIndexCh:
- r.processReadIndexRequest(req)
- }
- }
- }
- // processReadIndexRequest processes a single read index request
- func (r *Raft) processReadIndexRequest(req *readIndexRequest) {
- r.mu.RLock()
- if r.state != Leader {
- r.mu.RUnlock()
- req.done <- ErrNotLeader
- return
- }
- r.mu.RUnlock()
- // Confirm leadership by sending heartbeats and waiting for majority ack
- if !r.confirmLeadership() {
- req.done <- ErrLeadershipLost
- return
- }
- // Wait for apply to catch up to readIndex
- if err := r.waitApply(req.readIndex, r.config.ProposeTimeout); err != nil {
- req.done <- err
- return
- }
- req.done <- nil
- }
- // ReadIndex implements linearizable reads
- // It ensures that the read sees all writes that were committed before the read started
- func (r *Raft) ReadIndex() (uint64, error) {
- r.mu.RLock()
- if r.state != Leader {
- leaderID := r.leaderID
- r.mu.RUnlock()
- return 0, NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff)
- }
- readIndex := r.commitIndex
- r.mu.RUnlock()
- atomic.AddUint64(&r.metrics.ReadIndexRequests, 1)
- // Create request and send to processing loop
- req := &readIndexRequest{
- readIndex: readIndex,
- done: make(chan error, 1),
- }
- select {
- case r.readIndexCh <- req:
- case <-r.stopCh:
- return 0, ErrShutdown
- case <-time.After(r.config.ProposeTimeout):
- return 0, ErrTimeout
- }
- // Wait for result
- select {
- case err := <-req.done:
- if err != nil {
- return 0, err
- }
- atomic.AddUint64(&r.metrics.ReadIndexSuccess, 1)
- return readIndex, nil
- case <-r.stopCh:
- return 0, ErrShutdown
- case <-time.After(r.config.ProposeTimeout):
- return 0, ErrTimeout
- }
- }
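- // Illustrative sketch (not part of the original source): a linearizable read on the
- // leader using ReadIndex. readLocal is an assumed accessor into the application's
- // own store; the point is that the read happens only after ReadIndex has confirmed
- // leadership and the apply loop has caught up to the read index.
- func linearizableGet(r *Raft, key string, readLocal func(key string) (string, bool)) (string, bool, error) {
- 	if _, err := r.ReadIndex(); err != nil {
- 		// Not leader, leadership lost, or timed out; callers may forward to the leader instead
- 		return "", false, err
- 	}
- 	val, found := readLocal(key)
- 	return val, found, nil
- }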
- // confirmLeadership checks that we are still leader by sending heartbeats and then
- // verifying, after one heartbeat interval, that no higher term has deposed us.
- // Note: this is a weaker check than collecting explicit acks from a majority.
- func (r *Raft) confirmLeadership() bool {
- r.mu.RLock()
- if r.state != Leader {
- r.mu.RUnlock()
- return false
- }
- currentTerm := r.currentTerm
- clusterSize := len(r.clusterNodes)
- if clusterSize == 0 {
- clusterSize = len(r.peers) + 1
- }
- r.mu.RUnlock()
- // Single node cluster - we're always the leader
- if clusterSize == 1 {
- return true
- }
- // Send heartbeats to all peers; a peer with a higher term would force us to step down
- r.sendHeartbeats()
- // Wait briefly and check if we're still leader
- time.Sleep(r.config.HeartbeatInterval)
- r.mu.RLock()
- stillLeader := r.state == Leader && r.currentTerm == currentTerm
- r.mu.RUnlock()
- return stillLeader
- }
- // waitApply waits until lastApplied >= index
- func (r *Raft) waitApply(index uint64, timeout time.Duration) error {
- deadline := time.Now().Add(timeout)
- for {
- r.mu.RLock()
- lastApplied := r.lastApplied
- r.mu.RUnlock()
- if lastApplied >= index {
- return nil
- }
- if time.Now().After(deadline) {
- return ErrTimeout
- }
- time.Sleep(1 * time.Millisecond)
- }
- }
- // ==================== Health Check ====================
- // HealthCheck returns the current health status of the node
- func (r *Raft) HealthCheck() HealthStatus {
- r.mu.RLock()
- defer r.mu.RUnlock()
- clusterNodes := make(map[string]string)
- for k, v := range r.clusterNodes {
- clusterNodes[k] = v
- }
- clusterSize := len(r.clusterNodes)
- if clusterSize == 0 {
- clusterSize = len(r.peers) + 1
- }
- logBehind := uint64(0)
- if r.commitIndex > r.lastApplied {
- logBehind = r.commitIndex - r.lastApplied
- }
- // Consider healthy if we're leader or have a known leader
- isHealthy := r.state == Leader || r.leaderID != ""
- return HealthStatus{
- NodeID: r.nodeID,
- State: r.state.String(),
- Term: r.currentTerm,
- LeaderID: r.leaderID,
- ClusterSize: clusterSize,
- ClusterNodes: clusterNodes,
- CommitIndex: r.commitIndex,
- LastApplied: r.lastApplied,
- LogBehind: logBehind,
- LastHeartbeat: r.lastHeartbeat,
- IsHealthy: isHealthy,
- Uptime: time.Since(r.startTime),
- }
- }
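- // Illustrative sketch (not part of the original source): exposing HealthCheck over a
- // plain HTTP endpoint. The handler, route, and JSON encoding are assumptions; only the
- // HealthStatus fields come from this file. Requires importing net/http and encoding/json.
- func healthHandler(r *Raft) http.HandlerFunc {
- 	return func(w http.ResponseWriter, req *http.Request) {
- 		status := r.HealthCheck()
- 		if !status.IsHealthy {
- 			w.WriteHeader(http.StatusServiceUnavailable)
- 		}
- 		_ = json.NewEncoder(w).Encode(status)
- 	}
- }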
- // GetMetrics returns the current metrics
- func (r *Raft) GetMetrics() Metrics {
- return Metrics{
- Term: atomic.LoadUint64(&r.metrics.Term),
- ProposalsTotal: atomic.LoadUint64(&r.metrics.ProposalsTotal),
- ProposalsSuccess: atomic.LoadUint64(&r.metrics.ProposalsSuccess),
- ProposalsFailed: atomic.LoadUint64(&r.metrics.ProposalsFailed),
- ProposalsForwarded: atomic.LoadUint64(&r.metrics.ProposalsForwarded),
- AppendsSent: atomic.LoadUint64(&r.metrics.AppendsSent),
- AppendsReceived: atomic.LoadUint64(&r.metrics.AppendsReceived),
- AppendsSuccess: atomic.LoadUint64(&r.metrics.AppendsSuccess),
- AppendsFailed: atomic.LoadUint64(&r.metrics.AppendsFailed),
- ElectionsStarted: atomic.LoadUint64(&r.metrics.ElectionsStarted),
- ElectionsWon: atomic.LoadUint64(&r.metrics.ElectionsWon),
- PreVotesStarted: atomic.LoadUint64(&r.metrics.PreVotesStarted),
- PreVotesGranted: atomic.LoadUint64(&r.metrics.PreVotesGranted),
- SnapshotsTaken: atomic.LoadUint64(&r.metrics.SnapshotsTaken),
- SnapshotsInstalled: atomic.LoadUint64(&r.metrics.SnapshotsInstalled),
- SnapshotsSent: atomic.LoadUint64(&r.metrics.SnapshotsSent),
- ReadIndexRequests: atomic.LoadUint64(&r.metrics.ReadIndexRequests),
- ReadIndexSuccess: atomic.LoadUint64(&r.metrics.ReadIndexSuccess),
- LeadershipTransfers: atomic.LoadUint64(&r.metrics.LeadershipTransfers),
- LeadershipTransferSuccess: atomic.LoadUint64(&r.metrics.LeadershipTransferSuccess),
- }
- }
- // ==================== Leadership Transfer ====================
- // TransferLeadership transfers leadership to the specified node
- func (r *Raft) TransferLeadership(targetID string) error {
- r.mu.Lock()
- if r.state != Leader {
- r.mu.Unlock()
- return ErrNotLeader
- }
- if targetID == r.nodeID {
- r.mu.Unlock()
- return fmt.Errorf("cannot transfer to self")
- }
- targetAddr, exists := r.clusterNodes[targetID]
- if !exists {
- r.mu.Unlock()
- return fmt.Errorf("target node %s not in cluster", targetID)
- }
- if r.transferring {
- r.mu.Unlock()
- return fmt.Errorf("leadership transfer already in progress")
- }
- r.transferring = true
- r.transferTarget = targetID
- r.transferDeadline = time.Now().Add(r.config.ElectionTimeoutMax * 2)
- currentTerm := r.currentTerm
- atomic.AddUint64(&r.metrics.LeadershipTransfers, 1)
- r.mu.Unlock()
- r.logger.Info("Starting leadership transfer to %s", targetID)
- // Step 1: Sync target to our log
- if err := r.syncFollowerToLatest(targetAddr); err != nil {
- r.mu.Lock()
- r.transferring = false
- r.transferTarget = ""
- r.mu.Unlock()
- return fmt.Errorf("failed to sync target: %w", err)
- }
- // Step 2: Send TimeoutNow RPC
- args := &TimeoutNowArgs{
- Term: currentTerm,
- LeaderID: r.nodeID,
- }
- ctx, cancel := context.WithTimeout(context.Background(), r.config.RPCTimeout)
- defer cancel()
- reply, err := r.transport.TimeoutNow(ctx, targetAddr, args)
- if err != nil {
- r.mu.Lock()
- r.transferring = false
- r.transferTarget = ""
- r.mu.Unlock()
- return fmt.Errorf("TimeoutNow RPC failed: %w", err)
- }
- if !reply.Success {
- r.mu.Lock()
- r.transferring = false
- r.transferTarget = ""
- r.mu.Unlock()
- return fmt.Errorf("target rejected leadership transfer")
- }
- atomic.AddUint64(&r.metrics.LeadershipTransferSuccess, 1)
- r.logger.Info("Leadership transfer to %s initiated successfully", targetID)
- // Note: We don't immediately step down; we wait for the target to win election
- // and send us an AppendEntries with higher term
- return nil
- }
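- // Illustrative sketch (not part of the original source): decommissioning the current
- // leader by transferring leadership first and then asking the new leader to remove us.
- // The polling loop and timeouts are assumptions for the example.
- func decommissionSelf(r *Raft, selfID, targetID string) error {
- 	if err := r.TransferLeadership(targetID); err != nil {
- 		return fmt.Errorf("transfer failed: %w", err)
- 	}
- 	// Wait until we have actually stepped down and observe a different leader
- 	deadline := time.Now().Add(5 * time.Second)
- 	for time.Now().Before(deadline) {
- 		if _, isLeader := r.GetState(); !isLeader && r.GetLeaderID() != "" && r.GetLeaderID() != selfID {
- 			return r.RemoveNodeWithForward(selfID)
- 		}
- 		time.Sleep(100 * time.Millisecond)
- 	}
- 	return fmt.Errorf("leadership was not handed off in time")
- }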
- // syncFollowerToLatest ensures the follower is caught up to our log
- func (r *Raft) syncFollowerToLatest(peerAddr string) error {
- r.mu.RLock()
- if r.state != Leader {
- r.mu.RUnlock()
- return ErrNotLeader
- }
- currentTerm := r.currentTerm
- leaderCommit := r.commitIndex
- lastIndex := r.log.LastIndex()
- r.mu.RUnlock()
- // Keep replicating until follower is caught up
- deadline := time.Now().Add(r.config.ElectionTimeoutMax * 2)
- for time.Now().Before(deadline) {
- r.mu.RLock()
- if r.state != Leader || r.currentTerm != currentTerm {
- r.mu.RUnlock()
- return ErrLeadershipLost
- }
- matchIndex := r.matchIndex[peerAddr]
- r.mu.RUnlock()
- if matchIndex >= lastIndex {
- return nil // Caught up
- }
- // Trigger replication
- r.replicateToPeer(peerAddr, currentTerm, leaderCommit)
- time.Sleep(10 * time.Millisecond)
- }
- return ErrTimeout
- }
- // HandleTimeoutNow handles TimeoutNow RPC (for leadership transfer)
- func (r *Raft) HandleTimeoutNow(args *TimeoutNowArgs) *TimeoutNowReply {
- r.mu.Lock()
- defer r.mu.Unlock()
- reply := &TimeoutNowReply{
- Term: r.currentTerm,
- Success: false,
- }
- // Only accept if we're a follower and the term matches
- if args.Term < r.currentTerm {
- return reply
- }
- if r.state != Follower {
- return reply
- }
- // Immediately start election
- r.logger.Info("Received TimeoutNow from %s, starting immediate election", args.LeaderID)
- r.state = Candidate
- reply.Success = true
- return reply
- }
- // HandleReadIndex handles ReadIndex RPC
- func (r *Raft) HandleReadIndex(args *ReadIndexArgs) *ReadIndexReply {
- reply := &ReadIndexReply{
- Success: false,
- }
- readIndex, err := r.ReadIndex()
- if err != nil {
- reply.Error = err.Error()
- return reply
- }
- reply.ReadIndex = readIndex
- reply.Success = true
- return reply
- }
- // HandleGet handles Get RPC for remote KV reads
- func (r *Raft) HandleGet(args *GetArgs) *GetReply {
- reply := &GetReply{
- Found: false,
- }
- if r.config.GetHandler == nil {
- reply.Error = "get handler not configured"
- return reply
- }
- value, found := r.config.GetHandler(args.Key)
- reply.Value = value
- reply.Found = found
- return reply
- }