@@ -0,0 +1,2738 @@
+package raft
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+// Raft represents a Raft consensus node
+type Raft struct {
+	mu sync.RWMutex
+
+	// Node identity
+	nodeID string
+	peers  []string
+
+	// Cluster membership - maps nodeID to address
+	clusterNodes map[string]string
+
+	// Current state
+	state       NodeState
+	currentTerm uint64
+	votedFor    string
+	leaderID    string
+
+	// Log management
+	log         *LogManager
+	storage     Storage
+	commitIndex uint64
+	lastApplied uint64
+
+	// Leader state
+	nextIndex  map[string]uint64
+	matchIndex map[string]uint64
+
+	// Configuration
+	config *Config
+
+	// Communication
+	transport Transport
+
+	// Channels
+	applyCh  chan ApplyMsg
+	stopCh   chan struct{}
+	commitCh chan struct{}
+
+	// Election timer
+	electionTimer  *time.Timer
+	heartbeatTimer *time.Timer
+
+	// Statistics (deprecated, use metrics instead)
+	stats Stats
+
+	// Metrics for monitoring
+	metrics Metrics
+
+	// Logger
+	logger Logger
+
+	// Running flag
+	running int32
+
+	// Replication trigger channel - used to batch replication requests
+	replicationCh chan struct{}
+
+	// Pending config change - only one config change can be pending at a time
+	pendingConfigChange bool
+	// Old cluster nodes - used for majority calculation during config change
+	// This ensures we use the old cluster size until the config change is committed
+	oldClusterNodes map[string]string
+	// Index of the pending config change entry
+	configChangeIndex uint64
+
+	// Joining cluster state - when a standalone node is being added to a cluster
+	// This prevents the node from starting elections while syncing
+	joiningCluster     bool
+	joiningClusterTime time.Time
+
+	// Compaction state - prevents concurrent compaction
+	compacting int32
+	// Dynamic compaction threshold - updated after each compaction
+	// Next compaction triggers when log size >= this value
+	// Formula: lastCompactionSize * 1.5 (or initial SnapshotThreshold)
+	nextCompactionThreshold uint64
+
+	// WaitGroup to track goroutines for clean shutdown
+	wg sync.WaitGroup
+
+	// Leadership transfer state
+	transferring     bool
+	transferTarget   string
+	transferDeadline time.Time
+
+	// Last heartbeat received (for health check)
+	lastHeartbeat time.Time
+
+	// Start time (for uptime calculation)
+	startTime time.Time
+
+	// ReadIndex waiting queue
+	readIndexCh chan *readIndexRequest
+
+	// Snapshot receiving state (for chunked transfer)
+	pendingSnapshot *pendingSnapshotState
+}
+
+// readIndexRequest represents a pending read index request
+type readIndexRequest struct {
+	readIndex uint64
+	done      chan error
+}
+
+// pendingSnapshotState tracks chunked snapshot reception
+type pendingSnapshotState struct {
+	lastIncludedIndex uint64
+	lastIncludedTerm  uint64
+	data              []byte
+	receivedBytes     uint64
+}
+
+// Stats holds runtime statistics
+type Stats struct {
+	Term            uint64
+	State           string
+	LastLogIndex    uint64
+	LastLogTerm     uint64
+	CommitIndex     uint64
+	LastApplied     uint64
+	LeaderID        string
+	VotesReceived   int
+	AppendsSent     int64
+	AppendsReceived int64
+	ClusterSize     int               // Number of nodes in cluster
+	ClusterNodes    map[string]string // NodeID -> Address mapping
+}
+
+// ConsoleLogger implements Logger with console output
+type ConsoleLogger struct {
+	Prefix string
+	Level  int // 0=debug, 1=info, 2=warn, 3=error
+	mu     sync.Mutex
+}
+
+func NewConsoleLogger(prefix string, level int) *ConsoleLogger {
+	return &ConsoleLogger{Prefix: prefix, Level: level}
+}
+
+func (l *ConsoleLogger) log(level int, levelStr, format string, args ...interface{}) {
+	if level < l.Level {
+		return
+	}
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	msg := fmt.Sprintf(format, args...)
+	fmt.Printf("[%s] %s [%s] %s\n", time.Now().Format("15:04:05.000"), l.Prefix, levelStr, msg)
+}
+
+func (l *ConsoleLogger) Debug(format string, args ...interface{}) {
+	l.log(0, "DEBUG", format, args...)
+}
+func (l *ConsoleLogger) Info(format string, args ...interface{}) {
+	l.log(1, "INFO", format, args...)
+}
+func (l *ConsoleLogger) Warn(format string, args ...interface{}) {
+	l.log(2, "WARN", format, args...)
+}
+func (l *ConsoleLogger) Error(format string, args ...interface{}) {
+	l.log(3, "ERROR", format, args...)
+}
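+
+// Example (illustrative sketch, not part of the original change): constructing
+// a ConsoleLogger and logging at different levels. With Level 1, Debug output
+// is suppressed and Info/Warn/Error are printed.
+//
+//	logger := NewConsoleLogger("node-1", 1)
+//	logger.Debug("dropped at level 1")
+//	logger.Info("listening on %s", "127.0.0.1:7001")
+//	logger.Warn("follower %s is lagging", "node-3")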
+
+// NewRaft creates a new Raft node
+func NewRaft(config *Config, transport Transport, applyCh chan ApplyMsg) (*Raft, error) {
+	if config.Logger == nil {
+		config.Logger = NewConsoleLogger(config.NodeID, 1)
+	}
+
+	// Create storage
+	storage, err := NewHybridStorage(config.DataDir, config.MemoryLogCapacity, config.Logger)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create storage: %w", err)
+	}
+
+	// Create log manager
+	logMgr := NewLogManager(storage, config.Logger)
+
+	// Load persistent state
+	state, err := storage.GetState()
+	if err != nil {
+		return nil, fmt.Errorf("failed to load state: %w", err)
+	}
+
+	// Load or initialize cluster configuration
+	clusterNodes := make(map[string]string)
+
+	// Try to load saved cluster config first
+	savedConfig, err := storage.GetClusterConfig()
+	if err != nil {
+		return nil, fmt.Errorf("failed to load cluster config: %w", err)
+	}
+
+	if savedConfig != nil && len(savedConfig.Nodes) > 0 {
+		// Use saved config
+		clusterNodes = savedConfig.Nodes
+		config.Logger.Info("Loaded cluster config with %d nodes", len(clusterNodes))
+	} else {
+		// Initialize from config
+		if config.ClusterNodes != nil && len(config.ClusterNodes) > 0 {
+			for k, v := range config.ClusterNodes {
+				clusterNodes[k] = v
+			}
+		} else {
+			// Build from Peers + self (backward compatibility)
+			clusterNodes[config.NodeID] = config.ListenAddr
+			if config.PeerMap != nil {
+				for k, v := range config.PeerMap {
+					clusterNodes[k] = v
+				}
+			}
+		}
+		// Save initial config
+		if len(clusterNodes) > 0 {
+			if err := storage.SaveClusterConfig(&ClusterConfig{Nodes: clusterNodes}); err != nil {
+				config.Logger.Warn("Failed to save initial cluster config: %v", err)
+			}
+		}
+	}
+
+	// Build peers list from cluster nodes (excluding self)
+	var peers []string
+	for nodeID, addr := range clusterNodes {
+		if nodeID != config.NodeID {
+			peers = append(peers, addr)
+		}
+	}
+
+	r := &Raft{
+		nodeID:        config.NodeID,
+		peers:         peers,
+		clusterNodes:  clusterNodes,
+		state:         Follower,
+		currentTerm:   state.CurrentTerm,
+		votedFor:      state.VotedFor,
+		log:           logMgr,
+		storage:       storage,
+		config:        config,
+		transport:     transport,
+		applyCh:       applyCh,
+		stopCh:        make(chan struct{}),
+		commitCh:      make(chan struct{}, 100), // Increased buffer for event-driven apply
+		replicationCh: make(chan struct{}, 1),
+		nextIndex:     make(map[string]uint64),
+		matchIndex:    make(map[string]uint64),
+		logger:        config.Logger,
+		readIndexCh:   make(chan *readIndexRequest, 100),
+		lastHeartbeat: time.Now(),
+	}
+
+	// Set RPC handler
+	transport.SetRPCHandler(r)
+
+	return r, nil
+}
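+
+// Example wiring (illustrative sketch, not part of the original change): a
+// minimal caller that builds and starts one member of a three-node cluster.
+// Node IDs, addresses and the data directory are placeholder assumptions, and
+// timing fields (HeartbeatInterval, ElectionTimeoutMin/Max) are assumed to be
+// set or defaulted elsewhere; transport is any Transport implementation.
+//
+//	applyCh := make(chan ApplyMsg, 256)
+//	cfg := &Config{
+//		NodeID:     "node-1",
+//		ListenAddr: "127.0.0.1:7001",
+//		DataDir:    "/tmp/raft-node-1",
+//		ClusterNodes: map[string]string{
+//			"node-1": "127.0.0.1:7001",
+//			"node-2": "127.0.0.1:7002",
+//			"node-3": "127.0.0.1:7003",
+//		},
+//	}
+//	node, err := NewRaft(cfg, transport, applyCh)
+//	if err != nil {
+//		panic(err)
+//	}
+//	if err := node.Start(); err != nil {
+//		panic(err)
+//	}
+//	defer node.Stop()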
+
+// Start starts the Raft node
+func (r *Raft) Start() error {
+	if !atomic.CompareAndSwapInt32(&r.running, 0, 1) {
+		return fmt.Errorf("already running")
+	}
+
+	// Record start time
+	r.startTime = time.Now()
+
+	// Start transport
+	if err := r.transport.Start(); err != nil {
+		return fmt.Errorf("failed to start transport: %w", err)
+	}
+
+	// Restore FSM from snapshot if exists
+	// This must happen before starting apply loop to ensure FSM state is restored
+	if err := r.restoreFromSnapshot(); err != nil {
+		r.logger.Warn("Failed to restore from snapshot: %v", err)
+		// Continue anyway - the node can still function, just without historical state
+	}
+
+	// Start election timer
+	r.resetElectionTimer()
+
+	// Start background goroutines with WaitGroup tracking
+	r.wg.Add(4)
+	go func() {
+		defer r.wg.Done()
+		r.applyLoop()
+	}()
+	go func() {
+		defer r.wg.Done()
+		r.replicationLoop()
+	}()
+	go func() {
+		defer r.wg.Done()
+		r.mainLoop()
+	}()
+	go func() {
+		defer r.wg.Done()
+		r.readIndexLoop()
+	}()
+
+	r.logger.Info("Raft node started")
+	return nil
+}
+
+// Stop stops the Raft node
+func (r *Raft) Stop() error {
+	if !atomic.CompareAndSwapInt32(&r.running, 1, 0) {
+		return fmt.Errorf("not running")
+	}
+
+	// Signal all goroutines to stop
+	close(r.stopCh)
+
+	// Stop timers first to prevent new operations
+	r.mu.Lock()
+	if r.electionTimer != nil {
+		r.electionTimer.Stop()
+	}
+	if r.heartbeatTimer != nil {
+		r.heartbeatTimer.Stop()
+	}
+	r.mu.Unlock()
+
+	// Wait for all goroutines to finish with timeout
+	done := make(chan struct{})
+	go func() {
+		r.wg.Wait()
+		close(done)
+	}()
+
+	select {
+	case <-done:
+		// All goroutines exited cleanly
+	case <-time.After(3 * time.Second):
+		r.logger.Warn("Timeout waiting for goroutines to stop")
+	}
+
+	// Stop transport (has its own timeout)
+	if err := r.transport.Stop(); err != nil {
+		r.logger.Error("Failed to stop transport: %v", err)
+	}
+
+	if err := r.storage.Close(); err != nil {
+		r.logger.Error("Failed to close storage: %v", err)
+	}
+
+	r.logger.Info("Raft node stopped")
+	return nil
+}
+
+// mainLoop is the main event loop
+func (r *Raft) mainLoop() {
+	for {
+		select {
+		case <-r.stopCh:
+			return
+		default:
+		}
+
+		r.mu.RLock()
+		state := r.state
+		r.mu.RUnlock()
+
+		switch state {
+		case Follower:
+			r.runFollower()
+		case Candidate:
+			r.runCandidate()
+		case Leader:
+			r.runLeader()
+		}
+	}
+}
+
+// runFollower handles follower behavior
+func (r *Raft) runFollower() {
+	r.logger.Debug("Running as follower in term %d", r.currentTerm)
+
+	for {
+		select {
+		case <-r.stopCh:
+			return
+
+		case <-r.electionTimer.C:
+			r.mu.Lock()
+			if r.state == Follower {
+				// If we're joining a cluster, don't start elections
+				// Give time for the leader to sync us up
+				if r.joiningCluster {
+					// Allow joining for up to 30 seconds
+					if time.Since(r.joiningClusterTime) < 30*time.Second {
+						r.logger.Debug("Suppressing election during cluster join")
+						r.resetElectionTimer()
+						r.mu.Unlock()
+						continue
+					}
+					// Timeout - clear joining state
+					r.joiningCluster = false
+					r.logger.Warn("Cluster join timeout, resuming normal election behavior")
+				}
+				r.logger.Debug("Election timeout, becoming candidate")
+				r.state = Candidate
+			}
+			r.mu.Unlock()
+			return
+		}
+	}
+}
+
+// runCandidate handles candidate behavior (leader election)
+// Implements Pre-Vote mechanism to prevent term explosion
+func (r *Raft) runCandidate() {
+	// Phase 1: Pre-Vote (don't increment term yet)
+	if !r.runPreVote() {
+		// Pre-vote failed, wait for timeout before retrying
+		r.mu.Lock()
+		r.resetElectionTimer()
+		r.mu.Unlock()
+
+		select {
+		case <-r.stopCh:
+			return
+		case <-r.electionTimer.C:
+			// Timer expired, will retry pre-vote
+		}
+		return
+	}
+
+	// Phase 2: Actual election (pre-vote succeeded, now increment term)
+	r.mu.Lock()
+	r.currentTerm++
+	r.votedFor = r.nodeID
+	r.leaderID = ""
+	currentTerm := r.currentTerm
+	if err := r.persistState(); err != nil {
+		r.logger.Error("Failed to persist state during election: %v", err)
+		r.mu.Unlock()
+		return // Cannot proceed without persisting state
+	}
+	atomic.AddUint64(&r.metrics.ElectionsStarted, 1)
+	r.resetElectionTimer()
+	r.mu.Unlock()
+
+	r.logger.Debug("Starting election for term %d", currentTerm)
+
+	// Get current peers list
+	r.mu.RLock()
+	currentPeers := make([]string, len(r.peers))
+	copy(currentPeers, r.peers)
+	clusterSize := len(r.clusterNodes)
+	if clusterSize == 0 {
+		clusterSize = len(r.peers) + 1
+	}
+	r.mu.RUnlock()
+
+	// Request actual votes from all peers
+	votes := 1 // Vote for self
+	voteCh := make(chan bool, len(currentPeers))
+
+	lastLogIndex, lastLogTerm := r.log.LastIndexAndTerm()
+
+	for _, peer := range currentPeers {
+		go func(peer string) {
+			args := &RequestVoteArgs{
+				Term:         currentTerm,
+				CandidateID:  r.nodeID,
+				LastLogIndex: lastLogIndex,
+				LastLogTerm:  lastLogTerm,
+				PreVote:      false,
+			}
+
+			ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+			defer cancel()
+
+			reply, err := r.transport.RequestVote(ctx, peer, args)
+			if err != nil {
+				r.logger.Debug("RequestVote to %s failed: %v", peer, err)
+				voteCh <- false
+				return
+			}
+
+			r.mu.Lock()
+			defer r.mu.Unlock()
+
+			if reply.Term > r.currentTerm {
+				r.becomeFollower(reply.Term)
+				voteCh <- false
+				return
+			}
+
+			voteCh <- reply.VoteGranted
+		}(peer)
+	}
+
+	// Wait for votes - majority is (clusterSize/2) + 1
+	needed := clusterSize/2 + 1
+
+	// If we already have enough votes (single-node cluster), become leader immediately
+	if votes >= needed {
+		r.mu.Lock()
+		if r.state == Candidate && r.currentTerm == currentTerm {
+			r.becomeLeader()
+		}
+		r.mu.Unlock()
+		return
+	}
+
+	for i := 0; i < len(currentPeers); i++ {
+		select {
+		case <-r.stopCh:
+			return
+
+		case <-r.electionTimer.C:
+			r.logger.Debug("Election timeout, will start new election")
+			return
+
+		case granted := <-voteCh:
+			if granted {
+				votes++
+				if votes >= needed {
+					r.mu.Lock()
+					if r.state == Candidate && r.currentTerm == currentTerm {
+						r.becomeLeader()
+					}
+					r.mu.Unlock()
+					return
+				}
+			}
+		}
+
+		// Check if we're still a candidate
+		r.mu.RLock()
+		if r.state != Candidate {
+			r.mu.RUnlock()
+			return
+		}
+		r.mu.RUnlock()
+	}
+
+	// Election failed, wait for timeout
+	r.mu.RLock()
+	stillCandidate := r.state == Candidate
+	r.mu.RUnlock()
+
+	if stillCandidate {
+		r.logger.Debug("Election failed, waiting for timeout (got %d/%d votes)", votes, needed)
+		select {
+		case <-r.stopCh:
+			return
+		case <-r.electionTimer.C:
+			// Timer expired, will start new election
+		}
+	}
+}
+
+// runPreVote sends pre-vote requests to check if we can win an election
+// Returns true if we got majority pre-votes, false otherwise
+func (r *Raft) runPreVote() bool {
+	r.mu.RLock()
+	currentTerm := r.currentTerm
+	currentPeers := make([]string, len(r.peers))
+	copy(currentPeers, r.peers)
+	clusterSize := len(r.clusterNodes)
+	if clusterSize == 0 {
+		clusterSize = len(r.peers) + 1
+	}
+	leaderID := r.leaderID
+	r.mu.RUnlock()
+
+	// Pre-vote uses term+1 but doesn't actually increment it
+	preVoteTerm := currentTerm + 1
+
+	lastLogIndex, lastLogTerm := r.log.LastIndexAndTerm()
+
+	r.logger.Debug("Starting pre-vote for term %d", preVoteTerm)
+
+	// Per Raft Pre-Vote optimization (§9.6): if we have a known leader,
+	// don't vote for self. This prevents standalone nodes from constantly
+	// electing themselves when they should be joining an existing cluster.
+	preVotes := 0
+	if leaderID == "" {
+		preVotes = 1 // Vote for self only if we don't know of a leader
+	} else {
+		r.logger.Debug("Pre-vote: not self-voting because we have leader %s", leaderID)
+	}
+	preVoteCh := make(chan bool, len(currentPeers))
+
+	for _, peer := range currentPeers {
+		go func(peer string) {
+			args := &RequestVoteArgs{
+				Term:         preVoteTerm,
+				CandidateID:  r.nodeID,
+				LastLogIndex: lastLogIndex,
+				LastLogTerm:  lastLogTerm,
+				PreVote:      true,
+			}
+
+			ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
+			defer cancel()
+
+			reply, err := r.transport.RequestVote(ctx, peer, args)
+			if err != nil {
+				r.logger.Debug("PreVote to %s failed: %v", peer, err)
+				preVoteCh <- false
+				return
+			}
+
+			// For pre-vote, we don't step down even if reply.Term > currentTerm
+			// because the other node might also be doing pre-vote
+			preVoteCh <- reply.VoteGranted
+		}(peer)
+	}
+
+	// Wait for pre-votes with a shorter timeout - majority is (clusterSize/2) + 1
+	needed := clusterSize/2 + 1
+
+	// If we already have enough votes (single-node cluster with no known leader), succeed immediately
+	if preVotes >= needed {
+		r.logger.Debug("Pre-vote succeeded immediately (got %d/%d pre-votes)", preVotes, needed)
+		return true
+	}
+
+	timeout := time.After(500 * time.Millisecond)
+
+	for i := 0; i < len(currentPeers); i++ {
+		select {
+		case <-r.stopCh:
+			return false
+
+		case <-timeout:
+			r.logger.Debug("Pre-vote timeout (got %d/%d pre-votes)", preVotes, needed)
+			return false
+
+		case granted := <-preVoteCh:
+			if granted {
+				preVotes++
+				if preVotes >= needed {
+					r.logger.Debug("Pre-vote succeeded (got %d/%d pre-votes)", preVotes, needed)
+					return true
+				}
+			}
+		}
+
+		// Check if we're still a candidate
+		r.mu.RLock()
+		if r.state != Candidate {
+			r.mu.RUnlock()
+			return false
+		}
+		r.mu.RUnlock()
+	}
+
+	r.logger.Debug("Pre-vote failed (got %d/%d pre-votes)", preVotes, needed)
+	return false
+}
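+
+// Quorum arithmetic used above (illustrative sketch, not part of the original
+// change): needed = clusterSize/2 + 1 with integer division, so a 4-node
+// cluster still needs 3 votes and tolerates no more failures than a 3-node one.
+//
+//	// clusterSize: 1  2  3  4  5  6  7
+//	// needed:      1  2  2  3  3  4  4
+//	func quorum(clusterSize int) int { return clusterSize/2 + 1 }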
+
+// runLeader handles leader behavior
+func (r *Raft) runLeader() {
+	r.logger.Debug("Running as leader in term %d", r.currentTerm)
+
+	// Send initial heartbeat
+	r.sendHeartbeats()
+
+	// Start heartbeat timer
+	r.mu.Lock()
+	r.heartbeatTimer = time.NewTimer(r.config.HeartbeatInterval)
+	r.mu.Unlock()
+
+	for {
+		select {
+		case <-r.stopCh:
+			return
+
+		case <-r.heartbeatTimer.C:
+			r.mu.RLock()
+			if r.state != Leader {
+				r.mu.RUnlock()
+				return
+			}
+			r.mu.RUnlock()
+
+			r.sendHeartbeats()
+			r.heartbeatTimer.Reset(r.config.HeartbeatInterval)
+
+		case <-r.commitCh:
+			r.updateCommitIndex()
+		}
+	}
+}
+
+// becomeFollower transitions to follower state
+func (r *Raft) becomeFollower(term uint64) {
+	oldState := r.state
+	oldTerm := r.currentTerm
+
+	r.state = Follower
+	r.currentTerm = term
+	r.votedFor = ""
+	r.leaderID = ""
+	// Must persist before responding - use mustPersistState for critical transitions
+	r.mustPersistState()
+	r.resetElectionTimer()
+
+	// Clear leadership transfer state
+	r.transferring = false
+	r.transferTarget = ""
+
+	// Only log significant state changes
+	if oldState == Leader && term > oldTerm {
+		// Stepping down from leader is notable
+		r.logger.Debug("Stepped down from leader in term %d", term)
+	}
+}
+
+// becomeLeader transitions to leader state
+func (r *Raft) becomeLeader() {
+	r.state = Leader
+	r.leaderID = r.nodeID
+
+	// Update metrics
+	atomic.AddUint64(&r.metrics.ElectionsWon, 1)
+
+	// Initialize leader state for all peers
+	lastIndex := r.log.LastIndex()
+	for nodeID, addr := range r.clusterNodes {
+		if nodeID != r.nodeID {
+			r.nextIndex[addr] = lastIndex + 1
+			r.matchIndex[addr] = 0
+		}
+	}
+	// Also handle legacy peers
+	for _, peer := range r.peers {
+		if _, exists := r.nextIndex[peer]; !exists {
+			r.nextIndex[peer] = lastIndex + 1
+			r.matchIndex[peer] = 0
+		}
+	}
+
+	r.logger.Info("Became leader in term %d with %d peers", r.currentTerm, len(r.clusterNodes)-1)
+
+	// Append a no-op entry to commit entries from previous terms
+	// This is a standard Raft optimization - the leader appends a no-op
+	// entry in its current term to quickly commit all pending entries
+	noopEntry := LogEntry{
+		Index:   r.log.LastIndex() + 1,
+		Term:    r.currentTerm,
+		Type:    EntryNoop,
+		Command: nil,
+	}
+	if err := r.log.Append(noopEntry); err != nil {
+		r.logger.Error("Failed to append no-op entry: %v", err)
+	} else {
+		r.logger.Debug("Appended no-op entry at index %d for term %d", noopEntry.Index, noopEntry.Term)
+	}
+}
+
+// sendHeartbeats sends AppendEntries RPCs to all peers
+func (r *Raft) sendHeartbeats() {
+	r.mu.RLock()
+	if r.state != Leader {
+		r.mu.RUnlock()
+		return
+	}
+	currentTerm := r.currentTerm
+	leaderCommit := r.commitIndex
+	// Get all peer addresses
+	peerAddrs := make([]string, 0, len(r.clusterNodes))
+	for nodeID, addr := range r.clusterNodes {
+		if nodeID != r.nodeID {
+			peerAddrs = append(peerAddrs, addr)
+		}
+	}
+	// Also include legacy peers not in clusterNodes
+	for _, peer := range r.peers {
+		found := false
+		for _, addr := range peerAddrs {
+			if addr == peer {
+				found = true
+				break
+			}
+		}
+		if !found {
+			peerAddrs = append(peerAddrs, peer)
+		}
+	}
+	r.mu.RUnlock()
+
+	for _, peer := range peerAddrs {
+		go r.replicateToPeer(peer, currentTerm, leaderCommit)
+	}
+}
+
+// replicateToPeer sends AppendEntries to a specific peer
+func (r *Raft) replicateToPeer(peer string, term, leaderCommit uint64) {
+	r.mu.RLock()
+	if r.state != Leader || r.currentTerm != term {
+		r.mu.RUnlock()
+		return
+	}
+	nextIndex := r.nextIndex[peer]
+	r.mu.RUnlock()
+
+	// Get entries to send
+	entries, prevLogIndex, prevLogTerm, err := r.log.GetEntriesForFollower(nextIndex, r.config.MaxLogEntriesPerRequest)
+	if err == ErrCompacted {
+		// Need to send snapshot
+		r.sendSnapshot(peer)
+		return
+	}
+	if err != nil {
+		r.logger.Debug("Failed to get entries for %s: %v", peer, err)
+		return
+	}
+
+	args := &AppendEntriesArgs{
+		Term:         term,
+		LeaderID:     r.nodeID,
+		PrevLogIndex: prevLogIndex,
+		PrevLogTerm:  prevLogTerm,
+		Entries:      entries,
+		LeaderCommit: leaderCommit,
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+	defer cancel()
+
+	reply, err := r.transport.AppendEntries(ctx, peer, args)
+	if err != nil {
+		r.logger.Debug("AppendEntries to %s failed: %v", peer, err)
+		return
+	}
+
+	atomic.AddInt64(&r.stats.AppendsSent, 1)
+
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	if reply.Term > r.currentTerm {
+		r.becomeFollower(reply.Term)
+		return
+	}
+
+	if r.state != Leader || r.currentTerm != term {
+		return
+	}
+
+	if reply.Success {
+		// Update nextIndex and matchIndex
+		if len(entries) > 0 {
+			newMatchIndex := entries[len(entries)-1].Index
+			if newMatchIndex > r.matchIndex[peer] {
+				r.matchIndex[peer] = newMatchIndex
+				r.nextIndex[peer] = newMatchIndex + 1
+
+				// Try to update commit index
+				select {
+				case r.commitCh <- struct{}{}:
+				default:
+				}
+			}
+		}
+	} else {
+		// Decrement nextIndex and retry
+		if reply.ConflictTerm > 0 {
+			// Find the last entry of ConflictTerm in our log
+			found := false
+			for idx := r.log.LastIndex(); idx >= r.log.FirstIndex(); idx-- {
+				t, err := r.log.GetTerm(idx)
+				if err != nil {
+					break
+				}
+				if t == reply.ConflictTerm {
+					r.nextIndex[peer] = idx + 1
+					found = true
+					break
+				}
+				if t < reply.ConflictTerm {
+					break
+				}
+			}
+			if !found {
+				r.nextIndex[peer] = reply.ConflictIndex
+			}
+		} else if reply.ConflictIndex > 0 {
+			r.nextIndex[peer] = reply.ConflictIndex
+		} else {
+			r.nextIndex[peer] = max(1, r.nextIndex[peer]-1)
+		}
+	}
+}
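+
+// Conflict-hint backtracking used above (illustrative sketch, not part of the
+// original change): instead of walking nextIndex back one entry per round
+// trip, the follower reports the conflicting term and its first index for that
+// term, and the leader jumps straight past it. Example with assumed logs:
+//
+//	// leader log terms:   index 1..6 -> [1, 1, 2, 2, 3, 3]
+//	// follower log terms: index 1..5 -> [1, 1, 4, 4, 4]
+//	// follower rejects PrevLogIndex=5 with ConflictTerm=4, ConflictIndex=3
+//	// leader has no entry with term 4, so nextIndex[peer] = 3
+//	// the next AppendEntries resends indexes 3..6 and overwrites the divergence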
+
+// sendSnapshot sends a snapshot to a peer with chunked transfer support
+func (r *Raft) sendSnapshot(peer string) {
+	r.mu.RLock()
+	if r.state != Leader {
+		r.mu.RUnlock()
+		return
+	}
+	term := r.currentTerm
+	chunkSize := r.config.SnapshotChunkSize
+	if chunkSize <= 0 {
+		chunkSize = 1024 * 1024 // Default 1MB
+	}
+	r.mu.RUnlock()
+
+	data, lastIndex, lastTerm, err := r.storage.GetSnapshot()
+	if err != nil {
+		r.logger.Error("Failed to get snapshot: %v", err)
+		return
+	}
+
+	atomic.AddUint64(&r.metrics.SnapshotsSent, 1)
+
+	r.logger.Info("Sending snapshot to %s: %d bytes, lastIndex=%d, lastTerm=%d",
+		peer, len(data), lastIndex, lastTerm)
+
+	// Send snapshot in chunks
+	totalSize := len(data)
+	offset := 0
+
+	for offset < totalSize {
+		// Check if we're still leader
+		r.mu.RLock()
+		if r.state != Leader || r.currentTerm != term {
+			r.mu.RUnlock()
+			r.logger.Debug("Aborting snapshot send: no longer leader")
+			return
+		}
+		r.mu.RUnlock()
+
+		// Calculate chunk size
+		end := offset + chunkSize
+		if end > totalSize {
+			end = totalSize
+		}
+		chunk := data[offset:end]
+		done := end >= totalSize
+
+		args := &InstallSnapshotArgs{
+			Term:              term,
+			LeaderID:          r.nodeID,
+			LastIncludedIndex: lastIndex,
+			LastIncludedTerm:  lastTerm,
+			Offset:            uint64(offset),
+			Data:              chunk,
+			Done:              done,
+		}
+
+		ctx, cancel := context.WithTimeout(context.Background(), r.config.SnapshotRPCTimeout)
+		reply, err := r.transport.InstallSnapshot(ctx, peer, args)
+		cancel()
+
+		if err != nil {
+			r.logger.Error("InstallSnapshot chunk to %s failed at offset %d: %v", peer, offset, err)
+			return
+		}
+
+		if reply.Term > r.currentTerm {
+			r.mu.Lock()
+			r.becomeFollower(reply.Term)
+			r.mu.Unlock()
+			return
+		}
+
+		if !reply.Success {
+			r.logger.Error("InstallSnapshot chunk rejected by %s at offset %d", peer, offset)
+			return
+		}
+
+		offset = end
+	}
+
+	// Snapshot fully sent and accepted
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	if r.state != Leader || r.currentTerm != term {
+		return
+	}
+
+	r.nextIndex[peer] = lastIndex + 1
+	r.matchIndex[peer] = lastIndex
+
+	r.logger.Info("Snapshot to %s completed, nextIndex=%d", peer, lastIndex+1)
+}
+
+// updateCommitIndex updates the commit index based on matchIndex
+func (r *Raft) updateCommitIndex() {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	if r.state != Leader {
+		return
+	}
+
+	// For config change entries, use the OLD cluster for majority calculation
+	// This ensures that adding a node doesn't require the new node's vote
+	// until the config change itself is committed
+	votingNodes := r.clusterNodes
+	if r.pendingConfigChange && r.oldClusterNodes != nil {
+		votingNodes = r.oldClusterNodes
+	}
+
+	// Get current cluster size from voting nodes
+	clusterSize := len(votingNodes)
+	if clusterSize == 0 {
+		clusterSize = len(r.peers) + 1
+	}
+
+	// Find the highest index replicated on a majority
+	for n := r.log.LastIndex(); n > r.commitIndex; n-- {
+		term, err := r.log.GetTerm(n)
+		if err != nil {
+			continue
+		}
+
+		// Only commit entries from current term
+		if term != r.currentTerm {
+			continue
+		}
+
+		// Count replicas (including self)
+		count := 1 // Self
+		for nodeID, addr := range votingNodes {
+			if nodeID != r.nodeID {
+				if r.matchIndex[addr] >= n {
+					count++
+				}
+			}
+		}
+		// Also check legacy peers
+		for _, peer := range r.peers {
+			// Avoid double counting if peer is already in votingNodes
+			alreadyCounted := false
+			for _, addr := range votingNodes {
+				if addr == peer {
+					alreadyCounted = true
+					break
+				}
+			}
+			if !alreadyCounted && r.matchIndex[peer] >= n {
+				count++
+			}
+		}
+
+		// Majority is (clusterSize/2) + 1
+		needed := clusterSize/2 + 1
+		if count >= needed {
+			r.commitIndex = n
+			break
+		}
+	}
+}
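+
+// Commit-index advance in practice (illustrative sketch, not part of the
+// original change), for a 5-node cluster whose leader has appended up to
+// index 9 in its current term:
+//
+//	// matchIndex: A=9, B=7, C=4, D=3, plus self   (needed = 5/2 + 1 = 3)
+//	// n=9: replicas {self, A}       -> 2 < 3, keep scanning down
+//	// n=8: replicas {self, A}       -> 2 < 3
+//	// n=7: replicas {self, A, B}    -> 3 >= 3, commitIndex = 7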
+
+// applyLoop applies committed entries to the state machine
+// Uses event-driven model with fallback polling for reliability
+func (r *Raft) applyLoop() {
+	// Use a ticker as fallback for missed signals (matches original 10ms polling)
+	ticker := time.NewTicker(10 * time.Millisecond)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-r.stopCh:
+			return
+
+		case <-r.commitCh:
+			// Event-driven: triggered when commitIndex is updated
+			r.applyCommitted()
+
+		case <-ticker.C:
+			// Fallback: check periodically in case we missed a signal
+			r.applyCommitted()
+		}
+	}
+}
+
+// applyCommitted applies all committed but not yet applied entries
+func (r *Raft) applyCommitted() {
+	r.mu.Lock()
+	commitIndex := r.commitIndex
+	lastApplied := r.lastApplied
+	firstIndex := r.log.FirstIndex()
+	lastLogIndex := r.log.LastIndex()
+	r.mu.Unlock()
+
+	// Safety check: ensure lastApplied is within valid range
+	// If lastApplied is below firstIndex (due to snapshot), skip to firstIndex
+	if lastApplied < firstIndex && firstIndex > 0 {
+		r.mu.Lock()
+		r.lastApplied = firstIndex
+		lastApplied = firstIndex
+		r.mu.Unlock()
+		r.logger.Debug("Adjusted lastApplied to firstIndex %d after compaction", firstIndex)
+	}
+
+	// Safety check: don't try to apply beyond what we have in log
+	if commitIndex > lastLogIndex {
+		commitIndex = lastLogIndex
+	}
+
+	for lastApplied < commitIndex {
+		lastApplied++
+
+		// Skip if entry has been compacted
+		if lastApplied < firstIndex {
+			continue
+		}
+
+		entry, err := r.log.GetEntry(lastApplied)
+		if err != nil {
+			// If entry is compacted, skip ahead
+			if err == ErrCompacted {
+				r.mu.Lock()
+				newFirstIndex := r.log.FirstIndex()
+				if lastApplied < newFirstIndex {
+					r.lastApplied = newFirstIndex
+					lastApplied = newFirstIndex
+				}
+				r.mu.Unlock()
+				continue
+			}
+			r.logger.Error("Failed to get entry %d: %v (firstIndex=%d, lastIndex=%d)",
+				lastApplied, err, r.log.FirstIndex(), r.log.LastIndex())
+			break
+		}
+
+		// Handle config change entries
+		if entry.Type == EntryConfig {
+			r.applyConfigChange(entry)
+			r.mu.Lock()
+			r.lastApplied = lastApplied
+			r.mu.Unlock()
+			continue
+		}
+
+		// Handle no-op entries (just update lastApplied, don't send to state machine)
+		if entry.Type == EntryNoop {
+			r.mu.Lock()
+			r.lastApplied = lastApplied
+			r.mu.Unlock()
+			continue
+		}
+
+		// Normal command entry
+		msg := ApplyMsg{
+			CommandValid: true,
+			Command:      entry.Command,
+			CommandIndex: entry.Index,
+			CommandTerm:  entry.Term,
+		}
+
+		select {
+		case r.applyCh <- msg:
+		case <-r.stopCh:
+			return
+		}
+
+		r.mu.Lock()
+		r.lastApplied = lastApplied
+		r.mu.Unlock()
+	}
+
+	// Check if log compaction is needed
+	r.maybeCompactLog()
+}
+
+// maybeCompactLog checks if automatic log compaction should be triggered
+// It uses a dynamic threshold to prevent "compaction thrashing":
+// - First compaction triggers at SnapshotThreshold (default 100,000)
+// - After compaction, next threshold = current_log_size * 1.5
+// - This prevents every new entry from triggering compaction if log stays large
+func (r *Raft) maybeCompactLog() {
+	// Skip if no snapshot provider is configured
+	if r.config.SnapshotProvider == nil {
+		return
+	}
+
+	// Check if compaction is already in progress (atomic check-and-set)
+	if !atomic.CompareAndSwapInt32(&r.compacting, 0, 1) {
+		return
+	}
+
+	// Ensure we release the compacting flag when done
+	defer atomic.StoreInt32(&r.compacting, 0)
+
+	r.mu.RLock()
+	lastApplied := r.lastApplied
+	firstIndex := r.log.FirstIndex()
+	initialThreshold := r.config.SnapshotThreshold
+	minRetention := r.config.SnapshotMinRetention
+	isLeader := r.state == Leader
+	dynamicThreshold := r.nextCompactionThreshold
+	r.mu.RUnlock()
+
+	// Guard against underflow: ensure lastApplied > firstIndex
+	if lastApplied <= firstIndex {
+		return
+	}
+
+	// Calculate current log size
+	logSize := lastApplied - firstIndex
+
+	// Determine effective threshold:
+	// - Use initial threshold if no compaction has occurred yet (dynamicThreshold == 0)
+	// - Otherwise use the dynamic threshold
+	effectiveThreshold := initialThreshold
+	if dynamicThreshold > 0 {
+		effectiveThreshold = dynamicThreshold
+	}
+
+	// Check if we have enough entries to warrant compaction
+	if logSize <= effectiveThreshold {
+		return
+	}
+
+	// Guard against underflow: ensure lastApplied > minRetention
+	if lastApplied <= minRetention {
+		return
+	}
+
+	// Calculate the safe compaction point
+	// We need to retain at least minRetention entries for follower catch-up
+	compactUpTo := lastApplied - minRetention
+	if compactUpTo <= firstIndex {
+		return // Not enough entries to compact while maintaining retention
+	}
+
+	// For leader, also consider the minimum nextIndex of all followers
+	// to avoid compacting entries that followers still need
+	if isLeader {
+		r.mu.RLock()
+		minNextIndex := lastApplied
+		for _, nextIdx := range r.nextIndex {
+			if nextIdx < minNextIndex {
+				minNextIndex = nextIdx
+			}
+		}
+		r.mu.RUnlock()
+
+		// Don't compact entries that followers still need
+		// Keep a buffer of minRetention entries before the slowest follower
+		if minNextIndex > minRetention {
+			followerSafePoint := minNextIndex - minRetention
+			if followerSafePoint < compactUpTo {
+				compactUpTo = followerSafePoint
+			}
+		} else {
+			// Slowest follower is too far behind, don't compact
+			// They will need a full snapshot anyway
+			compactUpTo = firstIndex // Effectively skip compaction
+		}
+	}
+
+	// Final check - make sure we're actually compacting something meaningful
+	if compactUpTo <= firstIndex {
+		return
+	}
+
+	// Get snapshot from application layer
+	snapshotData, err := r.config.SnapshotProvider()
+	if err != nil {
+		r.logger.Error("Failed to get snapshot from provider: %v", err)
+		return
+	}
+
+	// Get the term at the compaction point
+	term, err := r.log.GetTerm(compactUpTo)
+	if err != nil {
+		r.logger.Error("Failed to get term for compaction index %d: %v", compactUpTo, err)
+		return
+	}
+
+	// Save snapshot
+	if err := r.storage.SaveSnapshot(snapshotData, compactUpTo, term); err != nil {
+		r.logger.Error("Failed to save snapshot: %v", err)
+		return
+	}
+
+	// Compact the log
+	if err := r.log.Compact(compactUpTo); err != nil {
+		r.logger.Error("Failed to compact log: %v", err)
+		return
+	}
+
+	// Calculate new log size after compaction
+	newLogSize := lastApplied - compactUpTo
+
+	// Update dynamic threshold for next compaction: current size * 1.5
+	// This prevents "compaction thrashing" where every entry triggers compaction
+	r.mu.Lock()
+	newThreshold := newLogSize + newLogSize/2 // newLogSize * 1.5
+	// Ensure threshold doesn't go below the initial threshold
+	if newThreshold < initialThreshold {
+		newThreshold = initialThreshold
+	}
+	r.nextCompactionThreshold = newThreshold
+	r.mu.Unlock()
+
+	r.logger.Info("Auto compaction completed: compacted up to index %d (term %d), log size %d -> %d, next threshold: %d",
+		compactUpTo, term, logSize, newLogSize, newThreshold)
+}
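+
+// Dynamic threshold in numbers (illustrative sketch, not part of the original
+// change), assuming SnapshotThreshold=100000 and SnapshotMinRetention=20000:
+//
+//	// 1. log grows past 100000 applied-but-uncompacted entries -> compact
+//	// 2. compactUpTo = lastApplied - 20000, so about 20000 entries remain
+//	// 3. next threshold = 20000 * 1.5 = 30000, floored at 100000 -> 100000
+//	// 4. if followers lag and 80000 entries remain instead, the next threshold
+//	//    becomes 120000, so the very next apply does not re-trigger compaction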
+
+// resetElectionTimer resets the election timeout
+func (r *Raft) resetElectionTimer() {
+	timeout := r.config.ElectionTimeoutMin +
+		time.Duration(rand.Int63n(int64(r.config.ElectionTimeoutMax-r.config.ElectionTimeoutMin)))
+
+	if r.electionTimer == nil {
+		r.electionTimer = time.NewTimer(timeout)
+	} else {
+		if !r.electionTimer.Stop() {
+			select {
+			case <-r.electionTimer.C:
+			default:
+			}
+		}
+		r.electionTimer.Reset(timeout)
+	}
+}
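+
+// Timeout randomization in numbers (illustrative sketch, not part of the
+// original change), assuming ElectionTimeoutMin=150ms and ElectionTimeoutMax=300ms:
+//
+//	// each reset picks 150ms + rand[0, 150ms), e.g. 217ms, 164ms, 291ms, ...
+//	// spreading timeouts apart makes split votes unlikely: one follower
+//	// usually times out first and can win before the others become candidates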
+
+// persistState saves the current state to stable storage
+// Returns error if persistence fails - caller MUST handle this for safety
+func (r *Raft) persistState() error {
+	state := &PersistentState{
+		CurrentTerm: r.currentTerm,
+		VotedFor:    r.votedFor,
+	}
+	if err := r.storage.SaveState(state); err != nil {
+		r.logger.Error("Failed to persist state: %v", err)
+		return fmt.Errorf("%w: %v", ErrPersistFailed, err)
+	}
+	return nil
+}
+
+// mustPersistState saves state and panics on failure
+// Use this only in critical paths where failure is unrecoverable
+func (r *Raft) mustPersistState() {
+	if err := r.persistState(); err != nil {
+		// In production, you might want to trigger a graceful shutdown instead
+		r.logger.Error("CRITICAL: Failed to persist state, node may be in inconsistent state: %v", err)
+		panic(err)
+	}
+}
+
+// HandleRequestVote handles RequestVote RPCs (including pre-vote)
+func (r *Raft) HandleRequestVote(args *RequestVoteArgs) *RequestVoteReply {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	reply := &RequestVoteReply{
+		Term:        r.currentTerm,
+		VoteGranted: false,
+	}
+
+	// Handle pre-vote separately
+	if args.PreVote {
+		return r.handlePreVote(args)
+	}
+
+	// Reply false if term < currentTerm
+	if args.Term < r.currentTerm {
+		return reply
+	}
+
+	// If term > currentTerm, become follower
+	if args.Term > r.currentTerm {
+		r.becomeFollower(args.Term)
+	}
+
+	reply.Term = r.currentTerm
+
+	// Check if we can vote for this candidate
+	if (r.votedFor == "" || r.votedFor == args.CandidateID) &&
+		r.log.IsUpToDate(args.LastLogIndex, args.LastLogTerm) {
+		r.votedFor = args.CandidateID
+		if err := r.persistState(); err != nil {
+			// Cannot grant vote if we can't persist the decision
+			r.logger.Error("Failed to persist vote for %s: %v", args.CandidateID, err)
+			return reply
+		}
+		r.resetElectionTimer()
+		reply.VoteGranted = true
+		r.logger.Debug("Granted vote to %s for term %d", args.CandidateID, args.Term)
+	}
+
+	return reply
+}
+
+// handlePreVote handles pre-vote requests
+// Pre-vote doesn't change our state, just checks if we would vote
+func (r *Raft) handlePreVote(args *RequestVoteArgs) *RequestVoteReply {
+	reply := &RequestVoteReply{
+		Term:        r.currentTerm,
+		VoteGranted: false,
+	}
+
+	// For pre-vote, we check:
+	// 1. The candidate's term is at least as high as ours
+	// 2. The candidate's log is at least as up-to-date as ours
+	// 3. We don't have a current leader (or the candidate's term is higher)
+
+	if args.Term < r.currentTerm {
+		return reply
+	}
+
+	// Per Raft Pre-Vote optimization (§9.6): reject pre-vote if we have a current
+	// leader and the candidate's term is not higher than ours. This prevents
+	// disruptive elections when a partitioned node tries to rejoin.
+	if r.leaderID != "" && args.Term <= r.currentTerm {
+		r.logger.Debug("Rejecting pre-vote from %s: have leader %s", args.CandidateID, r.leaderID)
+		return reply
+	}
+
+	// Grant pre-vote if log is up-to-date
+	// Note: we don't check votedFor for pre-vote, and we don't update any state
+	if r.log.IsUpToDate(args.LastLogIndex, args.LastLogTerm) {
+		reply.VoteGranted = true
+		r.logger.Debug("Granted pre-vote to %s for term %d", args.CandidateID, args.Term)
+	}
+
+	return reply
+}
+
+// HandleAppendEntries handles AppendEntries RPCs
+func (r *Raft) HandleAppendEntries(args *AppendEntriesArgs) *AppendEntriesReply {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	atomic.AddInt64(&r.stats.AppendsReceived, 1)
+
+	reply := &AppendEntriesReply{
+		Term:    r.currentTerm,
+		Success: false,
+	}
+
+	// Check if we're a standalone node being added to a cluster
+	// A standalone node has only itself in clusterNodes
+	isStandalone := len(r.clusterNodes) == 1
+	if _, hasSelf := r.clusterNodes[r.nodeID]; isStandalone && hasSelf {
+		// We're standalone and receiving AppendEntries from an external leader
+		// This means we're being added to a cluster - suppress elections
+		if !r.joiningCluster {
+			r.joiningCluster = true
+			r.joiningClusterTime = time.Now()
+			r.logger.Info("Detected cluster join in progress, suppressing elections")
+		}
+		// When joining, accept higher terms from the leader to sync up
+		if args.Term > r.currentTerm {
+			r.becomeFollower(args.Term)
+		}
+	}
+
+	// Reply false if term < currentTerm
+	if args.Term < r.currentTerm {
+		// But still reset timer if we're joining a cluster to prevent elections
+		if r.joiningCluster {
+			r.resetElectionTimer()
+		}
+		return reply
+	}
+
+	// If term > currentTerm, or we're a candidate, or we're a leader receiving
+	// AppendEntries from another leader (split-brain scenario during cluster merge),
+	// become follower. In Raft, there can only be one leader per term.
+	if args.Term > r.currentTerm || r.state == Candidate || r.state == Leader {
+		r.becomeFollower(args.Term)
+	}
+
+	// Update leader info and reset election timer
+	r.leaderID = args.LeaderID
+	r.lastHeartbeat = time.Now()
+	r.resetElectionTimer()
+
+	reply.Term = r.currentTerm
+
+	// Try to append entries
+	success, conflictIndex, conflictTerm := r.log.AppendEntriesFromLeader(
+		args.PrevLogIndex, args.PrevLogTerm, args.Entries)
+
+	if !success {
+		reply.ConflictIndex = conflictIndex
+		reply.ConflictTerm = conflictTerm
+		return reply
+	}
+
+	reply.Success = true
+
+	// Update commit index safely
+	if args.LeaderCommit > r.commitIndex {
+		// Get our actual last log index
+		lastLogIndex := r.log.LastIndex()
+
+		// Calculate what index the entries would have reached
+		lastNewEntry := args.PrevLogIndex
+		if len(args.Entries) > 0 {
+			lastNewEntry = args.Entries[len(args.Entries)-1].Index
+		}
+
+		// Commit index should not exceed what we actually have in log
+		newCommitIndex := args.LeaderCommit
+		if newCommitIndex > lastNewEntry {
+			newCommitIndex = lastNewEntry
+		}
+		if newCommitIndex > lastLogIndex {
+			newCommitIndex = lastLogIndex
+		}
+
+		// Only advance commit index
+		if newCommitIndex > r.commitIndex {
+			r.commitIndex = newCommitIndex
+		}
+	}
+
+	return reply
+}
+
+// HandleInstallSnapshot handles InstallSnapshot RPCs with chunked transfer support
+func (r *Raft) HandleInstallSnapshot(args *InstallSnapshotArgs) *InstallSnapshotReply {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	reply := &InstallSnapshotReply{
+		Term:    r.currentTerm,
+		Success: false,
+	}
+
+	if args.Term < r.currentTerm {
+		return reply
+	}
+
+	if args.Term > r.currentTerm {
+		r.becomeFollower(args.Term)
+	}
+
+	r.leaderID = args.LeaderID
+	r.lastHeartbeat = time.Now()
+	r.resetElectionTimer()
+
+	reply.Term = r.currentTerm
+
+	// Skip if we already have this or a newer snapshot applied
+	if args.LastIncludedIndex <= r.lastApplied {
+		r.logger.Debug("Ignoring snapshot at index %d, already applied up to %d",
+			args.LastIncludedIndex, r.lastApplied)
+		reply.Success = true // Still success to let leader know we don't need it
+		return reply
+	}
+
+	// Handle chunked transfer
+	if args.Offset == 0 {
+		// First chunk - start new pending snapshot
+		r.pendingSnapshot = &pendingSnapshotState{
+			lastIncludedIndex: args.LastIncludedIndex,
+			lastIncludedTerm:  args.LastIncludedTerm,
+			data:              make([]byte, 0),
+			receivedBytes:     0,
+		}
+		r.logger.Info("Starting snapshot reception at index %d, term %d",
+			args.LastIncludedIndex, args.LastIncludedTerm)
+	}
+
+	// Validate we're receiving the expected snapshot
+	// Check for nil first so the warning below cannot dereference a nil pendingSnapshot
+	if r.pendingSnapshot == nil {
+		r.logger.Warn("Unexpected snapshot chunk at offset %d: no snapshot reception in progress", args.Offset)
+		return reply
+	}
+	if r.pendingSnapshot.lastIncludedIndex != args.LastIncludedIndex ||
+		r.pendingSnapshot.lastIncludedTerm != args.LastIncludedTerm {
+		r.logger.Warn("Unexpected snapshot chunk: expected index %d, got %d",
+			r.pendingSnapshot.lastIncludedIndex, args.LastIncludedIndex)
+		return reply
+	}
+
+	// Validate offset matches what we've received
+	if args.Offset != r.pendingSnapshot.receivedBytes {
+		r.logger.Warn("Unexpected chunk offset: expected %d, got %d",
+			r.pendingSnapshot.receivedBytes, args.Offset)
+		return reply
+	}
+
+	// Append chunk data
+	r.pendingSnapshot.data = append(r.pendingSnapshot.data, args.Data...)
+	r.pendingSnapshot.receivedBytes += uint64(len(args.Data))
+	reply.Success = true
+
+	r.logger.Debug("Received snapshot chunk: offset=%d, size=%d, done=%v",
+		args.Offset, len(args.Data), args.Done)
+
+	// If not done, wait for more chunks
+	if !args.Done {
+		return reply
+	}
+
+	// All chunks received - apply the snapshot
+	r.logger.Info("Installing complete snapshot: %d bytes at index %d, term %d",
+		len(r.pendingSnapshot.data), args.LastIncludedIndex, args.LastIncludedTerm)
+
+	atomic.AddUint64(&r.metrics.SnapshotsInstalled, 1)
+
+	// Save snapshot
+	if err := r.storage.SaveSnapshot(r.pendingSnapshot.data, args.LastIncludedIndex, args.LastIncludedTerm); err != nil {
+		r.logger.Error("Failed to save snapshot: %v", err)
+		r.pendingSnapshot = nil
+		reply.Success = false
+		return reply
+	}
+
+	// Compact log
+	if err := r.log.Compact(args.LastIncludedIndex); err != nil {
+		r.logger.Error("Failed to compact log: %v", err)
+	}
+
+	// Update state - must update both commitIndex and lastApplied
+	if args.LastIncludedIndex > r.commitIndex {
+		r.commitIndex = args.LastIncludedIndex
+	}
+
+	// Always update lastApplied to snapshot index to prevent trying to apply compacted entries
+	r.lastApplied = args.LastIncludedIndex
+
+	// Send snapshot to application (non-blocking with timeout)
+	// Use the complete pendingSnapshot data, not the last chunk
+	msg := ApplyMsg{
+		SnapshotValid: true,
+		Snapshot:      r.pendingSnapshot.data,
+		SnapshotIndex: args.LastIncludedIndex,
+		SnapshotTerm:  args.LastIncludedTerm,
+	}
+
+	// Clear pending snapshot
+	r.pendingSnapshot = nil
+
+	// Try to send, but don't block indefinitely
+	select {
+	case r.applyCh <- msg:
+		r.logger.Debug("Sent snapshot to application")
+	case <-time.After(100 * time.Millisecond):
+		r.logger.Warn("Timeout sending snapshot to application, will retry")
+		// The application will still get correct state via normal apply loop
+	}
+
+	return reply
+}
+
+// Propose proposes a new command to be replicated
+func (r *Raft) Propose(command []byte) (uint64, uint64, bool) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	if r.state != Leader {
+		return 0, 0, false
+	}
+
+	index, err := r.log.AppendCommand(r.currentTerm, command)
+	if err != nil {
+		r.logger.Error("Failed to append command: %v", err)
+		return 0, 0, false
+	}
+
+	r.matchIndex[r.nodeID] = index
+
+	// For single-node cluster, we are the only voter and can commit immediately
+	// This fixes the issue where commitCh never gets triggered without other peers
+	if len(r.clusterNodes) <= 1 && len(r.peers) == 0 {
+		// Single node: self is majority, trigger commit immediately
+		select {
+		case r.commitCh <- struct{}{}:
+		default:
+		}
+	} else {
+		// Multi-node: trigger replication to other nodes
+		r.triggerReplication()
+	}
+
+	return index, r.currentTerm, true
+}
+
+// triggerReplication signals the replication loop to send heartbeats
+// This uses a non-blocking send to batch replication requests
+func (r *Raft) triggerReplication() {
+	select {
+	case r.replicationCh <- struct{}{}:
+	default:
+		// Replication already scheduled
+	}
+}
+
+// replicationLoop handles batched replication
+// Uses simple delay-based batching: flush immediately when signaled, then wait
+// to allow more requests to accumulate before the next flush.
+func (r *Raft) replicationLoop() {
+	for {
+		select {
+		case <-r.stopCh:
+			return
+		case <-r.replicationCh:
+			// Flush and replicate immediately
+			r.flushAndReplicate()
+
+			// Wait briefly to allow batching of subsequent requests
+			// This gives time for more proposals to queue up before the next flush
+			time.Sleep(10 * time.Millisecond)
+		}
+	}
+}
+
+// flushAndReplicate flushes logs and sends heartbeats
+func (r *Raft) flushAndReplicate() {
+	// Ensure logs are flushed to OS cache before sending to followers
+	// This implements Group Commit with Flush (fast) instead of Sync (slow)
+	if err := r.log.Flush(); err != nil {
+		r.logger.Error("Failed to flush log: %v", err)
+	}
+	r.sendHeartbeats()
+}
+
+// ProposeWithForward proposes a command, forwarding to leader if necessary
+// This is the recommended method for applications to use
+func (r *Raft) ProposeWithForward(command []byte) (index uint64, term uint64, err error) {
+	// Try local propose first
+	idx, t, isLeader := r.Propose(command)
+	if isLeader {
+		return idx, t, nil
+	}
+
+	// Not leader, forward to leader
+	r.mu.RLock()
+	leaderID := r.leaderID
+	// Use clusterNodes (dynamically maintained) to find leader address
+	leaderAddr := r.clusterNodes[leaderID]
+	r.mu.RUnlock()
+
+	if leaderID == "" {
+		return 0, 0, fmt.Errorf("no leader available")
+	}
+
+	if leaderAddr == "" {
+		return 0, 0, fmt.Errorf("leader %s address not found in cluster", leaderID)
+	}
+
+	// Forward to leader
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+	defer cancel()
+
+	args := &ProposeArgs{Command: command}
+	reply, err := r.transport.ForwardPropose(ctx, leaderAddr, args)
+	if err != nil {
+		return 0, 0, fmt.Errorf("forward failed: %w", err)
+	}
+
+	if !reply.Success {
+		return 0, 0, fmt.Errorf("leader rejected: %s", reply.Error)
+	}
+
+	return reply.Index, reply.Term, nil
+}
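+
+// Example client path (illustrative sketch, not part of the original change):
+// an application submits a command through ProposeWithForward and learns the
+// outcome by watching the apply channel for the returned index. The command
+// encoding below is an arbitrary placeholder.
+//
+//	index, term, err := node.ProposeWithForward([]byte(`set x=1`))
+//	if err != nil {
+//		return err // no leader yet or forward failed; retry with backoff
+//	}
+//	for msg := range applyCh {
+//		if msg.CommandValid && msg.CommandIndex == index && msg.CommandTerm == term {
+//			break // committed and applied
+//		}
+//	}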
|
|
|
+
|
|
|
+// HandlePropose handles forwarded propose requests
|
|
|
+func (r *Raft) HandlePropose(args *ProposeArgs) *ProposeReply {
|
|
|
+ index, term, isLeader := r.Propose(args.Command)
|
|
|
+ if !isLeader {
|
|
|
+ return &ProposeReply{
|
|
|
+ Success: false,
|
|
|
+ Error: "not leader",
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return &ProposeReply{
|
|
|
+ Success: true,
|
|
|
+ Index: index,
|
|
|
+ Term: term,
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// HandleAddNode handles forwarded AddNode requests
|
|
|
+func (r *Raft) HandleAddNode(args *AddNodeArgs) *AddNodeReply {
|
|
|
+ err := r.AddNode(args.NodeID, args.Address)
|
|
|
+ if err != nil {
|
|
|
+ return &AddNodeReply{
|
|
|
+ Success: false,
|
|
|
+ Error: err.Error(),
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return &AddNodeReply{
|
|
|
+ Success: true,
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// HandleRemoveNode handles forwarded RemoveNode requests
|
|
|
+func (r *Raft) HandleRemoveNode(args *RemoveNodeArgs) *RemoveNodeReply {
|
|
|
+ err := r.RemoveNode(args.NodeID)
|
|
|
+ if err != nil {
|
|
|
+ return &RemoveNodeReply{
|
|
|
+ Success: false,
|
|
|
+ Error: err.Error(),
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return &RemoveNodeReply{
|
|
|
+ Success: true,
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// GetState returns the current term and whether this node is leader
|
|
|
+func (r *Raft) GetState() (uint64, bool) {
|
|
|
+ r.mu.RLock()
|
|
|
+ defer r.mu.RUnlock()
|
|
|
+ return r.currentTerm, r.state == Leader
|
|
|
+}
|
|
|
+
|
|
|
+// GetLeaderID returns the current leader ID
|
|
|
+func (r *Raft) GetLeaderID() string {
|
|
|
+ r.mu.RLock()
|
|
|
+ defer r.mu.RUnlock()
|
|
|
+ return r.leaderID
|
|
|
+}
|
|
|
+
|
|
|
+// GetStats returns runtime statistics
|
|
|
+func (r *Raft) GetStats() Stats {
|
|
|
+ r.mu.RLock()
|
|
|
+ defer r.mu.RUnlock()
|
|
|
+
|
|
|
+ lastIndex, lastTerm := r.log.LastIndexAndTerm()
|
|
|
+
|
|
|
+ // Copy cluster nodes
|
|
|
+ nodes := make(map[string]string)
|
|
|
+ for k, v := range r.clusterNodes {
|
|
|
+ nodes[k] = v
|
|
|
+ }
|
|
|
+
|
|
|
+ clusterSize := len(r.clusterNodes)
|
|
|
+ if clusterSize == 0 {
|
|
|
+ clusterSize = len(r.peers) + 1
|
|
|
+ }
|
|
|
+
|
|
|
+ return Stats{
|
|
|
+ Term: r.currentTerm,
|
|
|
+ State: r.state.String(),
|
|
|
+ LastLogIndex: lastIndex,
|
|
|
+ LastLogTerm: lastTerm,
|
|
|
+ CommitIndex: r.commitIndex,
|
|
|
+ LastApplied: r.lastApplied,
|
|
|
+ LeaderID: r.leaderID,
|
|
|
+ AppendsSent: atomic.LoadInt64(&r.stats.AppendsSent),
|
|
|
+ AppendsReceived: atomic.LoadInt64(&r.stats.AppendsReceived),
|
|
|
+ ClusterSize: clusterSize,
|
|
|
+ ClusterNodes: nodes,
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// restoreFromSnapshot restores the FSM from a snapshot at startup
|
|
|
+// This is called during Start() to ensure the FSM has the correct state
|
|
|
+// before processing any new commands
|
|
|
+func (r *Raft) restoreFromSnapshot() error {
|
|
|
+ // Get snapshot from storage
|
|
|
+ data, lastIndex, lastTerm, err := r.storage.GetSnapshot()
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("failed to get snapshot: %w", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ // No snapshot exists
|
|
|
+ if len(data) == 0 || lastIndex == 0 {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ r.logger.Info("Restoring FSM from snapshot at index %d, term %d (%d bytes)",
|
|
|
+ lastIndex, lastTerm, len(data))
|
|
|
+
|
|
|
+ // Update lastApplied to snapshot index to prevent re-applying compacted entries
|
|
|
+ r.mu.Lock()
|
|
|
+ if lastIndex > r.lastApplied {
|
|
|
+ r.lastApplied = lastIndex
|
|
|
+ }
|
|
|
+ if lastIndex > r.commitIndex {
|
|
|
+ r.commitIndex = lastIndex
|
|
|
+ }
|
|
|
+ r.mu.Unlock()
|
|
|
+
|
|
|
+ // Send snapshot to FSM for restoration
|
|
|
+ // Send with a timeout to avoid blocking indefinitely if applyCh is full
|
|
|
+ msg := ApplyMsg{
|
|
|
+ SnapshotValid: true,
|
|
|
+ Snapshot: data,
|
|
|
+ SnapshotIndex: lastIndex,
|
|
|
+ SnapshotTerm: lastTerm,
|
|
|
+ }
|
|
|
+
|
|
|
+ // Try to send with a timeout
|
|
|
+ select {
|
|
|
+ case r.applyCh <- msg:
|
|
|
+ r.logger.Info("FSM restoration triggered from snapshot at index %d", lastIndex)
|
|
|
+ case <-time.After(5 * time.Second):
|
|
|
+ return fmt.Errorf("timeout sending snapshot to applyCh")
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+// TakeSnapshot takes a snapshot of the current state
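+//
+// Usage sketch (the fsm.Serialize helper and the appliedIndex bookkeeping are
+// assumptions made for illustration; index must not exceed lastApplied):
+//
+//	data, err := fsm.Serialize()
+//	if err == nil {
+//		if err := node.TakeSnapshot(data, appliedIndex); err != nil {
+//			log.Printf("snapshot failed: %v", err)
+//		}
+//	}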
|
|
|
+func (r *Raft) TakeSnapshot(data []byte, index uint64) error {
|
|
|
+ r.mu.Lock()
|
|
|
+ defer r.mu.Unlock()
|
|
|
+
|
|
|
+ if index > r.lastApplied {
|
|
|
+ return fmt.Errorf("snapshot index %d exceeds lastApplied %d", index, r.lastApplied)
|
|
|
+ }
|
|
|
+
|
|
|
+ term, err := r.log.GetTerm(index)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("failed to get term for index %d: %w", index, err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if err := r.storage.SaveSnapshot(data, index, term); err != nil {
|
|
|
+ return fmt.Errorf("failed to save snapshot: %w", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if err := r.log.Compact(index); err != nil {
|
|
|
+ return fmt.Errorf("failed to compact log: %w", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ r.logger.Info("Took snapshot at index %d, term %d", index, term)
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func max(a, b uint64) uint64 {
|
|
|
+ if a > b {
|
|
|
+ return a
|
|
|
+ }
|
|
|
+ return b
|
|
|
+}
|
|
|
+
|
|
|
+// ==================== Membership Change API ====================
|
|
|
+//
|
|
|
+// This implementation uses Single-Node Membership Change (also known as one-at-a-time changes)
|
|
|
+// as described in the Raft dissertation (Chapter 4). This is safe because:
|
|
|
+//
|
|
|
+// 1. We only allow one configuration change at a time (pendingConfigChange flag)
|
|
|
+// 2. For commits, we use the OLD cluster majority until the config change is committed
|
|
|
+// 3. The new node starts receiving entries immediately but doesn't affect majority calculation
|
|
|
+//
|
|
|
+// This approach is simpler than Joint Consensus and is sufficient for most use cases.
|
|
|
+// The invariant maintained is: any two majorities (old or new) must overlap.
|
|
|
+//
|
|
|
+// For adding a node to an N-node cluster: old majority = floor(N/2)+1, new majority = floor((N+1)/2)+1.
+// Those sizes sum to N+2 > N+1, so any old majority and any new majority share at least one node.
+// For removing a node: old majority = floor(N/2)+1, new majority = floor((N-1)/2)+1.
+// Those sizes sum to N+1 > N, so they also overlap (as long as N > 1).
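+//
+// Worked example: growing a 3-node cluster {A,B,C} to {A,B,C,D}. The old majority is any
+// 2 of {A,B,C}; the new majority is any 3 of {A,B,C,D}. Since 2+3 > 4, every old majority
+// and every new majority share at least one node, so no two leaders can be elected by
+// disjoint majorities during the change.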
|
|
|
+//
|
|
|
+// WARNING: Avoid adding/removing multiple nodes rapidly. Wait for each change to be committed.
|
|
|
+
|
|
|
+// AddNode adds a new node to the cluster
|
|
|
+// This can only be called on the leader
|
|
|
+// The new node must already be running and reachable
|
|
|
+//
|
|
|
+// Safety guarantees:
|
|
|
+// - Only one config change can be in progress at a time
|
|
|
+// - The old cluster majority is used until the config change is committed
|
|
|
+// - Returns error if leadership is lost during the operation
|
|
|
+func (r *Raft) AddNode(nodeID, address string) error {
|
|
|
+ r.mu.Lock()
|
|
|
+
|
|
|
+ // Must be leader
|
|
|
+ if r.state != Leader {
|
|
|
+ leaderID := r.leaderID
|
|
|
+ r.mu.Unlock()
|
|
|
+ return NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Check if we're in the middle of a leadership transfer
|
|
|
+ if r.transferring {
|
|
|
+ r.mu.Unlock()
|
|
|
+ return fmt.Errorf("leadership transfer in progress")
|
|
|
+ }
|
|
|
+
|
|
|
+ // Check if there's already a pending config change
|
|
|
+ if r.pendingConfigChange {
|
|
|
+ r.mu.Unlock()
|
|
|
+ return ErrConfigInFlight
|
|
|
+ }
|
|
|
+
|
|
|
+ // Validate nodeID and address
|
|
|
+ if nodeID == "" {
|
|
|
+ r.mu.Unlock()
|
|
|
+ return fmt.Errorf("nodeID cannot be empty")
|
|
|
+ }
|
|
|
+ if address == "" {
|
|
|
+ r.mu.Unlock()
|
|
|
+ return fmt.Errorf("address cannot be empty")
|
|
|
+ }
|
|
|
+
|
|
|
+ // Check if node already exists
|
|
|
+ if _, exists := r.clusterNodes[nodeID]; exists {
|
|
|
+ r.mu.Unlock()
|
|
|
+ return fmt.Errorf("node %s already exists in cluster", nodeID)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Check if address is already used by another node
|
|
|
+ for existingID, existingAddr := range r.clusterNodes {
|
|
|
+ if existingAddr == address {
|
|
|
+ r.mu.Unlock()
|
|
|
+ return fmt.Errorf("address %s is already used by node %s", address, existingID)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Save old cluster nodes for majority calculation during config change
|
|
|
+ // This ensures we use the OLD cluster size until the config change is committed
|
|
|
+ r.oldClusterNodes = make(map[string]string)
|
|
|
+ for k, v := range r.clusterNodes {
|
|
|
+ r.oldClusterNodes[k] = v
|
|
|
+ }
|
|
|
+
|
|
|
+ // Create new config with the added node
|
|
|
+ newNodes := make(map[string]string)
|
|
|
+ for k, v := range r.clusterNodes {
|
|
|
+ newNodes[k] = v
|
|
|
+ }
|
|
|
+ newNodes[nodeID] = address
|
|
|
+
|
|
|
+ // Create config change entry with ClusterConfig
|
|
|
+ configIndex := r.log.LastIndex() + 1
|
|
|
+ entry := LogEntry{
|
|
|
+ Index: configIndex,
|
|
|
+ Term: r.currentTerm,
|
|
|
+ Type: EntryConfig,
|
|
|
+ Config: &ClusterConfig{Nodes: newNodes},
|
|
|
+ }
|
|
|
+
|
|
|
+ // Mark config change as pending and store the index
|
|
|
+ r.pendingConfigChange = true
|
|
|
+ r.configChangeIndex = configIndex
|
|
|
+
|
|
|
+ // Immediately apply the new configuration (for single-node changes, this is safe)
|
|
|
+ // The new node will start receiving AppendEntries immediately
|
|
|
+ r.clusterNodes[nodeID] = address
|
|
|
+ r.peers = append(r.peers, address)
|
|
|
+ // Set nextIndex to our first retained log index so the new node syncs from the beginning.
+ // This is crucial: the new node's log is empty, so replication must start from the earliest
+ // entry we still retain (or fall back to a snapshot if the log has been compacted).
|
|
|
+ firstIndex := r.log.FirstIndex()
|
|
|
+ if firstIndex == 0 {
|
|
|
+ firstIndex = 1
|
|
|
+ }
|
|
|
+ r.nextIndex[address] = firstIndex
|
|
|
+ r.matchIndex[address] = 0
|
|
|
+
|
|
|
+ r.mu.Unlock()
|
|
|
+
|
|
|
+ // Append the config change entry to log
|
|
|
+ if err := r.log.Append(entry); err != nil {
|
|
|
+ r.mu.Lock()
|
|
|
+ r.pendingConfigChange = false
|
|
|
+ r.oldClusterNodes = nil
|
|
|
+ r.configChangeIndex = 0
|
|
|
+ // Rollback
|
|
|
+ delete(r.clusterNodes, nodeID)
|
|
|
+ r.rebuildPeersList()
|
|
|
+ r.mu.Unlock()
|
|
|
+ return fmt.Errorf("failed to append config entry: %w", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ r.logger.Info("Adding node %s (%s) to cluster", nodeID, address)
|
|
|
+
|
|
|
+ // Trigger immediate replication
|
|
|
+ r.triggerReplication()
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+// RemoveNode removes a node from the cluster
|
|
|
+// This can only be called on the leader
|
|
|
+// The node being removed can be any node except the leader itself
|
|
|
+//
|
|
|
+// Safety guarantees:
|
|
|
+// - Only one config change can be in progress at a time
|
|
|
+// - Cannot remove the leader (transfer leadership first)
|
|
|
+// - Cannot reduce cluster to 0 nodes
|
|
|
+// - The old cluster majority is used until the config change is committed
|
|
|
+func (r *Raft) RemoveNode(nodeID string) error {
|
|
|
+ r.mu.Lock()
|
|
|
+
|
|
|
+ // Must be leader
|
|
|
+ if r.state != Leader {
|
|
|
+ leaderID := r.leaderID
|
|
|
+ r.mu.Unlock()
|
|
|
+ return NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Check if we're in the middle of a leadership transfer
|
|
|
+ if r.transferring {
|
|
|
+ r.mu.Unlock()
|
|
|
+ return fmt.Errorf("leadership transfer in progress")
|
|
|
+ }
|
|
|
+
|
|
|
+ // Cannot remove self
|
|
|
+ if nodeID == r.nodeID {
|
|
|
+ r.mu.Unlock()
|
|
|
+ return fmt.Errorf("cannot remove self from cluster, use TransferLeadership first")
|
|
|
+ }
|
|
|
+
|
|
|
+ // Validate nodeID
|
|
|
+ if nodeID == "" {
|
|
|
+ r.mu.Unlock()
|
|
|
+ return fmt.Errorf("nodeID cannot be empty")
|
|
|
+ }
|
|
|
+
|
|
|
+ // Check if there's already a pending config change
|
|
|
+ if r.pendingConfigChange {
|
|
|
+ r.mu.Unlock()
|
|
|
+ return ErrConfigInFlight
|
|
|
+ }
|
|
|
+
|
|
|
+ // Check if node exists
|
|
|
+ if _, exists := r.clusterNodes[nodeID]; !exists {
|
|
|
+ r.mu.Unlock()
|
|
|
+ return fmt.Errorf("node %s not found in cluster", nodeID)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Cannot reduce cluster below 1 node
|
|
|
+ if len(r.clusterNodes) <= 1 {
|
|
|
+ r.mu.Unlock()
|
|
|
+ return fmt.Errorf("cannot remove last node from cluster")
|
|
|
+ }
|
|
|
+
|
|
|
+ // Save old cluster nodes for majority calculation during config change
|
|
|
+ r.oldClusterNodes = make(map[string]string)
|
|
|
+ for k, v := range r.clusterNodes {
|
|
|
+ r.oldClusterNodes[k] = v
|
|
|
+ }
|
|
|
+
|
|
|
+ // Create new config without the removed node
|
|
|
+ newNodes := make(map[string]string)
|
|
|
+ for k, v := range r.clusterNodes {
|
|
|
+ if k != nodeID {
|
|
|
+ newNodes[k] = v
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Create config change entry with ClusterConfig
|
|
|
+ configIndex := r.log.LastIndex() + 1
|
|
|
+ entry := LogEntry{
|
|
|
+ Index: configIndex,
|
|
|
+ Term: r.currentTerm,
|
|
|
+ Type: EntryConfig,
|
|
|
+ Config: &ClusterConfig{Nodes: newNodes},
|
|
|
+ }
|
|
|
+
|
|
|
+ // Mark config change as pending and store the index
|
|
|
+ r.pendingConfigChange = true
|
|
|
+ r.configChangeIndex = configIndex
|
|
|
+
|
|
|
+ // Get the address of node being removed for cleanup
|
|
|
+ removedAddr := r.clusterNodes[nodeID]
|
|
|
+
|
|
|
+ // Immediately apply the new configuration
|
|
|
+ delete(r.clusterNodes, nodeID)
|
|
|
+ r.rebuildPeersList()
|
|
|
+ delete(r.nextIndex, removedAddr)
|
|
|
+ delete(r.matchIndex, removedAddr)
|
|
|
+
|
|
|
+ r.mu.Unlock()
|
|
|
+
|
|
|
+ // Append the config change entry to log
|
|
|
+ if err := r.log.Append(entry); err != nil {
|
|
|
+ r.mu.Lock()
|
|
|
+ r.pendingConfigChange = false
|
|
|
+ r.oldClusterNodes = nil
|
|
|
+ r.configChangeIndex = 0
|
|
|
+ // Rollback: restore the removed node's membership and leader replication state
+ r.clusterNodes[nodeID] = removedAddr
+ r.rebuildPeersList()
+ r.nextIndex[removedAddr] = r.log.LastIndex() + 1
+ r.matchIndex[removedAddr] = 0
|
|
|
+ r.mu.Unlock()
|
|
|
+ return fmt.Errorf("failed to append config entry: %w", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ r.logger.Info("Removing node %s from cluster, config at index %d", nodeID, entry.Index)
|
|
|
+
|
|
|
+ // Trigger replication through the batched replication loop (same path as AddNode)
+ r.triggerReplication()
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+// AddNodeWithForward adds a node, forwarding to leader if necessary
|
|
|
+// This is the recommended method for applications to use
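+//
+// Usage sketch (node IDs and addresses are assumptions for illustration; wait for each
+// change to commit before issuing the next one):
+//
+//	if err := node.AddNodeWithForward("node-4", "10.0.0.4:7000"); err != nil {
+//		log.Printf("add node: %v", err)
+//	}
+//	// ...later, to shrink the cluster again:
+//	if err := node.RemoveNodeWithForward("node-4"); err != nil {
+//		log.Printf("remove node: %v", err)
+//	}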
|
|
|
+func (r *Raft) AddNodeWithForward(nodeID, address string) error {
|
|
|
+ // Try local operation first
|
|
|
+ err := r.AddNode(nodeID, address)
|
|
|
+ if err == nil {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ // Check if we're not the leader
|
|
|
+ r.mu.RLock()
|
|
|
+ state := r.state
|
|
|
+ leaderID := r.leaderID
|
|
|
+ leaderAddr := r.clusterNodes[leaderID]
|
|
|
+ r.mu.RUnlock()
|
|
|
+
|
|
|
+ if state == Leader {
|
|
|
+ // We are leader but AddNode failed for other reasons
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ // Not leader, forward to leader
|
|
|
+ if leaderID == "" {
|
|
|
+ return fmt.Errorf("no leader available")
|
|
|
+ }
|
|
|
+
|
|
|
+ if leaderAddr == "" {
|
|
|
+ return fmt.Errorf("leader %s address not found in cluster", leaderID)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Forward to leader
|
|
|
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
|
+ defer cancel()
|
|
|
+
|
|
|
+ args := &AddNodeArgs{NodeID: nodeID, Address: address}
|
|
|
+ reply, err := r.transport.ForwardAddNode(ctx, leaderAddr, args)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("forward failed: %w", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if !reply.Success {
|
|
|
+ return fmt.Errorf("leader rejected: %s", reply.Error)
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+// RemoveNodeWithForward removes a node, forwarding to leader if necessary
|
|
|
+// This is the recommended method for applications to use
|
|
|
+func (r *Raft) RemoveNodeWithForward(nodeID string) error {
|
|
|
+ // Try local operation first
|
|
|
+ err := r.RemoveNode(nodeID)
|
|
|
+ if err == nil {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ // Check if we're not the leader
|
|
|
+ r.mu.RLock()
|
|
|
+ state := r.state
|
|
|
+ leaderID := r.leaderID
|
|
|
+ leaderAddr := r.clusterNodes[leaderID]
|
|
|
+ r.mu.RUnlock()
|
|
|
+
|
|
|
+ if state == Leader {
|
|
|
+ // We are leader but RemoveNode failed for other reasons
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ // Not leader, forward to leader
|
|
|
+ if leaderID == "" {
|
|
|
+ return fmt.Errorf("no leader available")
|
|
|
+ }
|
|
|
+
|
|
|
+ if leaderAddr == "" {
|
|
|
+ return fmt.Errorf("leader %s address not found in cluster", leaderID)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Forward to leader
|
|
|
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
|
+ defer cancel()
|
|
|
+
|
|
|
+ args := &RemoveNodeArgs{NodeID: nodeID}
|
|
|
+ reply, err := r.transport.ForwardRemoveNode(ctx, leaderAddr, args)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("forward failed: %w", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if !reply.Success {
|
|
|
+ return fmt.Errorf("leader rejected: %s", reply.Error)
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+// rebuildPeersList rebuilds the peers slice from clusterNodes
|
|
|
+func (r *Raft) rebuildPeersList() {
|
|
|
+ r.peers = make([]string, 0, len(r.clusterNodes)-1)
|
|
|
+ for nodeID, addr := range r.clusterNodes {
|
|
|
+ if nodeID != r.nodeID {
|
|
|
+ r.peers = append(r.peers, addr)
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// GetClusterNodes returns a copy of the current cluster membership
|
|
|
+func (r *Raft) GetClusterNodes() map[string]string {
|
|
|
+ r.mu.RLock()
|
|
|
+ defer r.mu.RUnlock()
|
|
|
+
|
|
|
+ nodes := make(map[string]string)
|
|
|
+ for k, v := range r.clusterNodes {
|
|
|
+ nodes[k] = v
|
|
|
+ }
|
|
|
+ return nodes
|
|
|
+}
|
|
|
+
|
|
|
+// applyConfigChange applies a configuration change entry
|
|
|
+func (r *Raft) applyConfigChange(entry *LogEntry) {
|
|
|
+ if entry.Config == nil || entry.Config.Nodes == nil {
|
|
|
+ r.logger.Warn("Invalid config change entry at index %d", entry.Index)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ r.mu.Lock()
|
|
|
+ defer r.mu.Unlock()
|
|
|
+
|
|
|
+ // Update cluster configuration
|
|
|
+ r.clusterNodes = make(map[string]string)
|
|
|
+ for k, v := range entry.Config.Nodes {
|
|
|
+ r.clusterNodes[k] = v
|
|
|
+ }
|
|
|
+ r.rebuildPeersList()
|
|
|
+
|
|
|
+ // Persist the new configuration
|
|
|
+ if err := r.storage.SaveClusterConfig(&ClusterConfig{Nodes: r.clusterNodes}); err != nil {
|
|
|
+ r.logger.Error("Failed to persist cluster config: %v", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Clear pending flag and old cluster state
|
|
|
+ r.pendingConfigChange = false
|
|
|
+ r.oldClusterNodes = nil
|
|
|
+ r.configChangeIndex = 0
|
|
|
+
|
|
|
+ // If we were joining a cluster and now have multiple nodes, we've successfully joined
|
|
|
+ if r.joiningCluster && len(r.clusterNodes) > 1 {
|
|
|
+ r.joiningCluster = false
|
|
|
+ r.logger.Info("Successfully joined cluster with %d nodes", len(r.clusterNodes))
|
|
|
+ }
|
|
|
+
|
|
|
+ r.logger.Info("Applied config change at index %d, cluster now has %d nodes", entry.Index, len(r.clusterNodes))
|
|
|
+
|
|
|
+ // If we're the leader, update leader state
|
|
|
+ if r.state == Leader {
|
|
|
+ // Initialize nextIndex/matchIndex for any new nodes
|
|
|
+ lastIndex := r.log.LastIndex()
|
|
|
+ for nodeID, addr := range r.clusterNodes {
|
|
|
+ if nodeID != r.nodeID {
|
|
|
+ if _, exists := r.nextIndex[addr]; !exists {
|
|
|
+ r.nextIndex[addr] = lastIndex + 1
|
|
|
+ r.matchIndex[addr] = 0
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // Clean up removed nodes
|
|
|
+ validAddrs := make(map[string]bool)
|
|
|
+ for nodeID, addr := range r.clusterNodes {
|
|
|
+ if nodeID != r.nodeID {
|
|
|
+ validAddrs[addr] = true
|
|
|
+ }
|
|
|
+ }
|
|
|
+ for addr := range r.nextIndex {
|
|
|
+ if !validAddrs[addr] {
|
|
|
+ delete(r.nextIndex, addr)
|
|
|
+ delete(r.matchIndex, addr)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// ==================== ReadIndex (Linearizable Reads) ====================
|
|
|
+
|
|
|
+// readIndexLoop handles read index requests
|
|
|
+func (r *Raft) readIndexLoop() {
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-r.stopCh:
|
|
|
+ return
|
|
|
+ case req := <-r.readIndexCh:
|
|
|
+ r.processReadIndexRequest(req)
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// processReadIndexRequest processes a single read index request
|
|
|
+func (r *Raft) processReadIndexRequest(req *readIndexRequest) {
|
|
|
+ r.mu.RLock()
|
|
|
+ if r.state != Leader {
|
|
|
+ r.mu.RUnlock()
|
|
|
+ req.done <- ErrNotLeader
|
|
|
+ return
|
|
|
+ }
|
|
|
+ r.mu.RUnlock()
|
|
|
+
|
|
|
+ // Confirm leadership by sending heartbeats and waiting for majority ack
|
|
|
+ if !r.confirmLeadership() {
|
|
|
+ req.done <- ErrLeadershipLost
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ // Wait for apply to catch up to readIndex
|
|
|
+ if err := r.waitApply(req.readIndex, r.config.ProposeTimeout); err != nil {
|
|
|
+ req.done <- err
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ req.done <- nil
|
|
|
+}
|
|
|
+
|
|
|
+// ReadIndex implements linearizable reads
|
|
|
+// It ensures that the read sees all writes that were committed before the read started
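+//
+// Usage sketch for a linearizable read (the kv store and its Get method are assumptions):
+//
+//	if _, err := node.ReadIndex(); err != nil {
+//		return err // retry, or forward the read to the current leader
+//	}
+//	value, ok := kv.Get("color") // safe: ReadIndex returns only after the FSM has caught up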
|
|
|
+func (r *Raft) ReadIndex() (uint64, error) {
|
|
|
+ r.mu.RLock()
|
|
|
+ if r.state != Leader {
|
|
|
+ leaderID := r.leaderID
|
|
|
+ r.mu.RUnlock()
|
|
|
+ return 0, NewRaftError(ErrNotLeader, leaderID, r.config.RetryBackoff)
|
|
|
+ }
|
|
|
+ readIndex := r.commitIndex
|
|
|
+ r.mu.RUnlock()
|
|
|
+
|
|
|
+ atomic.AddUint64(&r.metrics.ReadIndexRequests, 1)
|
|
|
+
|
|
|
+ // Create request and send to processing loop
|
|
|
+ req := &readIndexRequest{
|
|
|
+ readIndex: readIndex,
|
|
|
+ done: make(chan error, 1),
|
|
|
+ }
|
|
|
+
|
|
|
+ select {
|
|
|
+ case r.readIndexCh <- req:
|
|
|
+ case <-r.stopCh:
|
|
|
+ return 0, ErrShutdown
|
|
|
+ case <-time.After(r.config.ProposeTimeout):
|
|
|
+ return 0, ErrTimeout
|
|
|
+ }
|
|
|
+
|
|
|
+ // Wait for result
|
|
|
+ select {
|
|
|
+ case err := <-req.done:
|
|
|
+ if err != nil {
|
|
|
+ return 0, err
|
|
|
+ }
|
|
|
+ atomic.AddUint64(&r.metrics.ReadIndexSuccess, 1)
|
|
|
+ return readIndex, nil
|
|
|
+ case <-r.stopCh:
|
|
|
+ return 0, ErrShutdown
|
|
|
+ case <-time.After(r.config.ProposeTimeout):
|
|
|
+ return 0, ErrTimeout
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// confirmLeadership heuristically confirms leadership: it broadcasts heartbeats, waits one
+// heartbeat interval, and checks that this node has not observed a higher term or stepped down.
|
|
|
+func (r *Raft) confirmLeadership() bool {
|
|
|
+ r.mu.RLock()
|
|
|
+ if r.state != Leader {
|
|
|
+ r.mu.RUnlock()
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ currentTerm := r.currentTerm
|
|
|
+ clusterSize := len(r.clusterNodes)
|
|
|
+ if clusterSize == 0 {
|
|
|
+ clusterSize = len(r.peers) + 1
|
|
|
+ }
|
|
|
+ r.mu.RUnlock()
|
|
|
+
|
|
|
+ // Single-node cluster: no peers can depose us, so leadership is trivially confirmed
|
|
|
+ if clusterSize == 1 {
|
|
|
+ return true
|
|
|
+ }
|
|
|
+
|
|
|
+ // Broadcast heartbeats; discovering a higher term from any peer will cause us to step down
|
|
|
+ r.sendHeartbeats()
|
|
|
+
|
|
|
+ // Wait briefly and check if we're still leader
|
|
|
+ time.Sleep(r.config.HeartbeatInterval)
|
|
|
+
|
|
|
+ r.mu.RLock()
|
|
|
+ stillLeader := r.state == Leader && r.currentTerm == currentTerm
|
|
|
+ r.mu.RUnlock()
|
|
|
+
|
|
|
+ return stillLeader
|
|
|
+}
|
|
|
+
|
|
|
+// waitApply waits until lastApplied >= index
|
|
|
+func (r *Raft) waitApply(index uint64, timeout time.Duration) error {
|
|
|
+ deadline := time.Now().Add(timeout)
|
|
|
+ for {
|
|
|
+ r.mu.RLock()
|
|
|
+ lastApplied := r.lastApplied
|
|
|
+ r.mu.RUnlock()
|
|
|
+
|
|
|
+ if lastApplied >= index {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ if time.Now().After(deadline) {
|
|
|
+ return ErrTimeout
|
|
|
+ }
|
|
|
+
|
|
|
+ time.Sleep(1 * time.Millisecond)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// ==================== Health Check ====================
|
|
|
+
|
|
|
+// HealthCheck returns the current health status of the node
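+//
+// Usage sketch for exposing health over HTTP (the handler wiring is an assumption, not
+// part of this package):
+//
+//	http.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
+//		if node.HealthCheck().IsHealthy {
+//			w.WriteHeader(http.StatusOK)
+//			return
+//		}
+//		w.WriteHeader(http.StatusServiceUnavailable)
+//	})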
|
|
|
+func (r *Raft) HealthCheck() HealthStatus {
|
|
|
+ r.mu.RLock()
|
|
|
+ defer r.mu.RUnlock()
|
|
|
+
|
|
|
+ clusterNodes := make(map[string]string)
|
|
|
+ for k, v := range r.clusterNodes {
|
|
|
+ clusterNodes[k] = v
|
|
|
+ }
|
|
|
+
|
|
|
+ clusterSize := len(r.clusterNodes)
|
|
|
+ if clusterSize == 0 {
|
|
|
+ clusterSize = len(r.peers) + 1
|
|
|
+ }
|
|
|
+
|
|
|
+ logBehind := uint64(0)
|
|
|
+ if r.commitIndex > r.lastApplied {
|
|
|
+ logBehind = r.commitIndex - r.lastApplied
|
|
|
+ }
|
|
|
+
|
|
|
+ // Consider healthy if we're leader or have a known leader
|
|
|
+ isHealthy := r.state == Leader || r.leaderID != ""
|
|
|
+
|
|
|
+ return HealthStatus{
|
|
|
+ NodeID: r.nodeID,
|
|
|
+ State: r.state.String(),
|
|
|
+ Term: r.currentTerm,
|
|
|
+ LeaderID: r.leaderID,
|
|
|
+ ClusterSize: clusterSize,
|
|
|
+ ClusterNodes: clusterNodes,
|
|
|
+ CommitIndex: r.commitIndex,
|
|
|
+ LastApplied: r.lastApplied,
|
|
|
+ LogBehind: logBehind,
|
|
|
+ LastHeartbeat: r.lastHeartbeat,
|
|
|
+ IsHealthy: isHealthy,
|
|
|
+ Uptime: time.Since(r.startTime),
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// GetMetrics returns the current metrics
|
|
|
+func (r *Raft) GetMetrics() Metrics {
|
|
|
+ return Metrics{
|
|
|
+ Term: atomic.LoadUint64(&r.metrics.Term),
|
|
|
+ ProposalsTotal: atomic.LoadUint64(&r.metrics.ProposalsTotal),
|
|
|
+ ProposalsSuccess: atomic.LoadUint64(&r.metrics.ProposalsSuccess),
|
|
|
+ ProposalsFailed: atomic.LoadUint64(&r.metrics.ProposalsFailed),
|
|
|
+ ProposalsForwarded: atomic.LoadUint64(&r.metrics.ProposalsForwarded),
|
|
|
+ AppendsSent: atomic.LoadUint64(&r.metrics.AppendsSent),
|
|
|
+ AppendsReceived: atomic.LoadUint64(&r.metrics.AppendsReceived),
|
|
|
+ AppendsSuccess: atomic.LoadUint64(&r.metrics.AppendsSuccess),
|
|
|
+ AppendsFailed: atomic.LoadUint64(&r.metrics.AppendsFailed),
|
|
|
+ ElectionsStarted: atomic.LoadUint64(&r.metrics.ElectionsStarted),
|
|
|
+ ElectionsWon: atomic.LoadUint64(&r.metrics.ElectionsWon),
|
|
|
+ PreVotesStarted: atomic.LoadUint64(&r.metrics.PreVotesStarted),
|
|
|
+ PreVotesGranted: atomic.LoadUint64(&r.metrics.PreVotesGranted),
|
|
|
+ SnapshotsTaken: atomic.LoadUint64(&r.metrics.SnapshotsTaken),
|
|
|
+ SnapshotsInstalled: atomic.LoadUint64(&r.metrics.SnapshotsInstalled),
|
|
|
+ SnapshotsSent: atomic.LoadUint64(&r.metrics.SnapshotsSent),
|
|
|
+ ReadIndexRequests: atomic.LoadUint64(&r.metrics.ReadIndexRequests),
|
|
|
+ ReadIndexSuccess: atomic.LoadUint64(&r.metrics.ReadIndexSuccess),
|
|
|
+ LeadershipTransfers: atomic.LoadUint64(&r.metrics.LeadershipTransfers),
|
|
|
+ LeadershipTransferSuccess: atomic.LoadUint64(&r.metrics.LeadershipTransferSuccess),
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// ==================== Leadership Transfer ====================
|
|
|
+
|
|
|
+// TransferLeadership transfers leadership to the specified node
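+//
+// Usage sketch (the target node ID is an assumption); typically called before planned
+// maintenance on the current leader:
+//
+//	if err := node.TransferLeadership("node-2"); err != nil {
+//		log.Printf("leadership transfer failed: %v", err)
+//	}
+//	// The old leader steps down only once the target wins its election at a higher term.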
|
|
|
+func (r *Raft) TransferLeadership(targetID string) error {
|
|
|
+ r.mu.Lock()
|
|
|
+ if r.state != Leader {
|
|
|
+ r.mu.Unlock()
|
|
|
+ return ErrNotLeader
|
|
|
+ }
|
|
|
+
|
|
|
+ if targetID == r.nodeID {
|
|
|
+ r.mu.Unlock()
|
|
|
+ return fmt.Errorf("cannot transfer to self")
|
|
|
+ }
|
|
|
+
|
|
|
+ targetAddr, exists := r.clusterNodes[targetID]
|
|
|
+ if !exists {
|
|
|
+ r.mu.Unlock()
|
|
|
+ return fmt.Errorf("target node %s not in cluster", targetID)
|
|
|
+ }
|
|
|
+
|
|
|
+ if r.transferring {
|
|
|
+ r.mu.Unlock()
|
|
|
+ return fmt.Errorf("leadership transfer already in progress")
|
|
|
+ }
|
|
|
+
|
|
|
+ r.transferring = true
|
|
|
+ r.transferTarget = targetID
|
|
|
+ r.transferDeadline = time.Now().Add(r.config.ElectionTimeoutMax * 2)
|
|
|
+ currentTerm := r.currentTerm
|
|
|
+
|
|
|
+ atomic.AddUint64(&r.metrics.LeadershipTransfers, 1)
|
|
|
+ r.mu.Unlock()
|
|
|
+
|
|
|
+ r.logger.Info("Starting leadership transfer to %s", targetID)
|
|
|
+
|
|
|
+ // Step 1: Sync target to our log
|
|
|
+ if err := r.syncFollowerToLatest(targetAddr); err != nil {
|
|
|
+ r.mu.Lock()
|
|
|
+ r.transferring = false
|
|
|
+ r.transferTarget = ""
|
|
|
+ r.mu.Unlock()
|
|
|
+ return fmt.Errorf("failed to sync target: %w", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Step 2: Send TimeoutNow RPC
|
|
|
+ args := &TimeoutNowArgs{
|
|
|
+ Term: currentTerm,
|
|
|
+ LeaderID: r.nodeID,
|
|
|
+ }
|
|
|
+
|
|
|
+ ctx, cancel := context.WithTimeout(context.Background(), r.config.RPCTimeout)
|
|
|
+ defer cancel()
|
|
|
+
|
|
|
+ reply, err := r.transport.TimeoutNow(ctx, targetAddr, args)
|
|
|
+ if err != nil {
|
|
|
+ r.mu.Lock()
|
|
|
+ r.transferring = false
|
|
|
+ r.transferTarget = ""
|
|
|
+ r.mu.Unlock()
|
|
|
+ return fmt.Errorf("TimeoutNow RPC failed: %w", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if !reply.Success {
|
|
|
+ r.mu.Lock()
|
|
|
+ r.transferring = false
|
|
|
+ r.transferTarget = ""
|
|
|
+ r.mu.Unlock()
|
|
|
+ return fmt.Errorf("target rejected leadership transfer")
|
|
|
+ }
|
|
|
+
|
|
|
+ atomic.AddUint64(&r.metrics.LeadershipTransferSuccess, 1)
|
|
|
+ r.logger.Info("Leadership transfer to %s initiated successfully", targetID)
|
|
|
+
|
|
|
+ // Note: we don't step down immediately; we wait for the target to win its election
+ // and send us an AppendEntries with a higher term
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+// syncFollowerToLatest ensures the follower is caught up to our log
|
|
|
+func (r *Raft) syncFollowerToLatest(peerAddr string) error {
|
|
|
+ r.mu.RLock()
|
|
|
+ if r.state != Leader {
|
|
|
+ r.mu.RUnlock()
|
|
|
+ return ErrNotLeader
|
|
|
+ }
|
|
|
+ currentTerm := r.currentTerm
|
|
|
+ leaderCommit := r.commitIndex
|
|
|
+ lastIndex := r.log.LastIndex()
|
|
|
+ r.mu.RUnlock()
|
|
|
+
|
|
|
+ // Keep replicating until follower is caught up
|
|
|
+ deadline := time.Now().Add(r.config.ElectionTimeoutMax * 2)
|
|
|
+ for time.Now().Before(deadline) {
|
|
|
+ r.mu.RLock()
|
|
|
+ if r.state != Leader || r.currentTerm != currentTerm {
|
|
|
+ r.mu.RUnlock()
|
|
|
+ return ErrLeadershipLost
|
|
|
+ }
|
|
|
+ matchIndex := r.matchIndex[peerAddr]
|
|
|
+ r.mu.RUnlock()
|
|
|
+
|
|
|
+ if matchIndex >= lastIndex {
|
|
|
+ return nil // Caught up
|
|
|
+ }
|
|
|
+
|
|
|
+ // Trigger replication
|
|
|
+ r.replicateToPeer(peerAddr, currentTerm, leaderCommit)
|
|
|
+ time.Sleep(10 * time.Millisecond)
|
|
|
+ }
|
|
|
+
|
|
|
+ return ErrTimeout
|
|
|
+}
|
|
|
+
|
|
|
+// HandleTimeoutNow handles TimeoutNow RPC (for leadership transfer)
|
|
|
+func (r *Raft) HandleTimeoutNow(args *TimeoutNowArgs) *TimeoutNowReply {
|
|
|
+ r.mu.Lock()
|
|
|
+ defer r.mu.Unlock()
|
|
|
+
|
|
|
+ reply := &TimeoutNowReply{
|
|
|
+ Term: r.currentTerm,
|
|
|
+ Success: false,
|
|
|
+ }
|
|
|
+
|
|
|
+ // Only accept if we're a follower and the term matches
|
|
|
+ if args.Term < r.currentTerm {
|
|
|
+ return reply
|
|
|
+ }
|
|
|
+
|
|
|
+ if r.state != Follower {
|
|
|
+ return reply
|
|
|
+ }
|
|
|
+
|
|
|
+ // Immediately start election
|
|
|
+ r.logger.Info("Received TimeoutNow from %s, starting immediate election", args.LeaderID)
|
|
|
+ r.state = Candidate
|
|
|
+ reply.Success = true
|
|
|
+
|
|
|
+ return reply
|
|
|
+}
|
|
|
+
|
|
|
+// HandleReadIndex handles ReadIndex RPC
|
|
|
+func (r *Raft) HandleReadIndex(args *ReadIndexArgs) *ReadIndexReply {
|
|
|
+ reply := &ReadIndexReply{
|
|
|
+ Success: false,
|
|
|
+ }
|
|
|
+
|
|
|
+ readIndex, err := r.ReadIndex()
|
|
|
+ if err != nil {
|
|
|
+ reply.Error = err.Error()
|
|
|
+ return reply
|
|
|
+ }
|
|
|
+
|
|
|
+ reply.ReadIndex = readIndex
|
|
|
+ reply.Success = true
|
|
|
+ return reply
|
|
|
+}
|
|
|
+
|
|
|
+// HandleGet handles Get RPC for remote KV reads
|
|
|
+func (r *Raft) HandleGet(args *GetArgs) *GetReply {
|
|
|
+ reply := &GetReply{
|
|
|
+ Found: false,
|
|
|
+ }
|
|
|
+
|
|
|
+ if r.config.GetHandler == nil {
|
|
|
+ reply.Error = "get handler not configured"
|
|
|
+ return reply
|
|
|
+ }
|
|
|
+
|
|
|
+ value, found := r.config.GetHandler(args.Key)
|
|
|
+ reply.Value = value
|
|
|
+ reply.Found = found
|
|
|
+ return reply
|
|
|
+}
|