# xnet/raft - Raft 分布式共识库 这是一个使用 Go 语言编写的功能完备的 Raft 共识算法实现。它支持动态成员变更、日志压缩(快照)、线性一致性读等高级特性,旨在为分布式系统提供强一致性的状态机复制能力。 ## 核心特性 - ✅ **领导者选举 (Leader Election)** - 包含 **PreVote (预投票)** 优化机制,有效防止网络分区或不稳定节点导致的 Term (任期) 无效膨胀,提高集群稳定性。 - ✅ **日志复制 (Log Replication)** - 支持 **Pipeline (流水线)** 和 **Batching (批量)** 复制优化,显著提升高并发下的写入吞吐量。 - ✅ **动态成员变更 (Dynamic Membership Changes)** - 支持在运行时动态添加或移除节点(实现为单节点变更算法),无需停机即可扩缩容。 - ✅ **自动日志压缩 (Log Compaction / Snapshotting)** - **自动触发**:基于日志增长量自动触发快照生成。 - **动态阈值**:采用动态调整的压缩阈值,防止在写入高峰期出现“压缩风暴”。 - **分块传输**:支持大快照的分块(Chunked)传输,避免大文件传输导致的网络超时。 - ✅ **线性一致性读 (Linearizable Reads / ReadIndex)** - 实现了 **ReadIndex** 优化,允许在不产生 Log Entry 的情况下执行强一致性读取,性能远优于 Log Read。 - ✅ **领导权转移 (Leadership Transfer)** - 支持主动将 Leader 角色平滑移交给指定节点,适用于停机维护或负载均衡场景。 - ✅ **自动请求路由 (Request Routing)** - Follower 节点会自动将写请求(以及配置的读请求)转发给当前的 Leader,客户端无需自行维护 Leader 状态。 - ✅ **Leader 租约检查 (Lease Check)** - Leader 在处理读写请求前会检查与多数派的心跳连接,防止网络分区下的脑裂(Split-Brain)写入。 - ✅ **持久化存储 (Persistence)** - 关键的 Raft 日志和状态(Term, Vote)均实时持久化到磁盘,保证重启后数据不丢失。 - ✅ **健康检查与监控 (Health Checks & Metrics)** - 内置详细的运行时指标(Metrics)和健康检查接口,方便运维监控。 ## 快速开始 ### 1. 基础用法:启动单节点服务 ```go package main import ( "log" "xbase/xnet/raft" ) func main() { // 1. 配置初始化 config := raft.DefaultConfig() config.NodeID = "node1" config.ListenAddr = "127.0.0.1:9001" config.DataDir = "data1" // 注意:每个节点必须使用独立的数据目录 config.Logger = raft.NewConsoleLogger("node1", 1) // 1=Info级别 // 初始集群配置 (Bootstrap) // 仅在集群首次启动时需要,用于定义初始成员 config.ClusterNodes = map[string]string{ "node1": "127.0.0.1:9001", } // 2. 创建 KV Server (包含 Raft 核心与内置状态机 DB) server, err := raft.NewKVServer(config) if err != nil { log.Fatalf("Failed to create server: %v", err) } // 3. 启动服务 if err := server.Start(); err != nil { log.Fatalf("Failed to start server: %v", err) } defer server.Stop() log.Println("Node started on 127.0.0.1:9001") // 4. 执行 KV 操作 // 写操作 (如果当前不是 Leader,会自动转发) if err := server.Set("key1", "value1"); err != nil { log.Printf("Set failed: %v", err) } // 本地读取 (高性能,但存在极低概率读到旧数据) if val, found := server.Get("key1"); found { log.Printf("Got value: %s", val) } // 线性一致性读 (强一致性,保证读到最新提交的数据) val, found, err := server.GetLinear("key1") if err != nil { log.Printf("Read failed: %v", err) } else if found { log.Printf("Consistent value: %s", val) } } ``` ### 2. 构建多节点集群 **Node 1 (引导节点 / Bootstrap Node):** Node 1 作为集群的初始节点启动。 ```go config := raft.DefaultConfig() config.NodeID = "node1" config.ListenAddr = "127.0.0.1:9001" config.DataDir = "data1" // 初始包含自己 config.ClusterNodes = map[string]string{ "node1": "127.0.0.1:9001", } // ... 启动 server ... ``` **Node 2 & 3 (加入节点):** 启动 Node 2 和 Node 3。注意:在启动时,它们的 `ClusterNodes` 可以留空或仅包含自己,因为它们稍后会被动态加入到集群中。 ```go // Node 2 配置 config.NodeID = "node2" config.ListenAddr = "127.0.0.1:9002" config.DataDir = "data2" // ... 启动 server ... ``` **执行加入操作:** 通过已存在的集群成员(例如 Node 1)发起 API 调用,将新节点加入集群。 ```go // 连接到 Node 1 (或直接在 Node 1 的代码中调用) // 将 Node 2 加入集群 err := server1.Join("node2", "127.0.0.1:9002") if err != nil { log.Printf("Failed to join node2: %v", err) } // 将 Node 3 加入集群 err := server1.Join("node3", "127.0.0.1:9003") ``` ## 命令行交互 (CLI) 系统内置了一个强大的交互式命令行工具,默认随节点启动。它提供了直观的界面来管理数据、监控集群状态以及执行运维操作。 **基础命令:** * **数据操作**: * `set ` / `get ` / `del `:标准的 KV 操作。 * `search [limit] [offset]`:搜索 Key(支持 SQL `LIKE` 模式)。 * `count `:统计符合模式的 Key 数量。 * **集群管理**: * `join `:将新节点加入集群。 * `leave `:将节点移出集群。 * **系统监控**: * `info`:实时展示节点健康状态、Raft 索引(Commit/Applied)、存储占用(DB/Log)、内存使用及集群成员连接状况。 **扩展命令(示例):** CLI 设计为可扩展的。在 `example/basic` 示例项目中,我们展示了如何通过 `common.RegisterDemoCommands` 注册自定义命令: * `demodata `:快速生成 n 条测试数据(例如 `demodata 1000 user-*`)。 * `deletedatas `:批量删除匹配模式的数据。 ## 高级功能详解 ### 动态成员变更 系统允许在不停机的情况下增删节点。为了保证安全性(Safety),系统一次只允许变更一个节点(Single-Server Membership Change)。 ```go // 添加节点 (自动转发请求到 Leader) err := server.Join("node4", "127.0.0.1:9004") // 移除节点 (自动转发请求到 Leader) err := server.Leave("node4") ``` ### 日志压缩 (Snapshots) 为了防止日志无限增长,系统实现了自动快照机制。 1. **触发机制**:当日志条目数超过 `config.SnapshotThreshold` 时触发压缩。 2. **安全性保障**: * **DB 追赶**:在生成快照前,Raft 会等待底层 DB 确实应用了所有日志。 * **强制落盘**:调用 `Sync()` 强制将 DB 数据刷入磁盘,防止“日志被删但数据还在 OS 缓存中”导致的数据丢失风险。 3. **分块传输**:对于大型快照,系统会自动将其切分为多个小块(Chunk)进行传输,默认每块 1MB,有效防止网络抖动导致的传输失败。 配置示例: ```go config.LogCompactionEnabled = true config.SnapshotThreshold = 100000 // 每新增 10万条日志触发一次压缩 config.SnapshotChunkSize = 1024 * 1024 // 快照分块大小为 1MB ``` ### 线性一致性读 (ReadIndex) `ReadIndex` 是 Raft 论文中描述的一种读优化机制,它使得读操作无需产生 Raft Log(即无需磁盘 IO),就能保证线性一致性(Linearizability)。 ```go val, found, err := server.GetLinear("key") ``` **工作流程**: 1. **确认领导权**:Leader 发送一轮心跳给 Followers,确保自己仍拥有多数派支持。 2. **获取 ReadIndex**:记录当前的 `CommitIndex` 作为 `ReadIndex`。 3. **等待应用**:等待状态机(DB)应用到 `ReadIndex`。 4. **读取数据**:从状态机读取数据并返回。 ### 领导权转移 (Leadership Transfer) 你可以主动将 Leader 角色转移给某个 Follower,这在需要重启当前 Leader 节点进行升级或维护时非常有用,可以最小化集群的不可用时间。 ```go // 将 Leader 转移给 node2 err := server.TransferLeadership("node2") ``` 机制:当前 Leader 会发送 `TimeoutNow` RPC 给目标节点,使其立即发起选举(并无视选举超时时间),从而大概率赢得选举。 ## 监控与运维 ### 健康检查 通过 `HealthCheck()` 接口可以实时获取节点状态。 ```go health := server.HealthCheck() fmt.Printf("节点: %s, 状态: %s, Leader: %s, 健康: %v\n", health.NodeID, health.State, health.LeaderID, health.IsHealthy) ``` ### 运行指标 (Metrics) 系统内置了详细的指标统计,可用于接入 Prometheus 等监控系统。 ```go metrics := server.GetMetrics() fmt.Printf("当前任期: %d\n", metrics.Term) fmt.Printf("总提案数: %d\n", metrics.ProposalsTotal) fmt.Printf("生成的快照数: %d\n", metrics.SnapshotsTaken) ``` ## 项目目录结构 ``` xnet/raft/ ├── raft.go # Raft 核心逻辑 (选举、日志复制、心跳) ├── types.go # 类型定义与配置结构体 ├── log.go # 日志管理 (LogManager) ├── storage.go # 存储接口与实现 (HybridStorage) ├── rpc.go # RPC 消息定义与网络传输层 ├── server.go # KVServer 封装 (Raft 与 DB 的胶水层) ├── db/ # 内置高性能 KV 存储引擎 │ ├── engine.go # 引擎核心实现 │ └── db.md # 引擎详细文档 └── README.md # 项目说明文档 ``` ## 性能表现 * **吞吐量**:在开启 Batching(批处理)和 Pipeline(流水线)优化后,单节点可支撑数万 QPS(取决于硬件配置)。 * **延迟**:本地读写延迟在亚毫秒级(Sub-millisecond)。 * **恢复速度**:利用 `LastAppliedIndex` 机制,重启时自动跳过已持久化的日志回放,实现秒级启动。