高性能raft实现

xbase 310888dcab 强一致性修改 hace 2 semanas
db e4b12968f2 修复突然关机带来的数据不一致 hace 2 semanas
example e4b12968f2 修复突然关机带来的数据不一致 hace 2 semanas
.gitignore c1b825a75f like优化 hace 2 semanas
IMPROVEMENTS.md 7ee9d6e686 copy hace 3 semanas
LICENSE 88b72b68c3 Initial commit hace 3 semanas
README.md 310888dcab 强一致性修改 hace 2 semanas
codec.go 7ee9d6e686 copy hace 3 semanas
compaction_test.go e4b12968f2 修复突然关机带来的数据不一致 hace 2 semanas
go.mod 32753dc3a0 优化只有key的db搜索模式 hace 2 semanas
kv.go 7ee9d6e686 copy hace 3 semanas
log.go 7ee9d6e686 copy hace 3 semanas
node1.log d4412c97bd 边缘安全测试 hace 2 semanas
raft-cli 32753dc3a0 优化只有key的db搜索模式 hace 2 semanas
raft.go 310888dcab 强一致性修改 hace 2 semanas
resiliency_test.go 7ee9d6e686 copy hace 3 semanas
rpc.go 7ee9d6e686 copy hace 3 semanas
server.go e4b12968f2 修复突然关机带来的数据不一致 hace 2 semanas
storage.go d4412c97bd 边缘安全测试 hace 2 semanas
types.go 310888dcab 强一致性修改 hace 2 semanas
watcher.go 7ee9d6e686 copy hace 3 semanas

README.md

xnet/raft - Raft 共识库

一个功能完整的 Raft 共识算法实现,支持动态成员变更、日志压缩、线性一致性读等特性。

特性

  • Leader 选举 - 包含 PreVote 优化,防止网络分区导致的 term 膨胀
  • 日志复制 - 支持批量复制和流水线优化
  • 动态成员变更 - 运行时添加/移除节点
  • 日志压缩 (Snapshot) - 自动触发,支持分块传输,动态阈值防止压缩风暴
  • 线性一致性读 (ReadIndex) - 保证读取最新已提交数据
  • Leadership Transfer - 主动转移 Leader 角色
  • 自动请求路由 - Follower 自动转发读写请求到 Leader,无需客户端处理
  • Leader 租约检查 (Lease Check) - 防止网络分区的 Leader 接受无法提交的写请求
  • 持久化存储 - 日志和状态持久化到磁盘
  • 健康检查 & 监控指标 - 内置 Metrics 支持

快速开始

基本用法

package main

import (
    "log"
    "xbase/xnet/raft"
)

func main() {
    // 使用默认配置
    config := raft.DefaultConfig()
    config.NodeID = "node1"
    config.ListenAddr = "127.0.0.1:9001"
    config.DataDir = "data1"  // 每个节点使用不同目录!
    config.Logger = raft.NewConsoleLogger("node1", 1)

    // 初始集群配置(单节点启动)
    config.ClusterNodes = map[string]string{
        "node1": "127.0.0.1:9001",
    }

    // 创建 KV Server(内置状态机)
    server, err := raft.NewKVServer(config)
    if err != nil {
        log.Fatalf("Failed to create server: %v", err)
    }

    // 启动服务
    if err := server.Start(); err != nil {
        log.Fatalf("Failed to start server: %v", err)
    }

    log.Println("Node started on 127.0.0.1:9001")

    // 使用 KV 操作
    server.Set("key1", "value1")
    val, ok := server.Get("key1")
    
    // 线性一致性读(保证读到最新数据)
    val, ok, err = server.GetLinear("key1")
}

多节点集群

Node 1 (Bootstrap 节点):

config := raft.DefaultConfig()
config.NodeID = "node1"
config.ListenAddr = "127.0.0.1:9001"
config.DataDir = "data1"  // 重要:每个节点不同目录
config.ClusterNodes = map[string]string{
    "node1": "127.0.0.1:9001",
}

server, _ := raft.NewKVServer(config)
server.Start()

Node 2 & 3 (加入集群):

// Node 2
config := raft.DefaultConfig()
config.NodeID = "node2"
config.ListenAddr = "127.0.0.1:9002"
config.DataDir = "data2"  // 重要:每个节点不同目录
config.ClusterNodes = map[string]string{
    "node2": "127.0.0.1:9002",
}

server, _ := raft.NewKVServer(config)
server.Start()

然后通过客户端将节点加入集群:

// 通过 Leader 添加节点
client := raft.NewTCPTransport("127.0.0.1:9000", 10, logger)
args := &raft.AddNodeArgs{NodeID: "node2", Address: "127.0.0.1:9002"}
reply, err := client.ForwardAddNode(ctx, "127.0.0.1:9001", args)

动态成员变更

添加节点

// 方式 1: 直接在 Leader 上操作
err := server.Raft.AddNode("node4", "127.0.0.1:9004")

// 方式 2: 自动转发到 Leader(推荐)
err := server.Raft.AddNodeWithForward("node4", "127.0.0.1:9004")

// 方式 3: 通过 KVServer 封装
err := server.Join("node4", "127.0.0.1:9004")

移除节点

// 方式 1: 直接在 Leader 上操作
err := server.Raft.RemoveNode("node4")

// 方式 2: 自动转发到 Leader(推荐)
err := server.Raft.RemoveNodeWithForward("node4")

// 方式 3: 通过 KVServer 封装
err := server.Leave("node4")

获取集群成员

nodes := server.GetClusterNodes()
// 返回: map[string]string{"node1": "127.0.0.1:9001", "node2": "127.0.0.1:9002", ...}

成员变更规则

  1. 一次只能进行一个配置变更 - 如果有变更正在进行,会返回 ErrConfigInFlight
  2. 使用旧集群多数派 - 在配置变更提交前,使用旧集群大小计算多数派
  3. 配置变更作为日志条目 - 持久化到日志中,保证一致性

日志压缩 (Snapshot)

自动压缩机制

日志压缩是自动触发的,但经过深度优化以平衡性能与安全性:

  1. 异步检测: 压缩检查在独立的 goroutine 中异步运行,绝不阻塞 Raft 主循环的请求处理。
  2. 固定步长触发: 摒弃了指数级增长的阈值,改用固定步长

    • 例如:配置阈值为 10,000。
    • 触发点:每当日志比上次压缩后净增 10,000 条时,触发一次压缩。
    • 优势:避免了初期过于频繁的压缩,也防止了后期日志无限膨胀,步长线性可控。

      // 默认配置
      config.SnapshotThreshold = 100000    // 触发步长:每新增 10万条日志触发一次
      config.SnapshotMinRetention = 10000  // 压缩后保留:1万条(用于 follower 快速追赶)
      

压缩安全性与一致性

系统通过强制同步机制从根本上解决了“日志被删但数据未入库”的风险:

  1. 快照阻塞等待: 当 Raft 决定压缩到索引 N 时,会调用应用层的 SnapshotProvider
  2. 追赶确认: SnapshotProvider阻塞等待,直到数据库(DB)确实已经应用了索引 N(及其之前)的所有日志。
  3. 强制刷盘 (Sync): 在生成快照前,强制调用存储引擎的 Sync(),确保所有内存/OS Cache 中的数据物理写入磁盘。这防止了“日志已被删除但数据未落盘”的风险。
  4. 原子截断: 只有在成功获取到包含完整数据的快照后,Raft 才会执行物理日志文件的截断。
  5. 写锁保护: 在物理截断日志文件的短暂瞬间,系统持有写锁,天然暂停新的写入,确保文件操作的原子性和完整性。

这种设计实现了:Raft 全速写入 -> 异步后台线程检测 -> 需要压缩时等待 DB 追赶 -> 强制刷盘 -> 安全执行物理压缩,在性能和数据安全性之间取得了最佳平衡。

快照保存的是状态

快照保存的是状态机的最终状态,不是操作历史:

// 变量 counter 被更新 10 万次
// 快照只保存最终值: {"counter": "100000"}

压缩安全性

崩溃点 恢复后状态 是否一致
保存快照前 无新快照,日志完整
保存快照后,压缩前 有快照,日志完整(冗余但正确)
压缩过程中 快照存在,可正确恢复
异步等待期间 日志未动,DB 正在追赶,无数据丢失

线性一致性读 (ReadIndex)

ReadIndex 是 Raft 论文中描述的线性一致性读优化,无需写入日志即可保证读取最新数据。

// 方式 1: 通过 KVServer
val, ok, err := server.GetLinear("key")

// 方式 2: 直接使用 ReadIndex
readIndex, err := server.Raft.ReadIndex()
if err == nil {
    // 等待 lastApplied >= readIndex 后读取本地状态机
    val, ok := server.FSM.Get("key")
}

启动优化 (Fast Startup)

系统采用了LastApplied 预热机制来优化启动速度:

  1. 读取 DB 状态: KVServer 启动时,首先从 DB 引擎获取持久化的 LastAppliedIndex
  2. 初始化 Raft: 将此 Index 注入 Raft 配置 (Config.LastAppliedIndex)。
  3. 跳过回放: Raft 初始化时直接将 commitIndexlastApplied 设置为该值,跳过对已持久化日志的重复 Apply 过程。

这意味着即使有数百万条日志,只要它们已经应用到 DB,重启也是毫秒级的。

配置选项

config := raft.DefaultConfig()

// 必需配置
config.NodeID = "node1"                        // 节点唯一标识
config.ListenAddr = "127.0.0.1:9001"          // 监听地址
config.DataDir = "data1"                       // 数据目录(每个节点不同!)

// 集群配置
config.ClusterNodes = map[string]string{       // 集群成员映射
    "node1": "127.0.0.1:9001",
}

// 选举配置
config.ElectionTimeoutMin = 150 * time.Millisecond  // 最小选举超时
config.ElectionTimeoutMax = 300 * time.Millisecond  // 最大选举超时
config.HeartbeatInterval = 50 * time.Millisecond    // 心跳间隔

// 日志配置
config.MaxLogEntriesPerRequest = 5000          // 单次 AppendEntries 最大条目数
config.MemoryLogCapacity = 10000               // 内存日志容量

// 快照配置
config.LogCompactionEnabled = false        // 默认不启用日志压缩 (永远保留完整日志)
config.SnapshotThreshold = 100000          // 压缩触发阈值(仅在 LogCompactionEnabled=true 时生效)
config.SnapshotMinRetention = 10000        // 快照后保留的日志条数
config.SnapshotChunkSize = 1024 * 1024     // 快照分块大小 (1MB)

// RPC 超时配置
config.RPCTimeout = 500 * time.Millisecond     // 普通 RPC 超时
config.SnapshotRPCTimeout = 30 * time.Second   // 快照传输超时
config.ProposeTimeout = 3 * time.Second        // Propose 转发超时

// 批处理配置
config.BatchMinWait = 1 * time.Millisecond     // 批处理最小等待
config.BatchMaxWait = 10 * time.Millisecond    // 批处理最大等待
config.BatchMaxSize = 100                       // 批处理最大大小

// 日志
config.Logger = raft.NewConsoleLogger("node1", 1)  // 0=debug, 1=info, 2=warn, 3=error

健康检查 & 监控

// 健康状态
health := server.HealthCheck()
fmt.Printf("State: %s, Term: %d, Leader: %s\n", health.State, health.Term, health.LeaderID)
fmt.Printf("Cluster Size: %d, Healthy: %v\n", health.ClusterSize, health.IsHealthy)
fmt.Printf("Commit: %d, Applied: %d, Behind: %d\n", health.CommitIndex, health.LastApplied, health.LogBehind)

// 运行指标
metrics := server.GetMetrics()
fmt.Printf("Proposals: total=%d success=%d failed=%d\n", 
    metrics.ProposalsTotal, metrics.ProposalsSuccess, metrics.ProposalsFailed)
fmt.Printf("Elections: started=%d won=%d\n", metrics.ElectionsStarted, metrics.ElectionsWon)
fmt.Printf("Snapshots: taken=%d installed=%d\n", metrics.SnapshotsTaken, metrics.SnapshotsInstalled)

HealthStatus 字段

字段 类型 说明
NodeID string 节点 ID
State string 当前状态 (Leader/Follower/Candidate)
Term uint64 当前 Term
LeaderID string 当前 Leader ID
ClusterSize int 集群节点数
CommitIndex uint64 已提交的日志索引
LastApplied uint64 已应用的日志索引
LogBehind uint64 落后的日志条数
IsHealthy bool 是否健康
Uptime Duration 运行时间

Metrics 字段

字段 说明
ProposalsTotal 总 Propose 次数
ProposalsSuccess 成功次数
ProposalsFailed 失败次数
ProposalsForwarded 转发次数
AppendsSent 发送的 AppendEntries
AppendsReceived 接收的 AppendEntries
ElectionsStarted 发起的选举次数
ElectionsWon 赢得的选举次数
SnapshotsTaken 生成的快照数
SnapshotsInstalled 安装的快照数
ReadIndexRequests ReadIndex 请求数

Leadership Transfer

主动将 Leader 角色转移给指定节点:

// 将 Leader 转移给 node2
err := server.TransferLeadership("node2")
if err != nil {
    log.Printf("Transfer failed: %v", err)
}

自定义状态机

如果需要自定义状态机,可以直接使用底层 Raft:

// 创建 apply channel
applyCh := make(chan raft.ApplyMsg, 100)

// 创建 transport
transport := raft.NewTCPTransport(config.ListenAddr, 10, config.Logger)

// 创建 Raft 实例
r, err := raft.NewRaft(config, transport, applyCh)
if err != nil {
    log.Fatal(err)
}

// 启动
r.Start()

// 处理已提交的日志
go func() {
    for msg := range applyCh {
        if msg.CommandValid {
            // 应用命令到你的状态机
            myStateMachine.Apply(msg.Command)
        } else if msg.SnapshotValid {
            // 从快照恢复状态机
            myStateMachine.Restore(msg.Snapshot)
        }
    }
}()

// 提交命令
index, term, err := r.Propose([]byte("my-command"))

// 带转发的提交(如果不是 Leader 会自动转发)
index, term, err := r.ProposeWithForward([]byte("my-command"))

错误处理

import "errors"

// 常见错误
var (
    raft.ErrNoLeader       // 没有 Leader
    raft.ErrNotLeader      // 当前节点不是 Leader (内部使用,会自动重试/转发)
    raft.ErrConfigInFlight // 配置变更正在进行中
    raft.ErrTimeout        // 操作超时
    raft.ErrShutdown       // 节点正在关闭
    raft.ErrLeadershipLost // 操作过程中丢失 Leader 身份
)

// 错误处理示例
err := server.Set("key", "value")
if err != nil {
    // 客户端通常不需要手动处理 ErrNotLeader,因为 KVServer 会自动路由
    log.Printf("Set failed: %v", err)
}

客户端使用

// 创建客户端 Transport
client := raft.NewTCPTransport("127.0.0.1:9000", 10, logger)

// 提交命令
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

cmd := raft.KVCommand{Type: raft.KVSet, Key: "foo", Value: "bar"}
data, _ := json.Marshal(cmd)
args := &raft.ProposeArgs{Command: data}

reply, err := client.ForwardPropose(ctx, "127.0.0.1:9001", args)
if err != nil {
    log.Printf("Propose failed: %v", err)
}

// ReadIndex
args := &raft.ReadIndexArgs{}
reply, err := client.ReadIndex(ctx, leaderAddr, args)
if reply.Success {
    fmt.Printf("ReadIndex: %d\n", reply.ReadIndex)
}

// 远程读取
args := &raft.GetArgs{Key: "foo"}
reply, err := client.ForwardGet(ctx, nodeAddr, args)
if reply.Found {
    fmt.Printf("Value: %s\n", reply.Value)
}

// 添加节点
args := &raft.AddNodeArgs{NodeID: "node4", Address: "127.0.0.1:9004"}
reply, err := client.ForwardAddNode(ctx, leaderAddr, args)

// 移除节点
args := &raft.RemoveNodeArgs{NodeID: "node4"}
reply, err := client.ForwardRemoveNode(ctx, leaderAddr, args)

高级特性:变更监听 (Watcher)

Raft 模块提供了强大的变更监听功能,允许应用模块(如 AI 配置、安全模块)订阅特定的配置变更。

特性

  1. 实时推送:配置变更提交后,立即触发回调。
  2. 多级过滤:支持精确匹配和前缀通配符匹配(key.**)。
  3. 安全隔离:不同模块的 Watcher 相互独立,互不影响。
  4. 无重放:系统启动时的快照恢复不会触发 Watcher,避免初始化风暴。

最佳实践:模块化监听 (Handler Interface)

在生产环境中,建议定义通用的接口来解耦配置逻辑与业务模块。

1. 定义通用接口

// ConfigObserver 定义配置变更处理接口
type ConfigObserver interface {
    // OnConfigChange 当配置发生变化时被调用
    OnConfigChange(key, value string)
}

2. 模块实现 (AI 示例)

type AIModule struct {
    threshold float64
    modelPath string
}

// OnConfigChange 实现 ConfigObserver 接口
func (m *AIModule) OnConfigChange(key, value string) {
    switch key {
    case "ai.model.threshold":
        if val, err := strconv.ParseFloat(value, 64); err == nil {
            m.threshold = val
            log.Printf("[AI] Threshold updated to %.2f", val)
        }
    case "ai.model.path":
        m.modelPath = value
        log.Printf("[AI] Reloading model from %s...", value)
        // m.reloadModel()
    }
}

3. 模块实现 (安全模块示例)

type SecurityModule struct {
    firewall *FirewallClient
}

func (s *SecurityModule) OnConfigChange(key, value string) {
    // key: security.blocklist.192.168.1.1
    ip := strings.TrimPrefix(key, "security.blocklist.")
    
    if value == "true" {
        log.Printf("[Security] Blocking malicious IP: %s", ip)
        s.firewall.Block(ip)
    } else {
        log.Printf("[Security] Unblocking IP: %s", ip)
        s.firewall.Unblock(ip)
    }
}

4. 注册监听

func main() {
    // ... 初始化集群 ...

    // 初始化模块
    aiMod := &AIModule{threshold: 0.5}
    secMod := &SecurityModule{firewall: NewFirewall()}

    // 注册 AI 模块监听 (通过方法值传递)
    xbase.ClusterWatch("ai.**", aiMod.OnConfigChange)

    // 注册安全模块监听
    xbase.ClusterWatch("security.blocklist.**", secMod.OnConfigChange)
    
    // ...
}

测试

运行单元测试

go test -v ./xnet/raft/...

测试覆盖

测试 验证内容
TestCompactionBasic 基本日志压缩和索引更新
TestSnapshotSaveAndLoad 快照持久化和恢复
TestCompactionWithKVServer 通过 KVServer 触发压缩
TestDataConsistencyAfterCompaction 压缩后数据一致性
TestCompactionDoesNotLoseData 压缩不丢失已提交数据
TestDynamicCompactionThreshold 动态压缩阈值机制

文件结构

xnet/raft/
├── raft.go           # Raft 核心实现
├── types.go          # 类型定义和配置
├── log.go            # 日志管理
├── storage.go        # 持久化存储
├── rpc.go            # RPC 和 Transport
├── server.go         # KVServer 封装
├── kv.go             # KV 状态机
├── codec.go          # 编解码
├── compaction_test.go # 压缩相关测试
└── README.md         # 本文档

示例

完整示例参见 doc/xnet/raft/ 目录:

# 清理旧数据(每个节点的数据目录在其 main.go 所在文件夹下)
rm -rf doc/xnet/raft/node1/data doc/xnet/raft/node2/data doc/xnet/raft/node3/data

# 启动三节点集群(三个终端)
go run ./doc/xnet/raft/node1/main.go  # 终端 1
go run ./doc/xnet/raft/node2/main.go  # 终端 2
go run ./doc/xnet/raft/node3/main.go  # 终端 3

# 运行压力测试和一致性验证
go run ./doc/xnet/raft/stress_test/main.go

压力测试输出示例

=== Raft Cluster Test Suite ===

=== Phase 1: Cluster Formation ===
Adding Node 2...
Adding Node 3...

=== Phase 2: Stress Test ===
Duration:     247ms
Total ops:    10000
QPS:          40391.25
Success rate: 100.00%

=== Phase 3: Data Consistency Verification ===
Verifying 100 sampled keys across all nodes...
✅ All checked keys are CONSISTENT across all nodes!

=== Phase 4: ReadIndex Test ===
ReadIndex succeeded: readIndex=19884

=== Phase 5: Write-then-Read Verification ===
  127.0.0.1:9001: ✓ Value matches
  127.0.0.1:9002: ✓ Value matches
  127.0.0.1:9003: ✓ Value matches
✅ Write-then-read verification PASSED!

性能

基于 stress_test 的测试结果(3 节点本地集群):

  • QPS: ~40,000 ops/sec
  • 成功率: 100%
  • 延迟: < 1ms (本地)

实际性能取决于网络延迟、磁盘 I/O 和集群规模。

注意事项

  1. 数据目录每个节点必须使用不同的 DataDir,否则会数据冲突!
  2. 端口冲突:确保各节点监听地址不冲突
  3. 时钟同步:建议集群节点时钟同步(虽然 Raft 不依赖时钟)
  4. 奇数节点:推荐使用奇数个节点(3、5、7)以优化容错
  5. 清理数据:测试前建议清理旧的数据目录

核心安全保证

特性 说明
选举限制 日志不完整的节点无法成为 Leader
PreVote 防止分区节点扰乱集群
No-op 提交 新 Leader 立即提交前任 term 日志
Lease Check Leader 必须保持多数派连接才能接受写请求,防止分区写入
单配置变更 一次只允许一个成员变更
动态压缩阈值 防止压缩风暴
原子快照写入 使用 temp file + rename 保证原子性