|
|
hace 1 semana | |
|---|---|---|
| db | hace 2 semanas | |
| example | hace 1 semana | |
| .gitignore | hace 2 semanas | |
| README.md | hace 1 semana | |
| auth.go | hace 1 semana | |
| bench | hace 1 semana | |
| cli.go | hace 1 semana | |
| codec.go | hace 3 semanas | |
| go.mod | hace 2 semanas | |
| kv.go | hace 3 semanas | |
| log.go | hace 3 semanas | |
| raft.go | hace 1 semana | |
| raft_ext.go | hace 2 semanas | |
| raft_node | hace 1 semana | |
| rpc.go | hace 2 semanas | |
| server.go | hace 1 semana | |
| storage.go | hace 2 semanas | |
| tcp_server.go | hace 1 semana | |
| types.go | hace 2 semanas | |
| watcher.go | hace 2 semanas |
这是一个使用 Go 语言编写的功能完备的 Raft 共识算法实现。它支持动态成员变更、日志压缩(快照)、线性一致性读、RBAC 权限控制等高级特性,旨在为分布式系统提供强一致性与安全的状态机复制能力。
user.asin 自动覆盖 user.asin.china)。LOGIN, AUTH, GET, SET 等命令,方便集成到各类客户端。package main
import (
"log"
"xbase/xnet/raft"
)
func main() {
// 1. 配置初始化
config := raft.DefaultConfig()
config.NodeID = "node1"
config.ListenAddr = "127.0.0.1:9001"
config.DataDir = "data1" // 注意:每个节点必须使用独立的数据目录
config.Logger = raft.NewConsoleLogger("node1", 1) // 1=Info级别
// 初始集群配置 (Bootstrap)
// 仅在集群首次启动时需要,用于定义初始成员
config.ClusterNodes = map[string]string{
"node1": "127.0.0.1:9001",
}
// 2. 创建 KV Server (包含 Raft 核心与内置状态机 DB)
server, err := raft.NewKVServer(config)
if err != nil {
log.Fatalf("Failed to create server: %v", err)
}
// 3. 启动服务
if err := server.Start(); err != nil {
log.Fatalf("Failed to start server: %v", err)
}
defer server.Stop()
log.Println("Node started on 127.0.0.1:9001")
// 4. 执行 KV 操作
// 写操作 (如果当前不是 Leader,会自动转发)
if err := server.Set("key1", "value1"); err != nil {
log.Printf("Set failed: %v", err)
}
// 本地读取 (高性能,但存在极低概率读到旧数据)
if val, found := server.Get("key1"); found {
log.Printf("Got value: %s", val)
}
// 线性一致性读 (强一致性,保证读到最新提交的数据)
val, found, err := server.GetLinear("key1")
if err != nil {
log.Printf("Read failed: %v", err)
} else if found {
log.Printf("Consistent value: %s", val)
}
}
Node 1 (引导节点 / Bootstrap Node):
Node 1 作为集群的初始节点启动。
config := raft.DefaultConfig()
config.NodeID = "node1"
config.ListenAddr = "127.0.0.1:9001"
config.DataDir = "data1"
// 初始包含自己
config.ClusterNodes = map[string]string{
"node1": "127.0.0.1:9001",
}
// ... 启动 server ...
Node 2 & 3 (加入节点):
启动 Node 2 和 Node 3。注意:在启动时,它们的 ClusterNodes 可以留空或仅包含自己,因为它们稍后会被动态加入到集群中。
// Node 2 配置
config.NodeID = "node2"
config.ListenAddr = "127.0.0.1:9002"
config.DataDir = "data2"
// ... 启动 server ...
执行加入操作:
通过已存在的集群成员(例如 Node 1)发起 API 调用,将新节点加入集群。
// 连接到 Node 1 (或直接在 Node 1 的代码中调用)
// 将 Node 2 加入集群
err := server1.Join("node2", "127.0.0.1:9002")
if err != nil {
log.Printf("Failed to join node2: %v", err)
}
// 将 Node 3 加入集群
err := server1.Join("node3", "127.0.0.1:9003")
系统内置了一个强大的交互式命令行工具,默认随节点启动。它提供了直观的界面来管理数据、监控集群状态以及执行运维操作。
基础命令:
set <key> <value> / get <key> / del <key>:标准的 KV 操作(支持权限校验)。search <pattern> [limit] [offset]:搜索 Key(严格过滤权限,Superuser 可直接下推 DB)。count <pattern>:统计符合模式的 Key 数量(仅统计有权访问的 Key)。auth-init <root_pass>:初始化权限系统(Root Only)。login <user> <pass> [code]:登录系统(全局会话)。logout:登出当前会话(全局注销)。whoami:查看当前登录用户及 Token 信息。join <id> <addr>:将新节点加入集群。leave <id>:将节点移出集群。info:实时展示节点健康状态、Raft 索引(Commit/Applied)、存储占用(DB/Log)、内存使用及集群成员连接状况。系统内置了完整的层级 RBAC 权限系统,支持细粒度的 Key 访问控制和数值约束。
系统权限模型设计兼顾了灵活性与性能,采用 Raft 全局一致性存储 与 本地高性能缓存 相结合的架构。
所有权限相关的数据(Session 除外)均作为特殊的 Key-Value 存储在 Raft Log 中,保证全集群强一致性。
system.user.<username>
system.role.<rolename>
system.lock.<username>
每条权限规则包含三个核心要素:
*: 匹配所有 Key。prefix*: 前缀匹配(如 user.* 匹配 user.alice, user.bob)。exact: 精确匹配。read: 读操作 (Get, Search, Count)。write: 写操作 (Set, Del)。admin: 管理操作。*: 所有操作。write 操作的细粒度控制
系统在处理请求时遵循 Deny-Allow 的判定顺序:
DenyPermissions,命中即拒绝。EffectivePermissions 列表,该列表已包含用户自身及所有继承角色的权限。性能优化 (High Performance):
EffectivePermissions,将复杂的 RBAC 树扁平化。鉴权时仅需简单的列表遍历,无需递归查询。* 权限的用户(如 root),系统会直接下推查询到 DB 引擎。系统现在提供了一个基于 TCP 的持久连接 API,并配套了一个 Web 管理端示例。
TCP API 协议(默认端口:Raft端口 + 10,如 9011):
> LOGIN user pass [otp]
< OK <token>
> GET key
< OK value
> SET key value
< OK
运行 Web 管理端:
cd example/web_admin
go run main.go
# 访问 http://localhost:8088
我们在 example/role 目录下提供了一个完整的演示案例,模拟了“销售经理 vs 普通销售”的折扣权限控制场景。
步骤 1: 编译并启动 Node 1
cd example/role
# 编译
go build -o role_node1 node1/main.go node1/demo_cli.go
go build -o role_node2 node2/main.go node2/demo_cli.go
# 启动 Node 1
./role_node1
步骤 2: 初始化权限与数据 (在 Node 1 CLI 中)
> auth-init rootpass # 初始化系统,创建 root 用户
> login root rootpass # 登录
> demo-init # 初始化演示数据 (创建角色 alice/bob)
此时已创建:
alice (经理): 允许设置折扣 0.5 ~ 0.9bob (员工): 允许设置折扣 0.8 ~ 0.95步骤 3: 测试权限 (在 Node 1 CLI 中)
> login alice pass123
> set product.discount 0.6 # 成功 (0.6 在 0.5-0.9 之间)
> set product.discount 0.4 # 失败 (低于 min 0.5)
> login bob pass123
> set product.discount 0.85 # 成功
> set product.discount 0.6 # 失败 (低于 bob 的 min 0.8)
步骤 4: 集群同步测试 保持 Node 1 运行,开启新终端启动 Node 2:
./role_node2
在 Node 1 中加入 Node 2:
> login root rootpass
> join node2 127.0.0.1:9002
此时在 Node 2 上可以直接登录 bob,验证权限数据已自动同步:
(在 Node 2 CLI)
> login bob pass123
> get product.discount # 成功读取
raft.Config 提供了丰富的配置项,以适应不同的网络环境和性能需求:
type Config struct {
// 超时设置
RPCTimeout time.Duration // RPC 请求超时 (默认 500ms)
SnapshotRPCTimeout time.Duration // 快照传输超时 (默认 30s)
ProposeTimeout time.Duration // 客户端提案超时 (默认 3s)
// 批处理优化
BatchMinWait time.Duration // 自适应批处理最小等待 (默认 1ms)
BatchMaxWait time.Duration // 自适应批处理最大等待 (默认 10ms)
BatchMaxSize int // 批处理最大数量 (默认 100)
// ... 其他基础配置 (NodeID, ListenAddr 等)
}
为了便于程序处理,系统定义了特定的错误类型:
ErrNotLeader: 当前节点不是 Leader。ErrConfigInFlight: 正在进行成员变更。ErrTimeout: 操作超时。ErrShutdown: Raft 正在关闭。ErrPersistFailed: 持久化失败 (严重错误)。系统允许在不停机的情况下增删节点。为了保证安全性(Safety),系统一次只允许变更一个节点(Single-Server Membership Change)。
// 添加节点 (自动转发请求到 Leader)
err := server.Join("node4", "127.0.0.1:9004")
// 移除节点 (自动转发请求到 Leader)
err := server.Leave("node4")
为了防止日志无限增长,系统实现了自动快照机制。
config.SnapshotThreshold 时触发压缩。Sync() 强制将 DB 数据刷入磁盘,防止“日志被删但数据还在 OS 缓存中”导致的数据丢失风险。配置示例:
config.LogCompactionEnabled = true
config.SnapshotThreshold = 100000 // 每新增 10万条日志触发一次压缩
config.SnapshotMinRetention = 10000 // 压缩后保留最近 1万条日志 (方便 Follower 追赶)
config.SnapshotChunkSize = 1024 * 1024 // 快照分块大小为 1MB
ReadIndex 是 Raft 论文中描述的一种读优化机制,它使得读操作无需产生 Raft Log(即无需磁盘 IO),就能保证线性一致性(Linearizability)。
val, found, err := server.GetLinear("key")
工作流程:
CommitIndex 作为 ReadIndex。ReadIndex。系统支持替换默认的 JSON 编解码器,以提高序列化效率(如使用 msgpack)。
import "github.com/vmihailenco/msgpack/v5"
type MsgpackCodec struct{}
func (c *MsgpackCodec) Marshal(v interface{}) ([]byte, error) {
return msgpack.Marshal(v)
}
func (c *MsgpackCodec) Unmarshal(data []byte, v interface{}) error {
return msgpack.Unmarshal(data, v)
}
func (c *MsgpackCodec) Name() string { return "msgpack" }
// 在创建 Raft 之前设置
raft.SetCodec(&MsgpackCodec{})
你可以主动将 Leader 角色转移给某个 Follower,这在需要重启当前 Leader 节点进行升级或维护时非常有用,可以最小化集群的不可用时间。
// 将 Leader 转移给 node2
err := server.TransferLeadership("node2")
机制:当前 Leader 会发送 TimeoutNow RPC 给目标节点,使其立即发起选举(并无视选举超时时间),从而大概率赢得选举。
系统提供了一套高性能的 WebHook 机制,允许外部系统通过 HTTP 接口订阅 Key 的变更事件。
启用 HTTP 服务:
// 在启动时配置 HTTP 监听地址
config.HTTPAddr = ":8080"
server, _ := raft.NewKVServer(config)
订阅变更:
通过 POST 请求订阅指定 Key 的变更:
curl -X POST http://localhost:8080/watch \
-d '{"key": "my-key", "url": "http://your-service/callback"}'
回调格式:
当 Key 发生变化时,系统会向注册的 URL 发送 JSON 格式的 POST 请求:
{
"key": "my-key",
"value": "new-value",
"event_type": 0, // 0=Set, 1=Del
"timestamp": 1630000000000
}
通过 HealthCheck() 接口可以实时获取节点状态。
health := server.HealthCheck()
fmt.Printf("节点: %s, 状态: %s, Leader: %s, 健康: %v\n",
health.NodeID, health.State, health.LeaderID, health.IsHealthy)
系统内置了详细的指标统计,可用于接入 Prometheus 等监控系统。
metrics := server.GetMetrics()
fmt.Printf("当前任期: %d\n", metrics.Term)
fmt.Printf("总提案数: %d\n", metrics.ProposalsTotal)
fmt.Printf("生成的快照数: %d\n", metrics.SnapshotsTaken)
xnet/raft/
├── raft.go # Raft 核心逻辑 (选举、日志复制、心跳)
├── auth.go # RBAC 权限控制逻辑
├── types.go # 类型定义与配置结构体
├── log.go # 日志管理 (LogManager)
├── storage.go # 存储接口与实现 (HybridStorage)
├── rpc.go # RPC 消息定义与网络传输层
├── codec.go # 编解码器接口
├── server.go # KVServer 封装 (Raft 与 DB 的胶水层)
├── cli.go # 命令行交互工具
├── db/ # 内置高性能 KV 存储引擎
│ ├── engine.go # 引擎核心实现
│ └── db.md # 引擎详细文档
└── README.md # 项目说明文档
LastAppliedIndex 机制,重启时自动跳过已持久化的日志回放,实现秒级启动。