leave test successful

xbase 2 weeks ago
parent
commit
9404be6fee
2 changed files with 93 additions and 0 deletions
  1. raft.go  +34 −0
  2. server.go  +59 −0

raft.go  +34 −0

@@ -1820,6 +1820,15 @@ func (r *Raft) flushAndReplicate() {
 // ProposeWithForward proposes a command, forwarding to leader if necessary
 // This is the recommended method for applications to use
 func (r *Raft) ProposeWithForward(command []byte) (index uint64, term uint64, err error) {
+	r.mu.RLock()
+	// Check if we are part of the cluster
+	// If we are not in clusterNodes, we shouldn't accept data commands
+	if _, exists := r.clusterNodes[r.nodeID]; !exists {
+		r.mu.RUnlock()
+		return 0, 0, fmt.Errorf("node %s is not part of the cluster", r.nodeID)
+	}
+	r.mu.RUnlock()
+
 	// Try local propose first
 	idx, t, isLeader := r.Propose(command)
 	if isLeader {
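
The same membership guard is repeated verbatim in ForwardGet below. A minimal sketch of how the two call sites could share it, assuming only the fields visible in this diff (mu, clusterNodes, nodeID); ensureMember is a hypothetical helper, not part of this commit:

// ensureMember reproduces the guard added above: a node that is no
// longer in clusterNodes must not accept data commands or reads.
func (r *Raft) ensureMember() error {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if _, exists := r.clusterNodes[r.nodeID]; !exists {
		return fmt.Errorf("node %s is not part of the cluster", r.nodeID)
	}
	return nil
}
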
@@ -1860,6 +1869,14 @@ func (r *Raft) ProposeWithForward(command []byte) (index uint64, term uint64, er
 
 // ForwardGet forwards a get request to the leader
 func (r *Raft) ForwardGet(key string) (string, bool, error) {
+	r.mu.RLock()
+	// Check if we are part of the cluster
+	if _, exists := r.clusterNodes[r.nodeID]; !exists {
+		r.mu.RUnlock()
+		return "", false, fmt.Errorf("node %s is not part of the cluster", r.nodeID)
+	}
+	r.mu.RUnlock()
+
 	// Check if we are leader (local read)
 	if r.state == Leader {
 		if r.config.GetHandler != nil {
@@ -2482,10 +2499,27 @@ func (r *Raft) applyConfigChange(entry *LogEntry) {
 		r.logger.Info("Successfully joined cluster with %d nodes", len(r.clusterNodes))
 	}
 
+	// Check if we have been removed from the cluster
+	if _, exists := r.clusterNodes[r.nodeID]; !exists {
+		r.logger.Warn("Node %s removed from cluster configuration", r.nodeID)
+		// We could shut down here, or just stay as a listener.
+		// For now, let's step down to Follower if we are not already.
+		if r.state != Follower {
+			r.becomeFollower(r.currentTerm)
+		}
+	}
+
 	r.logger.Info("Applied config change at index %d, cluster now has %d nodes", entry.Index, len(r.clusterNodes))
 
 	// If we're the leader, update leader state
 	if r.state == Leader {
+		// Defensive: the removal check above should already have stepped us down
+		if _, exists := r.clusterNodes[r.nodeID]; !exists {
+			r.logger.Warn("Leader removed from cluster (self), stepping down")
+			r.becomeFollower(r.currentTerm)
+			return
+		}
+
 		// Initialize nextIndex/matchIndex for any new nodes
 		lastIndex := r.log.LastIndex()
 		for nodeID, addr := range r.clusterNodes {
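
From an application's point of view, these guards change what a removed node returns. A hedged usage sketch: proposeOrReport and the error handling are illustrative, while *Raft and ProposeWithForward are the ones from raft.go above (the log import is assumed):

// proposeOrReport shows what a caller observes after this commit: a node
// removed from the configuration fails fast with a membership error
// instead of forwarding the command to the leader.
func proposeOrReport(r *Raft, cmd []byte) {
	idx, term, err := r.ProposeWithForward(cmd)
	if err != nil {
		// e.g. "node n3 is not part of the cluster"
		log.Printf("propose rejected: %v", err)
		return
	}
	log.Printf("committed at index=%d term=%d", idx, term)
}
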

server.go  +59 −0

@@ -20,6 +20,9 @@ type KVServer struct {
 	stopCh   chan struct{}
 	wg       sync.WaitGroup
 	stopOnce sync.Once
+	// leavingNodes tracks nodes that are currently being removed
+	// to prevent auto-rejoin/discovery logic from interfering
+	leavingNodes sync.Map
 }
 
 // NewKVServer creates a new KV server
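
leavingNodes is a sync.Map because it is written by Leave and read concurrently by the background maintenance loops without extra locking; each Store in Leave below is paired with a delayed Delete. A standalone sketch of that mark-with-expiry pattern (the key and durations are illustrative):

package main

import (
	"fmt"
	"sync"
	"time"
)

// ttlMark mirrors the pattern used by leavingNodes: mark a key, let
// concurrent readers observe it, and expire it automatically.
func ttlMark(m *sync.Map, key string, ttl time.Duration) {
	m.Store(key, time.Now())
	go func() {
		time.Sleep(ttl)
		m.Delete(key)
	}()
}

func main() {
	var m sync.Map
	ttlMark(&m, "n3", 50*time.Millisecond)
	_, leaving := m.Load("n3")
	fmt.Println("leaving:", leaving) // true
	time.Sleep(100 * time.Millisecond)
	_, leaving = m.Load("n3")
	fmt.Println("leaving:", leaving) // false
}
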
@@ -231,9 +234,55 @@ func (s *KVServer) Join(nodeID, addr string) error {
 
 // Leave leaves the cluster
 func (s *KVServer) Leave(nodeID string) error {
+	// Mark node as leaving to prevent auto-rejoin
+	s.leavingNodes.Store(nodeID, time.Now())
+
+	// Auto-expire the leaving flag after a while
+	go func() {
+		time.Sleep(30 * time.Second)
+		s.leavingNodes.Delete(nodeID)
+	}()
+
+	// Remove from RaftNode discovery key first to prevent auto-rejoin
+	if err := s.removeNodeFromDiscovery(nodeID); err != nil {
+		s.Raft.config.Logger.Warn("Failed to remove node from discovery key: %v", err)
+		// Continue anyway, as the main goal is to leave the cluster
+	}
 	return s.Raft.RemoveNodeWithForward(nodeID)
 }
 
+// removeNodeFromDiscovery removes a node from the RaftNode key to prevent auto-rejoin
+func (s *KVServer) removeNodeFromDiscovery(targetID string) error {
+	val, ok := s.Get("RaftNode")
+	if !ok || val == "" {
+		return nil
+	}
+
+	parts := strings.Split(val, ";")
+	var newParts []string
+	changed := false
+
+	for _, part := range parts {
+		if part == "" {
+			continue
+		}
+		kv := strings.SplitN(part, "=", 2)
+		if len(kv) == 2 && kv[0] == targetID {
+			// The departing node: drop it and record the change
+			changed = true
+			continue
+		}
+		// Keep everything else, including parts that don't parse as id=addr
+		newParts = append(newParts, part)
+	}
+
+	if changed {
+		newVal := strings.Join(newParts, ";")
+		return s.Set("RaftNode", newVal)
+	}
+	return nil
+}
+
 // WaitForLeader waits until a leader is elected
 func (s *KVServer) WaitForLeader(timeout time.Duration) error {
 	deadline := time.Now().Add(timeout)
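
removeNodeFromDiscovery implies the RaftNode key holds id=addr pairs joined by ';'. A standalone demo of the rewrite it performs; the node IDs and addresses are made up:

package main

import (
	"fmt"
	"strings"
)

// Rebuild the RaftNode value without the departing node, mirroring
// removeNodeFromDiscovery above.
func main() {
	val := "n1=127.0.0.1:7001;n2=127.0.0.1:7002;n3=127.0.0.1:7003"
	target := "n2"
	var kept []string
	for _, part := range strings.Split(val, ";") {
		if part == "" {
			continue
		}
		kv := strings.SplitN(part, "=", 2)
		if len(kv) == 2 && kv[0] == target {
			continue // drop the departing node
		}
		kept = append(kept, part)
	}
	fmt.Println(strings.Join(kept, ";"))
	// Output: n1=127.0.0.1:7001;n3=127.0.0.1:7003
}
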
@@ -367,6 +416,11 @@ func (s *KVServer) updateNodeInfo() {
 		changed := false
 		currentCluster := s.GetClusterNodes()
 		for id, addr := range currentCluster {
+			// Skip nodes that are marked as leaving
+			if _, leaving := s.leavingNodes.Load(id); leaving {
+				continue
+			}
+
 			if knownNodes[id] != addr {
 				knownNodes[id] = addr
 				changed = true
@@ -432,6 +486,11 @@ func (s *KVServer) checkConnections() {
 		}
 
 		if !currentAddrs[addr] {
+			// Skip nodes that are marked as leaving
+			if _, leaving := s.leavingNodes.Load(id); leaving {
+				continue
+			}
+
 			// Found a node that was previously in the cluster but is now missing
 			// Try to add it back
 			// We use AddNodeWithForward which handles non-blocking internally somewhat,
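
Finally, a hedged sketch of the kind of test the commit message ("leave test successful") points to. Only Leave, WaitForLeader, Raft, and ProposeWithForward come from this codebase; startTestCluster and the node IDs are hypothetical:

import (
	"testing"
	"time"
)

func TestLeave(t *testing.T) {
	servers := startTestCluster(t, 3) // hypothetical helper returning []*KVServer
	if err := servers[0].WaitForLeader(5 * time.Second); err != nil {
		t.Fatal(err)
	}
	// Remove "n3": Leave marks it as leaving so updateNodeInfo and
	// checkConnections skip it, strips it from the RaftNode discovery
	// key, then removes it through the leader.
	if err := servers[0].Leave("n3"); err != nil {
		t.Fatal(err)
	}
	// The removed node should now reject proposals outright.
	if _, _, err := servers[2].Raft.ProposeWithForward([]byte("k=v")); err == nil {
		t.Fatal("expected propose on a removed node to fail")
	}
}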