|
|
2 minggu lalu | |
|---|---|---|
| db | 2 minggu lalu | |
| example | 2 minggu lalu | |
| .gitignore | 2 minggu lalu | |
| IMPROVEMENTS.md | 3 minggu lalu | |
| LICENSE | 3 minggu lalu | |
| README.md | 2 minggu lalu | |
| codec.go | 3 minggu lalu | |
| compaction_test.go | 2 minggu lalu | |
| go.mod | 2 minggu lalu | |
| kv.go | 3 minggu lalu | |
| log.go | 3 minggu lalu | |
| node1.log | 2 minggu lalu | |
| raft-cli | 2 minggu lalu | |
| raft.go | 2 minggu lalu | |
| resiliency_test.go | 3 minggu lalu | |
| rpc.go | 2 minggu lalu | |
| server.go | 2 minggu lalu | |
| storage.go | 2 minggu lalu | |
| types.go | 2 minggu lalu | |
| watcher.go | 3 minggu lalu |
这是一个使用 Go 语言编写的功能完备的 Raft 共识算法实现。它支持动态成员变更、日志压缩(快照)、线性一致性读等高级特性,旨在为分布式系统提供强一致性的状态机复制能力。
package main
import (
"log"
"xbase/xnet/raft"
)
func main() {
// 1. 配置初始化
config := raft.DefaultConfig()
config.NodeID = "node1"
config.ListenAddr = "127.0.0.1:9001"
config.DataDir = "data1" // 注意:每个节点必须使用独立的数据目录
config.Logger = raft.NewConsoleLogger("node1", 1) // 1=Info级别
// 初始集群配置 (Bootstrap)
// 仅在集群首次启动时需要,用于定义初始成员
config.ClusterNodes = map[string]string{
"node1": "127.0.0.1:9001",
}
// 2. 创建 KV Server (包含 Raft 核心与内置状态机 DB)
server, err := raft.NewKVServer(config)
if err != nil {
log.Fatalf("Failed to create server: %v", err)
}
// 3. 启动服务
if err := server.Start(); err != nil {
log.Fatalf("Failed to start server: %v", err)
}
defer server.Stop()
log.Println("Node started on 127.0.0.1:9001")
// 4. 执行 KV 操作
// 写操作 (如果当前不是 Leader,会自动转发)
if err := server.Set("key1", "value1"); err != nil {
log.Printf("Set failed: %v", err)
}
// 本地读取 (高性能,但存在极低概率读到旧数据)
if val, found := server.Get("key1"); found {
log.Printf("Got value: %s", val)
}
// 线性一致性读 (强一致性,保证读到最新提交的数据)
val, found, err := server.GetLinear("key1")
if err != nil {
log.Printf("Read failed: %v", err)
} else if found {
log.Printf("Consistent value: %s", val)
}
}
Node 1 (引导节点 / Bootstrap Node):
Node 1 作为集群的初始节点启动。
config := raft.DefaultConfig()
config.NodeID = "node1"
config.ListenAddr = "127.0.0.1:9001"
config.DataDir = "data1"
// 初始包含自己
config.ClusterNodes = map[string]string{
"node1": "127.0.0.1:9001",
}
// ... 启动 server ...
Node 2 & 3 (加入节点):
启动 Node 2 和 Node 3。注意:在启动时,它们的 ClusterNodes 可以留空或仅包含自己,因为它们稍后会被动态加入到集群中。
// Node 2 配置
config.NodeID = "node2"
config.ListenAddr = "127.0.0.1:9002"
config.DataDir = "data2"
// ... 启动 server ...
执行加入操作:
通过已存在的集群成员(例如 Node 1)发起 API 调用,将新节点加入集群。
// 连接到 Node 1 (或直接在 Node 1 的代码中调用)
// 将 Node 2 加入集群
err := server1.Join("node2", "127.0.0.1:9002")
if err != nil {
log.Printf("Failed to join node2: %v", err)
}
// 将 Node 3 加入集群
err := server1.Join("node3", "127.0.0.1:9003")
系统允许在不停机的情况下增删节点。为了保证安全性(Safety),系统一次只允许变更一个节点(Single-Server Membership Change)。
// 添加节点 (自动转发请求到 Leader)
err := server.Join("node4", "127.0.0.1:9004")
// 移除节点 (自动转发请求到 Leader)
err := server.Leave("node4")
为了防止日志无限增长,系统实现了自动快照机制。
config.SnapshotThreshold 时触发压缩。Sync() 强制将 DB 数据刷入磁盘,防止“日志被删但数据还在 OS 缓存中”导致的数据丢失风险。配置示例:
config.LogCompactionEnabled = true
config.SnapshotThreshold = 100000 // 每新增 10万条日志触发一次压缩
config.SnapshotChunkSize = 1024 * 1024 // 快照分块大小为 1MB
ReadIndex 是 Raft 论文中描述的一种读优化机制,它使得读操作无需产生 Raft Log(即无需磁盘 IO),就能保证线性一致性(Linearizability)。
val, found, err := server.GetLinear("key")
工作流程:
CommitIndex 作为 ReadIndex。ReadIndex。你可以主动将 Leader 角色转移给某个 Follower,这在需要重启当前 Leader 节点进行升级或维护时非常有用,可以最小化集群的不可用时间。
// 将 Leader 转移给 node2
err := server.TransferLeadership("node2")
机制:当前 Leader 会发送 TimeoutNow RPC 给目标节点,使其立即发起选举(并无视选举超时时间),从而大概率赢得选举。
通过 HealthCheck() 接口可以实时获取节点状态。
health := server.HealthCheck()
fmt.Printf("节点: %s, 状态: %s, Leader: %s, 健康: %v\n",
health.NodeID, health.State, health.LeaderID, health.IsHealthy)
系统内置了详细的指标统计,可用于接入 Prometheus 等监控系统。
metrics := server.GetMetrics()
fmt.Printf("当前任期: %d\n", metrics.Term)
fmt.Printf("总提案数: %d\n", metrics.ProposalsTotal)
fmt.Printf("生成的快照数: %d\n", metrics.SnapshotsTaken)
xnet/raft/
├── raft.go # Raft 核心逻辑 (选举、日志复制、心跳)
├── types.go # 类型定义与配置结构体
├── log.go # 日志管理 (LogManager)
├── storage.go # 存储接口与实现 (HybridStorage)
├── rpc.go # RPC 消息定义与网络传输层
├── server.go # KVServer 封装 (Raft 与 DB 的胶水层)
├── db/ # 内置高性能 KV 存储引擎
│ ├── engine.go # 引擎核心实现
│ └── db.md # 引擎详细文档
└── README.md # 项目说明文档
LastAppliedIndex 机制,重启时自动跳过已持久化的日志回放,实现秒级启动。