engine.go 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482
  1. package db
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "fmt"
  6. "hash/crc32"
  7. "io"
  8. "os"
  9. "regexp"
  10. "sort"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "sync/atomic"
  15. )
  16. // hash returns a 32-bit FNV-1a hash of the string
  17. func hash(s string) uint32 {
  18. var h uint32 = 2166136261
  19. for i := 0; i < len(s); i++ {
  20. h = (h * 16777619) ^ uint32(s[i])
  21. }
  22. return h
  23. }
// --- Flat Sorted Array Index Implementation ---
// Memory-oriented design:
//  1. No per-key tree nodes, pointers or map buckets: one slice of fixed-size items.
//  2. All key bytes live in a single append-only []byte buffer (no per-key string headers).
//  3. Items are kept sorted by key: O(log N) lookup via binary search,
//     O(N) insertion and deletion (the slice tail is shifted).

// IndexEntry locates the current record for a key in the value log.
// ValueOffset is the byte offset of that record in the log file (the value
// returned by Storage.Append, or Record.Offset during a Scan), and
// CommitIndex is the commit index stored in that record.
type IndexEntry struct {
	ValueOffset int64
	CommitIndex uint64
}
// FlatIndex is an ordered in-memory map from key to IndexEntry, stored as a
// sorted slice of compact items plus one shared key buffer.
//
// mu guards both keyBuf and items. The exported methods take the lock
// themselves; insertLocked expects the caller to already hold it for writing.
type FlatIndex struct {
	// keyBuf holds the bytes of every key ever added, back to back:
	// [key1 bytes][key2 bytes]... It is append-only: Delete does not reclaim
	// space (there is no compaction).
	keyBuf []byte
	// items is kept sorted by key; each item points into keyBuf.
	items []flatItem
	mu    sync.RWMutex
}
// flatItem is one element of FlatIndex.items: where its key lives inside
// FlatIndex.keyBuf, plus the IndexEntry for that key.
//
// Size: keyOffset (4 bytes) + keyLen (2 bytes) + 2 bytes of padding so that
// entry (int64 + uint64 = 16 bytes) is properly aligned, i.e. 24 bytes per key
// (not 16).
type flatItem struct {
	keyOffset uint32 // offset into keyBuf; keyOffset+keyLen must fit in a uint32 (4 GiB of key bytes in total)
	keyLen    uint16 // key length in bytes; keys longer than 65535 bytes cannot be represented
	entry     IndexEntry
}

// Rough footprint: 24 bytes per item, so 100k keys => ~2.4 MB of items, plus
// the key bytes themselves (~1.8 MB assuming ~18-byte keys), i.e. ~4.2 MB in
// total, not counting spare slice capacity.
  54. func NewFlatIndex() *FlatIndex {
  55. return &FlatIndex{
  56. keyBuf: make([]byte, 0, 1024*1024), // 1MB initial cap
  57. items: make([]flatItem, 0, 10000),
  58. }
  59. }
  60. // getKey unsafe-ish helper to get string from buffer without alloc?
  61. // Standard conversion string(b) allocates.
  62. // For comparison in binary search, we ideally want to avoid allocation.
  63. // But Go 1.20+ optimization might handle string(bytes) well in map keys or comparisons if they don't escape.
  64. // Let's rely on standard string() conversion for safety first.
  65. func (fi *FlatIndex) getKey(idx int) string {
  66. item := fi.items[idx]
  67. return string(fi.keyBuf[item.keyOffset : item.keyOffset+uint32(item.keyLen)])
  68. }
  69. // Compare helper to avoid string allocation
  70. func (fi *FlatIndex) compare(idx int, target string) int {
  71. item := fi.items[idx]
  72. keyBytes := fi.keyBuf[item.keyOffset : item.keyOffset+uint32(item.keyLen)]
  73. return strings.Compare(string(keyBytes), target)
  74. }
  75. func (fi *FlatIndex) Insert(key string, entry IndexEntry) {
  76. fi.mu.Lock()
  77. defer fi.mu.Unlock()
  78. fi.insertLocked(key, entry)
  79. }
  80. func (fi *FlatIndex) insertLocked(key string, entry IndexEntry) {
  81. // 1. Binary Search
  82. idx := sort.Search(len(fi.items), func(i int) bool {
  83. return fi.getKey(i) >= key
  84. })
  85. // 2. Update existing
  86. if idx < len(fi.items) && fi.getKey(idx) == key {
  87. fi.items[idx].entry = entry
  88. return
  89. }
  90. // 3. Add new key
  91. offset := len(fi.keyBuf)
  92. fi.keyBuf = append(fi.keyBuf, key...)
  93. if len(key) > 65535 {
  94. // Should have been caught earlier, but just in case cap it or panic
  95. // We can panic or truncate.
  96. // panic("key too long")
  97. }
  98. newItem := flatItem{
  99. keyOffset: uint32(offset),
  100. keyLen: uint16(len(key)),
  101. entry: entry,
  102. }
  103. // 4. Insert into sorted slice
  104. // Optimization: check if appending to end (common case for sequential inserts)
  105. if idx == len(fi.items) {
  106. fi.items = append(fi.items, newItem)
  107. } else {
  108. fi.items = append(fi.items, flatItem{})
  109. copy(fi.items[idx+1:], fi.items[idx:])
  110. fi.items[idx] = newItem
  111. }
  112. }
  113. func (fi *FlatIndex) Get(key string) (IndexEntry, bool) {
  114. fi.mu.RLock()
  115. defer fi.mu.RUnlock()
  116. idx := sort.Search(len(fi.items), func(i int) bool {
  117. return fi.getKey(i) >= key
  118. })
  119. if idx < len(fi.items) && fi.getKey(idx) == key {
  120. return fi.items[idx].entry, true
  121. }
  122. return IndexEntry{}, false
  123. }
  124. func (fi *FlatIndex) Delete(key string) bool {
  125. fi.mu.Lock()
  126. defer fi.mu.Unlock()
  127. idx := sort.Search(len(fi.items), func(i int) bool {
  128. return fi.getKey(i) >= key
  129. })
  130. if idx < len(fi.items) && fi.getKey(idx) == key {
  131. // Delete from slice
  132. fi.items = append(fi.items[:idx], fi.items[idx+1:]...)
  133. // Note: We don't reclaim space in keyBuf. It's append-only until compaction (not implemented).
  134. // This is fine for many cases, but heavy deletes might waste keyBuf space.
  135. return true
  136. }
  137. return false
  138. }
  139. // WalkPrefix iterates over keys starting with prefix.
  140. // Since keys are sorted, we find the first match and iterate until mismatch.
  141. func (fi *FlatIndex) WalkPrefix(prefix string, callback func(key string, entry IndexEntry) bool) {
  142. fi.mu.RLock()
  143. defer fi.mu.RUnlock()
  144. // Binary search for the start
  145. idx := sort.Search(len(fi.items), func(i int) bool {
  146. return fi.getKey(i) >= prefix
  147. })
  148. for i := idx; i < len(fi.items); i++ {
  149. key := fi.getKey(i)
  150. // Optimization: Check prefix match
  151. if !strings.HasPrefix(key, prefix) {
  152. break
  153. }
  154. if !callback(key, fi.items[i].entry) {
  155. return
  156. }
  157. }
  158. }
// --- End Flat Index ---

// --- Full Text Index ---

// FullTextIndex is an inverted index from a token (as produced by tokenize)
// to the set of keys whose value contains that token. mu guards index; Add,
// Remove and Search take it themselves.
type FullTextIndex struct {
	// Token -> set of keys (Add only ever stores true).
	index map[string]map[string]bool
	mu    sync.RWMutex
}
  166. func NewFullTextIndex() *FullTextIndex {
  167. return &FullTextIndex{
  168. index: make(map[string]map[string]bool),
  169. }
  170. }
  171. func (fti *FullTextIndex) Add(key string, value string) {
  172. fti.mu.Lock()
  173. defer fti.mu.Unlock()
  174. tokens := tokenize(value)
  175. for _, token := range tokens {
  176. if fti.index[token] == nil {
  177. fti.index[token] = make(map[string]bool)
  178. }
  179. fti.index[token][key] = true
  180. }
  181. }
  182. func (fti *FullTextIndex) Remove(key string, value string) {
  183. fti.mu.Lock()
  184. defer fti.mu.Unlock()
  185. tokens := tokenize(value)
  186. for _, token := range tokens {
  187. if set, ok := fti.index[token]; ok {
  188. delete(set, key)
  189. if len(set) == 0 {
  190. delete(fti.index, token)
  191. }
  192. }
  193. }
  194. }
  195. func (fti *FullTextIndex) Search(tokenPattern string) []string {
  196. fti.mu.RLock()
  197. defer fti.mu.RUnlock()
  198. // 1. Exact Match
  199. if !strings.Contains(tokenPattern, "*") {
  200. if keys, ok := fti.index[tokenPattern]; ok {
  201. res := make([]string, 0, len(keys))
  202. for k := range keys {
  203. res = append(res, k)
  204. }
  205. return res
  206. }
  207. return nil
  208. }
  209. // 2. Wildcard Scan
  210. var results []string
  211. seen := make(map[string]bool)
  212. for token, keys := range fti.index {
  213. if WildcardMatch(token, tokenPattern) {
  214. for k := range keys {
  215. if !seen[k] {
  216. results = append(results, k)
  217. seen[k] = true
  218. }
  219. }
  220. }
  221. }
  222. return results
  223. }
  224. func tokenize(val string) []string {
  225. f := func(c rune) bool {
  226. return !((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9'))
  227. }
  228. return strings.FieldsFunc(val, f)
  229. }
// --- Storage & Cache ---

// StripedLock is a fixed array of 1024 RWMutexes. GetLock maps a key to one
// of them by hash, so locking is spread across stripes instead of one global
// lock. Different keys may share a stripe. The zero value is ready to use.
type StripedLock struct {
	locks [1024]sync.RWMutex
}
  235. func (sl *StripedLock) GetLock(key string) *sync.RWMutex {
  236. h := hash(key)
  237. return &sl.locks[h%1024]
  238. }
// Metadata describes the persistent state of the engine.
// NOTE(review): this type is not referenced by the code visible here. The
// meta file is read directly in NewEngine and written by saveMetadata as a
// raw 8-byte little-endian LastCommitIndex.
type Metadata struct {
	LastCommitIndex uint64
}
// Storage manages the append-only value log on disk, plus a bounded
// in-memory cache of values keyed by record offset.
type Storage struct {
	file     *os.File
	filename string
	// offset is the end of the log, i.e. where the next record is written.
	// Append reserves space by advancing it with atomic.AddInt64; Scan resets
	// it to the end of the last valid record.
	offset int64
	// cache maps a record's offset to its value. It holds at most
	// MaxCacheSize entries and is guarded by cacheMu.
	cache   map[int64]string
	cacheMu sync.RWMutex
}
// On-disk record format (all integers little-endian):
//
//	[0:4]   CRC-32 (IEEE) of bytes [4:] of the record (header fields + key + value)
//	[4]     record type (RecordTypePut or RecordTypeDelete)
//	[5:7]   key length (uint16)
//	[7:11]  value length (uint32)
//	[11:19] commit index (uint64)
//	[19:]   key bytes, then value bytes
const (
	RecordTypePut    = 0x01
	RecordTypeDelete = 0x02
	// CRC(4) + Type(1) + KeyLen(2) + ValLen(4) + CommitIndex(8)
	HeaderSize = 4 + 1 + 2 + 4 + 8
	// MaxCacheSize is the maximum number of entries kept in Storage.cache.
	MaxCacheSize = 10000
)
  258. func NewStorage(filename string) (*Storage, error) {
  259. f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
  260. if err != nil {
  261. return nil, err
  262. }
  263. st := &Storage{
  264. file: f,
  265. filename: filename,
  266. cache: make(map[int64]string),
  267. }
  268. return st, nil
  269. }
// Record is one decoded log entry, as delivered to the Storage.Scan callback.
type Record struct {
	Type        byte   // RecordTypePut or RecordTypeDelete
	Key         string
	Value       string // empty for the delete records written by Engine.Delete
	Offset      int64  // byte offset of the record's header in the log file
	CommitIndex uint64
}
  278. // Scan iterates over the log file, validating CRCs and returning records.
  279. // It returns the valid offset after the last record.
  280. func (s *Storage) Scan(callback func(Record)) (int64, error) {
  281. offset := int64(0)
  282. if _, err := s.file.Seek(0, 0); err != nil {
  283. return 0, err
  284. }
  285. reader := bufio.NewReader(s.file)
  286. for {
  287. header := make([]byte, HeaderSize)
  288. n, err := io.ReadFull(reader, header)
  289. if err == io.EOF {
  290. break
  291. }
  292. if err == io.ErrUnexpectedEOF && n == 0 {
  293. break
  294. }
  295. if err != nil {
  296. fmt.Printf("Storage Scan Error at %d: %v\n", offset, err)
  297. break
  298. }
  299. storedCRC := binary.LittleEndian.Uint32(header[0:4])
  300. recType := header[4]
  301. keyLen := binary.LittleEndian.Uint16(header[5:7])
  302. valLen := binary.LittleEndian.Uint32(header[7:11])
  303. commitIndex := binary.LittleEndian.Uint64(header[11:19])
  304. totalLen := int(keyLen) + int(valLen)
  305. data := make([]byte, totalLen)
  306. if _, err := io.ReadFull(reader, data); err != nil {
  307. fmt.Printf("Storage Scan Data Error at %d: %v\n", offset, err)
  308. break
  309. }
  310. // Verify CRC
  311. crc := crc32.ChecksumIEEE(header[4:]) // Checksum of Type+Lengths+CommitIndex
  312. crc = crc32.Update(crc, crc32.IEEETable, data) // + Data
  313. if crc != storedCRC {
  314. fmt.Printf("CRC Mismatch at %d\n", offset)
  315. break
  316. }
  317. key := string(data[:keyLen])
  318. val := string(data[keyLen:])
  319. callback(Record{
  320. Type: recType,
  321. Key: key,
  322. Value: val,
  323. Offset: offset,
  324. CommitIndex: commitIndex,
  325. })
  326. offset += int64(HeaderSize + totalLen)
  327. }
  328. // Update storage offset
  329. s.offset = offset
  330. s.file.Seek(offset, 0)
  331. return offset, nil
  332. }
  333. // Append writes a new record
  334. func (s *Storage) Append(key, val string, recType byte, commitIndex uint64) (int64, error) {
  335. keyBytes := []byte(key)
  336. valBytes := []byte(val)
  337. keyLen := len(keyBytes)
  338. valLen := len(valBytes)
  339. if keyLen > 65535 {
  340. return 0, fmt.Errorf("key too large")
  341. }
  342. buf := make([]byte, HeaderSize+keyLen+valLen)
  343. // 1. Build Header Data (skip CRC)
  344. buf[4] = recType
  345. binary.LittleEndian.PutUint16(buf[5:7], uint16(keyLen))
  346. binary.LittleEndian.PutUint32(buf[7:11], uint32(valLen))
  347. binary.LittleEndian.PutUint64(buf[11:19], commitIndex)
  348. // 2. Copy Data
  349. copy(buf[HeaderSize:], keyBytes)
  350. copy(buf[HeaderSize+keyLen:], valBytes)
  351. // 3. Calc CRC
  352. crc := crc32.ChecksumIEEE(buf[4:]) // Everything after CRC field
  353. binary.LittleEndian.PutUint32(buf[0:4], crc)
  354. // 4. Write
  355. totalSize := int64(len(buf))
  356. writeOffset := atomic.AddInt64(&s.offset, totalSize) - totalSize
  357. if _, err := s.file.WriteAt(buf, writeOffset); err != nil {
  358. return 0, err
  359. }
  360. // Add to Cache if Put
  361. if recType == RecordTypePut {
  362. s.cacheMu.Lock()
  363. if len(s.cache) < MaxCacheSize {
  364. s.cache[writeOffset] = val
  365. }
  366. s.cacheMu.Unlock()
  367. }
  368. return writeOffset, nil
  369. }
  370. // Sync flushes writes to stable storage
  371. func (s *Storage) Sync() error {
  372. return s.file.Sync()
  373. }
  374. // ReadValue reads value at offset
  375. func (s *Storage) ReadValue(offset int64) (string, error) {
  376. // 1. Check Cache
  377. s.cacheMu.RLock()
  378. if val, ok := s.cache[offset]; ok {
  379. s.cacheMu.RUnlock()
  380. return val, nil
  381. }
  382. s.cacheMu.RUnlock()
  383. // 2. Read Header
  384. header := make([]byte, HeaderSize)
  385. if _, err := s.file.ReadAt(header, offset); err != nil {
  386. return "", err
  387. }
  388. keyLen := binary.LittleEndian.Uint16(header[5:7])
  389. valLen := binary.LittleEndian.Uint32(header[7:11])
  390. totalLen := int(keyLen) + int(valLen)
  391. data := make([]byte, totalLen)
  392. if _, err := s.file.ReadAt(data, offset+HeaderSize); err != nil {
  393. return "", err
  394. }
  395. // Verify CRC on read for safety
  396. storedCRC := binary.LittleEndian.Uint32(header[0:4])
  397. crc := crc32.ChecksumIEEE(header[4:])
  398. crc = crc32.Update(crc, crc32.IEEETable, data)
  399. if crc != storedCRC {
  400. return "", fmt.Errorf("data corruption detected at offset %d", offset)
  401. }
  402. val := string(data[keyLen:])
  403. // 3. Fill Cache
  404. s.cacheMu.Lock()
  405. if len(s.cache) < MaxCacheSize {
  406. s.cache[offset] = val
  407. } else {
  408. for k := range s.cache {
  409. delete(s.cache, k)
  410. break
  411. }
  412. s.cache[offset] = val
  413. }
  414. s.cacheMu.Unlock()
  415. return val, nil
  416. }
  417. func (s *Storage) Close() error {
  418. return s.file.Close()
  419. }
// EngineOption mutates an EngineConfig; options are passed to NewEngine and
// applied in order.
type EngineOption func(*EngineConfig)

// EngineConfig holds the settings NewEngine applies.
type EngineConfig struct {
	// EnableValueIndex builds a FullTextIndex over values. It defaults to
	// false, in which case "value like" queries match nothing.
	EnableValueIndex bool
}
  425. // WithValueIndex enables or disables the value index (Full Text Index)
  426. func WithValueIndex(enable bool) EngineOption {
  427. return func(c *EngineConfig) {
  428. c.EnableValueIndex = enable
  429. }
  430. }
// Engine is the core storage engine. It combines an append-only value log
// (Storage) with an in-memory key index (Index) and, optionally, a token
// index over values (FTIndex).
type Engine struct {
	Index   *FlatIndex     // key -> location of the latest record; rebuilt from the log by NewEngine
	Storage *Storage
	FTIndex *FullTextIndex // nil unless the value index is enabled
	// KeyLocks provides striped per-key locks.
	// NOTE(review): not used by the code visible here.
	KeyLocks StripedLock
	// LastCommitIndex is the highest commit index applied. After
	// construction it is modified only in Set/Delete, which hold both writeMu
	// and commitMu; GetLastAppliedIndex reads it under commitMu.
	LastCommitIndex uint64
	commitMu        sync.Mutex
	dataDir         string
	// Metadata state: meta.state holds LastCommitIndex as 8 little-endian bytes.
	metaFile *os.File
	// writeMu serializes Set and Delete.
	writeMu sync.Mutex
	config  EngineConfig
}
  445. func NewEngine(dataDir string, opts ...EngineOption) (*Engine, error) {
  446. if err := os.MkdirAll(dataDir, 0755); err != nil {
  447. return nil, err
  448. }
  449. config := EngineConfig{
  450. EnableValueIndex: false, // Default to false (key-only)
  451. }
  452. for _, opt := range opts {
  453. opt(&config)
  454. }
  455. store, err := NewStorage(dataDir + "/values.data")
  456. if err != nil {
  457. return nil, err
  458. }
  459. // Open or create metadata file
  460. metaPath := dataDir + "/meta.state"
  461. metaFile, err := os.OpenFile(metaPath, os.O_RDWR|os.O_CREATE, 0644)
  462. if err != nil {
  463. store.Close()
  464. return nil, err
  465. }
  466. e := &Engine{
  467. Index: NewFlatIndex(),
  468. Storage: store,
  469. FTIndex: nil,
  470. dataDir: dataDir,
  471. metaFile: metaFile,
  472. config: config,
  473. }
  474. if config.EnableValueIndex {
  475. e.FTIndex = NewFullTextIndex()
  476. }
  477. // Load Metadata
  478. stat, err := metaFile.Stat()
  479. if err == nil && stat.Size() >= 8 {
  480. b := make([]byte, 8)
  481. if _, err := metaFile.ReadAt(b, 0); err == nil {
  482. e.LastCommitIndex = binary.LittleEndian.Uint64(b)
  483. }
  484. }
  485. // Rebuild Index from Disk
  486. // Note: We still scan to rebuild the memory index, but LastCommitIndex is initialized from meta file
  487. // We can update LastCommitIndex if the log is ahead of the meta file (e.g. crash before meta update)
  488. // We also correct LastCommitIndex if meta file is ahead of data (e.g. data flush lost during power failure)
  489. realMaxIndex := uint64(0)
  490. _, err = store.Scan(func(rec Record) {
  491. if rec.Type == RecordTypePut {
  492. e.Index.Insert(rec.Key, IndexEntry{
  493. ValueOffset: rec.Offset,
  494. CommitIndex: rec.CommitIndex,
  495. })
  496. if e.FTIndex != nil {
  497. e.FTIndex.Add(rec.Key, rec.Value)
  498. }
  499. } else if rec.Type == RecordTypeDelete {
  500. // Cleanup FTIndex using old value if possible
  501. e.removeValueFromFTIndex(rec.Key)
  502. e.Index.Delete(rec.Key)
  503. }
  504. // Track the actual max index present in the data file
  505. if rec.CommitIndex > realMaxIndex {
  506. realMaxIndex = rec.CommitIndex
  507. }
  508. })
  509. // Critical Safety Check:
  510. // 1. If Log > Meta: Update Meta (Normal crash recovery)
  511. // 2. If Meta > Log: Downgrade Meta (Data loss recovery - force Raft replay)
  512. if realMaxIndex > e.LastCommitIndex {
  513. e.LastCommitIndex = realMaxIndex
  514. } else if realMaxIndex < e.LastCommitIndex {
  515. // Detect inconsistency: Meta says we have more data than what's on disk.
  516. // This happens if Meta was flushed but Data tail was lost during power failure.
  517. // We MUST trust the actual data on disk.
  518. fmt.Printf("WARNING: Inconsistency detected! Meta LastCommitIndex (%d) > Data RealIndex (%d). Resetting to %d to force Raft replay.\n",
  519. e.LastCommitIndex, realMaxIndex, realMaxIndex)
  520. e.LastCommitIndex = realMaxIndex
  521. // Force save corrected metadata
  522. e.saveMetadata()
  523. }
  524. return e, nil
  525. }
  526. func (e *Engine) removeValueFromFTIndex(key string) {
  527. if e.FTIndex == nil {
  528. return
  529. }
  530. if entry, ok := e.Index.Get(key); ok {
  531. val, err := e.Storage.ReadValue(entry.ValueOffset)
  532. if err == nil {
  533. e.FTIndex.Remove(key, val)
  534. }
  535. }
  536. }
  537. func (e *Engine) Close() error {
  538. e.saveMetadata()
  539. e.metaFile.Close()
  540. return e.Storage.Close()
  541. }
  542. func (e *Engine) saveMetadata() {
  543. b := make([]byte, 8)
  544. binary.LittleEndian.PutUint64(b, e.LastCommitIndex)
  545. e.metaFile.WriteAt(b, 0)
  546. }
  547. func (e *Engine) Set(key, value string, commitIndex uint64) error {
  548. e.writeMu.Lock()
  549. defer e.writeMu.Unlock()
  550. // Idempotency check: if this log entry has already been applied, skip it.
  551. // This handles Raft replay on restart.
  552. if commitIndex > 0 && commitIndex <= e.LastCommitIndex {
  553. return nil
  554. }
  555. e.removeValueFromFTIndex(key)
  556. offset, err := e.Storage.Append(key, value, RecordTypePut, commitIndex)
  557. if err != nil {
  558. return err
  559. }
  560. e.Index.Insert(key, IndexEntry{
  561. ValueOffset: offset,
  562. CommitIndex: commitIndex,
  563. })
  564. if e.FTIndex != nil {
  565. e.FTIndex.Add(key, value)
  566. }
  567. e.commitMu.Lock()
  568. if commitIndex > e.LastCommitIndex {
  569. e.LastCommitIndex = commitIndex
  570. // Update metadata on disk periodically or on every write?
  571. // For safety, let's update it. Since it's pwrite at offset 0, it's fast.
  572. e.saveMetadata()
  573. }
  574. e.commitMu.Unlock()
  575. return nil
  576. }
  577. func (e *Engine) Delete(key string, commitIndex uint64) error {
  578. e.writeMu.Lock()
  579. defer e.writeMu.Unlock()
  580. // Idempotency check
  581. if commitIndex > 0 && commitIndex <= e.LastCommitIndex {
  582. return nil
  583. }
  584. e.removeValueFromFTIndex(key)
  585. _, err := e.Storage.Append(key, "", RecordTypeDelete, commitIndex)
  586. if err != nil {
  587. return err
  588. }
  589. e.Index.Delete(key)
  590. e.commitMu.Lock()
  591. if commitIndex > e.LastCommitIndex {
  592. e.LastCommitIndex = commitIndex
  593. e.saveMetadata()
  594. }
  595. e.commitMu.Unlock()
  596. return nil
  597. }
  598. // Sync flushes underlying storage to disk
  599. // GetLastAppliedIndex returns the last Raft log index applied to the DB
  600. func (e *Engine) GetLastAppliedIndex() uint64 {
  601. e.commitMu.Lock()
  602. defer e.commitMu.Unlock()
  603. return e.LastCommitIndex
  604. }
  605. func (e *Engine) Sync() error {
  606. e.metaFile.Sync()
  607. return e.Storage.Sync()
  608. }
  609. func (e *Engine) Get(key string) (string, bool) {
  610. entry, ok := e.Index.Get(key)
  611. if !ok {
  612. return "", false
  613. }
  614. val, err := e.Storage.ReadValue(entry.ValueOffset)
  615. if err != nil {
  616. return "", false
  617. }
  618. return val, true
  619. }
// QueryResult is one row returned by Engine.Query.
type QueryResult struct {
	Key         string `json:"key"`
	Value       string `json:"value"`
	CommitIndex uint64 `json:"commit_index"`
}
  625. func WildcardMatch(str, pattern string) bool {
  626. s := []rune(str)
  627. p := []rune(pattern)
  628. sLen, pLen := len(s), len(p)
  629. i, j := 0, 0
  630. starIdx, matchIdx := -1, -1
  631. for i < sLen {
  632. if j < pLen && (p[j] == '?' || p[j] == s[i]) {
  633. i++
  634. j++
  635. } else if j < pLen && p[j] == '*' {
  636. starIdx = j
  637. matchIdx = i
  638. j++
  639. } else if starIdx != -1 {
  640. j = starIdx + 1
  641. matchIdx++
  642. i = matchIdx
  643. } else {
  644. return false
  645. }
  646. }
  647. for j < pLen && p[j] == '*' {
  648. j++
  649. }
  650. return j == pLen
  651. }
  652. func (e *Engine) Count(sql string) (int, error) {
  653. return e.execute(sql, true)
  654. }
  655. func (e *Engine) execute(sql string, countOnly bool) (int, error) {
  656. sql = strings.TrimSpace(sql)
  657. reLimit := regexp.MustCompile(`(?i)\s+LIMIT\s+(\d+)`)
  658. reOffset := regexp.MustCompile(`(?i)\s+OFFSET\s+(\d+)`)
  659. limit := -1
  660. offset := 0
  661. if match := reLimit.FindStringSubmatch(sql); len(match) > 0 {
  662. limit, _ = strconv.Atoi(match[1])
  663. sql = reLimit.ReplaceAllString(sql, "")
  664. }
  665. if match := reOffset.FindStringSubmatch(sql); len(match) > 0 {
  666. offset, _ = strconv.Atoi(match[1])
  667. sql = reOffset.ReplaceAllString(sql, "")
  668. }
  669. re := regexp.MustCompile(`(key|value|CommitIndex)\s*(=|like|>=|<=|>|<)\s*("[^"]*"|\d+)`)
  670. matches := re.FindAllStringSubmatch(sql, -1)
  671. if len(matches) == 0 {
  672. return 0, fmt.Errorf("invalid query")
  673. }
  674. extractString := func(s string) string {
  675. return strings.Trim(strings.TrimSpace(s), "\"")
  676. }
  677. // 1. Optimize for Point Lookup
  678. for _, match := range matches {
  679. if match[1] == "key" && match[2] == "=" {
  680. targetKey := extractString(match[3])
  681. entry, ok := e.Index.Get(targetKey)
  682. if !ok {
  683. return 0, nil
  684. }
  685. needValue := false
  686. for _, m := range matches {
  687. if m[1] == "value" { needValue = true; break }
  688. }
  689. var val string
  690. if needValue {
  691. v, err := e.Storage.ReadValue(entry.ValueOffset)
  692. if err != nil { return 0, nil }
  693. val = v
  694. }
  695. matchAll := true
  696. for _, m := range matches {
  697. field, op, valRaw := m[1], m[2], m[3]
  698. switch field {
  699. case "CommitIndex":
  700. num, _ := strconv.ParseUint(valRaw, 10, 64)
  701. switch op {
  702. case ">": if !(entry.CommitIndex > num) { matchAll = false }
  703. case "<": if !(entry.CommitIndex < num) { matchAll = false }
  704. case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
  705. case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
  706. case "=": if !(entry.CommitIndex == num) { matchAll = false }
  707. }
  708. case "value":
  709. t := extractString(valRaw)
  710. switch op {
  711. case "=": if val != t { matchAll = false }
  712. case "like": if !WildcardMatch(val, t) { matchAll = false }
  713. }
  714. }
  715. if !matchAll { break }
  716. }
  717. if matchAll {
  718. return 1, nil
  719. }
  720. return 0, nil
  721. }
  722. }
  723. var candidates map[string]bool
  724. var useFTIndex bool = false
  725. // 2. Try to use FT Index (if enabled)
  726. if e.FTIndex != nil {
  727. for _, match := range matches {
  728. if match[1] == "value" && match[2] == "like" {
  729. pattern := extractString(match[3])
  730. clean := strings.Trim(pattern, "*")
  731. if len(clean) > 0 && !strings.Contains(clean, "*") && !strings.Contains(clean, "?") {
  732. matches := e.FTIndex.Search(pattern)
  733. if matches != nil {
  734. currentSet := make(map[string]bool)
  735. for _, k := range matches {
  736. currentSet[k] = true
  737. }
  738. if !useFTIndex {
  739. candidates = currentSet
  740. useFTIndex = true
  741. } else {
  742. newSet := make(map[string]bool)
  743. for k := range candidates {
  744. if currentSet[k] { newSet[k] = true }
  745. }
  746. candidates = newSet
  747. }
  748. } else {
  749. return 0, nil
  750. }
  751. }
  752. }
  753. }
  754. } else {
  755. for _, match := range matches {
  756. if match[1] == "value" && match[2] == "like" {
  757. // Value search requested but index disabled -> return 0
  758. return 0, nil
  759. }
  760. }
  761. }
  762. var iterator func(func(string, IndexEntry) bool)
  763. if useFTIndex {
  764. iterator = func(cb func(string, IndexEntry) bool) {
  765. keys := make([]string, 0, len(candidates))
  766. for k := range candidates { keys = append(keys, k) }
  767. sort.Strings(keys)
  768. for _, k := range keys {
  769. if entry, ok := e.Index.Get(k); ok {
  770. if !cb(k, entry) { return }
  771. }
  772. }
  773. }
  774. } else {
  775. // Use FlatIndex Prefix Search
  776. var prefix string = ""
  777. var usePrefix bool = false
  778. for _, match := range matches {
  779. if match[1] == "key" && match[2] == "like" {
  780. pattern := extractString(match[3])
  781. // Flat Index supports simple prefix match
  782. if strings.HasSuffix(pattern, "*") {
  783. clean := pattern[:len(pattern)-1]
  784. if !strings.ContainsAny(clean, "*?") {
  785. prefix = clean
  786. usePrefix = true
  787. break
  788. }
  789. }
  790. }
  791. }
  792. iterator = func(cb func(string, IndexEntry) bool) {
  793. if usePrefix {
  794. e.Index.WalkPrefix(prefix, cb)
  795. } else {
  796. e.Index.WalkPrefix("", cb)
  797. }
  798. }
  799. }
  800. matchedCount := 0
  801. iterator(func(key string, entry IndexEntry) bool {
  802. var valStr string
  803. var valLoaded bool
  804. matchAll := true
  805. for _, match := range matches {
  806. field, op, valRaw := match[1], match[2], match[3]
  807. switch field {
  808. case "CommitIndex":
  809. num, _ := strconv.ParseUint(valRaw, 10, 64)
  810. switch op {
  811. case ">": if !(entry.CommitIndex > num) { matchAll = false }
  812. case "<": if !(entry.CommitIndex < num) { matchAll = false }
  813. case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
  814. case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
  815. case "=": if !(entry.CommitIndex == num) { matchAll = false }
  816. }
  817. case "key":
  818. target := extractString(valRaw)
  819. switch op {
  820. case "=": if key != target { matchAll = false }
  821. case "like": if !WildcardMatch(key, target) { matchAll = false }
  822. }
  823. case "value":
  824. if e.FTIndex == nil && op == "like" {
  825. matchAll = false
  826. break
  827. }
  828. if !valLoaded {
  829. v, err := e.Storage.ReadValue(entry.ValueOffset)
  830. if err != nil { matchAll = false; break }
  831. valStr = v
  832. valLoaded = true
  833. }
  834. target := extractString(valRaw)
  835. switch op {
  836. case "=": if valStr != target { matchAll = false }
  837. case "like": if !WildcardMatch(valStr, target) { matchAll = false }
  838. }
  839. }
  840. if !matchAll { break }
  841. }
  842. if matchAll {
  843. matchedCount++
  844. if limit > 0 && offset == 0 && matchedCount >= limit {
  845. return false
  846. }
  847. }
  848. return true
  849. })
  850. if offset > 0 {
  851. if offset >= matchedCount {
  852. return 0, nil
  853. }
  854. matchedCount -= offset
  855. }
  856. if limit >= 0 {
  857. if limit < matchedCount {
  858. matchedCount = limit
  859. }
  860. }
  861. return matchedCount, nil
  862. }
  863. func (e *Engine) Query(sql string) ([]QueryResult, error) {
  864. return e.queryInternal(sql)
  865. }
  866. func (e *Engine) queryInternal(sql string) ([]QueryResult, error) {
  867. sql = strings.TrimSpace(sql)
  868. reLimit := regexp.MustCompile(`(?i)\s+LIMIT\s+(\d+)`)
  869. reOffset := regexp.MustCompile(`(?i)\s+OFFSET\s+(\d+)`)
  870. limit := -1
  871. offset := 0
  872. if match := reLimit.FindStringSubmatch(sql); len(match) > 0 {
  873. limit, _ = strconv.Atoi(match[1])
  874. sql = reLimit.ReplaceAllString(sql, "")
  875. }
  876. if match := reOffset.FindStringSubmatch(sql); len(match) > 0 {
  877. offset, _ = strconv.Atoi(match[1])
  878. sql = reOffset.ReplaceAllString(sql, "")
  879. }
  880. re := regexp.MustCompile(`(key|value|CommitIndex)\s*(=|like|>=|<=|>|<)\s*("[^"]*"|\d+)`)
  881. matches := re.FindAllStringSubmatch(sql, -1)
  882. if len(matches) == 0 {
  883. return nil, fmt.Errorf("invalid query")
  884. }
  885. extractString := func(s string) string {
  886. return strings.Trim(strings.TrimSpace(s), "\"")
  887. }
  888. for _, match := range matches {
  889. if match[1] == "key" && match[2] == "=" {
  890. targetKey := extractString(match[3])
  891. entry, ok := e.Index.Get(targetKey)
  892. if !ok {
  893. return []QueryResult{}, nil
  894. }
  895. val, err := e.Storage.ReadValue(entry.ValueOffset)
  896. if err != nil {
  897. return []QueryResult{}, nil
  898. }
  899. matchAll := true
  900. for _, m := range matches {
  901. field, op, valRaw := m[1], m[2], m[3]
  902. switch field {
  903. case "CommitIndex":
  904. num, _ := strconv.ParseUint(valRaw, 10, 64)
  905. switch op {
  906. case ">": if !(entry.CommitIndex > num) { matchAll = false }
  907. case "<": if !(entry.CommitIndex < num) { matchAll = false }
  908. case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
  909. case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
  910. case "=": if !(entry.CommitIndex == num) { matchAll = false }
  911. }
  912. case "value":
  913. t := extractString(valRaw)
  914. switch op {
  915. case "=": if val != t { matchAll = false }
  916. case "like": if !WildcardMatch(val, t) { matchAll = false }
  917. }
  918. }
  919. if !matchAll { break }
  920. }
  921. if matchAll {
  922. return []QueryResult{{Key: targetKey, Value: val, CommitIndex: entry.CommitIndex}}, nil
  923. }
  924. return []QueryResult{}, nil
  925. }
  926. }
  927. var candidates map[string]bool
  928. var useFTIndex bool = false
  929. if e.FTIndex != nil {
  930. for _, match := range matches {
  931. if match[1] == "value" && match[2] == "like" {
  932. pattern := extractString(match[3])
  933. clean := strings.Trim(pattern, "*")
  934. if len(clean) > 0 && !strings.Contains(clean, "*") && !strings.Contains(clean, "?") {
  935. matches := e.FTIndex.Search(pattern)
  936. if matches != nil {
  937. currentSet := make(map[string]bool)
  938. for _, k := range matches {
  939. currentSet[k] = true
  940. }
  941. if !useFTIndex {
  942. candidates = currentSet
  943. useFTIndex = true
  944. } else {
  945. newSet := make(map[string]bool)
  946. for k := range candidates {
  947. if currentSet[k] {
  948. newSet[k] = true
  949. }
  950. }
  951. candidates = newSet
  952. }
  953. } else {
  954. return []QueryResult{}, nil
  955. }
  956. }
  957. }
  958. }
  959. } else {
  960. for _, match := range matches {
  961. if match[1] == "value" && match[2] == "like" {
  962. return []QueryResult{}, nil
  963. }
  964. }
  965. }
  966. var iterator func(func(string, IndexEntry) bool)
  967. if useFTIndex {
  968. iterator = func(cb func(string, IndexEntry) bool) {
  969. keys := make([]string, 0, len(candidates))
  970. for k := range candidates {
  971. keys = append(keys, k)
  972. }
  973. sort.Strings(keys)
  974. for _, k := range keys {
  975. if entry, ok := e.Index.Get(k); ok {
  976. if !cb(k, entry) {
  977. return
  978. }
  979. }
  980. }
  981. }
  982. } else {
  983. var prefix string = ""
  984. var usePrefix bool = false
  985. for _, match := range matches {
  986. if match[1] == "key" && match[2] == "like" {
  987. pattern := extractString(match[3])
  988. if strings.HasSuffix(pattern, "*") {
  989. clean := pattern[:len(pattern)-1]
  990. if !strings.ContainsAny(clean, "*?") {
  991. prefix = clean
  992. usePrefix = true
  993. break
  994. }
  995. }
  996. }
  997. }
  998. iterator = func(cb func(string, IndexEntry) bool) {
  999. if usePrefix {
  1000. e.Index.WalkPrefix(prefix, cb)
  1001. } else {
  1002. e.Index.WalkPrefix("", cb)
  1003. }
  1004. }
  1005. }
  1006. var results []QueryResult
  1007. iterator(func(key string, entry IndexEntry) bool {
  1008. var valStr string
  1009. var valLoaded bool
  1010. matchAll := true
  1011. for _, match := range matches {
  1012. field, op, valRaw := match[1], match[2], match[3]
  1013. switch field {
  1014. case "CommitIndex":
  1015. num, _ := strconv.ParseUint(valRaw, 10, 64)
  1016. switch op {
  1017. case ">": if !(entry.CommitIndex > num) { matchAll = false }
  1018. case "<": if !(entry.CommitIndex < num) { matchAll = false }
  1019. case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
  1020. case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
  1021. case "=": if !(entry.CommitIndex == num) { matchAll = false }
  1022. }
  1023. case "key":
  1024. target := extractString(valRaw)
  1025. switch op {
  1026. case "=": if key != target { matchAll = false }
  1027. case "like": if !WildcardMatch(key, target) { matchAll = false }
  1028. }
  1029. case "value":
  1030. if e.FTIndex == nil && op == "like" {
  1031. matchAll = false
  1032. break
  1033. }
  1034. if !valLoaded {
  1035. v, err := e.Storage.ReadValue(entry.ValueOffset)
  1036. if err != nil { matchAll = false; break }
  1037. valStr = v
  1038. valLoaded = true
  1039. }
  1040. target := extractString(valRaw)
  1041. switch op {
  1042. case "=": if valStr != target { matchAll = false }
  1043. case "like": if !WildcardMatch(valStr, target) { matchAll = false }
  1044. }
  1045. }
  1046. if !matchAll { break }
  1047. }
  1048. if matchAll {
  1049. if !valLoaded {
  1050. v, err := e.Storage.ReadValue(entry.ValueOffset)
  1051. if err == nil { valStr = v }
  1052. }
  1053. results = append(results, QueryResult{
  1054. Key: key,
  1055. Value: valStr,
  1056. CommitIndex: entry.CommitIndex,
  1057. })
  1058. needed := limit + offset
  1059. if limit > 0 && len(results) >= needed {
  1060. return false
  1061. }
  1062. }
  1063. return true
  1064. })
  1065. if offset > 0 {
  1066. if offset >= len(results) {
  1067. return []QueryResult{}, nil
  1068. }
  1069. results = results[offset:]
  1070. }
  1071. if limit >= 0 {
  1072. if limit < len(results) {
  1073. results = results[:limit]
  1074. }
  1075. }
  1076. return results, nil
  1077. }
  1078. // Snapshot creates a full consistency snapshot of the database.
  1079. // Since the DB engine itself IS the state machine, this simply serializes
  1080. // all current valid data. This snapshot can be used to restore a lagging node
  1081. // that has fallen behind the Raft logs.
  1082. func (e *Engine) Snapshot() ([]byte, error) {
  1083. e.Index.mu.RLock()
  1084. defer e.Index.mu.RUnlock()
  1085. // Snapshot Format Version 1:
  1086. // [Count U64] + [Record...]
  1087. // Record: [KeyLen U16] [Key Bytes] [ValLen U32] [Val Bytes] [CommitIndex U64]
  1088. // Pre-calculate size to reduce allocations
  1089. // Estimate: Count * (AverageKeySize + AverageValSize + Overhead)
  1090. // We start with a reasonable buffer size.
  1091. buf := make([]byte, 0, 1024*1024)
  1092. count := uint64(len(e.Index.items))
  1093. // Write Count
  1094. tmp := make([]byte, 8)
  1095. binary.LittleEndian.PutUint64(tmp, count)
  1096. buf = append(buf, tmp...)
  1097. // Buffer for encoding headers to avoid repeated small allocations
  1098. headerBuf := make([]byte, 2+4+8) // KeyLen + ValLen + CommitIndex
  1099. for i := range e.Index.items {
  1100. // Get Key
  1101. key := e.Index.getKey(i)
  1102. entry := e.Index.items[i].entry
  1103. // Get Value
  1104. // We read directly from storage. Since we hold the read lock on index,
  1105. // the offset is valid. The storage file is append-only, so old data exists.
  1106. val, err := e.Storage.ReadValue(entry.ValueOffset)
  1107. if err != nil {
  1108. // If we can't read a value, it's a critical error for snapshot integrity.
  1109. return nil, fmt.Errorf("snapshot failed reading key '%s' at offset %d: %v", key, entry.ValueOffset, err)
  1110. }
  1111. keyBytes := []byte(key)
  1112. valBytes := []byte(val)
  1113. // Encode Header
  1114. binary.LittleEndian.PutUint16(headerBuf[0:2], uint16(len(keyBytes)))
  1115. binary.LittleEndian.PutUint32(headerBuf[2:6], uint32(len(valBytes)))
  1116. binary.LittleEndian.PutUint64(headerBuf[6:14], entry.CommitIndex)
  1117. // Append to buffer
  1118. buf = append(buf, headerBuf...)
  1119. buf = append(buf, keyBytes...)
  1120. buf = append(buf, valBytes...)
  1121. }
  1122. return buf, nil
  1123. }
  1124. // Restore completely replaces the database content with the provided snapshot.
  1125. // This matches the "DB as Snapshot" design: we wipe the current state
  1126. // and rebuild from the full image provided by the leader.
  1127. func (e *Engine) Restore(data []byte) error {
  1128. e.writeMu.Lock()
  1129. defer e.writeMu.Unlock()
  1130. // 1. Safety check & Parse Header
  1131. if len(data) < 8 {
  1132. if len(data) == 0 {
  1133. // Empty snapshot implies empty DB.
  1134. return e.resetToEmpty()
  1135. }
  1136. return fmt.Errorf("invalid snapshot data: too short")
  1137. }
  1138. count := binary.LittleEndian.Uint64(data[0:8])
  1139. offset := 8
  1140. // 2. Stop-the-World: Close current storage to safely wipe it
  1141. e.Index.mu.Lock()
  1142. if err := e.Storage.Close(); err != nil {
  1143. e.Index.mu.Unlock()
  1144. return fmt.Errorf("failed to close storage for restore: %v", err)
  1145. }
  1146. // 3. Truncate & Reset
  1147. // We are replacing the entire DB state.
  1148. // Reset In-Memory Index by clearing slices (keeping underlying capacity)
  1149. // We do NOT replace the pointer, so we keep the lock valid.
  1150. e.Index.keyBuf = e.Index.keyBuf[:0]
  1151. e.Index.items = e.Index.items[:0]
  1152. if e.config.EnableValueIndex {
  1153. // Recreate FTIndex since it's a map
  1154. e.FTIndex = NewFullTextIndex()
  1155. }
  1156. // Clear Cache
  1157. e.Storage.cache = make(map[int64]string)
  1158. // Re-open storage in Truncate mode (Wipe data file)
  1159. f, err := os.OpenFile(e.Storage.filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
  1160. if err != nil {
  1161. e.Index.mu.Unlock()
  1162. return fmt.Errorf("failed to reopen storage for restore: %v", err)
  1163. }
  1164. e.Storage.file = f
  1165. e.Storage.offset = 0
  1166. // We hold the lock during the entire restore process to prevent any reads/writes
  1167. // This is acceptable as Restore is a rare, heavyweight operation.
  1168. defer e.Index.mu.Unlock()
  1169. maxCommitIndex := uint64(0)
  1170. // 4. Rebuild Data from Snapshot Stream
  1171. for i := uint64(0); i < count; i++ {
  1172. // Read Header (14 bytes)
  1173. if offset+14 > len(data) {
  1174. return fmt.Errorf("restore failed: truncated header at record %d", i)
  1175. }
  1176. keyLen := int(binary.LittleEndian.Uint16(data[offset : offset+2]))
  1177. valLen := int(binary.LittleEndian.Uint32(data[offset+2 : offset+6]))
  1178. commitIndex := binary.LittleEndian.Uint64(data[offset+6 : offset+14])
  1179. offset += 14
  1180. // Read Key
  1181. if offset+keyLen > len(data) {
  1182. return fmt.Errorf("restore failed: truncated data at record %d (key)", i)
  1183. }
  1184. key := string(data[offset : offset+keyLen])
  1185. offset += keyLen
  1186. // Read Value
  1187. if offset+valLen > len(data) {
  1188. return fmt.Errorf("restore failed: truncated data at record %d (value)", i)
  1189. }
  1190. val := string(data[offset : offset+valLen])
  1191. offset += valLen
  1192. if commitIndex > maxCommitIndex {
  1193. maxCommitIndex = commitIndex
  1194. }
  1195. // Direct Write to Storage (Internal Append)
  1196. // We use the low-level Storage.Append directly since we already hold locks
  1197. // and are bypassing the standard Set path.
  1198. writeOffset, err := e.Storage.Append(key, val, RecordTypePut, commitIndex)
  1199. if err != nil {
  1200. return fmt.Errorf("restore failed appending record %d: %v", i, err)
  1201. }
  1202. // Update Memory Index
  1203. // Accessing e.Index.items directly because we hold e.Index.mu
  1204. // But we should use the helper to safely manage keyBuf
  1205. // Use insertLocked to avoid deadlock (we already hold e.Index.mu)
  1206. e.Index.insertLocked(key, IndexEntry{
  1207. ValueOffset: writeOffset,
  1208. CommitIndex: commitIndex,
  1209. })
  1210. // Update Full Text Index if enabled
  1211. if e.FTIndex != nil {
  1212. e.FTIndex.Add(key, val)
  1213. }
  1214. }
  1215. // 5. Update Metadata
  1216. e.commitMu.Lock()
  1217. e.LastCommitIndex = maxCommitIndex
  1218. e.saveMetadata()
  1219. e.commitMu.Unlock()
  1220. // 6. Force Sync
  1221. if err := e.Sync(); err != nil {
  1222. return fmt.Errorf("failed to sync restored data: %v", err)
  1223. }
  1224. return nil
  1225. }
  1226. // resetToEmpty helper to wipe the DB cleanly
  1227. func (e *Engine) resetToEmpty() error {
  1228. e.Index.mu.Lock()
  1229. defer e.Index.mu.Unlock()
  1230. if err := e.Storage.Close(); err != nil {
  1231. return err
  1232. }
  1233. e.Index = NewFlatIndex()
  1234. if e.config.EnableValueIndex {
  1235. e.FTIndex = NewFullTextIndex()
  1236. }
  1237. e.Storage.cache = make(map[int64]string)
  1238. f, err := os.OpenFile(e.Storage.filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
  1239. if err != nil {
  1240. return err
  1241. }
  1242. e.Storage.file = f
  1243. e.Storage.offset = 0
  1244. e.commitMu.Lock()
  1245. e.LastCommitIndex = 0
  1246. e.saveMetadata()
  1247. e.commitMu.Unlock()
  1248. return nil
  1249. }
  1250. func (e *Engine) DebugClearAuxiliary() {
  1251. e.Storage.cacheMu.Lock()
  1252. e.Storage.cache = make(map[int64]string)
  1253. e.Storage.cacheMu.Unlock()
  1254. if e.FTIndex != nil {
  1255. e.FTIndex.mu.Lock()
  1256. e.FTIndex.index = make(map[string]map[string]bool)
  1257. e.FTIndex.mu.Unlock()
  1258. }
  1259. }