engine.go

package db

import (
	"bufio"
	"encoding/binary"
	"fmt"
	"io"
	"os"
	"regexp"
	"sort"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
)
// hash returns a 32-bit FNV-1a hash of the string. It is only used to pick a
// stripe in StripedLock, so distribution matters more than strength.
func hash(s string) uint32 {
	var h uint32 = 2166136261
	for i := 0; i < len(s); i++ {
		h ^= uint32(s[i])
		h *= 16777619
	}
	return h
}
// --- Radix Tree Implementation (Minimal) ---

type radixNode struct {
	path     string
	indices  string // first char of each child's path, for fast lookup
	children []*radixNode
	leaf     *IndexEntry // if non-nil, this node is a valid key
	key      string      // full key stored at the leaf, for convenience
}

type RadixTree struct {
	root *radixNode
	size int
	mu   sync.RWMutex
}

func NewRadixTree() *RadixTree {
	return &RadixTree{
		root: &radixNode{},
	}
}

// longestCommonPrefix returns the length of the shared prefix of two strings.
func longestCommonPrefix(k1, k2 string) int {
	max := len(k1)
	if len(k2) < max {
		max = len(k2)
	}
	var i int
	for i = 0; i < max; i++ {
		if k1[i] != k2[i] {
			break
		}
	}
	return i
}
// Insert adds a key and its index entry to the tree, splitting nodes as needed.
func (t *RadixTree) Insert(key string, entry IndexEntry) {
	t.mu.Lock()
	defer t.mu.Unlock()
	n := t.root
	search := key
	for {
		// Search exhausted: this node is the key. Create or overwrite its leaf.
		if len(search) == 0 {
			if n.leaf == nil {
				t.size++
			}
			n.leaf = &entry
			n.key = key
			return
		}
		// Find a child whose path starts with the same byte.
		parent := n
		found := false
		for i, char := range []byte(n.indices) {
			if char == search[0] {
				n = n.children[i]
				common := longestCommonPrefix(search, n.path)
				if common == len(n.path) {
					// The child's path is fully matched; continue down.
					search = search[common:]
					found = true
					break // continue outer loop
				}
				// Partial match: split the child at the divergence point.
				// 1. Create a split node holding the shared prefix; the
				//    current child n becomes its first child.
				splitNode := &radixNode{
					path:     n.path[:common],
					indices:  string(n.path[common]), // first byte of the diverging suffix
					children: []*radixNode{n},
				}
				// 2. Trim the existing child to its remaining suffix.
				n.path = n.path[common:]
				// 3. Attach the new key, which diverges at index 'common'.
				if len(search) == common {
					// The new key ends exactly at the split node.
					splitNode.leaf = &entry
					splitNode.key = key
					t.size++
				} else {
					// The new key continues past the split node; add a branch.
					newBranch := &radixNode{
						path: search[common:],
						leaf: &entry,
						key:  key,
					}
					t.size++
					splitNode.indices += string(search[common])
					splitNode.children = append(splitNode.children, newBranch)
					// indices need not stay sorted here; recursiveWalk sorts
					// children on the fly to keep iteration deterministic.
				}
				// 4. Re-point the parent at the split node.
				parent.children[i] = splitNode
				return
			}
		}
		if !found {
			// No child shares the first byte: add a fresh leaf child.
			newChild := &radixNode{
				path: search,
				leaf: &entry,
				key:  key,
			}
			n.indices += string(search[0])
			n.children = append(n.children, newChild)
			t.size++
			return
		}
	}
}
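// Worked example of a split (illustrative): inserting "team" into an empty
// tree creates a single child {path: "team"}. Inserting "test" shares only
// the prefix "te", so that child is split into {path: "te"} with two
// children, {path: "am"} and {path: "st"}, each carrying its own leaf.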
// Get retrieves the entry stored for an exact key.
func (t *RadixTree) Get(key string) (IndexEntry, bool) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	n := t.root
	search := key
	for {
		if len(search) == 0 {
			if n.leaf != nil {
				return *n.leaf, true
			}
			return IndexEntry{}, false
		}
		found := false
		for i, char := range []byte(n.indices) {
			if char == search[0] {
				n = n.children[i]
				if strings.HasPrefix(search, n.path) {
					search = search[len(n.path):]
					found = true
					break
				}
				return IndexEntry{}, false
			}
		}
		if !found {
			return IndexEntry{}, false
		}
	}
}
// WalkPrefix iterates over all keys starting with prefix, in lexicographic
// order. The callback returns true to continue, false to stop.
func (t *RadixTree) WalkPrefix(prefix string, callback func(key string, entry IndexEntry) bool) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	n := t.root
	search := prefix
	// 1. Locate the node covering the prefix. Three cases per child:
	//    - the prefix is fully consumed inside the child -> subtree root found
	//    - the child's path is fully consumed -> descend and keep matching
	//    - the two diverge -> no key has this prefix
	for len(search) > 0 {
		found := false
		for i, char := range []byte(n.indices) {
			if char == search[0] {
				n = n.children[i]
				common := longestCommonPrefix(search, n.path)
				if common == len(search) {
					// Prefix fully consumed; 'n' is the root of the subtree.
					search = ""
					found = true
					break
				} else if common == len(n.path) {
					// Child path fully consumed; continue deeper.
					search = search[common:]
					found = true
					break
				} else {
					// Divergence: no key has this prefix.
					return
				}
			}
		}
		if !found {
			return
		}
	}
	// 2. Walk the subtree rooted at n.
	t.recursiveWalk(n, callback)
}

func (t *RadixTree) recursiveWalk(n *radixNode, callback func(key string, entry IndexEntry) bool) bool {
	if n.leaf != nil {
		if !callback(n.key, *n.leaf) {
			return false
		}
	}
	// indices are not kept sorted on insert, so sort the children by their
	// first byte here to visit keys in lexicographic order. Only a local
	// permutation is built; the tree is never mutated under the read lock.
	if len(n.children) > 1 {
		type childRef struct {
			char byte
			node *radixNode
		}
		refs := make([]childRef, len(n.children))
		for i, char := range []byte(n.indices) {
			refs[i] = childRef{char, n.children[i]}
		}
		sort.Slice(refs, func(i, j int) bool { return refs[i].char < refs[j].char })
		for _, ref := range refs {
			if !t.recursiveWalk(ref.node, callback) {
				return false
			}
		}
	} else {
		for _, child := range n.children {
			if !t.recursiveWalk(child, callback) {
				return false
			}
		}
	}
	return true
}
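// Example usage (illustrative):
//
//	t := NewRadixTree()
//	t.Insert("user:1", IndexEntry{ValueOffset: 0, CommitIndex: 1})
//	t.Insert("user:2", IndexEntry{ValueOffset: 32, CommitIndex: 2})
//	if e, ok := t.Get("user:1"); ok {
//		fmt.Println(e.ValueOffset) // 0
//	}
//	t.WalkPrefix("user:", func(key string, e IndexEntry) bool {
//		fmt.Println(key) // "user:1", then "user:2"
//		return true      // keep iterating
//	})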
// --- End Radix Tree ---
// --- Full Text Index (Simple In-Memory) ---

// FullTextIndex (Table 3) is an inverted index mapping token -> list of keys.
// A plain map[string][]string is simpler and faster here than reusing the
// radix tree, and string keys avoid the numeric-ID indirection of the old
// KeyMap.
type FullTextIndex struct {
	index map[string][]string
	mu    sync.RWMutex
}

func NewFullTextIndex() *FullTextIndex {
	return &FullTextIndex{
		index: make(map[string][]string),
	}
}
// Add tokenizes value and records key under each token.
func (fti *FullTextIndex) Add(key string, value string) {
	fti.mu.Lock()
	defer fti.mu.Unlock()
	tokens := tokenize(value)
	for _, token := range tokens {
		// Deduplicating per key would cost O(N) per token, or require a set
		// per token (map[string]map[string]bool). Plain append keeps writes
		// fast at the cost of possible duplicate postings.
		fti.index[token] = append(fti.index[token], key)
	}
}

// tokenize splits a value on any rune that is not an ASCII letter or digit.
func tokenize(val string) []string {
	f := func(c rune) bool {
		return !((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9'))
	}
	return strings.FieldsFunc(val, f)
}
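// Search is a minimal read-path sketch for the inverted index: it returns the
// keys recorded for an exact token. Postings may contain duplicate keys
// because Add never deduplicates, and matching is case-sensitive, mirroring
// tokenize.
func (fti *FullTextIndex) Search(token string) []string {
	fti.mu.RLock()
	defer fti.mu.RUnlock()
	keys := fti.index[token]
	out := make([]string, len(keys)) // copy so callers cannot mutate the index
	copy(out, keys)
	return out
}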
// --- Storage & Cache ---

// FreeList manages reusable disk space in memory using a best-fit strategy.
type FreeList struct {
	// capacity -> stack of offsets
	buckets map[uint32][]int64
	// sorted list of capacities present in buckets, for fast best-fit lookup
	sortedCaps []uint32
	mu         sync.Mutex
}

func NewFreeList() *FreeList {
	return &FreeList{
		buckets: make(map[uint32][]int64),
	}
}

// Add records an offset as free for a given capacity.
func (fl *FreeList) Add(cap uint32, offset int64) {
	fl.mu.Lock()
	defer fl.mu.Unlock()
	if _, exists := fl.buckets[cap]; !exists {
		fl.buckets[cap] = []int64{offset}
		// Keep sortedCaps in ascending order for sort.Search in Pop.
		fl.sortedCaps = append(fl.sortedCaps, cap)
		sort.Slice(fl.sortedCaps, func(i, j int) bool { return fl.sortedCaps[i] < fl.sortedCaps[j] })
	} else {
		fl.buckets[cap] = append(fl.buckets[cap], offset)
	}
}
// Pop returns a free offset using best fit: the smallest capacity >= targetCap.
func (fl *FreeList) Pop(targetCap uint32) (int64, uint32, bool) {
	fl.mu.Lock()
	defer fl.mu.Unlock()
	idx := sort.Search(len(fl.sortedCaps), func(i int) bool {
		return fl.sortedCaps[i] >= targetCap
	})
	if idx >= len(fl.sortedCaps) {
		return 0, 0, false
	}
	foundCap := fl.sortedCaps[idx]
	offsets := fl.buckets[foundCap]
	if len(offsets) == 0 {
		// Defensive: Add/Pop maintain the invariant that every capacity in
		// sortedCaps has a non-empty bucket, so this should not happen.
		return 0, 0, false
	}
	lastIdx := len(offsets) - 1
	offset := offsets[lastIdx]
	if lastIdx == 0 {
		// Bucket exhausted: drop the capacity from both structures.
		delete(fl.buckets, foundCap)
		fl.sortedCaps = append(fl.sortedCaps[:idx], fl.sortedCaps[idx+1:]...)
	} else {
		fl.buckets[foundCap] = offsets[:lastIdx]
	}
	return offset, foundCap, true
}
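// Example of the best-fit behavior (illustrative):
//
//	fl := NewFreeList()
//	fl.Add(16, 0)   // one 16-byte slot at offset 0
//	fl.Add(48, 100) // one 48-byte slot at offset 100
//	off, cap, ok := fl.Pop(20) // smallest capacity >= 20 is 48
//	// off == 100, cap == 48, ok == true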
// StripedLock provides hashed per-key locks: keys that hash to the same
// stripe share one of 1024 RWMutexes.
type StripedLock struct {
	locks [1024]sync.RWMutex
}

func (sl *StripedLock) GetLock(key string) *sync.RWMutex {
	h := hash(key)
	return &sl.locks[h%1024]
}
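// Usage sketch (illustrative):
//
//	var locks StripedLock
//	l := locks.GetLock("user:1")
//	l.Lock()
//	// ... mutate state owned by "user:1" ...
//	l.Unlock()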
// Storage (Table 2) manages on-disk storage of values.
type Storage struct {
	file     *os.File
	filename string
	offset   int64 // current end of file, advanced atomically
	freeList *FreeList
	// Hot cache of offset -> value, protected by cacheMu. This is not a true
	// LRU (that would need a list+map); when full, ReadValue evicts an
	// arbitrary entry, relying on Go's randomized map iteration order.
	cache   map[int64]string
	cacheMu sync.RWMutex
}

const (
	FlagDeleted  = 0x00
	FlagValid    = 0x01
	HeaderSize   = 9     // 1 (flag) + 4 (capacity) + 4 (length)
	AlignSize    = 16    // capacities are rounded up to multiples of 16 bytes
	MaxCacheSize = 10000 // simple entry-count cap
)
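// On-disk record layout implied by the constants above (little-endian):
//
//	offset+0           : 1 byte  flag (FlagValid or FlagDeleted)
//	offset+1 .. +4     : uint32  capacity (allocated payload size, 16-aligned)
//	offset+5 .. +8     : uint32  length (bytes of payload actually used)
//	offset+9 .. +9+cap : payload (value bytes, then padding up to capacity)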
func NewStorage(filename string) (*Storage, error) {
	f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
	if err != nil {
		return nil, err
	}
	st := &Storage{
		file:     f,
		filename: filename,
		freeList: NewFreeList(),
		cache:    make(map[int64]string),
	}
	if err := st.scan(); err != nil {
		f.Close()
		return nil, err
	}
	return st, nil
}
// scan walks the whole file at startup, rebuilding the free list from deleted
// records and recovering the end-of-file offset.
func (s *Storage) scan() error {
	offset := int64(0)
	if _, err := s.file.Seek(0, 0); err != nil {
		return err
	}
	reader := bufio.NewReader(s.file)
	for {
		header := make([]byte, HeaderSize)
		if _, err := io.ReadFull(reader, header); err != nil {
			if err == io.EOF {
				break // clean end of file
			}
			return err // truncated header or read failure
		}
		flag := header[0]
		cap := binary.LittleEndian.Uint32(header[1:])
		if flag == FlagDeleted {
			s.freeList.Add(cap, offset)
		}
		// Skip the payload; a short Discard means the record is truncated.
		if _, err := reader.Discard(int(cap)); err != nil {
			return err
		}
		offset += int64(HeaderSize) + int64(cap)
	}
	s.offset = offset
	return nil
}
// alignCapacity rounds a payload length up to the next multiple of AlignSize.
func alignCapacity(length int) uint32 {
	if length == 0 {
		return AlignSize
	}
	cap := (length + AlignSize - 1) / AlignSize * AlignSize
	return uint32(cap)
}
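// For example: alignCapacity(0) == 16, alignCapacity(1) == 16,
// alignCapacity(16) == 16, and alignCapacity(17) == 32. Rounding capacities
// into 16-byte classes makes freed slots more likely to be reusable by Pop.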
// TryUpdateInPlace overwrites the value at offset if its slot capacity allows,
// avoiding a new allocation. It returns false if the value does not fit (or
// on any I/O error), in which case the caller must write a new record.
func (s *Storage) TryUpdateInPlace(val string, offset int64) bool {
	if offset < 0 {
		return false
	}
	valLen := len(val)
	capBuf := make([]byte, 4)
	if _, err := s.file.ReadAt(capBuf, offset+1); err != nil {
		return false
	}
	oldCap := binary.LittleEndian.Uint32(capBuf)
	if uint32(valLen) > oldCap {
		return false
	}
	// It fits: write the new length, then the data.
	lenBuf := make([]byte, 4)
	binary.LittleEndian.PutUint32(lenBuf, uint32(valLen))
	if _, err := s.file.WriteAt(lenBuf, offset+5); err != nil {
		return false
	}
	if _, err := s.file.WriteAt([]byte(val), offset+HeaderSize); err != nil {
		return false
	}
	// Keep the cache coherent with the new value.
	s.cacheMu.Lock()
	s.cache[offset] = val
	s.cacheMu.Unlock()
	return true
}
// AppendOrReuse writes a new value, preferring a free slot over appending.
func (s *Storage) AppendOrReuse(val string) (int64, error) {
	valLen := len(val)
	newCap := alignCapacity(valLen)
	var writeOffset int64
	var actualCap uint32
	// Try to reuse a freed slot first (best fit).
	reusedOffset, capFromFree, found := s.freeList.Pop(newCap)
	if found {
		writeOffset = reusedOffset
		actualCap = capFromFree // preserve the slot's original capacity
	} else {
		// Append: reserve space at the end of the file atomically so that
		// concurrent appends get disjoint regions.
		totalSize := HeaderSize + int(newCap)
		newEnd := atomic.AddInt64(&s.offset, int64(totalSize))
		writeOffset = newEnd - int64(totalSize)
		actualCap = newCap
	}
	buf := make([]byte, HeaderSize+int(actualCap))
	buf[0] = FlagValid
	binary.LittleEndian.PutUint32(buf[1:], actualCap)
	binary.LittleEndian.PutUint32(buf[5:], uint32(valLen))
	copy(buf[HeaderSize:], val)
	if _, err := s.file.WriteAt(buf, writeOffset); err != nil {
		return 0, err
	}
	// Populate the cache while it has room.
	s.cacheMu.Lock()
	if len(s.cache) < MaxCacheSize {
		s.cache[writeOffset] = val
	}
	s.cacheMu.Unlock()
	return writeOffset, nil
}
// MarkDeleted flags a slot as deleted and returns its space to the free list.
func (s *Storage) MarkDeleted(offset int64) error {
	capBuf := make([]byte, 4)
	if _, err := s.file.ReadAt(capBuf, offset+1); err != nil {
		return err
	}
	oldCap := binary.LittleEndian.Uint32(capBuf)
	if _, err := s.file.WriteAt([]byte{FlagDeleted}, offset); err != nil {
		return err
	}
	s.freeList.Add(oldCap, offset)
	// Drop any cached copy of the old value.
	s.cacheMu.Lock()
	delete(s.cache, offset)
	s.cacheMu.Unlock()
	return nil
}
// ReadValue reads the value at offset, going through the cache.
func (s *Storage) ReadValue(offset int64) (string, error) {
	// 1. Check the cache.
	s.cacheMu.RLock()
	if val, ok := s.cache[offset]; ok {
		s.cacheMu.RUnlock()
		return val, nil
	}
	s.cacheMu.RUnlock()
	// 2. Read from disk.
	header := make([]byte, HeaderSize)
	if _, err := s.file.ReadAt(header, offset); err != nil {
		return "", err
	}
	flag := header[0]
	if flag == FlagDeleted {
		return "", fmt.Errorf("record deleted")
	}
	length := binary.LittleEndian.Uint32(header[5:])
	data := make([]byte, length)
	if _, err := s.file.ReadAt(data, offset+HeaderSize); err != nil {
		return "", err
	}
	val := string(data)
	// 3. Fill the cache, evicting one arbitrary entry if it is full. Map
	// iteration order is randomized, so this is cheap pseudo-random eviction
	// rather than true LRU.
	s.cacheMu.Lock()
	if len(s.cache) < MaxCacheSize {
		s.cache[offset] = val
	} else {
		for k := range s.cache {
			delete(s.cache, k)
			break
		}
		s.cache[offset] = val
	}
	s.cacheMu.Unlock()
	return val, nil
}
func (s *Storage) Close() error {
	return s.file.Close()
}
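// Round-trip sketch (illustrative; error handling elided):
//
//	st, _ := NewStorage("/tmp/values.data")
//	off, _ := st.AppendOrReuse("hello")
//	v, _ := st.ReadValue(off) // "hello"
//	st.MarkDeleted(off)       // slot becomes reusable by AppendOrReuse
//	st.Close()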
// IndexEntry (Table 1) locates a key's value on disk and records the commit
// that last wrote it.
type IndexEntry struct {
	ValueOffset int64
	CommitIndex uint64
}

// Engine is the core storage engine.
type Engine struct {
	// Table 1: a single thread-safe radix tree replaces the old KeyMap plus
	// sharded index, storing IndexEntry values directly.
	Index *RadixTree
	// Table 2: disk storage for values.
	Storage *Storage
	// Table 3: full-text inverted index.
	FTIndex         *FullTextIndex
	KeyLocks        StripedLock
	LastCommitIndex uint64
	commitMu        sync.Mutex // protects LastCommitIndex
	dataDir         string
}
func NewEngine(dataDir string) (*Engine, error) {
	if err := os.MkdirAll(dataDir, 0755); err != nil {
		return nil, err
	}
	store, err := NewStorage(dataDir + "/values.data")
	if err != nil {
		return nil, err
	}
	e := &Engine{
		Index:   NewRadixTree(),
		Storage: store,
		FTIndex: NewFullTextIndex(),
		dataDir: dataDir,
	}
	return e, nil
}

func (e *Engine) Close() error {
	return e.Storage.Close()
}
func (e *Engine) Set(key, value string, commitIndex uint64) error {
	// 0. Serialize writers for this key via its lock stripe.
	kLock := e.KeyLocks.GetLock(key)
	kLock.Lock()
	defer kLock.Unlock()
	// 1. Look up any existing entry. The radix tree stores everything we
	// need; no separate KeyID lookup is required.
	var oldOffset int64 = -1
	if entry, ok := e.Index.Get(key); ok {
		oldOffset = entry.ValueOffset
	}
	// 2. Try an in-place update of the existing slot.
	if e.Storage.TryUpdateInPlace(value, oldOffset) {
		// Re-insert to refresh the CommitIndex; the offset is unchanged.
		e.Index.Insert(key, IndexEntry{ValueOffset: oldOffset, CommitIndex: commitIndex})
		e.commitMu.Lock()
		if commitIndex > e.LastCommitIndex {
			e.LastCommitIndex = commitIndex
		}
		e.commitMu.Unlock()
		// Keep the full-text index in sync with the new value (stale tokens
		// from the old value are not removed; see step 6).
		e.FTIndex.Add(key, value)
		return nil
	}
	// 3. Write the new value, reusing a free slot if possible.
	newOffset, err := e.Storage.AppendOrReuse(value)
	if err != nil {
		return err
	}
	// 4. Point the index at the new record.
	e.Index.Insert(key, IndexEntry{
		ValueOffset: newOffset,
		CommitIndex: commitIndex,
	})
	// 5. Free the old record.
	if oldOffset >= 0 {
		e.Storage.MarkDeleted(oldOffset)
	}
	// Update the global commit index.
	e.commitMu.Lock()
	if commitIndex > e.LastCommitIndex {
		e.LastCommitIndex = commitIndex
	}
	e.commitMu.Unlock()
	// 6. Update the full-text index. Ideally stale tokens of the old value
	// would be removed too, but that is expensive without a reverse index;
	// like an append-only log, we only ever add postings.
	e.FTIndex.Add(key, value)
	return nil
}
func (e *Engine) Get(key string) (string, bool) {
	kLock := e.KeyLocks.GetLock(key)
	kLock.RLock()
	defer kLock.RUnlock()
	entry, ok := e.Index.Get(key)
	if !ok {
		return "", false
	}
	val, err := e.Storage.ReadValue(entry.ValueOffset)
	if err != nil {
		return "", false
	}
	return val, true
}
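// End-to-end sketch (illustrative; error handling elided):
//
//	e, _ := NewEngine("/tmp/dbdata")
//	e.Set("user:1", "alice", 1)
//	e.Set("user:1", "bob", 2) // fits the old 16-byte slot: updated in place
//	v, ok := e.Get("user:1")  // "bob", true
//	_, _ = v, ok
//	e.Close()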
type QueryResult struct {
	Key         string `json:"key"`
	Value       string `json:"value"`
	CommitIndex uint64 `json:"commit_index"`
}
// WildcardMatch reports whether str matches pattern, where '?' matches any
// single rune and '*' matches any (possibly empty) run of runes. It uses the
// classic greedy two-pointer algorithm with backtracking to the last '*'.
func WildcardMatch(str, pattern string) bool {
	s := []rune(str)
	p := []rune(pattern)
	sLen, pLen := len(s), len(p)
	i, j := 0, 0
	starIdx, matchIdx := -1, -1
	for i < sLen {
		if j < pLen && (p[j] == '?' || p[j] == s[i]) {
			i++
			j++
		} else if j < pLen && p[j] == '*' {
			// Remember the star and tentatively match it against nothing.
			starIdx = j
			matchIdx = i
			j++
		} else if starIdx != -1 {
			// Mismatch after a star: let the star absorb one more rune.
			j = starIdx + 1
			matchIdx++
			i = matchIdx
		} else {
			return false
		}
	}
	// Any trailing stars can match the empty string.
	for j < pLen && p[j] == '*' {
		j++
	}
	return j == pLen
}
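// For example: WildcardMatch("user:42", "user:*") and
// WildcardMatch("abc", "a?c") are true, while WildcardMatch("abc", "a?d")
// and WildcardMatch("ab", "a?c") are false.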
// Query evaluates a minimal SQL-like filter language. Conditions have the
// form `field op value`, where field is key, value, or CommitIndex; op is
// =, like, >, <, >=, or <=; and value is a quoted string or an integer.
// LIMIT and OFFSET clauses are also recognized.
func (e *Engine) Query(sql string) ([]QueryResult, error) {
	sql = strings.TrimSpace(sql)
	reLimit := regexp.MustCompile(`(?i)\s+LIMIT\s+(\d+)`)
	reOffset := regexp.MustCompile(`(?i)\s+OFFSET\s+(\d+)`)
	limit := -1
	offset := 0
	if match := reLimit.FindStringSubmatch(sql); len(match) > 0 {
		limit, _ = strconv.Atoi(match[1])
		sql = reLimit.ReplaceAllString(sql, "")
	}
	if match := reOffset.FindStringSubmatch(sql); len(match) > 0 {
		offset, _ = strconv.Atoi(match[1])
		sql = reOffset.ReplaceAllString(sql, "")
	}
	re := regexp.MustCompile(`(key|value|CommitIndex)\s*(=|like|>=|<=|>|<)\s*("[^"]*"|\d+)`)
	matches := re.FindAllStringSubmatch(sql, -1)
	if len(matches) == 0 {
		return nil, fmt.Errorf("invalid query")
	}
	extractString := func(s string) string {
		return strings.Trim(strings.TrimSpace(s), "\"")
	}
	// Fast path: a `key = "..."` condition allows a direct point lookup in
	// the radix tree instead of a scan.
	for _, match := range matches {
		if match[1] == "key" && match[2] == "=" {
			targetKey := extractString(match[3])
			// 1. Look up the entry.
			entry, ok := e.Index.Get(targetKey)
			if !ok {
				return []QueryResult{}, nil
			}
			// 2. Load the value. (It could be loaded lazily, only when a
			// value filter exists, but the result needs it anyway.)
			val, err := e.Storage.ReadValue(entry.ValueOffset)
			if err != nil {
				return []QueryResult{}, nil
			}
			// 3. Verify the remaining conditions against the single candidate.
			matchAll := true
			for _, m := range matches {
				field, op, valRaw := m[1], m[2], m[3]
				switch field {
				case "key":
					t := extractString(valRaw)
					switch op {
					case "=":
						if targetKey != t {
							matchAll = false
						}
					case "like":
						if !WildcardMatch(targetKey, t) {
							matchAll = false
						}
					}
				case "CommitIndex":
					num, _ := strconv.ParseUint(valRaw, 10, 64)
					switch op {
					case ">":
						if !(entry.CommitIndex > num) {
							matchAll = false
						}
					case "<":
						if !(entry.CommitIndex < num) {
							matchAll = false
						}
					case ">=":
						if !(entry.CommitIndex >= num) {
							matchAll = false
						}
					case "<=":
						if !(entry.CommitIndex <= num) {
							matchAll = false
						}
					case "=":
						if !(entry.CommitIndex == num) {
							matchAll = false
						}
					}
				case "value":
					t := extractString(valRaw)
					switch op {
					case "=":
						if val != t {
							matchAll = false
						}
					case "like":
						if !WildcardMatch(val, t) {
							matchAll = false
						}
					}
				}
				if !matchAll {
					break
				}
			}
			if matchAll {
				return []QueryResult{{Key: targetKey, Value: val, CommitIndex: entry.CommitIndex}}, nil
			}
			return []QueryResult{}, nil
		}
	}
	var results []QueryResult
	// mu guards results. WalkPrefix invokes the callback sequentially today,
	// but the lock keeps the iterator safe if candidates are ever scanned
	// concurrently.
	var mu sync.Mutex
	// Strategy:
	// 1. Identify the primary filter (a key prefix is best).
	// 2. Iterate the candidate keys.
	// 3. Apply the remaining conditions to each candidate.
	prefix := ""
	usePrefix := false
	// A pattern like `key like "abc*"` with no other wildcards can be served
	// by a prefix walk over the radix tree instead of a full scan.
	for _, match := range matches {
		if match[1] == "key" && match[2] == "like" {
			pattern := extractString(match[3])
			if strings.HasSuffix(pattern, "*") {
				clean := pattern[:len(pattern)-1]
				if !strings.ContainsAny(clean, "*?") {
					prefix = clean
					usePrefix = true
					break
				}
			}
		}
	}
	// Iterator applied to each candidate key.
	iterator := func(key string, entry IndexEntry) bool {
		var valStr string
		var valLoaded bool
		matchAll := true
		for _, match := range matches {
			field, op, valRaw := match[1], match[2], match[3]
			switch field {
			case "CommitIndex":
				num, _ := strconv.ParseUint(valRaw, 10, 64)
				switch op {
				case ">":
					if !(entry.CommitIndex > num) {
						matchAll = false
					}
				case "<":
					if !(entry.CommitIndex < num) {
						matchAll = false
					}
				case ">=":
					if !(entry.CommitIndex >= num) {
						matchAll = false
					}
				case "<=":
					if !(entry.CommitIndex <= num) {
						matchAll = false
					}
				case "=":
					if !(entry.CommitIndex == num) {
						matchAll = false
					}
				}
			case "key":
				target := extractString(valRaw)
				switch op {
				case "=":
					if key != target {
						matchAll = false
					}
				case "like":
					if !WildcardMatch(key, target) {
						matchAll = false
					}
				}
			case "value":
				// Load the value lazily, only when a value filter exists.
				if !valLoaded {
					v, err := e.Storage.ReadValue(entry.ValueOffset)
					if err != nil {
						matchAll = false
						break
					}
					valStr = v
					valLoaded = true
				}
				target := extractString(valRaw)
				switch op {
				case "=":
					if valStr != target {
						matchAll = false
					}
				case "like":
					if !WildcardMatch(valStr, target) {
						matchAll = false
					}
				}
			}
			if !matchAll {
				break
			}
		}
		if matchAll {
			// Load the value if it was not needed for filtering.
			if !valLoaded {
				v, err := e.Storage.ReadValue(entry.ValueOffset)
				if err == nil {
					valStr = v
				}
			}
			mu.Lock()
			results = append(results, QueryResult{
				Key:         key,
				Value:       valStr,
				CommitIndex: entry.CommitIndex,
			})
			// Early exit: WalkPrefix yields keys in lexicographic order, so
			// with a LIMIT, no OFFSET, and key order as the (default) sort,
			// iteration can stop as soon as enough rows have matched.
			if limit > 0 && offset == 0 && len(results) >= limit {
				mu.Unlock()
				return false // stop iteration
			}
			mu.Unlock()
		}
		return true // continue
	}
	if usePrefix {
		e.Index.WalkPrefix(prefix, iterator)
	} else {
		e.Index.WalkPrefix("", iterator) // full scan
	}
	// Results are already key-sorted because WalkPrefix walks the tree in
	// lexicographic order, so no extra sort pass is needed.
	// Pagination.
	if offset > 0 {
		if offset >= len(results) {
			return []QueryResult{}, nil
		}
		results = results[offset:]
	}
	if limit >= 0 {
		if limit < len(results) {
			results = results[:limit]
		}
	}
	return results, nil
}
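// Query usage sketch (illustrative):
//
//	rows, err := e.Query(`key like "user:*" and CommitIndex >= 2 LIMIT 10`)
//	// rows come back sorted by key; the "user:" prefix is served by a
//	// radix-tree prefix walk rather than a full scan.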
// Snapshot is a no-op stub: serializing the radix tree is not implemented
// yet in this demo.
func (e *Engine) Snapshot() ([]byte, error) {
	return nil, nil
}

// Restore is a no-op stub matching Snapshot.
func (e *Engine) Restore(data []byte) error {
	return nil
}