engine.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367
  1. package db
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "os"
  8. "regexp"
  9. "sort"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "sync/atomic"
  14. )
  15. // hash returns a 32-bit FNV-1a hash of the string
  16. func hash(s string) uint32 {
  17. var h uint32 = 2166136261
  18. for i := 0; i < len(s); i++ {
  19. h = (h * 16777619) ^ uint32(s[i])
  20. }
  21. return h
  22. }
  23. // --- Radix Tree Implementation (Minimal) ---
  24. type radixNode struct {
  25. path string
  26. indices string // first char of children paths for fast lookup
  27. children []*radixNode
  28. leaf *IndexEntry // If non-nil, this is a valid key
  29. key string // Full key stored at leaf for convenience
  30. }
  31. type RadixTree struct {
  32. root *radixNode
  33. size int
  34. mu sync.RWMutex
  35. }
  36. func NewRadixTree() *RadixTree {
  37. return &RadixTree{
  38. root: &radixNode{},
  39. }
  40. }
  41. // longestCommonPrefix finds the length of the shared prefix of two strings
  42. func longestCommonPrefix(k1, k2 string) int {
  43. max := len(k1)
  44. if len(k2) < max {
  45. max = len(k2)
  46. }
  47. var i int
  48. for i = 0; i < max; i++ {
  49. if k1[i] != k2[i] {
  50. break
  51. }
  52. }
  53. return i
  54. }
  55. // Insert adds a key and its index entry to the tree
  56. func (t *RadixTree) Insert(key string, entry IndexEntry) {
  57. t.mu.Lock()
  58. defer t.mu.Unlock()
  59. n := t.root
  60. search := key
  61. for {
  62. // Handle empty search string (shouldn't happen in recursion usually)
  63. if len(search) == 0 {
  64. if n.leaf == nil {
  65. t.size++
  66. }
  67. n.leaf = &entry
  68. n.key = key
  69. return
  70. }
  71. // Find child
  72. parent := n
  73. found := false
  74. for i, char := range []byte(n.indices) {
  75. if char == search[0] {
  76. // Found a child starting with the same char
  77. n = n.children[i]
  78. // Check common prefix
  79. common := longestCommonPrefix(search, n.path)
  80. if common == len(n.path) {
  81. // Full match of child path, continue down
  82. search = search[common:]
  83. found = true
  84. break // Continue outer loop
  85. } else {
  86. // Split the node
  87. // Current child n becomes a child of the new split node
  88. // 1. Create split node with part of path
  89. splitNode := &radixNode{
  90. path: n.path[:common],
  91. indices: string(n.path[common]), // The char that differs
  92. children: []*radixNode{n},
  93. }
  94. // 2. Update existing child n
  95. n.path = n.path[common:] // Remaining part
  96. // 3. Insert new leaf for the new key (if needed)
  97. // The new key diverges at 'common' index
  98. if len(search) == common {
  99. // The new key IS the split node
  100. splitNode.leaf = &entry
  101. splitNode.key = key
  102. t.size++
  103. } else {
  104. // The new key is a child of the split node
  105. newBranch := &radixNode{
  106. path: search[common:],
  107. leaf: &entry,
  108. key: key,
  109. }
  110. t.size++
  111. splitNode.indices += string(search[common])
  112. splitNode.children = append(splitNode.children, newBranch)
  113. // Keep indices sorted? Not strictly necessary for correctness but good for determinism
  114. }
  115. // 4. Update parent to point to splitNode instead of n
  116. parent.children[i] = splitNode
  117. return
  118. }
  119. }
  120. }
  121. if !found {
  122. // No matching child, add new child
  123. newChild := &radixNode{
  124. path: search,
  125. leaf: &entry,
  126. key: key,
  127. }
  128. n.indices += string(search[0])
  129. n.children = append(n.children, newChild)
  130. t.size++
  131. return
  132. }
  133. }
  134. }
  135. // Get retrieves an entry
  136. func (t *RadixTree) Get(key string) (IndexEntry, bool) {
  137. t.mu.RLock()
  138. defer t.mu.RUnlock()
  139. n := t.root
  140. search := key
  141. for {
  142. if len(search) == 0 {
  143. if n.leaf != nil {
  144. return *n.leaf, true
  145. }
  146. return IndexEntry{}, false
  147. }
  148. found := false
  149. for i, char := range []byte(n.indices) {
  150. if char == search[0] {
  151. n = n.children[i]
  152. if strings.HasPrefix(search, n.path) {
  153. search = search[len(n.path):]
  154. found = true
  155. break
  156. }
  157. return IndexEntry{}, false
  158. }
  159. }
  160. if !found {
  161. return IndexEntry{}, false
  162. }
  163. }
  164. }
  165. // WalkPrefix iterates over all keys starting with prefix.
  166. // Returns true from callback to continue, false to stop.
  167. func (t *RadixTree) WalkPrefix(prefix string, callback func(key string, entry IndexEntry) bool) {
  168. t.mu.RLock()
  169. defer t.mu.RUnlock()
  170. n := t.root
  171. search := prefix
  172. // 1. Locate the node covering the prefix
  173. for len(search) > 0 {
  174. found := false
  175. for i, char := range []byte(n.indices) {
  176. if char == search[0] {
  177. n = n.children[i]
  178. // 3 cases:
  179. // 1. n.path == search (Exact match or consume all search) -> found target node
  180. // 2. n.path starts with search -> found target node (it's inside n)
  181. // 3. search starts with n.path -> continue down
  182. common := longestCommonPrefix(search, n.path)
  183. if common == len(search) {
  184. // Prefix fully consumed. 'n' is the root of the subtree.
  185. search = ""
  186. found = true
  187. break
  188. } else if common == len(n.path) {
  189. // 'n' path fully consumed, continue deeper
  190. search = search[common:]
  191. found = true
  192. break
  193. } else {
  194. // Mismatch, prefix not found
  195. return
  196. }
  197. }
  198. }
  199. if !found {
  200. return
  201. }
  202. }
  203. // 2. Recursive walk from n
  204. t.recursiveWalk(n, callback)
  205. }
  206. func (t *RadixTree) recursiveWalk(n *radixNode, callback func(key string, entry IndexEntry) bool) bool {
  207. if n.leaf != nil {
  208. if !callback(n.key, *n.leaf) {
  209. return false
  210. }
  211. }
  212. // To iterate in order, we should ideally keep indices sorted.
  213. // For now, we just iterate. If order matters, we sort children locally.
  214. // Let's do a quick sort of indices to ensure lexicographical order.
  215. if len(n.children) > 1 {
  216. // Optimization: clone to avoid mutating tree during read lock?
  217. // Actually indices string is immutable. We need to know the permutation.
  218. // A simple way is to build a list of children sorted by first char.
  219. type childRef struct {
  220. char byte
  221. node *radixNode
  222. }
  223. refs := make([]childRef, len(n.children))
  224. for i, char := range []byte(n.indices) {
  225. refs[i] = childRef{char, n.children[i]}
  226. }
  227. sort.Slice(refs, func(i, j int) bool { return refs[i].char < refs[j].char })
  228. for _, ref := range refs {
  229. if !t.recursiveWalk(ref.node, callback) {
  230. return false
  231. }
  232. }
  233. } else {
  234. for _, child := range n.children {
  235. if !t.recursiveWalk(child, callback) {
  236. return false
  237. }
  238. }
  239. }
  240. return true
  241. }
  242. // --- End Radix Tree ---
  243. // --- Full Text Index (Simple In-Memory) ---
  244. // Replacing Table 3 with a cleaner Token -> []Key logic handled via Radix too?
  245. // No, Inverted Index maps Token -> List of Keys.
  246. // We can use a map[string][]string (Token -> Keys) for simplicity and speed.
  247. type FullTextIndex struct {
  248. // Token -> List of Keys (strings)
  249. // Using string keys instead of IDs simplifies things as we moved away from KeyMap ID logic.
  250. index map[string][]string
  251. mu sync.RWMutex
  252. }
  253. func NewFullTextIndex() *FullTextIndex {
  254. return &FullTextIndex{
  255. index: make(map[string][]string),
  256. }
  257. }
  258. func (fti *FullTextIndex) Add(key string, value string) {
  259. fti.mu.Lock()
  260. defer fti.mu.Unlock()
  261. tokens := tokenize(value)
  262. for _, token := range tokens {
  263. // Deduplication check per key is expensive O(N).
  264. // For high perf, we might accept duplicates or use a set per token (map[string]map[string]bool).
  265. // Let's use simple append for now, optimized for write.
  266. fti.index[token] = append(fti.index[token], key)
  267. }
  268. }
  269. // Search returns keys containing the token.
  270. // Supports wildcards: "token", "prefix*", "*suffix", "*contains*"
  271. // For simplicity in this demo, we implement prefix matching via iteration if wildcard used.
  272. // Exact match is O(1).
  273. func (fti *FullTextIndex) Search(tokenPattern string) []string {
  274. fti.mu.RLock()
  275. defer fti.mu.RUnlock()
  276. // 1. Exact Match
  277. if !strings.Contains(tokenPattern, "*") {
  278. if keys, ok := fti.index[tokenPattern]; ok {
  279. // Return copy to avoid race
  280. res := make([]string, len(keys))
  281. copy(res, keys)
  282. return res
  283. }
  284. return nil
  285. }
  286. // 2. Wildcard Scan (In-Memory Map Scan)
  287. // For production, we'd use a RadixTree for tokens too!
  288. // But let's keep it simple for now, just iterating map keys.
  289. // Optimization: If map is large, this is slow.
  290. var results []string
  291. seen := make(map[string]bool)
  292. for token, keys := range fti.index {
  293. if WildcardMatch(token, tokenPattern) {
  294. for _, k := range keys {
  295. if !seen[k] {
  296. results = append(results, k)
  297. seen[k] = true
  298. }
  299. }
  300. }
  301. }
  302. return results
  303. }
  304. func tokenize(val string) []string {
  305. f := func(c rune) bool {
  306. return !((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9'))
  307. }
  308. // Optimization: Lowercase tokens for case-insensitive search if needed
  309. // For now, keep original case or lower?
  310. // Let's keep original to match benchmark expectations if any.
  311. // But usually FTI is case-insensitive.
  312. return strings.FieldsFunc(val, f)
  313. }
  314. // --- Storage & Cache ---
  315. // FreeList manages reusable disk space in memory using Best-Fit strategy.
  316. type FreeList struct {
  317. // Capacity -> Stack of Offsets
  318. buckets map[uint32][]int64
  319. // Sorted list of capacities available in buckets for fast Best-Fit lookup
  320. sortedCaps []uint32
  321. mu sync.Mutex
  322. }
  323. func NewFreeList() *FreeList {
  324. return &FreeList{
  325. buckets: make(map[uint32][]int64),
  326. }
  327. }
  328. // Add adds an offset to the free list for a given capacity.
  329. func (fl *FreeList) Add(cap uint32, offset int64) {
  330. fl.mu.Lock()
  331. defer fl.mu.Unlock()
  332. if _, exists := fl.buckets[cap]; !exists {
  333. fl.buckets[cap] = []int64{offset}
  334. // Maintain sortedCaps
  335. fl.sortedCaps = append(fl.sortedCaps, cap)
  336. sort.Slice(fl.sortedCaps, func(i, j int) bool { return fl.sortedCaps[i] < fl.sortedCaps[j] })
  337. } else {
  338. fl.buckets[cap] = append(fl.buckets[cap], offset)
  339. }
  340. }
  341. // Pop tries to get an available offset using Best-Fit strategy.
  342. func (fl *FreeList) Pop(targetCap uint32) (int64, uint32, bool) {
  343. fl.mu.Lock()
  344. defer fl.mu.Unlock()
  345. idx := sort.Search(len(fl.sortedCaps), func(i int) bool {
  346. return fl.sortedCaps[i] >= targetCap
  347. })
  348. if idx >= len(fl.sortedCaps) {
  349. return 0, 0, false
  350. }
  351. foundCap := fl.sortedCaps[idx]
  352. offsets := fl.buckets[foundCap]
  353. if len(offsets) == 0 {
  354. return 0, 0, false
  355. }
  356. lastIdx := len(offsets) - 1
  357. offset := offsets[lastIdx]
  358. if lastIdx == 0 {
  359. delete(fl.buckets, foundCap)
  360. fl.sortedCaps = append(fl.sortedCaps[:idx], fl.sortedCaps[idx+1:]...)
  361. } else {
  362. fl.buckets[foundCap] = offsets[:lastIdx]
  363. }
  364. return offset, foundCap, true
  365. }
  366. // StripedLock provides hashed locks for keys
  367. type StripedLock struct {
  368. locks [1024]sync.RWMutex
  369. }
  370. func (sl *StripedLock) GetLock(key string) *sync.RWMutex {
  371. h := hash(key)
  372. return &sl.locks[h%1024]
  373. }
  374. // Storage (Table 2) manages disk storage for values.
  375. type Storage struct {
  376. file *os.File
  377. filename string
  378. offset int64 // Current end of file (Atomic)
  379. freeList *FreeList
  380. // Simple LRU Cache for Values
  381. // Using sync.Map for simplicity in this iteration, though it's not LRU.
  382. // For true LRU we'd need a list+map protected by mutex.
  383. // Let's use a Mutex protected Map as a "Hot Cache".
  384. cache map[int64]string
  385. cacheMu sync.RWMutex
  386. }
  387. const (
  388. FlagDeleted = 0x00
  389. FlagValid = 0x01
  390. HeaderSize = 9 // 1(Flag) + 4(Cap) + 4(Len)
  391. AlignSize = 16 // Align capacity to 16 bytes
  392. MaxCacheSize = 10000 // Simple cap
  393. )
  394. func NewStorage(filename string) (*Storage, error) {
  395. f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
  396. if err != nil {
  397. return nil, err
  398. }
  399. st := &Storage{
  400. file: f,
  401. filename: filename,
  402. freeList: NewFreeList(),
  403. cache: make(map[int64]string),
  404. }
  405. if err := st.scan(); err != nil {
  406. f.Close()
  407. return nil, err
  408. }
  409. return st, nil
  410. }
  411. func (s *Storage) scan() error {
  412. offset := int64(0)
  413. if _, err := s.file.Seek(0, 0); err != nil {
  414. return err
  415. }
  416. reader := bufio.NewReader(s.file)
  417. for {
  418. header := make([]byte, HeaderSize)
  419. n, err := io.ReadFull(reader, header)
  420. if err == io.EOF {
  421. break
  422. }
  423. if err == io.ErrUnexpectedEOF && n == 0 {
  424. break
  425. }
  426. if err != nil {
  427. return err
  428. }
  429. flag := header[0]
  430. cap := binary.LittleEndian.Uint32(header[1:])
  431. if flag == FlagDeleted {
  432. s.freeList.Add(cap, offset)
  433. }
  434. discardLen := int(cap)
  435. if _, err := reader.Discard(discardLen); err != nil {
  436. buf := make([]byte, discardLen)
  437. if _, err := io.ReadFull(reader, buf); err != nil {
  438. return err
  439. }
  440. }
  441. offset += int64(HeaderSize + discardLen)
  442. }
  443. s.offset = offset
  444. return nil
  445. }
  446. func alignCapacity(length int) uint32 {
  447. if length == 0 {
  448. return AlignSize
  449. }
  450. cap := (length + AlignSize - 1) / AlignSize * AlignSize
  451. return uint32(cap)
  452. }
  453. // TryUpdateInPlace tries to update value in-place if capacity allows.
  454. func (s *Storage) TryUpdateInPlace(val string, offset int64) bool {
  455. if offset < 0 {
  456. return false
  457. }
  458. valLen := len(val)
  459. capBuf := make([]byte, 4)
  460. if _, err := s.file.ReadAt(capBuf, offset+1); err != nil {
  461. return false
  462. }
  463. oldCap := binary.LittleEndian.Uint32(capBuf)
  464. if uint32(valLen) > oldCap {
  465. return false
  466. }
  467. // Fits! Write Length then Data
  468. lenBuf := make([]byte, 4)
  469. binary.LittleEndian.PutUint32(lenBuf, uint32(valLen))
  470. if _, err := s.file.WriteAt(lenBuf, offset+5); err != nil {
  471. return false
  472. }
  473. if _, err := s.file.WriteAt([]byte(val), offset+HeaderSize); err != nil {
  474. return false
  475. }
  476. // Update Cache
  477. s.cacheMu.Lock()
  478. s.cache[offset] = val
  479. s.cacheMu.Unlock()
  480. return true
  481. }
  482. // AppendOrReuse writes a new value, reusing FreeList or Appending.
  483. func (s *Storage) AppendOrReuse(val string) (int64, error) {
  484. valLen := len(val)
  485. newCap := alignCapacity(valLen)
  486. var writeOffset int64
  487. var actualCap uint32
  488. // Try Reuse
  489. reusedOffset, capFromFree, found := s.freeList.Pop(newCap)
  490. if found {
  491. writeOffset = reusedOffset
  492. actualCap = capFromFree
  493. } else {
  494. // Append
  495. totalSize := HeaderSize + int(newCap)
  496. newEnd := atomic.AddInt64(&s.offset, int64(totalSize))
  497. writeOffset = newEnd - int64(totalSize)
  498. actualCap = newCap
  499. }
  500. buf := make([]byte, HeaderSize+int(actualCap))
  501. buf[0] = FlagValid
  502. binary.LittleEndian.PutUint32(buf[1:], actualCap)
  503. binary.LittleEndian.PutUint32(buf[5:], uint32(valLen))
  504. copy(buf[HeaderSize:], []byte(val))
  505. if _, err := s.file.WriteAt(buf, writeOffset); err != nil {
  506. return 0, err
  507. }
  508. // Add to Cache
  509. s.cacheMu.Lock()
  510. if len(s.cache) < MaxCacheSize {
  511. s.cache[writeOffset] = val
  512. }
  513. s.cacheMu.Unlock()
  514. return writeOffset, nil
  515. }
  516. // MarkDeleted marks a slot as deleted.
  517. func (s *Storage) MarkDeleted(offset int64) error {
  518. capBuf := make([]byte, 4)
  519. if _, err := s.file.ReadAt(capBuf, offset+1); err != nil {
  520. return err
  521. }
  522. oldCap := binary.LittleEndian.Uint32(capBuf)
  523. if _, err := s.file.WriteAt([]byte{FlagDeleted}, offset); err != nil {
  524. return err
  525. }
  526. s.freeList.Add(oldCap, offset)
  527. // Remove from Cache
  528. s.cacheMu.Lock()
  529. delete(s.cache, offset)
  530. s.cacheMu.Unlock()
  531. return nil
  532. }
  533. // ReadValue reads value at offset with Caching.
  534. func (s *Storage) ReadValue(offset int64) (string, error) {
  535. // 1. Check Cache
  536. s.cacheMu.RLock()
  537. if val, ok := s.cache[offset]; ok {
  538. s.cacheMu.RUnlock()
  539. return val, nil
  540. }
  541. s.cacheMu.RUnlock()
  542. // 2. Read Disk
  543. header := make([]byte, HeaderSize)
  544. if _, err := s.file.ReadAt(header, offset); err != nil {
  545. return "", err
  546. }
  547. flag := header[0]
  548. if flag == FlagDeleted {
  549. return "", fmt.Errorf("record deleted")
  550. }
  551. length := binary.LittleEndian.Uint32(header[5:])
  552. data := make([]byte, length)
  553. if _, err := s.file.ReadAt(data, offset+HeaderSize); err != nil {
  554. return "", err
  555. }
  556. val := string(data)
  557. // 3. Fill Cache
  558. s.cacheMu.Lock()
  559. if len(s.cache) < MaxCacheSize {
  560. s.cache[offset] = val
  561. } else {
  562. // Random eviction (map iteration order is random)
  563. // This is a poor man's eviction, but fast.
  564. for k := range s.cache {
  565. delete(s.cache, k)
  566. break
  567. }
  568. s.cache[offset] = val
  569. }
  570. s.cacheMu.Unlock()
  571. return val, nil
  572. }
  573. func (s *Storage) Close() error {
  574. return s.file.Close()
  575. }
  576. // IndexEntry (Table 1)
  577. type IndexEntry struct {
  578. ValueOffset int64
  579. CommitIndex uint64
  580. }
  581. // Engine is the core storage engine.
  582. type Engine struct {
  583. // Replaces KeyMap + Sharded Index with a single Radix Tree
  584. // Radix Tree is thread-safe and stores IndexEntry directly.
  585. Index *RadixTree
  586. // Table 2: Disk Storage
  587. Storage *Storage
  588. // Table 3: Full Text Index (New Implementation)
  589. FTIndex *FullTextIndex
  590. KeyLocks StripedLock
  591. LastCommitIndex uint64
  592. commitMu sync.Mutex // Protects LastCommitIndex
  593. dataDir string
  594. }
  595. func NewEngine(dataDir string) (*Engine, error) {
  596. if err := os.MkdirAll(dataDir, 0755); err != nil {
  597. return nil, err
  598. }
  599. store, err := NewStorage(dataDir + "/values.data")
  600. if err != nil {
  601. return nil, err
  602. }
  603. e := &Engine{
  604. Index: NewRadixTree(),
  605. Storage: store,
  606. FTIndex: NewFullTextIndex(),
  607. dataDir: dataDir,
  608. }
  609. return e, nil
  610. }
  611. func (e *Engine) Close() error {
  612. return e.Storage.Close()
  613. }
  614. func (e *Engine) Set(key, value string, commitIndex uint64) error {
  615. // 0. Lock Key
  616. kLock := e.KeyLocks.GetLock(key)
  617. kLock.Lock()
  618. defer kLock.Unlock()
  619. // 1. Check existing
  620. // We no longer need separate KeyID lookup. Radix Tree stores it all.
  621. var oldOffset int64 = -1
  622. if entry, ok := e.Index.Get(key); ok {
  623. oldOffset = entry.ValueOffset
  624. }
  625. // 2. Try In-Place
  626. if e.Storage.TryUpdateInPlace(value, oldOffset) {
  627. // Update Index (CommitIndex might change)
  628. entry := IndexEntry{ValueOffset: oldOffset, CommitIndex: commitIndex}
  629. e.Index.Insert(key, entry)
  630. e.commitMu.Lock()
  631. if commitIndex > e.LastCommitIndex {
  632. e.LastCommitIndex = commitIndex
  633. }
  634. e.commitMu.Unlock()
  635. return nil
  636. }
  637. // 3. Write New
  638. newOffset, err := e.Storage.AppendOrReuse(value)
  639. if err != nil {
  640. return err
  641. }
  642. // 4. Update Index
  643. // Write New Entry
  644. e.Index.Insert(key, IndexEntry{
  645. ValueOffset: newOffset,
  646. CommitIndex: commitIndex,
  647. })
  648. // 5. Delete Old
  649. if oldOffset >= 0 {
  650. e.Storage.MarkDeleted(oldOffset)
  651. }
  652. // Update global commit index
  653. e.commitMu.Lock()
  654. if commitIndex > e.LastCommitIndex {
  655. e.LastCommitIndex = commitIndex
  656. }
  657. e.commitMu.Unlock()
  658. // 6. Update Full Text Index
  659. // Ideally we remove old tokens too, but that's expensive without reverse index.
  660. // For append-only log structured db, we just add new ones.
  661. e.FTIndex.Add(key, value)
  662. return nil
  663. }
  664. func (e *Engine) Get(key string) (string, bool) {
  665. kLock := e.KeyLocks.GetLock(key)
  666. kLock.RLock()
  667. defer kLock.RUnlock()
  668. entry, ok := e.Index.Get(key)
  669. if !ok {
  670. return "", false
  671. }
  672. val, err := e.Storage.ReadValue(entry.ValueOffset)
  673. if err != nil {
  674. return "", false
  675. }
  676. return val, true
  677. }
  678. type QueryResult struct {
  679. Key string `json:"key"`
  680. Value string `json:"value"`
  681. CommitIndex uint64 `json:"commit_index"`
  682. }
  683. func WildcardMatch(str, pattern string) bool {
  684. s := []rune(str)
  685. p := []rune(pattern)
  686. sLen, pLen := len(s), len(p)
  687. i, j := 0, 0
  688. starIdx, matchIdx := -1, -1
  689. for i < sLen {
  690. if j < pLen && (p[j] == '?' || p[j] == s[i]) {
  691. i++
  692. j++
  693. } else if j < pLen && p[j] == '*' {
  694. starIdx = j
  695. matchIdx = i
  696. j++
  697. } else if starIdx != -1 {
  698. j = starIdx + 1
  699. matchIdx++
  700. i = matchIdx
  701. } else {
  702. return false
  703. }
  704. }
  705. for j < pLen && p[j] == '*' {
  706. j++
  707. }
  708. return j == pLen
  709. }
  710. func (e *Engine) Count(sql string) (int, error) {
  711. // Simple wrapper around Query for now, but optimized to not load values if possible.
  712. // Actually we should refactor Query to support a "countOnly" mode.
  713. // But to avoid breaking API signatures too much, let's just parse and execute with optimization.
  714. // Reuse Query logic but with count optimization
  715. // We can't easily reuse Query() because it returns []QueryResult (heavy).
  716. // Let's implement a specialized version or refactor Query internals.
  717. // Refactoring Query internals to take a visitor/callback is cleanest.
  718. return e.execute(sql, true)
  719. }
  720. // execute is the internal implementation for Query and Count
  721. // countOnly: if true, returns count in first return val (as int cast), second is nil
  722. // Wait, return types need to be consistent. Let's return (count, results, error)
  723. func (e *Engine) execute(sql string, countOnly bool) (int, error) {
  724. sql = strings.TrimSpace(sql)
  725. reLimit := regexp.MustCompile(`(?i)\s+LIMIT\s+(\d+)`)
  726. reOffset := regexp.MustCompile(`(?i)\s+OFFSET\s+(\d+)`)
  727. limit := -1
  728. offset := 0
  729. if match := reLimit.FindStringSubmatch(sql); len(match) > 0 {
  730. limit, _ = strconv.Atoi(match[1])
  731. sql = reLimit.ReplaceAllString(sql, "")
  732. }
  733. if match := reOffset.FindStringSubmatch(sql); len(match) > 0 {
  734. offset, _ = strconv.Atoi(match[1])
  735. sql = reOffset.ReplaceAllString(sql, "")
  736. }
  737. re := regexp.MustCompile(`(key|value|CommitIndex)\s*(=|like|>=|<=|>|<)\s*("[^"]*"|\d+)`)
  738. matches := re.FindAllStringSubmatch(sql, -1)
  739. if len(matches) == 0 {
  740. return 0, fmt.Errorf("invalid query")
  741. }
  742. extractString := func(s string) string {
  743. return strings.Trim(strings.TrimSpace(s), "\"")
  744. }
  745. // Optimization: Point Lookup Fast Path
  746. // Only if not countOnly or if we need value to verify?
  747. // If countOnly and query is `key=".."`, we can just check existence O(1).
  748. for _, match := range matches {
  749. if match[1] == "key" && match[2] == "=" {
  750. targetKey := extractString(match[3])
  751. entry, ok := e.Index.Get(targetKey)
  752. if !ok {
  753. return 0, nil
  754. }
  755. // If we need to filter by Value, we MUST load it.
  756. needValue := false
  757. for _, m := range matches {
  758. if m[1] == "value" { needValue = true; break }
  759. }
  760. var val string
  761. if needValue {
  762. v, err := e.Storage.ReadValue(entry.ValueOffset)
  763. if err != nil { return 0, nil }
  764. val = v
  765. }
  766. matchAll := true
  767. for _, m := range matches {
  768. field, op, valRaw := m[1], m[2], m[3]
  769. switch field {
  770. case "CommitIndex":
  771. num, _ := strconv.ParseUint(valRaw, 10, 64)
  772. switch op {
  773. case ">": if !(entry.CommitIndex > num) { matchAll = false }
  774. case "<": if !(entry.CommitIndex < num) { matchAll = false }
  775. case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
  776. case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
  777. case "=": if !(entry.CommitIndex == num) { matchAll = false }
  778. }
  779. case "value":
  780. t := extractString(valRaw)
  781. switch op {
  782. case "=": if val != t { matchAll = false }
  783. case "like": if !WildcardMatch(val, t) { matchAll = false }
  784. }
  785. }
  786. if !matchAll { break }
  787. }
  788. if matchAll {
  789. return 1, nil
  790. }
  791. return 0, nil
  792. }
  793. }
  794. // Inverted Index Logic
  795. var candidates map[string]bool
  796. var useFTIndex bool = false
  797. for _, match := range matches {
  798. if match[1] == "value" && match[2] == "like" {
  799. pattern := extractString(match[3])
  800. clean := strings.Trim(pattern, "*")
  801. if len(clean) > 0 && !strings.Contains(clean, "*") && !strings.Contains(clean, "?") {
  802. matches := e.FTIndex.Search(pattern)
  803. if matches != nil {
  804. currentSet := make(map[string]bool)
  805. for _, k := range matches {
  806. currentSet[k] = true
  807. }
  808. if !useFTIndex {
  809. candidates = currentSet
  810. useFTIndex = true
  811. } else {
  812. newSet := make(map[string]bool)
  813. for k := range candidates {
  814. if currentSet[k] { newSet[k] = true }
  815. }
  816. candidates = newSet
  817. }
  818. } else {
  819. return 0, nil
  820. }
  821. }
  822. }
  823. }
  824. var iterator func(func(string, IndexEntry) bool)
  825. if useFTIndex {
  826. iterator = func(cb func(string, IndexEntry) bool) {
  827. keys := make([]string, 0, len(candidates))
  828. for k := range candidates { keys = append(keys, k) }
  829. sort.Strings(keys)
  830. for _, k := range keys {
  831. if entry, ok := e.Index.Get(k); ok {
  832. if !cb(k, entry) { return }
  833. }
  834. }
  835. }
  836. } else {
  837. var prefix string = ""
  838. var usePrefix bool = false
  839. for _, match := range matches {
  840. if match[1] == "key" && match[2] == "like" {
  841. pattern := extractString(match[3])
  842. if strings.HasSuffix(pattern, "*") {
  843. clean := pattern[:len(pattern)-1]
  844. if !strings.ContainsAny(clean, "*?") {
  845. prefix = clean
  846. usePrefix = true
  847. break
  848. }
  849. }
  850. }
  851. }
  852. iterator = func(cb func(string, IndexEntry) bool) {
  853. if usePrefix {
  854. e.Index.WalkPrefix(prefix, cb)
  855. } else {
  856. e.Index.WalkPrefix("", cb)
  857. }
  858. }
  859. }
  860. matchedCount := 0
  861. // Execution
  862. iterator(func(key string, entry IndexEntry) bool {
  863. var valStr string
  864. var valLoaded bool
  865. matchAll := true
  866. for _, match := range matches {
  867. field, op, valRaw := match[1], match[2], match[3]
  868. switch field {
  869. case "CommitIndex":
  870. num, _ := strconv.ParseUint(valRaw, 10, 64)
  871. switch op {
  872. case ">": if !(entry.CommitIndex > num) { matchAll = false }
  873. case "<": if !(entry.CommitIndex < num) { matchAll = false }
  874. case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
  875. case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
  876. case "=": if !(entry.CommitIndex == num) { matchAll = false }
  877. }
  878. case "key":
  879. target := extractString(valRaw)
  880. switch op {
  881. case "=": if key != target { matchAll = false }
  882. case "like": if !WildcardMatch(key, target) { matchAll = false }
  883. }
  884. case "value":
  885. if !valLoaded {
  886. v, err := e.Storage.ReadValue(entry.ValueOffset)
  887. if err != nil { matchAll = false; break }
  888. valStr = v
  889. valLoaded = true
  890. }
  891. target := extractString(valRaw)
  892. switch op {
  893. case "=": if valStr != target { matchAll = false }
  894. case "like": if !WildcardMatch(valStr, target) { matchAll = false }
  895. }
  896. }
  897. if !matchAll { break }
  898. }
  899. if matchAll {
  900. matchedCount++
  901. if limit > 0 && offset == 0 && matchedCount >= limit {
  902. return false
  903. }
  904. }
  905. return true
  906. })
  907. if offset > 0 {
  908. if offset >= matchedCount {
  909. return 0, nil
  910. }
  911. matchedCount -= offset
  912. }
  913. if limit >= 0 {
  914. if limit < matchedCount {
  915. matchedCount = limit
  916. }
  917. }
  918. return matchedCount, nil
  919. }
  920. func (e *Engine) Query(sql string) ([]QueryResult, error) {
  921. return e.queryInternal(sql)
  922. }
  923. func (e *Engine) queryInternal(sql string) ([]QueryResult, error) {
  924. sql = strings.TrimSpace(sql)
  925. reLimit := regexp.MustCompile(`(?i)\s+LIMIT\s+(\d+)`)
  926. reOffset := regexp.MustCompile(`(?i)\s+OFFSET\s+(\d+)`)
  927. limit := -1
  928. offset := 0
  929. if match := reLimit.FindStringSubmatch(sql); len(match) > 0 {
  930. limit, _ = strconv.Atoi(match[1])
  931. sql = reLimit.ReplaceAllString(sql, "")
  932. }
  933. if match := reOffset.FindStringSubmatch(sql); len(match) > 0 {
  934. offset, _ = strconv.Atoi(match[1])
  935. sql = reOffset.ReplaceAllString(sql, "")
  936. }
  937. re := regexp.MustCompile(`(key|value|CommitIndex)\s*(=|like|>=|<=|>|<)\s*("[^"]*"|\d+)`)
  938. matches := re.FindAllStringSubmatch(sql, -1)
  939. if len(matches) == 0 {
  940. return nil, fmt.Errorf("invalid query")
  941. }
  942. extractString := func(s string) string {
  943. return strings.Trim(strings.TrimSpace(s), "\"")
  944. }
  945. // Optimization: Point Lookup Fast Path
  946. // If query is exactly `key = "..."`, utilize Hash/Radix Lookup O(1)
  947. for _, match := range matches {
  948. if match[1] == "key" && match[2] == "=" {
  949. targetKey := extractString(match[3])
  950. // 1. Get Entry
  951. entry, ok := e.Index.Get(targetKey)
  952. if !ok {
  953. return []QueryResult{}, nil
  954. }
  955. // 2. Load Value (if needed by other filters, but here we load anyway for result)
  956. val, err := e.Storage.ReadValue(entry.ValueOffset)
  957. if err != nil {
  958. return []QueryResult{}, nil
  959. }
  960. // 3. Verify other conditions
  961. matchAll := true
  962. for _, m := range matches {
  963. field, op, valRaw := m[1], m[2], m[3]
  964. switch field {
  965. case "CommitIndex":
  966. num, _ := strconv.ParseUint(valRaw, 10, 64)
  967. switch op {
  968. case ">": if !(entry.CommitIndex > num) { matchAll = false }
  969. case "<": if !(entry.CommitIndex < num) { matchAll = false }
  970. case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
  971. case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
  972. case "=": if !(entry.CommitIndex == num) { matchAll = false }
  973. }
  974. case "value":
  975. t := extractString(valRaw)
  976. switch op {
  977. case "=": if val != t { matchAll = false }
  978. case "like": if !WildcardMatch(val, t) { matchAll = false }
  979. }
  980. }
  981. if !matchAll { break }
  982. }
  983. if matchAll {
  984. return []QueryResult{{Key: targetKey, Value: val, CommitIndex: entry.CommitIndex}}, nil
  985. }
  986. return []QueryResult{}, nil
  987. }
  988. }
  989. // Optimization: Inverted Index for Value Queries
  990. // Strategy:
  991. // 1. Extract potential tokens from `value like "..."`
  992. // e.g. `value like "*keyword*"` -> token "keyword"
  993. // 2. Look up candidates from FTIndex
  994. // 3. Intersect/Union candidates (if multiple)
  995. // 4. Fallback to Scan if no tokens found or complex query
  996. var candidates map[string]bool
  997. var useFTIndex bool = false
  998. for _, match := range matches {
  999. if match[1] == "value" && match[2] == "like" {
  1000. pattern := extractString(match[3])
  1001. // Extract a core token: remove * from ends
  1002. // Simplistic extraction: find longest sequence of alphanumeric?
  1003. // For now, assume pattern is like "*token*" or "token*"
  1004. clean := strings.Trim(pattern, "*")
  1005. if len(clean) > 0 && !strings.Contains(clean, "*") && !strings.Contains(clean, "?") {
  1006. // We have a candidate token "clean"
  1007. // FTIndex stores partial tokens? No, exact tokens.
  1008. // If query is *partial*, we need FTI scan.
  1009. // Our FTI.Search handles wildcards!
  1010. matches := e.FTIndex.Search(pattern) // Pass original pattern to FTI
  1011. if matches != nil {
  1012. // We found candidates!
  1013. currentSet := make(map[string]bool)
  1014. for _, k := range matches {
  1015. currentSet[k] = true
  1016. }
  1017. if !useFTIndex {
  1018. candidates = currentSet
  1019. useFTIndex = true
  1020. } else {
  1021. // Intersect
  1022. newSet := make(map[string]bool)
  1023. for k := range candidates {
  1024. if currentSet[k] {
  1025. newSet[k] = true
  1026. }
  1027. }
  1028. candidates = newSet
  1029. }
  1030. } else {
  1031. // Pattern produced NO matches -> Empty Result
  1032. return []QueryResult{}, nil
  1033. }
  1034. }
  1035. }
  1036. }
  1037. // Prepare Iterator
  1038. var iterator func(func(string, IndexEntry) bool)
  1039. if useFTIndex {
  1040. // Iterate ONLY candidates
  1041. iterator = func(cb func(string, IndexEntry) bool) {
  1042. // Iterate candidates sorted for deterministic output (and better cache locality?)
  1043. // Sort candidates keys
  1044. keys := make([]string, 0, len(candidates))
  1045. for k := range candidates {
  1046. keys = append(keys, k)
  1047. }
  1048. sort.Strings(keys)
  1049. for _, k := range keys {
  1050. if entry, ok := e.Index.Get(k); ok {
  1051. if !cb(k, entry) {
  1052. return
  1053. }
  1054. }
  1055. }
  1056. }
  1057. } else {
  1058. // Full Scan or Prefix Scan
  1059. var prefix string = ""
  1060. var usePrefix bool = false
  1061. for _, match := range matches {
  1062. if match[1] == "key" && match[2] == "like" {
  1063. pattern := extractString(match[3])
  1064. if strings.HasSuffix(pattern, "*") {
  1065. clean := pattern[:len(pattern)-1]
  1066. if !strings.ContainsAny(clean, "*?") {
  1067. prefix = clean
  1068. usePrefix = true
  1069. break
  1070. }
  1071. }
  1072. }
  1073. }
  1074. iterator = func(cb func(string, IndexEntry) bool) {
  1075. if usePrefix {
  1076. e.Index.WalkPrefix(prefix, cb)
  1077. } else {
  1078. e.Index.WalkPrefix("", cb)
  1079. }
  1080. }
  1081. }
  1082. var results []QueryResult
  1083. var mu sync.Mutex
  1084. // Execution
  1085. iterator(func(key string, entry IndexEntry) bool {
  1086. var valStr string
  1087. var valLoaded bool
  1088. matchAll := true
  1089. for _, match := range matches {
  1090. field, op, valRaw := match[1], match[2], match[3]
  1091. switch field {
  1092. case "CommitIndex":
  1093. num, _ := strconv.ParseUint(valRaw, 10, 64)
  1094. switch op {
  1095. case ">": if !(entry.CommitIndex > num) { matchAll = false }
  1096. case "<": if !(entry.CommitIndex < num) { matchAll = false }
  1097. case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
  1098. case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
  1099. case "=": if !(entry.CommitIndex == num) { matchAll = false }
  1100. }
  1101. case "key":
  1102. target := extractString(valRaw)
  1103. switch op {
  1104. case "=": if key != target { matchAll = false }
  1105. case "like": if !WildcardMatch(key, target) { matchAll = false }
  1106. }
  1107. case "value":
  1108. // Optimization: If using FTIndex, we know token matches, but pattern might be complex.
  1109. // However, FTI.Search already filtered by pattern!
  1110. // So if we trusted FTI, we could skip this check IF it was the only value check.
  1111. // But let's be safe and re-check, especially if multiple value conditions exist.
  1112. if !valLoaded {
  1113. v, err := e.Storage.ReadValue(entry.ValueOffset)
  1114. if err != nil { matchAll = false; break }
  1115. valStr = v
  1116. valLoaded = true
  1117. }
  1118. target := extractString(valRaw)
  1119. switch op {
  1120. case "=": if valStr != target { matchAll = false }
  1121. case "like": if !WildcardMatch(valStr, target) { matchAll = false }
  1122. }
  1123. }
  1124. if !matchAll { break }
  1125. }
  1126. if matchAll {
  1127. if !valLoaded {
  1128. v, err := e.Storage.ReadValue(entry.ValueOffset)
  1129. if err == nil { valStr = v }
  1130. }
  1131. mu.Lock()
  1132. results = append(results, QueryResult{
  1133. Key: key,
  1134. Value: valStr,
  1135. CommitIndex: entry.CommitIndex,
  1136. })
  1137. // Optimization: Early termination for LIMIT
  1138. // Current Logic: Only optimizes if offset == 0
  1139. // Improvement: Optimize for Offset too.
  1140. // If we have collected enough results (Limit + Offset), we can stop.
  1141. // Radix Walk is ordered.
  1142. needed := limit + offset
  1143. if limit > 0 && len(results) >= needed {
  1144. mu.Unlock()
  1145. return false
  1146. }
  1147. mu.Unlock()
  1148. }
  1149. return true
  1150. })
  1151. // Pagination
  1152. if offset > 0 {
  1153. if offset >= len(results) {
  1154. return []QueryResult{}, nil
  1155. }
  1156. results = results[offset:]
  1157. }
  1158. if limit >= 0 {
  1159. if limit < len(results) {
  1160. results = results[:limit]
  1161. }
  1162. }
  1163. return results, nil
  1164. }
  1165. func (e *Engine) Snapshot() ([]byte, error) {
  1166. // Not implemented for Radix Tree yet in this demo
  1167. return nil, nil
  1168. }
  1169. func (e *Engine) Restore(data []byte) error {
  1170. return nil
  1171. }
  1172. // DebugClearAuxiliary clears caches and inverted index to measure core index memory usage.
  1173. // For internal benchmark use only.
  1174. func (e *Engine) DebugClearAuxiliary() {
  1175. // Clear Cache
  1176. e.Storage.cacheMu.Lock()
  1177. e.Storage.cache = make(map[int64]string)
  1178. e.Storage.cacheMu.Unlock()
  1179. // Clear FTIndex
  1180. e.FTIndex.mu.Lock()
  1181. e.FTIndex.index = make(map[string][]string)
  1182. e.FTIndex.mu.Unlock()
  1183. }