package raft import ( "context" "fmt" "math/rand" "os" "strings" "sync" "sync/atomic" "time" ) // Raft represents a Raft consensus node type Raft struct { mu sync.RWMutex // Node identity nodeID string peers []string // Cluster membership - maps nodeID to address clusterNodes map[string]string // Current state state NodeState currentTerm uint64 votedFor string leaderID string // Log management log *LogManager storage Storage commitIndex uint64 lastApplied uint64 // Leader state nextIndex map[string]uint64 matchIndex map[string]uint64 // Configuration config *Config // Communication transport Transport // Channels applyCh chan ApplyMsg stopCh chan struct{} commitCh chan struct{} // Election timer electionTimer *time.Timer heartbeatTimer *time.Timer // Statistics (deprecated, use metrics instead) stats Stats // Metrics for monitoring metrics Metrics // Logger logger Logger // Running flag running int32 // Replication trigger channel - used to batch replication requests replicationCh chan struct{} // Pending config change - only one config change can be pending at a time pendingConfigChange bool // Old cluster nodes - used for majority calculation during config change // This ensures we use the old cluster size until the config change is committed oldClusterNodes map[string]string // Index of the pending config change entry configChangeIndex uint64 // Joining cluster state - when a standalone node is being added to a cluster // This prevents the node from starting elections while syncing joiningCluster bool joiningClusterTime time.Time // Compaction state - prevents concurrent compaction compacting int32 // Dynamic compaction threshold - updated after each compaction // Next compaction triggers when log size >= this value // Formula: lastCompactionSize * 1.5 (or initial SnapshotThreshold) nextCompactionThreshold uint64 // WaitGroup to track goroutines for clean shutdown wg sync.WaitGroup // Leadership transfer state transferring bool transferTarget string transferDeadline time.Time // Last heartbeat received (for health check) lastHeartbeat time.Time // Start time (for uptime calculation) startTime time.Time // ReadIndex waiting queue readIndexCh chan *readIndexRequest // Snapshot receiving state (for chunked transfer) pendingSnapshot *pendingSnapshotState // Last contact time for each peer (for leader check) lastContact map[string]time.Time } // readIndexRequest represents a pending read index request type readIndexRequest struct { readIndex uint64 done chan error } // pendingSnapshotState tracks chunked snapshot reception type pendingSnapshotState struct { lastIncludedIndex uint64 lastIncludedTerm uint64 data []byte receivedBytes uint64 } // Stats holds runtime statistics type Stats struct { Term uint64 State string FirstLogIndex uint64 // Added for monitoring compaction LastLogIndex uint64 LastLogTerm uint64 CommitIndex uint64 LastApplied uint64 LeaderID string VotesReceived int AppendsSent int64 AppendsReceived int64 ClusterSize int // Number of nodes in cluster ClusterNodes map[string]string // NodeID -> Address mapping } // ConsoleLogger implements Logger with console output type ConsoleLogger struct { Prefix string Level int // 0=debug, 1=info, 2=warn, 3=error mu sync.Mutex } func NewConsoleLogger(prefix string, level int) *ConsoleLogger { return &ConsoleLogger{Prefix: prefix, Level: level} } func (l *ConsoleLogger) log(level int, levelStr, format string, args ...interface{}) { if level < l.Level { return } l.mu.Lock() defer l.mu.Unlock() var levelColor string // Define colors locally 
to avoid dependency issues if cli.go is excluded const ( colorReset = "\033[0m" colorDim = "\033[90m" colorRed = "\033[31m" colorGreen = "\033[32m" colorYellow = "\033[33m" colorCyan = "\033[36m" ) switch level { case 0: levelColor = colorDim case 1: levelColor = colorGreen case 2: levelColor = colorYellow case 3: levelColor = colorRed default: levelColor = colorReset } msg := fmt.Sprintf(format, args...) // Format: [Time] Node [LEVEL] Message // Aligned for better readability fmt.Printf("%s[%s]%s %s%-8s%s %s[%-5s]%s %s\n", colorDim, time.Now().Format("15:04:05.000"), colorReset, colorCyan, l.Prefix, colorReset, levelColor, levelStr, colorReset, msg) } func (l *ConsoleLogger) Debug(format string, args ...interface{}) { l.log(0, "DEBUG", format, args...) } func (l *ConsoleLogger) Info(format string, args ...interface{}) { l.log(1, "INFO", format, args...) } func (l *ConsoleLogger) Warn(format string, args ...interface{}) { l.log(2, "WARN", format, args...) } func (l *ConsoleLogger) Error(format string, args ...interface{}) { l.log(3, "ERROR", format, args...) } // NewRaft creates a new Raft node func NewRaft(config *Config, transport Transport, applyCh chan ApplyMsg) (*Raft, error) { if config.Logger == nil { config.Logger = NewConsoleLogger(config.NodeID, 1) } // Create storage storage, err := NewHybridStorage(config.DataDir, config.MemoryLogCapacity, config.Logger) if err != nil { return nil, fmt.Errorf("failed to create storage: %w", err) } // Create log manager logMgr := NewLogManager(storage, config.Logger) // Load persistent state state, err := storage.GetState() if err != nil { return nil, fmt.Errorf("failed to load state: %w", err) } // Load or initialize cluster configuration clusterNodes := make(map[string]string) // Try to load saved cluster config first savedConfig, err := storage.GetClusterConfig() if err != nil { return nil, fmt.Errorf("failed to load cluster config: %w", err) } if savedConfig != nil && len(savedConfig.Nodes) > 0 { // Use saved config clusterNodes = savedConfig.Nodes config.Logger.Info("Loaded cluster config with %d nodes", len(clusterNodes)) } else { // Initialize from config if config.ClusterNodes != nil && len(config.ClusterNodes) > 0 { for k, v := range config.ClusterNodes { clusterNodes[k] = v } } else { // Build from Peers + self (backward compatibility) clusterNodes[config.NodeID] = config.ListenAddr if config.PeerMap != nil { for k, v := range config.PeerMap { clusterNodes[k] = v } } } // Save initial config if len(clusterNodes) > 0 { if err := storage.SaveClusterConfig(&ClusterConfig{Nodes: clusterNodes}); err != nil { config.Logger.Warn("Failed to save initial cluster config: %v", err) } } } // Build peers list from cluster nodes (excluding self) var peers []string for nodeID, addr := range clusterNodes { if nodeID != config.NodeID { peers = append(peers, addr) } } r := &Raft{ nodeID: config.NodeID, peers: peers, clusterNodes: clusterNodes, state: Follower, currentTerm: state.CurrentTerm, votedFor: state.VotedFor, log: logMgr, storage: storage, config: config, transport: transport, applyCh: applyCh, stopCh: make(chan struct{}), commitCh: make(chan struct{}, 100), // Increased buffer for event-driven apply replicationCh: make(chan struct{}, 1), nextIndex: make(map[string]uint64), matchIndex: make(map[string]uint64), logger: config.Logger, readIndexCh: make(chan *readIndexRequest, 100), lastHeartbeat: time.Now(), lastContact: make(map[string]time.Time), } // Initialize metrics r.metrics.Term = state.CurrentTerm // Initialize lastApplied and commitIndex 
from config (if provided) if config.LastAppliedIndex > 0 { r.lastApplied = config.LastAppliedIndex r.commitIndex = config.LastAppliedIndex // Applied implies committed r.logger.Info("Initialized lastApplied and commitIndex to %d from config", config.LastAppliedIndex) } // Set RPC handler transport.SetRPCHandler(r) return r, nil } // Start starts the Raft node func (r *Raft) Start() error { if !atomic.CompareAndSwapInt32(&r.running, 0, 1) { return fmt.Errorf("already running") } // Record start time r.startTime = time.Now() // Start transport if err := r.transport.Start(); err != nil { return fmt.Errorf("failed to start transport: %w", err) } // Restore FSM from snapshot if exists // This must happen before starting apply loop to ensure FSM state is restored if err := r.restoreFromSnapshot(); err != nil { // Suppress warning if it's just a missing file (first start) if !os.IsNotExist(err) && !strings.Contains(err.Error(), "no such file") { r.logger.Warn("Failed to restore from snapshot: %v", err) } else { r.logger.Info("No snapshot found, starting with empty state") } // Continue anyway - the node can still function, just without historical state } // Start election timer r.resetElectionTimer() // Start background goroutines with WaitGroup tracking r.wg.Add(4) go func() { defer r.wg.Done() r.applyLoop() }() go func() { defer r.wg.Done() r.replicationLoop() }() go func() { defer r.wg.Done() r.mainLoop() }() go func() { defer r.wg.Done() r.readIndexLoop() }() r.logger.Info("Raft node started") return nil } // Stop stops the Raft node func (r *Raft) Stop() error { if !atomic.CompareAndSwapInt32(&r.running, 1, 0) { return fmt.Errorf("not running") } // Signal all goroutines to stop close(r.stopCh) // Stop timers first to prevent new operations r.mu.Lock() if r.electionTimer != nil { r.electionTimer.Stop() } if r.heartbeatTimer != nil { r.heartbeatTimer.Stop() } r.mu.Unlock() // Wait for all goroutines to finish with timeout done := make(chan struct{}) go func() { r.wg.Wait() close(done) }() select { case <-done: // All goroutines exited cleanly case <-time.After(3 * time.Second): r.logger.Warn("Timeout waiting for goroutines to stop") } // Stop transport (has its own timeout) if err := r.transport.Stop(); err != nil { r.logger.Error("Failed to stop transport: %v", err) } if err := r.storage.Close(); err != nil { r.logger.Error("Failed to close storage: %v", err) } r.logger.Info("Raft node stopped") return nil } // mainLoop is the main event loop func (r *Raft) mainLoop() { for { select { case <-r.stopCh: return default: } r.mu.RLock() state := r.state r.mu.RUnlock() switch state { case Follower: r.runFollower() case Candidate: r.runCandidate() case Leader: r.runLeader() } } } // runFollower handles follower behavior func (r *Raft) runFollower() { r.logger.Debug("Running as follower in term %d", r.currentTerm) for { select { case <-r.stopCh: return case <-r.electionTimer.C: r.mu.Lock() if r.state == Follower { // If we're joining a cluster, don't start elections // Give time for the leader to sync us up if r.joiningCluster { // Allow joining for up to 30 seconds if time.Since(r.joiningClusterTime) < 30*time.Second { r.logger.Debug("Suppressing election during cluster join") r.resetElectionTimer() r.mu.Unlock() continue } // Timeout - clear joining state r.joiningCluster = false r.logger.Warn("Cluster join timeout, resuming normal election behavior") } r.logger.Debug("Election timeout, becoming candidate") r.state = Candidate r.leaderID = "" // Clear leaderID so we can vote for self in Pre-Vote } 
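// Fall through to mainLoop, which re-dispatches based on the (possibly updated) state.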
r.mu.Unlock() return } } } // runCandidate handles candidate behavior (leader election) // Implements Pre-Vote mechanism to prevent term explosion func (r *Raft) runCandidate() { // Phase 1: Pre-Vote (don't increment term yet) if !r.runPreVote() { // Pre-vote failed, wait for timeout before retrying r.mu.Lock() r.resetElectionTimer() r.mu.Unlock() select { case <-r.stopCh: return case <-r.electionTimer.C: // Timer expired, will retry pre-vote } return } // Phase 2: Actual election (pre-vote succeeded, now increment term) r.mu.Lock() r.currentTerm++ r.votedFor = r.nodeID r.leaderID = "" currentTerm := r.currentTerm // Update metrics atomic.StoreUint64(&r.metrics.Term, currentTerm) if err := r.persistState(); err != nil { r.logger.Error("Failed to persist state during election: %v", err) r.mu.Unlock() return // Cannot proceed without persisting state } atomic.AddUint64(&r.metrics.ElectionsStarted, 1) r.resetElectionTimer() r.mu.Unlock() r.logger.Debug("Starting election for term %d", currentTerm) // Get current peers list r.mu.RLock() currentPeers := make([]string, len(r.peers)) copy(currentPeers, r.peers) clusterSize := len(r.clusterNodes) if clusterSize == 0 { clusterSize = len(r.peers) + 1 } r.mu.RUnlock() // Request actual votes from all peers votes := 1 // Vote for self voteCh := make(chan bool, len(currentPeers)) lastLogIndex, lastLogTerm := r.log.LastIndexAndTerm() for _, peer := range currentPeers { go func(peer string) { args := &RequestVoteArgs{ Term: currentTerm, CandidateID: r.nodeID, LastLogIndex: lastLogIndex, LastLogTerm: lastLogTerm, PreVote: false, } ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() reply, err := r.transport.RequestVote(ctx, peer, args) if err != nil { r.logger.Debug("RequestVote to %s failed: %v", peer, err) voteCh <- false return } r.mu.Lock() defer r.mu.Unlock() if reply.Term > r.currentTerm { r.becomeFollower(reply.Term) voteCh <- false return } voteCh <- reply.VoteGranted }(peer) } // Wait for votes - majority is (clusterSize/2) + 1 needed := clusterSize/2 + 1 // If we already have enough votes (single-node cluster), become leader immediately if votes >= needed { r.mu.Lock() if r.state == Candidate && r.currentTerm == currentTerm { r.becomeLeader() } r.mu.Unlock() return } for i := 0; i < len(currentPeers); i++ { select { case <-r.stopCh: return case <-r.electionTimer.C: r.logger.Debug("Election timeout, will start new election") return case granted := <-voteCh: if granted { votes++ if votes >= needed { r.mu.Lock() if r.state == Candidate && r.currentTerm == currentTerm { r.becomeLeader() } r.mu.Unlock() return } } } // Check if we're still a candidate r.mu.RLock() if r.state != Candidate { r.mu.RUnlock() return } r.mu.RUnlock() } // Election failed, wait for timeout r.mu.RLock() stillCandidate := r.state == Candidate r.mu.RUnlock() if stillCandidate { r.logger.Debug("Election failed, waiting for timeout (got %d/%d votes)", votes, needed) select { case <-r.stopCh: return case <-r.electionTimer.C: // Timer expired, will start new election } } } // runPreVote sends pre-vote requests to check if we can win an election // Returns true if we got majority pre-votes, false otherwise func (r *Raft) runPreVote() bool { r.mu.RLock() currentTerm := r.currentTerm currentPeers := make([]string, len(r.peers)) copy(currentPeers, r.peers) clusterSize := len(r.clusterNodes) if clusterSize == 0 { clusterSize = len(r.peers) + 1 } leaderID := r.leaderID r.mu.RUnlock() // Pre-vote uses term+1 but doesn't actually increment it 
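// The real term is only incremented in runCandidate after the pre-vote succeeds,
// so repeated failed pre-vote rounds cannot inflate terms across the cluster.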
preVoteTerm := currentTerm + 1 lastLogIndex, lastLogTerm := r.log.LastIndexAndTerm() r.logger.Debug("Starting pre-vote for term %d", preVoteTerm) // Per Raft Pre-Vote optimization (§9.6): if we have a known leader, // don't vote for self. This prevents standalone nodes from constantly // electing themselves when they should be joining an existing cluster. preVotes := 0 if leaderID == "" { preVotes = 1 // Vote for self only if we don't know of a leader } else { r.logger.Debug("Pre-vote: not self-voting because we have leader %s", leaderID) } preVoteCh := make(chan bool, len(currentPeers)) for _, peer := range currentPeers { go func(peer string) { args := &RequestVoteArgs{ Term: preVoteTerm, CandidateID: r.nodeID, LastLogIndex: lastLogIndex, LastLogTerm: lastLogTerm, PreVote: true, } ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond) defer cancel() reply, err := r.transport.RequestVote(ctx, peer, args) if err != nil { r.logger.Debug("PreVote to %s failed: %v", peer, err) preVoteCh <- false return } // For pre-vote, we don't step down even if reply.Term > currentTerm // because the other node might also be doing pre-vote preVoteCh <- reply.VoteGranted }(peer) } // Wait for pre-votes with a shorter timeout - majority is (clusterSize/2) + 1 needed := clusterSize/2 + 1 // If we already have enough votes (single-node cluster with no known leader), succeed immediately if preVotes >= needed { r.logger.Debug("Pre-vote succeeded immediately (got %d/%d pre-votes)", preVotes, needed) return true } timeout := time.After(500 * time.Millisecond) for i := 0; i < len(currentPeers); i++ { select { case <-r.stopCh: return false case <-timeout: r.logger.Debug("Pre-vote timeout (got %d/%d pre-votes)", preVotes, needed) return false case granted := <-preVoteCh: if granted { preVotes++ if preVotes >= needed { r.logger.Debug("Pre-vote succeeded (got %d/%d pre-votes)", preVotes, needed) return true } } } // Check if we're still a candidate r.mu.RLock() if r.state != Candidate { r.mu.RUnlock() return false } r.mu.RUnlock() } r.logger.Debug("Pre-vote failed (got %d/%d pre-votes)", preVotes, needed) return false } // runLeader handles leader behavior func (r *Raft) runLeader() { r.logger.Debug("Running as leader in term %d", r.currentTerm) // Send initial heartbeat r.sendHeartbeats() // Start heartbeat timer r.mu.Lock() r.heartbeatTimer = time.NewTimer(r.config.HeartbeatInterval) r.mu.Unlock() for { select { case <-r.stopCh: return case <-r.heartbeatTimer.C: r.mu.RLock() if r.state != Leader { r.mu.RUnlock() return } r.mu.RUnlock() r.sendHeartbeats() r.heartbeatTimer.Reset(r.config.HeartbeatInterval) case <-r.commitCh: r.updateCommitIndex() } } } // becomeFollower transitions to follower state func (r *Raft) becomeFollower(term uint64) { oldState := r.state oldTerm := r.currentTerm r.state = Follower r.currentTerm = term // Update metrics atomic.StoreUint64(&r.metrics.Term, term) r.votedFor = "" r.leaderID = "" // Must persist before responding - use mustPersistState for critical transitions r.mustPersistState() r.resetElectionTimer() // Clear leadership transfer state r.transferring = false r.transferTarget = "" // Only log significant state changes if oldState == Leader && term > oldTerm { // Stepping down from leader is notable r.logger.Debug("Stepped down from leader in term %d", term) } } // becomeLeader transitions to leader state func (r *Raft) becomeLeader() { r.state = Leader r.leaderID = r.nodeID // Update metrics atomic.AddUint64(&r.metrics.ElectionsWon, 1) // Initialize leader 
state for all peers lastIndex := r.log.LastIndex() for nodeID, addr := range r.clusterNodes { if nodeID != r.nodeID { r.nextIndex[addr] = lastIndex + 1 r.matchIndex[addr] = 0 } } // Also handle legacy peers for _, peer := range r.peers { if _, exists := r.nextIndex[peer]; !exists { r.nextIndex[peer] = lastIndex + 1 r.matchIndex[peer] = 0 } } r.logger.Info("Became leader in term %d with %d peers", r.currentTerm, len(r.clusterNodes)-1) // Append a no-op entry to commit entries from previous terms // This is a standard Raft optimization - the leader appends a no-op // entry in its current term to quickly commit all pending entries noopEntry := LogEntry{ Index: r.log.LastIndex() + 1, Term: r.currentTerm, Type: EntryNoop, Command: nil, } if err := r.log.Append(noopEntry); err != nil { r.logger.Error("Failed to append no-op entry: %v", err) } else { r.logger.Debug("Appended no-op entry at index %d for term %d", noopEntry.Index, noopEntry.Term) } } // sendHeartbeats sends AppendEntries RPCs to all peers func (r *Raft) sendHeartbeats() { r.mu.RLock() if r.state != Leader { r.mu.RUnlock() return } currentTerm := r.currentTerm leaderCommit := r.commitIndex // Get all peer addresses peerAddrs := make([]string, 0, len(r.clusterNodes)) for nodeID, addr := range r.clusterNodes { if nodeID != r.nodeID { peerAddrs = append(peerAddrs, addr) } } // Also include legacy peers not in clusterNodes for _, peer := range r.peers { found := false for _, addr := range peerAddrs { if addr == peer { found = true break } } if !found { peerAddrs = append(peerAddrs, peer) } } r.mu.RUnlock() for _, peer := range peerAddrs { go r.replicateToPeer(peer, currentTerm, leaderCommit) } } // replicateToPeer sends AppendEntries to a specific peer func (r *Raft) replicateToPeer(peer string, term, leaderCommit uint64) { r.mu.RLock() if r.state != Leader || r.currentTerm != term { r.mu.RUnlock() return } nextIndex := r.nextIndex[peer] r.mu.RUnlock() // Get entries to send entries, prevLogIndex, prevLogTerm, err := r.log.GetEntriesForFollower(nextIndex, r.config.MaxLogEntriesPerRequest) if err == ErrCompacted { // Need to send snapshot r.sendSnapshot(peer) return } if err != nil { r.logger.Debug("Failed to get entries for %s: %v", peer, err) return } args := &AppendEntriesArgs{ Term: term, LeaderID: r.nodeID, PrevLogIndex: prevLogIndex, PrevLogTerm: prevLogTerm, Entries: entries, LeaderCommit: leaderCommit, } ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() reply, err := r.transport.AppendEntries(ctx, peer, args) if err != nil { r.logger.Debug("AppendEntries to %s failed: %v", peer, err) return } atomic.AddInt64(&r.stats.AppendsSent, 1) r.mu.Lock() defer r.mu.Unlock() if reply.Term > r.currentTerm { r.becomeFollower(reply.Term) return } if r.state != Leader || r.currentTerm != term { return } if reply.Success { // Update contact time r.lastContact[peer] = time.Now() // Update nextIndex and matchIndex if len(entries) > 0 { newMatchIndex := entries[len(entries)-1].Index if newMatchIndex > r.matchIndex[peer] { r.matchIndex[peer] = newMatchIndex r.nextIndex[peer] = newMatchIndex + 1 // Try to update commit index select { case r.commitCh <- struct{}{}: default: } } } } else { // Even if failed (log conflict), we contacted the peer r.lastContact[peer] = time.Now() // Decrement nextIndex and retry if reply.ConflictTerm > 0 { // Find the last entry of ConflictTerm in our log found := false for idx := r.log.LastIndex(); idx >= r.log.FirstIndex(); idx-- { t, err := r.log.GetTerm(idx) if err != nil { 
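// GetTerm failed (for example the entry was compacted away); stop scanning and
// fall back to the follower's ConflictIndex below.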
break } if t == reply.ConflictTerm { r.nextIndex[peer] = idx + 1 found = true break } if t < reply.ConflictTerm { break } } if !found { r.nextIndex[peer] = reply.ConflictIndex } } else if reply.ConflictIndex > 0 { r.nextIndex[peer] = reply.ConflictIndex } else { r.nextIndex[peer] = max(1, r.nextIndex[peer]-1) } } } // sendSnapshot sends a snapshot to a peer with chunked transfer support func (r *Raft) sendSnapshot(peer string) { r.mu.RLock() if r.state != Leader { r.mu.RUnlock() return } term := r.currentTerm chunkSize := r.config.SnapshotChunkSize if chunkSize <= 0 { chunkSize = 1024 * 1024 // Default 1MB } r.mu.RUnlock() data, lastIndex, lastTerm, err := r.storage.GetSnapshot() if err != nil { r.logger.Error("Failed to get snapshot: %v", err) return } atomic.AddUint64(&r.metrics.SnapshotsSent, 1) r.logger.Info("Sending snapshot to %s: %d bytes, lastIndex=%d, lastTerm=%d", peer, len(data), lastIndex, lastTerm) // Send snapshot in chunks totalSize := len(data) offset := 0 for offset < totalSize { // Check if we're still leader r.mu.RLock() if r.state != Leader || r.currentTerm != term { r.mu.RUnlock() r.logger.Debug("Aborting snapshot send: no longer leader") return } r.mu.RUnlock() // Calculate chunk size end := offset + chunkSize if end > totalSize { end = totalSize } chunk := data[offset:end] done := end >= totalSize args := &InstallSnapshotArgs{ Term: term, LeaderID: r.nodeID, LastIncludedIndex: lastIndex, LastIncludedTerm: lastTerm, Offset: uint64(offset), Data: chunk, Done: done, } ctx, cancel := context.WithTimeout(context.Background(), r.config.SnapshotRPCTimeout) reply, err := r.transport.InstallSnapshot(ctx, peer, args) cancel() if err != nil { r.logger.Error("InstallSnapshot chunk to %s failed at offset %d: %v", peer, offset, err) return } if reply.Term > r.currentTerm { r.mu.Lock() r.becomeFollower(reply.Term) r.mu.Unlock() return } if !reply.Success { r.logger.Error("InstallSnapshot chunk rejected by %s at offset %d", peer, offset) return } offset = end } // Snapshot fully sent and accepted r.mu.Lock() defer r.mu.Unlock() if r.state != Leader || r.currentTerm != term { return } r.nextIndex[peer] = lastIndex + 1 r.matchIndex[peer] = lastIndex r.logger.Info("Snapshot to %s completed, nextIndex=%d", peer, lastIndex+1) } // updateCommitIndex updates the commit index based on matchIndex func (r *Raft) updateCommitIndex() { r.mu.Lock() defer r.mu.Unlock() if r.state != Leader { return } // For config change entries, use the OLD cluster for majority calculation // This ensures that adding a node doesn't require the new node's vote // until the config change itself is committed votingNodes := r.clusterNodes if r.pendingConfigChange && r.oldClusterNodes != nil { votingNodes = r.oldClusterNodes } // Get current cluster size from voting nodes clusterSize := len(votingNodes) if clusterSize == 0 { clusterSize = len(r.peers) + 1 } // Find the highest index replicated on a majority for n := r.log.LastIndex(); n > r.commitIndex; n-- { term, err := r.log.GetTerm(n) if err != nil { continue } // Only commit entries from current term if term != r.currentTerm { continue } // Count replicas (including self) count := 1 // Self for nodeID, addr := range votingNodes { if nodeID != r.nodeID { if r.matchIndex[addr] >= n { count++ } } } // Also check legacy peers for _, peer := range r.peers { // Avoid double counting if peer is already in votingNodes alreadyCounted := false for _, addr := range votingNodes { if addr == peer { alreadyCounted = true break } } if !alreadyCounted && r.matchIndex[peer] >= n 
{ count++ } } // Majority is (n/2) + 1 needed := clusterSize/2 + 1 if count >= needed { r.commitIndex = n break } } } // applyLoop applies committed entries to the state machine // Uses event-driven model with fallback polling for reliability func (r *Raft) applyLoop() { // Use a ticker as fallback for missed signals (matches original 10ms polling) ticker := time.NewTicker(10 * time.Millisecond) defer ticker.Stop() for { select { case <-r.stopCh: return case <-r.commitCh: // Event-driven: triggered when commitIndex is updated r.applyCommitted() case <-ticker.C: // Fallback: check periodically in case we missed a signal r.applyCommitted() } } } // applyCommitted applies all committed but not yet applied entries func (r *Raft) applyCommitted() { r.mu.Lock() commitIndex := r.commitIndex lastApplied := r.lastApplied firstIndex := r.log.FirstIndex() lastLogIndex := r.log.LastIndex() r.mu.Unlock() // Safety check: ensure lastApplied is within valid range // If lastApplied is below firstIndex (due to snapshot), skip to firstIndex if lastApplied < firstIndex && firstIndex > 0 { r.mu.Lock() r.lastApplied = firstIndex lastApplied = firstIndex r.mu.Unlock() r.logger.Debug("Adjusted lastApplied to firstIndex %d after compaction", firstIndex) } // Safety check: don't try to apply beyond what we have in log if commitIndex > lastLogIndex { commitIndex = lastLogIndex } for lastApplied < commitIndex { lastApplied++ // Skip if entry has been compacted if lastApplied < firstIndex { continue } entry, err := r.log.GetEntry(lastApplied) if err != nil { // If entry is compacted, skip ahead if err == ErrCompacted { r.mu.Lock() newFirstIndex := r.log.FirstIndex() if lastApplied < newFirstIndex { r.lastApplied = newFirstIndex lastApplied = newFirstIndex } r.mu.Unlock() continue } r.logger.Error("Failed to get entry %d: %v (firstIndex=%d, lastIndex=%d)", lastApplied, err, r.log.FirstIndex(), r.log.LastIndex()) break } // Handle config change entries if entry.Type == EntryConfig { r.applyConfigChange(entry) r.mu.Lock() r.lastApplied = lastApplied r.mu.Unlock() continue } // Handle no-op entries (just update lastApplied, don't send to state machine) if entry.Type == EntryNoop { r.mu.Lock() r.lastApplied = lastApplied r.mu.Unlock() continue } // Normal command entry msg := ApplyMsg{ CommandValid: true, Command: entry.Command, CommandIndex: entry.Index, CommandTerm: entry.Term, } select { case r.applyCh <- msg: case <-r.stopCh: return } r.mu.Lock() r.lastApplied = lastApplied r.mu.Unlock() } // Check if log compaction is needed // Run asynchronously to avoid blocking the apply loop if snapshotting is slow go r.maybeCompactLog() } // maybeCompactLog checks if automatic log compaction should be triggered // It uses a dynamic threshold to prevent "compaction thrashing": // - First compaction triggers at SnapshotThreshold (default 100,000) // - After compaction, next threshold = current_log_size * 1.5 // - This prevents every new entry from triggering compaction if log stays large func (r *Raft) maybeCompactLog() { // Skip if no snapshot provider is configured if r.config.SnapshotProvider == nil { return } // Skip if log compaction is explicitly disabled if !r.config.LogCompactionEnabled { return } // Check if compaction is already in progress (atomic check-and-set) if !atomic.CompareAndSwapInt32(&r.compacting, 0, 1) { return } // Ensure we release the compacting flag when done defer atomic.StoreInt32(&r.compacting, 0) r.mu.RLock() lastApplied := r.lastApplied firstIndex := r.log.FirstIndex() initialThreshold := 
r.config.SnapshotThreshold minRetention := r.config.SnapshotMinRetention isLeader := r.state == Leader dynamicThreshold := r.nextCompactionThreshold r.mu.RUnlock() // Guard against underflow: ensure lastApplied > firstIndex if lastApplied <= firstIndex { return } // Calculate current log size logSize := lastApplied - firstIndex // Determine effective threshold: // - Use initial threshold if no compaction has occurred yet (dynamicThreshold == 0) // - Otherwise use the dynamic threshold effectiveThreshold := initialThreshold if dynamicThreshold > 0 { effectiveThreshold = dynamicThreshold } // Check if we have enough entries to warrant compaction if logSize <= effectiveThreshold { return } // Guard against underflow: ensure lastApplied > minRetention if lastApplied <= minRetention { return } // Calculate the safe compaction point // We need to retain at least minRetention entries for follower catch-up compactUpTo := lastApplied - minRetention if compactUpTo <= firstIndex { return // Not enough entries to compact while maintaining retention } // For leader, also consider the minimum nextIndex of all followers // to avoid compacting entries that followers still need if isLeader { r.mu.RLock() minNextIndex := lastApplied for _, nextIdx := range r.nextIndex { if nextIdx < minNextIndex { minNextIndex = nextIdx } } r.mu.RUnlock() // Don't compact entries that followers still need // Keep a buffer of minRetention entries before the slowest follower if minNextIndex > minRetention { followerSafePoint := minNextIndex - minRetention if followerSafePoint < compactUpTo { compactUpTo = followerSafePoint } } else { // Slowest follower is too far behind, don't compact // They will need a full snapshot anyway compactUpTo = firstIndex // Effectively skip compaction } } // Final check - make sure we're actually compacting something meaningful if compactUpTo <= firstIndex { return } // Get snapshot from application layer snapshotData, err := r.config.SnapshotProvider(compactUpTo) if err != nil { r.logger.Error("Failed to get snapshot from provider: %v", err) return } // Get the term at the compaction point term, err := r.log.GetTerm(compactUpTo) if err != nil { r.logger.Error("Failed to get term for compaction index %d: %v", compactUpTo, err) return } // Save snapshot if err := r.storage.SaveSnapshot(snapshotData, compactUpTo, term); err != nil { r.logger.Error("Failed to save snapshot: %v", err) return } // Compact the log // CRITICAL: We must hold the write lock during compaction to prevent // concurrent writes (AppendEntries) unless we can guarantee underlying // file integrity with concurrent modification. The requirement is to // pause writes during compaction. r.mu.Lock() if err := r.log.Compact(compactUpTo); err != nil { r.mu.Unlock() r.logger.Error("Failed to compact log: %v", err) return } r.mu.Unlock() // Calculate new log size after compaction newLogSize := lastApplied - compactUpTo // Update dynamic threshold for next compaction // We use a linear growth model: next threshold = current size + SnapshotThreshold // This ensures that we trigger compaction roughly every SnapshotThreshold entries. 
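//
// For example (assuming the default SnapshotThreshold of 100,000 mentioned above),
// if the log retains 20,000 entries after this compaction, the next compaction
// triggers only once the retained log grows past:
//
//	nextCompactionThreshold = newLogSize + SnapshotThreshold
//	                        = 20,000 + 100,000 = 120,000 entries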
r.mu.Lock() r.nextCompactionThreshold = newLogSize + r.config.SnapshotThreshold // Ensure threshold doesn't go below the initial threshold if r.nextCompactionThreshold < initialThreshold { r.nextCompactionThreshold = initialThreshold } r.mu.Unlock() r.logger.Info("Auto compaction completed: compacted up to index %d (term %d), log size %d -> %d, next threshold: %d", compactUpTo, term, logSize, newLogSize, r.nextCompactionThreshold) } // resetElectionTimer resets the election timeout func (r *Raft) resetElectionTimer() { timeout := r.config.ElectionTimeoutMin + time.Duration(rand.Int63n(int64(r.config.ElectionTimeoutMax-r.config.ElectionTimeoutMin))) if r.electionTimer == nil { r.electionTimer = time.NewTimer(timeout) } else { if !r.electionTimer.Stop() { select { case <-r.electionTimer.C: default: } } r.electionTimer.Reset(timeout) } } // persistState saves the current state to stable storage // Returns error if persistence fails - caller MUST handle this for safety func (r *Raft) persistState() error { state := &PersistentState{ CurrentTerm: r.currentTerm, VotedFor: r.votedFor, } if err := r.storage.SaveState(state); err != nil { r.logger.Error("Failed to persist state: %v", err) return fmt.Errorf("%w: %v", ErrPersistFailed, err) } return nil } // mustPersistState saves state and panics on failure // Use this only in critical paths where failure is unrecoverable func (r *Raft) mustPersistState() { if err := r.persistState(); err != nil { // In production, you might want to trigger a graceful shutdown instead r.logger.Error("CRITICAL: Failed to persist state, node may be in inconsistent state: %v", err) panic(err) } } // HandleRequestVote handles RequestVote RPCs (including pre-vote) func (r *Raft) HandleRequestVote(args *RequestVoteArgs) *RequestVoteReply { r.mu.Lock() defer r.mu.Unlock() reply := &RequestVoteReply{ Term: r.currentTerm, VoteGranted: false, } // Handle pre-vote separately if args.PreVote { return r.handlePreVote(args) } // Reply false if term < currentTerm if args.Term < r.currentTerm { return reply } // If term > currentTerm, become follower if args.Term > r.currentTerm { r.becomeFollower(args.Term) } reply.Term = r.currentTerm // Check if we can vote for this candidate if (r.votedFor == "" || r.votedFor == args.CandidateID) && r.log.IsUpToDate(args.LastLogIndex, args.LastLogTerm) { r.votedFor = args.CandidateID if err := r.persistState(); err != nil { // Cannot grant vote if we can't persist the decision r.logger.Error("Failed to persist vote for %s: %v", args.CandidateID, err) return reply } r.resetElectionTimer() reply.VoteGranted = true r.logger.Debug("Granted vote to %s for term %d", args.CandidateID, args.Term) } return reply } // handlePreVote handles pre-vote requests // Pre-vote doesn't change our state, just checks if we would vote func (r *Raft) handlePreVote(args *RequestVoteArgs) *RequestVoteReply { reply := &RequestVoteReply{ Term: r.currentTerm, VoteGranted: false, } // For pre-vote, we check: // 1. The candidate's term is at least as high as ours // 2. The candidate's log is at least as up-to-date as ours // 3. We don't have a current leader (or the candidate's term is higher) if args.Term < r.currentTerm { return reply } // Per Raft Pre-Vote optimization (§9.6): reject pre-vote if we have a current // leader and the candidate's term is not higher than ours. This prevents // disruptive elections when a partitioned node tries to rejoin. 
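// Example: a lagging node that still sees our term asks for a pre-vote while we
// continue to receive heartbeats from the leader; it is rejected here. Even if it
// probes with a higher term, the log freshness check below still denies it unless
// its log is at least as up-to-date as ours.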
if r.leaderID != "" && args.Term <= r.currentTerm { r.logger.Debug("Rejecting pre-vote from %s: have leader %s", args.CandidateID, r.leaderID) return reply } // Grant pre-vote if log is up-to-date // Note: we don't check votedFor for pre-vote, and we don't update any state if r.log.IsUpToDate(args.LastLogIndex, args.LastLogTerm) { reply.VoteGranted = true r.logger.Debug("Granted pre-vote to %s for term %d", args.CandidateID, args.Term) } return reply } // HandleAppendEntries handles AppendEntries RPCs func (r *Raft) HandleAppendEntries(args *AppendEntriesArgs) *AppendEntriesReply { r.mu.Lock() defer r.mu.Unlock() atomic.AddInt64(&r.stats.AppendsReceived, 1) reply := &AppendEntriesReply{ Term: r.currentTerm, Success: false, } // Check if we're a standalone node being added to a cluster // A standalone node has only itself in clusterNodes isStandalone := len(r.clusterNodes) == 1 if _, hasSelf := r.clusterNodes[r.nodeID]; isStandalone && hasSelf { // We're standalone and receiving AppendEntries from an external leader // This means we're being added to a cluster - suppress elections if !r.joiningCluster { r.joiningCluster = true r.joiningClusterTime = time.Now() r.logger.Info("Detected cluster join in progress, suppressing elections") } // When joining, accept higher terms from the leader to sync up if args.Term > r.currentTerm { r.becomeFollower(args.Term) } } // Reply false if term < currentTerm if args.Term < r.currentTerm { // But still reset timer if we're joining a cluster to prevent elections if r.joiningCluster { r.resetElectionTimer() } return reply } // If term > currentTerm, or we're a candidate, or we're a leader receiving // AppendEntries from another leader (split-brain scenario during cluster merge), // become follower. In Raft, there can only be one leader per term. 
if args.Term > r.currentTerm || r.state == Candidate || r.state == Leader { r.becomeFollower(args.Term) } // Update leader info and reset election timer r.leaderID = args.LeaderID r.lastHeartbeat = time.Now() r.resetElectionTimer() reply.Term = r.currentTerm // Try to append entries success, conflictIndex, conflictTerm := r.log.AppendEntriesFromLeader( args.PrevLogIndex, args.PrevLogTerm, args.Entries) if !success { reply.ConflictIndex = conflictIndex reply.ConflictTerm = conflictTerm return reply } reply.Success = true // Update commit index safely if args.LeaderCommit > r.commitIndex { // Get our actual last log index lastLogIndex := r.log.LastIndex() // Calculate what index the entries would have reached lastNewEntry := args.PrevLogIndex if len(args.Entries) > 0 { lastNewEntry = args.Entries[len(args.Entries)-1].Index } // Commit index should not exceed what we actually have in log newCommitIndex := args.LeaderCommit if newCommitIndex > lastNewEntry { newCommitIndex = lastNewEntry } if newCommitIndex > lastLogIndex { newCommitIndex = lastLogIndex } // Only advance commit index if newCommitIndex > r.commitIndex { r.commitIndex = newCommitIndex } } return reply } // HandleInstallSnapshot handles InstallSnapshot RPCs with chunked transfer support func (r *Raft) HandleInstallSnapshot(args *InstallSnapshotArgs) *InstallSnapshotReply { r.mu.Lock() defer r.mu.Unlock() reply := &InstallSnapshotReply{ Term: r.currentTerm, Success: false, } if args.Term < r.currentTerm { return reply } if args.Term > r.currentTerm { r.becomeFollower(args.Term) } r.leaderID = args.LeaderID r.lastHeartbeat = time.Now() r.resetElectionTimer() reply.Term = r.currentTerm // Skip if we already have this or a newer snapshot applied if args.LastIncludedIndex <= r.lastApplied { r.logger.Debug("Ignoring snapshot at index %d, already applied up to %d", args.LastIncludedIndex, r.lastApplied) reply.Success = true // Still success to let leader know we don't need it return reply } // Handle chunked transfer if args.Offset == 0 { // First chunk - start new pending snapshot r.pendingSnapshot = &pendingSnapshotState{ lastIncludedIndex: args.LastIncludedIndex, lastIncludedTerm: args.LastIncludedTerm, data: make([]byte, 0), receivedBytes: 0, } r.logger.Info("Starting snapshot reception at index %d, term %d", args.LastIncludedIndex, args.LastIncludedTerm) } // Validate we're receiving the expected snapshot if r.pendingSnapshot == nil || r.pendingSnapshot.lastIncludedIndex != args.LastIncludedIndex || r.pendingSnapshot.lastIncludedTerm != args.LastIncludedTerm { r.logger.Warn("Unexpected snapshot chunk: expected index %d, got %d", r.pendingSnapshot.lastIncludedIndex, args.LastIncludedIndex) return reply } // Validate offset matches what we've received if uint64(args.Offset) != r.pendingSnapshot.receivedBytes { r.logger.Warn("Unexpected chunk offset: expected %d, got %d", r.pendingSnapshot.receivedBytes, args.Offset) return reply } // Append chunk data r.pendingSnapshot.data = append(r.pendingSnapshot.data, args.Data...) 
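// receivedBytes also serves as the offset we expect for the next chunk; a chunk
// with a mismatched offset is rejected above, and the leader then restarts the
// transfer from offset 0 on a later replication attempt.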
r.pendingSnapshot.receivedBytes += uint64(len(args.Data)) reply.Success = true r.logger.Debug("Received snapshot chunk: offset=%d, size=%d, done=%v", args.Offset, len(args.Data), args.Done) // If not done, wait for more chunks if !args.Done { return reply } // All chunks received - apply the snapshot r.logger.Info("Installing complete snapshot: %d bytes at index %d, term %d", len(r.pendingSnapshot.data), args.LastIncludedIndex, args.LastIncludedTerm) atomic.AddUint64(&r.metrics.SnapshotsInstalled, 1) // Save snapshot if err := r.storage.SaveSnapshot(r.pendingSnapshot.data, args.LastIncludedIndex, args.LastIncludedTerm); err != nil { r.logger.Error("Failed to save snapshot: %v", err) r.pendingSnapshot = nil reply.Success = false return reply } // Compact log if err := r.log.Compact(args.LastIncludedIndex); err != nil { r.logger.Error("Failed to compact log: %v", err) } // Update state - must update both commitIndex and lastApplied if args.LastIncludedIndex > r.commitIndex { r.commitIndex = args.LastIncludedIndex } // Always update lastApplied to snapshot index to prevent trying to apply compacted entries r.lastApplied = args.LastIncludedIndex // Send snapshot to application (non-blocking with timeout) // Use the complete pendingSnapshot data, not the last chunk msg := ApplyMsg{ SnapshotValid: true, Snapshot: r.pendingSnapshot.data, SnapshotIndex: args.LastIncludedIndex, SnapshotTerm: args.LastIncludedTerm, } // Clear pending snapshot r.pendingSnapshot = nil // Try to send, but don't block indefinitely select { case r.applyCh <- msg: r.logger.Debug("Sent snapshot to application") case <-time.After(100 * time.Millisecond): r.logger.Warn("Timeout sending snapshot to application, will retry") // The application will still get correct state via normal apply loop } return reply } // Propose proposes a new command to be replicated func (r *Raft) Propose(command []byte) (uint64, uint64, bool) { r.mu.Lock() defer r.mu.Unlock() if r.state != Leader { return 0, 0, false } // Check connectivity (Lease Check) // If we haven't heard from a majority of peers within ElectionTimeout, // we shouldn't accept new commands because we might be partitioned. 
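// Example: in a 5-node cluster the leader counts itself plus every peer it has
// contacted within ElectionTimeoutMax; it needs at least 3 (itself plus 2 peers)
// before accepting the proposal, so a leader cut off from the majority stops
// accepting writes that could never commit.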
if len(r.clusterNodes) > 1 { activePeers := 1 // Self now := time.Now() timeout := r.config.ElectionTimeoutMax for _, peer := range r.peers { if last, ok := r.lastContact[peer]; ok && now.Sub(last) < timeout { activePeers++ } } // Check majority needed := len(r.clusterNodes)/2 + 1 if activePeers < needed { r.logger.Warn("Rejecting Propose: lost contact with majority (active: %d, needed: %d)", activePeers, needed) return 0, 0, false } } index, err := r.log.AppendCommand(r.currentTerm, command) if err != nil { r.logger.Error("Failed to append command: %v", err) return 0, 0, false } r.matchIndex[r.nodeID] = index // For single-node cluster, we are the only voter and can commit immediately // This fixes the issue where commitCh never gets triggered without other peers if len(r.clusterNodes) <= 1 && len(r.peers) == 0 { // Single node: self is majority, trigger commit immediately select { case r.commitCh <- struct{}{}: default: } } else { // Multi-node: trigger replication to other nodes r.triggerReplication() } return index, r.currentTerm, true } // triggerReplication signals the replication loop to send heartbeats // This uses a non-blocking send to batch replication requests func (r *Raft) triggerReplication() { select { case r.replicationCh <- struct{}{}: default: // Replication already scheduled } } // replicationLoop handles batched replication // Uses simple delay-based batching: flush immediately when signaled, then wait // to allow more requests to accumulate before the next flush. func (r *Raft) replicationLoop() { for { select { case <-r.stopCh: return case <-r.replicationCh: // Flush and replicate immediately r.flushAndReplicate() // Wait briefly to allow batching of subsequent requests // This gives time for more proposals to queue up before the next flush time.Sleep(10 * time.Millisecond) } } } // flushAndReplicate flushes logs and sends heartbeats func (r *Raft) flushAndReplicate() { // Ensure logs are flushed to OS cache before sending to followers // This implements Group Commit with Flush (fast) instead of Sync (slow) if err := r.log.Flush(); err != nil { r.logger.Error("Failed to flush log: %v", err) } r.sendHeartbeats() } // ProposeWithForward proposes a command, forwarding to leader if necessary // This is the recommended method for applications to use func (r *Raft) ProposeWithForward(command []byte) (index uint64, term uint64, err error) { r.mu.RLock() // Check if we are part of the cluster // If we are not in clusterNodes, we shouldn't accept data commands if _, exists := r.clusterNodes[r.nodeID]; !exists { r.mu.RUnlock() return 0, 0, fmt.Errorf("node %s is not part of the cluster", r.nodeID) } r.mu.RUnlock() // Try local propose first idx, t, isLeader := r.Propose(command) if isLeader { return idx, t, nil } // Not leader, forward to leader r.mu.RLock() leaderID := r.leaderID // Use clusterNodes (dynamically maintained) to find leader address leaderAddr := r.clusterNodes[leaderID] r.mu.RUnlock() if leaderID == "" { return 0, 0, fmt.Errorf("no leader available") } if leaderAddr == "" { return 0, 0, fmt.Errorf("leader %s address not found in cluster", leaderID) } // Forward to leader ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() args := &ProposeArgs{Command: command} reply, err := r.transport.ForwardPropose(ctx, leaderAddr, args) if err != nil { return 0, 0, fmt.Errorf("forward failed: %w", err) } if !reply.Success { return 0, 0, fmt.Errorf("leader rejected: %s", reply.Error) } return reply.Index, reply.Term, nil } // ForwardGet 
forwards a get request to the leader func (r *Raft) ForwardGet(key string) (string, bool, error) { r.mu.RLock() // Check if we are part of the cluster if _, exists := r.clusterNodes[r.nodeID]; !exists { r.mu.RUnlock() return "", false, fmt.Errorf("node %s is not part of the cluster", r.nodeID) } r.mu.RUnlock() // Check if we are leader (local read) if r.state == Leader { if r.config.GetHandler != nil { val, found := r.config.GetHandler(key) return val, found, nil } return "", false, fmt.Errorf("get handler not configured") } r.mu.RLock() leaderID := r.leaderID leaderAddr := r.clusterNodes[leaderID] r.mu.RUnlock() if leaderID == "" { return "", false, ErrNoLeader } if leaderAddr == "" { return "", false, fmt.Errorf("leader %s address not found", leaderID) } // Forward to leader ctx, cancel := context.WithTimeout(context.Background(), r.config.RPCTimeout) defer cancel() args := &GetArgs{Key: key} reply, err := r.transport.ForwardGet(ctx, leaderAddr, args) if err != nil { return "", false, fmt.Errorf("forward failed: %w", err) } return reply.Value, reply.Found, nil } // HandlePropose handles forwarded propose requests func (r *Raft) HandlePropose(args *ProposeArgs) *ProposeReply { index, term, isLeader := r.Propose(args.Command) if !isLeader { return &ProposeReply{ Success: false, Error: "not leader", } } return &ProposeReply{ Success: true, Index: index, Term: term, } } // HandleAddNode handles forwarded AddNode requests func (r *Raft) HandleAddNode(args *AddNodeArgs) *AddNodeReply { err := r.AddNode(args.NodeID, args.Address) if err != nil { return &AddNodeReply{ Success: false, Error: err.Error(), } } return &AddNodeReply{ Success: true, } } // HandleRemoveNode handles forwarded RemoveNode requests func (r *Raft) HandleRemoveNode(args *RemoveNodeArgs) *RemoveNodeReply { err := r.RemoveNode(args.NodeID) if err != nil { return &RemoveNodeReply{ Success: false, Error: err.Error(), } } return &RemoveNodeReply{ Success: true, } } // GetState returns the current term and whether this node is leader func (r *Raft) GetState() (uint64, bool) { r.mu.RLock() defer r.mu.RUnlock() return r.currentTerm, r.state == Leader } // GetLeaderID returns the current leader ID func (r *Raft) GetLeaderID() string { r.mu.RLock() defer r.mu.RUnlock() return r.leaderID } // GetStats returns runtime statistics func (r *Raft) GetStats() Stats { r.mu.RLock() defer r.mu.RUnlock() lastIndex, lastTerm := r.log.LastIndexAndTerm() firstIndex := r.log.FirstIndex() // Copy cluster nodes nodes := make(map[string]string) for k, v := range r.clusterNodes { nodes[k] = v } clusterSize := len(r.clusterNodes) if clusterSize == 0 { clusterSize = len(r.peers) + 1 } return Stats{ Term: r.currentTerm, State: r.state.String(), FirstLogIndex: firstIndex, LastLogIndex: lastIndex, LastLogTerm: lastTerm, CommitIndex: r.commitIndex, LastApplied: r.lastApplied, LeaderID: r.leaderID, AppendsSent: atomic.LoadInt64(&r.stats.AppendsSent), AppendsReceived: atomic.LoadInt64(&r.stats.AppendsReceived), ClusterSize: clusterSize, ClusterNodes: nodes, } } // restoreFromSnapshot restores the FSM from a snapshot at startup // This is called during Start() to ensure the FSM has the correct state // before processing any new commands func (r *Raft) restoreFromSnapshot() error { // Get snapshot from storage data, lastIndex, lastTerm, err := r.storage.GetSnapshot() if err != nil { return fmt.Errorf("failed to get snapshot: %w", err) } // No snapshot exists if len(data) == 0 || lastIndex == 0 { return nil } r.logger.Info("Restoring FSM from snapshot at index %d, 
term %d (%d bytes)", lastIndex, lastTerm, len(data)) // Update lastApplied to snapshot index to prevent re-applying compacted entries r.mu.Lock() if lastIndex > r.lastApplied { r.lastApplied = lastIndex } if lastIndex > r.commitIndex { r.commitIndex = lastIndex } r.mu.Unlock() // Send snapshot to FSM for restoration // Use a goroutine with timeout to avoid blocking if applyCh is full msg := ApplyMsg{ SnapshotValid: true, Snapshot: data, SnapshotIndex: lastIndex, SnapshotTerm: lastTerm, } // Try to send with a timeout select { case r.applyCh <- msg: r.logger.Info("FSM restoration triggered from snapshot at index %d", lastIndex) case <-time.After(5 * time.Second): return fmt.Errorf("timeout sending snapshot to applyCh") } return nil } // TakeSnapshot takes a snapshot of the current state func (r *Raft) TakeSnapshot(data []byte, index uint64) error { r.mu.Lock() defer r.mu.Unlock() if index > r.lastApplied { return fmt.Errorf("snapshot index %d exceeds lastApplied %d", index, r.lastApplied) } term, err := r.log.GetTerm(index) if err != nil { return fmt.Errorf("failed to get term for index %d: %w", index, err) } if err := r.storage.SaveSnapshot(data, index, term); err != nil { return fmt.Errorf("failed to save snapshot: %w", err) } if err := r.log.Compact(index); err != nil { return fmt.Errorf("failed to compact log: %w", err) } r.logger.Info("Took snapshot at index %d, term %d", index, term) return nil } func max(a, b uint64) uint64 { if a > b { return a } return b } // ==================== Membership Change API ==================== // // This implementation uses Single-Node Membership Change (also known as one-at-a-time changes) // as described in the Raft dissertation (§4.3). This is safe because: // // 1. We only allow one configuration change at a time (pendingConfigChange flag) // 2. For commits, we use the OLD cluster majority until the config change is committed // 3. The new node starts receiving entries immediately but doesn't affect majority calculation // // This approach is simpler than Joint Consensus and is sufficient for most use cases. // The invariant maintained is: any two majorities (old or new) must overlap. // // For adding a node: old majority = N/2+1, new majority = (N+1)/2+1 = N/2+1 (overlaps) // For removing a node: old majority = N/2+1, new majority = (N-1)/2+1 (overlaps if N > 1) // // WARNING: Avoid adding/removing multiple nodes rapidly. Wait for each change to be committed. 
// AddNode adds a new node to the cluster // This can only be called on the leader // The new node must already be running and reachable // // Safety guarantees: // - Only one config change can be in progress at a time // - The old cluster majority is used until the config change is committed // - Returns error if leadership is lost during the operation func (r *Raft) AddNode(nodeID, address string) error { r.mu.Lock() // Must be leader if r.state != Leader { leaderID := r.leaderID r.mu.Unlock() return NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff) } // Check if we're in the middle of a leadership transfer if r.transferring { r.mu.Unlock() return fmt.Errorf("leadership transfer in progress") } // Check if there's already a pending config change if r.pendingConfigChange { r.mu.Unlock() return ErrConfigInFlight } // Validate nodeID and address if nodeID == "" { r.mu.Unlock() return fmt.Errorf("nodeID cannot be empty") } if address == "" { r.mu.Unlock() return fmt.Errorf("address cannot be empty") } // Check if node already exists if _, exists := r.clusterNodes[nodeID]; exists { r.mu.Unlock() return fmt.Errorf("node %s already exists in cluster", nodeID) } // Check if address is already used by another node for existingID, existingAddr := range r.clusterNodes { if existingAddr == address { r.mu.Unlock() return fmt.Errorf("address %s is already used by node %s", address, existingID) } } // Save old cluster nodes for majority calculation during config change // This ensures we use the OLD cluster size until the config change is committed r.oldClusterNodes = make(map[string]string) for k, v := range r.clusterNodes { r.oldClusterNodes[k] = v } // Create new config with the added node newNodes := make(map[string]string) for k, v := range r.clusterNodes { newNodes[k] = v } newNodes[nodeID] = address // Create config change entry with ClusterConfig configIndex := r.log.LastIndex() + 1 entry := LogEntry{ Index: configIndex, Term: r.currentTerm, Type: EntryConfig, Config: &ClusterConfig{Nodes: newNodes}, } // Mark config change as pending and store the index r.pendingConfigChange = true r.configChangeIndex = configIndex // Immediately apply the new configuration (for single-node changes, this is safe) // The new node will start receiving AppendEntries immediately r.clusterNodes[nodeID] = address r.peers = append(r.peers, address) // Set nextIndex to 1 (or firstIndex) so the new node syncs from the beginning // This is crucial - the new node's log is empty, so we must start from index 1 firstIndex := r.log.FirstIndex() if firstIndex == 0 { firstIndex = 1 } r.nextIndex[address] = firstIndex r.matchIndex[address] = 0 r.mu.Unlock() // Append the config change entry to log if err := r.log.Append(entry); err != nil { r.mu.Lock() r.pendingConfigChange = false r.oldClusterNodes = nil r.configChangeIndex = 0 // Rollback delete(r.clusterNodes, nodeID) r.rebuildPeersList() r.mu.Unlock() return fmt.Errorf("failed to append config entry: %w", err) } r.logger.Info("Adding node %s (%s) to cluster", nodeID, address) // Trigger immediate replication r.triggerReplication() return nil } // RemoveNode removes a node from the cluster // This can only be called on the leader // The node being removed can be any node except the leader itself // // Safety guarantees: // - Only one config change can be in progress at a time // - Cannot remove the leader (transfer leadership first) // - Cannot reduce cluster to 0 nodes // - The old cluster majority is used until the config change is committed func (r *Raft) 
RemoveNode(nodeID string) error { r.mu.Lock() // Must be leader if r.state != Leader { leaderID := r.leaderID r.mu.Unlock() return NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff) } // Check if we're in the middle of a leadership transfer if r.transferring { r.mu.Unlock() return fmt.Errorf("leadership transfer in progress") } // Cannot remove self if nodeID == r.nodeID { r.mu.Unlock() return fmt.Errorf("cannot remove self from cluster, use TransferLeadership first") } // Validate nodeID if nodeID == "" { r.mu.Unlock() return fmt.Errorf("nodeID cannot be empty") } // Check if there's already a pending config change if r.pendingConfigChange { r.mu.Unlock() return ErrConfigInFlight } // Check if node exists if _, exists := r.clusterNodes[nodeID]; !exists { r.mu.Unlock() return fmt.Errorf("node %s not found in cluster", nodeID) } // FORCE REMOVAL CHECK // Check connectivity to the node being removed. // If unreachable or invalid, we force remove it without standard consensus if needed, // or at least bypass some strict checks. // For Raft safety, we still need to commit a config change log entry so other nodes know about the removal. // However, if the node is "invalid" (e.g. bad address), it won't respond to RPCs. // We should allow removal even if we can't contact it. // Check validity of address (simple heuristic) nodeAddr := r.clusterNodes[nodeID] isValidNode := true if nodeAddr == "" || strings.HasPrefix(nodeAddr, ".") || !strings.Contains(nodeAddr, ":") { isValidNode = false r.logger.Warn("Removing invalid node %s with address %s", nodeID, nodeAddr) } // Cannot reduce cluster below 1 node, unless it's an invalid node cleanup if isValidNode && len(r.clusterNodes) <= 1 { r.mu.Unlock() return fmt.Errorf("cannot remove last node from cluster") } // Save old cluster nodes for majority calculation during config change r.oldClusterNodes = make(map[string]string) for k, v := range r.clusterNodes { r.oldClusterNodes[k] = v } // Create new config without the removed node newNodes := make(map[string]string) for k, v := range r.clusterNodes { if k != nodeID { newNodes[k] = v } } // Create config change entry with ClusterConfig configIndex := r.log.LastIndex() + 1 entry := LogEntry{ Index: configIndex, Term: r.currentTerm, Type: EntryConfig, Config: &ClusterConfig{Nodes: newNodes}, } // Mark config change as pending and store the index r.pendingConfigChange = true r.configChangeIndex = configIndex // Get the address of node being removed for cleanup removedAddr := r.clusterNodes[nodeID] // Immediately apply the new configuration delete(r.clusterNodes, nodeID) r.rebuildPeersList() delete(r.nextIndex, removedAddr) delete(r.matchIndex, removedAddr) r.mu.Unlock() // Append the config change entry to log if err := r.log.Append(entry); err != nil { r.mu.Lock() r.pendingConfigChange = false r.oldClusterNodes = nil r.configChangeIndex = 0 // Rollback - this is tricky but we try our best r.clusterNodes[nodeID] = removedAddr r.rebuildPeersList() r.mu.Unlock() return fmt.Errorf("failed to append config entry: %w", err) } r.logger.Info("Removing node %s from cluster, config at index %d", nodeID, entry.Index) // Trigger replication go r.sendHeartbeats() return nil } // AddNodeWithForward adds a node, forwarding to leader if necessary // This is the recommended method for applications to use func (r *Raft) AddNodeWithForward(nodeID, address string) error { // Try local operation first err := r.AddNode(nodeID, address) if err == nil { return nil } // Check if we're not the leader r.mu.RLock() state := 
// AddNodeWithForward adds a node, forwarding to leader if necessary
// This is the recommended method for applications to use
func (r *Raft) AddNodeWithForward(nodeID, address string) error {
    // Try local operation first
    err := r.AddNode(nodeID, address)
    if err == nil {
        return nil
    }

    // Check if we're not the leader
    r.mu.RLock()
    state := r.state
    leaderID := r.leaderID
    leaderAddr := r.clusterNodes[leaderID]
    r.mu.RUnlock()

    if state == Leader {
        // We are leader but AddNode failed for other reasons
        return err
    }

    // Not leader, forward to leader
    if leaderID == "" {
        return fmt.Errorf("no leader available")
    }
    if leaderAddr == "" {
        return fmt.Errorf("leader %s address not found in cluster", leaderID)
    }

    // Forward to leader
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    args := &AddNodeArgs{NodeID: nodeID, Address: address}
    reply, err := r.transport.ForwardAddNode(ctx, leaderAddr, args)
    if err != nil {
        return fmt.Errorf("forward failed: %w", err)
    }
    if !reply.Success {
        return fmt.Errorf("leader rejected: %s", reply.Error)
    }

    return nil
}

// RemoveNodeWithForward removes a node, forwarding to leader if necessary
// This is the recommended method for applications to use
func (r *Raft) RemoveNodeWithForward(nodeID string) error {
    // Try local operation first
    err := r.RemoveNode(nodeID)
    if err == nil {
        return nil
    }

    // Check if we're not the leader
    r.mu.RLock()
    state := r.state
    leaderID := r.leaderID
    leaderAddr := r.clusterNodes[leaderID]
    r.mu.RUnlock()

    if state == Leader {
        // We are leader but RemoveNode failed for other reasons
        return err
    }

    // Not leader, forward to leader
    if leaderID == "" {
        return fmt.Errorf("no leader available")
    }
    if leaderAddr == "" {
        return fmt.Errorf("leader %s address not found in cluster", leaderID)
    }

    // Forward to leader
    // Short timeout for removal as it shouldn't block long
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    args := &RemoveNodeArgs{NodeID: nodeID}
    reply, err := r.transport.ForwardRemoveNode(ctx, leaderAddr, args)
    if err != nil {
        return fmt.Errorf("forward failed: %w", err)
    }
    if !reply.Success {
        return fmt.Errorf("leader rejected: %s", reply.Error)
    }

    return nil
}

// rebuildPeersList rebuilds the peers slice from clusterNodes
func (r *Raft) rebuildPeersList() {
    r.peers = make([]string, 0, len(r.clusterNodes)-1)
    for nodeID, addr := range r.clusterNodes {
        if nodeID != r.nodeID {
            r.peers = append(r.peers, addr)
        }
    }
}

// GetClusterNodes returns a copy of the current cluster membership
func (r *Raft) GetClusterNodes() map[string]string {
    r.mu.RLock()
    defer r.mu.RUnlock()

    nodes := make(map[string]string)
    for k, v := range r.clusterNodes {
        nodes[k] = v
    }
    return nodes
}
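
// Illustrative usage sketch (not part of this package): a membership helper
// that can run on any node. The IDs, address, and `node` (*Raft) handle are
// made up; forwarding relies on the Transport's ForwardAddNode RPC used above.
//
//    // Works on followers too - the request is forwarded to the current leader.
//    if err := node.AddNodeWithForward("node-5", "10.0.0.5:7000"); err != nil {
//        log.Printf("membership change failed: %v", err)
//    }
//    for id, addr := range node.GetClusterNodes() {
//        log.Printf("member %s at %s", id, addr)
//    }
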
// applyConfigChange applies a configuration change entry
func (r *Raft) applyConfigChange(entry *LogEntry) {
    if entry.Config == nil || entry.Config.Nodes == nil {
        r.logger.Warn("Invalid config change entry at index %d", entry.Index)
        return
    }

    r.mu.Lock()
    defer r.mu.Unlock()

    // Update cluster configuration
    r.clusterNodes = make(map[string]string)
    for k, v := range entry.Config.Nodes {
        r.clusterNodes[k] = v
    }
    r.rebuildPeersList()

    // Persist the new configuration
    if err := r.storage.SaveClusterConfig(&ClusterConfig{Nodes: r.clusterNodes}); err != nil {
        r.logger.Error("Failed to persist cluster config: %v", err)
    }

    // Clear pending flag and old cluster state
    r.pendingConfigChange = false
    r.oldClusterNodes = nil
    r.configChangeIndex = 0

    // If we were joining a cluster and now have multiple nodes, we've successfully joined
    if r.joiningCluster && len(r.clusterNodes) > 1 {
        r.joiningCluster = false
        r.logger.Info("Successfully joined cluster with %d nodes", len(r.clusterNodes))
    }

    // Check if we have been removed from the cluster
    if _, exists := r.clusterNodes[r.nodeID]; !exists {
        r.logger.Warn("Node %s removed from cluster configuration", r.nodeID)
        // We could shut down here, or just stay as a listener.
        // For now, let's step down to Follower if we are not already.
        if r.state != Follower {
            r.becomeFollower(r.currentTerm)
        }
    }

    r.logger.Info("Applied config change at index %d, cluster now has %d nodes", entry.Index, len(r.clusterNodes))

    // If we're the leader, update leader state
    if r.state == Leader {
        // Check if we are still in the cluster
        if _, exists := r.clusterNodes[r.nodeID]; !exists {
            r.logger.Warn("Leader removed from cluster (self), stepping down")
            r.becomeFollower(r.currentTerm)
            return
        }

        // Initialize nextIndex/matchIndex for any new nodes
        lastIndex := r.log.LastIndex()
        for nodeID, addr := range r.clusterNodes {
            if nodeID != r.nodeID {
                if _, exists := r.nextIndex[addr]; !exists {
                    r.nextIndex[addr] = lastIndex + 1
                    r.matchIndex[addr] = 0
                }
            }
        }

        // Clean up removed nodes
        validAddrs := make(map[string]bool)
        for nodeID, addr := range r.clusterNodes {
            if nodeID != r.nodeID {
                validAddrs[addr] = true
            }
        }
        for addr := range r.nextIndex {
            if !validAddrs[addr] {
                delete(r.nextIndex, addr)
                delete(r.matchIndex, addr)
            }
        }
    }
}

// ==================== ReadIndex (Linearizable Reads) ====================

// readIndexLoop handles read index requests
func (r *Raft) readIndexLoop() {
    for {
        select {
        case <-r.stopCh:
            return
        case req := <-r.readIndexCh:
            r.processReadIndexRequest(req)
        }
    }
}

// processReadIndexRequest processes a single read index request
func (r *Raft) processReadIndexRequest(req *readIndexRequest) {
    r.mu.RLock()
    if r.state != Leader {
        r.mu.RUnlock()
        req.done <- ErrNotLeader
        return
    }
    r.mu.RUnlock()

    // Confirm leadership by sending heartbeats and waiting for majority ack
    if !r.confirmLeadership() {
        req.done <- ErrLeadershipLost
        return
    }

    // Wait for apply to catch up to readIndex
    if err := r.waitApply(req.readIndex, r.config.ProposeTimeout); err != nil {
        req.done <- err
        return
    }

    req.done <- nil
}

// ReadIndex implements linearizable reads
// It ensures that the read sees all writes that were committed before the read started
func (r *Raft) ReadIndex() (uint64, error) {
    r.mu.RLock()
    if r.state != Leader {
        leaderID := r.leaderID
        r.mu.RUnlock()
        return 0, NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff)
    }
    readIndex := r.commitIndex
    r.mu.RUnlock()

    atomic.AddUint64(&r.metrics.ReadIndexRequests, 1)

    // Create request and send to processing loop
    req := &readIndexRequest{
        readIndex: readIndex,
        done:      make(chan error, 1),
    }

    select {
    case r.readIndexCh <- req:
    case <-r.stopCh:
        return 0, ErrShutdown
    case <-time.After(r.config.ProposeTimeout):
        return 0, ErrTimeout
    }

    // Wait for result
    select {
    case err := <-req.done:
        if err != nil {
            return 0, err
        }
        atomic.AddUint64(&r.metrics.ReadIndexSuccess, 1)
        return readIndex, nil
    case <-r.stopCh:
        return 0, ErrShutdown
    case <-time.After(r.config.ProposeTimeout):
        return 0, ErrTimeout
    }
}

// confirmLeadership confirms we're still leader by getting acks from majority
func (r *Raft) confirmLeadership() bool {
    r.mu.RLock()
    if r.state != Leader {
        r.mu.RUnlock()
        return false
    }
    currentTerm := r.currentTerm
    clusterSize := len(r.clusterNodes)
    if clusterSize == 0 {
        clusterSize = len(r.peers) + 1
    }
    r.mu.RUnlock()

    // Single node cluster - we're always the leader
    if clusterSize == 1 {
        return true
    }

    // Send heartbeats and count acks
    r.sendHeartbeats()

    // Wait briefly and check if we're still leader
    time.Sleep(r.config.HeartbeatInterval)

    r.mu.RLock()
    stillLeader := r.state == Leader && r.currentTerm == currentTerm
    r.mu.RUnlock()

    return stillLeader
}
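
// Illustrative usage sketch (not part of this package): a linearizable read
// built on ReadIndex. The `node` (*Raft) handle and the `kv` state machine are
// made up; the pattern is to confirm leadership and obtain a read index first,
// then serve the read from local state that has applied at least up to it.
//
//    func linearizableGet(node *Raft, kv map[string]string, key string) (string, error) {
//        if _, err := node.ReadIndex(); err != nil {
//            return "", err // not leader, leadership lost, or timeout
//        }
//        return kv[key], nil // local read is now safe to serve
//    }
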
// waitApply waits until lastApplied >= index
func (r *Raft) waitApply(index uint64, timeout time.Duration) error {
    deadline := time.Now().Add(timeout)
    for {
        r.mu.RLock()
        lastApplied := r.lastApplied
        r.mu.RUnlock()

        if lastApplied >= index {
            return nil
        }
        if time.Now().After(deadline) {
            return ErrTimeout
        }
        time.Sleep(1 * time.Millisecond)
    }
}

// ==================== Health Check ====================

// HealthCheck returns the current health status of the node
func (r *Raft) HealthCheck() HealthStatus {
    r.mu.RLock()
    defer r.mu.RUnlock()

    clusterNodes := make(map[string]string)
    for k, v := range r.clusterNodes {
        clusterNodes[k] = v
    }

    clusterSize := len(r.clusterNodes)
    if clusterSize == 0 {
        clusterSize = len(r.peers) + 1
    }

    logBehind := uint64(0)
    if r.commitIndex > r.lastApplied {
        logBehind = r.commitIndex - r.lastApplied
    }

    // Consider healthy if we're leader or have a known leader
    isHealthy := r.state == Leader || r.leaderID != ""

    return HealthStatus{
        NodeID:        r.nodeID,
        State:         r.state.String(),
        Term:          r.currentTerm,
        LeaderID:      r.leaderID,
        ClusterSize:   clusterSize,
        ClusterNodes:  clusterNodes,
        CommitIndex:   r.commitIndex,
        LastApplied:   r.lastApplied,
        LogBehind:     logBehind,
        LastHeartbeat: r.lastHeartbeat,
        IsHealthy:     isHealthy,
        Uptime:        time.Since(r.startTime),
    }
}

// GetMetrics returns the current metrics
func (r *Raft) GetMetrics() Metrics {
    return Metrics{
        Term:                      atomic.LoadUint64(&r.metrics.Term),
        ProposalsTotal:            atomic.LoadUint64(&r.metrics.ProposalsTotal),
        ProposalsSuccess:          atomic.LoadUint64(&r.metrics.ProposalsSuccess),
        ProposalsFailed:           atomic.LoadUint64(&r.metrics.ProposalsFailed),
        ProposalsForwarded:        atomic.LoadUint64(&r.metrics.ProposalsForwarded),
        AppendsSent:               atomic.LoadUint64(&r.metrics.AppendsSent),
        AppendsReceived:           atomic.LoadUint64(&r.metrics.AppendsReceived),
        AppendsSuccess:            atomic.LoadUint64(&r.metrics.AppendsSuccess),
        AppendsFailed:             atomic.LoadUint64(&r.metrics.AppendsFailed),
        ElectionsStarted:          atomic.LoadUint64(&r.metrics.ElectionsStarted),
        ElectionsWon:              atomic.LoadUint64(&r.metrics.ElectionsWon),
        PreVotesStarted:           atomic.LoadUint64(&r.metrics.PreVotesStarted),
        PreVotesGranted:           atomic.LoadUint64(&r.metrics.PreVotesGranted),
        SnapshotsTaken:            atomic.LoadUint64(&r.metrics.SnapshotsTaken),
        SnapshotsInstalled:        atomic.LoadUint64(&r.metrics.SnapshotsInstalled),
        SnapshotsSent:             atomic.LoadUint64(&r.metrics.SnapshotsSent),
        ReadIndexRequests:         atomic.LoadUint64(&r.metrics.ReadIndexRequests),
        ReadIndexSuccess:          atomic.LoadUint64(&r.metrics.ReadIndexSuccess),
        LeadershipTransfers:       atomic.LoadUint64(&r.metrics.LeadershipTransfers),
        LeadershipTransferSuccess: atomic.LoadUint64(&r.metrics.LeadershipTransferSuccess),
    }
}
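
// Illustrative usage sketch (not part of this package): exposing health and
// metrics over HTTP for monitoring. The endpoint paths, JSON encoding, and the
// `node` (*Raft) handle are made up; HealthCheck and GetMetrics above are the
// only package calls assumed.
//
//    http.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
//        hs := node.HealthCheck()
//        if !hs.IsHealthy {
//            w.WriteHeader(http.StatusServiceUnavailable)
//        }
//        _ = json.NewEncoder(w).Encode(hs)
//    })
//    http.HandleFunc("/metrics", func(w http.ResponseWriter, _ *http.Request) {
//        _ = json.NewEncoder(w).Encode(node.GetMetrics())
//    })
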
r.transferTarget = "" r.mu.Unlock() return fmt.Errorf("failed to sync target: %w", err) } // Step 2: Send TimeoutNow RPC args := &TimeoutNowArgs{ Term: currentTerm, LeaderID: r.nodeID, } ctx, cancel := context.WithTimeout(context.Background(), r.config.RPCTimeout) defer cancel() reply, err := r.transport.TimeoutNow(ctx, targetAddr, args) if err != nil { r.mu.Lock() r.transferring = false r.transferTarget = "" r.mu.Unlock() return fmt.Errorf("TimeoutNow RPC failed: %w", err) } if !reply.Success { r.mu.Lock() r.transferring = false r.transferTarget = "" r.mu.Unlock() return fmt.Errorf("target rejected leadership transfer") } atomic.AddUint64(&r.metrics.LeadershipTransferSuccess, 1) r.logger.Info("Leadership transfer to %s initiated successfully", targetID) // Note: We don't immediately step down; we wait for the target to win election // and send us an AppendEntries with higher term return nil } // syncFollowerToLatest ensures the follower is caught up to our log func (r *Raft) syncFollowerToLatest(peerAddr string) error { r.mu.RLock() if r.state != Leader { r.mu.RUnlock() return ErrNotLeader } currentTerm := r.currentTerm leaderCommit := r.commitIndex lastIndex := r.log.LastIndex() r.mu.RUnlock() // Keep replicating until follower is caught up deadline := time.Now().Add(r.config.ElectionTimeoutMax * 2) for time.Now().Before(deadline) { r.mu.RLock() if r.state != Leader || r.currentTerm != currentTerm { r.mu.RUnlock() return ErrLeadershipLost } matchIndex := r.matchIndex[peerAddr] r.mu.RUnlock() if matchIndex >= lastIndex { return nil // Caught up } // Trigger replication r.replicateToPeer(peerAddr, currentTerm, leaderCommit) time.Sleep(10 * time.Millisecond) } return ErrTimeout } // HandleTimeoutNow handles TimeoutNow RPC (for leadership transfer) func (r *Raft) HandleTimeoutNow(args *TimeoutNowArgs) *TimeoutNowReply { r.mu.Lock() defer r.mu.Unlock() reply := &TimeoutNowReply{ Term: r.currentTerm, Success: false, } // Only accept if we're a follower and the term matches if args.Term < r.currentTerm { return reply } if r.state != Follower { return reply } // Immediately start election r.logger.Info("Received TimeoutNow from %s, starting immediate election", args.LeaderID) r.state = Candidate reply.Success = true return reply } // HandleReadIndex handles ReadIndex RPC func (r *Raft) HandleReadIndex(args *ReadIndexArgs) *ReadIndexReply { reply := &ReadIndexReply{ Success: false, } readIndex, err := r.ReadIndex() if err != nil { reply.Error = err.Error() return reply } reply.ReadIndex = readIndex reply.Success = true return reply } // HandleGet handles Get RPC for remote KV reads func (r *Raft) HandleGet(args *GetArgs) *GetReply { reply := &GetReply{ Found: false, } if r.config.GetHandler == nil { reply.Error = "get handler not configured" return reply } value, found := r.config.GetHandler(args.Key) reply.Value = value reply.Found = found return reply }