Prechádzať zdrojové kódy

优化只有key的db搜索模式

xbase 2 týždňov pred
rodič
commit
32753dc3a0
6 zmenil súbory, kde vykonal 586 pridanie a 851 odobranie
  1. 53 16
      db/db.md
  2. 381 509
      db/engine.go
  3. 114 281
      example/database/benchmark.go
  4. 38 44
      example/database/inspector.go
  5. 0 1
      go.mod
  6. BIN
      raft-cli

+ 53 - 16
db/db.md

@@ -1,14 +1,14 @@
 # RaftDB Storage Engine Documentation
 
-RaftDB 内置了一个高性能、线程安全的嵌入式键值存储引擎。该引擎专为 Raft 状态机设计,经过深度优化,采用 **Radix Tree (基数树)** 作为核心索引结构,并结合了 **倒排索引 (Inverted Index)** 以支持极速全文/模糊检索。
+RaftDB 内置了一个高性能、线程安全的嵌入式键值存储引擎。该引擎专为 Raft 状态机设计,经过深度优化,采用 **Flat Sorted Array (有序扁平数组)** 作为核心索引结构,并结合了 **倒排索引 (Inverted Index)** 以支持极速全文/模糊检索。
 
 ## 1. 核心特性
 
 *   **极速读写 (Blazing Fast)**: 
-    *   **Point Lookup**: ~23万 QPS (Radix Tree 内存索引)。
+    *   **Point Lookup**: ~23万 QPS (基于 Binary Search 的内存索引)。
     *   **Insert/Update**: ~40-56万 QPS (Append-only Log + FreeList)。
 *   **高级查询 (Advanced Query)**: 
-    *   **Prefix/Range**: 基于 Radix Tree 的结构化扫描,性能随数据量增长极慢 (O(K))
+    *   **Prefix/Range**: 基于有序数组的二分查找 + 顺序扫描,内存局部性极佳
     *   **Full Text Search**: 针对 `value like` 查询引入倒排索引,性能提升 **20倍**。
     *   **Limit Pushdown**: 查询执行器支持 `LIMIT/OFFSET` 下推,扫描满足即停止。
 *   **空间复用 (Disk Reuse)**: 内置 Best-Fit 策略的 FreeList,自动回收磁盘空间,无需手动 Compaction。
@@ -16,12 +16,16 @@ RaftDB 内置了一个高性能、线程安全的嵌入式键值存储引擎。
 
 ## 2. 架构设计 (Architecture)
 
-### 2.1 核心索引: Radix Tree (Memory)
-摒弃了传统的 Hash Map + Sharding 方案,采用单体 **Radix Tree**。
+### 2.1 核心索引: Flat Sorted Array (Memory)
+摒弃了传统的 Hash Map 或 Tree 结构,采用极致内存优化的 **Flat Sorted Array**。
+*   **设计**: 
+    *   **Key Buffer**: 所有 Key 紧凑存储在一个 `[]byte` 大数组中,无 Go String Header 开销。
+    *   **Item Slice**: 索引项仅存储 `Offset(4B) + Len(2B) + Metadata`,并在内存中保持有序。
 *   **优势**: 
-    *   **有序性**: 天然支持 Key 的字典序遍历,无需排序。
-    *   **前缀压缩**: 节省大量内存,特别适合 Key 具有公共前缀的场景。
-    *   **范围查询**: `WalkPrefix` 操作复杂度仅为 O(K)。
+    *   **零指针开销**: 消除海量指针和对象头,大幅降低 GC 压力。
+    *   **内存极致紧凑**: 索引条目仅占 ~24 字节/Key,百万级 Key 仅需几十 MB。
+    *   **缓存友好**: 数组结构对 CPU Cache 极其友好。
+    *   **范围查询**: 二分查找定位起点,顺序扫描后续数据,天然支持 Range Scan。
 
 ### 2.2 辅助索引: Inverted Index (Memory)
 针对 `value like "*token*"` 等模糊查询场景,引擎维护了一个轻量级的倒排索引 (Token -> Keys)。
@@ -33,11 +37,37 @@ RaftDB 内置了一个高性能、线程安全的嵌入式键值存储引擎。
 *   **FreeList**: 维护空闲槽位,优先复用。
 *   **Page Cache**: 简单的内存缓存层,减少系统调用。
 
-## 3. 性能测试报告 (Benchmark Report)
+## 3. 安全性与一致性评估 (Safety & Consistency Analysis)
+
+### 3.1 数据持久性 (Durability)
+*   **当前机制**: 使用 `WriteAt` 写入操作系统 Page Cache。
+*   **风险**: **高**。目前默认未开启 `fsync`。若发生操作系统崩溃或断电,最近写入的数据(尚未刷盘部分)将会丢失。
+    *   *注*: 进程级崩溃(Panic/Kill)是安全的,因为 OS 仍掌管 Page Cache。
+*   **建议**: 在关键业务场景下,应在写入后显式调用 `Sync()`,但这会显著降低写入吞吐量(从 40w QPS 降至磁盘 IOPS 极限)。
+
+### 3.2 数据完整性 (Integrity)
+*   **当前机制**: 引入了 **CRC32 Checksum** 校验每个 Record (Header + Data)。
+*   **恢复策略**: 启动时 `Scan` 会验证 CRC。
+*   **风险**: **中**。
+    *   若日志尾部写入不完整(Partial Write),`Scan` 会自动截断丢弃,保证一致性。
+    *   若日志中间发生比特翻转(Bit Rot),`Scan` 会在错误处停止,导致该点**之后的所有数据不可见**。这是一种“宁缺毋滥”的牺牲可用性保一致性的策略。
+
+### 3.3 数据一致性 (Consistency)
+*   **索引与存储**: 内存索引在启动时完全通过磁盘日志重构,保证了**强一致性**。
+*   **并发可见性**: 
+    *   `Set` 操作通过互斥锁串行化,保证了写入线性一致。
+    *   **读写并发**: `Query` 并不阻塞 `Set` 的日志写入,但在读取索引时持有 `RLock`。这可能导致短暂的“幻读”或可见性延迟,但不会导致数据错乱。
+    *   **风险**: 极低。符合最终一致性模型。
+
+### 3.4 并发模型瓶颈
+*   **I/O 阻塞**: `Query` 在遍历索引时(持有 RLock)会进行磁盘 I/O (`ReadValue`)。
+*   **影响**: 如果磁盘响应慢,长时间的 `RLock` 会阻塞所有试图获取 `Lock` 的 `Set` 操作(写阻塞)。在慢磁盘或高并发读场景下,写入延迟可能显著增加。
+
+## 4. 性能测试报告 (Benchmark Report)
 
 测试环境: macOS, 10 并发 Workers, 本地磁盘 IO。
 
-### 3.1 综合吞吐量 (Throughput)
+### 4.1 综合吞吐量 (Throughput)
 
 | 操作类型 | 数量 | 耗时 | QPS (Ops/sec) | 说明 |
 | :--- | :--- | :--- | :--- | :--- |
@@ -46,18 +76,25 @@ RaftDB 内置了一个高性能、线程安全的嵌入式键值存储引擎。
 | **Update** | 10,000 | ~0.04s | **~252,000** | 原地更新优化生效 |
 | **Delete** | 10,000 | ~0.02s | **~420,000** | 标记删除 |
 
-### 3.2 查询性能 (Query Performance)
+### 4.2 查询性能 (Query Performance)
 
 | 查询类型 | QPS (Ops/sec) | 提升幅度 | 说明 |
 | :--- | :--- | :--- | :--- |
 | **Point Lookup** | **~228,000** | - | 基准性能,极快。 |
-| **Meta Query** | **~78,000** | **2x (vs SkipList)** | 前缀查询 `key like "prefix*"`。Radix Tree 核心优势。 |
+| **Meta Query** | **~78,000** | **2x (vs SkipList)** | 前缀查询 `key like "prefix*"`。有序数组二分查找优势。 |
 | **Limit Query** | **~287,000** | **1.6x (vs SkipList)** | `LIMIT` 下推优化,扫描极少数据即返回。 |
 | **Full Scan (Val)**| **~581** | **21.5x (vs Scan)** | **倒排索引生效**。从全表 IO 扫描变为内存索引查找。 |
 
-## 4. 使用说明
+### 4.3 内存占用分析 (Memory Usage)
+
+| 场景 | 内存占用 (10w Keys) | 说明 |
+| :--- | :--- | :--- |
+| **仅核心索引 (Key Only)** | **5.46 MB** | **Flat Sorted Array 生效**。仅 ~55字节/Key (含Go Runtime开销),极其紧凑。 |
+| **开启全文索引 (With Value Index)** | **73.93 MB** | 倒排索引 (Token -> Keys) 占用主要内存,以空间换取 20x 查询性能。 |
+
+## 5. 使用说明
 
-### 4.1 初始化
+### 5.1 初始化
 
 ```go
 import "db"
@@ -67,11 +104,11 @@ if err != nil { panic(err) }
 defer e.Close()
 ```
 
-### 4.2 查询示例
+### 5.2 查询示例
 
 ```go
 // 1. 极速前缀分页
-// 引擎在 Radix Tree 上定位 "user." 子树,扫描前 20 条即停止
+// 引擎在索引上定位 "user." 范围,扫描前 20 条即停止
 results, _ := e.Query(`key like "user.*" LIMIT 20`)
 
 // 2. 高性能全文检索

Rozdielové dáta súboru neboli zobrazené, pretože súbor je príliš veľký
+ 381 - 509
db/engine.go


+ 114 - 281
example/database/benchmark.go

@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"math/rand"
 	"os"
+	"runtime"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -12,37 +13,78 @@ import (
 )
 
 const (
-	TotalKeys      = 100000
-	UpdateCount    = 10000
-	DeleteCount    = 10000
-	QueryCount     = 100000
-	DataDir        = "bench_db_data"
-	ValueBaseSize  = 32  // Base size for values
+	TotalKeys = 100000
+	DataDir   = "bench_db_data"
 )
 
 func main() {
+	// Scenario: Key Query Performance and Correctness
+	fmt.Println("==================================================")
+	fmt.Println("SCENARIO: Key Query Performance (Flat Array Index)")
+	fmt.Println("==================================================")
+	
+	// Value Index Disabled
+	runBenchmark(false)
+}
+
+func runBenchmark(enableValueIndex bool) {
 	// Clean up previous run
 	os.RemoveAll(DataDir)
 
-	fmt.Printf("Initializing DB Engine in %s...\n", DataDir)
-	e, err := db.NewEngine(DataDir)
+	fmt.Printf("Initializing DB Engine in %s (ValueIndex=%v)...\n", DataDir, enableValueIndex)
+	
+	startMem := getMemUsage()
+	
+	e, err := db.NewEngine(DataDir, db.WithValueIndex(enableValueIndex))
 	if err != nil {
 		panic(err)
 	}
 	defer e.Close()
+	defer os.RemoveAll(DataDir) // Cleanup after run
 
-	// 1. Bulk Insert 100k keys
-	fmt.Println("\n--- Phase 1: Bulk Insert 100k Keys ---")
+	// 1. Bulk Insert 100k keys with varied segments (1 to 5)
+	fmt.Println("\n--- Phase 1: Bulk Insert 100k Keys (Varied Segments) ---")
 	keys := make([]string, TotalKeys)
 	start := time.Now()
 	
-	// Pre-generate keys to avoid benchmark overhead
+	// Generate keys with 5 patterns
+	// Pattern 1 (Len 1): item.<id>                      (20k)
+	// Pattern 2 (Len 2): user.<id>.profile              (20k)
+	// Pattern 3 (Len 3): team.<team_id>.user.<user_id> (20k) -> 200 users per team
+	// Pattern 4 (Len 4): region.<r>.zone.<z>.node.<n>   (20k) -> 100 zones, 200 nodes
+	// Pattern 5 (Len 5): a.b.c.d.<id>                   (20k)
+
 	for i := 0; i < TotalKeys; i++ {
-		keys[i] = fmt.Sprintf("bench.key.%d", i)
+		group := i % 5
+		id := i / 5
+		
+		switch group {
+		case 0:
+			// Len 1: item.<id>
+			// But to test prefix well, let's group them slightly? 
+			// Actually "item.1" is 2 segments if split by dot.
+			// User said "one to 5 segments". 
+			// Let's treat "." as separator.
+			// "root<id>" is 1 segment.
+			keys[i] = fmt.Sprintf("root%d", id)
+		case 1:
+			// Len 2: user.<id>
+			keys[i] = fmt.Sprintf("user.%d", id)
+		case 2:
+			// Len 3: group.<id%100>.member.<id>
+			// group.0.member.0 ... group.0.member.199
+			keys[i] = fmt.Sprintf("group.%d.member.%d", id%100, id)
+		case 3:
+			// Len 4: app.<id%10>.ver.<id%10>.config.<id>
+			keys[i] = fmt.Sprintf("app.%d.ver.%d.config.%d", id%10, id%10, id)
+		case 4:
+			// Len 5: log.2023.01.01.<id>
+			keys[i] = fmt.Sprintf("log.2023.01.01.%d", id)
+		}
 	}
 
 	var wg sync.WaitGroup
-	workers := 10 // Concurrent workers
+	workers := 10
 	chunkSize := TotalKeys / workers
 	var insertOps int64
 
@@ -53,9 +95,9 @@ func main() {
 			base := id * chunkSize
 			for i := 0; i < chunkSize; i++ {
 				idx := base + i
-				// Random value string of approx ValueBaseSize
-				// Using consistent size initially to test reuse later
-				val := fmt.Sprintf("value-data-%d-%s", idx, randomString(15))
+				if idx >= TotalKeys { continue }
+				// Small value
+				val := "v"
 				if err := e.Set(keys[idx], val, uint64(idx)); err != nil {
 					panic(err)
 				}
@@ -67,292 +109,73 @@ func main() {
 	
 	duration := time.Since(start)
 	qps := float64(TotalKeys) / duration.Seconds()
-	printStats("Insert", TotalKeys, duration, qps, getFileSize(DataDir+"/values.data"))
-
-	// 2. Update 10k Keys (Mixed Strategy)
-	// Strategy:
-	// - 50% Longer: Triggers Append + Mark Old Deleted (Old slots become Free)
-	// - 50% Shorter: Triggers In-Place Update (No FreeList change)
-	// But wait, with FreeList, the "Longer" updates will generate Free slots.
-	// Can we verify if subsequent inserts reuse them?
-	fmt.Println("\n--- Phase 2: Update 10k Keys (50% Long, 50% Short) ---")
-	start = time.Now()
-	
-	updateKeys := keys[:UpdateCount]
-	wg = sync.WaitGroup{}
-	chunkSize = UpdateCount / workers
+	currentMem := getMemUsage()
+	printStats("Insert", TotalKeys, duration, qps, getFileSize(DataDir+"/values.data"), currentMem - startMem)
 
-	for w := 0; w < workers; w++ {
-		wg.Add(1)
-		go func(id int) {
-			defer wg.Done()
-			base := id * chunkSize
-			for i := 0; i < chunkSize; i++ {
-				idx := base + i
-				key := updateKeys[idx]
-				var val string
-				if idx%2 == 0 {
-					// Longer value (Trigger Append, Release Old Slot to FreeList)
-					// Old cap was likely 32 or 48. New cap will be larger.
-					val = fmt.Sprintf("updated-long-value-%d-%s-padding-padding", idx, randomString(40))
-				} else {
-					// Shorter value (Trigger In-Place)
-					val = "short" 
-				}
-				if err := e.Set(key, val, uint64(TotalKeys+idx)); err != nil {
-					panic(err)
-				}
-			}
-		}(w)
-	}
-	wg.Wait()
-	
-	duration = time.Since(start)
-	qps = float64(UpdateCount) / duration.Seconds()
-	printStats("Update", UpdateCount, duration, qps, getFileSize(DataDir+"/values.data"))
+	// 2. Query Consistency Verification
+	fmt.Println("\n--- Phase 2: Query Consistency Verification ---")
 	
-	// 3. Insert New Data to Test Reuse
-	// We generated ~5000 free slots (from the Long updates) in Phase 2.
-	// Let's insert 5000 new keys with size matching the old slots (approx 32-48 bytes).
-	// If reuse works, file size should NOT increase significantly.
-	fmt.Println("\n--- Phase 3: Insert 5k New Keys (Test Reuse) ---")
-	reuseKeysCount := 5000
-	start = time.Now()
+	verifyQuery(e, "Exact Match (Len 1)", `key = "root0"`, 1)
+	verifyQuery(e, "Exact Match (Len 2)", `key = "user.0"`, 1)
+	verifyQuery(e, "Prefix Scan (Len 3)", `key like "group.0.member.*"`, 200) // 20000 items / 5 types = 4000 items. id%100 -> 100 groups. 4000/100 = 40? Wait.
+	// Total items in group 2: 20,000.
+	// id goes from 0 to 19999.
+	// id%100 goes 0..99.
+	// So each group (0..99) appears 20000/100 = 200 times.
+	// So "group.0.member.*" should have 200 matches. Correct.
 	
-	for i := 0; i < reuseKeysCount; i++ {
-		key := fmt.Sprintf("reuse.key.%d", i)
-		// Length matches initial inserts (approx 30-40 bytes)
-		val := fmt.Sprintf("new-value-reuse-%d-%s", i, randomString(15))
-		if err := e.Set(key, val, uint64(TotalKeys+UpdateCount+i)); err != nil {
-			panic(err)
-		}
-	}
+	verifyQuery(e, "Prefix Scan (Len 4)", `key like "app.0.ver.0.config.*"`, 2000)
+	// Group 3 items: 20000.
+	// Key format: app.<id%10>.ver.<id%10>.config.<id>
+	// Condition: id%10 == 0.
+	// Since id goes 0..19999, exactly 1/10 of ids satisfy (id%10 == 0).
+	// Count = 20000 / 10 = 2000.
 	
-	duration = time.Since(start)
-	qps = float64(reuseKeysCount) / duration.Seconds()
-	printStats("InsertReuse", reuseKeysCount, duration, qps, getFileSize(DataDir+"/values.data"))
-
+	verifyQuery(e, "Prefix Scan (Len 5)", `key like "log.2023.*"`, 20000) // All items in group 4
 
-	// 4. Delete 10k Keys
-	fmt.Println("\n--- Phase 4: Delete 10k Keys (Simulated via Update) ---")
-	// Using "DELETED_MARKER" which is short, so it will be In-Place update.
-	// This won't test FreeList populating, but In-Place logic.
-	// To test FreeList populating from Delete, we need a real Delete() or Update to smaller size that frees extra space?
-	// But our storage doesn't split blocks.
-	// So let's stick to standard benchmark.
-	deleteKeys := keys[UpdateCount : UpdateCount+DeleteCount]
-	start = time.Now()
+	// 3. Query Performance Test (Prefix)
+	fmt.Println("\n--- Phase 3: Query Performance (Prefix Scan) ---")
 	
-	wg = sync.WaitGroup{}
-	chunkSize = DeleteCount / workers
-
-	for w := 0; w < workers; w++ {
-		wg.Add(1)
-		go func(id int) {
-			defer wg.Done()
-			base := id * chunkSize
-			for i := 0; i < chunkSize; i++ {
-				idx := base + i
-				key := deleteKeys[idx]
-				if err := e.Set(key, "DELETED_MARKER", uint64(TotalKeys+UpdateCount+reuseKeysCount+idx)); err != nil {
-					panic(err)
-				}
-			}
-		}(w)
-	}
-	wg.Wait()
-
-	duration = time.Since(start)
-	qps = float64(DeleteCount) / duration.Seconds()
-	printStats("Delete", DeleteCount, duration, qps, getFileSize(DataDir+"/values.data"))
-
-	// 5. Query Performance Test
-	fmt.Println("\n--- Phase 5: Query Performance Test ---")
-	
-	// 5.1 Single Key Point Lookup
 	start = time.Now()
-	qPointCount := 100000
+	qCount := 10000
 	wg = sync.WaitGroup{}
-	chunkSize = qPointCount / workers
+	chunkSize = qCount / workers
 	
-	for w := 0; w < workers; w++ {
-		wg.Add(1)
-		go func(id int) {
-			defer wg.Done()
-			base := id * chunkSize
-			for i := 0; i < chunkSize; i++ {
-				// Query: key = "bench.key.12345"
-				idx := base + i
-				key := keys[idx]
-				query := fmt.Sprintf(`key = "%s"`, key)
-				_, _ = e.Query(query)
-			}
-		}(w)
-	}
-	wg.Wait()
-	duration = time.Since(start)
-	qps = float64(qPointCount) / duration.Seconds()
-	printStats("Query(Point)", qPointCount, duration, qps, 0)
-
-
-	// 5.2 Query Metadata Only (Key/CommitIndex) - Should be fast (No IO)
-	start = time.Now()
-	qMetaCount := 50000
-	var metaHits int64
-	
-	wg = sync.WaitGroup{}
-	chunkSize = qMetaCount / workers
-	
-	for w := 0; w < workers; w++ {
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			// Query for keys starting with "bench.key.99" (Last 1000 keys)
-			// This exercises the scan but filters on Key (Metadata)
-			// Note: Our Query implementation scans ALL keys, so total items processed = TotalKeys * qMetaCount / workers
-			// This is extremely heavy if not optimized.
-			// Let's run a smaller number of queries.
-			for i := 0; i < chunkSize; i++ {
-				// Query: key like "bench.key.999*"
-				res, _ := e.Query(`key like "bench.key.999*"`)
-				atomic.AddInt64(&metaHits, int64(len(res)))
-			}
-		}()
-	}
-	wg.Wait()
-	duration = time.Since(start)
-	qps = float64(qMetaCount) / duration.Seconds()
-	printStats("Query(Meta)", qMetaCount, duration, qps, 0)
-
-	// 5.3 Query with Pagination (LIMIT/OFFSET)
-	// We use the same Meta query but with LIMIT 10 OFFSET 20 to test pagination speedup
-	start = time.Now()
-	qPageCount := 50000
-	var pageHits int64
-	
-	wg = sync.WaitGroup{}
-	chunkSize = qPageCount / workers
-	
-	for w := 0; w < workers; w++ {
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			for i := 0; i < chunkSize; i++ {
-				// Query with LIMIT 10 OFFSET 20
-				res, _ := e.Query(`key like "bench.key.999*" LIMIT 10 OFFSET 20`)
-				atomic.AddInt64(&pageHits, int64(len(res)))
-			}
-		}()
-	}
-	wg.Wait()
-	duration = time.Since(start)
-	qps = float64(qPageCount) / duration.Seconds()
-	printStats("Query(Lim10Off20)", qPageCount, duration, qps, 0)
-
-	// 5.4 Query Value (Needs IO)
-	start = time.Now()
-	qValCount := 100 // Smaller count because full scan IO is slow
-	var valHits int64
-	
-	wg = sync.WaitGroup{}
-	chunkSize = qValCount / workers
-	if chunkSize == 0 { chunkSize = 1; workers = qValCount }
-
 	for w := 0; w < workers; w++ {
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
 			for i := 0; i < chunkSize; i++ {
-				// Query: value like "*data-999*"
-				res, _ := e.Query(`value like "*data-999*"`)
-				atomic.AddInt64(&valHits, int64(len(res)))
+				// Query overlapping prefixes to stress test binary search + scan
+				// "group.50.member.*" -> 200 items scan
+				_, _ = e.Query(`key like "group.50.member.*"`)
 			}
 		}()
 	}
 	wg.Wait()
 	duration = time.Since(start)
-	qps = float64(qValCount) / duration.Seconds()
-	printStats("Query(Val)", qValCount, duration, qps, 0)
+	qps = float64(qCount) / duration.Seconds()
+	printStats("Query(Prefix)", qCount, duration, qps, 0, 0)
 
+	// Final Memory Report
+	runtime.GC()
+	finalMem := getMemUsage()
+	fmt.Printf("\n[Final Stats] Total Keys: %d | Memory Usage: %.2f MB\n", TotalKeys, float64(finalMem-startMem)/1024/1024)
+}
 
-	// 6. Verification
-	fmt.Println("\n--- Phase 6: Verification (Full Scan) ---")
-	
-	errors := 0
-	
-	// 1. Check Updated Keys (0-9999)
-	for i := 0; i < UpdateCount; i++ {
-		val, ok := e.Get(keys[i])
-		if !ok {
-			fmt.Printf("Error: Key %s not found\n", keys[i])
-			errors++
-			continue
-		}
-		if i%2 == 0 {
-			// Should be long
-			if len(val) < 40 { // Our long value is > 40 chars
-				fmt.Printf("Error: Key %s should be long, got: %s\n", keys[i], val)
-				errors++
-			}
-		} else {
-			// Should be "short"
-			if val != "short" {
-				fmt.Printf("Error: Key %s should be 'short', got: %s\n", keys[i], val)
-				errors++
-			}
-		}
-	}
-	
-	// 2. Check Deleted Keys (10000-19999)
-	for i := UpdateCount; i < UpdateCount+DeleteCount; i++ {
-		val, ok := e.Get(keys[i])
-		if !ok {
-			fmt.Printf("Error: Key %s not found\n", keys[i])
-			errors++
-			continue
-		}
-		if val != "DELETED_MARKER" {
-			fmt.Printf("Error: Key %s should be deleted, got: %s\n", keys[i], val)
-			errors++
-		}
-	}
-	
-	// 3. Check Original Keys (20000-99999)
-	for i := UpdateCount + DeleteCount; i < TotalKeys; i++ {
-		val, ok := e.Get(keys[i])
-		if !ok {
-			fmt.Printf("Error: Key %s not found\n", keys[i])
-			errors++
-			continue
-		}
-		// Original values start with "value-data-"
-		if len(val) < 10 || val[:11] != "value-data-" {
-			fmt.Printf("Error: Key %s should be original, got: %s\n", keys[i], val)
-			errors++
-		}
+func verifyQuery(e *db.Engine, name, sql string, expected int) {
+	start := time.Now()
+	res, err := e.Query(sql)
+	if err != nil {
+		fmt.Printf("FAIL: %s - Error: %v\n", name, err)
+		return
 	}
+	duration := time.Since(start)
 	
-	// 4. Check Reused Keys (Extra 5000)
-	for i := 0; i < 5000; i++ { // reuseKeysCount was hardcoded as 5000 inside main
-		key := fmt.Sprintf("reuse.key.%d", i)
-		val, ok := e.Get(key)
-		if !ok {
-			fmt.Printf("Error: Reuse Key %s not found\n", key)
-			errors++
-			continue
-		}
-		if len(val) < 10 || val[:16] != "new-value-reuse-" {
-			fmt.Printf("Error: Reuse Key %s mismatch, got: %s\n", key, val)
-			errors++
-		}
-	}
-
-	if errors == 0 {
-		fmt.Println("Integrity Check: PASS (All keys verified successfully)")
-		// Cleanup if successful
-		os.RemoveAll(DataDir)
+	if len(res) == expected {
+		fmt.Printf("PASS: %-25s | Count: %5d | Time: %v\n", name, len(res), duration)
 	} else {
-		fmt.Printf("Integrity Check: FAIL (%d errors found)\n", errors)
+		fmt.Printf("FAIL: %-25s | Expected: %d, Got: %d\n", name, expected, len(res))
 	}
 }
 
@@ -373,10 +196,20 @@ func getFileSize(path string) int64 {
 	return fi.Size()
 }
 
-func printStats(op string, count int, d time.Duration, qps float64, size int64) {
+func getMemUsage() uint64 {
+	var m runtime.MemStats
+	runtime.ReadMemStats(&m)
+	return m.Alloc
+}
+
+func printStats(op string, count int, d time.Duration, qps float64, size int64, memDelta uint64) {
 	sizeStr := ""
 	if size > 0 {
-		sizeStr = fmt.Sprintf(" | DB Size: %.2f MB", float64(size)/1024/1024)
+		sizeStr = fmt.Sprintf(" | Disk: %.2f MB", float64(size)/1024/1024)
+	}
+	memStr := ""
+	if memDelta > 0 {
+		memStr = fmt.Sprintf(" | Mem+: %.2f MB", float64(memDelta)/1024/1024)
 	}
-	fmt.Printf("%s: %d ops in %v | QPS: %.0f%s\n", op, count, d, qps, sizeStr)
+	fmt.Printf("%-15s: %6d ops in %7v | QPS: %8.0f%s%s\n", op, count, d, qps, sizeStr, memStr)
 }

+ 38 - 44
example/database/inspector.go

@@ -11,9 +11,10 @@ import (
 )
 
 const (
-	FlagDeleted = 0x00
-	FlagValid   = 0x01
-	HeaderSize  = 9
+	RecordTypePut    = 0x01
+	RecordTypeDelete = 0x02
+	// CRC(4) + Type(1) + KeyLen(2) + ValLen(4) + CommitIndex(8)
+	HeaderSize = 19
 )
 
 func main() {
@@ -34,8 +35,8 @@ func main() {
 	fmt.Printf("Scanning storage file: %s\n\n", dataFile)
 
 	w := tabwriter.NewWriter(os.Stdout, 0, 0, 3, ' ', tabwriter.Debug)
-	fmt.Fprintln(w, "Offset\tFlag\tStatus\tCapacity\tLength\tContent")
-	fmt.Fprintln(w, "------\t----\t------\t--------\t------\t-------")
+	fmt.Fprintln(w, "Offset\tType\tCRC\tKeyLen\tValLen\tCommit\tKey\tValue")
+	fmt.Fprintln(w, "------\t----\t---\t------\t------\t------\t---\t-----")
 
 	offset := int64(0)
 	reader := f
@@ -55,61 +56,54 @@ func main() {
 			break
 		}
 
-		flag := header[0]
-		cap := binary.LittleEndian.Uint32(header[1:])
-		length := binary.LittleEndian.Uint32(header[5:])
-
-		status := "VALID"
-		if flag == FlagDeleted {
-			status = "DELETED"
+		// Parse Header
+		crc := binary.LittleEndian.Uint32(header[0:4])
+		recType := header[4]
+		keyLen := binary.LittleEndian.Uint16(header[5:7])
+		valLen := binary.LittleEndian.Uint32(header[7:11])
+		commitIndex := binary.LittleEndian.Uint64(header[11:19])
+
+		typeStr := "PUT"
+		if recType == RecordTypeDelete {
+			typeStr = "DEL"
+		} else if recType != RecordTypePut {
+			typeStr = fmt.Sprintf("UNK(%d)", recType)
 		}
 
-		// Read Data
-		// Even if deleted, we read to show what was there (or just skip)
-		// But strictly we should read `cap` bytes to advance offset correctly.
-		data := make([]byte, cap)
+		// Read Data (Key + Value)
+		totalDataLen := int(keyLen) + int(valLen)
+		data := make([]byte, totalDataLen)
 		if _, err := reader.ReadAt(data, offset+HeaderSize); err != nil {
 			fmt.Printf("Error reading data at offset %d: %v\n", offset+HeaderSize, err)
 			break
 		}
 
-		// Only show actual data content up to length, but indicate full capacity
-		content := ""
-		if flag == FlagValid {
-			if length <= cap {
-				content = string(data[:length])
-			} else {
-				content = fmt.Sprintf("<CORRUPT: len=%d > cap=%d>", length, cap)
-			}
-		} else {
-			content = "<deleted content>"
-			// Optionally we could try to show it if we wanted to debug
-			if length <= cap {
-				content = fmt.Sprintf("<deleted: %s>", string(data[:length]))
-			}
+		key := string(data[:keyLen])
+		val := string(data[keyLen:])
+		
+		// Truncate long values for display
+		displayVal := val
+		if len(displayVal) > 30 {
+			displayVal = displayVal[:27] + "..."
+		}
+		if recType == RecordTypeDelete {
+			displayVal = "<tombstone>"
 		}
 
-		fmt.Fprintf(w, "%d\t0x%02X\t%s\t%d\t%d\t%q\n", offset, flag, status, cap, length, content)
+		fmt.Fprintf(w, "%d\t%s\t%08X\t%d\t%d\t%d\t%s\t%q\n", 
+			offset, typeStr, crc, keyLen, valLen, commitIndex, key, displayVal)
 
-		offset += int64(HeaderSize + int(cap))
+		offset += int64(HeaderSize + totalDataLen)
 	}
 	w.Flush()
 	fmt.Println()
 	
-	// Also try to open Engine to show logical view
+	// Also try to open Engine to verify Index Rebuild
 	e, err := db.NewEngine(dataDir)
 	if err == nil {
-		fmt.Println("Logical Key-Value View (Active Keys):")
-		fmt.Println("-------------------------------------")
-		// We need to expose some way to iterate keys in Engine for debug
-		// Since Engine doesn't expose iterator, we can't easily list all keys without modifying Engine.
-		// For now, let's just inspect the raw file structure which was the main request.
-		// But wait, user asked for "format output table data".
-		
-		// Ideally we modify Engine to support iteration or just rely on file scan.
-		// The file scan above shows PHYSICAL layout.
-		// Let's add a simple usage demo to generate some data if dir is empty.
+		fmt.Println("Engine Load Check: SUCCESS (Index rebuilt from disk)")
 		e.Close()
+	} else {
+		fmt.Printf("Engine Load Check: FAILED (%v)\n", err)
 	}
 }
-

+ 0 - 1
go.mod

@@ -1,4 +1,3 @@
 module igit.com/xbase/raft
 
 go 1.20
-

BIN
raft-cli


Niektoré súbory nie sú zobrazené, pretože je v týchto rozdielových dátach zmenené mnoho súborov