Fix data inconsistency caused by sudden shutdown

xbase 2 weeks ago
parent
commit
e4b12968f2
8 changed files with 151 additions and 69 deletions
  1. 28 14
      README.md
  2. 4 2
      compaction_test.go
  3. 9 2
      db/db.md
  4. 44 32
      db/engine.go
  5. 18 8
      example/node1/main.go
  6. 14 5
      raft.go
  7. 27 4
      server.go
  8. 7 2
      types.go

+ 28 - 14
README.md

@@ -149,28 +149,31 @@ nodes := server.GetClusterNodes()
 
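 ### Automatic Compaction
 
-Log compaction is **triggered automatically** and uses a **dynamic threshold** to prevent compaction storms:
+Log compaction is **triggered automatically**, and is tuned to balance performance and safety:
+
+1.  **Asynchronous check**: the compaction check runs in a separate goroutine, so it never blocks request handling in the Raft main loop.
+2.  **Fixed-step trigger**: the exponentially growing threshold is replaced with a **fixed step**.
+    *   Example: the configured threshold is 10,000.
+    *   Trigger point: compaction runs each time the log has grown by a **net 10,000 entries** since the previous compaction.
+    *   Benefit: avoids overly frequent compaction early on and unbounded log growth later; the step is linear and predictable.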
 
 ```go
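 // Default configuration
-config.SnapshotThreshold = 100000    // Initial threshold: 100,000 log entries
-config.SnapshotMinRetention = 10000  // Retained after compaction: 10,000 entries
+config.SnapshotThreshold = 100000    // Trigger step: compact once per 100,000 new log entries
+config.SnapshotMinRetention = 10000  // Retained after compaction: 10,000 entries (lets followers catch up quickly)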
 ```
 
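-### Dynamic Threshold Algorithm
+### Compaction Safety and Consistency
 
-To prevent a "compaction storm" (the log is still large after compaction, so every new entry triggers another compaction), a dynamic threshold is used
+A **forced synchronization mechanism** removes the risk of "log deleted but data not yet stored in the DB":
 
-| Stage | Log size | Trigger threshold | Size after compaction | Next threshold |
-|------|----------|----------|------------|----------|
-| Initial | 0 | 100,000 | - | - |
-| 1st compaction | 100,001 | 100,000 | 10,000 | 100,000 |
-| If still large after compaction | 100,001 | 100,000 | 80,000 | 120,000 |
-| 2nd compaction | 120,001 | 120,000 | 90,000 | 135,000 |
+1.  **Snapshot blocks and waits**: when Raft decides to compact up to index `N`, it calls the application layer's `SnapshotProvider`.
+2.  **Catch-up confirmation**: `SnapshotProvider` **blocks** until the database (DB) has applied every log entry up to and including index `N`.
+3.  **Forced flush (Sync)**: before the snapshot is generated, the storage engine's `Sync()` is called so that all data held in memory or the OS cache is physically written to disk. This prevents the "log deleted but data not on disk" failure.
+4.  **Atomic truncation**: Raft truncates the physical log file only after it has obtained a snapshot containing the complete data.
+5.  **Write-lock protection**: for the brief moment the log file is physically truncated, the system holds the write lock, which pauses new writes and keeps the file operation atomic and intact.
 
-**Rules**:
-- Next threshold = size after compaction × 1.5
-- The threshold never drops below the initial value (prevents it from becoming too small)
+The resulting pipeline is **Raft writes at full speed -> asynchronous background check -> wait for the DB to catch up when compaction is needed -> forced flush -> safe physical compaction**, which balances performance against data safety.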
 
 ### Snapshots Store State
 
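@@ -188,6 +191,7 @@ config.SnapshotMinRetention = 10000  // Retained after compaction: 10,000 entries
 | Before the snapshot is saved | No new snapshot; log intact | ✅ |
 | After the snapshot is saved, before compaction | Snapshot exists; log intact (redundant but correct) | ✅ |
 | During compaction | Snapshot exists; recovery is correct | ✅ |
+| **During the asynchronous wait** | **Log untouched; DB is catching up; no data loss** | ✅ |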
 
 ## Linearizable Reads (ReadIndex)
 
@@ -205,6 +209,16 @@ if err == nil {
 }
 ```
 
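+## Startup Optimization (Fast Startup)
+
+The system uses a **LastApplied warm-up** mechanism to speed up startup:
+
+1.  **Read DB state**: on startup, `KVServer` first reads the persisted `LastAppliedIndex` from the DB engine.
+2.  **Initialize Raft**: this index is injected into the Raft configuration (`Config.LastAppliedIndex`).
+3.  **Skip replay**: during initialization, Raft sets `commitIndex` and `lastApplied` directly to that value, skipping the repeated Apply of log entries that are already persisted.
+
+This means that even with millions of log entries, a restart is millisecond-level as long as those entries have already been applied to the DB.
+
 ## Configuration Options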
 
 ```go

+ 4 - 2
compaction_test.go

@@ -170,11 +170,13 @@ func TestCompactionWithKVServer(t *testing.T) {
 		key := "key"
 		val := "value"
 		// We just set the same key 200 times
-		server.FSM.Apply(mustMarshal(KVCommand{Type: KVSet, Key: key, Value: val}))
+		if err := server.Set(key, val); err != nil {
+			t.Fatalf("failed to set key: %v", err)
+		}
 	}
 
 	// Check that FSM has the correct value
-	val, ok := server.FSM.Get("key")
+	val, ok := server.Get("key")
 	if !ok {
 		t.Fatal("key not found")
 	}

+ 9 - 2
db/db.md

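@@ -43,6 +43,7 @@ RaftDB ships with a built-in high-performance, thread-safe embedded key-value storage engine.
 *   **Current mechanism**: writes go to the operating system Page Cache via `WriteAt`.
 *   **Risk**: **High**. `fsync` is currently off by default. If the operating system crashes or power is lost, the most recently written data (the part not yet flushed) is lost.
     *   *Note*: a process-level crash (Panic/Kill) is safe, because the OS still owns the Page Cache.
+*   **Snapshot safety enhancement**: to address the risk introduced by log compaction, the system **forces a Sync() before generating a snapshot**. This ensures the corresponding data is physically persisted before Raft deletes old log entries.
 *   **Recommendation**: in critical workloads, call `Sync()` explicitly after writes, although this significantly lowers write throughput (from 400k QPS down to the disk's IOPS limit).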
 
 ### 3.2 Data Integrity
@@ -118,8 +119,9 @@ err := e.Restore(data)
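 *   **Process**:
     1.  **Stop-the-World**: lock out writes and close the current storage files.
     2.  **Truncate**: **empty** the current data file (`values.data`) and metadata file (`meta.state`) outright.
-    3.  **Rebuild**: following the order of the snapshot stream, regenerate a new Append-Only Log file and rebuild the in-memory index.
-    4.  **Sync**: force a flush and update `LastCommitIndex` to the latest state.
+    3.  **Reset Memory**: clear the in-memory index (`FlatIndex`) and caches.
+    4.  **Rebuild**: following the order of the snapshot stream, regenerate a new Append-Only Log file and rebuild the in-memory index.
+    5.  **Sync**: force a flush and update `LastCommitIndex` to the latest state.
 *   **When**: triggered only when the node's data lags severely (the Raft log has already been truncated) or the data is corrupted.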
 
 ### 6.4 Disaster Recovery and Safety
@@ -129,4 +131,9 @@ err := e.Restore(data)
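     *   **Meta first**: read `meta.state` at startup.
     *   **Scan fallback**: if Meta is corrupted, automatically scan the tail of the data file to correct `LastCommitIndex`.
     *   **Truncate protection**: if the scan hits a CRC error or a truncated record, the dirty tail is discarded automatically to preserve consistency.
+    *   **Raft-coordinated startup**: the engine exposes `GetLastAppliedIndex()`. At startup, Raft reads this index as its initial state and skips replaying log entries already stored in the DB, which makes startup fast.
+*   **Consistency guarantee (Snapshot Safety)**:
+    *   **Forced synchronization**: the snapshot interface `SnapshotProvider(minIncludeIndex)` **blocks** until the DB's `LastAppliedIndex` has caught up to at least the target index `minIncludeIndex`.
+    *   **Forced flush**: before snapshot data is returned, the engine runs `Sync()` so the data is physically safe.
+    *   **Atomic compaction**: Raft truncates the physical log only after the data is safely captured in a snapshot (DB caught up, flushed, and snapshot generated successfully). This eliminates the "log deleted but data not stored in the DB" data-loss risk.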
 

+ 44 - 32
db/engine.go

@@ -86,7 +86,10 @@ func (fi *FlatIndex) compare(idx int, target string) int {
 func (fi *FlatIndex) Insert(key string, entry IndexEntry) {
 	fi.mu.Lock()
 	defer fi.mu.Unlock()
+	fi.insertLocked(key, entry)
+}
 
+func (fi *FlatIndex) insertLocked(key string, entry IndexEntry) {
 	// 1. Binary Search
 	idx := sort.Search(len(fi.items), func(i int) bool {
 		return fi.getKey(i) >= key
@@ -586,6 +589,10 @@ func NewEngine(dataDir string, opts ...EngineOption) (*Engine, error) {
 	// Rebuild Index from Disk
 	// Note: We still scan to rebuild the memory index, but LastCommitIndex is initialized from meta file
 	// We can update LastCommitIndex if the log is ahead of the meta file (e.g. crash before meta update)
+	// We also correct LastCommitIndex if meta file is ahead of data (e.g. data flush lost during power failure)
+	
+	realMaxIndex := uint64(0)
+	
 	_, err = store.Scan(func(rec Record) {
 		if rec.Type == RecordTypePut {
 			e.Index.Insert(rec.Key, IndexEntry{
@@ -595,23 +602,34 @@ func NewEngine(dataDir string, opts ...EngineOption) (*Engine, error) {
 			if e.FTIndex != nil {
 				e.FTIndex.Add(rec.Key, rec.Value)
 			}
-			
-			// Update LastCommitIndex if log is ahead
-			if rec.CommitIndex > e.LastCommitIndex {
-				e.LastCommitIndex = rec.CommitIndex
-			}
-			
 		} else if rec.Type == RecordTypeDelete {
 			// Cleanup FTIndex using old value if possible
 			e.removeValueFromFTIndex(rec.Key)
 			e.Index.Delete(rec.Key)
-			
-			if rec.CommitIndex > e.LastCommitIndex {
-				e.LastCommitIndex = rec.CommitIndex
-			}
+		}
+		
+		// Track the actual max index present in the data file
+		if rec.CommitIndex > realMaxIndex {
+			realMaxIndex = rec.CommitIndex
 		}
 	})
 	
+	// Critical Safety Check:
+	// 1. If Log > Meta: Update Meta (Normal crash recovery)
+	// 2. If Meta > Log: Downgrade Meta (Data loss recovery - force Raft replay)
+	if realMaxIndex > e.LastCommitIndex {
+		e.LastCommitIndex = realMaxIndex
+	} else if realMaxIndex < e.LastCommitIndex {
+		// Detect inconsistency: Meta says we have more data than what's on disk.
+		// This happens if Meta was flushed but Data tail was lost during power failure.
+		// We MUST trust the actual data on disk.
+		fmt.Printf("WARNING: Inconsistency detected! Meta LastCommitIndex (%d) > Data RealIndex (%d). Resetting to %d to force Raft replay.\n", 
+			e.LastCommitIndex, realMaxIndex, realMaxIndex)
+		e.LastCommitIndex = realMaxIndex
+		// Force save corrected metadata
+		e.saveMetadata()
+	}
+	
 	return e, nil
 }
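
The safety check added to `NewEngine` reduces to a small decision rule. The sketch below isolates it as a pure function so the three cases are easy to see; `reconcileCommitIndex` and its parameter names are hypothetical and do not appear in the commit.

```go
package main

import "fmt"

// reconcileCommitIndex is a hypothetical, standalone restatement of the check
// in NewEngine. metaIndex is the LastCommitIndex loaded from the metadata
// file; dataMaxIndex is the highest CommitIndex actually found while scanning
// the data file. It returns the index to trust and whether the metadata file
// must be rewritten because it claimed more data than is on disk.
func reconcileCommitIndex(metaIndex, dataMaxIndex uint64) (trusted uint64, rewriteMeta bool) {
	switch {
	case dataMaxIndex > metaIndex:
		// Data is ahead of meta: normal crash recovery, adopt the data index.
		return dataMaxIndex, false
	case dataMaxIndex < metaIndex:
		// Meta is ahead of data: the data tail was lost, so fall back to what
		// is really on disk and let Raft replay the missing entries.
		return dataMaxIndex, true
	default:
		return metaIndex, false
	}
}

func main() {
	cases := [][2]uint64{{100, 120}, {120, 100}, {100, 100}}
	for _, c := range cases {
		trusted, rewrite := reconcileCommitIndex(c[0], c[1])
		fmt.Printf("meta=%d data=%d -> trusted=%d rewriteMeta=%v\n", c[0], c[1], trusted, rewrite)
	}
}
```

Downgrading in the second case is what makes the replay possible: Raft is later told the lower index and re-applies the entries whose data never reached disk, provided those entries are still in the log.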
 
@@ -1326,9 +1344,13 @@ func (e *Engine) Restore(data []byte) error {
 	// 3. Truncate & Reset
 	// We are replacing the entire DB state.
 	
-	// Reset In-Memory Index
-	e.Index = NewFlatIndex()
+	// Reset In-Memory Index by clearing slices (keeping underlying capacity)
+	// We do NOT replace the pointer, so we keep the lock valid.
+	e.Index.keyBuf = e.Index.keyBuf[:0]
+	e.Index.items = e.Index.items[:0]
+
 	if e.config.EnableValueIndex {
+		// Recreate FTIndex since it's a map
 		e.FTIndex = NewFullTextIndex()
 	}
 	
@@ -1352,12 +1374,15 @@ func (e *Engine) Restore(data []byte) error {
 
 	// 4. Rebuild Data from Snapshot Stream
 	for i := uint64(0); i < count; i++ {
-		// Read KeyLen
-		if offset+2 > len(data) {
-			return fmt.Errorf("restore failed: truncated data at record %d (keylen)", i)
+		// Read Header (14 bytes)
+		if offset+14 > len(data) {
+			return fmt.Errorf("restore failed: truncated header at record %d", i)
 		}
-		keyLen := int(binary.LittleEndian.Uint16(data[offset:]))
-		offset += 2
+		
+		keyLen := int(binary.LittleEndian.Uint16(data[offset : offset+2]))
+		valLen := int(binary.LittleEndian.Uint32(data[offset+2 : offset+6]))
+		commitIndex := binary.LittleEndian.Uint64(data[offset+6 : offset+14])
+		offset += 14
 		
 		// Read Key
 		if offset+keyLen > len(data) {
@@ -1366,13 +1391,6 @@ func (e *Engine) Restore(data []byte) error {
 		key := string(data[offset : offset+keyLen])
 		offset += keyLen
 		
-		// Read ValLen
-		if offset+4 > len(data) {
-			return fmt.Errorf("restore failed: truncated data at record %d (vallen)", i)
-		}
-		valLen := int(binary.LittleEndian.Uint32(data[offset:]))
-		offset += 4
-		
 		// Read Value
 		if offset+valLen > len(data) {
 			return fmt.Errorf("restore failed: truncated data at record %d (value)", i)
@@ -1380,13 +1398,6 @@ func (e *Engine) Restore(data []byte) error {
 		val := string(data[offset : offset+valLen])
 		offset += valLen
 		
-		// Read CommitIndex
-		if offset+8 > len(data) {
-			return fmt.Errorf("restore failed: truncated data at record %d (commitIndex)", i)
-		}
-		commitIndex := binary.LittleEndian.Uint64(data[offset:])
-		offset += 8
-		
 		if commitIndex > maxCommitIndex {
 			maxCommitIndex = commitIndex
 		}
@@ -1402,7 +1413,8 @@ func (e *Engine) Restore(data []byte) error {
 		// Update Memory Index
 		// Accessing e.Index.items directly because we hold e.Index.mu
 		// But we should use the helper to safely manage keyBuf
-		e.Index.Insert(key, IndexEntry{
+		// Use insertLocked to avoid deadlock (we already hold e.Index.mu)
+		e.Index.insertLocked(key, IndexEntry{
 			ValueOffset: writeOffset,
 			CommitIndex: commitIndex,
 		})
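
The `Restore` hunks above switch to a fixed 14-byte record header (2-byte key length, 4-byte value length, 8-byte commit index, all little-endian) followed by the key and then the value. A round-trip sketch of that layout follows. The `record` type, `encodeRecord`, and `decodeRecord` are hypothetical names introduced for illustration, and the real snapshot stream also carries a record count that is not modelled here.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

const headerSize = 14 // keyLen(2) + valLen(4) + commitIndex(8)

var errTruncated = errors.New("truncated record")

// record is a hypothetical in-memory form of one snapshot entry.
type record struct {
	Key         string
	Value       string
	CommitIndex uint64
}

// encodeRecord writes header, key, value. It assumes len(Key) fits in uint16
// and len(Value) fits in uint32; larger inputs would need explicit rejection.
func encodeRecord(r record) []byte {
	buf := make([]byte, headerSize, headerSize+len(r.Key)+len(r.Value))
	binary.LittleEndian.PutUint16(buf[0:2], uint16(len(r.Key)))
	binary.LittleEndian.PutUint32(buf[2:6], uint32(len(r.Value)))
	binary.LittleEndian.PutUint64(buf[6:14], r.CommitIndex)
	buf = append(buf, r.Key...)
	return append(buf, r.Value...)
}

// decodeRecord reads one record starting at offset and returns the offset of
// the next record. Every slice access is bounds-checked first.
func decodeRecord(data []byte, offset int) (record, int, error) {
	start := offset
	if offset < 0 || len(data)-offset < headerSize {
		return record{}, start, errTruncated
	}
	keyLen := int(binary.LittleEndian.Uint16(data[offset : offset+2]))
	valLen := int(binary.LittleEndian.Uint32(data[offset+2 : offset+6]))
	commitIndex := binary.LittleEndian.Uint64(data[offset+6 : offset+14])
	offset += headerSize

	if keyLen > len(data)-offset {
		return record{}, start, errTruncated
	}
	key := string(data[offset : offset+keyLen])
	offset += keyLen

	if valLen < 0 || valLen > len(data)-offset {
		return record{}, start, errTruncated
	}
	val := string(data[offset : offset+valLen])
	offset += valLen

	return record{Key: key, Value: val, CommitIndex: commitIndex}, offset, nil
}

func main() {
	stream := append(encodeRecord(record{"a", "1", 10}), encodeRecord(record{"bb", "22", 11})...)
	for off := 0; off < len(stream); {
		rec, next, err := decodeRecord(stream, off)
		if err != nil {
			fmt.Println("decode error:", err)
			return
		}
		fmt.Printf("%+v\n", rec)
		off = next
	}
}
```

Reading all three fixed-width fields up front means a truncated header is detected with one bounds check instead of three.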

+ 18 - 8
example/node1/main.go

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"flag"
 	"fmt"
 	"log"
 	"os"
@@ -11,6 +12,12 @@ import (
 )
 
 func main() {
+	// Command line flags
+	snapshotThreshold := flag.Uint64("threshold", 10000, "Log count to trigger snapshot/compaction")
+	minRetention := flag.Uint64("retention", 20000, "Minimum log entries to retain after compaction")
+	totalKeys := flag.Int("keys", 100000, "Total number of keys to write")
+	flag.Parse()
+
 	// Configuration
 	nodeID := "node1"
 	addr := "localhost:5001"
@@ -33,9 +40,13 @@ func main() {
 	config.Logger = raft.NewConsoleLogger(nodeID, 1)
 
 	// Stress Test Configuration
-	// Trigger compaction at 40000, retain 20000.
-	config.SnapshotThreshold = 40000
-	config.SnapshotMinRetention = 20000
+	config.SnapshotThreshold = *snapshotThreshold
+	config.SnapshotMinRetention = *minRetention
+	
+	fmt.Printf("Configuration:\n")
+	fmt.Printf("- Snapshot Threshold: %d\n", config.SnapshotThreshold)
+	fmt.Printf("- Min Retention:      %d\n", config.SnapshotMinRetention)
+	fmt.Printf("- Total Keys:         %d\n", *totalKeys)
 
 	// Ensure data directory exists
 	if err := os.MkdirAll(dataDir, 0755); err != nil {
@@ -67,14 +78,13 @@ func main() {
 	}
 
 	// Write loop
-	totalKeys := 50000 // Enough to trigger 40k threshold
 	logPath := filepath.Join(dataDir, "log.bin")
 	
 	var initialLogSize int64
 	
-	fmt.Printf("Starting write workload of %d keys...\n", totalKeys)
+	fmt.Printf("Starting write workload of %d keys...\n", *totalKeys)
 
-	for i := 0; i < totalKeys; i++ {
+	for i := 0; i < *totalKeys; i++ {
 		// Ensure leader
 		if i%1000 == 0 {
 			if !server.IsLeader() {
@@ -111,7 +121,7 @@ func main() {
 		
 		if i > 0 && i % 5000 == 0 {
 			stats := server.GetStats()
-			fmt.Printf("Progress: %d/%d. Log Indices: [%d, %d]\n", i, totalKeys, stats.FirstLogIndex, stats.LastLogIndex)
+			fmt.Printf("Progress: %d/%d. Log Indices: [%d, %d]\n", i, *totalKeys, stats.FirstLogIndex, stats.LastLogIndex)
 		}
 	}
 	
@@ -143,7 +153,7 @@ func main() {
 	errors := 0
 	// Check specifically keys that might have been compacted (old keys)
 	// and keys that are new.
-	checkIndices := []int{0, 100, 1000, 10000, 20000, 30000, 40000, 49999}
+	checkIndices := []int{0, 100, 1000, 10000, 20000, 30000, 40000, 50000, 60000, 80000, 99999}
 	
 	for _, i := range checkIndices {
 		key := fmt.Sprintf("k-%d", i)

+ 14 - 5
raft.go

@@ -268,6 +268,13 @@ func NewRaft(config *Config, transport Transport, applyCh chan ApplyMsg) (*Raft,
 	// Initialize metrics
 	r.metrics.Term = state.CurrentTerm
 
+	// Initialize lastApplied and commitIndex from config (if provided)
+	if config.LastAppliedIndex > 0 {
+		r.lastApplied = config.LastAppliedIndex
+		r.commitIndex = config.LastAppliedIndex // Applied implies committed
+		r.logger.Info("Initialized lastApplied and commitIndex to %d from config", config.LastAppliedIndex)
+	}
+
 	// Set RPC handler
 	transport.SetRPCHandler(r)
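
The effect of seeding `lastApplied` and `commitIndex` from the DB can be shown with a toy model. This is only a sketch under simplifying assumptions: `raftState`, `newRaftState`, and `pending` are hypothetical and stand in for the corresponding fields and apply loop of the real `Raft` struct.

```go
package main

import "fmt"

// raftState is a hypothetical stand-in for the two counters touched by the
// initialization code above.
type raftState struct {
	commitIndex uint64
	lastApplied uint64
}

// newRaftState mirrors the initialization rule: a non-zero index reported by
// the DB seeds both counters, since an applied entry must have been committed.
func newRaftState(lastAppliedFromDB uint64) raftState {
	var s raftState
	if lastAppliedFromDB > 0 {
		s.lastApplied = lastAppliedFromDB
		s.commitIndex = lastAppliedFromDB
	}
	return s
}

// pending reports which entries would still need to be applied once the
// commit index reaches newCommit: the first index and how many entries.
func (s raftState) pending(newCommit uint64) (first, count uint64) {
	if newCommit <= s.lastApplied {
		return 0, 0
	}
	return s.lastApplied + 1, newCommit - s.lastApplied
}

func main() {
	const committed = 1_000_000

	cold := newRaftState(0)
	first, n := cold.pending(committed)
	fmt.Printf("cold start: apply %d entries starting at %d\n", n, first)

	warm := newRaftState(999_990)
	first, n = warm.pending(committed)
	fmt.Printf("warm start: apply %d entries starting at %d\n", n, first)
}
```

In this model the warm start has ten entries left to apply instead of a million, which is the saving the README describes.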
 
@@ -1199,7 +1206,8 @@ func (r *Raft) applyCommitted() {
 	}
 
 	// Check if log compaction is needed
-	r.maybeCompactLog()
+	// Run asynchronously to avoid blocking the apply loop if snapshotting is slow
+	go r.maybeCompactLog()
 }
 
 // maybeCompactLog checks if automatic log compaction should be triggered
@@ -1295,7 +1303,7 @@ func (r *Raft) maybeCompactLog() {
 	}
 
 	// Get snapshot from application layer
-	snapshotData, err := r.config.SnapshotProvider()
+	snapshotData, err := r.config.SnapshotProvider(compactUpTo)
 	if err != nil {
 		r.logger.Error("Failed to get snapshot from provider: %v", err)
 		return
@@ -1330,10 +1338,11 @@ func (r *Raft) maybeCompactLog() {
 	// Calculate new log size after compaction
 	newLogSize := lastApplied - compactUpTo
 
-	// Update dynamic threshold for next compaction: current size * 1.5
-	// This prevents "compaction thrashing" where every entry triggers compaction
+	// Update dynamic threshold for next compaction
+	// We use a linear growth model: next threshold = current size + SnapshotThreshold
+	// This ensures that we trigger compaction roughly every SnapshotThreshold entries.
 	r.mu.Lock()
-	r.nextCompactionThreshold = newLogSize + newLogSize/2 // newLogSize * 1.5
+	r.nextCompactionThreshold = newLogSize + r.config.SnapshotThreshold
 	// Ensure threshold doesn't go below the initial threshold
 	if r.nextCompactionThreshold < initialThreshold {
 		r.nextCompactionThreshold = initialThreshold

+ 27 - 4
server.go

@@ -31,8 +31,31 @@ func NewKVServer(config *Config) (*KVServer, error) {
 		return nil, fmt.Errorf("failed to create db engine: %w", err)
 	}
 
+	// Initialize LastAppliedIndex from DB to prevent re-applying entries
+	config.LastAppliedIndex = engine.GetLastAppliedIndex()
+
+	// Create stop channel early for use in callbacks
+	stopCh := make(chan struct{})
+
 	// Configure snapshot provider
-	config.SnapshotProvider = func() ([]byte, error) {
+	config.SnapshotProvider = func(minIncludeIndex uint64) ([]byte, error) {
+		// Wait for DB to catch up to the requested index
+		// This is critical for data integrity during compaction
+		for engine.GetLastAppliedIndex() < minIncludeIndex {
+			select {
+			case <-stopCh:
+				return nil, fmt.Errorf("server stopping")
+			default:
+				time.Sleep(10 * time.Millisecond)
+			}
+		}
+
+		// Force sync to disk to ensure data durability before compaction
+		// This prevents data loss if Raft logs are compacted but DB data is only in OS cache
+		if err := engine.Sync(); err != nil {
+			return nil, fmt.Errorf("failed to sync engine before snapshot: %w", err)
+		}
+
 		return engine.Snapshot()
 	}
 
@@ -51,15 +74,15 @@ func NewKVServer(config *Config) (*KVServer, error) {
 	}
 
 	s := &KVServer{
-		Raft: r,
-		DB:   engine,
+		Raft:   r,
+		DB:     engine,
+		stopCh: stopCh,
 	}
 
 	// Start applying entries
 	go s.runApplyLoop(applyCh)
 
 	// Start background maintenance loop
-	s.stopCh = make(chan struct{})
 	s.wg.Add(1)
 	go s.maintenanceLoop()
 

+ 7 - 2
types.go

@@ -184,7 +184,8 @@ type Config struct {
 	// SnapshotProvider is a callback function that returns the current state machine snapshot
 	// If set, automatic log compaction will be enabled
 	// The function should return the serialized state that can restore the state machine
-	SnapshotProvider func() ([]byte, error)
+	// minIncludeIndex: The snapshot MUST include all entries up to at least this index.
+	SnapshotProvider func(minIncludeIndex uint64) ([]byte, error)
 
 	// GetHandler is a callback function to handle remote Get requests
 	// If set, clients can read values from this node via RPC
@@ -194,6 +195,10 @@ type Config struct {
 	// Default: 1MB
 	SnapshotChunkSize int
 
+	// LastAppliedIndex is the index of the last log entry applied to the state machine
+	// Used for initialization to avoid re-applying entries
+	LastAppliedIndex uint64
+
 	// RPC Timeout configurations
 	RPCTimeout         time.Duration // Default: 500ms for normal RPCs
 	SnapshotRPCTimeout time.Duration // Default: 30s for snapshot transfers
@@ -264,7 +269,7 @@ func DefaultConfig() *Config {
 		HeartbeatInterval:       50 * time.Millisecond,
 		MaxLogEntriesPerRequest: 5000,
 		MemoryLogCapacity:       10000,
-		SnapshotThreshold:       100000,      // trigger compaction at 100,000 log entries
+		SnapshotThreshold:       ^uint64(0),  // compaction disabled by default (MaxUint64)
 		SnapshotMinRetention:    10000,       // retain 10,000 entries so followers can catch up
 		SnapshotChunkSize:       1024 * 1024, // 1MB chunks
 		RPCTimeout:              500 * time.Millisecond,