engine.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104
  1. package db
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "os"
  8. "regexp"
  9. "sort"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "sync/atomic"
  14. )
  15. // hash returns a 32-bit FNV-1a hash of the string
  16. func hash(s string) uint32 {
  17. var h uint32 = 2166136261
  18. for i := 0; i < len(s); i++ {
  19. h = (h * 16777619) ^ uint32(s[i])
  20. }
  21. return h
  22. }
  23. // --- Radix Tree Implementation (Minimal) ---
  24. type radixNode struct {
  25. path string
  26. indices string // first char of children paths for fast lookup
  27. children []*radixNode
  28. leaf *IndexEntry // If non-nil, this is a valid key
  29. key string // Full key stored at leaf for convenience
  30. }
  31. type RadixTree struct {
  32. root *radixNode
  33. size int
  34. mu sync.RWMutex
  35. }
  36. func NewRadixTree() *RadixTree {
  37. return &RadixTree{
  38. root: &radixNode{},
  39. }
  40. }
  41. // longestCommonPrefix finds the length of the shared prefix of two strings
  42. func longestCommonPrefix(k1, k2 string) int {
  43. max := len(k1)
  44. if len(k2) < max {
  45. max = len(k2)
  46. }
  47. var i int
  48. for i = 0; i < max; i++ {
  49. if k1[i] != k2[i] {
  50. break
  51. }
  52. }
  53. return i
  54. }
  55. // Insert adds a key and its index entry to the tree
  56. func (t *RadixTree) Insert(key string, entry IndexEntry) {
  57. t.mu.Lock()
  58. defer t.mu.Unlock()
  59. n := t.root
  60. search := key
  61. for {
  62. // Handle empty search string (shouldn't happen in recursion usually)
  63. if len(search) == 0 {
  64. if n.leaf == nil {
  65. t.size++
  66. }
  67. n.leaf = &entry
  68. n.key = key
  69. return
  70. }
  71. // Find child
  72. parent := n
  73. found := false
  74. for i, char := range []byte(n.indices) {
  75. if char == search[0] {
  76. // Found a child starting with the same char
  77. n = n.children[i]
  78. // Check common prefix
  79. common := longestCommonPrefix(search, n.path)
  80. if common == len(n.path) {
  81. // Full match of child path, continue down
  82. search = search[common:]
  83. found = true
  84. break // Continue outer loop
  85. } else {
  86. // Split the node
  87. // Current child n becomes a child of the new split node
  88. // 1. Create split node with part of path
  89. splitNode := &radixNode{
  90. path: n.path[:common],
  91. indices: string(n.path[common]), // The char that differs
  92. children: []*radixNode{n},
  93. }
  94. // 2. Update existing child n
  95. n.path = n.path[common:] // Remaining part
  96. // 3. Insert new leaf for the new key (if needed)
  97. // The new key diverges at 'common' index
  98. if len(search) == common {
  99. // The new key IS the split node
  100. splitNode.leaf = &entry
  101. splitNode.key = key
  102. t.size++
  103. } else {
  104. // The new key is a child of the split node
  105. newBranch := &radixNode{
  106. path: search[common:],
  107. leaf: &entry,
  108. key: key,
  109. }
  110. t.size++
  111. splitNode.indices += string(search[common])
  112. splitNode.children = append(splitNode.children, newBranch)
  113. // Keep indices sorted? Not strictly necessary for correctness but good for determinism
  114. }
  115. // 4. Update parent to point to splitNode instead of n
  116. parent.children[i] = splitNode
  117. return
  118. }
  119. }
  120. }
  121. if !found {
  122. // No matching child, add new child
  123. newChild := &radixNode{
  124. path: search,
  125. leaf: &entry,
  126. key: key,
  127. }
  128. n.indices += string(search[0])
  129. n.children = append(n.children, newChild)
  130. t.size++
  131. return
  132. }
  133. }
  134. }
  135. // Get retrieves an entry
  136. func (t *RadixTree) Get(key string) (IndexEntry, bool) {
  137. t.mu.RLock()
  138. defer t.mu.RUnlock()
  139. n := t.root
  140. search := key
  141. for {
  142. if len(search) == 0 {
  143. if n.leaf != nil {
  144. return *n.leaf, true
  145. }
  146. return IndexEntry{}, false
  147. }
  148. found := false
  149. for i, char := range []byte(n.indices) {
  150. if char == search[0] {
  151. n = n.children[i]
  152. if strings.HasPrefix(search, n.path) {
  153. search = search[len(n.path):]
  154. found = true
  155. break
  156. }
  157. return IndexEntry{}, false
  158. }
  159. }
  160. if !found {
  161. return IndexEntry{}, false
  162. }
  163. }
  164. }
  165. // WalkPrefix iterates over all keys starting with prefix.
  166. // Returns true from callback to continue, false to stop.
  167. func (t *RadixTree) WalkPrefix(prefix string, callback func(key string, entry IndexEntry) bool) {
  168. t.mu.RLock()
  169. defer t.mu.RUnlock()
  170. n := t.root
  171. search := prefix
  172. // 1. Locate the node covering the prefix
  173. for len(search) > 0 {
  174. found := false
  175. for i, char := range []byte(n.indices) {
  176. if char == search[0] {
  177. n = n.children[i]
  178. // 3 cases:
  179. // 1. n.path == search (Exact match or consume all search) -> found target node
  180. // 2. n.path starts with search -> found target node (it's inside n)
  181. // 3. search starts with n.path -> continue down
  182. common := longestCommonPrefix(search, n.path)
  183. if common == len(search) {
  184. // Prefix fully consumed. 'n' is the root of the subtree.
  185. search = ""
  186. found = true
  187. break
  188. } else if common == len(n.path) {
  189. // 'n' path fully consumed, continue deeper
  190. search = search[common:]
  191. found = true
  192. break
  193. } else {
  194. // Mismatch, prefix not found
  195. return
  196. }
  197. }
  198. }
  199. if !found {
  200. return
  201. }
  202. }
  203. // 2. Recursive walk from n
  204. t.recursiveWalk(n, callback)
  205. }
  206. func (t *RadixTree) recursiveWalk(n *radixNode, callback func(key string, entry IndexEntry) bool) bool {
  207. if n.leaf != nil {
  208. if !callback(n.key, *n.leaf) {
  209. return false
  210. }
  211. }
  212. // To iterate in order, we should ideally keep indices sorted.
  213. // For now, we just iterate. If order matters, we sort children locally.
  214. // Let's do a quick sort of indices to ensure lexicographical order.
  215. if len(n.children) > 1 {
  216. // Optimization: clone to avoid mutating tree during read lock?
  217. // Actually indices string is immutable. We need to know the permutation.
  218. // A simple way is to build a list of children sorted by first char.
  219. type childRef struct {
  220. char byte
  221. node *radixNode
  222. }
  223. refs := make([]childRef, len(n.children))
  224. for i, char := range []byte(n.indices) {
  225. refs[i] = childRef{char, n.children[i]}
  226. }
  227. sort.Slice(refs, func(i, j int) bool { return refs[i].char < refs[j].char })
  228. for _, ref := range refs {
  229. if !t.recursiveWalk(ref.node, callback) {
  230. return false
  231. }
  232. }
  233. } else {
  234. for _, child := range n.children {
  235. if !t.recursiveWalk(child, callback) {
  236. return false
  237. }
  238. }
  239. }
  240. return true
  241. }
  242. // --- End Radix Tree ---
  243. // --- Full Text Index (Simple In-Memory) ---
  244. // Replacing Table 3 with a cleaner Token -> []Key logic handled via Radix too?
  245. // No, Inverted Index maps Token -> List of Keys.
  246. // We can use a map[string][]string (Token -> Keys) for simplicity and speed.
  247. type FullTextIndex struct {
  248. // Token -> List of Keys (strings)
  249. // Using string keys instead of IDs simplifies things as we moved away from KeyMap ID logic.
  250. index map[string][]string
  251. mu sync.RWMutex
  252. }
  253. func NewFullTextIndex() *FullTextIndex {
  254. return &FullTextIndex{
  255. index: make(map[string][]string),
  256. }
  257. }
  258. func (fti *FullTextIndex) Add(key string, value string) {
  259. fti.mu.Lock()
  260. defer fti.mu.Unlock()
  261. tokens := tokenize(value)
  262. for _, token := range tokens {
  263. // Deduplication check per key is expensive O(N).
  264. // For high perf, we might accept duplicates or use a set per token (map[string]map[string]bool).
  265. // Let's use simple append for now, optimized for write.
  266. fti.index[token] = append(fti.index[token], key)
  267. }
  268. }
  269. // Search returns keys containing the token.
  270. // Supports wildcards: "token", "prefix*", "*suffix", "*contains*"
  271. // For simplicity in this demo, we implement prefix matching via iteration if wildcard used.
  272. // Exact match is O(1).
  273. func (fti *FullTextIndex) Search(tokenPattern string) []string {
  274. fti.mu.RLock()
  275. defer fti.mu.RUnlock()
  276. // 1. Exact Match
  277. if !strings.Contains(tokenPattern, "*") {
  278. if keys, ok := fti.index[tokenPattern]; ok {
  279. // Return copy to avoid race
  280. res := make([]string, len(keys))
  281. copy(res, keys)
  282. return res
  283. }
  284. return nil
  285. }
  286. // 2. Wildcard Scan (In-Memory Map Scan)
  287. // For production, we'd use a RadixTree for tokens too!
  288. // But let's keep it simple for now, just iterating map keys.
  289. // Optimization: If map is large, this is slow.
  290. var results []string
  291. seen := make(map[string]bool)
  292. for token, keys := range fti.index {
  293. if WildcardMatch(token, tokenPattern) {
  294. for _, k := range keys {
  295. if !seen[k] {
  296. results = append(results, k)
  297. seen[k] = true
  298. }
  299. }
  300. }
  301. }
  302. return results
  303. }
  304. func tokenize(val string) []string {
  305. f := func(c rune) bool {
  306. return !((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9'))
  307. }
  308. // Optimization: Lowercase tokens for case-insensitive search if needed
  309. // For now, keep original case or lower?
  310. // Let's keep original to match benchmark expectations if any.
  311. // But usually FTI is case-insensitive.
  312. return strings.FieldsFunc(val, f)
  313. }
  314. // --- Storage & Cache ---
  315. // FreeList manages reusable disk space in memory using Best-Fit strategy.
  316. type FreeList struct {
  317. // Capacity -> Stack of Offsets
  318. buckets map[uint32][]int64
  319. // Sorted list of capacities available in buckets for fast Best-Fit lookup
  320. sortedCaps []uint32
  321. mu sync.Mutex
  322. }
  323. func NewFreeList() *FreeList {
  324. return &FreeList{
  325. buckets: make(map[uint32][]int64),
  326. }
  327. }
  328. // Add adds an offset to the free list for a given capacity.
  329. func (fl *FreeList) Add(cap uint32, offset int64) {
  330. fl.mu.Lock()
  331. defer fl.mu.Unlock()
  332. if _, exists := fl.buckets[cap]; !exists {
  333. fl.buckets[cap] = []int64{offset}
  334. // Maintain sortedCaps
  335. fl.sortedCaps = append(fl.sortedCaps, cap)
  336. sort.Slice(fl.sortedCaps, func(i, j int) bool { return fl.sortedCaps[i] < fl.sortedCaps[j] })
  337. } else {
  338. fl.buckets[cap] = append(fl.buckets[cap], offset)
  339. }
  340. }
  341. // Pop tries to get an available offset using Best-Fit strategy.
  342. func (fl *FreeList) Pop(targetCap uint32) (int64, uint32, bool) {
  343. fl.mu.Lock()
  344. defer fl.mu.Unlock()
  345. idx := sort.Search(len(fl.sortedCaps), func(i int) bool {
  346. return fl.sortedCaps[i] >= targetCap
  347. })
  348. if idx >= len(fl.sortedCaps) {
  349. return 0, 0, false
  350. }
  351. foundCap := fl.sortedCaps[idx]
  352. offsets := fl.buckets[foundCap]
  353. if len(offsets) == 0 {
  354. return 0, 0, false
  355. }
  356. lastIdx := len(offsets) - 1
  357. offset := offsets[lastIdx]
  358. if lastIdx == 0 {
  359. delete(fl.buckets, foundCap)
  360. fl.sortedCaps = append(fl.sortedCaps[:idx], fl.sortedCaps[idx+1:]...)
  361. } else {
  362. fl.buckets[foundCap] = offsets[:lastIdx]
  363. }
  364. return offset, foundCap, true
  365. }
  366. // StripedLock provides hashed locks for keys
  367. type StripedLock struct {
  368. locks [1024]sync.RWMutex
  369. }
  370. func (sl *StripedLock) GetLock(key string) *sync.RWMutex {
  371. h := hash(key)
  372. return &sl.locks[h%1024]
  373. }
  374. // Storage (Table 2) manages disk storage for values.
  375. type Storage struct {
  376. file *os.File
  377. filename string
  378. offset int64 // Current end of file (Atomic)
  379. freeList *FreeList
  380. // Simple LRU Cache for Values
  381. // Using sync.Map for simplicity in this iteration, though it's not LRU.
  382. // For true LRU we'd need a list+map protected by mutex.
  383. // Let's use a Mutex protected Map as a "Hot Cache".
  384. cache map[int64]string
  385. cacheMu sync.RWMutex
  386. }
  387. const (
  388. FlagDeleted = 0x00
  389. FlagValid = 0x01
  390. HeaderSize = 9 // 1(Flag) + 4(Cap) + 4(Len)
  391. AlignSize = 16 // Align capacity to 16 bytes
  392. MaxCacheSize = 10000 // Simple cap
  393. )
  394. func NewStorage(filename string) (*Storage, error) {
  395. f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
  396. if err != nil {
  397. return nil, err
  398. }
  399. st := &Storage{
  400. file: f,
  401. filename: filename,
  402. freeList: NewFreeList(),
  403. cache: make(map[int64]string),
  404. }
  405. if err := st.scan(); err != nil {
  406. f.Close()
  407. return nil, err
  408. }
  409. return st, nil
  410. }
  411. func (s *Storage) scan() error {
  412. offset := int64(0)
  413. if _, err := s.file.Seek(0, 0); err != nil {
  414. return err
  415. }
  416. reader := bufio.NewReader(s.file)
  417. for {
  418. header := make([]byte, HeaderSize)
  419. n, err := io.ReadFull(reader, header)
  420. if err == io.EOF {
  421. break
  422. }
  423. if err == io.ErrUnexpectedEOF && n == 0 {
  424. break
  425. }
  426. if err != nil {
  427. return err
  428. }
  429. flag := header[0]
  430. cap := binary.LittleEndian.Uint32(header[1:])
  431. if flag == FlagDeleted {
  432. s.freeList.Add(cap, offset)
  433. }
  434. discardLen := int(cap)
  435. if _, err := reader.Discard(discardLen); err != nil {
  436. buf := make([]byte, discardLen)
  437. if _, err := io.ReadFull(reader, buf); err != nil {
  438. return err
  439. }
  440. }
  441. offset += int64(HeaderSize + discardLen)
  442. }
  443. s.offset = offset
  444. return nil
  445. }
  446. func alignCapacity(length int) uint32 {
  447. if length == 0 {
  448. return AlignSize
  449. }
  450. cap := (length + AlignSize - 1) / AlignSize * AlignSize
  451. return uint32(cap)
  452. }
  453. // TryUpdateInPlace tries to update value in-place if capacity allows.
  454. func (s *Storage) TryUpdateInPlace(val string, offset int64) bool {
  455. if offset < 0 {
  456. return false
  457. }
  458. valLen := len(val)
  459. capBuf := make([]byte, 4)
  460. if _, err := s.file.ReadAt(capBuf, offset+1); err != nil {
  461. return false
  462. }
  463. oldCap := binary.LittleEndian.Uint32(capBuf)
  464. if uint32(valLen) > oldCap {
  465. return false
  466. }
  467. // Fits! Write Length then Data
  468. lenBuf := make([]byte, 4)
  469. binary.LittleEndian.PutUint32(lenBuf, uint32(valLen))
  470. if _, err := s.file.WriteAt(lenBuf, offset+5); err != nil {
  471. return false
  472. }
  473. if _, err := s.file.WriteAt([]byte(val), offset+HeaderSize); err != nil {
  474. return false
  475. }
  476. // Update Cache
  477. s.cacheMu.Lock()
  478. s.cache[offset] = val
  479. s.cacheMu.Unlock()
  480. return true
  481. }
  482. // AppendOrReuse writes a new value, reusing FreeList or Appending.
  483. func (s *Storage) AppendOrReuse(val string) (int64, error) {
  484. valLen := len(val)
  485. newCap := alignCapacity(valLen)
  486. var writeOffset int64
  487. var actualCap uint32
  488. // Try Reuse
  489. reusedOffset, capFromFree, found := s.freeList.Pop(newCap)
  490. if found {
  491. writeOffset = reusedOffset
  492. actualCap = capFromFree
  493. } else {
  494. // Append
  495. totalSize := HeaderSize + int(newCap)
  496. newEnd := atomic.AddInt64(&s.offset, int64(totalSize))
  497. writeOffset = newEnd - int64(totalSize)
  498. actualCap = newCap
  499. }
  500. buf := make([]byte, HeaderSize+int(actualCap))
  501. buf[0] = FlagValid
  502. binary.LittleEndian.PutUint32(buf[1:], actualCap)
  503. binary.LittleEndian.PutUint32(buf[5:], uint32(valLen))
  504. copy(buf[HeaderSize:], []byte(val))
  505. if _, err := s.file.WriteAt(buf, writeOffset); err != nil {
  506. return 0, err
  507. }
  508. // Add to Cache
  509. s.cacheMu.Lock()
  510. if len(s.cache) < MaxCacheSize {
  511. s.cache[writeOffset] = val
  512. }
  513. s.cacheMu.Unlock()
  514. return writeOffset, nil
  515. }
  516. // MarkDeleted marks a slot as deleted.
  517. func (s *Storage) MarkDeleted(offset int64) error {
  518. capBuf := make([]byte, 4)
  519. if _, err := s.file.ReadAt(capBuf, offset+1); err != nil {
  520. return err
  521. }
  522. oldCap := binary.LittleEndian.Uint32(capBuf)
  523. if _, err := s.file.WriteAt([]byte{FlagDeleted}, offset); err != nil {
  524. return err
  525. }
  526. s.freeList.Add(oldCap, offset)
  527. // Remove from Cache
  528. s.cacheMu.Lock()
  529. delete(s.cache, offset)
  530. s.cacheMu.Unlock()
  531. return nil
  532. }
  533. // ReadValue reads value at offset with Caching.
  534. func (s *Storage) ReadValue(offset int64) (string, error) {
  535. // 1. Check Cache
  536. s.cacheMu.RLock()
  537. if val, ok := s.cache[offset]; ok {
  538. s.cacheMu.RUnlock()
  539. return val, nil
  540. }
  541. s.cacheMu.RUnlock()
  542. // 2. Read Disk
  543. header := make([]byte, HeaderSize)
  544. if _, err := s.file.ReadAt(header, offset); err != nil {
  545. return "", err
  546. }
  547. flag := header[0]
  548. if flag == FlagDeleted {
  549. return "", fmt.Errorf("record deleted")
  550. }
  551. length := binary.LittleEndian.Uint32(header[5:])
  552. data := make([]byte, length)
  553. if _, err := s.file.ReadAt(data, offset+HeaderSize); err != nil {
  554. return "", err
  555. }
  556. val := string(data)
  557. // 3. Fill Cache
  558. s.cacheMu.Lock()
  559. if len(s.cache) < MaxCacheSize {
  560. s.cache[offset] = val
  561. } else {
  562. // Random eviction (map iteration order is random)
  563. // This is a poor man's eviction, but fast.
  564. for k := range s.cache {
  565. delete(s.cache, k)
  566. break
  567. }
  568. s.cache[offset] = val
  569. }
  570. s.cacheMu.Unlock()
  571. return val, nil
  572. }
  573. func (s *Storage) Close() error {
  574. return s.file.Close()
  575. }
  576. // IndexEntry (Table 1)
  577. type IndexEntry struct {
  578. ValueOffset int64
  579. CommitIndex uint64
  580. }
  581. // Engine is the core storage engine.
  582. type Engine struct {
  583. // Replaces KeyMap + Sharded Index with a single Radix Tree
  584. // Radix Tree is thread-safe and stores IndexEntry directly.
  585. Index *RadixTree
  586. // Table 2: Disk Storage
  587. Storage *Storage
  588. // Table 3: Full Text Index (New Implementation)
  589. FTIndex *FullTextIndex
  590. KeyLocks StripedLock
  591. LastCommitIndex uint64
  592. commitMu sync.Mutex // Protects LastCommitIndex
  593. dataDir string
  594. }
  595. func NewEngine(dataDir string) (*Engine, error) {
  596. if err := os.MkdirAll(dataDir, 0755); err != nil {
  597. return nil, err
  598. }
  599. store, err := NewStorage(dataDir + "/values.data")
  600. if err != nil {
  601. return nil, err
  602. }
  603. e := &Engine{
  604. Index: NewRadixTree(),
  605. Storage: store,
  606. FTIndex: NewFullTextIndex(),
  607. dataDir: dataDir,
  608. }
  609. return e, nil
  610. }
  611. func (e *Engine) Close() error {
  612. return e.Storage.Close()
  613. }
  614. func (e *Engine) Set(key, value string, commitIndex uint64) error {
  615. // 0. Lock Key
  616. kLock := e.KeyLocks.GetLock(key)
  617. kLock.Lock()
  618. defer kLock.Unlock()
  619. // 1. Check existing
  620. // We no longer need separate KeyID lookup. Radix Tree stores it all.
  621. var oldOffset int64 = -1
  622. if entry, ok := e.Index.Get(key); ok {
  623. oldOffset = entry.ValueOffset
  624. }
  625. // 2. Try In-Place
  626. if e.Storage.TryUpdateInPlace(value, oldOffset) {
  627. // Update Index (CommitIndex might change)
  628. entry := IndexEntry{ValueOffset: oldOffset, CommitIndex: commitIndex}
  629. e.Index.Insert(key, entry)
  630. e.commitMu.Lock()
  631. if commitIndex > e.LastCommitIndex {
  632. e.LastCommitIndex = commitIndex
  633. }
  634. e.commitMu.Unlock()
  635. return nil
  636. }
  637. // 3. Write New
  638. newOffset, err := e.Storage.AppendOrReuse(value)
  639. if err != nil {
  640. return err
  641. }
  642. // 4. Update Index
  643. // Write New Entry
  644. e.Index.Insert(key, IndexEntry{
  645. ValueOffset: newOffset,
  646. CommitIndex: commitIndex,
  647. })
  648. // 5. Delete Old
  649. if oldOffset >= 0 {
  650. e.Storage.MarkDeleted(oldOffset)
  651. }
  652. // Update global commit index
  653. e.commitMu.Lock()
  654. if commitIndex > e.LastCommitIndex {
  655. e.LastCommitIndex = commitIndex
  656. }
  657. e.commitMu.Unlock()
  658. // 6. Update Full Text Index
  659. // Ideally we remove old tokens too, but that's expensive without reverse index.
  660. // For append-only log structured db, we just add new ones.
  661. e.FTIndex.Add(key, value)
  662. return nil
  663. }
  664. func (e *Engine) Get(key string) (string, bool) {
  665. kLock := e.KeyLocks.GetLock(key)
  666. kLock.RLock()
  667. defer kLock.RUnlock()
  668. entry, ok := e.Index.Get(key)
  669. if !ok {
  670. return "", false
  671. }
  672. val, err := e.Storage.ReadValue(entry.ValueOffset)
  673. if err != nil {
  674. return "", false
  675. }
  676. return val, true
  677. }
  678. type QueryResult struct {
  679. Key string `json:"key"`
  680. Value string `json:"value"`
  681. CommitIndex uint64 `json:"commit_index"`
  682. }
  683. func WildcardMatch(str, pattern string) bool {
  684. s := []rune(str)
  685. p := []rune(pattern)
  686. sLen, pLen := len(s), len(p)
  687. i, j := 0, 0
  688. starIdx, matchIdx := -1, -1
  689. for i < sLen {
  690. if j < pLen && (p[j] == '?' || p[j] == s[i]) {
  691. i++
  692. j++
  693. } else if j < pLen && p[j] == '*' {
  694. starIdx = j
  695. matchIdx = i
  696. j++
  697. } else if starIdx != -1 {
  698. j = starIdx + 1
  699. matchIdx++
  700. i = matchIdx
  701. } else {
  702. return false
  703. }
  704. }
  705. for j < pLen && p[j] == '*' {
  706. j++
  707. }
  708. return j == pLen
  709. }
  710. func (e *Engine) Query(sql string) ([]QueryResult, error) {
  711. sql = strings.TrimSpace(sql)
  712. reLimit := regexp.MustCompile(`(?i)\s+LIMIT\s+(\d+)`)
  713. reOffset := regexp.MustCompile(`(?i)\s+OFFSET\s+(\d+)`)
  714. limit := -1
  715. offset := 0
  716. if match := reLimit.FindStringSubmatch(sql); len(match) > 0 {
  717. limit, _ = strconv.Atoi(match[1])
  718. sql = reLimit.ReplaceAllString(sql, "")
  719. }
  720. if match := reOffset.FindStringSubmatch(sql); len(match) > 0 {
  721. offset, _ = strconv.Atoi(match[1])
  722. sql = reOffset.ReplaceAllString(sql, "")
  723. }
  724. re := regexp.MustCompile(`(key|value|CommitIndex)\s*(=|like|>=|<=|>|<)\s*("[^"]*"|\d+)`)
  725. matches := re.FindAllStringSubmatch(sql, -1)
  726. if len(matches) == 0 {
  727. return nil, fmt.Errorf("invalid query")
  728. }
  729. extractString := func(s string) string {
  730. return strings.Trim(strings.TrimSpace(s), "\"")
  731. }
  732. // Optimization: Point Lookup Fast Path
  733. // If query is exactly `key = "..."`, utilize Hash/Radix Lookup O(1)
  734. for _, match := range matches {
  735. if match[1] == "key" && match[2] == "=" {
  736. targetKey := extractString(match[3])
  737. // 1. Get Entry
  738. entry, ok := e.Index.Get(targetKey)
  739. if !ok {
  740. return []QueryResult{}, nil
  741. }
  742. // 2. Load Value (if needed by other filters, but here we load anyway for result)
  743. val, err := e.Storage.ReadValue(entry.ValueOffset)
  744. if err != nil {
  745. return []QueryResult{}, nil
  746. }
  747. // 3. Verify other conditions
  748. matchAll := true
  749. for _, m := range matches {
  750. field, op, valRaw := m[1], m[2], m[3]
  751. switch field {
  752. case "CommitIndex":
  753. num, _ := strconv.ParseUint(valRaw, 10, 64)
  754. switch op {
  755. case ">": if !(entry.CommitIndex > num) { matchAll = false }
  756. case "<": if !(entry.CommitIndex < num) { matchAll = false }
  757. case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
  758. case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
  759. case "=": if !(entry.CommitIndex == num) { matchAll = false }
  760. }
  761. case "value":
  762. t := extractString(valRaw)
  763. switch op {
  764. case "=": if val != t { matchAll = false }
  765. case "like": if !WildcardMatch(val, t) { matchAll = false }
  766. }
  767. }
  768. if !matchAll { break }
  769. }
  770. if matchAll {
  771. return []QueryResult{{Key: targetKey, Value: val, CommitIndex: entry.CommitIndex}}, nil
  772. }
  773. return []QueryResult{}, nil
  774. }
  775. }
  776. // Optimization: Inverted Index for Value Queries
  777. // Strategy:
  778. // 1. Extract potential tokens from `value like "..."`
  779. // e.g. `value like "*keyword*"` -> token "keyword"
  780. // 2. Look up candidates from FTIndex
  781. // 3. Intersect/Union candidates (if multiple)
  782. // 4. Fallback to Scan if no tokens found or complex query
  783. var candidates map[string]bool
  784. var useFTIndex bool = false
  785. for _, match := range matches {
  786. if match[1] == "value" && match[2] == "like" {
  787. pattern := extractString(match[3])
  788. // Extract a core token: remove * from ends
  789. // Simplistic extraction: find longest sequence of alphanumeric?
  790. // For now, assume pattern is like "*token*" or "token*"
  791. clean := strings.Trim(pattern, "*")
  792. if len(clean) > 0 && !strings.Contains(clean, "*") && !strings.Contains(clean, "?") {
  793. // We have a candidate token "clean"
  794. // FTIndex stores partial tokens? No, exact tokens.
  795. // If query is *partial*, we need FTI scan.
  796. // Our FTI.Search handles wildcards!
  797. matches := e.FTIndex.Search(pattern) // Pass original pattern to FTI
  798. if matches != nil {
  799. // We found candidates!
  800. currentSet := make(map[string]bool)
  801. for _, k := range matches {
  802. currentSet[k] = true
  803. }
  804. if !useFTIndex {
  805. candidates = currentSet
  806. useFTIndex = true
  807. } else {
  808. // Intersect
  809. newSet := make(map[string]bool)
  810. for k := range candidates {
  811. if currentSet[k] {
  812. newSet[k] = true
  813. }
  814. }
  815. candidates = newSet
  816. }
  817. } else {
  818. // Pattern produced NO matches -> Empty Result
  819. return []QueryResult{}, nil
  820. }
  821. }
  822. }
  823. }
  824. // Prepare Iterator
  825. var iterator func(func(string, IndexEntry) bool)
  826. if useFTIndex {
  827. // Iterate ONLY candidates
  828. iterator = func(cb func(string, IndexEntry) bool) {
  829. // Iterate candidates sorted for deterministic output (and better cache locality?)
  830. // Sort candidates keys
  831. keys := make([]string, 0, len(candidates))
  832. for k := range candidates {
  833. keys = append(keys, k)
  834. }
  835. sort.Strings(keys)
  836. for _, k := range keys {
  837. if entry, ok := e.Index.Get(k); ok {
  838. if !cb(k, entry) {
  839. return
  840. }
  841. }
  842. }
  843. }
  844. } else {
  845. // Full Scan or Prefix Scan
  846. var prefix string = ""
  847. var usePrefix bool = false
  848. for _, match := range matches {
  849. if match[1] == "key" && match[2] == "like" {
  850. pattern := extractString(match[3])
  851. if strings.HasSuffix(pattern, "*") {
  852. clean := pattern[:len(pattern)-1]
  853. if !strings.ContainsAny(clean, "*?") {
  854. prefix = clean
  855. usePrefix = true
  856. break
  857. }
  858. }
  859. }
  860. }
  861. iterator = func(cb func(string, IndexEntry) bool) {
  862. if usePrefix {
  863. e.Index.WalkPrefix(prefix, cb)
  864. } else {
  865. e.Index.WalkPrefix("", cb)
  866. }
  867. }
  868. }
  869. var results []QueryResult
  870. var mu sync.Mutex
  871. // Execution
  872. iterator(func(key string, entry IndexEntry) bool {
  873. var valStr string
  874. var valLoaded bool
  875. matchAll := true
  876. for _, match := range matches {
  877. field, op, valRaw := match[1], match[2], match[3]
  878. switch field {
  879. case "CommitIndex":
  880. num, _ := strconv.ParseUint(valRaw, 10, 64)
  881. switch op {
  882. case ">": if !(entry.CommitIndex > num) { matchAll = false }
  883. case "<": if !(entry.CommitIndex < num) { matchAll = false }
  884. case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
  885. case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
  886. case "=": if !(entry.CommitIndex == num) { matchAll = false }
  887. }
  888. case "key":
  889. target := extractString(valRaw)
  890. switch op {
  891. case "=": if key != target { matchAll = false }
  892. case "like": if !WildcardMatch(key, target) { matchAll = false }
  893. }
  894. case "value":
  895. // Optimization: If using FTIndex, we know token matches, but pattern might be complex.
  896. // However, FTI.Search already filtered by pattern!
  897. // So if we trusted FTI, we could skip this check IF it was the only value check.
  898. // But let's be safe and re-check, especially if multiple value conditions exist.
  899. if !valLoaded {
  900. v, err := e.Storage.ReadValue(entry.ValueOffset)
  901. if err != nil { matchAll = false; break }
  902. valStr = v
  903. valLoaded = true
  904. }
  905. target := extractString(valRaw)
  906. switch op {
  907. case "=": if valStr != target { matchAll = false }
  908. case "like": if !WildcardMatch(valStr, target) { matchAll = false }
  909. }
  910. }
  911. if !matchAll { break }
  912. }
  913. if matchAll {
  914. if !valLoaded {
  915. v, err := e.Storage.ReadValue(entry.ValueOffset)
  916. if err == nil { valStr = v }
  917. }
  918. mu.Lock()
  919. results = append(results, QueryResult{
  920. Key: key,
  921. Value: valStr,
  922. CommitIndex: entry.CommitIndex,
  923. })
  924. // Optimization: Early termination for LIMIT
  925. // Current Logic: Only optimizes if offset == 0
  926. // Improvement: Optimize for Offset too.
  927. // If we have collected enough results (Limit + Offset), we can stop.
  928. // Radix Walk is ordered.
  929. needed := limit + offset
  930. if limit > 0 && len(results) >= needed {
  931. mu.Unlock()
  932. return false
  933. }
  934. mu.Unlock()
  935. }
  936. return true
  937. })
  938. // Pagination
  939. if offset > 0 {
  940. if offset >= len(results) {
  941. return []QueryResult{}, nil
  942. }
  943. results = results[offset:]
  944. }
  945. if limit >= 0 {
  946. if limit < len(results) {
  947. results = results[:limit]
  948. }
  949. }
  950. return results, nil
  951. }