IMPROVEMENTS.md 6.7 KB

Raft 实现改进日志

本文档记录了对 Raft 实现的所有改进。

🔴 P0: 安全性改进

1. 持久化错误处理 (raft.go)

问题: persistState 的错误仅被日志记录而非返回,这违反了 Raft 的安全性要求。

修复:

  • persistState() 现在返回 error
  • 添加 mustPersistState() 用于关键路径,失败时 panic
  • 所有调用点现在正确处理持久化错误
  • 如果持久化失败,不会授予投票或继续选举

2. 成员变更安全性 (raft.go)

改进:

  • 添加详细文档说明单节点变更策略的安全性原理
  • 添加更多验证检查:
    • 不能在 leadership transfer 期间进行成员变更
    • 验证 nodeID 和 address 非空
    • 检查地址是否已被其他节点使用
    • 不能将集群缩减到 0 节点
  • 使用自定义错误类型 (ErrNotLeader, ErrConfigInFlight) 便于程序处理

🟡 P1: 性能与功能改进

3. ReadIndex 线性一致性读 (raft.go, types.go, rpc.go)

新功能: 支持强一致性读操作

// 使用方式
index, err := raft.ReadIndex()
if err != nil { /* handle */ }
// 读取已应用到 index 的状态

实现:

  • ReadIndex() 方法确认 leadership 后返回安全的读取点
  • readIndexLoop goroutine 处理读请求
  • confirmLeadership() 通过心跳确认领导权
  • KVServer 添加 GetLinear() 方法

4. 可插拔编解码器 (codec.go)

新功能: 支持替换 JSON 为更高效的序列化格式

// 使用 msgpack (需要添加依赖)
raft.SetCodec(&MsgpackCodec{})

实现:

  • Codec 接口定义 Marshal/Unmarshal
  • DefaultCodec 默认使用 JSON
  • RPC 层使用 DefaultCodec 而非硬编码 JSON

5. 事件驱动 Apply Loop (raft.go)

问题: 原实现每 10ms 轮询,即使没有新提交也浪费 CPU。

修复:

  • 改为事件驱动模式,由 commitCh 触发
  • 保留 50ms fallback ticker 防止遗漏
  • 减少 CPU 空转

6. 自定义错误类型 (types.go)

新增错误类型:

var (
    ErrNoLeader       = errors.New("no leader available")
    ErrNotLeader      = errors.New("not leader")
    ErrConfigInFlight = errors.New("configuration change in progress")
    ErrTimeout        = errors.New("operation timed out")
    ErrShutdown       = errors.New("raft is shutting down")
    ErrPersistFailed  = errors.New("failed to persist state")
    ErrLeadershipLost = errors.New("leadership lost")
)

RaftError 包装器:

type RaftError struct {
    Err      error
    LeaderID string        // 已知的 leader
    RetryIn  time.Duration // 建议的重试间隔
}

7. 配置化超时 (types.go)

新配置项:

type Config struct {
    // RPC 超时
    RPCTimeout         time.Duration // 默认 500ms
    SnapshotRPCTimeout time.Duration // 默认 30s
    ProposeTimeout     time.Duration // 默认 3s
    
    // 重试配置
    MaxRetries   int           // 默认 3
    RetryBackoff time.Duration // 默认 100ms
    
    // 批处理配置
    BatchMinWait time.Duration // 默认 1ms
    BatchMaxWait time.Duration // 默认 10ms
    BatchMaxSize int           // 默认 100
    
    // 快照配置
    SnapshotChunkSize int // 默认 1MB
}

🟢 P2: 可观测性与运维改进

8. 自适应批处理 (raft.go)

问题: 原实现固定 10ms 延迟,低负载时增加不必要延迟。

修复:

  • 实现自适应批处理算法
  • 第一个请求后等待 BatchMinWait (1ms)
  • 最多等待 BatchMaxWait (10ms)
  • 达到 BatchMaxSize (100) 立即刷新

9. Metrics 和健康检查 (types.go, raft.go)

Metrics 结构:

type Metrics struct {
    Term               uint64
    ProposalsTotal     uint64
    ProposalsSuccess   uint64
    ElectionsStarted   uint64
    ElectionsWon       uint64
    SnapshotsTaken     uint64
    ReadIndexRequests  uint64
    // ... 等等
}

HealthStatus 结构:

type HealthStatus struct {
    NodeID        string
    State         string
    Term          uint64
    LeaderID      string
    ClusterSize   int
    LogBehind     uint64
    LastHeartbeat time.Time
    IsHealthy     bool
    Uptime        time.Duration
}

使用:

health := raft.HealthCheck()
metrics := raft.GetMetrics()

10. Leadership Transfer (raft.go, rpc.go)

新功能: 主动转移领导权

err := raft.TransferLeadership("node2")

实现:

  • 同步目标节点到最新日志
  • 发送 TimeoutNow RPC 触发目标立即选举
  • 目标节点收到后跳过选举超时直接开始选举
  • 添加 HandleTimeoutNow RPC 处理器

🔵 P3: 大规模优化

11. 分块快照传输 (raft.go, types.go)

问题: 大快照一次性传输可能超时或占用过多内存。

修复:

  • InstallSnapshotArgs 添加 Offset, Done 字段
  • sendSnapshot 分块发送(默认 1MB 每块)
  • HandleInstallSnapshot 支持分块接收和组装
  • pendingSnapshotState 跟踪接收进度

📁 文件变更汇总

文件 变更类型 说明
types.go 修改 添加错误类型、Metrics、HealthStatus、新配置项
raft.go 修改 修复持久化、添加 ReadIndex、Leadership Transfer 等
rpc.go 修改 添加新 RPC 类型、使用可插拔编解码器
codec.go 新增 可插拔编解码器接口
server.go 修改 添加 GetLinear、HealthCheck 等 API

🧪 测试建议

  1. 单元测试:

    • 测试持久化失败时的行为
    • 测试 ReadIndex 在各种状态下的行为
    • 测试分块快照传输
  2. 集成测试:

    • 测试 Leadership Transfer
    • 测试成员变更期间的请求处理
    • 测试网络分区恢复
  3. 压力测试:

    • 验证自适应批处理的效果
    • 验证大快照的分块传输

📖 使用示例

线性一致性读

server, _ := raft.NewKVServer(config)
// ...
val, ok, err := server.GetLinear("key")
if err != nil {
    // 处理错误,可能需要重试
}

领导权转移

if server.IsLeader() {
    err := server.TransferLeadership("node2")
    if err != nil {
        log.Printf("Transfer failed: %v", err)
    }
}

健康检查

health := server.HealthCheck()
if !health.IsHealthy {
    log.Printf("Node unhealthy: state=%s, leader=%s", 
        health.State, health.LeaderID)
}

使用 msgpack 序列化

import "github.com/vmihailenco/msgpack/v5"

type MsgpackCodec struct{}

func (c *MsgpackCodec) Marshal(v interface{}) ([]byte, error) {
    return msgpack.Marshal(v)
}

func (c *MsgpackCodec) Unmarshal(data []byte, v interface{}) error {
    return msgpack.Unmarshal(data, v)
}

func (c *MsgpackCodec) Name() string { return "msgpack" }

// 在创建 Raft 之前设置
raft.SetCodec(&MsgpackCodec{})