本文档记录了对 Raft 实现的所有改进。
问题: persistState 的错误仅被日志记录而非返回,这违反了 Raft 的安全性要求。
修复:
persistState() 现在返回 errormustPersistState() 用于关键路径,失败时 panic改进:
ErrNotLeader, ErrConfigInFlight) 便于程序处理新功能: 支持强一致性读操作
// 使用方式
index, err := raft.ReadIndex()
if err != nil { /* handle */ }
// 读取已应用到 index 的状态
实现:
ReadIndex() 方法确认 leadership 后返回安全的读取点readIndexLoop goroutine 处理读请求confirmLeadership() 通过心跳确认领导权GetLinear() 方法新功能: 支持替换 JSON 为更高效的序列化格式
// 使用 msgpack (需要添加依赖)
raft.SetCodec(&MsgpackCodec{})
实现:
Codec 接口定义 Marshal/UnmarshalDefaultCodec 默认使用 JSONDefaultCodec 而非硬编码 JSON问题: 原实现每 10ms 轮询,即使没有新提交也浪费 CPU。
修复:
commitCh 触发新增错误类型:
var (
ErrNoLeader = errors.New("no leader available")
ErrNotLeader = errors.New("not leader")
ErrConfigInFlight = errors.New("configuration change in progress")
ErrTimeout = errors.New("operation timed out")
ErrShutdown = errors.New("raft is shutting down")
ErrPersistFailed = errors.New("failed to persist state")
ErrLeadershipLost = errors.New("leadership lost")
)
RaftError 包装器:
type RaftError struct {
Err error
LeaderID string // 已知的 leader
RetryIn time.Duration // 建议的重试间隔
}
新配置项:
type Config struct {
// RPC 超时
RPCTimeout time.Duration // 默认 500ms
SnapshotRPCTimeout time.Duration // 默认 30s
ProposeTimeout time.Duration // 默认 3s
// 重试配置
MaxRetries int // 默认 3
RetryBackoff time.Duration // 默认 100ms
// 批处理配置
BatchMinWait time.Duration // 默认 1ms
BatchMaxWait time.Duration // 默认 10ms
BatchMaxSize int // 默认 100
// 快照配置
SnapshotChunkSize int // 默认 1MB
}
问题: 原实现固定 10ms 延迟,低负载时增加不必要延迟。
修复:
BatchMinWait (1ms)BatchMaxWait (10ms)BatchMaxSize (100) 立即刷新Metrics 结构:
type Metrics struct {
Term uint64
ProposalsTotal uint64
ProposalsSuccess uint64
ElectionsStarted uint64
ElectionsWon uint64
SnapshotsTaken uint64
ReadIndexRequests uint64
// ... 等等
}
HealthStatus 结构:
type HealthStatus struct {
NodeID string
State string
Term uint64
LeaderID string
ClusterSize int
LogBehind uint64
LastHeartbeat time.Time
IsHealthy bool
Uptime time.Duration
}
使用:
health := raft.HealthCheck()
metrics := raft.GetMetrics()
新功能: 主动转移领导权
err := raft.TransferLeadership("node2")
实现:
TimeoutNow RPC 触发目标立即选举HandleTimeoutNow RPC 处理器问题: 大快照一次性传输可能超时或占用过多内存。
修复:
InstallSnapshotArgs 添加 Offset, Done 字段sendSnapshot 分块发送(默认 1MB 每块)HandleInstallSnapshot 支持分块接收和组装pendingSnapshotState 跟踪接收进度| 文件 | 变更类型 | 说明 |
|---|---|---|
types.go |
修改 | 添加错误类型、Metrics、HealthStatus、新配置项 |
raft.go |
修改 | 修复持久化、添加 ReadIndex、Leadership Transfer 等 |
rpc.go |
修改 | 添加新 RPC 类型、使用可插拔编解码器 |
codec.go |
新增 | 可插拔编解码器接口 |
server.go |
修改 | 添加 GetLinear、HealthCheck 等 API |
单元测试:
集成测试:
压力测试:
server, _ := raft.NewKVServer(config)
// ...
val, ok, err := server.GetLinear("key")
if err != nil {
// 处理错误,可能需要重试
}
if server.IsLeader() {
err := server.TransferLeadership("node2")
if err != nil {
log.Printf("Transfer failed: %v", err)
}
}
health := server.HealthCheck()
if !health.IsHealthy {
log.Printf("Node unhealthy: state=%s, leader=%s",
health.State, health.LeaderID)
}
import "github.com/vmihailenco/msgpack/v5"
type MsgpackCodec struct{}
func (c *MsgpackCodec) Marshal(v interface{}) ([]byte, error) {
return msgpack.Marshal(v)
}
func (c *MsgpackCodec) Unmarshal(data []byte, v interface{}) error {
return msgpack.Unmarshal(data, v)
}
func (c *MsgpackCodec) Name() string { return "msgpack" }
// 在创建 Raft 之前设置
raft.SetCodec(&MsgpackCodec{})