# xnet/raft - Raft 共识库 一个功能完整的 Raft 共识算法实现,支持动态成员变更、日志压缩、线性一致性读等特性。 ## 特性 - ✅ **Leader 选举** - 包含 PreVote 优化,防止网络分区导致的 term 膨胀 - ✅ **日志复制** - 支持批量复制和流水线优化 - ✅ **动态成员变更** - 运行时添加/移除节点 - ✅ **日志压缩 (Snapshot)** - 自动触发,支持分块传输,动态阈值防止压缩风暴 - ✅ **线性一致性读 (ReadIndex)** - 保证读取最新已提交数据 - ✅ **Leadership Transfer** - 主动转移 Leader 角色 - ✅ **自动请求路由** - Follower 自动转发读写请求到 Leader,无需客户端处理 - ✅ **Leader 租约检查 (Lease Check)** - 防止网络分区的 Leader 接受无法提交的写请求 - ✅ **持久化存储** - 日志和状态持久化到磁盘 - ✅ **健康检查 & 监控指标** - 内置 Metrics 支持 ## 快速开始 ### 基本用法 ```go package main import ( "log" "xbase/xnet/raft" ) func main() { // 使用默认配置 config := raft.DefaultConfig() config.NodeID = "node1" config.ListenAddr = "127.0.0.1:9001" config.DataDir = "data1" // 每个节点使用不同目录! config.Logger = raft.NewConsoleLogger("node1", 1) // 初始集群配置(单节点启动) config.ClusterNodes = map[string]string{ "node1": "127.0.0.1:9001", } // 创建 KV Server(内置状态机) server, err := raft.NewKVServer(config) if err != nil { log.Fatalf("Failed to create server: %v", err) } // 启动服务 if err := server.Start(); err != nil { log.Fatalf("Failed to start server: %v", err) } log.Println("Node started on 127.0.0.1:9001") // 使用 KV 操作 server.Set("key1", "value1") val, ok := server.Get("key1") // 线性一致性读(保证读到最新数据) val, ok, err = server.GetLinear("key1") } ``` ### 多节点集群 **Node 1 (Bootstrap 节点):** ```go config := raft.DefaultConfig() config.NodeID = "node1" config.ListenAddr = "127.0.0.1:9001" config.DataDir = "data1" // 重要:每个节点不同目录 config.ClusterNodes = map[string]string{ "node1": "127.0.0.1:9001", } server, _ := raft.NewKVServer(config) server.Start() ``` **Node 2 & 3 (加入集群):** ```go // Node 2 config := raft.DefaultConfig() config.NodeID = "node2" config.ListenAddr = "127.0.0.1:9002" config.DataDir = "data2" // 重要:每个节点不同目录 config.ClusterNodes = map[string]string{ "node2": "127.0.0.1:9002", } server, _ := raft.NewKVServer(config) server.Start() ``` 然后通过客户端将节点加入集群: ```go // 通过 Leader 添加节点 client := raft.NewTCPTransport("127.0.0.1:9000", 10, logger) args := &raft.AddNodeArgs{NodeID: "node2", Address: "127.0.0.1:9002"} reply, err := client.ForwardAddNode(ctx, "127.0.0.1:9001", args) ``` ## 动态成员变更 ### 添加节点 ```go // 方式 1: 直接在 Leader 上操作 err := server.Raft.AddNode("node4", "127.0.0.1:9004") // 方式 2: 自动转发到 Leader(推荐) err := server.Raft.AddNodeWithForward("node4", "127.0.0.1:9004") // 方式 3: 通过 KVServer 封装 err := server.Join("node4", "127.0.0.1:9004") ``` ### 移除节点 ```go // 方式 1: 直接在 Leader 上操作 err := server.Raft.RemoveNode("node4") // 方式 2: 自动转发到 Leader(推荐) err := server.Raft.RemoveNodeWithForward("node4") // 方式 3: 通过 KVServer 封装 err := server.Leave("node4") ``` ### 获取集群成员 ```go nodes := server.GetClusterNodes() // 返回: map[string]string{"node1": "127.0.0.1:9001", "node2": "127.0.0.1:9002", ...} ``` ### 成员变更规则 1. **一次只能进行一个配置变更** - 如果有变更正在进行,会返回 `ErrConfigInFlight` 2. **使用旧集群多数派** - 在配置变更提交前,使用旧集群大小计算多数派 3. **配置变更作为日志条目** - 持久化到日志中,保证一致性 ## 日志压缩 (Snapshot) ### 自动压缩机制 日志压缩是**自动触发**的,但经过深度优化以平衡性能与安全性: 1. **异步检测**: 压缩检查在独立的 goroutine 中异步运行,绝不阻塞 Raft 主循环的请求处理。 2. **固定步长触发**: 摒弃了指数级增长的阈值,改用**固定步长**。 * 例如:配置阈值为 10,000。 * 触发点:每当日志比上次压缩后**净增 10,000 条**时,触发一次压缩。 * 优势:避免了初期过于频繁的压缩,也防止了后期日志无限膨胀,步长线性可控。 ```go // 默认配置 config.SnapshotThreshold = 100000 // 触发步长:每新增 10万条日志触发一次 config.SnapshotMinRetention = 10000 // 压缩后保留:1万条(用于 follower 快速追赶) ``` ### 压缩安全性与一致性 系统通过**强制同步机制**从根本上解决了“日志被删但数据未入库”的风险: 1. **快照阻塞等待**: 当 Raft 决定压缩到索引 `N` 时,会调用应用层的 `SnapshotProvider`。 2. **追赶确认**: `SnapshotProvider` 会**阻塞等待**,直到数据库(DB)确实已经应用了索引 `N`(及其之前)的所有日志。 3. **强制刷盘 (Sync)**: 在生成快照前,强制调用存储引擎的 `Sync()`,确保所有内存/OS Cache 中的数据物理写入磁盘。这防止了“日志已被删除但数据未落盘”的风险。 4. **原子截断**: 只有在成功获取到包含完整数据的快照后,Raft 才会执行物理日志文件的截断。 5. **写锁保护**: 在物理截断日志文件的短暂瞬间,系统持有写锁,天然暂停新的写入,确保文件操作的原子性和完整性。 这种设计实现了:**Raft 全速写入 -> 异步后台线程检测 -> 需要压缩时等待 DB 追赶 -> 强制刷盘 -> 安全执行物理压缩**,在性能和数据安全性之间取得了最佳平衡。 ### 快照保存的是状态 快照保存的是**状态机的最终状态**,不是操作历史: ```go // 变量 counter 被更新 10 万次 // 快照只保存最终值: {"counter": "100000"} ``` ### 压缩安全性 | 崩溃点 | 恢复后状态 | 是否一致 | |--------|------------|----------| | 保存快照前 | 无新快照,日志完整 | ✅ | | 保存快照后,压缩前 | 有快照,日志完整(冗余但正确) | ✅ | | 压缩过程中 | 快照存在,可正确恢复 | ✅ | | **异步等待期间** | **日志未动,DB 正在追赶,无数据丢失** | ✅ | ## 线性一致性读 (ReadIndex) ReadIndex 是 Raft 论文中描述的线性一致性读优化,无需写入日志即可保证读取最新数据。 ```go // 方式 1: 通过 KVServer val, ok, err := server.GetLinear("key") // 方式 2: 直接使用 ReadIndex readIndex, err := server.Raft.ReadIndex() if err == nil { // 等待 lastApplied >= readIndex 后读取本地状态机 val, ok := server.FSM.Get("key") } ``` ## 启动优化 (Fast Startup) 系统采用了**LastApplied 预热**机制来优化启动速度: 1. **读取 DB 状态**: `KVServer` 启动时,首先从 DB 引擎获取持久化的 `LastAppliedIndex`。 2. **初始化 Raft**: 将此 Index 注入 Raft 配置 (`Config.LastAppliedIndex`)。 3. **跳过回放**: Raft 初始化时直接将 `commitIndex` 和 `lastApplied` 设置为该值,跳过对已持久化日志的重复 Apply 过程。 这意味着即使有数百万条日志,只要它们已经应用到 DB,重启也是毫秒级的。 ## 配置选项 ```go config := raft.DefaultConfig() // 必需配置 config.NodeID = "node1" // 节点唯一标识 config.ListenAddr = "127.0.0.1:9001" // 监听地址 config.DataDir = "data1" // 数据目录(每个节点不同!) // 集群配置 config.ClusterNodes = map[string]string{ // 集群成员映射 "node1": "127.0.0.1:9001", } // 选举配置 config.ElectionTimeoutMin = 150 * time.Millisecond // 最小选举超时 config.ElectionTimeoutMax = 300 * time.Millisecond // 最大选举超时 config.HeartbeatInterval = 50 * time.Millisecond // 心跳间隔 // 日志配置 config.MaxLogEntriesPerRequest = 5000 // 单次 AppendEntries 最大条目数 config.MemoryLogCapacity = 10000 // 内存日志容量 // 快照配置 config.LogCompactionEnabled = false // 默认不启用日志压缩 (永远保留完整日志) config.SnapshotThreshold = 100000 // 压缩触发阈值(仅在 LogCompactionEnabled=true 时生效) config.SnapshotMinRetention = 10000 // 快照后保留的日志条数 config.SnapshotChunkSize = 1024 * 1024 // 快照分块大小 (1MB) // RPC 超时配置 config.RPCTimeout = 500 * time.Millisecond // 普通 RPC 超时 config.SnapshotRPCTimeout = 30 * time.Second // 快照传输超时 config.ProposeTimeout = 3 * time.Second // Propose 转发超时 // 批处理配置 config.BatchMinWait = 1 * time.Millisecond // 批处理最小等待 config.BatchMaxWait = 10 * time.Millisecond // 批处理最大等待 config.BatchMaxSize = 100 // 批处理最大大小 // 日志 config.Logger = raft.NewConsoleLogger("node1", 1) // 0=debug, 1=info, 2=warn, 3=error ``` ## 健康检查 & 监控 ```go // 健康状态 health := server.HealthCheck() fmt.Printf("State: %s, Term: %d, Leader: %s\n", health.State, health.Term, health.LeaderID) fmt.Printf("Cluster Size: %d, Healthy: %v\n", health.ClusterSize, health.IsHealthy) fmt.Printf("Commit: %d, Applied: %d, Behind: %d\n", health.CommitIndex, health.LastApplied, health.LogBehind) // 运行指标 metrics := server.GetMetrics() fmt.Printf("Proposals: total=%d success=%d failed=%d\n", metrics.ProposalsTotal, metrics.ProposalsSuccess, metrics.ProposalsFailed) fmt.Printf("Elections: started=%d won=%d\n", metrics.ElectionsStarted, metrics.ElectionsWon) fmt.Printf("Snapshots: taken=%d installed=%d\n", metrics.SnapshotsTaken, metrics.SnapshotsInstalled) ``` ### HealthStatus 字段 | 字段 | 类型 | 说明 | |------|------|------| | NodeID | string | 节点 ID | | State | string | 当前状态 (Leader/Follower/Candidate) | | Term | uint64 | 当前 Term | | LeaderID | string | 当前 Leader ID | | ClusterSize | int | 集群节点数 | | CommitIndex | uint64 | 已提交的日志索引 | | LastApplied | uint64 | 已应用的日志索引 | | LogBehind | uint64 | 落后的日志条数 | | IsHealthy | bool | 是否健康 | | Uptime | Duration | 运行时间 | ### Metrics 字段 | 字段 | 说明 | |------|------| | ProposalsTotal | 总 Propose 次数 | | ProposalsSuccess | 成功次数 | | ProposalsFailed | 失败次数 | | ProposalsForwarded | 转发次数 | | AppendsSent | 发送的 AppendEntries | | AppendsReceived | 接收的 AppendEntries | | ElectionsStarted | 发起的选举次数 | | ElectionsWon | 赢得的选举次数 | | SnapshotsTaken | 生成的快照数 | | SnapshotsInstalled | 安装的快照数 | | ReadIndexRequests | ReadIndex 请求数 | ## Leadership Transfer 主动将 Leader 角色转移给指定节点: ```go // 将 Leader 转移给 node2 err := server.TransferLeadership("node2") if err != nil { log.Printf("Transfer failed: %v", err) } ``` ## 自定义状态机 如果需要自定义状态机,可以直接使用底层 Raft: ```go // 创建 apply channel applyCh := make(chan raft.ApplyMsg, 100) // 创建 transport transport := raft.NewTCPTransport(config.ListenAddr, 10, config.Logger) // 创建 Raft 实例 r, err := raft.NewRaft(config, transport, applyCh) if err != nil { log.Fatal(err) } // 启动 r.Start() // 处理已提交的日志 go func() { for msg := range applyCh { if msg.CommandValid { // 应用命令到你的状态机 myStateMachine.Apply(msg.Command) } else if msg.SnapshotValid { // 从快照恢复状态机 myStateMachine.Restore(msg.Snapshot) } } }() // 提交命令 index, term, err := r.Propose([]byte("my-command")) // 带转发的提交(如果不是 Leader 会自动转发) index, term, err := r.ProposeWithForward([]byte("my-command")) ``` ## 错误处理 ```go import "errors" // 常见错误 var ( raft.ErrNoLeader // 没有 Leader raft.ErrNotLeader // 当前节点不是 Leader (内部使用,会自动重试/转发) raft.ErrConfigInFlight // 配置变更正在进行中 raft.ErrTimeout // 操作超时 raft.ErrShutdown // 节点正在关闭 raft.ErrLeadershipLost // 操作过程中丢失 Leader 身份 ) // 错误处理示例 err := server.Set("key", "value") if err != nil { // 客户端通常不需要手动处理 ErrNotLeader,因为 KVServer 会自动路由 log.Printf("Set failed: %v", err) } ``` ## 客户端使用 ```go // 创建客户端 Transport client := raft.NewTCPTransport("127.0.0.1:9000", 10, logger) // 提交命令 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() cmd := raft.KVCommand{Type: raft.KVSet, Key: "foo", Value: "bar"} data, _ := json.Marshal(cmd) args := &raft.ProposeArgs{Command: data} reply, err := client.ForwardPropose(ctx, "127.0.0.1:9001", args) if err != nil { log.Printf("Propose failed: %v", err) } // ReadIndex args := &raft.ReadIndexArgs{} reply, err := client.ReadIndex(ctx, leaderAddr, args) if reply.Success { fmt.Printf("ReadIndex: %d\n", reply.ReadIndex) } // 远程读取 args := &raft.GetArgs{Key: "foo"} reply, err := client.ForwardGet(ctx, nodeAddr, args) if reply.Found { fmt.Printf("Value: %s\n", reply.Value) } // 添加节点 args := &raft.AddNodeArgs{NodeID: "node4", Address: "127.0.0.1:9004"} reply, err := client.ForwardAddNode(ctx, leaderAddr, args) // 移除节点 args := &raft.RemoveNodeArgs{NodeID: "node4"} reply, err := client.ForwardRemoveNode(ctx, leaderAddr, args) ``` ## 高级特性:变更监听 (Watcher) Raft 模块提供了强大的变更监听功能,允许应用模块(如 AI 配置、安全模块)订阅特定的配置变更。 ### 特性 1. **实时推送**:配置变更提交后,立即触发回调。 2. **多级过滤**:支持精确匹配和前缀通配符匹配(`key.**`)。 3. **安全隔离**:不同模块的 Watcher 相互独立,互不影响。 4. **无重放**:系统启动时的快照恢复不会触发 Watcher,避免初始化风暴。 ### 最佳实践:模块化监听 (Handler Interface) 在生产环境中,建议定义通用的接口来解耦配置逻辑与业务模块。 #### 1. 定义通用接口 ```go // ConfigObserver 定义配置变更处理接口 type ConfigObserver interface { // OnConfigChange 当配置发生变化时被调用 OnConfigChange(key, value string) } ``` #### 2. 模块实现 (AI 示例) ```go type AIModule struct { threshold float64 modelPath string } // OnConfigChange 实现 ConfigObserver 接口 func (m *AIModule) OnConfigChange(key, value string) { switch key { case "ai.model.threshold": if val, err := strconv.ParseFloat(value, 64); err == nil { m.threshold = val log.Printf("[AI] Threshold updated to %.2f", val) } case "ai.model.path": m.modelPath = value log.Printf("[AI] Reloading model from %s...", value) // m.reloadModel() } } ``` #### 3. 模块实现 (安全模块示例) ```go type SecurityModule struct { firewall *FirewallClient } func (s *SecurityModule) OnConfigChange(key, value string) { // key: security.blocklist.192.168.1.1 ip := strings.TrimPrefix(key, "security.blocklist.") if value == "true" { log.Printf("[Security] Blocking malicious IP: %s", ip) s.firewall.Block(ip) } else { log.Printf("[Security] Unblocking IP: %s", ip) s.firewall.Unblock(ip) } } ``` #### 4. 注册监听 ```go func main() { // ... 初始化集群 ... // 初始化模块 aiMod := &AIModule{threshold: 0.5} secMod := &SecurityModule{firewall: NewFirewall()} // 注册 AI 模块监听 (通过方法值传递) xbase.ClusterWatch("ai.**", aiMod.OnConfigChange) // 注册安全模块监听 xbase.ClusterWatch("security.blocklist.**", secMod.OnConfigChange) // ... } ``` ## 测试 ### 运行单元测试 ```bash go test -v ./xnet/raft/... ``` ### 测试覆盖 | 测试 | 验证内容 | |------|----------| | `TestCompactionBasic` | 基本日志压缩和索引更新 | | `TestSnapshotSaveAndLoad` | 快照持久化和恢复 | | `TestCompactionWithKVServer` | 通过 KVServer 触发压缩 | | `TestDataConsistencyAfterCompaction` | 压缩后数据一致性 | | `TestCompactionDoesNotLoseData` | 压缩不丢失已提交数据 | | `TestDynamicCompactionThreshold` | 动态压缩阈值机制 | ## 文件结构 ``` xnet/raft/ ├── raft.go # Raft 核心实现 ├── types.go # 类型定义和配置 ├── log.go # 日志管理 ├── storage.go # 持久化存储 ├── rpc.go # RPC 和 Transport ├── server.go # KVServer 封装 ├── kv.go # KV 状态机 ├── codec.go # 编解码 ├── compaction_test.go # 压缩相关测试 └── README.md # 本文档 ``` ## 示例 完整示例参见 `doc/xnet/raft/` 目录: ```bash # 清理旧数据(每个节点的数据目录在其 main.go 所在文件夹下) rm -rf doc/xnet/raft/node1/data doc/xnet/raft/node2/data doc/xnet/raft/node3/data # 启动三节点集群(三个终端) go run ./doc/xnet/raft/node1/main.go # 终端 1 go run ./doc/xnet/raft/node2/main.go # 终端 2 go run ./doc/xnet/raft/node3/main.go # 终端 3 # 运行压力测试和一致性验证 go run ./doc/xnet/raft/stress_test/main.go ``` ### 压力测试输出示例 ``` === Raft Cluster Test Suite === === Phase 1: Cluster Formation === Adding Node 2... Adding Node 3... === Phase 2: Stress Test === Duration: 247ms Total ops: 10000 QPS: 40391.25 Success rate: 100.00% === Phase 3: Data Consistency Verification === Verifying 100 sampled keys across all nodes... ✅ All checked keys are CONSISTENT across all nodes! === Phase 4: ReadIndex Test === ReadIndex succeeded: readIndex=19884 === Phase 5: Write-then-Read Verification === 127.0.0.1:9001: ✓ Value matches 127.0.0.1:9002: ✓ Value matches 127.0.0.1:9003: ✓ Value matches ✅ Write-then-read verification PASSED! ``` ## 性能 基于 `stress_test` 的测试结果(3 节点本地集群): - **QPS**: ~40,000 ops/sec - **成功率**: 100% - **延迟**: < 1ms (本地) 实际性能取决于网络延迟、磁盘 I/O 和集群规模。 ## 注意事项 1. **数据目录**:**每个节点必须使用不同的 `DataDir`**,否则会数据冲突! 2. **端口冲突**:确保各节点监听地址不冲突 3. **时钟同步**:建议集群节点时钟同步(虽然 Raft 不依赖时钟) 4. **奇数节点**:推荐使用奇数个节点(3、5、7)以优化容错 5. **清理数据**:测试前建议清理旧的数据目录 ## 核心安全保证 | 特性 | 说明 | |------|------| | **选举限制** | 日志不完整的节点无法成为 Leader | | **PreVote** | 防止分区节点扰乱集群 | | **No-op 提交** | 新 Leader 立即提交前任 term 日志 | | **Lease Check** | Leader 必须保持多数派连接才能接受写请求,防止分区写入 | | **单配置变更** | 一次只允许一个成员变更 | | **动态压缩阈值** | 防止压缩风暴 | | **原子快照写入** | 使用 temp file + rename 保证原子性 |