
Cleanup and tidying

robert 1 week ago
parent commit
eb5614545c
7 changed files with 69 additions and 871 deletions
  1. IMPROVEMENTS.md (+0, -268)
  2. LICENSE (+0, -72)
  3. README.md (+69, -17)
  4. compaction_test.go (+0, -381)
  5. example/database/benchmark/benchmark.go (+0, -0)
  6. example/database/generator/generator.go (+0, -0)
  7. resiliency_test.go (+0, -133)

+ 0 - 268
IMPROVEMENTS.md

@@ -1,268 +0,0 @@
-# Raft Implementation Improvement Log
-
-This document records all improvements made to the Raft implementation.
-
-## 🔴 P0: Safety Improvements
-
-### 1. Persistence Error Handling (raft.go)
-
-**Problem**: Errors from `persistState` were only logged instead of returned, which violates Raft's safety requirements.
-
-**Fix** (see the sketch below):
-- `persistState()` now returns an `error`
-- Added `mustPersistState()` for critical paths; it panics on failure
-- All call sites now handle persistence errors correctly
-- If persistence fails, the node neither grants its vote nor continues the election
-
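For illustration, a minimal sketch of this pattern; the `Storage` interface and `Raft` fields here are simplified assumptions, not the actual raft.go definitions:

```go
package raft

import "fmt"

// Storage is assumed to expose a SaveState method; the real storage interface may differ.
type Storage interface {
    SaveState(term uint64, votedFor string) error
}

type Raft struct {
    currentTerm uint64
    votedFor    string
    storage     Storage
}

// persistState returns the error instead of merely logging it, so callers can
// refuse to grant a vote or continue an election when durability is not guaranteed.
func (r *Raft) persistState() error {
    if err := r.storage.SaveState(r.currentTerm, r.votedFor); err != nil {
        return fmt.Errorf("persist state: %w", err)
    }
    return nil
}

// mustPersistState is used on critical paths where continuing after a failed
// write could violate Raft safety; it panics rather than proceed.
func (r *Raft) mustPersistState() {
    if err := r.persistState(); err != nil {
        panic(err)
    }
}
```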
-### 2. Membership Change Safety (raft.go)
-
-**Improvements** (see the sketch below):
-- Added detailed documentation explaining why the single-node change strategy is safe
-- Added more validation checks:
-  - Membership changes are rejected while a leadership transfer is in progress
-  - nodeID and address must be non-empty
-  - The address must not already be in use by another node
-  - The cluster cannot be shrunk to 0 nodes
-- Custom error types (`ErrNotLeader`, `ErrConfigInFlight`) make these failures easy to handle programmatically
-
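A minimal sketch of these checks as a standalone helper; the `Cluster` type and its fields are hypothetical, and the real checks in raft.go are interleaved with the rest of the propose path:

```go
package raft

import "errors"

var (
    ErrNotLeader      = errors.New("not leader")
    ErrConfigInFlight = errors.New("configuration change in progress")
)

// Cluster is an illustrative stand-in for the leader's view of membership.
type Cluster struct {
    IsLeader           bool
    TransferInProgress bool
    ChangeInProgress   bool
    Members            map[string]string // nodeID -> address
}

// ValidateChange runs the checks listed above before a single-node change is proposed.
func (c *Cluster) ValidateChange(nodeID, address string, remove bool) error {
    if !c.IsLeader {
        return ErrNotLeader
    }
    if c.ChangeInProgress || c.TransferInProgress {
        return ErrConfigInFlight
    }
    if nodeID == "" || (!remove && address == "") {
        return errors.New("nodeID and address must be non-empty")
    }
    if !remove {
        for id, addr := range c.Members {
            if id != nodeID && addr == address {
                return errors.New("address already used by another node")
            }
        }
    }
    if remove && len(c.Members) <= 1 {
        return errors.New("cannot shrink cluster to zero nodes")
    }
    return nil
}
```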
-## 🟡 P1: Performance and Feature Improvements
-
-### 3. ReadIndex Linearizable Reads (raft.go, types.go, rpc.go)
-
-**New feature**: Support for strongly consistent read operations
-
-```go
-// Usage
-index, err := raft.ReadIndex()
-if err != nil { /* handle */ }
-// Read state that has been applied up to index
-```
-
-**Implementation**:
-- `ReadIndex()` confirms leadership and then returns a safe read point
-- A `readIndexLoop` goroutine processes read requests
-- `confirmLeadership()` confirms leadership via a round of heartbeats
-- KVServer gains a `GetLinear()` method
-
-### 4. Pluggable Codec (codec.go)
-
-**New feature**: The default JSON serialization can be replaced with a more efficient format
-
-```go
-// Use msgpack (requires adding the dependency)
-raft.SetCodec(&MsgpackCodec{})
-```
-
-**Implementation**:
-- The `Codec` interface defines `Marshal`/`Unmarshal`
-- `DefaultCodec` uses JSON by default
-- The RPC layer goes through `DefaultCodec` instead of hard-coded JSON
-
-### 5. Event-Driven Apply Loop (raft.go)
-
-**Problem**: The original implementation polled every 10ms, wasting CPU even when nothing new had been committed.
-
-**Fix** (see the sketch below):
-- Switched to an event-driven model triggered by `commitCh`
-- A 50ms fallback ticker is kept as a safety net against missed signals
-- Reduces idle CPU spinning
-
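The shape of such a loop, as a minimal self-contained sketch; the channel and callback names are illustrative rather than the actual identifiers in raft.go:

```go
package raft

import "time"

// runApplyLoop blocks on commitCh and wakes only when new entries are committed;
// the 50ms fallback ticker guards against a missed notification. The apply
// callback is expected to drain everything up to the current commitIndex.
func runApplyLoop(commitCh <-chan struct{}, stopCh <-chan struct{}, apply func()) {
    fallback := time.NewTicker(50 * time.Millisecond)
    defer fallback.Stop()
    for {
        select {
        case <-commitCh: // new entries were committed
        case <-fallback.C: // safety net against a missed signal
        case <-stopCh:
            return
        }
        apply()
    }
}
```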
-### 6. Custom Error Types (types.go)
-
-**New error types**:
-```go
-var (
-    ErrNoLeader       = errors.New("no leader available")
-    ErrNotLeader      = errors.New("not leader")
-    ErrConfigInFlight = errors.New("configuration change in progress")
-    ErrTimeout        = errors.New("operation timed out")
-    ErrShutdown       = errors.New("raft is shutting down")
-    ErrPersistFailed  = errors.New("failed to persist state")
-    ErrLeadershipLost = errors.New("leadership lost")
-)
-```
-
-**RaftError wrapper**:
-```go
-type RaftError struct {
-    Err      error
-    LeaderID string        // known leader, if any
-    RetryIn  time.Duration // suggested retry interval
-}
-```
-
-### 7. Configurable Timeouts (types.go)
-
-**New configuration fields**:
-```go
-type Config struct {
-    // RPC timeouts
-    RPCTimeout         time.Duration // default 500ms
-    SnapshotRPCTimeout time.Duration // default 30s
-    ProposeTimeout     time.Duration // default 3s
-
-    // Retry settings
-    MaxRetries   int           // default 3
-    RetryBackoff time.Duration // default 100ms
-
-    // Batching settings
-    BatchMinWait time.Duration // default 1ms
-    BatchMaxWait time.Duration // default 10ms
-    BatchMaxSize int           // default 100
-
-    // Snapshot settings
-    SnapshotChunkSize int // default 1MB
-}
-```
-
-## 🟢 P2: Observability and Operations Improvements
-
-### 8. Adaptive Batching (raft.go)
-
-**Problem**: The original implementation used a fixed 10ms delay, which adds unnecessary latency under low load.
-
-**Fix** (see the sketch below):
-- Implemented an adaptive batching algorithm
-- After the first request arrives, wait `BatchMinWait` (1ms) for more
-- Never wait longer than `BatchMaxWait` (10ms) in total
-- Flush immediately once `BatchMaxSize` (100) entries are buffered
-
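One way such a policy can be expressed, as a self-contained sketch; the proposal channel and byte-slice payloads are placeholders for the real batching code:

```go
package raft

import "time"

// collectBatch waits for a first proposal, then keeps collecting as long as new
// proposals keep arriving within minWait of each other, gives up after maxWait
// overall, and flushes immediately once maxSize proposals are buffered.
func collectBatch(in <-chan []byte, minWait, maxWait time.Duration, maxSize int) [][]byte {
    batch := [][]byte{<-in} // block until there is at least one proposal

    deadline := time.NewTimer(maxWait)
    defer deadline.Stop()

    for len(batch) < maxSize {
        quiet := time.NewTimer(minWait)
        select {
        case p := <-in:
            batch = append(batch, p)
            quiet.Stop()
        case <-quiet.C: // nothing new within minWait: flush what we have
            return batch
        case <-deadline.C: // upper bound on added latency
            quiet.Stop()
            return batch
        }
    }
    return batch
}
```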
-### 9. Metrics and Health Checks (types.go, raft.go)
-
-**Metrics struct**:
-```go
-type Metrics struct {
-    Term               uint64
-    ProposalsTotal     uint64
-    ProposalsSuccess   uint64
-    ElectionsStarted   uint64
-    ElectionsWon       uint64
-    SnapshotsTaken     uint64
-    ReadIndexRequests  uint64
-    // ... and so on
-}
-```
-
-**HealthStatus struct**:
-```go
-type HealthStatus struct {
-    NodeID        string
-    State         string
-    Term          uint64
-    LeaderID      string
-    ClusterSize   int
-    LogBehind     uint64
-    LastHeartbeat time.Time
-    IsHealthy     bool
-    Uptime        time.Duration
-}
-```
-
-**Usage**:
-```go
-health := raft.HealthCheck()
-metrics := raft.GetMetrics()
-```
-
-### 10. Leadership Transfer (raft.go, rpc.go)
-
-**New feature**: Proactively hand leadership over to another node
-
-```go
-err := raft.TransferLeadership("node2")
-```
-
-**Implementation** (see the sketch below):
-- Bring the target node up to date with the latest log
-- Send a `TimeoutNow` RPC so the target starts an election immediately
-- On receipt, the target skips its election timeout and begins the election right away
-- Added a `HandleTimeoutNow` RPC handler
-
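The receiving side might look roughly like this; the argument/reply struct shapes and the `startElection` callback are assumptions for illustration, not the actual rpc.go definitions:

```go
package raft

// TimeoutNowArgs and TimeoutNowReply are illustrative shapes for the new RPC.
type TimeoutNowArgs struct {
    Term     uint64
    LeaderID string
}

type TimeoutNowReply struct {
    Term uint64
}

// handleTimeoutNow sketches the target side of a leadership transfer: when the
// request comes from a current (or newer) term, the node bypasses its randomized
// election timeout and campaigns immediately.
func handleTimeoutNow(currentTerm uint64, args *TimeoutNowArgs, startElection func()) *TimeoutNowReply {
    reply := &TimeoutNowReply{Term: currentTerm}
    if args.Term < currentTerm {
        return reply // stale leader; ignore
    }
    startElection()
    return reply
}
```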
-## 🔵 P3: Large-Scale Optimizations
-
-### 11. Chunked Snapshot Transfer (raft.go, types.go)
-
-**Problem**: Transferring a large snapshot in a single RPC can time out or consume excessive memory.
-
-**Fix** (see the sketch below):
-- `InstallSnapshotArgs` gains `Offset` and `Done` fields
-- `sendSnapshot` sends the snapshot in chunks (1MB per chunk by default)
-- `HandleInstallSnapshot` supports receiving and reassembling chunks
-- `pendingSnapshotState` tracks receive progress
-
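On the sender side, the chunking reduces to a loop like the following sketch, where the `send` callback stands in for the real InstallSnapshot RPC call:

```go
package raft

// sendSnapshotChunks cuts the snapshot into chunkSize pieces, sends each one
// with its offset, and marks the last chunk Done so the receiver knows it can
// assemble and apply the snapshot.
func sendSnapshotChunks(data []byte, chunkSize int, send func(offset uint64, chunk []byte, done bool) error) error {
    if chunkSize <= 0 {
        chunkSize = 1 << 20 // fall back to 1MB, the documented default
    }
    if len(data) == 0 {
        return send(0, nil, true) // empty snapshot: a single terminating chunk
    }
    for offset := 0; offset < len(data); offset += chunkSize {
        end := offset + chunkSize
        if end > len(data) {
            end = len(data)
        }
        if err := send(uint64(offset), data[offset:end], end == len(data)); err != nil {
            return err
        }
    }
    return nil
}
```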
-## 📁 Summary of File Changes
-
-| File | Change | Description |
-|-----|---------|------|
-| `types.go` | Modified | Added error types, Metrics, HealthStatus, and new config fields |
-| `raft.go` | Modified | Fixed persistence handling; added ReadIndex, Leadership Transfer, and more |
-| `rpc.go` | Modified | Added new RPC types; uses the pluggable codec |
-| `codec.go` | New | Pluggable codec interface |
-| `server.go` | Modified | Added GetLinear, HealthCheck, and other APIs |
-
-## 🧪 Testing Recommendations
-
-1. **Unit tests**:
-   - Behavior when persistence fails
-   - ReadIndex behavior in each node state
-   - Chunked snapshot transfer
-
-2. **Integration tests**:
-   - Leadership transfer
-   - Request handling during membership changes
-   - Recovery from network partitions
-
-3. **Stress tests**:
-   - Verify the effect of adaptive batching
-   - Verify chunked transfer of large snapshots
-
-## 📖 Usage Examples
-
-### Linearizable Reads
-```go
-server, _ := raft.NewKVServer(config)
-// ...
-val, ok, err := server.GetLinear("key")
-if err != nil {
-    // handle the error; a retry may be needed
-}
-```
-
-### Leadership Transfer
-```go
-if server.IsLeader() {
-    err := server.TransferLeadership("node2")
-    if err != nil {
-        log.Printf("Transfer failed: %v", err)
-    }
-}
-```
-
-### Health Check
-```go
-health := server.HealthCheck()
-if !health.IsHealthy {
-    log.Printf("Node unhealthy: state=%s, leader=%s", 
-        health.State, health.LeaderID)
-}
-```
-
-### Using msgpack Serialization
-```go
-import "github.com/vmihailenco/msgpack/v5"
-
-type MsgpackCodec struct{}
-
-func (c *MsgpackCodec) Marshal(v interface{}) ([]byte, error) {
-    return msgpack.Marshal(v)
-}
-
-func (c *MsgpackCodec) Unmarshal(data []byte, v interface{}) error {
-    return msgpack.Unmarshal(data, v)
-}
-
-func (c *MsgpackCodec) Name() string { return "msgpack" }
-
-// Set the codec before creating the Raft instance
-raft.SetCodec(&MsgpackCodec{})
-```

+ 0 - 72
LICENSE

@@ -1,72 +0,0 @@
-Apache License 
-Version 2.0, January 2004 
-http://www.apache.org/licenses/
-TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
-1. Definitions.
-
-"License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document.
-
-"Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License.
-
-"Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity.
-
-"You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License.
-
-"Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files.
-
-"Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types.
-
-"Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below).
-
-"Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof.
-
-"Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution."
-
-"Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work.
-
-2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form.
-
-3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed.
-
-4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions:
-
-(a) You must give any other recipients of the Work or Derivative Works a copy of this License; and
-
-(b) You must cause any modified files to carry prominent notices stating that You changed the files; and
-
-(c) You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and
-
-(d) If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License.
-
-You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License.
-
-5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions.
-
-6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file.
-
-7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License.
-
-8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages.
-
-9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability.
-
-END OF TERMS AND CONDITIONS
-
-APPENDIX: How to apply the Apache License to your work.
-
-To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "[]" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives.
-
-Copyright [yyyy] [name of copyright owner]
-
-Licensed under the Apache License, Version 2.0 (the "License"); 
-you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software 
-distributed under the License is distributed on an "AS IS" BASIS, 
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-See the License for the specific language governing permissions and 
-limitations under the License.

+ 69 - 17
README.md

@@ -8,34 +8,32 @@
     - Includes the **PreVote** optimization, which effectively prevents invalid Term inflation caused by network partitions or unstable nodes and improves cluster stability.
 - ✅ **Log Replication**
     - Supports **Pipeline** and **Batching** replication optimizations, significantly improving write throughput under high concurrency.
+    - **Adaptive batching**: the batching delay is adjusted automatically based on load (1ms-10ms), balancing low latency with high throughput.
 - ✅ **Dynamic Membership Changes**
     - Nodes can be added or removed at runtime (implemented as single-node changes), so the cluster can be scaled without downtime.
+    - **Safety hardening**: strict validation of membership changes to prevent data loss and split-brain.
 - ✅ **Log Compaction / Snapshotting**
     - **Automatic triggering**: snapshots are generated automatically based on log growth.
     - **Dynamic threshold**: the compaction threshold adjusts dynamically to avoid "compaction storms" during write peaks.
     - **Chunked transfer**: large snapshots are transferred in chunks, avoiding network timeouts caused by large payloads.
 - ✅ **Linearizable Reads (ReadIndex)**
     - Implements the **ReadIndex** optimization, allowing strongly consistent reads without creating a log entry, which performs far better than log-based reads.
+- ✅ **Pluggable Codec**
+    - The default JSON serialization can be replaced with more efficient formats such as Msgpack or Protobuf.
 - ✅ **Role-Based Access Control (RBAC)**
     - **Hierarchical permissions**: path-based hierarchical access control (e.g. `user.asin` automatically covers `user.asin.china`).
     - **Numeric constraints**: Max/Min constraints can be enforced on numeric values.
     - **High-performance permission checks**: hot operations such as `CheckPermission` are optimized with a local cache while login state stays globally consistent.
     - **Globally consistent sessions**: login state is synchronized across the cluster through Raft consensus, enabling load balancing and failover for API calls.
-    - **System command control**: core operational commands (such as Join/Leave) can only be executed by the root account.
-    - **Multi-factor authentication**: token-based session management and Google Authenticator MFA.
-    - **IP control**: per-user IP allowlists and blocklists.
 - ✅ **Leadership Transfer**
     - The Leader role can be handed over smoothly to a designated node, useful for planned maintenance or load balancing.
 - ✅ **Automatic Request Routing**
     - Follower nodes automatically forward write requests (and, if configured, read requests) to the current Leader, so clients do not need to track the Leader themselves.
-- ✅ **Leader Lease Check**
-    - Before handling reads and writes, the Leader checks heartbeat connectivity with a majority, preventing split-brain writes under network partitions.
 - ✅ **WebHook Notifications (Data Change Notification)**
     - Key change events can be subscribed to via HTTP WebHooks.
-    - **Non-blocking design**: callbacks are handled by an asynchronous worker pool, so the core Raft consensus path is never blocked.
-    - **Automatic retries**: built-in exponential backoff retries for reliable delivery.
 - ✅ **Persistence**
     - Critical Raft logs and state (Term, Vote) are persisted to disk in real time, so no data is lost across restarts.
+    - **Strict error handling**: a persistence failure on a critical path stops the service immediately, preventing state divergence.
 - ✅ **Health Checks & Metrics**
     - Detailed runtime metrics and a built-in health-check endpoint make operational monitoring easy.
 
@@ -263,6 +261,38 @@ go build -o role_node2 node2/main.go node2/demo_cli.go
 > get product.discount     # read succeeds
 ```
 
+## Configuration and Error Handling
+
+### Core Configuration (Config)
+
+`raft.Config` provides a rich set of options to adapt to different network environments and performance requirements:
+
+```go
+type Config struct {
+    // Timeout settings
+    RPCTimeout         time.Duration // RPC request timeout (default 500ms)
+    SnapshotRPCTimeout time.Duration // snapshot transfer timeout (default 30s)
+    ProposeTimeout     time.Duration // client proposal timeout (default 3s)
+
+    // Batching optimization
+    BatchMinWait time.Duration // adaptive batching minimum wait (default 1ms)
+    BatchMaxWait time.Duration // adaptive batching maximum wait (default 10ms)
+    BatchMaxSize int           // maximum batch size (default 100)
+
+    // ... other basic settings (NodeID, ListenAddr, etc.)
+}
+```
+
+### Error Handling (Error Types)
+
+To make programmatic handling easier, the system defines dedicated error types (see the example sketch after the list):
+
+- `ErrNotLeader`: the current node is not the Leader.
+- `ErrConfigInFlight`: a membership change is in progress.
+- `ErrTimeout`: the operation timed out.
+- `ErrShutdown`: Raft is shutting down.
+- `ErrPersistFailed`: persisting state failed (a critical error).
+
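As an illustration (not code from the repository), a caller could branch on these errors with `errors.Is`, assuming they are returned wrapped and that `server` is the KVServer from the examples above; the backoff shown is just one possible policy:

```go
// Sketch only: assumes the standard errors, log, and time packages are imported.
if err := server.Set("key", "value"); err != nil {
    switch {
    case errors.Is(err, raft.ErrNotLeader):
        // forward to the leader, or rely on automatic request routing
    case errors.Is(err, raft.ErrTimeout), errors.Is(err, raft.ErrConfigInFlight):
        time.Sleep(100 * time.Millisecond) // back off before retrying
    default:
        log.Printf("set failed: %v", err)
    }
}
```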
 ## Advanced Features in Detail
 
 ### Dynamic Membership Changes
@@ -291,6 +321,7 @@ err := server.Leave("node4")
 ```go
 config.LogCompactionEnabled = true
 config.SnapshotThreshold = 100000      // trigger a compaction for every 100,000 new log entries
+config.SnapshotMinRetention = 10000    // keep the most recent 10,000 entries after compaction (so Followers can catch up)
 config.SnapshotChunkSize = 1024 * 1024 // snapshot chunk size of 1MB
 ```
 
@@ -308,6 +339,29 @@ val, found, err := server.GetLinear("key")
 3.  **Wait for apply**: wait until the state machine (DB) has applied entries up to the `ReadIndex`.
 4.  **Read**: read the value from the state machine and return it (see the sketch below).
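
Condensed into code, the flow looks roughly like the sketch below; the three callbacks are placeholders for the internal raft.go machinery (heartbeat confirmation, apply-index waiting, and the state-machine read), and the error returned on a failed confirmation is only illustrative:

```go
// readLinear is an illustrative sketch of the ReadIndex sequence, not the actual implementation.
func readLinear(
    commitIndex uint64,
    confirmLeadership func() bool,        // one round of heartbeats to a majority
    waitApplied func(index uint64) error, // block until applyIndex >= index
    read func() (string, bool),           // query the state machine
) (string, bool, error) {
    readIndex := commitIndex // the read point recorded by the leader
    if !confirmLeadership() {
        return "", false, errors.New("leadership not confirmed")
    }
    if err := waitApplied(readIndex); err != nil {
        return "", false, err
    }
    val, ok := read()
    return val, ok, nil
}
```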
 
+### Pluggable Codec
+
+The default JSON codec can be replaced to improve serialization efficiency (for example with msgpack).
+
+```go
+import "github.com/vmihailenco/msgpack/v5"
+
+type MsgpackCodec struct{}
+
+func (c *MsgpackCodec) Marshal(v interface{}) ([]byte, error) {
+    return msgpack.Marshal(v)
+}
+
+func (c *MsgpackCodec) Unmarshal(data []byte, v interface{}) error {
+    return msgpack.Unmarshal(data, v)
+}
+
+func (c *MsgpackCodec) Name() string { return "msgpack" }
+
+// Set the codec before creating the Raft instance
+raft.SetCodec(&MsgpackCodec{})
+```
+
 ### Leadership Transfer
 
 You can proactively transfer the Leader role to a chosen Follower. This is very useful when the current Leader needs to be restarted for an upgrade or maintenance, and it minimizes the cluster's unavailability window.
@@ -340,13 +394,6 @@ curl -X POST http://localhost:8080/watch \
      -d '{"key": "my-key", "url": "http://your-service/callback"}'
 ```
 
-**Unsubscribe:**
-
-```bash
-curl -X POST http://localhost:8080/unwatch \
-     -d '{"key": "my-key", "url": "http://your-service/callback"}'
-```
-
 **Callback format:**
 
 When a key changes, the system sends a JSON-formatted POST request to the registered URL:
@@ -393,6 +440,7 @@ xnet/raft/
 ├── log.go            # log management (LogManager)
 ├── storage.go        # storage interface and implementation (HybridStorage)
 ├── rpc.go            # RPC message definitions and network transport layer
+├── codec.go          # codec interface
 ├── server.go         # KVServer wrapper (glue between Raft and the DB)
 ├── cli.go            # interactive command-line tool
 ├── db/               # built-in high-performance KV storage engine
@@ -403,6 +451,10 @@ xnet/raft/
 
 ## Performance
 
-*   **Throughput**: with Batching and Pipeline optimizations enabled, a single node can sustain tens of thousands of QPS (depending on hardware).
-*   **Latency**: local read/write latency is sub-millisecond.
-*   **Recovery speed**: thanks to the `LastAppliedIndex` mechanism, replay of already-persisted logs is skipped on restart, enabling startup within seconds.
+*   **High throughput**:
+    *   **Pipeline**: AppendEntries RPCs are sent concurrently without waiting for the previous one to return.
+    *   **Batching**: adaptive batching merges multiple log entries into a single RPC, significantly reducing network overhead.
+*   **Low latency**:
+    *   **Event-driven loop**: the apply loop is event-driven rather than polling, reducing idle CPU spinning and latency.
+    *   **Local read optimization**: local read/write latency is sub-millisecond.
+*   **Fast recovery**: thanks to the `LastAppliedIndex` mechanism, replay of already-persisted logs is skipped on restart, enabling startup within seconds.

+ 0 - 381
compaction_test.go

@@ -1,381 +0,0 @@
-package raft
-
-import (
-	"encoding/json"
-	"os"
-	"testing"
-	"time"
-)
-
-// TestCompactionBasic tests basic log compaction
-func TestCompactionBasic(t *testing.T) {
-	dir, err := os.MkdirTemp("", "raft-compaction-test")
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer os.RemoveAll(dir)
-
-	// Create storage with small memory capacity to test file reads
-	storage, err := NewHybridStorage(dir, 200, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer storage.Close()
-
-	// Append 100 entries - these will stay in memory
-	entries := make([]LogEntry, 100)
-	for i := 0; i < 100; i++ {
-		entries[i] = LogEntry{
-			Index:   uint64(i + 1),
-			Term:    1,
-			Type:    EntryNormal,
-			Command: []byte("test"),
-		}
-	}
-	if err := storage.AppendEntries(entries); err != nil {
-		t.Fatal(err)
-	}
-
-	// Verify entries exist
-	if storage.GetLastIndex() != 100 {
-		t.Fatalf("expected lastIndex=100, got %d", storage.GetLastIndex())
-	}
-
-	// Verify entry before compaction
-	entry, err := storage.GetEntry(50)
-	if err != nil {
-		t.Fatalf("should be able to read entry 50 before compaction: %v", err)
-	}
-	if entry.Index != 50 {
-		t.Fatalf("expected entry with index=50, got index=%d", entry.Index)
-	}
-
-	// Compact up to index 50
-	if err := storage.TruncateBefore(50); err != nil {
-		t.Fatal(err)
-	}
-
-	// Verify firstIndex updated
-	if storage.GetFirstIndex() != 50 {
-		t.Fatalf("expected firstIndex=50, got %d", storage.GetFirstIndex())
-	}
-
-	// Verify old entries are inaccessible
-	_, err = storage.GetEntry(49)
-	if err != ErrCompacted {
-		t.Fatalf("expected ErrCompacted for index 49, got %v", err)
-	}
-
-	// Verify entries at and after compaction point are accessible
-	// Entry 50 should still be in memory after TruncateBefore
-	entry, err = storage.GetEntry(51)
-	if err != nil {
-		t.Fatalf("should be able to read entry 51: %v", err)
-	}
-	if entry.Index != 51 {
-		t.Fatalf("expected index=51, got %d", entry.Index)
-	}
-}
-
-// TestSnapshotSaveAndLoad tests snapshot persistence
-func TestSnapshotSaveAndLoad(t *testing.T) {
-	dir, err := os.MkdirTemp("", "raft-snapshot-test")
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer os.RemoveAll(dir)
-
-	// Create storage
-	storage, err := NewHybridStorage(dir, 1000, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	// Create snapshot data
-	snapshotData := map[string]string{
-		"key1": "value1",
-		"key2": "value2",
-	}
-	data, _ := json.Marshal(snapshotData)
-
-	// Save snapshot
-	if err := storage.SaveSnapshot(data, 100, 5); err != nil {
-		t.Fatal(err)
-	}
-	storage.Close()
-
-	// Reopen storage and verify snapshot
-	storage2, err := NewHybridStorage(dir, 1000, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer storage2.Close()
-
-	loadedData, lastIndex, lastTerm, err := storage2.GetSnapshot()
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	if lastIndex != 100 {
-		t.Fatalf("expected lastIndex=100, got %d", lastIndex)
-	}
-	if lastTerm != 5 {
-		t.Fatalf("expected lastTerm=5, got %d", lastTerm)
-	}
-
-	var loadedSnapshot map[string]string
-	if err := json.Unmarshal(loadedData, &loadedSnapshot); err != nil {
-		t.Fatal(err)
-	}
-
-	if loadedSnapshot["key1"] != "value1" || loadedSnapshot["key2"] != "value2" {
-		t.Fatalf("snapshot data mismatch: %v", loadedSnapshot)
-	}
-}
-
-// TestCompactionWithKVServer tests compaction through KVServer
-func TestCompactionWithKVServer(t *testing.T) {
-	dir, err := os.MkdirTemp("", "raft-kv-compaction-test")
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer os.RemoveAll(dir)
-
-	config := DefaultConfig()
-	config.NodeID = "test-node"
-	config.ListenAddr = "127.0.0.1:19001"
-	config.DataDir = dir
-	config.ClusterNodes = map[string]string{
-		"test-node": "127.0.0.1:19001",
-	}
-	// Lower threshold for testing
-	config.SnapshotThreshold = 100
-	config.SnapshotMinRetention = 10
-
-	server, err := NewKVServer(config)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	if err := server.Start(); err != nil {
-		t.Fatal(err)
-	}
-	defer server.Stop()
-
-	// Wait for leader election
-	time.Sleep(500 * time.Millisecond)
-
-	// Write enough entries to trigger compaction
-	for i := 0; i < 200; i++ {
-		key := "key"
-		val := "value"
-		// We just set the same key 200 times
-		if err := server.Set(key, val); err != nil {
-			t.Fatalf("failed to set key: %v", err)
-		}
-	}
-
-	// Check that FSM has the correct value
-	val, ok := server.Get("key")
-	if !ok {
-		t.Fatal("key not found")
-	}
-	if val != "value" {
-		t.Fatalf("expected 'value', got '%s'", val)
-	}
-}
-
-// TestDataConsistencyAfterCompaction tests that data is consistent after compaction
-func TestDataConsistencyAfterCompaction(t *testing.T) {
-	dir, err := os.MkdirTemp("", "raft-consistency-test")
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer os.RemoveAll(dir)
-
-	// Create KV state machine
-	fsm := NewKVStateMachine()
-
-	// Apply a series of operations
-	operations := []KVCommand{
-		{Type: KVSet, Key: "counter", Value: "1"},
-		{Type: KVSet, Key: "counter", Value: "2"},
-		{Type: KVSet, Key: "counter", Value: "3"},
-		{Type: KVSet, Key: "name", Value: "alice"},
-		{Type: KVSet, Key: "counter", Value: "4"},
-		{Type: KVDel, Key: "name"},
-		{Type: KVSet, Key: "counter", Value: "5"},
-	}
-
-	for _, op := range operations {
-		data, _ := json.Marshal(op)
-		fsm.Apply(data)
-	}
-
-	// Take snapshot
-	snapshot, err := fsm.Snapshot()
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	// Create new FSM and restore from snapshot
-	fsm2 := NewKVStateMachine()
-	if err := fsm2.Restore(snapshot); err != nil {
-		t.Fatal(err)
-	}
-
-	// Verify data consistency
-	val1, ok1 := fsm.Get("counter")
-	val2, ok2 := fsm2.Get("counter")
-
-	if val1 != val2 || ok1 != ok2 {
-		t.Fatalf("counter mismatch: original=%s(%v), restored=%s(%v)", val1, ok1, val2, ok2)
-	}
-
-	if val1 != "5" {
-		t.Fatalf("expected counter=5, got %s", val1)
-	}
-
-	// name should not exist (was deleted)
-	_, ok := fsm2.Get("name")
-	if ok {
-		t.Fatal("name should have been deleted")
-	}
-}
-
-// TestCompactionDoesNotLoseData tests that no committed data is lost during compaction
-func TestCompactionDoesNotLoseData(t *testing.T) {
-	dir, err := os.MkdirTemp("", "raft-no-loss-test")
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer os.RemoveAll(dir)
-
-	// Use larger memory capacity to keep all entries in memory
-	storage, err := NewHybridStorage(dir, 2000, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer storage.Close()
-
-	// Write 1000 entries with unique values
-	entries := make([]LogEntry, 1000)
-	for i := 0; i < 1000; i++ {
-		cmd := KVCommand{Type: KVSet, Key: "key", Value: string(rune('A' + i%26))}
-		data, _ := json.Marshal(cmd)
-		entries[i] = LogEntry{
-			Index:   uint64(i + 1),
-			Term:    1,
-			Type:    EntryNormal,
-			Command: data,
-		}
-	}
-	if err := storage.AppendEntries(entries); err != nil {
-		t.Fatal(err)
-	}
-
-	// Save snapshot at index 500
-	fsm := NewKVStateMachine()
-	for i := 0; i < 500; i++ {
-		fsm.Apply(entries[i].Command)
-	}
-	snapshot, _ := fsm.Snapshot()
-	if err := storage.SaveSnapshot(snapshot, 500, 1); err != nil {
-		t.Fatal(err)
-	}
-
-	// Compact log
-	if err := storage.TruncateBefore(500); err != nil {
-		t.Fatal(err)
-	}
-
-	// Verify entries 501-1000 still accessible (500 is the compaction point)
-	for i := 501; i <= 1000; i++ {
-		entry, err := storage.GetEntry(uint64(i))
-		if err != nil {
-			t.Fatalf("entry %d should be accessible: %v", i, err)
-		}
-		if entry.Index != uint64(i) {
-			t.Fatalf("entry index mismatch at %d", i)
-		}
-	}
-
-	// Verify entries before 500 return ErrCompacted
-	for i := 1; i < 500; i++ {
-		_, err := storage.GetEntry(uint64(i))
-		if err != ErrCompacted {
-			t.Fatalf("entry %d should return ErrCompacted, got: %v", i, err)
-		}
-	}
-}
-
-func mustMarshal(v interface{}) []byte {
-	data, _ := json.Marshal(v)
-	return data
-}
-
-// TestDynamicCompactionThreshold tests that compaction threshold increases dynamically
-func TestDynamicCompactionThreshold(t *testing.T) {
-	dir, err := os.MkdirTemp("", "raft-dynamic-threshold-test")
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer os.RemoveAll(dir)
-
-	config := DefaultConfig()
-	config.NodeID = "test-node"
-	config.ListenAddr = "127.0.0.1:19002"
-	config.DataDir = dir
-	config.ClusterNodes = map[string]string{
-		"test-node": "127.0.0.1:19002",
-	}
-	// Set low thresholds for testing
-	config.SnapshotThreshold = 100
-	config.SnapshotMinRetention = 10
-	config.Logger = nil // Suppress logs
-
-	server, err := NewKVServer(config)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	if err := server.Start(); err != nil {
-		t.Fatal(err)
-	}
-	defer server.Stop()
-
-	// Wait for leader election
-	time.Sleep(500 * time.Millisecond)
-
-	// Get initial threshold (should be 0, meaning use SnapshotThreshold)
-	initialThreshold := server.Raft.nextCompactionThreshold
-	if initialThreshold != 0 {
-		t.Fatalf("expected initial threshold to be 0, got %d", initialThreshold)
-	}
-
-	// Propose enough entries to trigger first compaction
-	// We need > 100 entries to trigger compaction
-	for i := 0; i < 150; i++ {
-		cmd := KVCommand{Type: KVSet, Key: "key", Value: "value"}
-		data, _ := json.Marshal(cmd)
-		server.Raft.Propose(data)
-	}
-
-	// Wait for apply and potential compaction
-	time.Sleep(500 * time.Millisecond)
-
-	// Check that dynamic threshold was set after compaction
-	// After compaction with 150 entries and minRetention=10, we should have ~10 entries
-	// So next threshold should be around 10 * 1.5 = 15, but at least 100 (initial threshold)
-	newThreshold := server.Raft.nextCompactionThreshold
-
-	// The threshold should now be set (> 0) or remain at initial if compaction happened
-	// Key point: it should be >= SnapshotThreshold to prevent thrashing
-	if newThreshold > 0 && newThreshold < config.SnapshotThreshold {
-		t.Fatalf("dynamic threshold %d should not be less than initial threshold %d",
-			newThreshold, config.SnapshotThreshold)
-	}
-
-	t.Logf("Dynamic threshold after first compaction: %d (initial: %d)",
-		newThreshold, config.SnapshotThreshold)
-}

+ 0 - 0
example/database/benchmark.go → example/database/benchmark/benchmark.go


+ 0 - 0
example/database/generator.go → example/database/generator/generator.go


+ 0 - 133
resiliency_test.go

@@ -1,133 +0,0 @@
-package raft
-
-import (
-	"strings"
-	"testing"
-	"time"
-)
-
-// TestResiliency verifies the robustness improvements
-func TestResiliency(t *testing.T) {
-	// 1. Test Single Node Startup
-	t.Run("SingleNode", func(t *testing.T) {
-		dir := t.TempDir()
-		config := &Config{
-			NodeID:       "node1",
-			ListenAddr:   "127.0.0.1:50001",
-			DataDir:      dir,
-			HeartbeatInterval: 50 * time.Millisecond,
-			ElectionTimeoutMin: 150 * time.Millisecond,
-			ElectionTimeoutMax: 300 * time.Millisecond,
-		}
-
-		server, err := NewKVServer(config)
-		if err != nil {
-			t.Fatalf("Failed to create server: %v", err)
-		}
-
-		if err := server.Start(); err != nil {
-			t.Fatalf("Failed to start server: %v", err)
-		}
-		defer server.Stop()
-
-		// Wait for leader
-		if err := server.WaitForLeader(2 * time.Second); err != nil {
-			t.Fatalf("Single node failed to become leader: %v", err)
-		}
-
-		// Verify RaftNode key
-		time.Sleep(2 * time.Second) // Allow maintenance loop to run
-		val, ok := server.Get("RaftNode")
-		if !ok {
-			t.Fatalf("RaftNode key not found")
-		}
-		if !strings.Contains(val, "node1=127.0.0.1:50001") {
-			t.Errorf("RaftNode key invalid: %s", val)
-		}
-
-		// Verify CreateNode key
-		val, ok = server.Get("CreateNode/node1")
-		if !ok || val != config.ListenAddr {
-			t.Errorf("CreateNode/node1 invalid: %s", val)
-		}
-	})
-
-	// 2. Test 2-Node Cluster Recovery
-	t.Run("TwoNodeRecovery", func(t *testing.T) {
-		dir1 := t.TempDir()
-		dir2 := t.TempDir()
-
-		addr1 := "127.0.0.1:50011"
-		addr2 := "127.0.0.1:50012"
-
-		// Start Node 1
-		conf1 := &Config{
-			NodeID: "node1", ListenAddr: addr1, DataDir: dir1,
-			HeartbeatInterval: 50 * time.Millisecond, ElectionTimeoutMin: 500 * time.Millisecond, ElectionTimeoutMax: 1000 * time.Millisecond,
-			Logger: NewConsoleLogger("node1", 0),
-		}
-		s1, _ := NewKVServer(conf1)
-		s1.Start()
-		defer s1.Stop()
-
-		// Wait for s1 to be leader (single node)
-		s1.WaitForLeader(2 * time.Second)
-
-		// Start Node 2
-		conf2 := &Config{
-			NodeID: "node2", ListenAddr: addr2, DataDir: dir2,
-			HeartbeatInterval: 50 * time.Millisecond, ElectionTimeoutMin: 500 * time.Millisecond, ElectionTimeoutMax: 1000 * time.Millisecond,
-			PeerMap: map[string]string{"node1": addr1}, // Initial peer
-			Logger: NewConsoleLogger("node2", 0),
-		}
-		s2, _ := NewKVServer(conf2)
-		s2.Start()
-		defer s2.Stop()
-
-		// Join s2 to s1
-		if err := s1.Join("node2", addr2); err != nil {
-			t.Fatalf("Failed to join node2: %v", err)
-		}
-
-		// Wait for cluster to stabilize
-		time.Sleep(1 * time.Second)
-		if len(s1.GetClusterNodes()) != 2 {
-			t.Fatalf("Cluster size mismatch: %d", len(s1.GetClusterNodes()))
-		}
-
-		// Verify RaftNode contains both
-		time.Sleep(4 * time.Second) // Allow maintenance loop to update
-		val, ok := s1.Get("RaftNode")
-		if !ok || (!strings.Contains(val, "node1") || !strings.Contains(val, "node2")) {
-			t.Logf("RaftNode incomplete (expected due to test timing/replication): %s", val)
-		}
-
-		// Kill Node 2
-		s2.Stop()
-		time.Sleep(1 * time.Second)
-
-		// Check CreateNode (should be present from initial single-node start)
-		_, ok = s1.Get("CreateNode/node1")
-		if !ok {
-			// This might fail if the initial Set wasn't committed before node2 joined and blocked commits
-			t.Logf("CreateNode/node1 not found (replication timing issue)")
-		}
-
-		// Restart Node 2
-		s2_new, _ := NewKVServer(conf2)
-		s2_new.Start()
-		defer s2_new.Stop()
-
-		// Wait for recovery
-		// They should auto-connect because s1 has s2 in config, and s2 has s1 in config.
-		// s1 will retry connecting to s2 (Raft internal or our checkConnections).
-		
-		time.Sleep(3 * time.Second)
-		
-		// Verify write works again
-		if err := s1.Set("foo", "bar"); err != nil {
-			t.Errorf("Cluster failed to recover write capability: %v", err)
-		}
-	})
-}
-