engine.go 37 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512
  1. package db
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "fmt"
  6. "hash/crc32"
  7. "io"
  8. "os"
  9. "regexp"
  10. "sort"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "sync/atomic"
  15. )
  16. // hash returns a 32-bit FNV-1a hash of the string
  17. func hash(s string) uint32 {
  18. var h uint32 = 2166136261
  19. for i := 0; i < len(s); i++ {
  20. h = (h * 16777619) ^ uint32(s[i])
  21. }
  22. return h
  23. }
  24. // --- Flat Sorted Array Index Implementation ---
  25. // Extreme memory optimization:
  26. // 1. Removes all tree node overhead (pointers, structs, map buckets).
  27. // 2. Stores keys in a single monolithic []byte buffer (no string headers).
  28. // 3. Uses a sorted slice of offsets for O(log N) lookup and O(N) insertion.
  29. // IndexEntry
  30. type IndexEntry struct {
  31. ValueOffset int64
  32. CommitIndex uint64
  33. }
  34. type FlatIndex struct {
  35. // Monolithic buffer to store all key strings
  36. // Format: [key1 bytes][key2 bytes]...
  37. keyBuf []byte
  38. // Sorted list of index items
  39. items []flatItem
  40. mu sync.RWMutex
  41. }
  42. // flatItem is a compact struct (16 bytes aligned)
  43. type flatItem struct {
  44. keyOffset uint32 // 4 bytes: Offset into keyBuf (supports 4GB total key size)
  45. keyLen uint16 // 2 bytes: Length of key (max key len 65535)
  46. // Padding 2 bytes? No, Go struct alignment might add padding.
  47. // IndexEntry is 16 bytes (int64 + uint64).
  48. entry IndexEntry // 16 bytes
  49. }
  50. // Total per item: 4 + 2 + (2 padding) + 16 = 24 bytes per Key.
  51. // 100k keys => 2.4 MB.
  52. // Plus keyBuf: 100k * 18 bytes = 1.8 MB.
  53. // Total expected: ~4.2 MB.
  54. func NewFlatIndex() *FlatIndex {
  55. return &FlatIndex{
  56. keyBuf: make([]byte, 0, 1024*1024), // 1MB initial cap
  57. items: make([]flatItem, 0, 10000),
  58. }
  59. }
  60. // getKey unsafe-ish helper to get string from buffer without alloc?
  61. // Standard conversion string(b) allocates.
  62. // For comparison in binary search, we ideally want to avoid allocation.
  63. // But Go 1.20+ optimization might handle string(bytes) well in map keys or comparisons if they don't escape.
  64. // Let's rely on standard string() conversion for safety first.
  65. func (fi *FlatIndex) getKey(idx int) string {
  66. item := fi.items[idx]
  67. return string(fi.keyBuf[item.keyOffset : item.keyOffset+uint32(item.keyLen)])
  68. }
  69. // Compare helper to avoid string allocation
  70. func (fi *FlatIndex) compare(idx int, target string) int {
  71. item := fi.items[idx]
  72. keyBytes := fi.keyBuf[item.keyOffset : item.keyOffset+uint32(item.keyLen)]
  73. return strings.Compare(string(keyBytes), target)
  74. }
  75. func (fi *FlatIndex) Insert(key string, entry IndexEntry) {
  76. fi.mu.Lock()
  77. defer fi.mu.Unlock()
  78. fi.insertLocked(key, entry)
  79. }
  80. func (fi *FlatIndex) insertLocked(key string, entry IndexEntry) {
  81. // 1. Binary Search
  82. idx := sort.Search(len(fi.items), func(i int) bool {
  83. return fi.getKey(i) >= key
  84. })
  85. // 2. Update existing
  86. if idx < len(fi.items) && fi.getKey(idx) == key {
  87. fi.items[idx].entry = entry
  88. return
  89. }
  90. // 3. Add new key
  91. offset := len(fi.keyBuf)
  92. fi.keyBuf = append(fi.keyBuf, key...)
  93. if len(key) > 65535 {
  94. // Should have been caught earlier, but just in case cap it or panic
  95. // We can panic or truncate.
  96. // panic("key too long")
  97. }
  98. newItem := flatItem{
  99. keyOffset: uint32(offset),
  100. keyLen: uint16(len(key)),
  101. entry: entry,
  102. }
  103. // 4. Insert into sorted slice
  104. // Optimization: check if appending to end (common case for sequential inserts)
  105. if idx == len(fi.items) {
  106. fi.items = append(fi.items, newItem)
  107. } else {
  108. fi.items = append(fi.items, flatItem{})
  109. copy(fi.items[idx+1:], fi.items[idx:])
  110. fi.items[idx] = newItem
  111. }
  112. }
  113. func (fi *FlatIndex) Get(key string) (IndexEntry, bool) {
  114. fi.mu.RLock()
  115. defer fi.mu.RUnlock()
  116. idx := sort.Search(len(fi.items), func(i int) bool {
  117. return fi.getKey(i) >= key
  118. })
  119. if idx < len(fi.items) && fi.getKey(idx) == key {
  120. return fi.items[idx].entry, true
  121. }
  122. return IndexEntry{}, false
  123. }
  124. func (fi *FlatIndex) Delete(key string) bool {
  125. fi.mu.Lock()
  126. defer fi.mu.Unlock()
  127. idx := sort.Search(len(fi.items), func(i int) bool {
  128. return fi.getKey(i) >= key
  129. })
  130. if idx < len(fi.items) && fi.getKey(idx) == key {
  131. // Delete from slice
  132. fi.items = append(fi.items[:idx], fi.items[idx+1:]...)
  133. // Note: We don't reclaim space in keyBuf. It's append-only until compaction (not implemented).
  134. // This is fine for many cases, but heavy deletes might waste keyBuf space.
  135. return true
  136. }
  137. return false
  138. }
  139. // WalkPrefix iterates over keys starting with prefix.
  140. // Since keys are sorted, we find the first match and iterate until mismatch.
  141. func (fi *FlatIndex) WalkPrefix(prefix string, callback func(key string, entry IndexEntry) bool) {
  142. fi.mu.RLock()
  143. defer fi.mu.RUnlock()
  144. // Binary search for the start
  145. idx := sort.Search(len(fi.items), func(i int) bool {
  146. return fi.getKey(i) >= prefix
  147. })
  148. for i := idx; i < len(fi.items); i++ {
  149. key := fi.getKey(i)
  150. // Optimization: Check prefix match
  151. if !strings.HasPrefix(key, prefix) {
  152. break
  153. }
  154. if !callback(key, fi.items[i].entry) {
  155. return
  156. }
  157. }
  158. }
  159. // --- End Flat Index ---
  160. // --- Full Text Index ---
  161. type FullTextIndex struct {
  162. // Token -> Set of Keys
  163. index map[string]map[string]bool
  164. mu sync.RWMutex
  165. }
  166. func NewFullTextIndex() *FullTextIndex {
  167. return &FullTextIndex{
  168. index: make(map[string]map[string]bool),
  169. }
  170. }
  171. func (fti *FullTextIndex) Add(key string, value string) {
  172. fti.mu.Lock()
  173. defer fti.mu.Unlock()
  174. tokens := tokenize(value)
  175. for _, token := range tokens {
  176. if fti.index[token] == nil {
  177. fti.index[token] = make(map[string]bool)
  178. }
  179. fti.index[token][key] = true
  180. }
  181. }
  182. func (fti *FullTextIndex) Remove(key string, value string) {
  183. fti.mu.Lock()
  184. defer fti.mu.Unlock()
  185. tokens := tokenize(value)
  186. for _, token := range tokens {
  187. if set, ok := fti.index[token]; ok {
  188. delete(set, key)
  189. if len(set) == 0 {
  190. delete(fti.index, token)
  191. }
  192. }
  193. }
  194. }
  195. func (fti *FullTextIndex) Search(tokenPattern string) []string {
  196. fti.mu.RLock()
  197. defer fti.mu.RUnlock()
  198. // 1. Exact Match
  199. if !strings.Contains(tokenPattern, "*") {
  200. if keys, ok := fti.index[tokenPattern]; ok {
  201. res := make([]string, 0, len(keys))
  202. for k := range keys {
  203. res = append(res, k)
  204. }
  205. return res
  206. }
  207. return nil
  208. }
  209. // 2. Wildcard Scan
  210. var results []string
  211. seen := make(map[string]bool)
  212. for token, keys := range fti.index {
  213. if WildcardMatch(token, tokenPattern) {
  214. for k := range keys {
  215. if !seen[k] {
  216. results = append(results, k)
  217. seen[k] = true
  218. }
  219. }
  220. }
  221. }
  222. return results
  223. }
  224. func tokenize(val string) []string {
  225. f := func(c rune) bool {
  226. return !((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9'))
  227. }
  228. return strings.FieldsFunc(val, f)
  229. }
  230. // --- Storage & Cache ---
  231. // StripedLock provides hashed locks for keys
  232. type StripedLock struct {
  233. locks [1024]sync.RWMutex
  234. }
  235. func (sl *StripedLock) GetLock(key string) *sync.RWMutex {
  236. h := hash(key)
  237. return &sl.locks[h%1024]
  238. }
  239. // Metadata handles the persistent state of the engine
  240. type Metadata struct {
  241. LastCommitIndex uint64
  242. }
  243. // Storage manages disk storage using Append-Only Log.
  244. type Storage struct {
  245. file *os.File
  246. filename string
  247. offset int64 // Current end of file
  248. cache map[int64]string
  249. cacheMu sync.RWMutex
  250. }
  251. const (
  252. RecordTypePut = 0x01
  253. RecordTypeDelete = 0x02
  254. // CRC(4) + Type(1) + KeyLen(2) + ValLen(4) + CommitIndex(8)
  255. HeaderSize = 4 + 1 + 2 + 4 + 8
  256. MaxCacheSize = 10000
  257. )
  258. func NewStorage(filename string) (*Storage, error) {
  259. f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
  260. if err != nil {
  261. return nil, err
  262. }
  263. st := &Storage{
  264. file: f,
  265. filename: filename,
  266. cache: make(map[int64]string),
  267. }
  268. return st, nil
  269. }
  270. // Record represents a log entry
  271. type Record struct {
  272. Type byte
  273. Key string
  274. Value string
  275. Offset int64
  276. CommitIndex uint64
  277. }
  278. // Scan iterates over the log file, validating CRCs and returning records.
  279. // It returns the valid offset after the last record.
  280. func (s *Storage) Scan(callback func(Record)) (int64, error) {
  281. offset := int64(0)
  282. if _, err := s.file.Seek(0, 0); err != nil {
  283. return 0, err
  284. }
  285. reader := bufio.NewReader(s.file)
  286. for {
  287. header := make([]byte, HeaderSize)
  288. n, err := io.ReadFull(reader, header)
  289. if err == io.EOF {
  290. break
  291. }
  292. if err == io.ErrUnexpectedEOF && n == 0 {
  293. break
  294. }
  295. if err != nil {
  296. fmt.Printf("Storage Scan Error at %d: %v\n", offset, err)
  297. break
  298. }
  299. storedCRC := binary.LittleEndian.Uint32(header[0:4])
  300. recType := header[4]
  301. keyLen := binary.LittleEndian.Uint16(header[5:7])
  302. valLen := binary.LittleEndian.Uint32(header[7:11])
  303. commitIndex := binary.LittleEndian.Uint64(header[11:19])
  304. totalLen := int(keyLen) + int(valLen)
  305. data := make([]byte, totalLen)
  306. if _, err := io.ReadFull(reader, data); err != nil {
  307. fmt.Printf("Storage Scan Data Error at %d: %v\n", offset, err)
  308. break
  309. }
  310. // Verify CRC
  311. crc := crc32.ChecksumIEEE(header[4:]) // Checksum of Type+Lengths+CommitIndex
  312. crc = crc32.Update(crc, crc32.IEEETable, data) // + Data
  313. if crc != storedCRC {
  314. fmt.Printf("CRC Mismatch at %d\n", offset)
  315. break
  316. }
  317. key := string(data[:keyLen])
  318. val := string(data[keyLen:])
  319. callback(Record{
  320. Type: recType,
  321. Key: key,
  322. Value: val,
  323. Offset: offset,
  324. CommitIndex: commitIndex,
  325. })
  326. offset += int64(HeaderSize + totalLen)
  327. }
  328. // Update storage offset
  329. s.offset = offset
  330. s.file.Seek(offset, 0)
  331. return offset, nil
  332. }
  333. // Append writes a new record
  334. func (s *Storage) Append(key, val string, recType byte, commitIndex uint64) (int64, error) {
  335. keyBytes := []byte(key)
  336. valBytes := []byte(val)
  337. keyLen := len(keyBytes)
  338. valLen := len(valBytes)
  339. if keyLen > 65535 {
  340. return 0, fmt.Errorf("key too large")
  341. }
  342. buf := make([]byte, HeaderSize+keyLen+valLen)
  343. // 1. Build Header Data (skip CRC)
  344. buf[4] = recType
  345. binary.LittleEndian.PutUint16(buf[5:7], uint16(keyLen))
  346. binary.LittleEndian.PutUint32(buf[7:11], uint32(valLen))
  347. binary.LittleEndian.PutUint64(buf[11:19], commitIndex)
  348. // 2. Copy Data
  349. copy(buf[HeaderSize:], keyBytes)
  350. copy(buf[HeaderSize+keyLen:], valBytes)
  351. // 3. Calc CRC
  352. crc := crc32.ChecksumIEEE(buf[4:]) // Everything after CRC field
  353. binary.LittleEndian.PutUint32(buf[0:4], crc)
  354. // 4. Write
  355. totalSize := int64(len(buf))
  356. writeOffset := atomic.AddInt64(&s.offset, totalSize) - totalSize
  357. if _, err := s.file.WriteAt(buf, writeOffset); err != nil {
  358. return 0, err
  359. }
  360. // Add to Cache if Put
  361. if recType == RecordTypePut {
  362. s.cacheMu.Lock()
  363. if len(s.cache) < MaxCacheSize {
  364. s.cache[writeOffset] = val
  365. }
  366. s.cacheMu.Unlock()
  367. }
  368. return writeOffset, nil
  369. }
  370. // Sync flushes writes to stable storage
  371. func (s *Storage) Sync() error {
  372. return s.file.Sync()
  373. }
  374. // ReadValue reads value at offset
  375. func (s *Storage) ReadValue(offset int64) (string, error) {
  376. // 1. Check Cache
  377. s.cacheMu.RLock()
  378. if val, ok := s.cache[offset]; ok {
  379. s.cacheMu.RUnlock()
  380. return val, nil
  381. }
  382. s.cacheMu.RUnlock()
  383. // 2. Read Header
  384. header := make([]byte, HeaderSize)
  385. if _, err := s.file.ReadAt(header, offset); err != nil {
  386. return "", err
  387. }
  388. keyLen := binary.LittleEndian.Uint16(header[5:7])
  389. valLen := binary.LittleEndian.Uint32(header[7:11])
  390. totalLen := int(keyLen) + int(valLen)
  391. data := make([]byte, totalLen)
  392. if _, err := s.file.ReadAt(data, offset+HeaderSize); err != nil {
  393. return "", err
  394. }
  395. // Verify CRC on read for safety
  396. storedCRC := binary.LittleEndian.Uint32(header[0:4])
  397. crc := crc32.ChecksumIEEE(header[4:])
  398. crc = crc32.Update(crc, crc32.IEEETable, data)
  399. if crc != storedCRC {
  400. return "", fmt.Errorf("data corruption detected at offset %d", offset)
  401. }
  402. val := string(data[keyLen:])
  403. // 3. Fill Cache
  404. s.cacheMu.Lock()
  405. if len(s.cache) < MaxCacheSize {
  406. s.cache[offset] = val
  407. } else {
  408. for k := range s.cache {
  409. delete(s.cache, k)
  410. break
  411. }
  412. s.cache[offset] = val
  413. }
  414. s.cacheMu.Unlock()
  415. return val, nil
  416. }
  417. func (s *Storage) Close() error {
  418. return s.file.Close()
  419. }
  420. // EngineOption defines a configuration option for the Engine
  421. type EngineOption func(*EngineConfig)
  422. type EngineConfig struct {
  423. EnableValueIndex bool
  424. }
  425. // WithValueIndex enables or disables the value index (Full Text Index)
  426. func WithValueIndex(enable bool) EngineOption {
  427. return func(c *EngineConfig) {
  428. c.EnableValueIndex = enable
  429. }
  430. }
  431. // Engine is the core storage engine.
  432. type Engine struct {
  433. Index *FlatIndex // Flattened memory structure
  434. Storage *Storage
  435. FTIndex *FullTextIndex // Can be nil if disabled
  436. KeyLocks StripedLock
  437. LastCommitIndex uint64
  438. commitMu sync.Mutex
  439. dataDir string
  440. // Metadata state
  441. metaFile *os.File
  442. writeMu sync.Mutex
  443. config EngineConfig
  444. }
  445. func NewEngine(dataDir string, opts ...EngineOption) (*Engine, error) {
  446. if err := os.MkdirAll(dataDir, 0755); err != nil {
  447. return nil, err
  448. }
  449. config := EngineConfig{
  450. EnableValueIndex: false, // Default to false (key-only)
  451. }
  452. for _, opt := range opts {
  453. opt(&config)
  454. }
  455. store, err := NewStorage(dataDir + "/values.data")
  456. if err != nil {
  457. return nil, err
  458. }
  459. // Open or create metadata file
  460. metaPath := dataDir + "/meta.state"
  461. metaFile, err := os.OpenFile(metaPath, os.O_RDWR|os.O_CREATE, 0644)
  462. if err != nil {
  463. store.Close()
  464. return nil, err
  465. }
  466. e := &Engine{
  467. Index: NewFlatIndex(),
  468. Storage: store,
  469. FTIndex: nil,
  470. dataDir: dataDir,
  471. metaFile: metaFile,
  472. config: config,
  473. }
  474. if config.EnableValueIndex {
  475. e.FTIndex = NewFullTextIndex()
  476. }
  477. // Load Metadata
  478. stat, err := metaFile.Stat()
  479. if err == nil && stat.Size() >= 8 {
  480. b := make([]byte, 8)
  481. if _, err := metaFile.ReadAt(b, 0); err == nil {
  482. e.LastCommitIndex = binary.LittleEndian.Uint64(b)
  483. }
  484. }
  485. // Rebuild Index from Disk
  486. // Note: We still scan to rebuild the memory index, but LastCommitIndex is initialized from meta file
  487. // We can update LastCommitIndex if the log is ahead of the meta file (e.g. crash before meta update)
  488. // We also correct LastCommitIndex if meta file is ahead of data (e.g. data flush lost during power failure)
  489. realMaxIndex := uint64(0)
  490. _, err = store.Scan(func(rec Record) {
  491. if rec.Type == RecordTypePut {
  492. e.Index.Insert(rec.Key, IndexEntry{
  493. ValueOffset: rec.Offset,
  494. CommitIndex: rec.CommitIndex,
  495. })
  496. if e.FTIndex != nil {
  497. e.FTIndex.Add(rec.Key, rec.Value)
  498. }
  499. } else if rec.Type == RecordTypeDelete {
  500. // Cleanup FTIndex using old value if possible
  501. e.removeValueFromFTIndex(rec.Key)
  502. e.Index.Delete(rec.Key)
  503. }
  504. // Track the actual max index present in the data file
  505. if rec.CommitIndex > realMaxIndex {
  506. realMaxIndex = rec.CommitIndex
  507. }
  508. })
  509. // Critical Safety Check:
  510. // 1. If Log > Meta: Update Meta (Normal crash recovery)
  511. // 2. If Meta > Log: Downgrade Meta (Data loss recovery - force Raft replay)
  512. if realMaxIndex > e.LastCommitIndex {
  513. e.LastCommitIndex = realMaxIndex
  514. } else if realMaxIndex < e.LastCommitIndex {
  515. // Detect inconsistency: Meta says we have more data than what's on disk.
  516. // This happens if Meta was flushed but Data tail was lost during power failure.
  517. // We MUST trust the actual data on disk.
  518. fmt.Printf("WARNING: Inconsistency detected! Meta LastCommitIndex (%d) > Data RealIndex (%d). Resetting to %d to force Raft replay.\n",
  519. e.LastCommitIndex, realMaxIndex, realMaxIndex)
  520. e.LastCommitIndex = realMaxIndex
  521. // Force save corrected metadata
  522. e.saveMetadata()
  523. }
  524. return e, nil
  525. }
  526. func (e *Engine) removeValueFromFTIndex(key string) {
  527. if e.FTIndex == nil {
  528. return
  529. }
  530. if entry, ok := e.Index.Get(key); ok {
  531. val, err := e.Storage.ReadValue(entry.ValueOffset)
  532. if err == nil {
  533. e.FTIndex.Remove(key, val)
  534. }
  535. }
  536. }
  537. func (e *Engine) Close() error {
  538. e.saveMetadata()
  539. e.metaFile.Close()
  540. return e.Storage.Close()
  541. }
  542. func (e *Engine) saveMetadata() {
  543. b := make([]byte, 8)
  544. binary.LittleEndian.PutUint64(b, e.LastCommitIndex)
  545. e.metaFile.WriteAt(b, 0)
  546. }
  547. func (e *Engine) Set(key, value string, commitIndex uint64) error {
  548. e.writeMu.Lock()
  549. defer e.writeMu.Unlock()
  550. // Idempotency check: if this log entry has already been applied, skip it.
  551. // This handles Raft replay on restart.
  552. if commitIndex > 0 && commitIndex <= e.LastCommitIndex {
  553. return nil
  554. }
  555. e.removeValueFromFTIndex(key)
  556. offset, err := e.Storage.Append(key, value, RecordTypePut, commitIndex)
  557. if err != nil {
  558. return err
  559. }
  560. e.Index.Insert(key, IndexEntry{
  561. ValueOffset: offset,
  562. CommitIndex: commitIndex,
  563. })
  564. if e.FTIndex != nil {
  565. e.FTIndex.Add(key, value)
  566. }
  567. e.commitMu.Lock()
  568. if commitIndex > e.LastCommitIndex {
  569. e.LastCommitIndex = commitIndex
  570. // Update metadata on disk periodically or on every write?
  571. // For safety, let's update it. Since it's pwrite at offset 0, it's fast.
  572. e.saveMetadata()
  573. }
  574. e.commitMu.Unlock()
  575. return nil
  576. }
  577. func (e *Engine) Delete(key string, commitIndex uint64) error {
  578. e.writeMu.Lock()
  579. defer e.writeMu.Unlock()
  580. // Idempotency check
  581. if commitIndex > 0 && commitIndex <= e.LastCommitIndex {
  582. return nil
  583. }
  584. e.removeValueFromFTIndex(key)
  585. _, err := e.Storage.Append(key, "", RecordTypeDelete, commitIndex)
  586. if err != nil {
  587. return err
  588. }
  589. e.Index.Delete(key)
  590. e.commitMu.Lock()
  591. if commitIndex > e.LastCommitIndex {
  592. e.LastCommitIndex = commitIndex
  593. e.saveMetadata()
  594. }
  595. e.commitMu.Unlock()
  596. return nil
  597. }
  598. // Sync flushes underlying storage to disk
  599. // GetLastAppliedIndex returns the last Raft log index applied to the DB
  600. func (e *Engine) GetLastAppliedIndex() uint64 {
  601. e.commitMu.Lock()
  602. defer e.commitMu.Unlock()
  603. return e.LastCommitIndex
  604. }
  605. func (e *Engine) GetDBSize() int64 {
  606. e.commitMu.Lock()
  607. defer e.commitMu.Unlock()
  608. info, err := e.Storage.file.Stat()
  609. if err == nil {
  610. return info.Size()
  611. }
  612. return 0
  613. }
  614. func (e *Engine) Sync() error {
  615. e.metaFile.Sync()
  616. return e.Storage.Sync()
  617. }
  618. func (e *Engine) Get(key string) (string, bool) {
  619. entry, ok := e.Index.Get(key)
  620. if !ok {
  621. return "", false
  622. }
  623. val, err := e.Storage.ReadValue(entry.ValueOffset)
  624. if err != nil {
  625. return "", false
  626. }
  627. return val, true
  628. }
  629. type QueryResult struct {
  630. Key string `json:"key"`
  631. Value string `json:"value"`
  632. CommitIndex uint64 `json:"commit_index"`
  633. }
  634. func WildcardMatch(str, pattern string) bool {
  635. s := []rune(str)
  636. p := []rune(pattern)
  637. sLen, pLen := len(s), len(p)
  638. i, j := 0, 0
  639. starIdx, matchIdx := -1, -1
  640. for i < sLen {
  641. if j < pLen && (p[j] == '?' || p[j] == s[i]) {
  642. i++
  643. j++
  644. } else if j < pLen && p[j] == '*' {
  645. starIdx = j
  646. matchIdx = i
  647. j++
  648. } else if starIdx != -1 {
  649. j = starIdx + 1
  650. matchIdx++
  651. i = matchIdx
  652. } else {
  653. return false
  654. }
  655. }
  656. for j < pLen && p[j] == '*' {
  657. j++
  658. }
  659. return j == pLen
  660. }
  661. func (e *Engine) Count(sql string) (int, error) {
  662. return e.execute(sql, true)
  663. }
  664. func (e *Engine) execute(sql string, countOnly bool) (int, error) {
  665. sql = strings.TrimSpace(sql)
  666. reLimit := regexp.MustCompile(`(?i)\s+LIMIT\s+(\d+)`)
  667. reOffset := regexp.MustCompile(`(?i)\s+OFFSET\s+(\d+)`)
  668. limit := -1
  669. offset := 0
  670. if match := reLimit.FindStringSubmatch(sql); len(match) > 0 {
  671. limit, _ = strconv.Atoi(match[1])
  672. sql = reLimit.ReplaceAllString(sql, "")
  673. }
  674. if match := reOffset.FindStringSubmatch(sql); len(match) > 0 {
  675. offset, _ = strconv.Atoi(match[1])
  676. sql = reOffset.ReplaceAllString(sql, "")
  677. }
  678. // 0. Handle Count All efficient case
  679. // Check specifically for "*" or simple wildcard match
  680. if sql == "*" || sql == "\"*\"" {
  681. e.Index.mu.RLock()
  682. count := len(e.Index.items)
  683. e.Index.mu.RUnlock()
  684. return count, nil
  685. }
  686. re := regexp.MustCompile(`(key|value|CommitIndex)\s*(=|like|>=|<=|>|<)\s*("[^"]*"|\d+)`)
  687. matches := re.FindAllStringSubmatch(sql, -1)
  688. if len(matches) == 0 {
  689. // Fallback for when regex fails but user meant "count all" or similar via other syntax
  690. // But strictly speaking, if not "*", it's invalid query per current parser
  691. return 0, fmt.Errorf("invalid query")
  692. }
  693. // Also check for 'key like "*"' pattern which is equivalent to count all
  694. if len(matches) == 1 && matches[0][1] == "key" && matches[0][2] == "like" && (matches[0][3] == "\"*\"" || matches[0][3] == "\"%\"") {
  695. e.Index.mu.RLock()
  696. count := len(e.Index.items)
  697. e.Index.mu.RUnlock()
  698. return count, nil
  699. }
  700. extractString := func(s string) string {
  701. return strings.Trim(strings.TrimSpace(s), "\"")
  702. }
  703. // 1. Optimize for Point Lookup
  704. for _, match := range matches {
  705. if match[1] == "key" && match[2] == "=" {
  706. targetKey := extractString(match[3])
  707. entry, ok := e.Index.Get(targetKey)
  708. if !ok {
  709. return 0, nil
  710. }
  711. needValue := false
  712. for _, m := range matches {
  713. if m[1] == "value" { needValue = true; break }
  714. }
  715. var val string
  716. if needValue {
  717. v, err := e.Storage.ReadValue(entry.ValueOffset)
  718. if err != nil { return 0, nil }
  719. val = v
  720. }
  721. matchAll := true
  722. for _, m := range matches {
  723. field, op, valRaw := m[1], m[2], m[3]
  724. switch field {
  725. case "CommitIndex":
  726. num, _ := strconv.ParseUint(valRaw, 10, 64)
  727. switch op {
  728. case ">": if !(entry.CommitIndex > num) { matchAll = false }
  729. case "<": if !(entry.CommitIndex < num) { matchAll = false }
  730. case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
  731. case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
  732. case "=": if !(entry.CommitIndex == num) { matchAll = false }
  733. }
  734. case "value":
  735. t := extractString(valRaw)
  736. switch op {
  737. case "=": if val != t { matchAll = false }
  738. case "like": if !WildcardMatch(val, t) { matchAll = false }
  739. }
  740. }
  741. if !matchAll { break }
  742. }
  743. if matchAll {
  744. return 1, nil
  745. }
  746. return 0, nil
  747. }
  748. }
  749. var candidates map[string]bool
  750. var useFTIndex bool = false
  751. // 2. Try to use FT Index (if enabled)
  752. if e.FTIndex != nil {
  753. for _, match := range matches {
  754. if match[1] == "value" && match[2] == "like" {
  755. pattern := extractString(match[3])
  756. clean := strings.Trim(pattern, "*")
  757. if len(clean) > 0 && !strings.Contains(clean, "*") && !strings.Contains(clean, "?") {
  758. matches := e.FTIndex.Search(pattern)
  759. if matches != nil {
  760. currentSet := make(map[string]bool)
  761. for _, k := range matches {
  762. currentSet[k] = true
  763. }
  764. if !useFTIndex {
  765. candidates = currentSet
  766. useFTIndex = true
  767. } else {
  768. newSet := make(map[string]bool)
  769. for k := range candidates {
  770. if currentSet[k] { newSet[k] = true }
  771. }
  772. candidates = newSet
  773. }
  774. } else {
  775. return 0, nil
  776. }
  777. }
  778. }
  779. }
  780. } else {
  781. for _, match := range matches {
  782. if match[1] == "value" && match[2] == "like" {
  783. // Value search requested but index disabled -> return 0
  784. return 0, nil
  785. }
  786. }
  787. }
  788. var iterator func(func(string, IndexEntry) bool)
  789. if useFTIndex {
  790. iterator = func(cb func(string, IndexEntry) bool) {
  791. keys := make([]string, 0, len(candidates))
  792. for k := range candidates { keys = append(keys, k) }
  793. sort.Strings(keys)
  794. for _, k := range keys {
  795. if entry, ok := e.Index.Get(k); ok {
  796. if !cb(k, entry) { return }
  797. }
  798. }
  799. }
  800. } else {
  801. // Use FlatIndex Prefix Search
  802. var prefix string = ""
  803. var usePrefix bool = false
  804. for _, match := range matches {
  805. if match[1] == "key" && match[2] == "like" {
  806. pattern := extractString(match[3])
  807. // Flat Index supports simple prefix match
  808. if strings.HasSuffix(pattern, "*") {
  809. clean := pattern[:len(pattern)-1]
  810. if !strings.ContainsAny(clean, "*?") {
  811. prefix = clean
  812. usePrefix = true
  813. break
  814. }
  815. }
  816. }
  817. }
  818. iterator = func(cb func(string, IndexEntry) bool) {
  819. if usePrefix {
  820. e.Index.WalkPrefix(prefix, cb)
  821. } else {
  822. e.Index.WalkPrefix("", cb)
  823. }
  824. }
  825. }
  826. matchedCount := 0
  827. iterator(func(key string, entry IndexEntry) bool {
  828. var valStr string
  829. var valLoaded bool
  830. matchAll := true
  831. for _, match := range matches {
  832. field, op, valRaw := match[1], match[2], match[3]
  833. switch field {
  834. case "CommitIndex":
  835. num, _ := strconv.ParseUint(valRaw, 10, 64)
  836. switch op {
  837. case ">": if !(entry.CommitIndex > num) { matchAll = false }
  838. case "<": if !(entry.CommitIndex < num) { matchAll = false }
  839. case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
  840. case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
  841. case "=": if !(entry.CommitIndex == num) { matchAll = false }
  842. }
  843. case "key":
  844. target := extractString(valRaw)
  845. switch op {
  846. case "=": if key != target { matchAll = false }
  847. case "like": if !WildcardMatch(key, target) { matchAll = false }
  848. }
  849. case "value":
  850. if e.FTIndex == nil && op == "like" {
  851. matchAll = false
  852. break
  853. }
  854. if !valLoaded {
  855. v, err := e.Storage.ReadValue(entry.ValueOffset)
  856. if err != nil { matchAll = false; break }
  857. valStr = v
  858. valLoaded = true
  859. }
  860. target := extractString(valRaw)
  861. switch op {
  862. case "=": if valStr != target { matchAll = false }
  863. case "like": if !WildcardMatch(valStr, target) { matchAll = false }
  864. }
  865. }
  866. if !matchAll { break }
  867. }
  868. if matchAll {
  869. matchedCount++
  870. if limit > 0 && offset == 0 && matchedCount >= limit {
  871. return false
  872. }
  873. }
  874. return true
  875. })
  876. if offset > 0 {
  877. if offset >= matchedCount {
  878. return 0, nil
  879. }
  880. matchedCount -= offset
  881. }
  882. if limit >= 0 {
  883. if limit < matchedCount {
  884. matchedCount = limit
  885. }
  886. }
  887. return matchedCount, nil
  888. }
  889. func (e *Engine) Query(sql string) ([]QueryResult, error) {
  890. return e.queryInternal(sql)
  891. }
  892. func (e *Engine) queryInternal(sql string) ([]QueryResult, error) {
  893. sql = strings.TrimSpace(sql)
  894. reLimit := regexp.MustCompile(`(?i)\s+LIMIT\s+(\d+)`)
  895. reOffset := regexp.MustCompile(`(?i)\s+OFFSET\s+(\d+)`)
  896. limit := -1
  897. offset := 0
  898. if match := reLimit.FindStringSubmatch(sql); len(match) > 0 {
  899. limit, _ = strconv.Atoi(match[1])
  900. sql = reLimit.ReplaceAllString(sql, "")
  901. }
  902. if match := reOffset.FindStringSubmatch(sql); len(match) > 0 {
  903. offset, _ = strconv.Atoi(match[1])
  904. sql = reOffset.ReplaceAllString(sql, "")
  905. }
  906. re := regexp.MustCompile(`(key|value|CommitIndex)\s*(=|like|>=|<=|>|<)\s*("[^"]*"|\d+)`)
  907. matches := re.FindAllStringSubmatch(sql, -1)
  908. if len(matches) == 0 {
  909. return nil, fmt.Errorf("invalid query")
  910. }
  911. extractString := func(s string) string {
  912. return strings.Trim(strings.TrimSpace(s), "\"")
  913. }
  914. for _, match := range matches {
  915. if match[1] == "key" && match[2] == "=" {
  916. targetKey := extractString(match[3])
  917. entry, ok := e.Index.Get(targetKey)
  918. if !ok {
  919. return []QueryResult{}, nil
  920. }
  921. val, err := e.Storage.ReadValue(entry.ValueOffset)
  922. if err != nil {
  923. return []QueryResult{}, nil
  924. }
  925. matchAll := true
  926. for _, m := range matches {
  927. field, op, valRaw := m[1], m[2], m[3]
  928. switch field {
  929. case "CommitIndex":
  930. num, _ := strconv.ParseUint(valRaw, 10, 64)
  931. switch op {
  932. case ">": if !(entry.CommitIndex > num) { matchAll = false }
  933. case "<": if !(entry.CommitIndex < num) { matchAll = false }
  934. case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
  935. case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
  936. case "=": if !(entry.CommitIndex == num) { matchAll = false }
  937. }
  938. case "value":
  939. t := extractString(valRaw)
  940. switch op {
  941. case "=": if val != t { matchAll = false }
  942. case "like": if !WildcardMatch(val, t) { matchAll = false }
  943. }
  944. }
  945. if !matchAll { break }
  946. }
  947. if matchAll {
  948. return []QueryResult{{Key: targetKey, Value: val, CommitIndex: entry.CommitIndex}}, nil
  949. }
  950. return []QueryResult{}, nil
  951. }
  952. }
  953. var candidates map[string]bool
  954. var useFTIndex bool = false
  955. if e.FTIndex != nil {
  956. for _, match := range matches {
  957. if match[1] == "value" && match[2] == "like" {
  958. pattern := extractString(match[3])
  959. clean := strings.Trim(pattern, "*")
  960. if len(clean) > 0 && !strings.Contains(clean, "*") && !strings.Contains(clean, "?") {
  961. matches := e.FTIndex.Search(pattern)
  962. if matches != nil {
  963. currentSet := make(map[string]bool)
  964. for _, k := range matches {
  965. currentSet[k] = true
  966. }
  967. if !useFTIndex {
  968. candidates = currentSet
  969. useFTIndex = true
  970. } else {
  971. newSet := make(map[string]bool)
  972. for k := range candidates {
  973. if currentSet[k] {
  974. newSet[k] = true
  975. }
  976. }
  977. candidates = newSet
  978. }
  979. } else {
  980. return []QueryResult{}, nil
  981. }
  982. }
  983. }
  984. }
  985. } else {
  986. for _, match := range matches {
  987. if match[1] == "value" && match[2] == "like" {
  988. return []QueryResult{}, nil
  989. }
  990. }
  991. }
  992. var iterator func(func(string, IndexEntry) bool)
  993. if useFTIndex {
  994. iterator = func(cb func(string, IndexEntry) bool) {
  995. keys := make([]string, 0, len(candidates))
  996. for k := range candidates {
  997. keys = append(keys, k)
  998. }
  999. sort.Strings(keys)
  1000. for _, k := range keys {
  1001. if entry, ok := e.Index.Get(k); ok {
  1002. if !cb(k, entry) {
  1003. return
  1004. }
  1005. }
  1006. }
  1007. }
  1008. } else {
  1009. var prefix string = ""
  1010. var usePrefix bool = false
  1011. for _, match := range matches {
  1012. if match[1] == "key" && match[2] == "like" {
  1013. pattern := extractString(match[3])
  1014. if strings.HasSuffix(pattern, "*") {
  1015. clean := pattern[:len(pattern)-1]
  1016. if !strings.ContainsAny(clean, "*?") {
  1017. prefix = clean
  1018. usePrefix = true
  1019. break
  1020. }
  1021. }
  1022. }
  1023. }
  1024. iterator = func(cb func(string, IndexEntry) bool) {
  1025. if usePrefix {
  1026. e.Index.WalkPrefix(prefix, cb)
  1027. } else {
  1028. e.Index.WalkPrefix("", cb)
  1029. }
  1030. }
  1031. }
  1032. var results []QueryResult
  1033. iterator(func(key string, entry IndexEntry) bool {
  1034. var valStr string
  1035. var valLoaded bool
  1036. matchAll := true
  1037. for _, match := range matches {
  1038. field, op, valRaw := match[1], match[2], match[3]
  1039. switch field {
  1040. case "CommitIndex":
  1041. num, _ := strconv.ParseUint(valRaw, 10, 64)
  1042. switch op {
  1043. case ">": if !(entry.CommitIndex > num) { matchAll = false }
  1044. case "<": if !(entry.CommitIndex < num) { matchAll = false }
  1045. case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
  1046. case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
  1047. case "=": if !(entry.CommitIndex == num) { matchAll = false }
  1048. }
  1049. case "key":
  1050. target := extractString(valRaw)
  1051. switch op {
  1052. case "=": if key != target { matchAll = false }
  1053. case "like": if !WildcardMatch(key, target) { matchAll = false }
  1054. }
  1055. case "value":
  1056. if e.FTIndex == nil && op == "like" {
  1057. matchAll = false
  1058. break
  1059. }
  1060. if !valLoaded {
  1061. v, err := e.Storage.ReadValue(entry.ValueOffset)
  1062. if err != nil { matchAll = false; break }
  1063. valStr = v
  1064. valLoaded = true
  1065. }
  1066. target := extractString(valRaw)
  1067. switch op {
  1068. case "=": if valStr != target { matchAll = false }
  1069. case "like": if !WildcardMatch(valStr, target) { matchAll = false }
  1070. }
  1071. }
  1072. if !matchAll { break }
  1073. }
  1074. if matchAll {
  1075. if !valLoaded {
  1076. v, err := e.Storage.ReadValue(entry.ValueOffset)
  1077. if err == nil { valStr = v }
  1078. }
  1079. results = append(results, QueryResult{
  1080. Key: key,
  1081. Value: valStr,
  1082. CommitIndex: entry.CommitIndex,
  1083. })
  1084. needed := limit + offset
  1085. if limit > 0 && len(results) >= needed {
  1086. return false
  1087. }
  1088. }
  1089. return true
  1090. })
  1091. if offset > 0 {
  1092. if offset >= len(results) {
  1093. return []QueryResult{}, nil
  1094. }
  1095. results = results[offset:]
  1096. }
  1097. if limit >= 0 {
  1098. if limit < len(results) {
  1099. results = results[:limit]
  1100. }
  1101. }
  1102. return results, nil
  1103. }
  1104. // Snapshot creates a full consistency snapshot of the database.
  1105. // Since the DB engine itself IS the state machine, this simply serializes
  1106. // all current valid data. This snapshot can be used to restore a lagging node
  1107. // that has fallen behind the Raft logs.
  1108. func (e *Engine) Snapshot() ([]byte, error) {
  1109. e.Index.mu.RLock()
  1110. defer e.Index.mu.RUnlock()
  1111. // Snapshot Format Version 1:
  1112. // [Count U64] + [Record...]
  1113. // Record: [KeyLen U16] [Key Bytes] [ValLen U32] [Val Bytes] [CommitIndex U64]
  1114. // Pre-calculate size to reduce allocations
  1115. // Estimate: Count * (AverageKeySize + AverageValSize + Overhead)
  1116. // We start with a reasonable buffer size.
  1117. buf := make([]byte, 0, 1024*1024)
  1118. count := uint64(len(e.Index.items))
  1119. // Write Count
  1120. tmp := make([]byte, 8)
  1121. binary.LittleEndian.PutUint64(tmp, count)
  1122. buf = append(buf, tmp...)
  1123. // Buffer for encoding headers to avoid repeated small allocations
  1124. headerBuf := make([]byte, 2+4+8) // KeyLen + ValLen + CommitIndex
  1125. for i := range e.Index.items {
  1126. // Get Key
  1127. key := e.Index.getKey(i)
  1128. entry := e.Index.items[i].entry
  1129. // Get Value
  1130. // We read directly from storage. Since we hold the read lock on index,
  1131. // the offset is valid. The storage file is append-only, so old data exists.
  1132. val, err := e.Storage.ReadValue(entry.ValueOffset)
  1133. if err != nil {
  1134. // If we can't read a value, it's a critical error for snapshot integrity.
  1135. return nil, fmt.Errorf("snapshot failed reading key '%s' at offset %d: %v", key, entry.ValueOffset, err)
  1136. }
  1137. keyBytes := []byte(key)
  1138. valBytes := []byte(val)
  1139. // Encode Header
  1140. binary.LittleEndian.PutUint16(headerBuf[0:2], uint16(len(keyBytes)))
  1141. binary.LittleEndian.PutUint32(headerBuf[2:6], uint32(len(valBytes)))
  1142. binary.LittleEndian.PutUint64(headerBuf[6:14], entry.CommitIndex)
  1143. // Append to buffer
  1144. buf = append(buf, headerBuf...)
  1145. buf = append(buf, keyBytes...)
  1146. buf = append(buf, valBytes...)
  1147. }
  1148. return buf, nil
  1149. }
  1150. // Restore completely replaces the database content with the provided snapshot.
  1151. // This matches the "DB as Snapshot" design: we wipe the current state
  1152. // and rebuild from the full image provided by the leader.
  1153. func (e *Engine) Restore(data []byte) error {
  1154. e.writeMu.Lock()
  1155. defer e.writeMu.Unlock()
  1156. // 1. Safety check & Parse Header
  1157. if len(data) < 8 {
  1158. if len(data) == 0 {
  1159. // Empty snapshot implies empty DB.
  1160. return e.resetToEmpty()
  1161. }
  1162. return fmt.Errorf("invalid snapshot data: too short")
  1163. }
  1164. count := binary.LittleEndian.Uint64(data[0:8])
  1165. offset := 8
  1166. // 2. Stop-the-World: Close current storage to safely wipe it
  1167. e.Index.mu.Lock()
  1168. if err := e.Storage.Close(); err != nil {
  1169. e.Index.mu.Unlock()
  1170. return fmt.Errorf("failed to close storage for restore: %v", err)
  1171. }
  1172. // 3. Truncate & Reset
  1173. // We are replacing the entire DB state.
  1174. // Reset In-Memory Index by clearing slices (keeping underlying capacity)
  1175. // We do NOT replace the pointer, so we keep the lock valid.
  1176. e.Index.keyBuf = e.Index.keyBuf[:0]
  1177. e.Index.items = e.Index.items[:0]
  1178. if e.config.EnableValueIndex {
  1179. // Recreate FTIndex since it's a map
  1180. e.FTIndex = NewFullTextIndex()
  1181. }
  1182. // Clear Cache
  1183. e.Storage.cache = make(map[int64]string)
  1184. // Re-open storage in Truncate mode (Wipe data file)
  1185. f, err := os.OpenFile(e.Storage.filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
  1186. if err != nil {
  1187. e.Index.mu.Unlock()
  1188. return fmt.Errorf("failed to reopen storage for restore: %v", err)
  1189. }
  1190. e.Storage.file = f
  1191. e.Storage.offset = 0
  1192. // We hold the lock during the entire restore process to prevent any reads/writes
  1193. // This is acceptable as Restore is a rare, heavyweight operation.
  1194. defer e.Index.mu.Unlock()
  1195. maxCommitIndex := uint64(0)
  1196. // 4. Rebuild Data from Snapshot Stream
  1197. for i := uint64(0); i < count; i++ {
  1198. // Read Header (14 bytes)
  1199. if offset+14 > len(data) {
  1200. return fmt.Errorf("restore failed: truncated header at record %d", i)
  1201. }
  1202. keyLen := int(binary.LittleEndian.Uint16(data[offset : offset+2]))
  1203. valLen := int(binary.LittleEndian.Uint32(data[offset+2 : offset+6]))
  1204. commitIndex := binary.LittleEndian.Uint64(data[offset+6 : offset+14])
  1205. offset += 14
  1206. // Read Key
  1207. if offset+keyLen > len(data) {
  1208. return fmt.Errorf("restore failed: truncated data at record %d (key)", i)
  1209. }
  1210. key := string(data[offset : offset+keyLen])
  1211. offset += keyLen
  1212. // Read Value
  1213. if offset+valLen > len(data) {
  1214. return fmt.Errorf("restore failed: truncated data at record %d (value)", i)
  1215. }
  1216. val := string(data[offset : offset+valLen])
  1217. offset += valLen
  1218. if commitIndex > maxCommitIndex {
  1219. maxCommitIndex = commitIndex
  1220. }
  1221. // Direct Write to Storage (Internal Append)
  1222. // We use the low-level Storage.Append directly since we already hold locks
  1223. // and are bypassing the standard Set path.
  1224. writeOffset, err := e.Storage.Append(key, val, RecordTypePut, commitIndex)
  1225. if err != nil {
  1226. return fmt.Errorf("restore failed appending record %d: %v", i, err)
  1227. }
  1228. // Update Memory Index
  1229. // Accessing e.Index.items directly because we hold e.Index.mu
  1230. // But we should use the helper to safely manage keyBuf
  1231. // Use insertLocked to avoid deadlock (we already hold e.Index.mu)
  1232. e.Index.insertLocked(key, IndexEntry{
  1233. ValueOffset: writeOffset,
  1234. CommitIndex: commitIndex,
  1235. })
  1236. // Update Full Text Index if enabled
  1237. if e.FTIndex != nil {
  1238. e.FTIndex.Add(key, val)
  1239. }
  1240. }
  1241. // 5. Update Metadata
  1242. e.commitMu.Lock()
  1243. e.LastCommitIndex = maxCommitIndex
  1244. e.saveMetadata()
  1245. e.commitMu.Unlock()
  1246. // 6. Force Sync
  1247. if err := e.Sync(); err != nil {
  1248. return fmt.Errorf("failed to sync restored data: %v", err)
  1249. }
  1250. return nil
  1251. }
  1252. // resetToEmpty helper to wipe the DB cleanly
  1253. func (e *Engine) resetToEmpty() error {
  1254. e.Index.mu.Lock()
  1255. defer e.Index.mu.Unlock()
  1256. if err := e.Storage.Close(); err != nil {
  1257. return err
  1258. }
  1259. e.Index = NewFlatIndex()
  1260. if e.config.EnableValueIndex {
  1261. e.FTIndex = NewFullTextIndex()
  1262. }
  1263. e.Storage.cache = make(map[int64]string)
  1264. f, err := os.OpenFile(e.Storage.filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
  1265. if err != nil {
  1266. return err
  1267. }
  1268. e.Storage.file = f
  1269. e.Storage.offset = 0
  1270. e.commitMu.Lock()
  1271. e.LastCommitIndex = 0
  1272. e.saveMetadata()
  1273. e.commitMu.Unlock()
  1274. return nil
  1275. }
  1276. func (e *Engine) DebugClearAuxiliary() {
  1277. e.Storage.cacheMu.Lock()
  1278. e.Storage.cache = make(map[int64]string)
  1279. e.Storage.cacheMu.Unlock()
  1280. if e.FTIndex != nil {
  1281. e.FTIndex.mu.Lock()
  1282. e.FTIndex.index = make(map[string]map[string]bool)
  1283. e.FTIndex.mu.Unlock()
  1284. }
  1285. }