@@ -4,6 +4,8 @@ import (
 	"context"
 	"fmt"
 	"math/rand"
+	"os"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -105,6 +107,9 @@ type Raft struct {
 
 	// Snapshot receiving state (for chunked transfer)
 	pendingSnapshot *pendingSnapshotState
+
+	// Last contact time for each peer (for leader check)
+	lastContact map[string]time.Time
 }
 
 // readIndexRequest represents a pending read index request
@@ -256,8 +261,12 @@ func NewRaft(config *Config, transport Transport, applyCh chan ApplyMsg) (*Raft,
 		logger:        config.Logger,
 		readIndexCh:   make(chan *readIndexRequest, 100),
 		lastHeartbeat: time.Now(),
+		lastContact:   make(map[string]time.Time),
 	}
 
+	// Initialize metrics
+	r.metrics.Term = state.CurrentTerm
+
 	// Set RPC handler
 	transport.SetRPCHandler(r)
 
@@ -281,7 +290,12 @@ func (r *Raft) Start() error {
 	// Restore FSM from snapshot if exists
 	// This must happen before starting apply loop to ensure FSM state is restored
 	if err := r.restoreFromSnapshot(); err != nil {
-		r.logger.Warn("Failed to restore from snapshot: %v", err)
+		// Suppress warning if it's just a missing file (first start)
+		if !os.IsNotExist(err) && !strings.Contains(err.Error(), "no such file") {
+			r.logger.Warn("Failed to restore from snapshot: %v", err)
+		} else {
+			r.logger.Info("No snapshot found, starting with empty state")
+		}
 		// Continue anyway - the node can still function, just without historical state
 	}
 
@@ -409,6 +423,7 @@ func (r *Raft) runFollower() {
 		}
 		r.logger.Debug("Election timeout, becoming candidate")
 		r.state = Candidate
+		r.leaderID = "" // Clear leaderID so we can vote for self in Pre-Vote
 	}
 	r.mu.Unlock()
 	return
@@ -441,6 +456,9 @@ func (r *Raft) runCandidate() {
 	r.votedFor = r.nodeID
 	r.leaderID = ""
 	currentTerm := r.currentTerm
+	// Update metrics
+	atomic.StoreUint64(&r.metrics.Term, currentTerm)
+
 	if err := r.persistState(); err != nil {
 		r.logger.Error("Failed to persist state during election: %v", err)
 		r.mu.Unlock()
@@ -704,6 +722,9 @@ func (r *Raft) becomeFollower(term uint64) {
 
 	r.state = Follower
 	r.currentTerm = term
+	// Update metrics
+	atomic.StoreUint64(&r.metrics.Term, term)
+
 	r.votedFor = ""
 	r.leaderID = ""
 	// Must persist before responding - use mustPersistState for critical transitions
@@ -854,6 +875,9 @@ func (r *Raft) replicateToPeer(peer string, term, leaderCommit uint64) {
 	}
 
 	if reply.Success {
+		// Update contact time
+		r.lastContact[peer] = time.Now()
+
 		// Update nextIndex and matchIndex
 		if len(entries) > 0 {
 			newMatchIndex := entries[len(entries)-1].Index
@@ -869,6 +893,9 @@ func (r *Raft) replicateToPeer(peer string, term, leaderCommit uint64) {
 			}
 		}
 	} else {
+		// Even if failed (log conflict), we contacted the peer
+		r.lastContact[peer] = time.Now()
+
 		// Decrement nextIndex and retry
 		if reply.ConflictTerm > 0 {
 			// Find the last entry of ConflictTerm in our log
@@ -1657,6 +1684,28 @@ func (r *Raft) Propose(command []byte) (uint64, uint64, bool) {
 		return 0, 0, false
 	}
 
+	// Check connectivity (Lease Check)
+	// If we haven't heard from a majority of peers within ElectionTimeout,
+	// we shouldn't accept new commands because we might be partitioned.
+	if len(r.clusterNodes) > 1 {
+		activePeers := 1 // Self
+		now := time.Now()
+		timeout := r.config.ElectionTimeoutMax
+
+		for _, peer := range r.peers {
+			if last, ok := r.lastContact[peer]; ok && now.Sub(last) < timeout {
+				activePeers++
+			}
+		}
+
+		// Check majority
+		needed := len(r.clusterNodes)/2 + 1
+		if activePeers < needed {
+			r.logger.Warn("Rejecting Propose: lost contact with majority (active: %d, needed: %d)", activePeers, needed)
+			return 0, 0, false
+		}
+	}
+
 	index, err := r.log.AppendCommand(r.currentTerm, command)
 	if err != nil {
 		r.logger.Error("Failed to append command: %v", err)
@@ -1761,6 +1810,44 @@ func (r *Raft) ProposeWithForward(command []byte) (index uint64, term uint64, er
 	return reply.Index, reply.Term, nil
 }
 
+// ForwardGet forwards a get request to the leader
+func (r *Raft) ForwardGet(key string) (string, bool, error) {
+	// Read shared state under the lock to avoid racing with state transitions
+	r.mu.RLock()
+	isLeader := r.state == Leader
+	leaderID := r.leaderID
+	leaderAddr := r.clusterNodes[leaderID]
+	r.mu.RUnlock()
+
+	// Check if we are leader (local read)
+	if isLeader {
+		if r.config.GetHandler != nil {
+			val, found := r.config.GetHandler(key)
+			return val, found, nil
+		}
+		return "", false, fmt.Errorf("get handler not configured")
+	}
+
+	if leaderID == "" {
+		return "", false, ErrNoLeader
+	}
+	if leaderAddr == "" {
+		return "", false, fmt.Errorf("leader %s address not found", leaderID)
+	}
+
+	// Forward to leader
+	ctx, cancel := context.WithTimeout(context.Background(), r.config.RPCTimeout)
+	defer cancel()
+
+	args := &GetArgs{Key: key}
+	reply, err := r.transport.ForwardGet(ctx, leaderAddr, args)
+	if err != nil {
+		return "", false, fmt.Errorf("forward failed: %w", err)
+	}
+
+	return reply.Value, reply.Found, nil
+}
+
 // HandlePropose handles forwarded propose requests
 func (r *Raft) HandlePropose(args *ProposeArgs) *ProposeReply {
 	index, term, isLeader := r.Propose(args.Command)