高性能raft实现

xbase c1b825a75f like优化 2 周之前
db c1b825a75f like优化 2 周之前
example c1b825a75f like优化 2 周之前
.gitignore c1b825a75f like优化 2 周之前
IMPROVEMENTS.md 7ee9d6e686 copy 3 周之前
LICENSE 88b72b68c3 Initial commit 3 周之前
README.md 11c56e1310 修复几个bug,节点只剩一个的时候只能get不能set 3 周之前
codec.go 7ee9d6e686 copy 3 周之前
compaction_test.go 7ee9d6e686 copy 3 周之前
go.mod 11c56e1310 修复几个bug,节点只剩一个的时候只能get不能set 3 周之前
kv.go 7ee9d6e686 copy 3 周之前
log.go 7ee9d6e686 copy 3 周之前
raft.go 11c56e1310 修复几个bug,节点只剩一个的时候只能get不能set 3 周之前
resiliency_test.go 7ee9d6e686 copy 3 周之前
rpc.go 7ee9d6e686 copy 3 周之前
server.go 11c56e1310 修复几个bug,节点只剩一个的时候只能get不能set 3 周之前
storage.go 7ee9d6e686 copy 3 周之前
types.go 7ee9d6e686 copy 3 周之前
watcher.go 7ee9d6e686 copy 3 周之前

README.md

xnet/raft - Raft 共识库

一个功能完整的 Raft 共识算法实现,支持动态成员变更、日志压缩、线性一致性读等特性。

特性

  • Leader 选举 - 包含 PreVote 优化,防止网络分区导致的 term 膨胀
  • 日志复制 - 支持批量复制和流水线优化
  • 动态成员变更 - 运行时添加/移除节点
  • 日志压缩 (Snapshot) - 自动触发,支持分块传输,动态阈值防止压缩风暴
  • 线性一致性读 (ReadIndex) - 保证读取最新已提交数据
  • Leadership Transfer - 主动转移 Leader 角色
  • 自动请求路由 - Follower 自动转发读写请求到 Leader,无需客户端处理
  • Leader 租约检查 (Lease Check) - 防止网络分区的 Leader 接受无法提交的写请求
  • 持久化存储 - 日志和状态持久化到磁盘
  • 健康检查 & 监控指标 - 内置 Metrics 支持

快速开始

基本用法

package main

import (
    "log"
    "xbase/xnet/raft"
)

func main() {
    // 使用默认配置
    config := raft.DefaultConfig()
    config.NodeID = "node1"
    config.ListenAddr = "127.0.0.1:9001"
    config.DataDir = "data1"  // 每个节点使用不同目录!
    config.Logger = raft.NewConsoleLogger("node1", 1)

    // 初始集群配置(单节点启动)
    config.ClusterNodes = map[string]string{
        "node1": "127.0.0.1:9001",
    }

    // 创建 KV Server(内置状态机)
    server, err := raft.NewKVServer(config)
    if err != nil {
        log.Fatalf("Failed to create server: %v", err)
    }

    // 启动服务
    if err := server.Start(); err != nil {
        log.Fatalf("Failed to start server: %v", err)
    }

    log.Println("Node started on 127.0.0.1:9001")

    // 使用 KV 操作
    server.Set("key1", "value1")
    val, ok := server.Get("key1")
    
    // 线性一致性读(保证读到最新数据)
    val, ok, err = server.GetLinear("key1")
}

多节点集群

Node 1 (Bootstrap 节点):

config := raft.DefaultConfig()
config.NodeID = "node1"
config.ListenAddr = "127.0.0.1:9001"
config.DataDir = "data1"  // 重要:每个节点不同目录
config.ClusterNodes = map[string]string{
    "node1": "127.0.0.1:9001",
}

server, _ := raft.NewKVServer(config)
server.Start()

Node 2 & 3 (加入集群):

// Node 2
config := raft.DefaultConfig()
config.NodeID = "node2"
config.ListenAddr = "127.0.0.1:9002"
config.DataDir = "data2"  // 重要:每个节点不同目录
config.ClusterNodes = map[string]string{
    "node2": "127.0.0.1:9002",
}

server, _ := raft.NewKVServer(config)
server.Start()

然后通过客户端将节点加入集群:

// 通过 Leader 添加节点
client := raft.NewTCPTransport("127.0.0.1:9000", 10, logger)
args := &raft.AddNodeArgs{NodeID: "node2", Address: "127.0.0.1:9002"}
reply, err := client.ForwardAddNode(ctx, "127.0.0.1:9001", args)

动态成员变更

添加节点

// 方式 1: 直接在 Leader 上操作
err := server.Raft.AddNode("node4", "127.0.0.1:9004")

// 方式 2: 自动转发到 Leader(推荐)
err := server.Raft.AddNodeWithForward("node4", "127.0.0.1:9004")

// 方式 3: 通过 KVServer 封装
err := server.Join("node4", "127.0.0.1:9004")

移除节点

// 方式 1: 直接在 Leader 上操作
err := server.Raft.RemoveNode("node4")

// 方式 2: 自动转发到 Leader(推荐)
err := server.Raft.RemoveNodeWithForward("node4")

// 方式 3: 通过 KVServer 封装
err := server.Leave("node4")

获取集群成员

nodes := server.GetClusterNodes()
// 返回: map[string]string{"node1": "127.0.0.1:9001", "node2": "127.0.0.1:9002", ...}

成员变更规则

  1. 一次只能进行一个配置变更 - 如果有变更正在进行,会返回 ErrConfigInFlight
  2. 使用旧集群多数派 - 在配置变更提交前,使用旧集群大小计算多数派
  3. 配置变更作为日志条目 - 持久化到日志中,保证一致性

日志压缩 (Snapshot)

自动压缩机制

日志压缩是自动触发的,使用动态阈值防止压缩风暴:

// 默认配置
config.SnapshotThreshold = 100000    // 初始阈值:10万条日志
config.SnapshotMinRetention = 10000  // 压缩后保留:1万条

动态阈值算法

为防止"压缩风暴"(压缩后日志仍很大,导致每条新日志都触发压缩),采用动态阈值:

阶段 日志大小 触发阈值 压缩后大小 下次阈值
初始 0 100,000 - -
第1次 100,001 100,000 10,000 100,000
若压缩后仍大 100,001 100,000 80,000 120,000
第2次 120,001 120,000 90,000 135,000

规则

  • 下次阈值 = 压缩后大小 × 1.5
  • 阈值不低于初始值(防止阈值过小)

快照保存的是状态

快照保存的是状态机的最终状态,不是操作历史:

// 变量 counter 被更新 10 万次
// 快照只保存最终值: {"counter": "100000"}

压缩安全性

崩溃点 恢复后状态 是否一致
保存快照前 无新快照,日志完整
保存快照后,压缩前 有快照,日志完整(冗余但正确)
压缩过程中 快照存在,可正确恢复

线性一致性读 (ReadIndex)

ReadIndex 是 Raft 论文中描述的线性一致性读优化,无需写入日志即可保证读取最新数据。

// 方式 1: 通过 KVServer
val, ok, err := server.GetLinear("key")

// 方式 2: 直接使用 ReadIndex
readIndex, err := server.Raft.ReadIndex()
if err == nil {
    // 等待 lastApplied >= readIndex 后读取本地状态机
    val, ok := server.FSM.Get("key")
}

配置选项

config := raft.DefaultConfig()

// 必需配置
config.NodeID = "node1"                        // 节点唯一标识
config.ListenAddr = "127.0.0.1:9001"          // 监听地址
config.DataDir = "data1"                       // 数据目录(每个节点不同!)

// 集群配置
config.ClusterNodes = map[string]string{       // 集群成员映射
    "node1": "127.0.0.1:9001",
}

// 选举配置
config.ElectionTimeoutMin = 150 * time.Millisecond  // 最小选举超时
config.ElectionTimeoutMax = 300 * time.Millisecond  // 最大选举超时
config.HeartbeatInterval = 50 * time.Millisecond    // 心跳间隔

// 日志配置
config.MaxLogEntriesPerRequest = 5000          // 单次 AppendEntries 最大条目数
config.MemoryLogCapacity = 10000               // 内存日志容量

// 快照配置
config.SnapshotThreshold = 100000              // 初始触发阈值(动态调整)
config.SnapshotMinRetention = 10000            // 快照后保留的日志条数
config.SnapshotChunkSize = 1024 * 1024         // 快照分块大小 (1MB)

// RPC 超时配置
config.RPCTimeout = 500 * time.Millisecond     // 普通 RPC 超时
config.SnapshotRPCTimeout = 30 * time.Second   // 快照传输超时
config.ProposeTimeout = 3 * time.Second        // Propose 转发超时

// 批处理配置
config.BatchMinWait = 1 * time.Millisecond     // 批处理最小等待
config.BatchMaxWait = 10 * time.Millisecond    // 批处理最大等待
config.BatchMaxSize = 100                       // 批处理最大大小

// 日志
config.Logger = raft.NewConsoleLogger("node1", 1)  // 0=debug, 1=info, 2=warn, 3=error

健康检查 & 监控

// 健康状态
health := server.HealthCheck()
fmt.Printf("State: %s, Term: %d, Leader: %s\n", health.State, health.Term, health.LeaderID)
fmt.Printf("Cluster Size: %d, Healthy: %v\n", health.ClusterSize, health.IsHealthy)
fmt.Printf("Commit: %d, Applied: %d, Behind: %d\n", health.CommitIndex, health.LastApplied, health.LogBehind)

// 运行指标
metrics := server.GetMetrics()
fmt.Printf("Proposals: total=%d success=%d failed=%d\n", 
    metrics.ProposalsTotal, metrics.ProposalsSuccess, metrics.ProposalsFailed)
fmt.Printf("Elections: started=%d won=%d\n", metrics.ElectionsStarted, metrics.ElectionsWon)
fmt.Printf("Snapshots: taken=%d installed=%d\n", metrics.SnapshotsTaken, metrics.SnapshotsInstalled)

HealthStatus 字段

字段 类型 说明
NodeID string 节点 ID
State string 当前状态 (Leader/Follower/Candidate)
Term uint64 当前 Term
LeaderID string 当前 Leader ID
ClusterSize int 集群节点数
CommitIndex uint64 已提交的日志索引
LastApplied uint64 已应用的日志索引
LogBehind uint64 落后的日志条数
IsHealthy bool 是否健康
Uptime Duration 运行时间

Metrics 字段

字段 说明
ProposalsTotal 总 Propose 次数
ProposalsSuccess 成功次数
ProposalsFailed 失败次数
ProposalsForwarded 转发次数
AppendsSent 发送的 AppendEntries
AppendsReceived 接收的 AppendEntries
ElectionsStarted 发起的选举次数
ElectionsWon 赢得的选举次数
SnapshotsTaken 生成的快照数
SnapshotsInstalled 安装的快照数
ReadIndexRequests ReadIndex 请求数

Leadership Transfer

主动将 Leader 角色转移给指定节点:

// 将 Leader 转移给 node2
err := server.TransferLeadership("node2")
if err != nil {
    log.Printf("Transfer failed: %v", err)
}

自定义状态机

如果需要自定义状态机,可以直接使用底层 Raft:

// 创建 apply channel
applyCh := make(chan raft.ApplyMsg, 100)

// 创建 transport
transport := raft.NewTCPTransport(config.ListenAddr, 10, config.Logger)

// 创建 Raft 实例
r, err := raft.NewRaft(config, transport, applyCh)
if err != nil {
    log.Fatal(err)
}

// 启动
r.Start()

// 处理已提交的日志
go func() {
    for msg := range applyCh {
        if msg.CommandValid {
            // 应用命令到你的状态机
            myStateMachine.Apply(msg.Command)
        } else if msg.SnapshotValid {
            // 从快照恢复状态机
            myStateMachine.Restore(msg.Snapshot)
        }
    }
}()

// 提交命令
index, term, err := r.Propose([]byte("my-command"))

// 带转发的提交(如果不是 Leader 会自动转发)
index, term, err := r.ProposeWithForward([]byte("my-command"))

错误处理

import "errors"

// 常见错误
var (
    raft.ErrNoLeader       // 没有 Leader
    raft.ErrNotLeader      // 当前节点不是 Leader (内部使用,会自动重试/转发)
    raft.ErrConfigInFlight // 配置变更正在进行中
    raft.ErrTimeout        // 操作超时
    raft.ErrShutdown       // 节点正在关闭
    raft.ErrLeadershipLost // 操作过程中丢失 Leader 身份
)

// 错误处理示例
err := server.Set("key", "value")
if err != nil {
    // 客户端通常不需要手动处理 ErrNotLeader,因为 KVServer 会自动路由
    log.Printf("Set failed: %v", err)
}

客户端使用

// 创建客户端 Transport
client := raft.NewTCPTransport("127.0.0.1:9000", 10, logger)

// 提交命令
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

cmd := raft.KVCommand{Type: raft.KVSet, Key: "foo", Value: "bar"}
data, _ := json.Marshal(cmd)
args := &raft.ProposeArgs{Command: data}

reply, err := client.ForwardPropose(ctx, "127.0.0.1:9001", args)
if err != nil {
    log.Printf("Propose failed: %v", err)
}

// ReadIndex
args := &raft.ReadIndexArgs{}
reply, err := client.ReadIndex(ctx, leaderAddr, args)
if reply.Success {
    fmt.Printf("ReadIndex: %d\n", reply.ReadIndex)
}

// 远程读取
args := &raft.GetArgs{Key: "foo"}
reply, err := client.ForwardGet(ctx, nodeAddr, args)
if reply.Found {
    fmt.Printf("Value: %s\n", reply.Value)
}

// 添加节点
args := &raft.AddNodeArgs{NodeID: "node4", Address: "127.0.0.1:9004"}
reply, err := client.ForwardAddNode(ctx, leaderAddr, args)

// 移除节点
args := &raft.RemoveNodeArgs{NodeID: "node4"}
reply, err := client.ForwardRemoveNode(ctx, leaderAddr, args)

高级特性:变更监听 (Watcher)

Raft 模块提供了强大的变更监听功能,允许应用模块(如 AI 配置、安全模块)订阅特定的配置变更。

特性

  1. 实时推送:配置变更提交后,立即触发回调。
  2. 多级过滤:支持精确匹配和前缀通配符匹配(key.**)。
  3. 安全隔离:不同模块的 Watcher 相互独立,互不影响。
  4. 无重放:系统启动时的快照恢复不会触发 Watcher,避免初始化风暴。

最佳实践:模块化监听 (Handler Interface)

在生产环境中,建议定义通用的接口来解耦配置逻辑与业务模块。

1. 定义通用接口

// ConfigObserver 定义配置变更处理接口
type ConfigObserver interface {
    // OnConfigChange 当配置发生变化时被调用
    OnConfigChange(key, value string)
}

2. 模块实现 (AI 示例)

type AIModule struct {
    threshold float64
    modelPath string
}

// OnConfigChange 实现 ConfigObserver 接口
func (m *AIModule) OnConfigChange(key, value string) {
    switch key {
    case "ai.model.threshold":
        if val, err := strconv.ParseFloat(value, 64); err == nil {
            m.threshold = val
            log.Printf("[AI] Threshold updated to %.2f", val)
        }
    case "ai.model.path":
        m.modelPath = value
        log.Printf("[AI] Reloading model from %s...", value)
        // m.reloadModel()
    }
}

3. 模块实现 (安全模块示例)

type SecurityModule struct {
    firewall *FirewallClient
}

func (s *SecurityModule) OnConfigChange(key, value string) {
    // key: security.blocklist.192.168.1.1
    ip := strings.TrimPrefix(key, "security.blocklist.")
    
    if value == "true" {
        log.Printf("[Security] Blocking malicious IP: %s", ip)
        s.firewall.Block(ip)
    } else {
        log.Printf("[Security] Unblocking IP: %s", ip)
        s.firewall.Unblock(ip)
    }
}

4. 注册监听

func main() {
    // ... 初始化集群 ...

    // 初始化模块
    aiMod := &AIModule{threshold: 0.5}
    secMod := &SecurityModule{firewall: NewFirewall()}

    // 注册 AI 模块监听 (通过方法值传递)
    xbase.ClusterWatch("ai.**", aiMod.OnConfigChange)

    // 注册安全模块监听
    xbase.ClusterWatch("security.blocklist.**", secMod.OnConfigChange)
    
    // ...
}

测试

运行单元测试

go test -v ./xnet/raft/...

测试覆盖

测试 验证内容
TestCompactionBasic 基本日志压缩和索引更新
TestSnapshotSaveAndLoad 快照持久化和恢复
TestCompactionWithKVServer 通过 KVServer 触发压缩
TestDataConsistencyAfterCompaction 压缩后数据一致性
TestCompactionDoesNotLoseData 压缩不丢失已提交数据
TestDynamicCompactionThreshold 动态压缩阈值机制

文件结构

xnet/raft/
├── raft.go           # Raft 核心实现
├── types.go          # 类型定义和配置
├── log.go            # 日志管理
├── storage.go        # 持久化存储
├── rpc.go            # RPC 和 Transport
├── server.go         # KVServer 封装
├── kv.go             # KV 状态机
├── codec.go          # 编解码
├── compaction_test.go # 压缩相关测试
└── README.md         # 本文档

示例

完整示例参见 doc/xnet/raft/ 目录:

# 清理旧数据(每个节点的数据目录在其 main.go 所在文件夹下)
rm -rf doc/xnet/raft/node1/data doc/xnet/raft/node2/data doc/xnet/raft/node3/data

# 启动三节点集群(三个终端)
go run ./doc/xnet/raft/node1/main.go  # 终端 1
go run ./doc/xnet/raft/node2/main.go  # 终端 2
go run ./doc/xnet/raft/node3/main.go  # 终端 3

# 运行压力测试和一致性验证
go run ./doc/xnet/raft/stress_test/main.go

压力测试输出示例

=== Raft Cluster Test Suite ===

=== Phase 1: Cluster Formation ===
Adding Node 2...
Adding Node 3...

=== Phase 2: Stress Test ===
Duration:     247ms
Total ops:    10000
QPS:          40391.25
Success rate: 100.00%

=== Phase 3: Data Consistency Verification ===
Verifying 100 sampled keys across all nodes...
✅ All checked keys are CONSISTENT across all nodes!

=== Phase 4: ReadIndex Test ===
ReadIndex succeeded: readIndex=19884

=== Phase 5: Write-then-Read Verification ===
  127.0.0.1:9001: ✓ Value matches
  127.0.0.1:9002: ✓ Value matches
  127.0.0.1:9003: ✓ Value matches
✅ Write-then-read verification PASSED!

性能

基于 stress_test 的测试结果(3 节点本地集群):

  • QPS: ~40,000 ops/sec
  • 成功率: 100%
  • 延迟: < 1ms (本地)

实际性能取决于网络延迟、磁盘 I/O 和集群规模。

注意事项

  1. 数据目录每个节点必须使用不同的 DataDir,否则会数据冲突!
  2. 端口冲突:确保各节点监听地址不冲突
  3. 时钟同步:建议集群节点时钟同步(虽然 Raft 不依赖时钟)
  4. 奇数节点:推荐使用奇数个节点(3、5、7)以优化容错
  5. 清理数据:测试前建议清理旧的数据目录

核心安全保证

特性 说明
选举限制 日志不完整的节点无法成为 Leader
PreVote 防止分区节点扰乱集群
No-op 提交 新 Leader 立即提交前任 term 日志
Lease Check Leader 必须保持多数派连接才能接受写请求,防止分区写入
单配置变更 一次只允许一个成员变更
动态压缩阈值 防止压缩风暴
原子快照写入 使用 temp file + rename 保证原子性