engine.go 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470
  1. package db
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "fmt"
  6. "hash/crc32"
  7. "io"
  8. "os"
  9. "regexp"
  10. "sort"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "sync/atomic"
  15. )
  // hash returns the 32-bit FNV-1a hash of s (identical to hash/fnv's New32a).
  //
  // FNV-1a XORs each byte into the state before multiplying by the FNV prime,
  // so every byte, including the last one, is spread across all output bits.
  // That matters because StripedLock reduces the hash modulo a small power of
  // two, which only looks at the low bits. (The previous loop multiplied
  // first, i.e. FNV-1, despite being documented as FNV-1a.)
  func hash(s string) uint32 {
  	const (
  		offset32 = 2166136261
  		prime32  = 16777619
  	)
  	h := uint32(offset32)
  	for i := 0; i < len(s); i++ {
  		h ^= uint32(s[i])
  		h *= prime32
  	}
  	return h
  }
  // --- Flat Sorted Array Index Implementation ---
  // Memory-oriented design:
  //  1. No per-key tree nodes, pointers or map buckets.
  //  2. All key bytes live in one append-only []byte buffer (no string headers).
  //  3. Fixed-size items kept sorted by key give O(log N) lookup and
  //     O(N) insertion/deletion (the tail is shifted with copy).
  
  // IndexEntry is the payload the index stores for each key.
  type IndexEntry struct {
  	ValueOffset int64  // offset of the key's record in the log (see Storage.ReadValue)
  	CommitIndex uint64 // commit index recorded with the write
  }
  
  // FlatIndex is a sorted in-memory key index. Exported methods take mu;
  // unexported helpers expect the caller to already hold it.
  type FlatIndex struct {
  	// keyBuf holds every inserted key back to back. It is append-only: the
  	// bytes of deleted keys are not reclaimed (no compaction yet).
  	keyBuf []byte
  	// items is kept sorted by key.
  	items []flatItem
  	mu    sync.RWMutex
  }
  
  // flatItem locates one key inside keyBuf and carries its entry.
  // Layout: keyOffset 4 + keyLen 2 + padding 2 + entry 16 = 24 bytes, so
  // 100k keys cost about 2.4 MB of items plus their raw bytes in keyBuf.
  type flatItem struct {
  	keyOffset uint32 // start of the key in keyBuf (caps keyBuf at 4 GiB)
  	keyLen    uint16 // key length (caps a single key at 65535 bytes)
  	entry     IndexEntry
  }
  
  // NewFlatIndex returns an empty index with room preallocated for about
  // 1 MiB of key bytes and 10,000 keys.
  func NewFlatIndex() *FlatIndex {
  	return &FlatIndex{
  		keyBuf: make([]byte, 0, 1024*1024),
  		items:  make([]flatItem, 0, 10000),
  	}
  }
  
  // keyBytes returns the key stored at items[idx] as a view into keyBuf.
  // The result aliases keyBuf: do not modify or retain it. Caller holds mu.
  func (fi *FlatIndex) keyBytes(idx int) []byte {
  	it := &fi.items[idx]
  	return fi.keyBuf[it.keyOffset : it.keyOffset+uint32(it.keyLen)]
  }
  
  // search returns the index of the first item whose key is >= key, or
  // len(fi.items) if there is none. Comparing string(b) directly lets the
  // compiler skip the copy, so probes do not allocate a string each the way
  // getKey would. Caller holds mu.
  func (fi *FlatIndex) search(key string) int {
  	return sort.Search(len(fi.items), func(i int) bool {
  		return string(fi.keyBytes(i)) >= key
  	})
  }
  
  // getKey returns a freshly allocated copy of the key at items[idx].
  func (fi *FlatIndex) getKey(idx int) string {
  	return string(fi.keyBytes(idx))
  }
  
  // compare orders the key at items[idx] against target: -1 if it sorts
  // before target, 0 if equal, +1 if after. It does not copy the key.
  func (fi *FlatIndex) compare(idx int, target string) int {
  	k := fi.keyBytes(idx)
  	switch {
  	case string(k) == target:
  		return 0
  	case string(k) < target:
  		return -1
  	default:
  		return 1
  	}
  }
  
  // Insert adds key with entry, or replaces the entry if key already exists.
  //
  // It panics if key is longer than 65535 bytes or if keyBuf would grow past
  // 4 GiB: neither fits in a flatItem, and silently truncating the length
  // (the old behaviour) corrupted lookups. Storage.Append already rejects
  // keys longer than 65535 bytes, so engine writes never reach this panic.
  func (fi *FlatIndex) Insert(key string, entry IndexEntry) {
  	const (
  		maxKeyLen    = 1<<16 - 1
  		maxKeyBufLen = 1<<32 - 1
  	)
  	fi.mu.Lock()
  	defer fi.mu.Unlock()
  
  	idx := fi.search(key)
  	if idx < len(fi.items) && string(fi.keyBytes(idx)) == key {
  		fi.items[idx].entry = entry
  		return
  	}
  
  	// Validate before touching keyBuf so a rejected key leaves no trace.
  	if len(key) > maxKeyLen {
  		panic(fmt.Sprintf("db: FlatIndex key length %d exceeds %d bytes", len(key), maxKeyLen))
  	}
  	offset := len(fi.keyBuf)
  	if uint64(offset)+uint64(len(key)) > maxKeyBufLen {
  		panic("db: FlatIndex key buffer would exceed 4 GiB")
  	}
  	fi.keyBuf = append(fi.keyBuf, key...)
  	item := flatItem{
  		keyOffset: uint32(offset),
  		keyLen:    uint16(len(key)),
  		entry:     entry,
  	}
  
  	// Grow by one, shift the tail right, and drop the item into place. For
  	// sequential inserts idx == old length and the copy moves nothing.
  	fi.items = append(fi.items, flatItem{})
  	copy(fi.items[idx+1:], fi.items[idx:])
  	fi.items[idx] = item
  }
  
  // Get returns the entry stored for key and whether it was found.
  func (fi *FlatIndex) Get(key string) (IndexEntry, bool) {
  	fi.mu.RLock()
  	defer fi.mu.RUnlock()
  	idx := fi.search(key)
  	if idx < len(fi.items) && string(fi.keyBytes(idx)) == key {
  		return fi.items[idx].entry, true
  	}
  	return IndexEntry{}, false
  }
  
  // Delete removes key and reports whether it was present. The key's bytes
  // stay in keyBuf (append-only; no compaction yet), so heavy delete
  // workloads waste keyBuf space.
  func (fi *FlatIndex) Delete(key string) bool {
  	fi.mu.Lock()
  	defer fi.mu.Unlock()
  	idx := fi.search(key)
  	if idx >= len(fi.items) || string(fi.keyBytes(idx)) != key {
  		return false
  	}
  	fi.items = append(fi.items[:idx], fi.items[idx+1:]...)
  	return true
  }
  
  // WalkPrefix calls callback, in ascending key order, for every key that
  // starts with prefix ("" visits all keys), stopping early when callback
  // returns false. The read lock is held for the whole walk, so callback must
  // not call Insert or Delete on this index: the write lock would wait on
  // this read lock forever.
  func (fi *FlatIndex) WalkPrefix(prefix string, callback func(key string, entry IndexEntry) bool) {
  	fi.mu.RLock()
  	defer fi.mu.RUnlock()
  	for i := fi.search(prefix); i < len(fi.items); i++ {
  		k := fi.keyBytes(i)
  		// Keys sharing a prefix are contiguous in sorted order, so the first
  		// non-match ends the walk. Test before copying the key out.
  		if len(k) < len(prefix) || string(k[:len(prefix)]) != prefix {
  			break
  		}
  		if !callback(string(k), fi.items[i].entry) {
  			return
  		}
  	}
  }
  
  // --- End Flat Index ---
  157. // --- Full Text Index ---
  158. type FullTextIndex struct {
  159. // Token -> Set of Keys
  160. index map[string]map[string]bool
  161. mu sync.RWMutex
  162. }
  163. func NewFullTextIndex() *FullTextIndex {
  164. return &FullTextIndex{
  165. index: make(map[string]map[string]bool),
  166. }
  167. }
  168. func (fti *FullTextIndex) Add(key string, value string) {
  169. fti.mu.Lock()
  170. defer fti.mu.Unlock()
  171. tokens := tokenize(value)
  172. for _, token := range tokens {
  173. if fti.index[token] == nil {
  174. fti.index[token] = make(map[string]bool)
  175. }
  176. fti.index[token][key] = true
  177. }
  178. }
  179. func (fti *FullTextIndex) Remove(key string, value string) {
  180. fti.mu.Lock()
  181. defer fti.mu.Unlock()
  182. tokens := tokenize(value)
  183. for _, token := range tokens {
  184. if set, ok := fti.index[token]; ok {
  185. delete(set, key)
  186. if len(set) == 0 {
  187. delete(fti.index, token)
  188. }
  189. }
  190. }
  191. }
  192. func (fti *FullTextIndex) Search(tokenPattern string) []string {
  193. fti.mu.RLock()
  194. defer fti.mu.RUnlock()
  195. // 1. Exact Match
  196. if !strings.Contains(tokenPattern, "*") {
  197. if keys, ok := fti.index[tokenPattern]; ok {
  198. res := make([]string, 0, len(keys))
  199. for k := range keys {
  200. res = append(res, k)
  201. }
  202. return res
  203. }
  204. return nil
  205. }
  206. // 2. Wildcard Scan
  207. var results []string
  208. seen := make(map[string]bool)
  209. for token, keys := range fti.index {
  210. if WildcardMatch(token, tokenPattern) {
  211. for k := range keys {
  212. if !seen[k] {
  213. results = append(results, k)
  214. seen[k] = true
  215. }
  216. }
  217. }
  218. }
  219. return results
  220. }
  // tokenize splits val into maximal runs of ASCII letters and digits; every
  // other character (whitespace, punctuation, any non-ASCII rune) separates
  // tokens. Case is preserved.
  func tokenize(val string) []string {
  	isSeparator := func(r rune) bool {
  		switch {
  		case 'a' <= r && r <= 'z', 'A' <= r && r <= 'Z', '0' <= r && r <= '9':
  			return false
  		default:
  			return true
  		}
  	}
  	return strings.FieldsFunc(val, isSeparator)
  }
  227. // --- Storage & Cache ---
  228. // StripedLock provides hashed locks for keys
  229. type StripedLock struct {
  230. locks [1024]sync.RWMutex
  231. }
  232. func (sl *StripedLock) GetLock(key string) *sync.RWMutex {
  233. h := hash(key)
  234. return &sl.locks[h%1024]
  235. }
  // Metadata describes the engine's persistent state: currently just the
  // highest commit index that has been applied.
  type Metadata struct {
  	LastCommitIndex uint64 // highest applied commit index
  }
  // Storage manages an append-only log file of Put/Delete records.
  //
  // Each record is a HeaderSize-byte little-endian header followed by the
  // key and value bytes:
  //
  //	CRC32(4) | Type(1) | KeyLen(2) | ValLen(4) | CommitIndex(8) | key | value
  //
  // The IEEE CRC covers everything after the CRC field. Put values are also
  // kept in a bounded in-memory cache keyed by record offset.
  type Storage struct {
  	file     *os.File
  	filename string
  	offset   int64            // end of the valid log; advanced atomically by Append
  	cache    map[int64]string // record offset -> value
  	cacheMu  sync.RWMutex
  }
  
  const (
  	RecordTypePut    = 0x01
  	RecordTypeDelete = 0x02
  	// CRC(4) + Type(1) + KeyLen(2) + ValLen(4) + CommitIndex(8)
  	HeaderSize   = 4 + 1 + 2 + 4 + 8
  	MaxCacheSize = 10000
  )
  
  // NewStorage opens (creating if necessary) the log at filename for reading
  // and writing. Call Scan before appending so the end offset is known.
  func NewStorage(filename string) (*Storage, error) {
  	f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
  	if err != nil {
  		return nil, err
  	}
  	return &Storage{
  		file:     f,
  		filename: filename,
  		cache:    make(map[int64]string),
  	}, nil
  }
  
  // Record is one decoded log entry as delivered by Scan.
  type Record struct {
  	Type        byte
  	Key         string
  	Value       string
  	Offset      int64 // offset of the record's header in the log
  	CommitIndex uint64
  }
  
  // Scan replays the log from the start and calls callback for each record
  // whose CRC verifies. It stops at the first truncated, corrupt or
  // impossible record (e.g. a torn write after a crash). It returns the
  // offset just past the last good record and stores it as the append
  // position, so later appends overwrite any bad tail. The returned error is
  // non-nil only if seeking to the start of the file fails.
  func (s *Storage) Scan(callback func(Record)) (int64, error) {
  	if _, err := s.file.Seek(0, io.SeekStart); err != nil {
  		return 0, err
  	}
  	// A legitimate record always fits inside the file. Checking against the
  	// file size rejects a corrupted length field *before* allocating for it
  	// (otherwise a garbage ValLen could trigger a ~4 GiB allocation).
  	// If Stat fails the check is skipped.
  	size := int64(-1)
  	if info, err := s.file.Stat(); err == nil {
  		size = info.Size()
  	}
  
  	reader := bufio.NewReader(s.file)
  	header := make([]byte, HeaderSize) // reused for every record
  	var offset int64
  	for {
  		// io.ReadFull reports io.EOF only when no bytes were read: a clean end.
  		if _, err := io.ReadFull(reader, header); err != nil {
  			if err != io.EOF {
  				fmt.Printf("Storage Scan Error at %d: %v\n", offset, err)
  			}
  			break
  		}
  		storedCRC := binary.LittleEndian.Uint32(header[0:4])
  		recType := header[4]
  		keyLen := int64(binary.LittleEndian.Uint16(header[5:7]))
  		valLen := int64(binary.LittleEndian.Uint32(header[7:11]))
  		commitIndex := binary.LittleEndian.Uint64(header[11:19])
  		recLen := HeaderSize + keyLen + valLen
  
  		if size >= 0 && offset+recLen > size {
  			fmt.Printf("Storage Scan Data Error at %d: record of %d bytes exceeds file size %d\n", offset, recLen, size)
  			break
  		}
  		data := make([]byte, keyLen+valLen)
  		if _, err := io.ReadFull(reader, data); err != nil {
  			fmt.Printf("Storage Scan Data Error at %d: %v\n", offset, err)
  			break
  		}
  
  		// Checksum of Type+Lengths+CommitIndex, then the key/value bytes.
  		crc := crc32.ChecksumIEEE(header[4:])
  		crc = crc32.Update(crc, crc32.IEEETable, data)
  		if crc != storedCRC {
  			fmt.Printf("CRC Mismatch at %d\n", offset)
  			break
  		}
  
  		callback(Record{
  			Type:        recType,
  			Key:         string(data[:keyLen]),
  			Value:       string(data[keyLen:]),
  			Offset:      offset,
  			CommitIndex: commitIndex,
  		})
  		offset += recLen
  	}
  
  	atomic.StoreInt64(&s.offset, offset)
  	// Best effort only: Append and ReadValue use positional I/O (WriteAt /
  	// ReadAt), so they do not depend on the file position.
  	_, _ = s.file.Seek(offset, io.SeekStart)
  	return offset, nil
  }
  
  // Append writes one record at the end of the log and returns its offset.
  // Concurrent callers each reserve a distinct region through an atomic add
  // on s.offset. Put values are also cached while the cache has room. The
  // write is not fsynced; call Sync for durability.
  //
  // NOTE(review): if WriteAt fails, the reserved region is not reclaimed.
  // Scan stops at the first bad record, so records written after such a gap
  // would be ignored on the next restart.
  func (s *Storage) Append(key, val string, recType byte, commitIndex uint64) (int64, error) {
  	keyLen, valLen := len(key), len(val)
  	if keyLen > 65535 {
  		return 0, fmt.Errorf("key too large")
  	}
  	if uint64(valLen) > 1<<32-1 {
  		// ValLen is a 4-byte field; a larger value would be silently truncated.
  		return 0, fmt.Errorf("value too large")
  	}
  
  	buf := make([]byte, HeaderSize+keyLen+valLen)
  	buf[4] = recType
  	binary.LittleEndian.PutUint16(buf[5:7], uint16(keyLen))
  	binary.LittleEndian.PutUint32(buf[7:11], uint32(valLen))
  	binary.LittleEndian.PutUint64(buf[11:19], commitIndex)
  	// copy accepts a string source, avoiding intermediate []byte conversions.
  	copy(buf[HeaderSize:], key)
  	copy(buf[HeaderSize+keyLen:], val)
  	// CRC covers everything after the CRC field.
  	binary.LittleEndian.PutUint32(buf[0:4], crc32.ChecksumIEEE(buf[4:]))
  
  	recLen := int64(len(buf))
  	writeOffset := atomic.AddInt64(&s.offset, recLen) - recLen
  	if _, err := s.file.WriteAt(buf, writeOffset); err != nil {
  		return 0, err
  	}
  
  	if recType == RecordTypePut {
  		s.cacheMu.Lock()
  		if len(s.cache) < MaxCacheSize {
  			s.cache[writeOffset] = val
  		}
  		s.cacheMu.Unlock()
  	}
  	return writeOffset, nil
  }
  
  // Sync flushes writes to stable storage.
  func (s *Storage) Sync() error {
  	return s.file.Sync()
  }
  
  // ReadValue returns the value of the record at offset. It serves cache hits
  // directly; on a miss it reads the record, verifies its CRC, and caches
  // the value, evicting one arbitrary entry when the cache is full.
  func (s *Storage) ReadValue(offset int64) (string, error) {
  	s.cacheMu.RLock()
  	cached, hit := s.cache[offset]
  	s.cacheMu.RUnlock()
  	if hit {
  		return cached, nil
  	}
  
  	header := make([]byte, HeaderSize)
  	if _, err := s.file.ReadAt(header, offset); err != nil {
  		return "", err
  	}
  	keyLen := int64(binary.LittleEndian.Uint16(header[5:7]))
  	valLen := int64(binary.LittleEndian.Uint32(header[7:11]))
  	data := make([]byte, keyLen+valLen)
  	if _, err := s.file.ReadAt(data, offset+HeaderSize); err != nil {
  		return "", err
  	}
  
  	// Verify CRC on read for safety.
  	crc := crc32.Update(crc32.ChecksumIEEE(header[4:]), crc32.IEEETable, data)
  	if crc != binary.LittleEndian.Uint32(header[0:4]) {
  		return "", fmt.Errorf("data corruption detected at offset %d", offset)
  	}
  	val := string(data[keyLen:])
  
  	s.cacheMu.Lock()
  	if len(s.cache) >= MaxCacheSize {
  		// Evict one arbitrary entry (map iteration order) to make room.
  		for k := range s.cache {
  			delete(s.cache, k)
  			break
  		}
  	}
  	s.cache[offset] = val
  	s.cacheMu.Unlock()
  	return val, nil
  }
  
  // Close closes the underlying log file.
  func (s *Storage) Close() error {
  	return s.file.Close()
  }
  // EngineOption adjusts an EngineConfig; NewEngine applies each option in order.
  type EngineOption func(*EngineConfig)
  
  // EngineConfig holds the settings an Engine is built with.
  type EngineConfig struct {
  	EnableValueIndex bool // maintain a FullTextIndex over values (off by default)
  }
  
  // WithValueIndex returns an option that switches the value (full-text)
  // index on or off.
  func WithValueIndex(enable bool) EngineOption {
  	return func(cfg *EngineConfig) { cfg.EnableValueIndex = enable }
  }
  428. // Engine is the core storage engine.
  429. type Engine struct {
  430. Index *FlatIndex // Flattened memory structure
  431. Storage *Storage
  432. FTIndex *FullTextIndex // Can be nil if disabled
  433. KeyLocks StripedLock
  434. LastCommitIndex uint64
  435. commitMu sync.Mutex
  436. dataDir string
  437. // Metadata state
  438. metaFile *os.File
  439. writeMu sync.Mutex
  440. config EngineConfig
  441. }
  442. func NewEngine(dataDir string, opts ...EngineOption) (*Engine, error) {
  443. if err := os.MkdirAll(dataDir, 0755); err != nil {
  444. return nil, err
  445. }
  446. config := EngineConfig{
  447. EnableValueIndex: false, // Default to false (key-only)
  448. }
  449. for _, opt := range opts {
  450. opt(&config)
  451. }
  452. store, err := NewStorage(dataDir + "/values.data")
  453. if err != nil {
  454. return nil, err
  455. }
  456. // Open or create metadata file
  457. metaPath := dataDir + "/meta.state"
  458. metaFile, err := os.OpenFile(metaPath, os.O_RDWR|os.O_CREATE, 0644)
  459. if err != nil {
  460. store.Close()
  461. return nil, err
  462. }
  463. e := &Engine{
  464. Index: NewFlatIndex(),
  465. Storage: store,
  466. FTIndex: nil,
  467. dataDir: dataDir,
  468. metaFile: metaFile,
  469. config: config,
  470. }
  471. if config.EnableValueIndex {
  472. e.FTIndex = NewFullTextIndex()
  473. }
  474. // Load Metadata
  475. stat, err := metaFile.Stat()
  476. if err == nil && stat.Size() >= 8 {
  477. b := make([]byte, 8)
  478. if _, err := metaFile.ReadAt(b, 0); err == nil {
  479. e.LastCommitIndex = binary.LittleEndian.Uint64(b)
  480. }
  481. }
  482. // Rebuild Index from Disk
  483. // Note: We still scan to rebuild the memory index, but LastCommitIndex is initialized from meta file
  484. // We can update LastCommitIndex if the log is ahead of the meta file (e.g. crash before meta update)
  485. _, err = store.Scan(func(rec Record) {
  486. if rec.Type == RecordTypePut {
  487. e.Index.Insert(rec.Key, IndexEntry{
  488. ValueOffset: rec.Offset,
  489. CommitIndex: rec.CommitIndex,
  490. })
  491. if e.FTIndex != nil {
  492. e.FTIndex.Add(rec.Key, rec.Value)
  493. }
  494. // Update LastCommitIndex if log is ahead
  495. if rec.CommitIndex > e.LastCommitIndex {
  496. e.LastCommitIndex = rec.CommitIndex
  497. }
  498. } else if rec.Type == RecordTypeDelete {
  499. // Cleanup FTIndex using old value if possible
  500. e.removeValueFromFTIndex(rec.Key)
  501. e.Index.Delete(rec.Key)
  502. if rec.CommitIndex > e.LastCommitIndex {
  503. e.LastCommitIndex = rec.CommitIndex
  504. }
  505. }
  506. })
  507. return e, nil
  508. }
  509. func (e *Engine) removeValueFromFTIndex(key string) {
  510. if e.FTIndex == nil {
  511. return
  512. }
  513. if entry, ok := e.Index.Get(key); ok {
  514. val, err := e.Storage.ReadValue(entry.ValueOffset)
  515. if err == nil {
  516. e.FTIndex.Remove(key, val)
  517. }
  518. }
  519. }
  520. func (e *Engine) Close() error {
  521. e.saveMetadata()
  522. e.metaFile.Close()
  523. return e.Storage.Close()
  524. }
  525. func (e *Engine) saveMetadata() {
  526. b := make([]byte, 8)
  527. binary.LittleEndian.PutUint64(b, e.LastCommitIndex)
  528. e.metaFile.WriteAt(b, 0)
  529. }
  530. func (e *Engine) Set(key, value string, commitIndex uint64) error {
  531. e.writeMu.Lock()
  532. defer e.writeMu.Unlock()
  533. // Idempotency check: if this log entry has already been applied, skip it.
  534. // This handles Raft replay on restart.
  535. if commitIndex > 0 && commitIndex <= e.LastCommitIndex {
  536. return nil
  537. }
  538. e.removeValueFromFTIndex(key)
  539. offset, err := e.Storage.Append(key, value, RecordTypePut, commitIndex)
  540. if err != nil {
  541. return err
  542. }
  543. e.Index.Insert(key, IndexEntry{
  544. ValueOffset: offset,
  545. CommitIndex: commitIndex,
  546. })
  547. if e.FTIndex != nil {
  548. e.FTIndex.Add(key, value)
  549. }
  550. e.commitMu.Lock()
  551. if commitIndex > e.LastCommitIndex {
  552. e.LastCommitIndex = commitIndex
  553. // Update metadata on disk periodically or on every write?
  554. // For safety, let's update it. Since it's pwrite at offset 0, it's fast.
  555. e.saveMetadata()
  556. }
  557. e.commitMu.Unlock()
  558. return nil
  559. }
  560. func (e *Engine) Delete(key string, commitIndex uint64) error {
  561. e.writeMu.Lock()
  562. defer e.writeMu.Unlock()
  563. // Idempotency check
  564. if commitIndex > 0 && commitIndex <= e.LastCommitIndex {
  565. return nil
  566. }
  567. e.removeValueFromFTIndex(key)
  568. _, err := e.Storage.Append(key, "", RecordTypeDelete, commitIndex)
  569. if err != nil {
  570. return err
  571. }
  572. e.Index.Delete(key)
  573. e.commitMu.Lock()
  574. if commitIndex > e.LastCommitIndex {
  575. e.LastCommitIndex = commitIndex
  576. e.saveMetadata()
  577. }
  578. e.commitMu.Unlock()
  579. return nil
  580. }
  581. // Sync flushes underlying storage to disk
  582. // GetLastAppliedIndex returns the last Raft log index applied to the DB
  583. func (e *Engine) GetLastAppliedIndex() uint64 {
  584. e.commitMu.Lock()
  585. defer e.commitMu.Unlock()
  586. return e.LastCommitIndex
  587. }
  588. func (e *Engine) Sync() error {
  589. e.metaFile.Sync()
  590. return e.Storage.Sync()
  591. }
  592. func (e *Engine) Get(key string) (string, bool) {
  593. entry, ok := e.Index.Get(key)
  594. if !ok {
  595. return "", false
  596. }
  597. val, err := e.Storage.ReadValue(entry.ValueOffset)
  598. if err != nil {
  599. return "", false
  600. }
  601. return val, true
  602. }
  // QueryResult is one row of query output, with JSON field names
  // "key", "value" and "commit_index".
  type QueryResult struct {
  	Key         string `json:"key"`
  	Value       string `json:"value"`
  	CommitIndex uint64 `json:"commit_index"`
  }
  // WildcardMatch reports whether all of str matches pattern, where '*'
  // matches any run of characters (including none) and '?' matches exactly
  // one. Matching is case-sensitive and works on runes, so '?' consumes a
  // whole UTF-8 character. It uses greedy matching that backtracks only to
  // the most recent '*'.
  func WildcardMatch(str, pattern string) bool {
  	text, pat := []rune(str), []rune(pattern)
  	ti, pi := 0, 0
  	star, resume := -1, 0 // last '*' seen in pat, and where text resumed after it
  	for ti < len(text) {
  		switch {
  		case pi < len(pat) && (pat[pi] == '?' || pat[pi] == text[ti]):
  			ti++
  			pi++
  		case pi < len(pat) && pat[pi] == '*':
  			star, resume = pi, ti
  			pi++
  		case star != -1:
  			// Backtrack: let the last '*' absorb one more character.
  			resume++
  			ti, pi = resume, star+1
  		default:
  			return false
  		}
  	}
  	// Leftover pattern can only match the empty tail if it is all '*'.
  	for pi < len(pat) && pat[pi] == '*' {
  		pi++
  	}
  	return pi == len(pat)
  }
  635. func (e *Engine) Count(sql string) (int, error) {
  636. return e.execute(sql, true)
  637. }
  638. func (e *Engine) execute(sql string, countOnly bool) (int, error) {
  639. sql = strings.TrimSpace(sql)
  640. reLimit := regexp.MustCompile(`(?i)\s+LIMIT\s+(\d+)`)
  641. reOffset := regexp.MustCompile(`(?i)\s+OFFSET\s+(\d+)`)
  642. limit := -1
  643. offset := 0
  644. if match := reLimit.FindStringSubmatch(sql); len(match) > 0 {
  645. limit, _ = strconv.Atoi(match[1])
  646. sql = reLimit.ReplaceAllString(sql, "")
  647. }
  648. if match := reOffset.FindStringSubmatch(sql); len(match) > 0 {
  649. offset, _ = strconv.Atoi(match[1])
  650. sql = reOffset.ReplaceAllString(sql, "")
  651. }
  652. re := regexp.MustCompile(`(key|value|CommitIndex)\s*(=|like|>=|<=|>|<)\s*("[^"]*"|\d+)`)
  653. matches := re.FindAllStringSubmatch(sql, -1)
  654. if len(matches) == 0 {
  655. return 0, fmt.Errorf("invalid query")
  656. }
  657. extractString := func(s string) string {
  658. return strings.Trim(strings.TrimSpace(s), "\"")
  659. }
  660. // 1. Optimize for Point Lookup
  661. for _, match := range matches {
  662. if match[1] == "key" && match[2] == "=" {
  663. targetKey := extractString(match[3])
  664. entry, ok := e.Index.Get(targetKey)
  665. if !ok {
  666. return 0, nil
  667. }
  668. needValue := false
  669. for _, m := range matches {
  670. if m[1] == "value" { needValue = true; break }
  671. }
  672. var val string
  673. if needValue {
  674. v, err := e.Storage.ReadValue(entry.ValueOffset)
  675. if err != nil { return 0, nil }
  676. val = v
  677. }
  678. matchAll := true
  679. for _, m := range matches {
  680. field, op, valRaw := m[1], m[2], m[3]
  681. switch field {
  682. case "CommitIndex":
  683. num, _ := strconv.ParseUint(valRaw, 10, 64)
  684. switch op {
  685. case ">": if !(entry.CommitIndex > num) { matchAll = false }
  686. case "<": if !(entry.CommitIndex < num) { matchAll = false }
  687. case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
  688. case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
  689. case "=": if !(entry.CommitIndex == num) { matchAll = false }
  690. }
  691. case "value":
  692. t := extractString(valRaw)
  693. switch op {
  694. case "=": if val != t { matchAll = false }
  695. case "like": if !WildcardMatch(val, t) { matchAll = false }
  696. }
  697. }
  698. if !matchAll { break }
  699. }
  700. if matchAll {
  701. return 1, nil
  702. }
  703. return 0, nil
  704. }
  705. }
  706. var candidates map[string]bool
  707. var useFTIndex bool = false
  708. // 2. Try to use FT Index (if enabled)
  709. if e.FTIndex != nil {
  710. for _, match := range matches {
  711. if match[1] == "value" && match[2] == "like" {
  712. pattern := extractString(match[3])
  713. clean := strings.Trim(pattern, "*")
  714. if len(clean) > 0 && !strings.Contains(clean, "*") && !strings.Contains(clean, "?") {
  715. matches := e.FTIndex.Search(pattern)
  716. if matches != nil {
  717. currentSet := make(map[string]bool)
  718. for _, k := range matches {
  719. currentSet[k] = true
  720. }
  721. if !useFTIndex {
  722. candidates = currentSet
  723. useFTIndex = true
  724. } else {
  725. newSet := make(map[string]bool)
  726. for k := range candidates {
  727. if currentSet[k] { newSet[k] = true }
  728. }
  729. candidates = newSet
  730. }
  731. } else {
  732. return 0, nil
  733. }
  734. }
  735. }
  736. }
  737. } else {
  738. for _, match := range matches {
  739. if match[1] == "value" && match[2] == "like" {
  740. // Value search requested but index disabled -> return 0
  741. return 0, nil
  742. }
  743. }
  744. }
  745. var iterator func(func(string, IndexEntry) bool)
  746. if useFTIndex {
  747. iterator = func(cb func(string, IndexEntry) bool) {
  748. keys := make([]string, 0, len(candidates))
  749. for k := range candidates { keys = append(keys, k) }
  750. sort.Strings(keys)
  751. for _, k := range keys {
  752. if entry, ok := e.Index.Get(k); ok {
  753. if !cb(k, entry) { return }
  754. }
  755. }
  756. }
  757. } else {
  758. // Use FlatIndex Prefix Search
  759. var prefix string = ""
  760. var usePrefix bool = false
  761. for _, match := range matches {
  762. if match[1] == "key" && match[2] == "like" {
  763. pattern := extractString(match[3])
  764. // Flat Index supports simple prefix match
  765. if strings.HasSuffix(pattern, "*") {
  766. clean := pattern[:len(pattern)-1]
  767. if !strings.ContainsAny(clean, "*?") {
  768. prefix = clean
  769. usePrefix = true
  770. break
  771. }
  772. }
  773. }
  774. }
  775. iterator = func(cb func(string, IndexEntry) bool) {
  776. if usePrefix {
  777. e.Index.WalkPrefix(prefix, cb)
  778. } else {
  779. e.Index.WalkPrefix("", cb)
  780. }
  781. }
  782. }
  783. matchedCount := 0
  784. iterator(func(key string, entry IndexEntry) bool {
  785. var valStr string
  786. var valLoaded bool
  787. matchAll := true
  788. for _, match := range matches {
  789. field, op, valRaw := match[1], match[2], match[3]
  790. switch field {
  791. case "CommitIndex":
  792. num, _ := strconv.ParseUint(valRaw, 10, 64)
  793. switch op {
  794. case ">": if !(entry.CommitIndex > num) { matchAll = false }
  795. case "<": if !(entry.CommitIndex < num) { matchAll = false }
  796. case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
  797. case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
  798. case "=": if !(entry.CommitIndex == num) { matchAll = false }
  799. }
  800. case "key":
  801. target := extractString(valRaw)
  802. switch op {
  803. case "=": if key != target { matchAll = false }
  804. case "like": if !WildcardMatch(key, target) { matchAll = false }
  805. }
  806. case "value":
  807. if e.FTIndex == nil && op == "like" {
  808. matchAll = false
  809. break
  810. }
  811. if !valLoaded {
  812. v, err := e.Storage.ReadValue(entry.ValueOffset)
  813. if err != nil { matchAll = false; break }
  814. valStr = v
  815. valLoaded = true
  816. }
  817. target := extractString(valRaw)
  818. switch op {
  819. case "=": if valStr != target { matchAll = false }
  820. case "like": if !WildcardMatch(valStr, target) { matchAll = false }
  821. }
  822. }
  823. if !matchAll { break }
  824. }
  825. if matchAll {
  826. matchedCount++
  827. if limit > 0 && offset == 0 && matchedCount >= limit {
  828. return false
  829. }
  830. }
  831. return true
  832. })
  833. if offset > 0 {
  834. if offset >= matchedCount {
  835. return 0, nil
  836. }
  837. matchedCount -= offset
  838. }
  839. if limit >= 0 {
  840. if limit < matchedCount {
  841. matchedCount = limit
  842. }
  843. }
  844. return matchedCount, nil
  845. }
  846. func (e *Engine) Query(sql string) ([]QueryResult, error) {
  847. return e.queryInternal(sql)
  848. }
  849. func (e *Engine) queryInternal(sql string) ([]QueryResult, error) {
  850. sql = strings.TrimSpace(sql)
  851. reLimit := regexp.MustCompile(`(?i)\s+LIMIT\s+(\d+)`)
  852. reOffset := regexp.MustCompile(`(?i)\s+OFFSET\s+(\d+)`)
  853. limit := -1
  854. offset := 0
  855. if match := reLimit.FindStringSubmatch(sql); len(match) > 0 {
  856. limit, _ = strconv.Atoi(match[1])
  857. sql = reLimit.ReplaceAllString(sql, "")
  858. }
  859. if match := reOffset.FindStringSubmatch(sql); len(match) > 0 {
  860. offset, _ = strconv.Atoi(match[1])
  861. sql = reOffset.ReplaceAllString(sql, "")
  862. }
  863. re := regexp.MustCompile(`(key|value|CommitIndex)\s*(=|like|>=|<=|>|<)\s*("[^"]*"|\d+)`)
  864. matches := re.FindAllStringSubmatch(sql, -1)
  865. if len(matches) == 0 {
  866. return nil, fmt.Errorf("invalid query")
  867. }
  868. extractString := func(s string) string {
  869. return strings.Trim(strings.TrimSpace(s), "\"")
  870. }
  871. for _, match := range matches {
  872. if match[1] == "key" && match[2] == "=" {
  873. targetKey := extractString(match[3])
  874. entry, ok := e.Index.Get(targetKey)
  875. if !ok {
  876. return []QueryResult{}, nil
  877. }
  878. val, err := e.Storage.ReadValue(entry.ValueOffset)
  879. if err != nil {
  880. return []QueryResult{}, nil
  881. }
  882. matchAll := true
  883. for _, m := range matches {
  884. field, op, valRaw := m[1], m[2], m[3]
  885. switch field {
  886. case "CommitIndex":
  887. num, _ := strconv.ParseUint(valRaw, 10, 64)
  888. switch op {
  889. case ">": if !(entry.CommitIndex > num) { matchAll = false }
  890. case "<": if !(entry.CommitIndex < num) { matchAll = false }
  891. case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
  892. case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
  893. case "=": if !(entry.CommitIndex == num) { matchAll = false }
  894. }
  895. case "value":
  896. t := extractString(valRaw)
  897. switch op {
  898. case "=": if val != t { matchAll = false }
  899. case "like": if !WildcardMatch(val, t) { matchAll = false }
  900. }
  901. }
  902. if !matchAll { break }
  903. }
  904. if matchAll {
  905. return []QueryResult{{Key: targetKey, Value: val, CommitIndex: entry.CommitIndex}}, nil
  906. }
  907. return []QueryResult{}, nil
  908. }
  909. }
  910. var candidates map[string]bool
  911. var useFTIndex bool = false
  912. if e.FTIndex != nil {
  913. for _, match := range matches {
  914. if match[1] == "value" && match[2] == "like" {
  915. pattern := extractString(match[3])
  916. clean := strings.Trim(pattern, "*")
  917. if len(clean) > 0 && !strings.Contains(clean, "*") && !strings.Contains(clean, "?") {
  918. matches := e.FTIndex.Search(pattern)
  919. if matches != nil {
  920. currentSet := make(map[string]bool)
  921. for _, k := range matches {
  922. currentSet[k] = true
  923. }
  924. if !useFTIndex {
  925. candidates = currentSet
  926. useFTIndex = true
  927. } else {
  928. newSet := make(map[string]bool)
  929. for k := range candidates {
  930. if currentSet[k] {
  931. newSet[k] = true
  932. }
  933. }
  934. candidates = newSet
  935. }
  936. } else {
  937. return []QueryResult{}, nil
  938. }
  939. }
  940. }
  941. }
  942. } else {
  943. for _, match := range matches {
  944. if match[1] == "value" && match[2] == "like" {
  945. return []QueryResult{}, nil
  946. }
  947. }
  948. }
  949. var iterator func(func(string, IndexEntry) bool)
  950. if useFTIndex {
  951. iterator = func(cb func(string, IndexEntry) bool) {
  952. keys := make([]string, 0, len(candidates))
  953. for k := range candidates {
  954. keys = append(keys, k)
  955. }
  956. sort.Strings(keys)
  957. for _, k := range keys {
  958. if entry, ok := e.Index.Get(k); ok {
  959. if !cb(k, entry) {
  960. return
  961. }
  962. }
  963. }
  964. }
  965. } else {
  966. var prefix string = ""
  967. var usePrefix bool = false
  968. for _, match := range matches {
  969. if match[1] == "key" && match[2] == "like" {
  970. pattern := extractString(match[3])
  971. if strings.HasSuffix(pattern, "*") {
  972. clean := pattern[:len(pattern)-1]
  973. if !strings.ContainsAny(clean, "*?") {
  974. prefix = clean
  975. usePrefix = true
  976. break
  977. }
  978. }
  979. }
  980. }
  981. iterator = func(cb func(string, IndexEntry) bool) {
  982. if usePrefix {
  983. e.Index.WalkPrefix(prefix, cb)
  984. } else {
  985. e.Index.WalkPrefix("", cb)
  986. }
  987. }
  988. }
  989. var results []QueryResult
  990. iterator(func(key string, entry IndexEntry) bool {
  991. var valStr string
  992. var valLoaded bool
  993. matchAll := true
  994. for _, match := range matches {
  995. field, op, valRaw := match[1], match[2], match[3]
  996. switch field {
  997. case "CommitIndex":
  998. num, _ := strconv.ParseUint(valRaw, 10, 64)
  999. switch op {
  1000. case ">": if !(entry.CommitIndex > num) { matchAll = false }
  1001. case "<": if !(entry.CommitIndex < num) { matchAll = false }
  1002. case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
  1003. case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
  1004. case "=": if !(entry.CommitIndex == num) { matchAll = false }
  1005. }
  1006. case "key":
  1007. target := extractString(valRaw)
  1008. switch op {
  1009. case "=": if key != target { matchAll = false }
  1010. case "like": if !WildcardMatch(key, target) { matchAll = false }
  1011. }
  1012. case "value":
  1013. if e.FTIndex == nil && op == "like" {
  1014. matchAll = false
  1015. break
  1016. }
  1017. if !valLoaded {
  1018. v, err := e.Storage.ReadValue(entry.ValueOffset)
  1019. if err != nil { matchAll = false; break }
  1020. valStr = v
  1021. valLoaded = true
  1022. }
  1023. target := extractString(valRaw)
  1024. switch op {
  1025. case "=": if valStr != target { matchAll = false }
  1026. case "like": if !WildcardMatch(valStr, target) { matchAll = false }
  1027. }
  1028. }
  1029. if !matchAll { break }
  1030. }
  1031. if matchAll {
  1032. if !valLoaded {
  1033. v, err := e.Storage.ReadValue(entry.ValueOffset)
  1034. if err == nil { valStr = v }
  1035. }
  1036. results = append(results, QueryResult{
  1037. Key: key,
  1038. Value: valStr,
  1039. CommitIndex: entry.CommitIndex,
  1040. })
  1041. needed := limit + offset
  1042. if limit > 0 && len(results) >= needed {
  1043. return false
  1044. }
  1045. }
  1046. return true
  1047. })
  1048. if offset > 0 {
  1049. if offset >= len(results) {
  1050. return []QueryResult{}, nil
  1051. }
  1052. results = results[offset:]
  1053. }
  1054. if limit >= 0 {
  1055. if limit < len(results) {
  1056. results = results[:limit]
  1057. }
  1058. }
  1059. return results, nil
  1060. }
  1061. // Snapshot creates a full consistency snapshot of the database.
  1062. // Since the DB engine itself IS the state machine, this simply serializes
  1063. // all current valid data. This snapshot can be used to restore a lagging node
  1064. // that has fallen behind the Raft logs.
  1065. func (e *Engine) Snapshot() ([]byte, error) {
  1066. e.Index.mu.RLock()
  1067. defer e.Index.mu.RUnlock()
  1068. // Snapshot Format Version 1:
  1069. // [Count U64] + [Record...]
  1070. // Record: [KeyLen U16] [Key Bytes] [ValLen U32] [Val Bytes] [CommitIndex U64]
  1071. // Pre-calculate size to reduce allocations
  1072. // Estimate: Count * (AverageKeySize + AverageValSize + Overhead)
  1073. // We start with a reasonable buffer size.
  1074. buf := make([]byte, 0, 1024*1024)
  1075. count := uint64(len(e.Index.items))
  1076. // Write Count
  1077. tmp := make([]byte, 8)
  1078. binary.LittleEndian.PutUint64(tmp, count)
  1079. buf = append(buf, tmp...)
  1080. // Buffer for encoding headers to avoid repeated small allocations
  1081. headerBuf := make([]byte, 2+4+8) // KeyLen + ValLen + CommitIndex
  1082. for i := range e.Index.items {
  1083. // Get Key
  1084. key := e.Index.getKey(i)
  1085. entry := e.Index.items[i].entry
  1086. // Get Value
  1087. // We read directly from storage. Since we hold the read lock on index,
  1088. // the offset is valid. The storage file is append-only, so old data exists.
  1089. val, err := e.Storage.ReadValue(entry.ValueOffset)
  1090. if err != nil {
  1091. // If we can't read a value, it's a critical error for snapshot integrity.
  1092. return nil, fmt.Errorf("snapshot failed reading key '%s' at offset %d: %v", key, entry.ValueOffset, err)
  1093. }
  1094. keyBytes := []byte(key)
  1095. valBytes := []byte(val)
  1096. // Encode Header
  1097. binary.LittleEndian.PutUint16(headerBuf[0:2], uint16(len(keyBytes)))
  1098. binary.LittleEndian.PutUint32(headerBuf[2:6], uint32(len(valBytes)))
  1099. binary.LittleEndian.PutUint64(headerBuf[6:14], entry.CommitIndex)
  1100. // Append to buffer
  1101. buf = append(buf, headerBuf...)
  1102. buf = append(buf, keyBytes...)
  1103. buf = append(buf, valBytes...)
  1104. }
  1105. return buf, nil
  1106. }
  1107. // Restore completely replaces the database content with the provided snapshot.
  1108. // This matches the "DB as Snapshot" design: we wipe the current state
  1109. // and rebuild from the full image provided by the leader.
  1110. func (e *Engine) Restore(data []byte) error {
  1111. e.writeMu.Lock()
  1112. defer e.writeMu.Unlock()
  1113. // 1. Safety check & Parse Header
  1114. if len(data) < 8 {
  1115. if len(data) == 0 {
  1116. // Empty snapshot implies empty DB.
  1117. return e.resetToEmpty()
  1118. }
  1119. return fmt.Errorf("invalid snapshot data: too short")
  1120. }
  1121. count := binary.LittleEndian.Uint64(data[0:8])
  1122. offset := 8
  1123. // 2. Stop-the-World: Close current storage to safely wipe it
  1124. e.Index.mu.Lock()
  1125. if err := e.Storage.Close(); err != nil {
  1126. e.Index.mu.Unlock()
  1127. return fmt.Errorf("failed to close storage for restore: %v", err)
  1128. }
  1129. // 3. Truncate & Reset
  1130. // We are replacing the entire DB state.
  1131. // Reset In-Memory Index
  1132. e.Index = NewFlatIndex()
  1133. if e.config.EnableValueIndex {
  1134. e.FTIndex = NewFullTextIndex()
  1135. }
  1136. // Clear Cache
  1137. e.Storage.cache = make(map[int64]string)
  1138. // Re-open storage in Truncate mode (Wipe data file)
  1139. f, err := os.OpenFile(e.Storage.filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
  1140. if err != nil {
  1141. e.Index.mu.Unlock()
  1142. return fmt.Errorf("failed to reopen storage for restore: %v", err)
  1143. }
  1144. e.Storage.file = f
  1145. e.Storage.offset = 0
  1146. // We hold the lock during the entire restore process to prevent any reads/writes
  1147. // This is acceptable as Restore is a rare, heavyweight operation.
  1148. defer e.Index.mu.Unlock()
  1149. maxCommitIndex := uint64(0)
  1150. // 4. Rebuild Data from Snapshot Stream
  1151. for i := uint64(0); i < count; i++ {
  1152. // Read KeyLen
  1153. if offset+2 > len(data) {
  1154. return fmt.Errorf("restore failed: truncated data at record %d (keylen)", i)
  1155. }
  1156. keyLen := int(binary.LittleEndian.Uint16(data[offset:]))
  1157. offset += 2
  1158. // Read Key
  1159. if offset+keyLen > len(data) {
  1160. return fmt.Errorf("restore failed: truncated data at record %d (key)", i)
  1161. }
  1162. key := string(data[offset : offset+keyLen])
  1163. offset += keyLen
  1164. // Read ValLen
  1165. if offset+4 > len(data) {
  1166. return fmt.Errorf("restore failed: truncated data at record %d (vallen)", i)
  1167. }
  1168. valLen := int(binary.LittleEndian.Uint32(data[offset:]))
  1169. offset += 4
  1170. // Read Value
  1171. if offset+valLen > len(data) {
  1172. return fmt.Errorf("restore failed: truncated data at record %d (value)", i)
  1173. }
  1174. val := string(data[offset : offset+valLen])
  1175. offset += valLen
  1176. // Read CommitIndex
  1177. if offset+8 > len(data) {
  1178. return fmt.Errorf("restore failed: truncated data at record %d (commitIndex)", i)
  1179. }
  1180. commitIndex := binary.LittleEndian.Uint64(data[offset:])
  1181. offset += 8
  1182. if commitIndex > maxCommitIndex {
  1183. maxCommitIndex = commitIndex
  1184. }
  1185. // Direct Write to Storage (Internal Append)
  1186. // We use the low-level Storage.Append directly since we already hold locks
  1187. // and are bypassing the standard Set path.
  1188. writeOffset, err := e.Storage.Append(key, val, RecordTypePut, commitIndex)
  1189. if err != nil {
  1190. return fmt.Errorf("restore failed appending record %d: %v", i, err)
  1191. }
  1192. // Update Memory Index
  1193. // Accessing e.Index.items directly because we hold e.Index.mu
  1194. // But we should use the helper to safely manage keyBuf
  1195. e.Index.Insert(key, IndexEntry{
  1196. ValueOffset: writeOffset,
  1197. CommitIndex: commitIndex,
  1198. })
  1199. // Update Full Text Index if enabled
  1200. if e.FTIndex != nil {
  1201. e.FTIndex.Add(key, val)
  1202. }
  1203. }
  1204. // 5. Update Metadata
  1205. e.commitMu.Lock()
  1206. e.LastCommitIndex = maxCommitIndex
  1207. e.saveMetadata()
  1208. e.commitMu.Unlock()
  1209. // 6. Force Sync
  1210. if err := e.Sync(); err != nil {
  1211. return fmt.Errorf("failed to sync restored data: %v", err)
  1212. }
  1213. return nil
  1214. }
  1215. // resetToEmpty helper to wipe the DB cleanly
  1216. func (e *Engine) resetToEmpty() error {
  1217. e.Index.mu.Lock()
  1218. defer e.Index.mu.Unlock()
  1219. if err := e.Storage.Close(); err != nil {
  1220. return err
  1221. }
  1222. e.Index = NewFlatIndex()
  1223. if e.config.EnableValueIndex {
  1224. e.FTIndex = NewFullTextIndex()
  1225. }
  1226. e.Storage.cache = make(map[int64]string)
  1227. f, err := os.OpenFile(e.Storage.filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
  1228. if err != nil {
  1229. return err
  1230. }
  1231. e.Storage.file = f
  1232. e.Storage.offset = 0
  1233. e.commitMu.Lock()
  1234. e.LastCommitIndex = 0
  1235. e.saveMetadata()
  1236. e.commitMu.Unlock()
  1237. return nil
  1238. }
  1239. func (e *Engine) DebugClearAuxiliary() {
  1240. e.Storage.cacheMu.Lock()
  1241. e.Storage.cache = make(map[int64]string)
  1242. e.Storage.cacheMu.Unlock()
  1243. if e.FTIndex != nil {
  1244. e.FTIndex.mu.Lock()
  1245. e.FTIndex.index = make(map[string]map[string]bool)
  1246. e.FTIndex.mu.Unlock()
  1247. }
  1248. }