# Raft 实现改进日志 本文档记录了对 Raft 实现的所有改进。 ## 🔴 P0: 安全性改进 ### 1. 持久化错误处理 (raft.go) **问题**: `persistState` 的错误仅被日志记录而非返回,这违反了 Raft 的安全性要求。 **修复**: - `persistState()` 现在返回 `error` - 添加 `mustPersistState()` 用于关键路径,失败时 panic - 所有调用点现在正确处理持久化错误 - 如果持久化失败,不会授予投票或继续选举 ### 2. 成员变更安全性 (raft.go) **改进**: - 添加详细文档说明单节点变更策略的安全性原理 - 添加更多验证检查: - 不能在 leadership transfer 期间进行成员变更 - 验证 nodeID 和 address 非空 - 检查地址是否已被其他节点使用 - 不能将集群缩减到 0 节点 - 使用自定义错误类型 (`ErrNotLeader`, `ErrConfigInFlight`) 便于程序处理 ## 🟡 P1: 性能与功能改进 ### 3. ReadIndex 线性一致性读 (raft.go, types.go, rpc.go) **新功能**: 支持强一致性读操作 ```go // 使用方式 index, err := raft.ReadIndex() if err != nil { /* handle */ } // 读取已应用到 index 的状态 ``` **实现**: - `ReadIndex()` 方法确认 leadership 后返回安全的读取点 - `readIndexLoop` goroutine 处理读请求 - `confirmLeadership()` 通过心跳确认领导权 - KVServer 添加 `GetLinear()` 方法 ### 4. 可插拔编解码器 (codec.go) **新功能**: 支持替换 JSON 为更高效的序列化格式 ```go // 使用 msgpack (需要添加依赖) raft.SetCodec(&MsgpackCodec{}) ``` **实现**: - `Codec` 接口定义 `Marshal/Unmarshal` - `DefaultCodec` 默认使用 JSON - RPC 层使用 `DefaultCodec` 而非硬编码 JSON ### 5. 事件驱动 Apply Loop (raft.go) **问题**: 原实现每 10ms 轮询,即使没有新提交也浪费 CPU。 **修复**: - 改为事件驱动模式,由 `commitCh` 触发 - 保留 50ms fallback ticker 防止遗漏 - 减少 CPU 空转 ### 6. 自定义错误类型 (types.go) **新增错误类型**: ```go var ( ErrNoLeader = errors.New("no leader available") ErrNotLeader = errors.New("not leader") ErrConfigInFlight = errors.New("configuration change in progress") ErrTimeout = errors.New("operation timed out") ErrShutdown = errors.New("raft is shutting down") ErrPersistFailed = errors.New("failed to persist state") ErrLeadershipLost = errors.New("leadership lost") ) ``` **RaftError 包装器**: ```go type RaftError struct { Err error LeaderID string // 已知的 leader RetryIn time.Duration // 建议的重试间隔 } ``` ### 7. 配置化超时 (types.go) **新配置项**: ```go type Config struct { // RPC 超时 RPCTimeout time.Duration // 默认 500ms SnapshotRPCTimeout time.Duration // 默认 30s ProposeTimeout time.Duration // 默认 3s // 重试配置 MaxRetries int // 默认 3 RetryBackoff time.Duration // 默认 100ms // 批处理配置 BatchMinWait time.Duration // 默认 1ms BatchMaxWait time.Duration // 默认 10ms BatchMaxSize int // 默认 100 // 快照配置 SnapshotChunkSize int // 默认 1MB } ``` ## 🟢 P2: 可观测性与运维改进 ### 8. 自适应批处理 (raft.go) **问题**: 原实现固定 10ms 延迟,低负载时增加不必要延迟。 **修复**: - 实现自适应批处理算法 - 第一个请求后等待 `BatchMinWait` (1ms) - 最多等待 `BatchMaxWait` (10ms) - 达到 `BatchMaxSize` (100) 立即刷新 ### 9. Metrics 和健康检查 (types.go, raft.go) **Metrics 结构**: ```go type Metrics struct { Term uint64 ProposalsTotal uint64 ProposalsSuccess uint64 ElectionsStarted uint64 ElectionsWon uint64 SnapshotsTaken uint64 ReadIndexRequests uint64 // ... 等等 } ``` **HealthStatus 结构**: ```go type HealthStatus struct { NodeID string State string Term uint64 LeaderID string ClusterSize int LogBehind uint64 LastHeartbeat time.Time IsHealthy bool Uptime time.Duration } ``` **使用**: ```go health := raft.HealthCheck() metrics := raft.GetMetrics() ``` ### 10. Leadership Transfer (raft.go, rpc.go) **新功能**: 主动转移领导权 ```go err := raft.TransferLeadership("node2") ``` **实现**: - 同步目标节点到最新日志 - 发送 `TimeoutNow` RPC 触发目标立即选举 - 目标节点收到后跳过选举超时直接开始选举 - 添加 `HandleTimeoutNow` RPC 处理器 ## 🔵 P3: 大规模优化 ### 11. 分块快照传输 (raft.go, types.go) **问题**: 大快照一次性传输可能超时或占用过多内存。 **修复**: - `InstallSnapshotArgs` 添加 `Offset`, `Done` 字段 - `sendSnapshot` 分块发送(默认 1MB 每块) - `HandleInstallSnapshot` 支持分块接收和组装 - `pendingSnapshotState` 跟踪接收进度 ## 📁 文件变更汇总 | 文件 | 变更类型 | 说明 | |-----|---------|------| | `types.go` | 修改 | 添加错误类型、Metrics、HealthStatus、新配置项 | | `raft.go` | 修改 | 修复持久化、添加 ReadIndex、Leadership Transfer 等 | | `rpc.go` | 修改 | 添加新 RPC 类型、使用可插拔编解码器 | | `codec.go` | 新增 | 可插拔编解码器接口 | | `server.go` | 修改 | 添加 GetLinear、HealthCheck 等 API | ## 🧪 测试建议 1. **单元测试**: - 测试持久化失败时的行为 - 测试 ReadIndex 在各种状态下的行为 - 测试分块快照传输 2. **集成测试**: - 测试 Leadership Transfer - 测试成员变更期间的请求处理 - 测试网络分区恢复 3. **压力测试**: - 验证自适应批处理的效果 - 验证大快照的分块传输 ## 📖 使用示例 ### 线性一致性读 ```go server, _ := raft.NewKVServer(config) // ... val, ok, err := server.GetLinear("key") if err != nil { // 处理错误,可能需要重试 } ``` ### 领导权转移 ```go if server.IsLeader() { err := server.TransferLeadership("node2") if err != nil { log.Printf("Transfer failed: %v", err) } } ``` ### 健康检查 ```go health := server.HealthCheck() if !health.IsHealthy { log.Printf("Node unhealthy: state=%s, leader=%s", health.State, health.LeaderID) } ``` ### 使用 msgpack 序列化 ```go import "github.com/vmihailenco/msgpack/v5" type MsgpackCodec struct{} func (c *MsgpackCodec) Marshal(v interface{}) ([]byte, error) { return msgpack.Marshal(v) } func (c *MsgpackCodec) Unmarshal(data []byte, v interface{}) error { return msgpack.Unmarshal(data, v) } func (c *MsgpackCodec) Name() string { return "msgpack" } // 在创建 Raft 之前设置 raft.SetCodec(&MsgpackCodec{}) ```