// engine.go

package db

import (
	"bufio"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"regexp"
	"sort"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
)

// hash returns a 32-bit FNV-1a hash of the string.
func hash(s string) uint32 {
	var h uint32 = 2166136261
	for i := 0; i < len(s); i++ {
		h ^= uint32(s[i])
		h *= 16777619
	}
	return h
}

// FreeList manages reusable disk space in memory using a best-fit strategy.
type FreeList struct {
	// capacity -> stack of offsets
	buckets map[uint32][]int64
	// sorted list of capacities present in buckets, for fast best-fit lookup
	sortedCaps []uint32
	mu         sync.Mutex
}

func NewFreeList() *FreeList {
	return &FreeList{
		buckets: make(map[uint32][]int64),
	}
}

// Add adds an offset to the free list for a given capacity.
func (fl *FreeList) Add(capacity uint32, offset int64) {
	fl.mu.Lock()
	defer fl.mu.Unlock()
	if _, exists := fl.buckets[capacity]; !exists {
		fl.buckets[capacity] = []int64{offset}
		// Maintain sortedCaps.
		fl.sortedCaps = append(fl.sortedCaps, capacity)
		sort.Slice(fl.sortedCaps, func(i, j int) bool { return fl.sortedCaps[i] < fl.sortedCaps[j] })
	} else {
		fl.buckets[capacity] = append(fl.buckets[capacity], offset)
	}
}

// Pop returns an available offset using best-fit: the smallest capacity that
// is at least targetCap. It reports the slot's capacity and whether one was found.
func (fl *FreeList) Pop(targetCap uint32) (int64, uint32, bool) {
	fl.mu.Lock()
	defer fl.mu.Unlock()
	idx := sort.Search(len(fl.sortedCaps), func(i int) bool {
		return fl.sortedCaps[i] >= targetCap
	})
	if idx >= len(fl.sortedCaps) {
		return 0, 0, false
	}
	foundCap := fl.sortedCaps[idx]
	offsets := fl.buckets[foundCap]
	if len(offsets) == 0 {
		return 0, 0, false
	}
	lastIdx := len(offsets) - 1
	offset := offsets[lastIdx]
	if lastIdx == 0 {
		delete(fl.buckets, foundCap)
		fl.sortedCaps = append(fl.sortedCaps[:idx], fl.sortedCaps[idx+1:]...)
	} else {
		fl.buckets[foundCap] = offsets[:lastIdx]
	}
	return offset, foundCap, true
}
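
// Illustrative only (not used by the engine): best-fit picks the smallest
// free slot whose capacity still fits the request.
//
//	fl := NewFreeList()
//	fl.Add(32, 100) // 32-byte slot at offset 100
//	fl.Add(64, 200) // 64-byte slot at offset 200
//	off, c, ok := fl.Pop(40) // off == 200, c == 64, ok == true: 32 is too small
//	_, _, ok = fl.Pop(128)   // ok == false: nothing large enough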

// StripedLock provides hashed locks for keys.
type StripedLock struct {
	locks [1024]sync.RWMutex
}

func (sl *StripedLock) GetLock(key string) *sync.RWMutex {
	h := hash(key)
	return &sl.locks[h%1024]
}
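
// Note: two distinct keys can hash to the same stripe and will then share a
// lock. That only coarsens locking (never misses it), at the cost of some
// false contention between unrelated keys.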

// Storage (Table 2) manages disk storage for values.
type Storage struct {
	file     *os.File
	filename string
	offset   int64 // current end of file (accessed atomically)
	freeList *FreeList
}

const (
	FlagDeleted = 0x00
	FlagValid   = 0x01
	HeaderSize  = 9  // 1 (Flag) + 4 (Cap) + 4 (Len)
	AlignSize   = 16 // align capacity to 16 bytes
)
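
// On-disk record layout implied by the constants above (little-endian):
//
//	[ Flag: 1B ][ Cap: 4B ][ Len: 4B ][ Data: Cap bytes, first Len valid ]
//
// Cap is the allocated payload size (a multiple of AlignSize); Len is the
// live value length. Keeping Cap >= Len is what makes in-place updates and
// free-slot reuse possible.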

func NewStorage(filename string) (*Storage, error) {
	f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
	if err != nil {
		return nil, err
	}
	st := &Storage{
		file:     f,
		filename: filename,
		freeList: NewFreeList(),
	}
	if err := st.scan(); err != nil {
		f.Close()
		return nil, err
	}
	return st, nil
}

// scan walks the whole data file on startup, rebuilding the free list from
// records flagged deleted and recovering the end-of-file offset.
func (s *Storage) scan() error {
	offset := int64(0)
	if _, err := s.file.Seek(0, 0); err != nil {
		return err
	}
	reader := bufio.NewReader(s.file)
	for {
		header := make([]byte, HeaderSize)
		if _, err := io.ReadFull(reader, header); err != nil {
			if err == io.EOF {
				// Clean end of file.
				break
			}
			// Partial header (io.ErrUnexpectedEOF) or a real read error.
			return err
		}
		flag := header[0]
		capacity := binary.LittleEndian.Uint32(header[1:])
		if flag == FlagDeleted {
			s.freeList.Add(capacity, offset)
		}
		// Skip the payload; a short Discard means the file is truncated.
		if _, err := reader.Discard(int(capacity)); err != nil {
			return err
		}
		offset += int64(HeaderSize) + int64(capacity)
	}
	s.offset = offset
	return nil
}

func alignCapacity(length int) uint32 {
	if length == 0 {
		return AlignSize
	}
	capacity := (length + AlignSize - 1) / AlignSize * AlignSize
	return uint32(capacity)
}
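
// Worked examples: alignCapacity(0) == 16, alignCapacity(1) == 16,
// alignCapacity(16) == 16, alignCapacity(17) == 32. Rounding up gives short
// values headroom to grow in place and keeps free-list buckets coarse.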

// TryUpdateInPlace tries to update the value in place if the slot's capacity
// allows, and reports whether it succeeded. Must be called under the key lock.
func (s *Storage) TryUpdateInPlace(val string, offset int64) bool {
	if offset < 0 {
		return false
	}
	valLen := len(val)
	capBuf := make([]byte, 4)
	if _, err := s.file.ReadAt(capBuf, offset+1); err != nil {
		return false
	}
	oldCap := binary.LittleEndian.Uint32(capBuf)
	if uint32(valLen) > oldCap {
		return false
	}
	// It fits: write the new length, then the data.
	lenBuf := make([]byte, 4)
	binary.LittleEndian.PutUint32(lenBuf, uint32(valLen))
	if _, err := s.file.WriteAt(lenBuf, offset+5); err != nil {
		return false
	}
	if _, err := s.file.WriteAt([]byte(val), offset+HeaderSize); err != nil {
		return false
	}
	return true
}
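
// Note that the slot's Cap field is left untouched on an in-place update, so
// the record keeps its original headroom for future rewrites; only Len and
// the data bytes change.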

// AppendOrReuse writes a new value, reusing a free slot if one fits or
// appending at the end of the file. It returns the record's offset.
func (s *Storage) AppendOrReuse(val string) (int64, error) {
	valLen := len(val)
	newCap := alignCapacity(valLen)
	var writeOffset int64
	var actualCap uint32
	// Try to reuse a freed slot first.
	reusedOffset, capFromFree, found := s.freeList.Pop(newCap)
	if found {
		writeOffset = reusedOffset
		actualCap = capFromFree
	} else {
		// Append: atomically reserve the region at the end of the file.
		totalSize := HeaderSize + int(newCap)
		newEnd := atomic.AddInt64(&s.offset, int64(totalSize))
		writeOffset = newEnd - int64(totalSize)
		actualCap = newCap
	}
	buf := make([]byte, HeaderSize+int(actualCap))
	buf[0] = FlagValid
	binary.LittleEndian.PutUint32(buf[1:], actualCap)
	binary.LittleEndian.PutUint32(buf[5:], uint32(valLen))
	copy(buf[HeaderSize:], val)
	if _, err := s.file.WriteAt(buf, writeOffset); err != nil {
		return 0, err
	}
	return writeOffset, nil
}
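
// The append path reserves [writeOffset, writeOffset+totalSize) atomically
// via atomic.AddInt64, so concurrent appenders get disjoint regions and their
// subsequent WriteAt calls cannot interleave with each other.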

// MarkDeleted marks a slot as deleted and returns it to the free list.
func (s *Storage) MarkDeleted(offset int64) error {
	capBuf := make([]byte, 4)
	if _, err := s.file.ReadAt(capBuf, offset+1); err != nil {
		return err
	}
	oldCap := binary.LittleEndian.Uint32(capBuf)
	if _, err := s.file.WriteAt([]byte{FlagDeleted}, offset); err != nil {
		return err
	}
	s.freeList.Add(oldCap, offset)
	return nil
}

// WriteValue was removed: callers must use the granular
// TryUpdateInPlace / AppendOrReuse / MarkDeleted flow instead.

// ReadValue reads the value stored at offset.
func (s *Storage) ReadValue(offset int64) (string, error) {
	header := make([]byte, HeaderSize)
	if _, err := s.file.ReadAt(header, offset); err != nil {
		return "", err
	}
	flag := header[0]
	if flag == FlagDeleted {
		return "", fmt.Errorf("record deleted")
	}
	length := binary.LittleEndian.Uint32(header[5:])
	data := make([]byte, length)
	if _, err := s.file.ReadAt(data, offset+HeaderSize); err != nil {
		return "", err
	}
	return string(data), nil
}

func (s *Storage) Close() error {
	return s.file.Close()
}

// IndexEntry (Table 1)
type IndexEntry struct {
	ValueOffset int64
	CommitIndex uint64
}

// InvertedIndexShard manages tokens for a subset of strings.
type InvertedIndexShard struct {
	KeyTokens   map[string][]uint32
	ValueTokens map[string][]uint32
	mu          sync.RWMutex
}

// InvertedIndex (Table 3) is thread-safe via sharding.
type InvertedIndex struct {
	Shards [16]*InvertedIndexShard
}

func NewInvertedIndex() *InvertedIndex {
	ii := &InvertedIndex{}
	for i := 0; i < 16; i++ {
		ii.Shards[i] = &InvertedIndexShard{
			KeyTokens:   make(map[string][]uint32),
			ValueTokens: make(map[string][]uint32),
		}
	}
	return ii
}

// AddToken records that keyID contains token, in either the key-token or
// value-token index. Duplicate IDs in a posting list are suppressed.
func (ii *InvertedIndex) AddToken(token string, keyID uint32, isKey bool) {
	h := hash(token)
	shard := ii.Shards[h%16]
	shard.mu.Lock()
	defer shard.mu.Unlock()
	var targetMap map[string][]uint32
	if isKey {
		targetMap = shard.KeyTokens
	} else {
		targetMap = shard.ValueTokens
	}
	ids := targetMap[token]
	for _, id := range ids {
		if id == keyID {
			return
		}
	}
	targetMap[token] = append(ids, keyID)
}
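
// Note: the duplicate check above is a linear scan, so AddToken costs O(n) in
// the posting-list length. That is fine while individual tokens fan out to
// few keys; a set-backed list would serve better for very hot tokens.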

// KeyMapShard manages a subset of keys.
type KeyMapShard struct {
	StrToID map[string]uint32
	IDToStr map[uint32]string
	NextID  uint32
	mu      sync.RWMutex
}

// KeyMap maintains the string<->ID mapping, sharded to reduce lock contention.
type KeyMap struct {
	Shards [16]*KeyMapShard
}

func NewKeyMap() *KeyMap {
	km := &KeyMap{}
	for i := 0; i < 16; i++ {
		km.Shards[i] = &KeyMapShard{
			StrToID: make(map[string]uint32),
			IDToStr: make(map[uint32]string),
			NextID:  uint32(i + 1),
		}
	}
	return km
}

func (km *KeyMap) GetOrCreateID(key string) uint32 {
	h := hash(key)
	shard := km.Shards[h%16]
	shard.mu.Lock()
	defer shard.mu.Unlock()
	if id, ok := shard.StrToID[key]; ok {
		return id
	}
	id := shard.NextID
	shard.NextID += 16
	shard.StrToID[key] = id
	shard.IDToStr[id] = key
	return id
}
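
// Note on ID allocation (illustrative): shard i hands out IDs i+1, i+17,
// i+33, ... so every ID satisfies (id-1)%16 == shard index, and IDs never
// collide across shards. GetStr below exploits this to route an ID back to
// its shard without hashing.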

func (km *KeyMap) GetID(key string) (uint32, bool) {
	h := hash(key)
	shard := km.Shards[h%16]
	shard.mu.RLock()
	defer shard.mu.RUnlock()
	id, ok := shard.StrToID[key]
	return id, ok
}

func (km *KeyMap) GetStr(id uint32) (string, bool) {
	if id == 0 {
		return "", false
	}
	shardIdx := (id - 1) % 16
	shard := km.Shards[shardIdx]
	shard.mu.RLock()
	defer shard.mu.RUnlock()
	s, ok := shard.IDToStr[id]
	return s, ok
}

// IndexShard manages a subset of keys.
type IndexShard struct {
	Index map[uint32]IndexEntry
	mu    sync.RWMutex
}

// Engine is the core storage engine with sharded locking.
type Engine struct {
	// Table 1: sharded index
	Shards [16]*IndexShard
	// Key mapping
	Keys *KeyMap
	// Table 2: disk storage
	Storage *Storage
	// Table 3: inverted index
	SearchIndex     *InvertedIndex
	KeyLocks        StripedLock
	LastCommitIndex uint64
	commitMu        sync.Mutex // protects LastCommitIndex
	dataDir         string
}

func NewEngine(dataDir string) (*Engine, error) {
	if err := os.MkdirAll(dataDir, 0755); err != nil {
		return nil, err
	}
	store, err := NewStorage(dataDir + "/values.data")
	if err != nil {
		return nil, err
	}
	e := &Engine{
		Keys:        NewKeyMap(),
		Storage:     store,
		SearchIndex: NewInvertedIndex(),
		dataDir:     dataDir,
	}
	// Initialize shards.
	for i := 0; i < 16; i++ {
		e.Shards[i] = &IndexShard{
			Index: make(map[uint32]IndexEntry),
		}
	}
	return e, nil
}

func (e *Engine) Close() error {
	return e.Storage.Close()
}

// tokenizeKey splits a key on '.' separators.
func (e *Engine) tokenizeKey(key string) []string {
	return strings.Split(key, ".")
}

// tokenizeValue splits a value into alphanumeric runs.
func (e *Engine) tokenizeValue(val string) []string {
	f := func(c rune) bool {
		return !((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9'))
	}
	return strings.FieldsFunc(val, f)
}

func (e *Engine) getShard(keyID uint32) *IndexShard {
	// Simple mod hashing for sharding.
	return e.Shards[keyID%16]
}
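
// For example, tokenizeKey("config.server.port") yields ["config", "server",
// "port"], and tokenizeValue("err=404; retry!") yields ["err", "404", "retry"].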

func (e *Engine) Set(key, value string, commitIndex uint64) error {
	// 0. Lock the key. This serializes "in-place vs move" and "read vs write"
	// races on this specific key; different keys still proceed in parallel.
	kLock := e.KeyLocks.GetLock(key)
	kLock.Lock()
	defer kLock.Unlock()
	// 1. Get the key ID (thread safe).
	keyID := e.Keys.GetOrCreateID(key)
	shard := e.getShard(keyID)
	// 2. Check for an existing entry (lock the shard).
	var oldOffset int64 = -1
	shard.mu.RLock()
	if entry, ok := shard.Index[keyID]; ok {
		oldOffset = entry.ValueOffset
	}
	shard.mu.RUnlock()
	// 3. Try an in-place update.
	if e.Storage.TryUpdateInPlace(value, oldOffset) {
		// The offset is unchanged; only the commit index needs updating.
		shard.mu.Lock()
		entry := shard.Index[keyID]
		entry.CommitIndex = commitIndex
		shard.Index[keyID] = entry
		shard.mu.Unlock()
		e.commitMu.Lock()
		if commitIndex > e.LastCommitIndex {
			e.LastCommitIndex = commitIndex
		}
		e.commitMu.Unlock()
		// The key's tokens are already indexed, but the new value may
		// contain tokens the old value did not.
		for _, token := range e.tokenizeValue(value) {
			e.SearchIndex.AddToken(token, keyID, false)
		}
		return nil
	}
	// 4. Write the new value (append or reuse a free slot).
	// This path runs when in-place failed or the key is new.
	newOffset, err := e.Storage.AppendOrReuse(value)
	if err != nil {
		return err
	}
	// 5. Update the index (lock the shard).
	// CRITICAL: point the index at the NEW data before deleting the OLD data.
	shard.mu.Lock()
	shard.Index[keyID] = IndexEntry{
		ValueOffset: newOffset,
		CommitIndex: commitIndex,
	}
	shard.mu.Unlock()
	// 6. Delete the old data, if any. The index now points at the new
	// record, so marking the old one deleted is safe.
	if oldOffset >= 0 {
		e.Storage.MarkDeleted(oldOffset)
	}
	// Update the global commit index.
	e.commitMu.Lock()
	if commitIndex > e.LastCommitIndex {
		e.LastCommitIndex = commitIndex
	}
	e.commitMu.Unlock()
	// 7. Update the inverted index (thread safe).
	for _, token := range e.tokenizeKey(key) {
		e.SearchIndex.AddToken(token, keyID, true)
	}
	for _, token := range e.tokenizeValue(value) {
		e.SearchIndex.AddToken(token, keyID, false)
	}
	return nil
}

func (e *Engine) Get(key string) (string, bool) {
	// Lock the key so it cannot be moved or updated mid-read.
	kLock := e.KeyLocks.GetLock(key)
	kLock.RLock()
	defer kLock.RUnlock()
	keyID, ok := e.Keys.GetID(key)
	if !ok {
		return "", false
	}
	shard := e.getShard(keyID)
	shard.mu.RLock()
	entry, ok := shard.Index[keyID]
	shard.mu.RUnlock()
	if !ok {
		return "", false
	}
	val, err := e.Storage.ReadValue(entry.ValueOffset)
	if err != nil {
		return "", false
	}
	return val, true
}
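
// Illustrative usage of the Set/Get pair (path and values are made up):
//
//	e, err := NewEngine("/tmp/mydb")
//	if err != nil { /* handle */ }
//	defer e.Close()
//	_ = e.Set("config.server.port", "8080", 1)
//	v, ok := e.Get("config.server.port") // "8080", true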

type QueryResult struct {
	Key         string `json:"key"`
	Value       string `json:"value"`
	CommitIndex uint64 `json:"commit_index"`
}

// WildcardMatch reports whether str matches pattern, where '*' matches any
// run of characters (including none) and '?' matches exactly one character.
func WildcardMatch(str, pattern string) bool {
	s := []rune(str)
	p := []rune(pattern)
	sLen, pLen := len(s), len(p)
	i, j := 0, 0
	starIdx, matchIdx := -1, -1
	for i < sLen {
		if j < pLen && (p[j] == '?' || p[j] == s[i]) {
			i++
			j++
		} else if j < pLen && p[j] == '*' {
			// Remember the star and first try matching it against nothing.
			starIdx = j
			matchIdx = i
			j++
		} else if starIdx != -1 {
			// Backtrack: let the last star absorb one more character.
			j = starIdx + 1
			matchIdx++
			i = matchIdx
		} else {
			return false
		}
	}
	// Trailing stars may match the empty string.
	for j < pLen && p[j] == '*' {
		j++
	}
	return j == pLen
}
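
// Examples: WildcardMatch("config.server.port", "config.*") == true,
// WildcardMatch("abc", "a?c") == true, WildcardMatch("abc", "a*d") == false.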

// scanTokens returns the IDs of all keys whose indexed tokens match pattern.
// Inverted-index keys are exact tokens, and a pattern such as "*word*" can
// match any of them, so we iterate every token in the chosen index; that is
// still far cheaper than reading every document. A pattern spanning multiple
// tokens (e.g. "hello.world") cannot match at token granularity.
func (e *Engine) scanTokens(indexType string, pattern string) []uint32 {
	var candidates []uint32
	candidatesMap := make(map[uint32]bool)
	// Scan all 16 shards.
	for i := 0; i < 16; i++ {
		shard := e.SearchIndex.Shards[i]
		shard.mu.RLock()
		var targetMap map[string][]uint32
		if indexType == "key" {
			targetMap = shard.KeyTokens
		} else {
			targetMap = shard.ValueTokens
		}
		for token, ids := range targetMap {
			if WildcardMatch(token, pattern) {
				for _, id := range ids {
					candidatesMap[id] = true
				}
			}
		}
		shard.mu.RUnlock()
	}
	for id := range candidatesMap {
		candidates = append(candidates, id)
	}
	return candidates
}

func (e *Engine) Query(sql string) ([]QueryResult, error) {
	sql = strings.TrimSpace(sql)
	re := regexp.MustCompile(`(key|value|CommitIndex)\s*(=|like|>=|<=|>|<)\s*("[^"]*"|\d+)`)
	matches := re.FindAllStringSubmatch(sql, -1)
	if len(matches) == 0 {
		return nil, fmt.Errorf("invalid query")
	}
	extractString := func(s string) string {
		return strings.Trim(strings.TrimSpace(s), "\"")
	}
	// Optimization 1: key point lookup. If the query pins the key with
	// key = "...", fetch that single record and check the remaining
	// conditions against it directly.
	for _, match := range matches {
		if match[1] == "key" && match[2] == "=" {
			targetKey := extractString(match[3])
			if val, ok := e.Get(targetKey); ok {
				// We already hold the value; the entry is only needed for
				// its CommitIndex.
				keyID, _ := e.Keys.GetID(targetKey)
				shard := e.getShard(keyID)
				shard.mu.RLock()
				entry := shard.Index[keyID]
				shard.mu.RUnlock()
				matchAll := true
				for _, m := range matches {
					f, op, vRaw := m[1], m[2], m[3]
					switch f {
					case "CommitIndex":
						num, _ := strconv.ParseUint(vRaw, 10, 64)
						ok := true
						switch op {
						case ">":
							ok = entry.CommitIndex > num
						case "<":
							ok = entry.CommitIndex < num
						case ">=":
							ok = entry.CommitIndex >= num
						case "<=":
							ok = entry.CommitIndex <= num
						case "=":
							ok = entry.CommitIndex == num
						}
						if !ok {
							matchAll = false
						}
					case "value":
						t := extractString(vRaw)
						switch op {
						case "=":
							if val != t {
								matchAll = false
							}
						case "like":
							if !WildcardMatch(val, t) {
								matchAll = false
							}
						}
					}
					if !matchAll {
						break
					}
				}
				if matchAll {
					return []QueryResult{{Key: targetKey, Value: val, CommitIndex: entry.CommitIndex}}, nil
				}
				return []QueryResult{}, nil
			}
			return []QueryResult{}, nil
		}
	}
	// Optimization 2: inverted-index candidate generation for 'like'
	// conditions on key or value.
	var candidateIDs []uint32
	useCandidates := false
	for _, match := range matches {
		if (match[1] == "key" || match[1] == "value") && match[2] == "like" {
			pattern := extractString(match[3])
			// Heuristic: the pattern needs enough non-wildcard characters
			// for the token scan to actually narrow things down.
			clean := strings.ReplaceAll(strings.ReplaceAll(pattern, "*", ""), "?", "")
			if len(clean) > 2 {
				ids := e.scanTokens(match[1], pattern)
				if !useCandidates {
					candidateIDs = ids
					useCandidates = true
				} else {
					// Intersect with the candidates gathered so far.
					valid := make(map[uint32]bool)
					for _, id := range ids {
						valid[id] = true
					}
					var newCandidates []uint32
					for _, existing := range candidateIDs {
						if valid[existing] {
							newCandidates = append(newCandidates, existing)
						}
					}
					candidateIDs = newCandidates
				}
			}
		}
	}
	var results []QueryResult
	var mu sync.Mutex // protects results
	// processID evaluates every condition against one key ID and appends a
	// QueryResult when all of them match. The value is loaded lazily: only
	// when a condition needs it or the record is emitted. Both the candidate
	// scan and the full scan below share this logic.
	processID := func(kID uint32) {
		keyStr, ok := e.Keys.GetStr(kID)
		if !ok {
			return
		}
		shard := e.getShard(kID)
		shard.mu.RLock()
		entry, ok := shard.Index[kID]
		shard.mu.RUnlock()
		if !ok {
			return
		}
		var valStr string
		var valLoaded bool
		matchAll := true
		for _, match := range matches {
			field, op, valRaw := match[1], match[2], match[3]
			switch field {
			case "CommitIndex":
				num, _ := strconv.ParseUint(valRaw, 10, 64)
				ok := true
				switch op {
				case ">":
					ok = entry.CommitIndex > num
				case "<":
					ok = entry.CommitIndex < num
				case ">=":
					ok = entry.CommitIndex >= num
				case "<=":
					ok = entry.CommitIndex <= num
				case "=":
					ok = entry.CommitIndex == num
				}
				if !ok {
					matchAll = false
				}
			case "key":
				target := extractString(valRaw)
				switch op {
				case "=":
					if keyStr != target {
						matchAll = false
					}
				case "like":
					if !WildcardMatch(keyStr, target) {
						matchAll = false
					}
				}
			case "value":
				if !valLoaded {
					v, err := e.Storage.ReadValue(entry.ValueOffset)
					if err != nil {
						matchAll = false
						break
					}
					valStr = v
					valLoaded = true
				}
				target := extractString(valRaw)
				switch op {
				case "=":
					if valStr != target {
						matchAll = false
					}
				case "like":
					if !WildcardMatch(valStr, target) {
						matchAll = false
					}
				}
			}
			if !matchAll {
				break
			}
		}
		if matchAll {
			if !valLoaded {
				v, err := e.Storage.ReadValue(entry.ValueOffset)
				if err == nil {
					valStr = v
				}
			}
			mu.Lock()
			results = append(results, QueryResult{
				Key:         keyStr,
				Value:       valStr,
				CommitIndex: entry.CommitIndex,
			})
			mu.Unlock()
		}
	}
	// scanLogic evaluates an explicit ID list, in parallel when it is large.
	scanLogic := func(ids []uint32) {
		if len(ids) < 1000 {
			for _, id := range ids {
				processID(id)
			}
			return
		}
		var wg sync.WaitGroup
		chunks := 16
		chunkSize := (len(ids) + chunks - 1) / chunks
		for i := 0; i < chunks; i++ {
			start := i * chunkSize
			end := start + chunkSize
			if start >= len(ids) {
				break
			}
			if end > len(ids) {
				end = len(ids)
			}
			wg.Add(1)
			go func(sub []uint32) {
				defer wg.Done()
				for _, id := range sub {
					processID(id)
				}
			}(ids[start:end])
		}
		wg.Wait()
	}
	if useCandidates {
		// Use the inverted-index candidates.
		scanLogic(candidateIDs)
	} else {
		// Full scan, parallel by shard.
		var wg sync.WaitGroup
		for i := 0; i < 16; i++ {
			wg.Add(1)
			go func(shardIdx int) {
				defer wg.Done()
				shard := e.Shards[shardIdx]
				// Snapshot the IDs to keep the lock window short.
				shard.mu.RLock()
				ids := make([]uint32, 0, len(shard.Index))
				for k := range shard.Index {
					ids = append(ids, k)
				}
				shard.mu.RUnlock()
				for _, kID := range ids {
					processID(kID)
				}
			}(i)
		}
		wg.Wait()
	}
	sort.Slice(results, func(i, j int) bool {
		return results[i].Key < results[j].Key
	})
	return results, nil
}
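
// Example query strings the regexp above accepts (illustrative):
//
//	key = "config.server.port"
//	key like "config.*" and CommitIndex >= 10
//	value like "*error*"
//
// Only field/op/value triples are extracted; text between them (such as
// "and") is ignored, and all extracted conditions are ANDed together.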

// Snapshot serializes the in-memory tables. Shard locks are taken one at a
// time rather than globally, so the result is a consistent point-in-time
// image only if the caller pauses Apply() for the duration of the call.
func (e *Engine) Snapshot() ([]byte, error) {
	combinedIndex := make(map[uint32]IndexEntry)
	for i := 0; i < 16; i++ {
		e.Shards[i].mu.RLock()
		for k, v := range e.Shards[i].Index {
			combinedIndex[k] = v
		}
		e.Shards[i].mu.RUnlock()
	}
	data := struct {
		Index           map[uint32]IndexEntry
		Keys            *KeyMap
		LastCommitIndex uint64
		SearchIndex     *InvertedIndex
	}{
		Index:           combinedIndex,
		Keys:            e.Keys,
		LastCommitIndex: e.LastCommitIndex,
		SearchIndex:     e.SearchIndex,
	}
	return json.Marshal(data)
}

// Restore rebuilds the in-memory tables from a snapshot produced by Snapshot.
// Like Snapshot, it assumes the caller has quiesced all other operations.
func (e *Engine) Restore(data []byte) error {
	var dump struct {
		Index           map[uint32]IndexEntry
		Keys            *KeyMap
		LastCommitIndex uint64
		SearchIndex     *InvertedIndex
	}
	if err := json.Unmarshal(data, &dump); err != nil {
		return err
	}
	e.Keys = dump.Keys
	e.LastCommitIndex = dump.LastCommitIndex
	e.SearchIndex = dump.SearchIndex
	// Distribute the combined index back into shards.
	for i := 0; i < 16; i++ {
		e.Shards[i] = &IndexShard{Index: make(map[uint32]IndexEntry)}
	}
	for kID, entry := range dump.Index {
		shard := e.getShard(kID)
		shard.Index[kID] = entry
	}
	return nil
}
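
// Illustrative snapshot/restore round trip (assumed usage, e.g. from a Raft
// FSM). Restore rebuilds only the in-memory tables; value bytes still live
// in values.data, so the snapshot must be applied over the same data file.
//
//	snap, err := e.Snapshot()
//	// ... persist snap ...
//	e2, _ := NewEngine(dataDir) // same dataDir as the snapshotted engine
//	_ = e2.Restore(snap)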