# xnet/raft - Raft 分布式共识库 这是一个使用 Go 语言编写的功能完备的 Raft 共识算法实现。它支持动态成员变更、日志压缩(快照)、线性一致性读、RBAC 权限控制等高级特性,旨在为分布式系统提供强一致性与安全的状态机复制能力。 ## 核心特性 - ✅ **领导者选举 (Leader Election)** - 包含 **PreVote (预投票)** 优化机制,有效防止网络分区或不稳定节点导致的 Term (任期) 无效膨胀,提高集群稳定性。 - ✅ **日志复制 (Log Replication)** - 支持 **Pipeline (流水线)** 和 **Batching (批量)** 复制优化,显著提升高并发下的写入吞吐量。 - **自适应批处理**:基于负载自动调整批处理延迟 (1ms-10ms),兼顾低延迟与高吞吐。 - ✅ **动态成员变更 (Dynamic Membership Changes)** - 支持在运行时动态添加或移除节点(实现为单节点变更算法),无需停机即可扩缩容。 - **安全性增强**:严格的成员变更验证,防止数据丢失与脑裂。 - ✅ **自动日志压缩 (Log Compaction / Snapshotting)** - **自动触发**:基于日志增长量自动触发快照生成。 - **动态阈值**:采用动态调整的压缩阈值,防止在写入高峰期出现“压缩风暴”。 - **分块传输**:支持大快照的分块(Chunked)传输,避免大文件传输导致的网络超时。 - ✅ **线性一致性读 (Linearizable Reads / ReadIndex)** - 实现了 **ReadIndex** 优化,允许在不产生 Log Entry 的情况下执行强一致性读取,性能远优于 Log Read。 - ✅ **可插拔编解码器 (Pluggable Codec)** - 支持替换默认的 JSON 序列化,可扩展 Msgpack/Protobuf 等高效格式。 - ✅ **RBAC 权限控制 (Role-Based Access Control)** - **层级权限**:支持基于路径的层级权限控制(如 `user.asin` 自动覆盖 `user.asin.china`)。 - **数值约束**:支持对数值型 Value 进行 Max/Min 约束控制。 - **高性能权限检测**:通过本地缓存优化 `CheckPermission` 等高频操作,同时保持登录状态的全局一致性。 - **全局一致会话**:登录状态通过 Raft 共识同步到全集群,支持 API 调用的负载均衡与故障转移。 - ✅ **领导权转移 (Leadership Transfer)** - 支持主动将 Leader 角色平滑移交给指定节点,适用于停机维护或负载均衡场景。 - ✅ **自动请求路由 (Request Routing)** - Follower 节点会自动将写请求(以及配置的读请求)转发给当前的 Leader,客户端无需自行维护 Leader 状态。 - ✅ **WebHook 通知 (Data Change Notification)** - 支持通过 HTTP WebHook 订阅 Key 的变更事件。 - ✅ **持久化存储 (Persistence)** - 关键的 Raft 日志和状态(Term, Vote)均实时持久化到磁盘,保证重启后数据不丢失。 - **严格错误处理**:关键路径的持久化失败会立即停止服务,防止状态分叉。 - ✅ **健康检查与监控 (Health Checks & Metrics)** - 内置详细的运行时指标(Metrics)和健康检查接口,方便运维监控。 ## 快速开始 ### 1. 基础用法:启动单节点服务 ```go package main import ( "log" "xbase/xnet/raft" ) func main() { // 1. 配置初始化 config := raft.DefaultConfig() config.NodeID = "node1" config.ListenAddr = "127.0.0.1:9001" config.DataDir = "data1" // 注意:每个节点必须使用独立的数据目录 config.Logger = raft.NewConsoleLogger("node1", 1) // 1=Info级别 // 初始集群配置 (Bootstrap) // 仅在集群首次启动时需要,用于定义初始成员 config.ClusterNodes = map[string]string{ "node1": "127.0.0.1:9001", } // 2. 创建 KV Server (包含 Raft 核心与内置状态机 DB) server, err := raft.NewKVServer(config) if err != nil { log.Fatalf("Failed to create server: %v", err) } // 3. 启动服务 if err := server.Start(); err != nil { log.Fatalf("Failed to start server: %v", err) } defer server.Stop() log.Println("Node started on 127.0.0.1:9001") // 4. 执行 KV 操作 // 写操作 (如果当前不是 Leader,会自动转发) if err := server.Set("key1", "value1"); err != nil { log.Printf("Set failed: %v", err) } // 本地读取 (高性能,但存在极低概率读到旧数据) if val, found := server.Get("key1"); found { log.Printf("Got value: %s", val) } // 线性一致性读 (强一致性,保证读到最新提交的数据) val, found, err := server.GetLinear("key1") if err != nil { log.Printf("Read failed: %v", err) } else if found { log.Printf("Consistent value: %s", val) } } ``` ### 2. 构建多节点集群 **Node 1 (引导节点 / Bootstrap Node):** Node 1 作为集群的初始节点启动。 ```go config := raft.DefaultConfig() config.NodeID = "node1" config.ListenAddr = "127.0.0.1:9001" config.DataDir = "data1" // 初始包含自己 config.ClusterNodes = map[string]string{ "node1": "127.0.0.1:9001", } // ... 启动 server ... ``` **Node 2 & 3 (加入节点):** 启动 Node 2 和 Node 3。注意:在启动时,它们的 `ClusterNodes` 可以留空或仅包含自己,因为它们稍后会被动态加入到集群中。 ```go // Node 2 配置 config.NodeID = "node2" config.ListenAddr = "127.0.0.1:9002" config.DataDir = "data2" // ... 启动 server ... ``` **执行加入操作:** 通过已存在的集群成员(例如 Node 1)发起 API 调用,将新节点加入集群。 ```go // 连接到 Node 1 (或直接在 Node 1 的代码中调用) // 将 Node 2 加入集群 err := server1.Join("node2", "127.0.0.1:9002") if err != nil { log.Printf("Failed to join node2: %v", err) } // 将 Node 3 加入集群 err := server1.Join("node3", "127.0.0.1:9003") ``` ## 命令行交互 (CLI) 系统内置了一个强大的交互式命令行工具,默认随节点启动。它提供了直观的界面来管理数据、监控集群状态以及执行运维操作。 **基础命令:** * **数据操作**: * `set ` / `get ` / `del `:标准的 KV 操作(支持权限校验)。 * `search [limit] [offset]`:搜索 Key(严格过滤权限,Superuser 可直接下推 DB)。 * `count `:统计符合模式的 Key 数量(仅统计有权访问的 Key)。 * **权限管理**: * `auth-init `:初始化权限系统(Root Only)。 * `login [code]`:登录系统(全局会话)。 * `logout`:登出当前会话(全局注销)。 * `whoami`:查看当前登录用户及 Token 信息。 * **集群管理** (Root Only): * `join `:将新节点加入集群。 * `leave `:将节点移出集群。 * **系统监控**: * `info`:实时展示节点健康状态、Raft 索引(Commit/Applied)、存储占用(DB/Log)、内存使用及集群成员连接状况。 ## 权限控制 (RBAC) 与演示流程 系统内置了完整的层级 RBAC 权限系统,支持细粒度的 Key 访问控制和数值约束。 ### 1. 权限模型详解 系统权限模型设计兼顾了灵活性与性能,采用 **Raft 全局一致性存储** 与 **本地高性能缓存** 相结合的架构。 #### 1.1 数据结构与存储 所有权限相关的数据均作为特殊的 Key-Value 存储在 Raft Log 中,保证全集群强一致性。 - **用户 (User)**: `system.user.` - 包含密码 Hash、Salt、角色列表、独立权限规则、MFA 密钥、IP 白名单等。 - **角色 (Role)**: `system.role.` - 包含权限规则集合、父级角色(支持多重继承)。 - **会话 (Session)**: `system.session.` - 包含 Token、用户名、登录 IP、过期时间。 - **全局同步**:登录/注销操作会生成 Raft 日志,Token 在全集群有效,支持 API 负载均衡。 #### 1.2 权限规则 (Permission) 每条权限规则包含三个核心要素: 1. **KeyPattern (键模式)**: 支持三种匹配模式 - `*`: 匹配所有 Key。 - `prefix*`: 前缀匹配(如 `user.*` 匹配 `user.alice`, `user.bob`)。 - `exact`: 精确匹配。 2. **Actions (操作)**: 允许的操作列表 - `read`: 读操作 (Get, Search, Count)。 - `write`: 写操作 (Set, Del)。 - `admin`: 管理操作。 - `*`: 所有操作。 3. **Constraint (数值约束)**: 针对 `write` 操作的细粒度控制 - **Min/Max**: 当 Value 为数值时,必须在指定范围内才允许写入(如折扣率限制在 0.5 ~ 0.9)。 #### 1.3 鉴权流程与性能优化 系统在处理请求时遵循 **Deny-Allow-Role** 的判定顺序: 1. **Deny 优先**: 检查用户 `DenyPermissions`,命中即拒绝。 2. **Allow 检查**: 检查用户 `AllowPermissions`,命中即允许。 3. **角色递归**: 递归检查用户所属 Role 及其 Parent Role。 **性能优化 (High Performance)**: - **本地缓存 (Local Perm Cache)**: 每个节点在内存中维护了一个 `Token + Key + Action -> Result` 的高速缓存。 - **全集群一致性**: 当权限数据(用户/角色)变更时,Raft 状态机会自动通知所有节点清理缓存,确保权限变更立即生效。 - **Superuser 快速通道**: 对于拥有 `*` 权限的用户(如 root),系统会跳过复杂的正则匹配,直接下推查询到 DB 引擎(如 Search 操作),实现零损耗的性能。 ### 2. 运行演示 (Example) 我们在 `example/role` 目录下提供了一个完整的演示案例,模拟了“销售经理 vs 普通销售”的折扣权限控制场景。 **步骤 1: 编译并启动 Node 1** ```bash cd example/role # 编译 go build -o role_node1 node1/main.go node1/demo_cli.go go build -o role_node2 node2/main.go node2/demo_cli.go # 启动 Node 1 ./role_node1 ``` **步骤 2: 初始化权限与数据 (在 Node 1 CLI 中)** ```bash > auth-init rootpass # 初始化系统,创建 root 用户 > login root rootpass # 登录 > demo-init # 初始化演示数据 (创建角色 alice/bob) ``` *此时已创建:* * `alice` (经理): 允许设置折扣 0.5 ~ 0.9 * `bob` (员工): 允许设置折扣 0.8 ~ 0.95 **步骤 3: 测试权限 (在 Node 1 CLI 中)** ```bash > login alice pass123 > set product.discount 0.6 # 成功 (0.6 在 0.5-0.9 之间) > set product.discount 0.4 # 失败 (低于 min 0.5) > login bob pass123 > set product.discount 0.85 # 成功 > set product.discount 0.6 # 失败 (低于 bob 的 min 0.8) ``` **步骤 4: 集群同步测试** 保持 Node 1 运行,开启新终端启动 Node 2: ```bash ./role_node2 ``` 在 Node 1 中加入 Node 2: ```bash > login root rootpass > join node2 127.0.0.1:9002 ``` 此时在 Node 2 上可以直接登录 `bob`,验证权限数据已自动同步: ```bash (在 Node 2 CLI) > login bob pass123 > get product.discount # 成功读取 ``` ## 配置与错误处理 ### 核心配置 (Config) `raft.Config` 提供了丰富的配置项,以适应不同的网络环境和性能需求: ```go type Config struct { // 超时设置 RPCTimeout time.Duration // RPC 请求超时 (默认 500ms) SnapshotRPCTimeout time.Duration // 快照传输超时 (默认 30s) ProposeTimeout time.Duration // 客户端提案超时 (默认 3s) // 批处理优化 BatchMinWait time.Duration // 自适应批处理最小等待 (默认 1ms) BatchMaxWait time.Duration // 自适应批处理最大等待 (默认 10ms) BatchMaxSize int // 批处理最大数量 (默认 100) // ... 其他基础配置 (NodeID, ListenAddr 等) } ``` ### 错误处理 (Error Types) 为了便于程序处理,系统定义了特定的错误类型: - `ErrNotLeader`: 当前节点不是 Leader。 - `ErrConfigInFlight`: 正在进行成员变更。 - `ErrTimeout`: 操作超时。 - `ErrShutdown`: Raft 正在关闭。 - `ErrPersistFailed`: 持久化失败 (严重错误)。 ## 高级功能详解 ### 动态成员变更 系统允许在不停机的情况下增删节点。为了保证安全性(Safety),系统一次只允许变更一个节点(Single-Server Membership Change)。 ```go // 添加节点 (自动转发请求到 Leader) err := server.Join("node4", "127.0.0.1:9004") // 移除节点 (自动转发请求到 Leader) err := server.Leave("node4") ``` ### 日志压缩 (Snapshots) 为了防止日志无限增长,系统实现了自动快照机制。 1. **触发机制**:当日志条目数超过 `config.SnapshotThreshold` 时触发压缩。 2. **安全性保障**: * **DB 追赶**:在生成快照前,Raft 会等待底层 DB 确实应用了所有日志。 * **强制落盘**:调用 `Sync()` 强制将 DB 数据刷入磁盘,防止“日志被删但数据还在 OS 缓存中”导致的数据丢失风险。 3. **分块传输**:对于大型快照,系统会自动将其切分为多个小块(Chunk)进行传输,默认每块 1MB,有效防止网络抖动导致的传输失败。 配置示例: ```go config.LogCompactionEnabled = true config.SnapshotThreshold = 100000 // 每新增 10万条日志触发一次压缩 config.SnapshotMinRetention = 10000 // 压缩后保留最近 1万条日志 (方便 Follower 追赶) config.SnapshotChunkSize = 1024 * 1024 // 快照分块大小为 1MB ``` ### 线性一致性读 (ReadIndex) `ReadIndex` 是 Raft 论文中描述的一种读优化机制,它使得读操作无需产生 Raft Log(即无需磁盘 IO),就能保证线性一致性(Linearizability)。 ```go val, found, err := server.GetLinear("key") ``` **工作流程**: 1. **确认领导权**:Leader 发送一轮心跳给 Followers,确保自己仍拥有多数派支持。 2. **获取 ReadIndex**:记录当前的 `CommitIndex` 作为 `ReadIndex`。 3. **等待应用**:等待状态机(DB)应用到 `ReadIndex`。 4. **读取数据**:从状态机读取数据并返回。 ### 可插拔编解码器 (Pluggable Codec) 系统支持替换默认的 JSON 编解码器,以提高序列化效率(如使用 msgpack)。 ```go import "github.com/vmihailenco/msgpack/v5" type MsgpackCodec struct{} func (c *MsgpackCodec) Marshal(v interface{}) ([]byte, error) { return msgpack.Marshal(v) } func (c *MsgpackCodec) Unmarshal(data []byte, v interface{}) error { return msgpack.Unmarshal(data, v) } func (c *MsgpackCodec) Name() string { return "msgpack" } // 在创建 Raft 之前设置 raft.SetCodec(&MsgpackCodec{}) ``` ### 领导权转移 (Leadership Transfer) 你可以主动将 Leader 角色转移给某个 Follower,这在需要重启当前 Leader 节点进行升级或维护时非常有用,可以最小化集群的不可用时间。 ```go // 将 Leader 转移给 node2 err := server.TransferLeadership("node2") ``` 机制:当前 Leader 会发送 `TimeoutNow` RPC 给目标节点,使其立即发起选举(并无视选举超时时间),从而大概率赢得选举。 ### WebHook 变更通知 系统提供了一套高性能的 WebHook 机制,允许外部系统通过 HTTP 接口订阅 Key 的变更事件。 **启用 HTTP 服务:** ```go // 在启动时配置 HTTP 监听地址 config.HTTPAddr = ":8080" server, _ := raft.NewKVServer(config) ``` **订阅变更:** 通过 POST 请求订阅指定 Key 的变更: ```bash curl -X POST http://localhost:8080/watch \ -d '{"key": "my-key", "url": "http://your-service/callback"}' ``` **回调格式:** 当 Key 发生变化时,系统会向注册的 URL 发送 JSON 格式的 POST 请求: ```json { "key": "my-key", "value": "new-value", "event_type": 0, // 0=Set, 1=Del "timestamp": 1630000000000 } ``` ## 监控与运维 ### 健康检查 通过 `HealthCheck()` 接口可以实时获取节点状态。 ```go health := server.HealthCheck() fmt.Printf("节点: %s, 状态: %s, Leader: %s, 健康: %v\n", health.NodeID, health.State, health.LeaderID, health.IsHealthy) ``` ### 运行指标 (Metrics) 系统内置了详细的指标统计,可用于接入 Prometheus 等监控系统。 ```go metrics := server.GetMetrics() fmt.Printf("当前任期: %d\n", metrics.Term) fmt.Printf("总提案数: %d\n", metrics.ProposalsTotal) fmt.Printf("生成的快照数: %d\n", metrics.SnapshotsTaken) ``` ## 项目目录结构 ``` xnet/raft/ ├── raft.go # Raft 核心逻辑 (选举、日志复制、心跳) ├── auth.go # RBAC 权限控制逻辑 ├── types.go # 类型定义与配置结构体 ├── log.go # 日志管理 (LogManager) ├── storage.go # 存储接口与实现 (HybridStorage) ├── rpc.go # RPC 消息定义与网络传输层 ├── codec.go # 编解码器接口 ├── server.go # KVServer 封装 (Raft 与 DB 的胶水层) ├── cli.go # 命令行交互工具 ├── db/ # 内置高性能 KV 存储引擎 │ ├── engine.go # 引擎核心实现 │ └── db.md # 引擎详细文档 └── README.md # 项目说明文档 ``` ## 性能表现 * **高吞吐量**: * **流水线 (Pipeline)**:并发发送 AppendEntries,不等待前一个 RPC 返回。 * **批量复制 (Batching)**:自适应批处理机制,将多个 Log Entry 合并为单个 RPC 发送,显著减少网络开销。 * **低延迟**: * **事件驱动循环**:Apply Loop 采用事件驱动而非轮询,减少 CPU 空转并降低延迟。 * **本地读优化**:本地读写延迟在亚毫秒级(Sub-millisecond)。 * **快速恢复**:利用 `LastAppliedIndex` 机制,重启时自动跳过已持久化的日志回放,实现秒级启动。