|
|
2 주 전 | |
|---|---|---|
| db | 2 주 전 | |
| example | 2 주 전 | |
| .gitignore | 2 주 전 | |
| IMPROVEMENTS.md | 3 주 전 | |
| LICENSE | 3 주 전 | |
| README.md | 2 주 전 | |
| codec.go | 3 주 전 | |
| compaction_test.go | 2 주 전 | |
| go.mod | 2 주 전 | |
| kv.go | 3 주 전 | |
| log.go | 3 주 전 | |
| node1.log | 2 주 전 | |
| raft-cli | 2 주 전 | |
| raft.go | 2 주 전 | |
| resiliency_test.go | 3 주 전 | |
| rpc.go | 3 주 전 | |
| server.go | 2 주 전 | |
| storage.go | 2 주 전 | |
| types.go | 2 주 전 | |
| watcher.go | 3 주 전 |
一个功能完整的 Raft 共识算法实现,支持动态成员变更、日志压缩、线性一致性读等特性。
package main
import (
"log"
"xbase/xnet/raft"
)
func main() {
// 使用默认配置
config := raft.DefaultConfig()
config.NodeID = "node1"
config.ListenAddr = "127.0.0.1:9001"
config.DataDir = "data1" // 每个节点使用不同目录!
config.Logger = raft.NewConsoleLogger("node1", 1)
// 初始集群配置(单节点启动)
config.ClusterNodes = map[string]string{
"node1": "127.0.0.1:9001",
}
// 创建 KV Server(内置状态机)
server, err := raft.NewKVServer(config)
if err != nil {
log.Fatalf("Failed to create server: %v", err)
}
// 启动服务
if err := server.Start(); err != nil {
log.Fatalf("Failed to start server: %v", err)
}
log.Println("Node started on 127.0.0.1:9001")
// 使用 KV 操作
server.Set("key1", "value1")
val, ok := server.Get("key1")
// 线性一致性读(保证读到最新数据)
val, ok, err = server.GetLinear("key1")
}
Node 1 (Bootstrap 节点):
config := raft.DefaultConfig()
config.NodeID = "node1"
config.ListenAddr = "127.0.0.1:9001"
config.DataDir = "data1" // 重要:每个节点不同目录
config.ClusterNodes = map[string]string{
"node1": "127.0.0.1:9001",
}
server, _ := raft.NewKVServer(config)
server.Start()
Node 2 & 3 (加入集群):
// Node 2
config := raft.DefaultConfig()
config.NodeID = "node2"
config.ListenAddr = "127.0.0.1:9002"
config.DataDir = "data2" // 重要:每个节点不同目录
config.ClusterNodes = map[string]string{
"node2": "127.0.0.1:9002",
}
server, _ := raft.NewKVServer(config)
server.Start()
然后通过客户端将节点加入集群:
// 通过 Leader 添加节点
client := raft.NewTCPTransport("127.0.0.1:9000", 10, logger)
args := &raft.AddNodeArgs{NodeID: "node2", Address: "127.0.0.1:9002"}
reply, err := client.ForwardAddNode(ctx, "127.0.0.1:9001", args)
// 方式 1: 直接在 Leader 上操作
err := server.Raft.AddNode("node4", "127.0.0.1:9004")
// 方式 2: 自动转发到 Leader(推荐)
err := server.Raft.AddNodeWithForward("node4", "127.0.0.1:9004")
// 方式 3: 通过 KVServer 封装
err := server.Join("node4", "127.0.0.1:9004")
// 方式 1: 直接在 Leader 上操作
err := server.Raft.RemoveNode("node4")
// 方式 2: 自动转发到 Leader(推荐)
err := server.Raft.RemoveNodeWithForward("node4")
// 方式 3: 通过 KVServer 封装
err := server.Leave("node4")
nodes := server.GetClusterNodes()
// 返回: map[string]string{"node1": "127.0.0.1:9001", "node2": "127.0.0.1:9002", ...}
ErrConfigInFlight日志压缩是自动触发的,但经过深度优化以平衡性能与安全性:
固定步长触发: 摒弃了指数级增长的阈值,改用固定步长。
优势:避免了初期过于频繁的压缩,也防止了后期日志无限膨胀,步长线性可控。
// 默认配置
config.SnapshotThreshold = 100000 // 触发步长:每新增 10万条日志触发一次
config.SnapshotMinRetention = 10000 // 压缩后保留:1万条(用于 follower 快速追赶)
系统通过强制同步机制从根本上解决了“日志被删但数据未入库”的风险:
N 时,会调用应用层的 SnapshotProvider。SnapshotProvider 会阻塞等待,直到数据库(DB)确实已经应用了索引 N(及其之前)的所有日志。Sync(),确保所有内存/OS Cache 中的数据物理写入磁盘。这防止了“日志已被删除但数据未落盘”的风险。这种设计实现了:Raft 全速写入 -> 异步后台线程检测 -> 需要压缩时等待 DB 追赶 -> 强制刷盘 -> 安全执行物理压缩,在性能和数据安全性之间取得了最佳平衡。
快照保存的是状态机的最终状态,不是操作历史:
// 变量 counter 被更新 10 万次
// 快照只保存最终值: {"counter": "100000"}
| 崩溃点 | 恢复后状态 | 是否一致 |
|---|---|---|
| 保存快照前 | 无新快照,日志完整 | ✅ |
| 保存快照后,压缩前 | 有快照,日志完整(冗余但正确) | ✅ |
| 压缩过程中 | 快照存在,可正确恢复 | ✅ |
| 异步等待期间 | 日志未动,DB 正在追赶,无数据丢失 | ✅ |
ReadIndex 是 Raft 论文中描述的线性一致性读优化,无需写入日志即可保证读取最新数据。
// 方式 1: 通过 KVServer
val, ok, err := server.GetLinear("key")
// 方式 2: 直接使用 ReadIndex
readIndex, err := server.Raft.ReadIndex()
if err == nil {
// 等待 lastApplied >= readIndex 后读取本地状态机
val, ok := server.FSM.Get("key")
}
系统采用了LastApplied 预热机制来优化启动速度:
KVServer 启动时,首先从 DB 引擎获取持久化的 LastAppliedIndex。Config.LastAppliedIndex)。commitIndex 和 lastApplied 设置为该值,跳过对已持久化日志的重复 Apply 过程。这意味着即使有数百万条日志,只要它们已经应用到 DB,重启也是毫秒级的。
config := raft.DefaultConfig()
// 必需配置
config.NodeID = "node1" // 节点唯一标识
config.ListenAddr = "127.0.0.1:9001" // 监听地址
config.DataDir = "data1" // 数据目录(每个节点不同!)
// 集群配置
config.ClusterNodes = map[string]string{ // 集群成员映射
"node1": "127.0.0.1:9001",
}
// 选举配置
config.ElectionTimeoutMin = 150 * time.Millisecond // 最小选举超时
config.ElectionTimeoutMax = 300 * time.Millisecond // 最大选举超时
config.HeartbeatInterval = 50 * time.Millisecond // 心跳间隔
// 日志配置
config.MaxLogEntriesPerRequest = 5000 // 单次 AppendEntries 最大条目数
config.MemoryLogCapacity = 10000 // 内存日志容量
// 快照配置
config.LogCompactionEnabled = false // 默认不启用日志压缩 (永远保留完整日志)
config.SnapshotThreshold = 100000 // 压缩触发阈值(仅在 LogCompactionEnabled=true 时生效)
config.SnapshotMinRetention = 10000 // 快照后保留的日志条数
config.SnapshotChunkSize = 1024 * 1024 // 快照分块大小 (1MB)
// RPC 超时配置
config.RPCTimeout = 500 * time.Millisecond // 普通 RPC 超时
config.SnapshotRPCTimeout = 30 * time.Second // 快照传输超时
config.ProposeTimeout = 3 * time.Second // Propose 转发超时
// 批处理配置
config.BatchMinWait = 1 * time.Millisecond // 批处理最小等待
config.BatchMaxWait = 10 * time.Millisecond // 批处理最大等待
config.BatchMaxSize = 100 // 批处理最大大小
// 日志
config.Logger = raft.NewConsoleLogger("node1", 1) // 0=debug, 1=info, 2=warn, 3=error
// 健康状态
health := server.HealthCheck()
fmt.Printf("State: %s, Term: %d, Leader: %s\n", health.State, health.Term, health.LeaderID)
fmt.Printf("Cluster Size: %d, Healthy: %v\n", health.ClusterSize, health.IsHealthy)
fmt.Printf("Commit: %d, Applied: %d, Behind: %d\n", health.CommitIndex, health.LastApplied, health.LogBehind)
// 运行指标
metrics := server.GetMetrics()
fmt.Printf("Proposals: total=%d success=%d failed=%d\n",
metrics.ProposalsTotal, metrics.ProposalsSuccess, metrics.ProposalsFailed)
fmt.Printf("Elections: started=%d won=%d\n", metrics.ElectionsStarted, metrics.ElectionsWon)
fmt.Printf("Snapshots: taken=%d installed=%d\n", metrics.SnapshotsTaken, metrics.SnapshotsInstalled)
| 字段 | 类型 | 说明 |
|---|---|---|
| NodeID | string | 节点 ID |
| State | string | 当前状态 (Leader/Follower/Candidate) |
| Term | uint64 | 当前 Term |
| LeaderID | string | 当前 Leader ID |
| ClusterSize | int | 集群节点数 |
| CommitIndex | uint64 | 已提交的日志索引 |
| LastApplied | uint64 | 已应用的日志索引 |
| LogBehind | uint64 | 落后的日志条数 |
| IsHealthy | bool | 是否健康 |
| Uptime | Duration | 运行时间 |
| 字段 | 说明 |
|---|---|
| ProposalsTotal | 总 Propose 次数 |
| ProposalsSuccess | 成功次数 |
| ProposalsFailed | 失败次数 |
| ProposalsForwarded | 转发次数 |
| AppendsSent | 发送的 AppendEntries |
| AppendsReceived | 接收的 AppendEntries |
| ElectionsStarted | 发起的选举次数 |
| ElectionsWon | 赢得的选举次数 |
| SnapshotsTaken | 生成的快照数 |
| SnapshotsInstalled | 安装的快照数 |
| ReadIndexRequests | ReadIndex 请求数 |
主动将 Leader 角色转移给指定节点:
// 将 Leader 转移给 node2
err := server.TransferLeadership("node2")
if err != nil {
log.Printf("Transfer failed: %v", err)
}
如果需要自定义状态机,可以直接使用底层 Raft:
// 创建 apply channel
applyCh := make(chan raft.ApplyMsg, 100)
// 创建 transport
transport := raft.NewTCPTransport(config.ListenAddr, 10, config.Logger)
// 创建 Raft 实例
r, err := raft.NewRaft(config, transport, applyCh)
if err != nil {
log.Fatal(err)
}
// 启动
r.Start()
// 处理已提交的日志
go func() {
for msg := range applyCh {
if msg.CommandValid {
// 应用命令到你的状态机
myStateMachine.Apply(msg.Command)
} else if msg.SnapshotValid {
// 从快照恢复状态机
myStateMachine.Restore(msg.Snapshot)
}
}
}()
// 提交命令
index, term, err := r.Propose([]byte("my-command"))
// 带转发的提交(如果不是 Leader 会自动转发)
index, term, err := r.ProposeWithForward([]byte("my-command"))
import "errors"
// 常见错误
var (
raft.ErrNoLeader // 没有 Leader
raft.ErrNotLeader // 当前节点不是 Leader (内部使用,会自动重试/转发)
raft.ErrConfigInFlight // 配置变更正在进行中
raft.ErrTimeout // 操作超时
raft.ErrShutdown // 节点正在关闭
raft.ErrLeadershipLost // 操作过程中丢失 Leader 身份
)
// 错误处理示例
err := server.Set("key", "value")
if err != nil {
// 客户端通常不需要手动处理 ErrNotLeader,因为 KVServer 会自动路由
log.Printf("Set failed: %v", err)
}
// 创建客户端 Transport
client := raft.NewTCPTransport("127.0.0.1:9000", 10, logger)
// 提交命令
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
cmd := raft.KVCommand{Type: raft.KVSet, Key: "foo", Value: "bar"}
data, _ := json.Marshal(cmd)
args := &raft.ProposeArgs{Command: data}
reply, err := client.ForwardPropose(ctx, "127.0.0.1:9001", args)
if err != nil {
log.Printf("Propose failed: %v", err)
}
// ReadIndex
args := &raft.ReadIndexArgs{}
reply, err := client.ReadIndex(ctx, leaderAddr, args)
if reply.Success {
fmt.Printf("ReadIndex: %d\n", reply.ReadIndex)
}
// 远程读取
args := &raft.GetArgs{Key: "foo"}
reply, err := client.ForwardGet(ctx, nodeAddr, args)
if reply.Found {
fmt.Printf("Value: %s\n", reply.Value)
}
// 添加节点
args := &raft.AddNodeArgs{NodeID: "node4", Address: "127.0.0.1:9004"}
reply, err := client.ForwardAddNode(ctx, leaderAddr, args)
// 移除节点
args := &raft.RemoveNodeArgs{NodeID: "node4"}
reply, err := client.ForwardRemoveNode(ctx, leaderAddr, args)
Raft 模块提供了强大的变更监听功能,允许应用模块(如 AI 配置、安全模块)订阅特定的配置变更。
key.**)。在生产环境中,建议定义通用的接口来解耦配置逻辑与业务模块。
// ConfigObserver 定义配置变更处理接口
type ConfigObserver interface {
// OnConfigChange 当配置发生变化时被调用
OnConfigChange(key, value string)
}
type AIModule struct {
threshold float64
modelPath string
}
// OnConfigChange 实现 ConfigObserver 接口
func (m *AIModule) OnConfigChange(key, value string) {
switch key {
case "ai.model.threshold":
if val, err := strconv.ParseFloat(value, 64); err == nil {
m.threshold = val
log.Printf("[AI] Threshold updated to %.2f", val)
}
case "ai.model.path":
m.modelPath = value
log.Printf("[AI] Reloading model from %s...", value)
// m.reloadModel()
}
}
type SecurityModule struct {
firewall *FirewallClient
}
func (s *SecurityModule) OnConfigChange(key, value string) {
// key: security.blocklist.192.168.1.1
ip := strings.TrimPrefix(key, "security.blocklist.")
if value == "true" {
log.Printf("[Security] Blocking malicious IP: %s", ip)
s.firewall.Block(ip)
} else {
log.Printf("[Security] Unblocking IP: %s", ip)
s.firewall.Unblock(ip)
}
}
func main() {
// ... 初始化集群 ...
// 初始化模块
aiMod := &AIModule{threshold: 0.5}
secMod := &SecurityModule{firewall: NewFirewall()}
// 注册 AI 模块监听 (通过方法值传递)
xbase.ClusterWatch("ai.**", aiMod.OnConfigChange)
// 注册安全模块监听
xbase.ClusterWatch("security.blocklist.**", secMod.OnConfigChange)
// ...
}
go test -v ./xnet/raft/...
| 测试 | 验证内容 |
|---|---|
TestCompactionBasic |
基本日志压缩和索引更新 |
TestSnapshotSaveAndLoad |
快照持久化和恢复 |
TestCompactionWithKVServer |
通过 KVServer 触发压缩 |
TestDataConsistencyAfterCompaction |
压缩后数据一致性 |
TestCompactionDoesNotLoseData |
压缩不丢失已提交数据 |
TestDynamicCompactionThreshold |
动态压缩阈值机制 |
xnet/raft/
├── raft.go # Raft 核心实现
├── types.go # 类型定义和配置
├── log.go # 日志管理
├── storage.go # 持久化存储
├── rpc.go # RPC 和 Transport
├── server.go # KVServer 封装
├── kv.go # KV 状态机
├── codec.go # 编解码
├── compaction_test.go # 压缩相关测试
└── README.md # 本文档
完整示例参见 doc/xnet/raft/ 目录:
# 清理旧数据(每个节点的数据目录在其 main.go 所在文件夹下)
rm -rf doc/xnet/raft/node1/data doc/xnet/raft/node2/data doc/xnet/raft/node3/data
# 启动三节点集群(三个终端)
go run ./doc/xnet/raft/node1/main.go # 终端 1
go run ./doc/xnet/raft/node2/main.go # 终端 2
go run ./doc/xnet/raft/node3/main.go # 终端 3
# 运行压力测试和一致性验证
go run ./doc/xnet/raft/stress_test/main.go
=== Raft Cluster Test Suite ===
=== Phase 1: Cluster Formation ===
Adding Node 2...
Adding Node 3...
=== Phase 2: Stress Test ===
Duration: 247ms
Total ops: 10000
QPS: 40391.25
Success rate: 100.00%
=== Phase 3: Data Consistency Verification ===
Verifying 100 sampled keys across all nodes...
✅ All checked keys are CONSISTENT across all nodes!
=== Phase 4: ReadIndex Test ===
ReadIndex succeeded: readIndex=19884
=== Phase 5: Write-then-Read Verification ===
127.0.0.1:9001: ✓ Value matches
127.0.0.1:9002: ✓ Value matches
127.0.0.1:9003: ✓ Value matches
✅ Write-then-read verification PASSED!
基于 stress_test 的测试结果(3 节点本地集群):
实际性能取决于网络延迟、磁盘 I/O 和集群规模。
DataDir,否则会数据冲突!| 特性 | 说明 |
|---|---|
| 选举限制 | 日志不完整的节点无法成为 Leader |
| PreVote | 防止分区节点扰乱集群 |
| No-op 提交 | 新 Leader 立即提交前任 term 日志 |
| Lease Check | Leader 必须保持多数派连接才能接受写请求,防止分区写入 |
| 单配置变更 | 一次只允许一个成员变更 |
| 动态压缩阈值 | 防止压缩风暴 |
| 原子快照写入 | 使用 temp file + rename 保证原子性 |