engine.go 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202
  1. package db
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "fmt"
  6. "hash/crc32"
  7. "io"
  8. "os"
  9. "regexp"
  10. "sort"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "sync/atomic"
  15. )
  16. // hash returns a 32-bit FNV-1a hash of the string
  17. func hash(s string) uint32 {
  18. var h uint32 = 2166136261
  19. for i := 0; i < len(s); i++ {
  20. h = (h * 16777619) ^ uint32(s[i])
  21. }
  22. return h
  23. }
  24. // --- Flat Sorted Array Index Implementation ---
  25. // Extreme memory optimization:
  26. // 1. Removes all tree node overhead (pointers, structs, map buckets).
  27. // 2. Stores keys in a single monolithic []byte buffer (no string headers).
  28. // 3. Uses a sorted slice of offsets for O(log N) lookup and O(N) insertion.
// IndexEntry is the per-key payload kept in the in-memory index.
type IndexEntry struct {
	// ValueOffset is the offset of the key's record in the storage log; it is
	// the value handed to Storage.ReadValue to fetch the key's value.
	ValueOffset int64
	// CommitIndex is the commit index recorded together with that record.
	CommitIndex uint64
}
// FlatIndex is a sorted in-memory key index. Key bytes live in one shared
// buffer and items holds one fixed-size descriptor per key, kept in ascending
// key order so that lookups can binary-search it.
type FlatIndex struct {
	// Monolithic, append-only buffer to store all key strings.
	// Format: [key1 bytes][key2 bytes]...
	keyBuf []byte
	// Sorted list of index items (ordered by the key each item refers to).
	items []flatItem
	// mu guards keyBuf and items.
	mu sync.RWMutex
}
// flatItem is a compact descriptor for one indexed key: where the key's bytes
// sit in FlatIndex.keyBuf plus the entry associated with the key.
//
// Size per item (on typical 64-bit platforms): 4 (keyOffset) + 2 (keyLen) +
// 2 (padding so that entry is 8-byte aligned) + 16 (entry) = 24 bytes.
// For 100k keys that is about 2.4 MB of items; assuming keys of roughly
// 18 bytes, keyBuf adds about 1.8 MB, for a total of roughly 4.2 MB.
type flatItem struct {
	keyOffset uint32 // 4 bytes: Offset into keyBuf (supports 4GB total key size)
	keyLen    uint16 // 2 bytes: Length of key (max key len 65535)
	// The compiler is expected to insert 2 bytes of padding here because
	// IndexEntry (int64 + uint64) requires 8-byte alignment.
	entry IndexEntry // 16 bytes
}
  54. func NewFlatIndex() *FlatIndex {
  55. return &FlatIndex{
  56. keyBuf: make([]byte, 0, 1024*1024), // 1MB initial cap
  57. items: make([]flatItem, 0, 10000),
  58. }
  59. }
  60. // getKey unsafe-ish helper to get string from buffer without alloc?
  61. // Standard conversion string(b) allocates.
  62. // For comparison in binary search, we ideally want to avoid allocation.
  63. // But Go 1.20+ optimization might handle string(bytes) well in map keys or comparisons if they don't escape.
  64. // Let's rely on standard string() conversion for safety first.
  65. func (fi *FlatIndex) getKey(idx int) string {
  66. item := fi.items[idx]
  67. return string(fi.keyBuf[item.keyOffset : item.keyOffset+uint32(item.keyLen)])
  68. }
  69. // Compare helper to avoid string allocation
  70. func (fi *FlatIndex) compare(idx int, target string) int {
  71. item := fi.items[idx]
  72. keyBytes := fi.keyBuf[item.keyOffset : item.keyOffset+uint32(item.keyLen)]
  73. return strings.Compare(string(keyBytes), target)
  74. }
  75. func (fi *FlatIndex) Insert(key string, entry IndexEntry) {
  76. fi.mu.Lock()
  77. defer fi.mu.Unlock()
  78. // 1. Binary Search
  79. idx := sort.Search(len(fi.items), func(i int) bool {
  80. return fi.getKey(i) >= key
  81. })
  82. // 2. Update existing
  83. if idx < len(fi.items) && fi.getKey(idx) == key {
  84. fi.items[idx].entry = entry
  85. return
  86. }
  87. // 3. Add new key
  88. offset := len(fi.keyBuf)
  89. fi.keyBuf = append(fi.keyBuf, key...)
  90. if len(key) > 65535 {
  91. // Should have been caught earlier, but just in case cap it or panic
  92. // We can panic or truncate.
  93. // panic("key too long")
  94. }
  95. newItem := flatItem{
  96. keyOffset: uint32(offset),
  97. keyLen: uint16(len(key)),
  98. entry: entry,
  99. }
  100. // 4. Insert into sorted slice
  101. // Optimization: check if appending to end (common case for sequential inserts)
  102. if idx == len(fi.items) {
  103. fi.items = append(fi.items, newItem)
  104. } else {
  105. fi.items = append(fi.items, flatItem{})
  106. copy(fi.items[idx+1:], fi.items[idx:])
  107. fi.items[idx] = newItem
  108. }
  109. }
  110. func (fi *FlatIndex) Get(key string) (IndexEntry, bool) {
  111. fi.mu.RLock()
  112. defer fi.mu.RUnlock()
  113. idx := sort.Search(len(fi.items), func(i int) bool {
  114. return fi.getKey(i) >= key
  115. })
  116. if idx < len(fi.items) && fi.getKey(idx) == key {
  117. return fi.items[idx].entry, true
  118. }
  119. return IndexEntry{}, false
  120. }
  121. func (fi *FlatIndex) Delete(key string) bool {
  122. fi.mu.Lock()
  123. defer fi.mu.Unlock()
  124. idx := sort.Search(len(fi.items), func(i int) bool {
  125. return fi.getKey(i) >= key
  126. })
  127. if idx < len(fi.items) && fi.getKey(idx) == key {
  128. // Delete from slice
  129. fi.items = append(fi.items[:idx], fi.items[idx+1:]...)
  130. // Note: We don't reclaim space in keyBuf. It's append-only until compaction (not implemented).
  131. // This is fine for many cases, but heavy deletes might waste keyBuf space.
  132. return true
  133. }
  134. return false
  135. }
  136. // WalkPrefix iterates over keys starting with prefix.
  137. // Since keys are sorted, we find the first match and iterate until mismatch.
  138. func (fi *FlatIndex) WalkPrefix(prefix string, callback func(key string, entry IndexEntry) bool) {
  139. fi.mu.RLock()
  140. defer fi.mu.RUnlock()
  141. // Binary search for the start
  142. idx := sort.Search(len(fi.items), func(i int) bool {
  143. return fi.getKey(i) >= prefix
  144. })
  145. for i := idx; i < len(fi.items); i++ {
  146. key := fi.getKey(i)
  147. // Optimization: Check prefix match
  148. if !strings.HasPrefix(key, prefix) {
  149. break
  150. }
  151. if !callback(key, fi.items[i].entry) {
  152. return
  153. }
  154. }
  155. }
  156. // --- End Flat Index ---
  157. // --- Full Text Index ---
// FullTextIndex is an inverted index from value tokens to the keys whose
// values contain them.
type FullTextIndex struct {
	// Token -> Set of Keys (the inner map is used as a set; values are true).
	index map[string]map[string]bool
	// mu guards index.
	mu sync.RWMutex
}
  163. func NewFullTextIndex() *FullTextIndex {
  164. return &FullTextIndex{
  165. index: make(map[string]map[string]bool),
  166. }
  167. }
  168. func (fti *FullTextIndex) Add(key string, value string) {
  169. fti.mu.Lock()
  170. defer fti.mu.Unlock()
  171. tokens := tokenize(value)
  172. for _, token := range tokens {
  173. if fti.index[token] == nil {
  174. fti.index[token] = make(map[string]bool)
  175. }
  176. fti.index[token][key] = true
  177. }
  178. }
  179. func (fti *FullTextIndex) Remove(key string, value string) {
  180. fti.mu.Lock()
  181. defer fti.mu.Unlock()
  182. tokens := tokenize(value)
  183. for _, token := range tokens {
  184. if set, ok := fti.index[token]; ok {
  185. delete(set, key)
  186. if len(set) == 0 {
  187. delete(fti.index, token)
  188. }
  189. }
  190. }
  191. }
  192. func (fti *FullTextIndex) Search(tokenPattern string) []string {
  193. fti.mu.RLock()
  194. defer fti.mu.RUnlock()
  195. // 1. Exact Match
  196. if !strings.Contains(tokenPattern, "*") {
  197. if keys, ok := fti.index[tokenPattern]; ok {
  198. res := make([]string, 0, len(keys))
  199. for k := range keys {
  200. res = append(res, k)
  201. }
  202. return res
  203. }
  204. return nil
  205. }
  206. // 2. Wildcard Scan
  207. var results []string
  208. seen := make(map[string]bool)
  209. for token, keys := range fti.index {
  210. if WildcardMatch(token, tokenPattern) {
  211. for k := range keys {
  212. if !seen[k] {
  213. results = append(results, k)
  214. seen[k] = true
  215. }
  216. }
  217. }
  218. }
  219. return results
  220. }
  221. func tokenize(val string) []string {
  222. f := func(c rune) bool {
  223. return !((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9'))
  224. }
  225. return strings.FieldsFunc(val, f)
  226. }
  227. // --- Storage & Cache ---
// StripedLock provides hashed locks for keys: a fixed array of 1024
// read/write mutexes, one of which is selected per key by GetLock.
type StripedLock struct {
	locks [1024]sync.RWMutex
}
  232. func (sl *StripedLock) GetLock(key string) *sync.RWMutex {
  233. h := hash(key)
  234. return &sl.locks[h%1024]
  235. }
// Storage manages disk storage using an append-only log, together with an
// in-memory cache of values keyed by the offset of their record.
type Storage struct {
	file     *os.File // log file, opened read/write by NewStorage
	filename string   // path the log file was opened with
	offset   int64    // Current end of file; Append reserves space by advancing it atomically
	cache    map[int64]string // record offset -> value
	cacheMu  sync.RWMutex     // guards cache
}
const (
	// Record type tags, stored in the Type byte of every record header.
	RecordTypePut    = 0x01
	RecordTypeDelete = 0x02
	// HeaderSize is the size in bytes of the fixed record header:
	// CRC(4) + Type(1) + KeyLen(2) + ValLen(4) + CommitIndex(8)
	HeaderSize = 4 + 1 + 2 + 4 + 8
	// MaxCacheSize is the maximum number of values kept in Storage's cache.
	MaxCacheSize = 10000
)
  251. func NewStorage(filename string) (*Storage, error) {
  252. f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
  253. if err != nil {
  254. return nil, err
  255. }
  256. st := &Storage{
  257. file: f,
  258. filename: filename,
  259. cache: make(map[int64]string),
  260. }
  261. return st, nil
  262. }
// Record represents a log entry as decoded by Storage.Scan.
type Record struct {
	Type        byte   // RecordTypePut or RecordTypeDelete
	Key         string // key bytes of the record
	Value       string // value bytes of the record
	Offset      int64  // offset of the record's header within the log file
	CommitIndex uint64 // commit index stored in the record header
}
  271. // Scan iterates over the log file, validating CRCs and returning records.
  272. // It returns the valid offset after the last record.
  273. func (s *Storage) Scan(callback func(Record)) (int64, error) {
  274. offset := int64(0)
  275. if _, err := s.file.Seek(0, 0); err != nil {
  276. return 0, err
  277. }
  278. reader := bufio.NewReader(s.file)
  279. for {
  280. header := make([]byte, HeaderSize)
  281. n, err := io.ReadFull(reader, header)
  282. if err == io.EOF {
  283. break
  284. }
  285. if err == io.ErrUnexpectedEOF && n == 0 {
  286. break
  287. }
  288. if err != nil {
  289. fmt.Printf("Storage Scan Error at %d: %v\n", offset, err)
  290. break
  291. }
  292. storedCRC := binary.LittleEndian.Uint32(header[0:4])
  293. recType := header[4]
  294. keyLen := binary.LittleEndian.Uint16(header[5:7])
  295. valLen := binary.LittleEndian.Uint32(header[7:11])
  296. commitIndex := binary.LittleEndian.Uint64(header[11:19])
  297. totalLen := int(keyLen) + int(valLen)
  298. data := make([]byte, totalLen)
  299. if _, err := io.ReadFull(reader, data); err != nil {
  300. fmt.Printf("Storage Scan Data Error at %d: %v\n", offset, err)
  301. break
  302. }
  303. // Verify CRC
  304. crc := crc32.ChecksumIEEE(header[4:]) // Checksum of Type+Lengths+CommitIndex
  305. crc = crc32.Update(crc, crc32.IEEETable, data) // + Data
  306. if crc != storedCRC {
  307. fmt.Printf("CRC Mismatch at %d\n", offset)
  308. break
  309. }
  310. key := string(data[:keyLen])
  311. val := string(data[keyLen:])
  312. callback(Record{
  313. Type: recType,
  314. Key: key,
  315. Value: val,
  316. Offset: offset,
  317. CommitIndex: commitIndex,
  318. })
  319. offset += int64(HeaderSize + totalLen)
  320. }
  321. // Update storage offset
  322. s.offset = offset
  323. s.file.Seek(offset, 0)
  324. return offset, nil
  325. }
  326. // Append writes a new record
  327. func (s *Storage) Append(key, val string, recType byte, commitIndex uint64) (int64, error) {
  328. keyBytes := []byte(key)
  329. valBytes := []byte(val)
  330. keyLen := len(keyBytes)
  331. valLen := len(valBytes)
  332. if keyLen > 65535 {
  333. return 0, fmt.Errorf("key too large")
  334. }
  335. buf := make([]byte, HeaderSize+keyLen+valLen)
  336. // 1. Build Header Data (skip CRC)
  337. buf[4] = recType
  338. binary.LittleEndian.PutUint16(buf[5:7], uint16(keyLen))
  339. binary.LittleEndian.PutUint32(buf[7:11], uint32(valLen))
  340. binary.LittleEndian.PutUint64(buf[11:19], commitIndex)
  341. // 2. Copy Data
  342. copy(buf[HeaderSize:], keyBytes)
  343. copy(buf[HeaderSize+keyLen:], valBytes)
  344. // 3. Calc CRC
  345. crc := crc32.ChecksumIEEE(buf[4:]) // Everything after CRC field
  346. binary.LittleEndian.PutUint32(buf[0:4], crc)
  347. // 4. Write
  348. totalSize := int64(len(buf))
  349. writeOffset := atomic.AddInt64(&s.offset, totalSize) - totalSize
  350. if _, err := s.file.WriteAt(buf, writeOffset); err != nil {
  351. return 0, err
  352. }
  353. // Add to Cache if Put
  354. if recType == RecordTypePut {
  355. s.cacheMu.Lock()
  356. if len(s.cache) < MaxCacheSize {
  357. s.cache[writeOffset] = val
  358. }
  359. s.cacheMu.Unlock()
  360. }
  361. return writeOffset, nil
  362. }
  363. // Sync flushes writes to stable storage
  364. func (s *Storage) Sync() error {
  365. return s.file.Sync()
  366. }
  367. // ReadValue reads value at offset
  368. func (s *Storage) ReadValue(offset int64) (string, error) {
  369. // 1. Check Cache
  370. s.cacheMu.RLock()
  371. if val, ok := s.cache[offset]; ok {
  372. s.cacheMu.RUnlock()
  373. return val, nil
  374. }
  375. s.cacheMu.RUnlock()
  376. // 2. Read Header
  377. header := make([]byte, HeaderSize)
  378. if _, err := s.file.ReadAt(header, offset); err != nil {
  379. return "", err
  380. }
  381. keyLen := binary.LittleEndian.Uint16(header[5:7])
  382. valLen := binary.LittleEndian.Uint32(header[7:11])
  383. totalLen := int(keyLen) + int(valLen)
  384. data := make([]byte, totalLen)
  385. if _, err := s.file.ReadAt(data, offset+HeaderSize); err != nil {
  386. return "", err
  387. }
  388. // Verify CRC on read for safety
  389. storedCRC := binary.LittleEndian.Uint32(header[0:4])
  390. crc := crc32.ChecksumIEEE(header[4:])
  391. crc = crc32.Update(crc, crc32.IEEETable, data)
  392. if crc != storedCRC {
  393. return "", fmt.Errorf("data corruption detected at offset %d", offset)
  394. }
  395. val := string(data[keyLen:])
  396. // 3. Fill Cache
  397. s.cacheMu.Lock()
  398. if len(s.cache) < MaxCacheSize {
  399. s.cache[offset] = val
  400. } else {
  401. for k := range s.cache {
  402. delete(s.cache, k)
  403. break
  404. }
  405. s.cache[offset] = val
  406. }
  407. s.cacheMu.Unlock()
  408. return val, nil
  409. }
  410. func (s *Storage) Close() error {
  411. return s.file.Close()
  412. }
// EngineOption defines a configuration option for the Engine. Options are
// functions that mutate the EngineConfig built by NewEngine.
type EngineOption func(*EngineConfig)
// EngineConfig holds the settings an Engine is created with.
type EngineConfig struct {
	// EnableValueIndex controls whether NewEngine creates the full text
	// index over values.
	EnableValueIndex bool
}
  418. // WithValueIndex enables or disables the value index (Full Text Index)
  419. func WithValueIndex(enable bool) EngineOption {
  420. return func(c *EngineConfig) {
  421. c.EnableValueIndex = enable
  422. }
  423. }
// Engine is the core storage engine. It combines the in-memory key index,
// the on-disk append-only log and the optional full text index over values.
type Engine struct {
	Index           *FlatIndex     // Flattened memory structure mapping key -> IndexEntry
	Storage         *Storage       // append-only log holding the records
	FTIndex         *FullTextIndex // Can be nil if disabled
	KeyLocks        StripedLock    // per-key striped locks
	LastCommitIndex uint64         // highest commit index seen in any applied record
	commitMu        sync.Mutex     // guards LastCommitIndex updates in Set and Delete
	dataDir         string         // directory passed to NewEngine
	writeMu         sync.Mutex     // serializes Set and Delete
	config          EngineConfig   // configuration after applying options
}
  436. func NewEngine(dataDir string, opts ...EngineOption) (*Engine, error) {
  437. if err := os.MkdirAll(dataDir, 0755); err != nil {
  438. return nil, err
  439. }
  440. config := EngineConfig{
  441. EnableValueIndex: true, // Default to true
  442. }
  443. for _, opt := range opts {
  444. opt(&config)
  445. }
  446. store, err := NewStorage(dataDir + "/values.data")
  447. if err != nil {
  448. return nil, err
  449. }
  450. e := &Engine{
  451. Index: NewFlatIndex(),
  452. Storage: store,
  453. FTIndex: nil,
  454. dataDir: dataDir,
  455. config: config,
  456. }
  457. if config.EnableValueIndex {
  458. e.FTIndex = NewFullTextIndex()
  459. }
  460. // Rebuild Index from Disk
  461. _, err = store.Scan(func(rec Record) {
  462. if rec.Type == RecordTypePut {
  463. e.Index.Insert(rec.Key, IndexEntry{
  464. ValueOffset: rec.Offset,
  465. CommitIndex: rec.CommitIndex,
  466. })
  467. if e.FTIndex != nil {
  468. e.FTIndex.Add(rec.Key, rec.Value)
  469. }
  470. // Update LastCommitIndex
  471. if rec.CommitIndex > e.LastCommitIndex {
  472. e.LastCommitIndex = rec.CommitIndex
  473. }
  474. } else if rec.Type == RecordTypeDelete {
  475. // Cleanup FTIndex using old value if possible
  476. e.removeValueFromFTIndex(rec.Key)
  477. e.Index.Delete(rec.Key)
  478. if rec.CommitIndex > e.LastCommitIndex {
  479. e.LastCommitIndex = rec.CommitIndex
  480. }
  481. }
  482. })
  483. return e, nil
  484. }
  485. func (e *Engine) removeValueFromFTIndex(key string) {
  486. if e.FTIndex == nil {
  487. return
  488. }
  489. if entry, ok := e.Index.Get(key); ok {
  490. val, err := e.Storage.ReadValue(entry.ValueOffset)
  491. if err == nil {
  492. e.FTIndex.Remove(key, val)
  493. }
  494. }
  495. }
  496. func (e *Engine) Close() error {
  497. return e.Storage.Close()
  498. }
  499. func (e *Engine) Set(key, value string, commitIndex uint64) error {
  500. e.writeMu.Lock()
  501. defer e.writeMu.Unlock()
  502. e.removeValueFromFTIndex(key)
  503. offset, err := e.Storage.Append(key, value, RecordTypePut, commitIndex)
  504. if err != nil {
  505. return err
  506. }
  507. e.Index.Insert(key, IndexEntry{
  508. ValueOffset: offset,
  509. CommitIndex: commitIndex,
  510. })
  511. if e.FTIndex != nil {
  512. e.FTIndex.Add(key, value)
  513. }
  514. e.commitMu.Lock()
  515. if commitIndex > e.LastCommitIndex {
  516. e.LastCommitIndex = commitIndex
  517. }
  518. e.commitMu.Unlock()
  519. return nil
  520. }
  521. func (e *Engine) Delete(key string, commitIndex uint64) error {
  522. e.writeMu.Lock()
  523. defer e.writeMu.Unlock()
  524. e.removeValueFromFTIndex(key)
  525. _, err := e.Storage.Append(key, "", RecordTypeDelete, commitIndex)
  526. if err != nil {
  527. return err
  528. }
  529. e.Index.Delete(key)
  530. e.commitMu.Lock()
  531. if commitIndex > e.LastCommitIndex {
  532. e.LastCommitIndex = commitIndex
  533. }
  534. e.commitMu.Unlock()
  535. return nil
  536. }
  537. // Sync flushes underlying storage to disk
  538. func (e *Engine) Sync() error {
  539. return e.Storage.Sync()
  540. }
  541. func (e *Engine) Get(key string) (string, bool) {
  542. entry, ok := e.Index.Get(key)
  543. if !ok {
  544. return "", false
  545. }
  546. val, err := e.Storage.ReadValue(entry.ValueOffset)
  547. if err != nil {
  548. return "", false
  549. }
  550. return val, true
  551. }
// QueryResult is one row returned by a query: a key, its value and the
// commit index of the record holding them. The struct tags define its JSON
// field names.
type QueryResult struct {
	Key         string `json:"key"`
	Value       string `json:"value"`
	CommitIndex uint64 `json:"commit_index"`
}
  557. func WildcardMatch(str, pattern string) bool {
  558. s := []rune(str)
  559. p := []rune(pattern)
  560. sLen, pLen := len(s), len(p)
  561. i, j := 0, 0
  562. starIdx, matchIdx := -1, -1
  563. for i < sLen {
  564. if j < pLen && (p[j] == '?' || p[j] == s[i]) {
  565. i++
  566. j++
  567. } else if j < pLen && p[j] == '*' {
  568. starIdx = j
  569. matchIdx = i
  570. j++
  571. } else if starIdx != -1 {
  572. j = starIdx + 1
  573. matchIdx++
  574. i = matchIdx
  575. } else {
  576. return false
  577. }
  578. }
  579. for j < pLen && p[j] == '*' {
  580. j++
  581. }
  582. return j == pLen
  583. }
  584. func (e *Engine) Count(sql string) (int, error) {
  585. return e.execute(sql, true)
  586. }
  587. func (e *Engine) execute(sql string, countOnly bool) (int, error) {
  588. sql = strings.TrimSpace(sql)
  589. reLimit := regexp.MustCompile(`(?i)\s+LIMIT\s+(\d+)`)
  590. reOffset := regexp.MustCompile(`(?i)\s+OFFSET\s+(\d+)`)
  591. limit := -1
  592. offset := 0
  593. if match := reLimit.FindStringSubmatch(sql); len(match) > 0 {
  594. limit, _ = strconv.Atoi(match[1])
  595. sql = reLimit.ReplaceAllString(sql, "")
  596. }
  597. if match := reOffset.FindStringSubmatch(sql); len(match) > 0 {
  598. offset, _ = strconv.Atoi(match[1])
  599. sql = reOffset.ReplaceAllString(sql, "")
  600. }
  601. re := regexp.MustCompile(`(key|value|CommitIndex)\s*(=|like|>=|<=|>|<)\s*("[^"]*"|\d+)`)
  602. matches := re.FindAllStringSubmatch(sql, -1)
  603. if len(matches) == 0 {
  604. return 0, fmt.Errorf("invalid query")
  605. }
  606. extractString := func(s string) string {
  607. return strings.Trim(strings.TrimSpace(s), "\"")
  608. }
  609. // 1. Optimize for Point Lookup
  610. for _, match := range matches {
  611. if match[1] == "key" && match[2] == "=" {
  612. targetKey := extractString(match[3])
  613. entry, ok := e.Index.Get(targetKey)
  614. if !ok {
  615. return 0, nil
  616. }
  617. needValue := false
  618. for _, m := range matches {
  619. if m[1] == "value" { needValue = true; break }
  620. }
  621. var val string
  622. if needValue {
  623. v, err := e.Storage.ReadValue(entry.ValueOffset)
  624. if err != nil { return 0, nil }
  625. val = v
  626. }
  627. matchAll := true
  628. for _, m := range matches {
  629. field, op, valRaw := m[1], m[2], m[3]
  630. switch field {
  631. case "CommitIndex":
  632. num, _ := strconv.ParseUint(valRaw, 10, 64)
  633. switch op {
  634. case ">": if !(entry.CommitIndex > num) { matchAll = false }
  635. case "<": if !(entry.CommitIndex < num) { matchAll = false }
  636. case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
  637. case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
  638. case "=": if !(entry.CommitIndex == num) { matchAll = false }
  639. }
  640. case "value":
  641. t := extractString(valRaw)
  642. switch op {
  643. case "=": if val != t { matchAll = false }
  644. case "like": if !WildcardMatch(val, t) { matchAll = false }
  645. }
  646. }
  647. if !matchAll { break }
  648. }
  649. if matchAll {
  650. return 1, nil
  651. }
  652. return 0, nil
  653. }
  654. }
  655. var candidates map[string]bool
  656. var useFTIndex bool = false
  657. // 2. Try to use FT Index (if enabled)
  658. if e.FTIndex != nil {
  659. for _, match := range matches {
  660. if match[1] == "value" && match[2] == "like" {
  661. pattern := extractString(match[3])
  662. clean := strings.Trim(pattern, "*")
  663. if len(clean) > 0 && !strings.Contains(clean, "*") && !strings.Contains(clean, "?") {
  664. matches := e.FTIndex.Search(pattern)
  665. if matches != nil {
  666. currentSet := make(map[string]bool)
  667. for _, k := range matches {
  668. currentSet[k] = true
  669. }
  670. if !useFTIndex {
  671. candidates = currentSet
  672. useFTIndex = true
  673. } else {
  674. newSet := make(map[string]bool)
  675. for k := range candidates {
  676. if currentSet[k] { newSet[k] = true }
  677. }
  678. candidates = newSet
  679. }
  680. } else {
  681. return 0, nil
  682. }
  683. }
  684. }
  685. }
  686. } else {
  687. for _, match := range matches {
  688. if match[1] == "value" && match[2] == "like" {
  689. // Value search requested but index disabled -> return 0
  690. return 0, nil
  691. }
  692. }
  693. }
  694. var iterator func(func(string, IndexEntry) bool)
  695. if useFTIndex {
  696. iterator = func(cb func(string, IndexEntry) bool) {
  697. keys := make([]string, 0, len(candidates))
  698. for k := range candidates { keys = append(keys, k) }
  699. sort.Strings(keys)
  700. for _, k := range keys {
  701. if entry, ok := e.Index.Get(k); ok {
  702. if !cb(k, entry) { return }
  703. }
  704. }
  705. }
  706. } else {
  707. // Use FlatIndex Prefix Search
  708. var prefix string = ""
  709. var usePrefix bool = false
  710. for _, match := range matches {
  711. if match[1] == "key" && match[2] == "like" {
  712. pattern := extractString(match[3])
  713. // Flat Index supports simple prefix match
  714. if strings.HasSuffix(pattern, "*") {
  715. clean := pattern[:len(pattern)-1]
  716. if !strings.ContainsAny(clean, "*?") {
  717. prefix = clean
  718. usePrefix = true
  719. break
  720. }
  721. }
  722. }
  723. }
  724. iterator = func(cb func(string, IndexEntry) bool) {
  725. if usePrefix {
  726. e.Index.WalkPrefix(prefix, cb)
  727. } else {
  728. e.Index.WalkPrefix("", cb)
  729. }
  730. }
  731. }
  732. matchedCount := 0
  733. iterator(func(key string, entry IndexEntry) bool {
  734. var valStr string
  735. var valLoaded bool
  736. matchAll := true
  737. for _, match := range matches {
  738. field, op, valRaw := match[1], match[2], match[3]
  739. switch field {
  740. case "CommitIndex":
  741. num, _ := strconv.ParseUint(valRaw, 10, 64)
  742. switch op {
  743. case ">": if !(entry.CommitIndex > num) { matchAll = false }
  744. case "<": if !(entry.CommitIndex < num) { matchAll = false }
  745. case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
  746. case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
  747. case "=": if !(entry.CommitIndex == num) { matchAll = false }
  748. }
  749. case "key":
  750. target := extractString(valRaw)
  751. switch op {
  752. case "=": if key != target { matchAll = false }
  753. case "like": if !WildcardMatch(key, target) { matchAll = false }
  754. }
  755. case "value":
  756. if e.FTIndex == nil && op == "like" {
  757. matchAll = false
  758. break
  759. }
  760. if !valLoaded {
  761. v, err := e.Storage.ReadValue(entry.ValueOffset)
  762. if err != nil { matchAll = false; break }
  763. valStr = v
  764. valLoaded = true
  765. }
  766. target := extractString(valRaw)
  767. switch op {
  768. case "=": if valStr != target { matchAll = false }
  769. case "like": if !WildcardMatch(valStr, target) { matchAll = false }
  770. }
  771. }
  772. if !matchAll { break }
  773. }
  774. if matchAll {
  775. matchedCount++
  776. if limit > 0 && offset == 0 && matchedCount >= limit {
  777. return false
  778. }
  779. }
  780. return true
  781. })
  782. if offset > 0 {
  783. if offset >= matchedCount {
  784. return 0, nil
  785. }
  786. matchedCount -= offset
  787. }
  788. if limit >= 0 {
  789. if limit < matchedCount {
  790. matchedCount = limit
  791. }
  792. }
  793. return matchedCount, nil
  794. }
  795. func (e *Engine) Query(sql string) ([]QueryResult, error) {
  796. return e.queryInternal(sql)
  797. }
  798. func (e *Engine) queryInternal(sql string) ([]QueryResult, error) {
  799. sql = strings.TrimSpace(sql)
  800. reLimit := regexp.MustCompile(`(?i)\s+LIMIT\s+(\d+)`)
  801. reOffset := regexp.MustCompile(`(?i)\s+OFFSET\s+(\d+)`)
  802. limit := -1
  803. offset := 0
  804. if match := reLimit.FindStringSubmatch(sql); len(match) > 0 {
  805. limit, _ = strconv.Atoi(match[1])
  806. sql = reLimit.ReplaceAllString(sql, "")
  807. }
  808. if match := reOffset.FindStringSubmatch(sql); len(match) > 0 {
  809. offset, _ = strconv.Atoi(match[1])
  810. sql = reOffset.ReplaceAllString(sql, "")
  811. }
  812. re := regexp.MustCompile(`(key|value|CommitIndex)\s*(=|like|>=|<=|>|<)\s*("[^"]*"|\d+)`)
  813. matches := re.FindAllStringSubmatch(sql, -1)
  814. if len(matches) == 0 {
  815. return nil, fmt.Errorf("invalid query")
  816. }
  817. extractString := func(s string) string {
  818. return strings.Trim(strings.TrimSpace(s), "\"")
  819. }
  820. for _, match := range matches {
  821. if match[1] == "key" && match[2] == "=" {
  822. targetKey := extractString(match[3])
  823. entry, ok := e.Index.Get(targetKey)
  824. if !ok {
  825. return []QueryResult{}, nil
  826. }
  827. val, err := e.Storage.ReadValue(entry.ValueOffset)
  828. if err != nil {
  829. return []QueryResult{}, nil
  830. }
  831. matchAll := true
  832. for _, m := range matches {
  833. field, op, valRaw := m[1], m[2], m[3]
  834. switch field {
  835. case "CommitIndex":
  836. num, _ := strconv.ParseUint(valRaw, 10, 64)
  837. switch op {
  838. case ">": if !(entry.CommitIndex > num) { matchAll = false }
  839. case "<": if !(entry.CommitIndex < num) { matchAll = false }
  840. case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
  841. case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
  842. case "=": if !(entry.CommitIndex == num) { matchAll = false }
  843. }
  844. case "value":
  845. t := extractString(valRaw)
  846. switch op {
  847. case "=": if val != t { matchAll = false }
  848. case "like": if !WildcardMatch(val, t) { matchAll = false }
  849. }
  850. }
  851. if !matchAll { break }
  852. }
  853. if matchAll {
  854. return []QueryResult{{Key: targetKey, Value: val, CommitIndex: entry.CommitIndex}}, nil
  855. }
  856. return []QueryResult{}, nil
  857. }
  858. }
  859. var candidates map[string]bool
  860. var useFTIndex bool = false
  861. if e.FTIndex != nil {
  862. for _, match := range matches {
  863. if match[1] == "value" && match[2] == "like" {
  864. pattern := extractString(match[3])
  865. clean := strings.Trim(pattern, "*")
  866. if len(clean) > 0 && !strings.Contains(clean, "*") && !strings.Contains(clean, "?") {
  867. matches := e.FTIndex.Search(pattern)
  868. if matches != nil {
  869. currentSet := make(map[string]bool)
  870. for _, k := range matches {
  871. currentSet[k] = true
  872. }
  873. if !useFTIndex {
  874. candidates = currentSet
  875. useFTIndex = true
  876. } else {
  877. newSet := make(map[string]bool)
  878. for k := range candidates {
  879. if currentSet[k] {
  880. newSet[k] = true
  881. }
  882. }
  883. candidates = newSet
  884. }
  885. } else {
  886. return []QueryResult{}, nil
  887. }
  888. }
  889. }
  890. }
  891. } else {
  892. for _, match := range matches {
  893. if match[1] == "value" && match[2] == "like" {
  894. return []QueryResult{}, nil
  895. }
  896. }
  897. }
  898. var iterator func(func(string, IndexEntry) bool)
  899. if useFTIndex {
  900. iterator = func(cb func(string, IndexEntry) bool) {
  901. keys := make([]string, 0, len(candidates))
  902. for k := range candidates {
  903. keys = append(keys, k)
  904. }
  905. sort.Strings(keys)
  906. for _, k := range keys {
  907. if entry, ok := e.Index.Get(k); ok {
  908. if !cb(k, entry) {
  909. return
  910. }
  911. }
  912. }
  913. }
  914. } else {
  915. var prefix string = ""
  916. var usePrefix bool = false
  917. for _, match := range matches {
  918. if match[1] == "key" && match[2] == "like" {
  919. pattern := extractString(match[3])
  920. if strings.HasSuffix(pattern, "*") {
  921. clean := pattern[:len(pattern)-1]
  922. if !strings.ContainsAny(clean, "*?") {
  923. prefix = clean
  924. usePrefix = true
  925. break
  926. }
  927. }
  928. }
  929. }
  930. iterator = func(cb func(string, IndexEntry) bool) {
  931. if usePrefix {
  932. e.Index.WalkPrefix(prefix, cb)
  933. } else {
  934. e.Index.WalkPrefix("", cb)
  935. }
  936. }
  937. }
  938. var results []QueryResult
  939. iterator(func(key string, entry IndexEntry) bool {
  940. var valStr string
  941. var valLoaded bool
  942. matchAll := true
  943. for _, match := range matches {
  944. field, op, valRaw := match[1], match[2], match[3]
  945. switch field {
  946. case "CommitIndex":
  947. num, _ := strconv.ParseUint(valRaw, 10, 64)
  948. switch op {
  949. case ">": if !(entry.CommitIndex > num) { matchAll = false }
  950. case "<": if !(entry.CommitIndex < num) { matchAll = false }
  951. case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
  952. case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
  953. case "=": if !(entry.CommitIndex == num) { matchAll = false }
  954. }
  955. case "key":
  956. target := extractString(valRaw)
  957. switch op {
  958. case "=": if key != target { matchAll = false }
  959. case "like": if !WildcardMatch(key, target) { matchAll = false }
  960. }
  961. case "value":
  962. if e.FTIndex == nil && op == "like" {
  963. matchAll = false
  964. break
  965. }
  966. if !valLoaded {
  967. v, err := e.Storage.ReadValue(entry.ValueOffset)
  968. if err != nil { matchAll = false; break }
  969. valStr = v
  970. valLoaded = true
  971. }
  972. target := extractString(valRaw)
  973. switch op {
  974. case "=": if valStr != target { matchAll = false }
  975. case "like": if !WildcardMatch(valStr, target) { matchAll = false }
  976. }
  977. }
  978. if !matchAll { break }
  979. }
  980. if matchAll {
  981. if !valLoaded {
  982. v, err := e.Storage.ReadValue(entry.ValueOffset)
  983. if err == nil { valStr = v }
  984. }
  985. results = append(results, QueryResult{
  986. Key: key,
  987. Value: valStr,
  988. CommitIndex: entry.CommitIndex,
  989. })
  990. needed := limit + offset
  991. if limit > 0 && len(results) >= needed {
  992. return false
  993. }
  994. }
  995. return true
  996. })
  997. if offset > 0 {
  998. if offset >= len(results) {
  999. return []QueryResult{}, nil
  1000. }
  1001. results = results[offset:]
  1002. }
  1003. if limit >= 0 {
  1004. if limit < len(results) {
  1005. results = results[:limit]
  1006. }
  1007. }
  1008. return results, nil
  1009. }
  1010. func (e *Engine) Snapshot() ([]byte, error) {
  1011. // Not implemented for Flat Index yet
  1012. return nil, nil
  1013. }
  1014. func (e *Engine) Restore(data []byte) error {
  1015. return nil
  1016. }
  1017. func (e *Engine) DebugClearAuxiliary() {
  1018. e.Storage.cacheMu.Lock()
  1019. e.Storage.cache = make(map[int64]string)
  1020. e.Storage.cacheMu.Unlock()
  1021. if e.FTIndex != nil {
  1022. e.FTIndex.mu.Lock()
  1023. e.FTIndex.index = make(map[string]map[string]bool)
  1024. e.FTIndex.mu.Unlock()
  1025. }
  1026. }