|
|
@@ -1,23 +1,35 @@
|
|
|
-# xnet/raft - Raft 共识库
|
|
|
-
|
|
|
-一个功能完整的 Raft 共识算法实现,支持动态成员变更、日志压缩、线性一致性读等特性。
|
|
|
-
|
|
|
-## 特性
|
|
|
-
|
|
|
-- ✅ **Leader 选举** - 包含 PreVote 优化,防止网络分区导致的 term 膨胀
|
|
|
-- ✅ **日志复制** - 支持批量复制和流水线优化
|
|
|
-- ✅ **动态成员变更** - 运行时添加/移除节点
|
|
|
-- ✅ **日志压缩 (Snapshot)** - 自动触发,支持分块传输,动态阈值防止压缩风暴
|
|
|
-- ✅ **线性一致性读 (ReadIndex)** - 保证读取最新已提交数据
|
|
|
-- ✅ **Leadership Transfer** - 主动转移 Leader 角色
|
|
|
-- ✅ **自动请求路由** - Follower 自动转发读写请求到 Leader,无需客户端处理
|
|
|
-- ✅ **Leader 租约检查 (Lease Check)** - 防止网络分区的 Leader 接受无法提交的写请求
|
|
|
-- ✅ **持久化存储** - 日志和状态持久化到磁盘
|
|
|
-- ✅ **健康检查 & 监控指标** - 内置 Metrics 支持
|
|
|
+# xnet/raft - Raft 分布式共识库
|
|
|
+
|
|
|
+这是一个使用 Go 语言编写的功能完备的 Raft 共识算法实现。它支持动态成员变更、日志压缩(快照)、线性一致性读等高级特性,旨在为分布式系统提供强一致性的状态机复制能力。
|
|
|
+
|
|
|
+## 核心特性
|
|
|
+
|
|
|
+- ✅ **领导者选举 (Leader Election)**
|
|
|
+ - 包含 **PreVote (预投票)** 优化机制,有效防止网络分区或不稳定节点导致的 Term (任期) 无效膨胀,提高集群稳定性。
|
|
|
+- ✅ **日志复制 (Log Replication)**
|
|
|
+ - 支持 **Pipeline (流水线)** 和 **Batching (批量)** 复制优化,显著提升高并发下的写入吞吐量。
|
|
|
+- ✅ **动态成员变更 (Dynamic Membership Changes)**
|
|
|
+ - 支持在运行时动态添加或移除节点(实现为单节点变更算法),无需停机即可扩缩容。
|
|
|
+- ✅ **自动日志压缩 (Log Compaction / Snapshotting)**
|
|
|
+ - **自动触发**:基于日志增长量自动触发快照生成。
|
|
|
+ - **动态阈值**:采用动态调整的压缩阈值,防止在写入高峰期出现“压缩风暴”。
|
|
|
+ - **分块传输**:支持大快照的分块(Chunked)传输,避免大文件传输导致的网络超时。
|
|
|
+- ✅ **线性一致性读 (Linearizable Reads / ReadIndex)**
|
|
|
+ - 实现了 **ReadIndex** 优化,允许在不产生 Log Entry 的情况下执行强一致性读取,性能远优于 Log Read。
|
|
|
+- ✅ **领导权转移 (Leadership Transfer)**
|
|
|
+ - 支持主动将 Leader 角色平滑移交给指定节点,适用于停机维护或负载均衡场景。
|
|
|
+- ✅ **自动请求路由 (Request Routing)**
|
|
|
+ - Follower 节点会自动将写请求(以及配置的读请求)转发给当前的 Leader,客户端无需自行维护 Leader 状态。
|
|
|
+- ✅ **Leader 租约检查 (Lease Check)**
|
|
|
+ - Leader 在处理读写请求前会检查与多数派的心跳连接,防止网络分区下的脑裂(Split-Brain)写入。
|
|
|
+- ✅ **持久化存储 (Persistence)**
|
|
|
+ - 关键的 Raft 日志和状态(Term, Vote)均实时持久化到磁盘,保证重启后数据不丢失。
|
|
|
+- ✅ **健康检查与监控 (Health Checks & Metrics)**
|
|
|
+ - 内置详细的运行时指标(Metrics)和健康检查接口,方便运维监控。
|
|
|
|
|
|
## 快速开始
|
|
|
|
|
|
-### 基本用法
|
|
|
+### 1. 基础用法:启动单节点服务
|
|
|
|
|
|
```go
|
|
|
package main
|
|
|
@@ -28,603 +40,198 @@ import (
|
|
|
)
|
|
|
|
|
|
func main() {
|
|
|
- // 使用默认配置
|
|
|
+ // 1. 配置初始化
|
|
|
config := raft.DefaultConfig()
|
|
|
config.NodeID = "node1"
|
|
|
config.ListenAddr = "127.0.0.1:9001"
|
|
|
- config.DataDir = "data1" // 每个节点使用不同目录!
|
|
|
- config.Logger = raft.NewConsoleLogger("node1", 1)
|
|
|
+ config.DataDir = "data1" // 注意:每个节点必须使用独立的数据目录
|
|
|
+ config.Logger = raft.NewConsoleLogger("node1", 1) // 1=Info级别
|
|
|
|
|
|
- // 初始集群配置(单节点启动)
|
|
|
+ // 初始集群配置 (Bootstrap)
|
|
|
+ // 仅在集群首次启动时需要,用于定义初始成员
|
|
|
config.ClusterNodes = map[string]string{
|
|
|
"node1": "127.0.0.1:9001",
|
|
|
}
|
|
|
|
|
|
- // 创建 KV Server(内置状态机)
|
|
|
+ // 2. 创建 KV Server (包含 Raft 核心与内置状态机 DB)
|
|
|
server, err := raft.NewKVServer(config)
|
|
|
if err != nil {
|
|
|
log.Fatalf("Failed to create server: %v", err)
|
|
|
}
|
|
|
|
|
|
- // 启动服务
|
|
|
+ // 3. 启动服务
|
|
|
if err := server.Start(); err != nil {
|
|
|
log.Fatalf("Failed to start server: %v", err)
|
|
|
}
|
|
|
+ defer server.Stop()
|
|
|
|
|
|
log.Println("Node started on 127.0.0.1:9001")
|
|
|
|
|
|
- // 使用 KV 操作
|
|
|
- server.Set("key1", "value1")
|
|
|
- val, ok := server.Get("key1")
|
|
|
+ // 4. 执行 KV 操作
|
|
|
+
|
|
|
+ // 写操作 (如果当前不是 Leader,会自动转发)
|
|
|
+ if err := server.Set("key1", "value1"); err != nil {
|
|
|
+ log.Printf("Set failed: %v", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ // 本地读取 (高性能,但存在极低概率读到旧数据)
|
|
|
+ if val, found := server.Get("key1"); found {
|
|
|
+ log.Printf("Got value: %s", val)
|
|
|
+ }
|
|
|
|
|
|
- // 线性一致性读(保证读到最新数据)
|
|
|
- val, ok, err = server.GetLinear("key1")
|
|
|
+ // 线性一致性读 (强一致性,保证读到最新提交的数据)
|
|
|
+ val, found, err := server.GetLinear("key1")
|
|
|
+ if err != nil {
|
|
|
+ log.Printf("Read failed: %v", err)
|
|
|
+ } else if found {
|
|
|
+ log.Printf("Consistent value: %s", val)
|
|
|
+ }
|
|
|
}
|
|
|
```
|
|
|
|
|
|
-### 多节点集群
|
|
|
+### 2. 构建多节点集群
|
|
|
+
|
|
|
+**Node 1 (引导节点 / Bootstrap Node):**
|
|
|
|
|
|
-**Node 1 (Bootstrap 节点):**
|
|
|
+Node 1 作为集群的初始节点启动。
|
|
|
|
|
|
```go
|
|
|
config := raft.DefaultConfig()
|
|
|
config.NodeID = "node1"
|
|
|
config.ListenAddr = "127.0.0.1:9001"
|
|
|
-config.DataDir = "data1" // 重要:每个节点不同目录
|
|
|
+config.DataDir = "data1"
|
|
|
+// 初始包含自己
|
|
|
config.ClusterNodes = map[string]string{
|
|
|
"node1": "127.0.0.1:9001",
|
|
|
}
|
|
|
-
|
|
|
-server, _ := raft.NewKVServer(config)
|
|
|
-server.Start()
|
|
|
+// ... 启动 server ...
|
|
|
```
|
|
|
|
|
|
-**Node 2 & 3 (加入集群):**
|
|
|
+**Node 2 & 3 (加入节点):**
|
|
|
+
|
|
|
+启动 Node 2 和 Node 3。注意:在启动时,它们的 `ClusterNodes` 可以留空或仅包含自己,因为它们稍后会被动态加入到集群中。
|
|
|
|
|
|
```go
|
|
|
-// Node 2
|
|
|
-config := raft.DefaultConfig()
|
|
|
+// Node 2 配置
|
|
|
config.NodeID = "node2"
|
|
|
config.ListenAddr = "127.0.0.1:9002"
|
|
|
-config.DataDir = "data2" // 重要:每个节点不同目录
|
|
|
-config.ClusterNodes = map[string]string{
|
|
|
- "node2": "127.0.0.1:9002",
|
|
|
-}
|
|
|
-
|
|
|
-server, _ := raft.NewKVServer(config)
|
|
|
-server.Start()
|
|
|
-```
|
|
|
-
|
|
|
-然后通过客户端将节点加入集群:
|
|
|
-
|
|
|
-```go
|
|
|
-// 通过 Leader 添加节点
|
|
|
-client := raft.NewTCPTransport("127.0.0.1:9000", 10, logger)
|
|
|
-args := &raft.AddNodeArgs{NodeID: "node2", Address: "127.0.0.1:9002"}
|
|
|
-reply, err := client.ForwardAddNode(ctx, "127.0.0.1:9001", args)
|
|
|
+config.DataDir = "data2"
|
|
|
+// ... 启动 server ...
|
|
|
```
|
|
|
|
|
|
-## 动态成员变更
|
|
|
+**执行加入操作:**
|
|
|
|
|
|
-### 添加节点
|
|
|
+通过已存在的集群成员(例如 Node 1)发起 API 调用,将新节点加入集群。
|
|
|
|
|
|
```go
|
|
|
-// 方式 1: 直接在 Leader 上操作
|
|
|
-err := server.Raft.AddNode("node4", "127.0.0.1:9004")
|
|
|
-
|
|
|
-// 方式 2: 自动转发到 Leader(推荐)
|
|
|
-err := server.Raft.AddNodeWithForward("node4", "127.0.0.1:9004")
|
|
|
-
|
|
|
-// 方式 3: 通过 KVServer 封装
|
|
|
-err := server.Join("node4", "127.0.0.1:9004")
|
|
|
-```
|
|
|
-
|
|
|
-### 移除节点
|
|
|
-
|
|
|
-```go
|
|
|
-// 方式 1: 直接在 Leader 上操作
|
|
|
-err := server.Raft.RemoveNode("node4")
|
|
|
-
|
|
|
-// 方式 2: 自动转发到 Leader(推荐)
|
|
|
-err := server.Raft.RemoveNodeWithForward("node4")
|
|
|
-
|
|
|
-// 方式 3: 通过 KVServer 封装
|
|
|
-err := server.Leave("node4")
|
|
|
-```
|
|
|
-
|
|
|
-### 获取集群成员
|
|
|
+// 连接到 Node 1 (或直接在 Node 1 的代码中调用)
|
|
|
+// 将 Node 2 加入集群
|
|
|
+err := server1.Join("node2", "127.0.0.1:9002")
|
|
|
+if err != nil {
|
|
|
+ log.Printf("Failed to join node2: %v", err)
|
|
|
+}
|
|
|
|
|
|
-```go
|
|
|
-nodes := server.GetClusterNodes()
|
|
|
-// 返回: map[string]string{"node1": "127.0.0.1:9001", "node2": "127.0.0.1:9002", ...}
|
|
|
+// 将 Node 3 加入集群
|
|
|
+err := server1.Join("node3", "127.0.0.1:9003")
|
|
|
```
|
|
|
|
|
|
-### 成员变更规则
|
|
|
+## 高级功能详解
|
|
|
|
|
|
-1. **一次只能进行一个配置变更** - 如果有变更正在进行,会返回 `ErrConfigInFlight`
|
|
|
-2. **使用旧集群多数派** - 在配置变更提交前,使用旧集群大小计算多数派
|
|
|
-3. **配置变更作为日志条目** - 持久化到日志中,保证一致性
|
|
|
+### 动态成员变更
|
|
|
|
|
|
-## 日志压缩 (Snapshot)
|
|
|
-
|
|
|
-### 自动压缩机制
|
|
|
-
|
|
|
-日志压缩是**自动触发**的,但经过深度优化以平衡性能与安全性:
|
|
|
-
|
|
|
-1. **异步检测**: 压缩检查在独立的 goroutine 中异步运行,绝不阻塞 Raft 主循环的请求处理。
|
|
|
-2. **固定步长触发**: 摒弃了指数级增长的阈值,改用**固定步长**。
|
|
|
- * 例如:配置阈值为 10,000。
|
|
|
- * 触发点:每当日志比上次压缩后**净增 10,000 条**时,触发一次压缩。
|
|
|
- * 优势:避免了初期过于频繁的压缩,也防止了后期日志无限膨胀,步长线性可控。
|
|
|
+系统允许在不停机的情况下增删节点。为了保证安全性(Safety),系统一次只允许变更一个节点(Single-Server Membership Change)。
|
|
|
|
|
|
```go
|
|
|
-// 默认配置
|
|
|
-config.SnapshotThreshold = 100000 // 触发步长:每新增 10万条日志触发一次
|
|
|
-config.SnapshotMinRetention = 10000 // 压缩后保留:1万条(用于 follower 快速追赶)
|
|
|
-```
|
|
|
-
|
|
|
-### 压缩安全性与一致性
|
|
|
-
|
|
|
-系统通过**强制同步机制**从根本上解决了“日志被删但数据未入库”的风险:
|
|
|
-
|
|
|
-1. **快照阻塞等待**: 当 Raft 决定压缩到索引 `N` 时,会调用应用层的 `SnapshotProvider`。
|
|
|
-2. **追赶确认**: `SnapshotProvider` 会**阻塞等待**,直到数据库(DB)确实已经应用了索引 `N`(及其之前)的所有日志。
|
|
|
-3. **强制刷盘 (Sync)**: 在生成快照前,强制调用存储引擎的 `Sync()`,确保所有内存/OS Cache 中的数据物理写入磁盘。这防止了“日志已被删除但数据未落盘”的风险。
|
|
|
-4. **原子截断**: 只有在成功获取到包含完整数据的快照后,Raft 才会执行物理日志文件的截断。
|
|
|
-5. **写锁保护**: 在物理截断日志文件的短暂瞬间,系统持有写锁,天然暂停新的写入,确保文件操作的原子性和完整性。
|
|
|
-
|
|
|
-这种设计实现了:**Raft 全速写入 -> 异步后台线程检测 -> 需要压缩时等待 DB 追赶 -> 强制刷盘 -> 安全执行物理压缩**,在性能和数据安全性之间取得了最佳平衡。
|
|
|
-
|
|
|
-### 快照保存的是状态
|
|
|
-
|
|
|
-快照保存的是**状态机的最终状态**,不是操作历史:
|
|
|
+// 添加节点 (自动转发请求到 Leader)
|
|
|
+err := server.Join("node4", "127.0.0.1:9004")
|
|
|
|
|
|
-```go
|
|
|
-// 变量 counter 被更新 10 万次
|
|
|
-// 快照只保存最终值: {"counter": "100000"}
|
|
|
+// 移除节点 (自动转发请求到 Leader)
|
|
|
+err := server.Leave("node4")
|
|
|
```
|
|
|
|
|
|
-### 压缩安全性
|
|
|
-
|
|
|
-| 崩溃点 | 恢复后状态 | 是否一致 |
|
|
|
-|--------|------------|----------|
|
|
|
-| 保存快照前 | 无新快照,日志完整 | ✅ |
|
|
|
-| 保存快照后,压缩前 | 有快照,日志完整(冗余但正确) | ✅ |
|
|
|
-| 压缩过程中 | 快照存在,可正确恢复 | ✅ |
|
|
|
-| **异步等待期间** | **日志未动,DB 正在追赶,无数据丢失** | ✅ |
|
|
|
+### 日志压缩 (Snapshots)
|
|
|
|
|
|
-## 线性一致性读 (ReadIndex)
|
|
|
+为了防止日志无限增长,系统实现了自动快照机制。
|
|
|
|
|
|
-ReadIndex 是 Raft 论文中描述的线性一致性读优化,无需写入日志即可保证读取最新数据。
|
|
|
+1. **触发机制**:当日志条目数超过 `config.SnapshotThreshold` 时触发压缩。
|
|
|
+2. **安全性保障**:
|
|
|
+ * **DB 追赶**:在生成快照前,Raft 会等待底层 DB 确实应用了所有日志。
|
|
|
+ * **强制落盘**:调用 `Sync()` 强制将 DB 数据刷入磁盘,防止“日志被删但数据还在 OS 缓存中”导致的数据丢失风险。
|
|
|
+3. **分块传输**:对于大型快照,系统会自动将其切分为多个小块(Chunk)进行传输,默认每块 1MB,有效防止网络抖动导致的传输失败。
|
|
|
|
|
|
+配置示例:
|
|
|
```go
|
|
|
-// 方式 1: 通过 KVServer
|
|
|
-val, ok, err := server.GetLinear("key")
|
|
|
-
|
|
|
-// 方式 2: 直接使用 ReadIndex
|
|
|
-readIndex, err := server.Raft.ReadIndex()
|
|
|
-if err == nil {
|
|
|
- // 等待 lastApplied >= readIndex 后读取本地状态机
|
|
|
- val, ok := server.FSM.Get("key")
|
|
|
-}
|
|
|
+config.LogCompactionEnabled = true
|
|
|
+config.SnapshotThreshold = 100000 // 每新增 10万条日志触发一次压缩
|
|
|
+config.SnapshotChunkSize = 1024 * 1024 // 快照分块大小为 1MB
|
|
|
```
|
|
|
|
|
|
-## 启动优化 (Fast Startup)
|
|
|
-
|
|
|
-系统采用了**LastApplied 预热**机制来优化启动速度:
|
|
|
-
|
|
|
-1. **读取 DB 状态**: `KVServer` 启动时,首先从 DB 引擎获取持久化的 `LastAppliedIndex`。
|
|
|
-2. **初始化 Raft**: 将此 Index 注入 Raft 配置 (`Config.LastAppliedIndex`)。
|
|
|
-3. **跳过回放**: Raft 初始化时直接将 `commitIndex` 和 `lastApplied` 设置为该值,跳过对已持久化日志的重复 Apply 过程。
|
|
|
+### 线性一致性读 (ReadIndex)
|
|
|
|
|
|
-这意味着即使有数百万条日志,只要它们已经应用到 DB,重启也是毫秒级的。
|
|
|
-
|
|
|
-## 配置选项
|
|
|
+`ReadIndex` 是 Raft 论文中描述的一种读优化机制,它使得读操作无需产生 Raft Log(即无需磁盘 IO),就能保证线性一致性(Linearizability)。
|
|
|
|
|
|
```go
|
|
|
-config := raft.DefaultConfig()
|
|
|
-
|
|
|
-// 必需配置
|
|
|
-config.NodeID = "node1" // 节点唯一标识
|
|
|
-config.ListenAddr = "127.0.0.1:9001" // 监听地址
|
|
|
-config.DataDir = "data1" // 数据目录(每个节点不同!)
|
|
|
-
|
|
|
-// 集群配置
|
|
|
-config.ClusterNodes = map[string]string{ // 集群成员映射
|
|
|
- "node1": "127.0.0.1:9001",
|
|
|
-}
|
|
|
-
|
|
|
-// 选举配置
|
|
|
-config.ElectionTimeoutMin = 150 * time.Millisecond // 最小选举超时
|
|
|
-config.ElectionTimeoutMax = 300 * time.Millisecond // 最大选举超时
|
|
|
-config.HeartbeatInterval = 50 * time.Millisecond // 心跳间隔
|
|
|
-
|
|
|
-// 日志配置
|
|
|
-config.MaxLogEntriesPerRequest = 5000 // 单次 AppendEntries 最大条目数
|
|
|
-config.MemoryLogCapacity = 10000 // 内存日志容量
|
|
|
-
|
|
|
-// 快照配置
|
|
|
-config.LogCompactionEnabled = false // 默认不启用日志压缩 (永远保留完整日志)
|
|
|
-config.SnapshotThreshold = 100000 // 压缩触发阈值(仅在 LogCompactionEnabled=true 时生效)
|
|
|
-config.SnapshotMinRetention = 10000 // 快照后保留的日志条数
|
|
|
-config.SnapshotChunkSize = 1024 * 1024 // 快照分块大小 (1MB)
|
|
|
-
|
|
|
-// RPC 超时配置
|
|
|
-config.RPCTimeout = 500 * time.Millisecond // 普通 RPC 超时
|
|
|
-config.SnapshotRPCTimeout = 30 * time.Second // 快照传输超时
|
|
|
-config.ProposeTimeout = 3 * time.Second // Propose 转发超时
|
|
|
-
|
|
|
-// 批处理配置
|
|
|
-config.BatchMinWait = 1 * time.Millisecond // 批处理最小等待
|
|
|
-config.BatchMaxWait = 10 * time.Millisecond // 批处理最大等待
|
|
|
-config.BatchMaxSize = 100 // 批处理最大大小
|
|
|
-
|
|
|
-// 日志
|
|
|
-config.Logger = raft.NewConsoleLogger("node1", 1) // 0=debug, 1=info, 2=warn, 3=error
|
|
|
+val, found, err := server.GetLinear("key")
|
|
|
```
|
|
|
|
|
|
-## 健康检查 & 监控
|
|
|
+**工作流程**:
|
|
|
+1. **确认领导权**:Leader 发送一轮心跳给 Followers,确保自己仍拥有多数派支持。
|
|
|
+2. **获取 ReadIndex**:记录当前的 `CommitIndex` 作为 `ReadIndex`。
|
|
|
+3. **等待应用**:等待状态机(DB)应用到 `ReadIndex`。
|
|
|
+4. **读取数据**:从状态机读取数据并返回。
|
|
|
|
|
|
-```go
|
|
|
-// 健康状态
|
|
|
-health := server.HealthCheck()
|
|
|
-fmt.Printf("State: %s, Term: %d, Leader: %s\n", health.State, health.Term, health.LeaderID)
|
|
|
-fmt.Printf("Cluster Size: %d, Healthy: %v\n", health.ClusterSize, health.IsHealthy)
|
|
|
-fmt.Printf("Commit: %d, Applied: %d, Behind: %d\n", health.CommitIndex, health.LastApplied, health.LogBehind)
|
|
|
-
|
|
|
-// 运行指标
|
|
|
-metrics := server.GetMetrics()
|
|
|
-fmt.Printf("Proposals: total=%d success=%d failed=%d\n",
|
|
|
- metrics.ProposalsTotal, metrics.ProposalsSuccess, metrics.ProposalsFailed)
|
|
|
-fmt.Printf("Elections: started=%d won=%d\n", metrics.ElectionsStarted, metrics.ElectionsWon)
|
|
|
-fmt.Printf("Snapshots: taken=%d installed=%d\n", metrics.SnapshotsTaken, metrics.SnapshotsInstalled)
|
|
|
-```
|
|
|
+### 领导权转移 (Leadership Transfer)
|
|
|
|
|
|
-### HealthStatus 字段
|
|
|
-
|
|
|
-| 字段 | 类型 | 说明 |
|
|
|
-|------|------|------|
|
|
|
-| NodeID | string | 节点 ID |
|
|
|
-| State | string | 当前状态 (Leader/Follower/Candidate) |
|
|
|
-| Term | uint64 | 当前 Term |
|
|
|
-| LeaderID | string | 当前 Leader ID |
|
|
|
-| ClusterSize | int | 集群节点数 |
|
|
|
-| CommitIndex | uint64 | 已提交的日志索引 |
|
|
|
-| LastApplied | uint64 | 已应用的日志索引 |
|
|
|
-| LogBehind | uint64 | 落后的日志条数 |
|
|
|
-| IsHealthy | bool | 是否健康 |
|
|
|
-| Uptime | Duration | 运行时间 |
|
|
|
-
|
|
|
-### Metrics 字段
|
|
|
-
|
|
|
-| 字段 | 说明 |
|
|
|
-|------|------|
|
|
|
-| ProposalsTotal | 总 Propose 次数 |
|
|
|
-| ProposalsSuccess | 成功次数 |
|
|
|
-| ProposalsFailed | 失败次数 |
|
|
|
-| ProposalsForwarded | 转发次数 |
|
|
|
-| AppendsSent | 发送的 AppendEntries |
|
|
|
-| AppendsReceived | 接收的 AppendEntries |
|
|
|
-| ElectionsStarted | 发起的选举次数 |
|
|
|
-| ElectionsWon | 赢得的选举次数 |
|
|
|
-| SnapshotsTaken | 生成的快照数 |
|
|
|
-| SnapshotsInstalled | 安装的快照数 |
|
|
|
-| ReadIndexRequests | ReadIndex 请求数 |
|
|
|
-
|
|
|
-## Leadership Transfer
|
|
|
-
|
|
|
-主动将 Leader 角色转移给指定节点:
|
|
|
+你可以主动将 Leader 角色转移给某个 Follower,这在需要重启当前 Leader 节点进行升级或维护时非常有用,可以最小化集群的不可用时间。
|
|
|
|
|
|
```go
|
|
|
// 将 Leader 转移给 node2
|
|
|
err := server.TransferLeadership("node2")
|
|
|
-if err != nil {
|
|
|
- log.Printf("Transfer failed: %v", err)
|
|
|
-}
|
|
|
```
|
|
|
|
|
|
-## 自定义状态机
|
|
|
+机制:当前 Leader 会发送 `TimeoutNow` RPC 给目标节点,使其立即发起选举(并无视选举超时时间),从而大概率赢得选举。
|
|
|
|
|
|
-如果需要自定义状态机,可以直接使用底层 Raft:
|
|
|
+## 监控与运维
|
|
|
|
|
|
-```go
|
|
|
-// 创建 apply channel
|
|
|
-applyCh := make(chan raft.ApplyMsg, 100)
|
|
|
+### 健康检查
|
|
|
|
|
|
-// 创建 transport
|
|
|
-transport := raft.NewTCPTransport(config.ListenAddr, 10, config.Logger)
|
|
|
-
|
|
|
-// 创建 Raft 实例
|
|
|
-r, err := raft.NewRaft(config, transport, applyCh)
|
|
|
-if err != nil {
|
|
|
- log.Fatal(err)
|
|
|
-}
|
|
|
-
|
|
|
-// 启动
|
|
|
-r.Start()
|
|
|
-
|
|
|
-// 处理已提交的日志
|
|
|
-go func() {
|
|
|
- for msg := range applyCh {
|
|
|
- if msg.CommandValid {
|
|
|
- // 应用命令到你的状态机
|
|
|
- myStateMachine.Apply(msg.Command)
|
|
|
- } else if msg.SnapshotValid {
|
|
|
- // 从快照恢复状态机
|
|
|
- myStateMachine.Restore(msg.Snapshot)
|
|
|
- }
|
|
|
- }
|
|
|
-}()
|
|
|
-
|
|
|
-// 提交命令
|
|
|
-index, term, err := r.Propose([]byte("my-command"))
|
|
|
-
|
|
|
-// 带转发的提交(如果不是 Leader 会自动转发)
|
|
|
-index, term, err := r.ProposeWithForward([]byte("my-command"))
|
|
|
-```
|
|
|
-
|
|
|
-## 错误处理
|
|
|
+通过 `HealthCheck()` 接口可以实时获取节点状态。
|
|
|
|
|
|
```go
|
|
|
-import "errors"
|
|
|
-
|
|
|
-// 常见错误
|
|
|
-var (
|
|
|
- raft.ErrNoLeader // 没有 Leader
|
|
|
- raft.ErrNotLeader // 当前节点不是 Leader (内部使用,会自动重试/转发)
|
|
|
- raft.ErrConfigInFlight // 配置变更正在进行中
|
|
|
- raft.ErrTimeout // 操作超时
|
|
|
- raft.ErrShutdown // 节点正在关闭
|
|
|
- raft.ErrLeadershipLost // 操作过程中丢失 Leader 身份
|
|
|
-)
|
|
|
-
|
|
|
-// 错误处理示例
|
|
|
-err := server.Set("key", "value")
|
|
|
-if err != nil {
|
|
|
- // 客户端通常不需要手动处理 ErrNotLeader,因为 KVServer 会自动路由
|
|
|
- log.Printf("Set failed: %v", err)
|
|
|
-}
|
|
|
-```
|
|
|
-
|
|
|
-## 客户端使用
|
|
|
-
|
|
|
-```go
|
|
|
-// 创建客户端 Transport
|
|
|
-client := raft.NewTCPTransport("127.0.0.1:9000", 10, logger)
|
|
|
-
|
|
|
-// 提交命令
|
|
|
-ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
|
-defer cancel()
|
|
|
-
|
|
|
-cmd := raft.KVCommand{Type: raft.KVSet, Key: "foo", Value: "bar"}
|
|
|
-data, _ := json.Marshal(cmd)
|
|
|
-args := &raft.ProposeArgs{Command: data}
|
|
|
-
|
|
|
-reply, err := client.ForwardPropose(ctx, "127.0.0.1:9001", args)
|
|
|
-if err != nil {
|
|
|
- log.Printf("Propose failed: %v", err)
|
|
|
-}
|
|
|
-
|
|
|
-// ReadIndex
|
|
|
-args := &raft.ReadIndexArgs{}
|
|
|
-reply, err := client.ReadIndex(ctx, leaderAddr, args)
|
|
|
-if reply.Success {
|
|
|
- fmt.Printf("ReadIndex: %d\n", reply.ReadIndex)
|
|
|
-}
|
|
|
-
|
|
|
-// 远程读取
|
|
|
-args := &raft.GetArgs{Key: "foo"}
|
|
|
-reply, err := client.ForwardGet(ctx, nodeAddr, args)
|
|
|
-if reply.Found {
|
|
|
- fmt.Printf("Value: %s\n", reply.Value)
|
|
|
-}
|
|
|
-
|
|
|
-// 添加节点
|
|
|
-args := &raft.AddNodeArgs{NodeID: "node4", Address: "127.0.0.1:9004"}
|
|
|
-reply, err := client.ForwardAddNode(ctx, leaderAddr, args)
|
|
|
-
|
|
|
-// 移除节点
|
|
|
-args := &raft.RemoveNodeArgs{NodeID: "node4"}
|
|
|
-reply, err := client.ForwardRemoveNode(ctx, leaderAddr, args)
|
|
|
-```
|
|
|
-
|
|
|
-## 高级特性:变更监听 (Watcher)
|
|
|
-
|
|
|
-Raft 模块提供了强大的变更监听功能,允许应用模块(如 AI 配置、安全模块)订阅特定的配置变更。
|
|
|
-
|
|
|
-### 特性
|
|
|
-
|
|
|
-1. **实时推送**:配置变更提交后,立即触发回调。
|
|
|
-2. **多级过滤**:支持精确匹配和前缀通配符匹配(`key.**`)。
|
|
|
-3. **安全隔离**:不同模块的 Watcher 相互独立,互不影响。
|
|
|
-4. **无重放**:系统启动时的快照恢复不会触发 Watcher,避免初始化风暴。
|
|
|
-
|
|
|
-### 最佳实践:模块化监听 (Handler Interface)
|
|
|
-
|
|
|
-在生产环境中,建议定义通用的接口来解耦配置逻辑与业务模块。
|
|
|
-
|
|
|
-#### 1. 定义通用接口
|
|
|
-
|
|
|
-```go
|
|
|
-// ConfigObserver 定义配置变更处理接口
|
|
|
-type ConfigObserver interface {
|
|
|
- // OnConfigChange 当配置发生变化时被调用
|
|
|
- OnConfigChange(key, value string)
|
|
|
-}
|
|
|
+health := server.HealthCheck()
|
|
|
+fmt.Printf("节点: %s, 状态: %s, Leader: %s, 健康: %v\n",
|
|
|
+ health.NodeID, health.State, health.LeaderID, health.IsHealthy)
|
|
|
```
|
|
|
|
|
|
-#### 2. 模块实现 (AI 示例)
|
|
|
+### 运行指标 (Metrics)
|
|
|
|
|
|
-```go
|
|
|
-type AIModule struct {
|
|
|
- threshold float64
|
|
|
- modelPath string
|
|
|
-}
|
|
|
-
|
|
|
-// OnConfigChange 实现 ConfigObserver 接口
|
|
|
-func (m *AIModule) OnConfigChange(key, value string) {
|
|
|
- switch key {
|
|
|
- case "ai.model.threshold":
|
|
|
- if val, err := strconv.ParseFloat(value, 64); err == nil {
|
|
|
- m.threshold = val
|
|
|
- log.Printf("[AI] Threshold updated to %.2f", val)
|
|
|
- }
|
|
|
- case "ai.model.path":
|
|
|
- m.modelPath = value
|
|
|
- log.Printf("[AI] Reloading model from %s...", value)
|
|
|
- // m.reloadModel()
|
|
|
- }
|
|
|
-}
|
|
|
-```
|
|
|
-
|
|
|
-#### 3. 模块实现 (安全模块示例)
|
|
|
+系统内置了详细的指标统计,可用于接入 Prometheus 等监控系统。
|
|
|
|
|
|
```go
|
|
|
-type SecurityModule struct {
|
|
|
- firewall *FirewallClient
|
|
|
-}
|
|
|
-
|
|
|
-func (s *SecurityModule) OnConfigChange(key, value string) {
|
|
|
- // key: security.blocklist.192.168.1.1
|
|
|
- ip := strings.TrimPrefix(key, "security.blocklist.")
|
|
|
-
|
|
|
- if value == "true" {
|
|
|
- log.Printf("[Security] Blocking malicious IP: %s", ip)
|
|
|
- s.firewall.Block(ip)
|
|
|
- } else {
|
|
|
- log.Printf("[Security] Unblocking IP: %s", ip)
|
|
|
- s.firewall.Unblock(ip)
|
|
|
- }
|
|
|
-}
|
|
|
-```
|
|
|
-
|
|
|
-#### 4. 注册监听
|
|
|
-
|
|
|
-```go
|
|
|
-func main() {
|
|
|
- // ... 初始化集群 ...
|
|
|
-
|
|
|
- // 初始化模块
|
|
|
- aiMod := &AIModule{threshold: 0.5}
|
|
|
- secMod := &SecurityModule{firewall: NewFirewall()}
|
|
|
-
|
|
|
- // 注册 AI 模块监听 (通过方法值传递)
|
|
|
- xbase.ClusterWatch("ai.**", aiMod.OnConfigChange)
|
|
|
-
|
|
|
- // 注册安全模块监听
|
|
|
- xbase.ClusterWatch("security.blocklist.**", secMod.OnConfigChange)
|
|
|
-
|
|
|
- // ...
|
|
|
-}
|
|
|
-```
|
|
|
-
|
|
|
-## 测试
|
|
|
-
|
|
|
-### 运行单元测试
|
|
|
-
|
|
|
-```bash
|
|
|
-go test -v ./xnet/raft/...
|
|
|
+metrics := server.GetMetrics()
|
|
|
+fmt.Printf("当前任期: %d\n", metrics.Term)
|
|
|
+fmt.Printf("总提案数: %d\n", metrics.ProposalsTotal)
|
|
|
+fmt.Printf("生成的快照数: %d\n", metrics.SnapshotsTaken)
|
|
|
```
|
|
|
|
|
|
-### 测试覆盖
|
|
|
-
|
|
|
-| 测试 | 验证内容 |
|
|
|
-|------|----------|
|
|
|
-| `TestCompactionBasic` | 基本日志压缩和索引更新 |
|
|
|
-| `TestSnapshotSaveAndLoad` | 快照持久化和恢复 |
|
|
|
-| `TestCompactionWithKVServer` | 通过 KVServer 触发压缩 |
|
|
|
-| `TestDataConsistencyAfterCompaction` | 压缩后数据一致性 |
|
|
|
-| `TestCompactionDoesNotLoseData` | 压缩不丢失已提交数据 |
|
|
|
-| `TestDynamicCompactionThreshold` | 动态压缩阈值机制 |
|
|
|
-
|
|
|
-## 文件结构
|
|
|
+## 项目目录结构
|
|
|
|
|
|
```
|
|
|
xnet/raft/
|
|
|
-├── raft.go # Raft 核心实现
|
|
|
-├── types.go # 类型定义和配置
|
|
|
-├── log.go # 日志管理
|
|
|
-├── storage.go # 持久化存储
|
|
|
-├── rpc.go # RPC 和 Transport
|
|
|
-├── server.go # KVServer 封装
|
|
|
-├── kv.go # KV 状态机
|
|
|
-├── codec.go # 编解码
|
|
|
-├── compaction_test.go # 压缩相关测试
|
|
|
-└── README.md # 本文档
|
|
|
-```
|
|
|
-
|
|
|
-## 示例
|
|
|
-
|
|
|
-完整示例参见 `doc/xnet/raft/` 目录:
|
|
|
-
|
|
|
-```bash
|
|
|
-# 清理旧数据(每个节点的数据目录在其 main.go 所在文件夹下)
|
|
|
-rm -rf doc/xnet/raft/node1/data doc/xnet/raft/node2/data doc/xnet/raft/node3/data
|
|
|
-
|
|
|
-# 启动三节点集群(三个终端)
|
|
|
-go run ./doc/xnet/raft/node1/main.go # 终端 1
|
|
|
-go run ./doc/xnet/raft/node2/main.go # 终端 2
|
|
|
-go run ./doc/xnet/raft/node3/main.go # 终端 3
|
|
|
-
|
|
|
-# 运行压力测试和一致性验证
|
|
|
-go run ./doc/xnet/raft/stress_test/main.go
|
|
|
-```
|
|
|
-
|
|
|
-### 压力测试输出示例
|
|
|
-
|
|
|
-```
|
|
|
-=== Raft Cluster Test Suite ===
|
|
|
-
|
|
|
-=== Phase 1: Cluster Formation ===
|
|
|
-Adding Node 2...
|
|
|
-Adding Node 3...
|
|
|
-
|
|
|
-=== Phase 2: Stress Test ===
|
|
|
-Duration: 247ms
|
|
|
-Total ops: 10000
|
|
|
-QPS: 40391.25
|
|
|
-Success rate: 100.00%
|
|
|
-
|
|
|
-=== Phase 3: Data Consistency Verification ===
|
|
|
-Verifying 100 sampled keys across all nodes...
|
|
|
-✅ All checked keys are CONSISTENT across all nodes!
|
|
|
-
|
|
|
-=== Phase 4: ReadIndex Test ===
|
|
|
-ReadIndex succeeded: readIndex=19884
|
|
|
-
|
|
|
-=== Phase 5: Write-then-Read Verification ===
|
|
|
- 127.0.0.1:9001: ✓ Value matches
|
|
|
- 127.0.0.1:9002: ✓ Value matches
|
|
|
- 127.0.0.1:9003: ✓ Value matches
|
|
|
-✅ Write-then-read verification PASSED!
|
|
|
-```
|
|
|
-
|
|
|
-## 性能
|
|
|
-
|
|
|
-基于 `stress_test` 的测试结果(3 节点本地集群):
|
|
|
-
|
|
|
-- **QPS**: ~40,000 ops/sec
|
|
|
-- **成功率**: 100%
|
|
|
-- **延迟**: < 1ms (本地)
|
|
|
-
|
|
|
-实际性能取决于网络延迟、磁盘 I/O 和集群规模。
|
|
|
-
|
|
|
-## 注意事项
|
|
|
-
|
|
|
-1. **数据目录**:**每个节点必须使用不同的 `DataDir`**,否则会数据冲突!
|
|
|
-2. **端口冲突**:确保各节点监听地址不冲突
|
|
|
-3. **时钟同步**:建议集群节点时钟同步(虽然 Raft 不依赖时钟)
|
|
|
-4. **奇数节点**:推荐使用奇数个节点(3、5、7)以优化容错
|
|
|
-5. **清理数据**:测试前建议清理旧的数据目录
|
|
|
-
|
|
|
-## 核心安全保证
|
|
|
-
|
|
|
-| 特性 | 说明 |
|
|
|
-|------|------|
|
|
|
-| **选举限制** | 日志不完整的节点无法成为 Leader |
|
|
|
-| **PreVote** | 防止分区节点扰乱集群 |
|
|
|
-| **No-op 提交** | 新 Leader 立即提交前任 term 日志 |
|
|
|
-| **Lease Check** | Leader 必须保持多数派连接才能接受写请求,防止分区写入 |
|
|
|
-| **单配置变更** | 一次只允许一个成员变更 |
|
|
|
-| **动态压缩阈值** | 防止压缩风暴 |
|
|
|
-| **原子快照写入** | 使用 temp file + rename 保证原子性 |
|
|
|
+├── raft.go # Raft 核心逻辑 (选举、日志复制、心跳)
|
|
|
+├── types.go # 类型定义与配置结构体
|
|
|
+├── log.go # 日志管理 (LogManager)
|
|
|
+├── storage.go # 存储接口与实现 (HybridStorage)
|
|
|
+├── rpc.go # RPC 消息定义与网络传输层
|
|
|
+├── server.go # KVServer 封装 (Raft 与 DB 的胶水层)
|
|
|
+├── db/ # 内置高性能 KV 存储引擎
|
|
|
+│ ├── engine.go # 引擎核心实现
|
|
|
+│ └── db.md # 引擎详细文档
|
|
|
+└── README.md # 项目说明文档
|
|
|
+```
|
|
|
+
|
|
|
+## 性能表现
|
|
|
+
|
|
|
+* **吞吐量**:在开启 Batching(批处理)和 Pipeline(流水线)优化后,单节点可支撑数万 QPS(取决于硬件配置)。
|
|
|
+* **延迟**:本地读写延迟在亚毫秒级(Sub-millisecond)。
|
|
|
+* **恢复速度**:利用 `LastAppliedIndex` 机制,重启时自动跳过已持久化的日志回放,实现秒级启动。
|