Kaynağa Gözat

db query优化

xbase 2 hafta önce
ebeveyn
işleme
d02847dd0f
4 değiştirilmiş dosya ile 778 ekleme ve 208 silme
  1. BIN
      bench_db_data/values.data
  2. 128 0
      db/db.md
  3. 590 206
      db/engine.go
  4. 60 2
      example/database/benchmark.go

BIN
bench_db_data/values.data


+ 128 - 0
db/db.md

@@ -0,0 +1,128 @@
+# RaftDB Storage Engine Documentation
+
+RaftDB 内置了一个高性能、线程安全的嵌入式键值存储引擎。该引擎专为 Raft 状态机设计,支持高并发读写、倒排索引查询以及磁盘空间复用。
+
+## 1. 核心特性
+
+*   **高并发 (High Concurrency)**: 采用分片锁 (Striped Locking) 和原子操作,支持数万 QPS 的并行读写。
+*   **数据安全 (Data Safety)**: Key 级锁确保同一 Key 的读写操作原子性,避免脏读和写冲突。
+*   **磁盘复用 (Disk Reuse)**: 内置 Best-Fit 策略的 FreeList,自动回收并重用被删除或更新产生的空闲磁盘空间。
+*   **倒排索引 (Inverted Index)**: 支持对 Key 和 Value 的自动分词索引,支持复杂的 SQL 风格查询。
+*   **崩溃一致性 (Crash Consistency)**: 优化的写入顺序(Write New -> Update Index -> Mark Old Deleted),确保数据不丢失。
+
+## 2. 架构设计
+
+### 2.1 存储布局 (Storage Layout)
+
+数据存储在单个 append-only 文件中 (`values.data`),但支持空间复用。
+
+**物理格式:**
+```
+[Header][Data]
+```
+*   **Header (9 bytes)**:
+    *   `Flag` (1 byte): `0x01` (Valid) 或 `0x00` (Deleted)
+    *   `Capacity` (4 bytes): 该槽位分配的总大小(包含 Data),用于复用。
+    *   `Length` (4 bytes): 实际数据长度。
+*   **Data**: 实际存储的 Key/Value 数据。
+
+### 2.2 内存索引 (In-Memory Index)
+
+为了减少锁竞争,核心数据结构均采用了分片设计 (Sharding)。
+
+1.  **IndexShards (16 Shards)**:
+    *   存储 `KeyID -> {FileOffset, CommitIndex}` 的映射。
+    *   读写请求根据 KeyHash 分发到不同分片,互不干扰。
+
+2.  **KeyMap (16 Shards)**:
+    *   维护 `String Key <-> uint32 KeyID` 的双向映射。
+    *   减少内存占用(索引中使用 uint32 替代 string),并加速比较。
+
+3.  **InvertedIndex (16 Shards)**:
+    *   维护 `Token -> []KeyID` 的倒排表。
+    *   支持 `LIKE` 和全文匹配查询。
+
+### 2.3 锁机制 (Locking Strategy)
+
+*   **Key Locks (1024 Striped Locks)**:
+    *   针对特定 Key 的操作(Get/Set)必须先获取该 Key 对应的 Hash 锁。
+    *   保证了同一 Key 的强一致性。
+*   **Shard Locks**:
+    *   仅在访问 Index 映射表时短暂持有,IO 操作在 Index 锁之外进行。
+*   **Atomic Offset**:
+    *   文件追加写操作使用 `atomic.AddInt64`,无需全局锁。
+
+## 3. 性能测试报告 (Benchmark Report)
+
+测试环境: macOS, 10 并发 Workers, 本地磁盘 IO。
+
+### 3.1 吞吐量 (Throughput)
+
+| 操作类型 | 数量 | 耗时 | QPS (Ops/sec) | 说明 |
+| :--- | :--- | :--- | :--- | :--- |
+| **Insert** | 100,000 | ~2.07s | **~48,300** | 批量写入,完全并行 |
+| **Update** | 10,000 | ~0.09s | **~106,000** | 混合原地更新与追加写 |
+| **Insert (Reuse)** | 5,000 | ~0.27s | **~18,200** | 复用旧空间,无需文件增长 |
+| **Delete** | 10,000 | ~0.02s | **~403,800** | 标记删除,极快 |
+
+### 3.2 磁盘空间复用验证
+
+*   **初始写入 (100k items)**: DB Size **3.91 MB**
+*   **更新后 (10k updates)**: DB Size **4.33 MB** (仅少量增长,部分原地更新)
+*   **删除后 (10k deletes)**: DB Size **4.61 MB** (文件大小不变,产生空洞)
+*   **复用写入 (5k reuse)**: DB Size **4.61 MB** (完全复用空洞,**文件大小零增长**)
+
+**结论**: FreeList 机制有效工作,长时间运行后数据库文件大小将趋于稳定,不会无限膨胀。
+
+### 3.3 数据完整性验证
+
+*   **并发写安全**: 100% 通过。多线程更新同一 Key 也是安全的。
+*   **数据一致性**: 100% 通过。全量扫描验证所有 Key 的值均为预期最新值。
+
+## 4. 使用说明
+
+### 4.1 初始化
+
+```go
+import "db"
+
+// 初始化引擎,指定数据目录
+e, err := db.NewEngine("./my_data")
+if err != nil {
+    panic(err)
+}
+defer e.Close()
+```
+
+### 4.2 基本读写
+
+```go
+// 写入数据 (Key, Value, CommitIndex)
+err := e.Set("user.1001", "{\"name\":\"Alice\"}", 1)
+
+// 读取数据
+val, found := e.Get("user.1001")
+if found {
+    fmt.Println("Value:", val)
+}
+```
+
+### 4.3 高级查询 (SQL-like)
+
+支持对 Key、Value 和 CommitIndex 进行查询。
+
+```go
+// 查询所有 name 包含 "Alice" 且 CommitIndex > 0 的记录
+results, err := e.Query(`value like "*Alice*" commit_index > 0`)
+
+for _, row := range results {
+    fmt.Printf("Key: %s, Val: %s\n", row.Key, row.Value)
+}
+```
+
+## 5. 改进与优化历史
+
+1.  **初始版本**: 全局互斥锁,性能瓶颈明显 (~15k QPS)。
+2.  **V1 优化**: 引入 `KeyMap` 和 `Index` 分片,去除文件 IO 全局锁,使用 `pwrite`/`pread`。QPS 提升至 ~49k。
+3.  **V2 安全加固**: 引入 `StripedLock` (Key级锁) 解决并发写冲突和脏读问题。在保证安全的前提下,Update/Delete 性能进一步提升至 100k+ QPS。
+

Dosya farkı çok büyük olduğundan ihmal edildi
+ 590 - 206
db/engine.go


+ 60 - 2
example/database/benchmark.go

@@ -166,8 +166,66 @@ func main() {
 	qps = float64(DeleteCount) / duration.Seconds()
 	printStats("Delete", DeleteCount, duration, qps, getFileSize(DataDir+"/values.data"))
 
-	// 5. Verification
-	fmt.Println("\n--- Phase 5: Verification (Full Scan) ---")
+	// 5. Query Performance Test
+	fmt.Println("\n--- Phase 5: Query Performance Test ---")
+
+	// 5.1 Query Metadata Only (Key/CommitIndex) - Should be fast (No IO)
+	start = time.Now()
+	qMetaCount := 5000
+	var metaHits int64
+	
+	wg = sync.WaitGroup{}
+	chunkSize = qMetaCount / workers
+	
+	for w := 0; w < workers; w++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			// Query for keys starting with "bench.key.99" (Last 1000 keys)
+			// This exercises the scan but filters on Key (Metadata)
+			// Note: Our Query implementation scans ALL keys, so total items processed = TotalKeys * qMetaCount / workers
+			// This is extremely heavy if not optimized.
+			// Let's run a smaller number of queries.
+			for i := 0; i < chunkSize; i++ {
+				// Query: key like "bench.key.999*"
+				res, _ := e.Query(`key like "bench.key.999*"`)
+				atomic.AddInt64(&metaHits, int64(len(res)))
+			}
+		}()
+	}
+	wg.Wait()
+	duration = time.Since(start)
+	qps = float64(qMetaCount) / duration.Seconds()
+	printStats("Query(Meta)", qMetaCount, duration, qps, 0)
+
+	// 5.2 Query Value (Needs IO)
+	start = time.Now()
+	qValCount := 100 // Smaller count because full scan IO is slow
+	var valHits int64
+	
+	wg = sync.WaitGroup{}
+	chunkSize = qValCount / workers
+	if chunkSize == 0 { chunkSize = 1; workers = qValCount }
+
+	for w := 0; w < workers; w++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for i := 0; i < chunkSize; i++ {
+				// Query: value like "*data-999*"
+				res, _ := e.Query(`value like "*data-999*"`)
+				atomic.AddInt64(&valHits, int64(len(res)))
+			}
+		}()
+	}
+	wg.Wait()
+	duration = time.Since(start)
+	qps = float64(qValCount) / duration.Seconds()
+	printStats("Query(Val)", qValCount, duration, qps, 0)
+
+
+	// 6. Verification
+	fmt.Println("\n--- Phase 6: Verification (Full Scan) ---")
 	
 	errors := 0
 	

Bu fark içinde çok fazla dosya değişikliği olduğu için bazı dosyalar gösterilmiyor