| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202 |
- package db
- import (
- "bufio"
- "encoding/binary"
- "fmt"
- "hash/crc32"
- "io"
- "os"
- "regexp"
- "sort"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- )
// hash returns the 32-bit FNV-1 hash of s: starting from the FNV offset
// basis, each byte is folded in by multiplying by the FNV prime and then
// XOR-ing the byte. (This is FNV-1, not FNV-1a, which XORs before
// multiplying.) StripedLock.GetLock uses it to pick a lock stripe.
func hash(s string) uint32 {
	const (
		offsetBasis uint32 = 2166136261
		prime       uint32 = 16777619
	)
	h := offsetBasis
	for _, b := range []byte(s) {
		h *= prime
		h ^= uint32(b)
	}
	return h
}
// --- Flat Sorted Array Index Implementation ---
// Memory-oriented design:
//  1. No per-key tree nodes, pointers or map buckets.
//  2. All key bytes live in one append-only []byte buffer (keyBuf), so keys
//     carry no individual string headers.
//  3. A slice of fixed-size items kept sorted by key gives O(log N) lookup
//     and O(N) insertion/deletion (the tail of the slice is shifted).

// IndexEntry is the per-key payload stored in the index: the offset of the
// key's value record in the value log and the commit index that wrote it.
type IndexEntry struct {
	ValueOffset int64
	CommitIndex uint64
}

// FlatIndex is a sorted, flat, in-memory key index. All methods are safe for
// concurrent use; they are guarded by mu.
type FlatIndex struct {
	// keyBuf holds every inserted key back to back: [key1][key2]...
	// It is append-only: bytes of deleted or re-inserted keys are never
	// reclaimed (there is no compaction).
	keyBuf []byte

	// items is kept sorted by key.
	items []flatItem
	mu    sync.RWMutex
}

// flatItem locates one key inside keyBuf and carries its entry.
// Layout: keyOffset (4) + keyLen (2) + padding (2) + entry (16) = 24 bytes,
// i.e. about 2.4 MB of items for 100k keys, plus the key bytes in keyBuf.
type flatItem struct {
	keyOffset uint32     // offset into keyBuf; caps keyBuf at 4 GiB
	keyLen    uint16     // key length in bytes; caps keys at 65535 bytes
	entry     IndexEntry // 16 bytes
}

// NewFlatIndex returns an empty index with some capacity preallocated.
func NewFlatIndex() *FlatIndex {
	return &FlatIndex{
		keyBuf: make([]byte, 0, 1024*1024), // 1MB initial cap
		items:  make([]flatItem, 0, 10000),
	}
}

// keyBytes returns the stored bytes of the key at position idx. The slice
// aliases keyBuf: it must not be modified, and is only valid while fi.mu is held.
func (fi *FlatIndex) keyBytes(idx int) []byte {
	item := &fi.items[idx]
	return fi.keyBuf[item.keyOffset : item.keyOffset+uint32(item.keyLen)]
}

// getKey returns a freshly allocated copy of the key at position idx.
func (fi *FlatIndex) getKey(idx int) string {
	return string(fi.keyBytes(idx))
}

// compare orders the key at position idx against target like
// strings.Compare (-1, 0 or +1). It does not allocate: a string(b)
// conversion used directly in a comparison is not copied by the compiler.
func (fi *FlatIndex) compare(idx int, target string) int {
	kb := fi.keyBytes(idx)
	switch {
	case string(kb) < target:
		return -1
	case string(kb) > target:
		return 1
	default:
		return 0
	}
}

// find returns the position of the first item whose key is >= key
// (len(fi.items) if there is none) and whether that item's key equals key.
// Callers must hold fi.mu.
func (fi *FlatIndex) find(key string) (int, bool) {
	idx := sort.Search(len(fi.items), func(i int) bool {
		return string(fi.keyBytes(i)) >= key
	})
	return idx, idx < len(fi.items) && string(fi.keyBytes(idx)) == key
}

// Insert adds key with entry, or replaces the entry when key already exists.
// Adding a new key is O(N) because later items are shifted.
//
// Insert panics if key is longer than 65535 bytes or if keyBuf would grow
// past 4 GiB: neither fits in a flatItem. Callers are expected to reject such
// keys first (Storage.Append refuses keys over 65535 bytes). Validation
// happens before any mutation, so a rejected key leaves the index unchanged.
func (fi *FlatIndex) Insert(key string, entry IndexEntry) {
	fi.mu.Lock()
	defer fi.mu.Unlock()

	idx, found := fi.find(key)
	if found {
		fi.items[idx].entry = entry
		return
	}

	if len(key) > 65535 {
		panic("db: FlatIndex key exceeds 65535 bytes")
	}
	if uint64(len(fi.keyBuf))+uint64(len(key)) > 1<<32-1 {
		panic("db: FlatIndex key buffer exceeds 4 GiB")
	}

	offset := len(fi.keyBuf)
	fi.keyBuf = append(fi.keyBuf, key...)
	item := flatItem{
		keyOffset: uint32(offset),
		keyLen:    uint16(len(key)),
		entry:     entry,
	}

	// Grow by one slot and shift the tail right; when idx is the end (the
	// common case for sequential keys) the copy moves nothing.
	fi.items = append(fi.items, flatItem{})
	copy(fi.items[idx+1:], fi.items[idx:])
	fi.items[idx] = item
}

// Get returns the entry stored for key and whether key is present.
func (fi *FlatIndex) Get(key string) (IndexEntry, bool) {
	fi.mu.RLock()
	defer fi.mu.RUnlock()

	if idx, found := fi.find(key); found {
		return fi.items[idx].entry, true
	}
	return IndexEntry{}, false
}

// Delete removes key and reports whether it was present. The key's bytes
// remain in keyBuf; that space is not reclaimed.
func (fi *FlatIndex) Delete(key string) bool {
	fi.mu.Lock()
	defer fi.mu.Unlock()

	idx, found := fi.find(key)
	if !found {
		return false
	}
	fi.items = append(fi.items[:idx], fi.items[idx+1:]...)
	return true
}

// WalkPrefix calls callback, in ascending key order, for every key that
// starts with prefix (every key when prefix is ""). It stops early when
// callback returns false.
//
// The read lock is held for the whole walk. callback must therefore not call
// methods of this FlatIndex: Insert/Delete would deadlock, and a nested Get
// can deadlock once a writer is waiting (sync.RWMutex forbids recursive
// read locking).
func (fi *FlatIndex) WalkPrefix(prefix string, callback func(key string, entry IndexEntry) bool) {
	fi.mu.RLock()
	defer fi.mu.RUnlock()

	start, _ := fi.find(prefix)
	for i := start; i < len(fi.items); i++ {
		kb := fi.keyBytes(i)
		// Keys are sorted, so the first key without the prefix ends the range.
		if len(kb) < len(prefix) || string(kb[:len(prefix)]) != prefix {
			break
		}
		if !callback(string(kb), fi.items[i].entry) {
			return
		}
	}
}

// --- End Flat Index ---
- // --- Full Text Index ---
- type FullTextIndex struct {
- // Token -> Set of Keys
- index map[string]map[string]bool
- mu sync.RWMutex
- }
- func NewFullTextIndex() *FullTextIndex {
- return &FullTextIndex{
- index: make(map[string]map[string]bool),
- }
- }
- func (fti *FullTextIndex) Add(key string, value string) {
- fti.mu.Lock()
- defer fti.mu.Unlock()
-
- tokens := tokenize(value)
- for _, token := range tokens {
- if fti.index[token] == nil {
- fti.index[token] = make(map[string]bool)
- }
- fti.index[token][key] = true
- }
- }
- func (fti *FullTextIndex) Remove(key string, value string) {
- fti.mu.Lock()
- defer fti.mu.Unlock()
-
- tokens := tokenize(value)
- for _, token := range tokens {
- if set, ok := fti.index[token]; ok {
- delete(set, key)
- if len(set) == 0 {
- delete(fti.index, token)
- }
- }
- }
- }
- func (fti *FullTextIndex) Search(tokenPattern string) []string {
- fti.mu.RLock()
- defer fti.mu.RUnlock()
- // 1. Exact Match
- if !strings.Contains(tokenPattern, "*") {
- if keys, ok := fti.index[tokenPattern]; ok {
- res := make([]string, 0, len(keys))
- for k := range keys {
- res = append(res, k)
- }
- return res
- }
- return nil
- }
- // 2. Wildcard Scan
- var results []string
- seen := make(map[string]bool)
- for token, keys := range fti.index {
- if WildcardMatch(token, tokenPattern) {
- for k := range keys {
- if !seen[k] {
- results = append(results, k)
- seen[k] = true
- }
- }
- }
- }
- return results
- }
// tokenize splits val into maximal runs of ASCII letters and digits. Every
// other rune, including non-ASCII letters, acts as a separator.
func tokenize(val string) []string {
	isSeparator := func(r rune) bool {
		switch {
		case 'a' <= r && r <= 'z', 'A' <= r && r <= 'Z', '0' <= r && r <= '9':
			return false
		default:
			return true
		}
	}
	return strings.FieldsFunc(val, isSeparator)
}
- // --- Storage & Cache ---
- // StripedLock provides hashed locks for keys
- type StripedLock struct {
- locks [1024]sync.RWMutex
- }
- func (sl *StripedLock) GetLock(key string) *sync.RWMutex {
- h := hash(key)
- return &sl.locks[h%1024]
- }
// Storage manages disk storage using an append-only log of records, each
// protected by a CRC32 (IEEE) checksum, plus a bounded in-memory cache of
// values keyed by record offset.
//
// Record layout (little-endian):
//
//	CRC(4) | Type(1) | KeyLen(2) | ValLen(4) | CommitIndex(8) | key | value
//
// The CRC covers every byte after the CRC field.
type Storage struct {
	file     *os.File
	filename string
	offset   int64 // Current end of the log; Append advances it atomically.

	cache   map[int64]string // record offset -> value
	cacheMu sync.RWMutex
}

const (
	RecordTypePut    = 0x01
	RecordTypeDelete = 0x02

	// CRC(4) + Type(1) + KeyLen(2) + ValLen(4) + CommitIndex(8)
	HeaderSize = 4 + 1 + 2 + 4 + 8

	// MaxCacheSize caps the number of entries in Storage.cache.
	MaxCacheSize = 10000
)

// NewStorage opens the log file at filename, creating it if needed. The
// append position starts at the current end of the file, so appends never
// overwrite existing records even if Scan is not called first. Scan then
// moves the position to the end of the last valid record.
func NewStorage(filename string) (*Storage, error) {
	f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
	if err != nil {
		return nil, err
	}
	info, err := f.Stat()
	if err != nil {
		f.Close() // best-effort cleanup; the Stat error is the one to report
		return nil, fmt.Errorf("stat %s: %w", filename, err)
	}
	return &Storage{
		file:     f,
		filename: filename,
		offset:   info.Size(),
		cache:    make(map[int64]string),
	}, nil
}

// Record represents a log entry decoded by Scan.
type Record struct {
	Type        byte
	Key         string
	Value       string
	Offset      int64 // offset of the record's header in the log
	CommitIndex uint64
}

// Scan reads the log from the beginning and calls callback for every record
// whose CRC checks out. It stops at the first truncated, oversized or corrupt
// record and reports that on stdout.
//
// It returns the offset just past the last valid record and also makes that
// offset the append position, so a damaged tail is overwritten by later
// appends. A record whose lengths point past the end of the file is treated
// as a torn write before any buffer is allocated, so a corrupt length field
// cannot trigger a huge allocation.
func (s *Storage) Scan(callback func(Record)) (int64, error) {
	info, err := s.file.Stat()
	if err != nil {
		return 0, err
	}
	size := info.Size()

	if _, err := s.file.Seek(0, io.SeekStart); err != nil {
		return 0, err
	}
	reader := bufio.NewReader(s.file)

	offset := int64(0)
	header := make([]byte, HeaderSize) // reused: ReadFull overwrites it fully
	for {
		if _, err := io.ReadFull(reader, header); err != nil {
			// io.EOF means a clean end of log; anything else (typically
			// io.ErrUnexpectedEOF from a torn header) is reported.
			if err != io.EOF {
				fmt.Printf("Storage Scan Error at %d: %v\n", offset, err)
			}
			break
		}

		storedCRC := binary.LittleEndian.Uint32(header[0:4])
		recType := header[4]
		keyLen := int(binary.LittleEndian.Uint16(header[5:7]))
		valLen := int64(binary.LittleEndian.Uint32(header[7:11]))
		commitIndex := binary.LittleEndian.Uint64(header[11:19])

		totalLen := int64(keyLen) + valLen
		if offset+HeaderSize+totalLen > size {
			fmt.Printf("Storage Scan Data Error at %d: %v\n", offset, io.ErrUnexpectedEOF)
			break
		}
		data := make([]byte, totalLen)
		if _, err := io.ReadFull(reader, data); err != nil {
			fmt.Printf("Storage Scan Data Error at %d: %v\n", offset, err)
			break
		}

		// Checksum of Type+Lengths+CommitIndex, then key+value.
		crc := crc32.ChecksumIEEE(header[4:])
		crc = crc32.Update(crc, crc32.IEEETable, data)
		if crc != storedCRC {
			fmt.Printf("CRC Mismatch at %d\n", offset)
			break
		}

		callback(Record{
			Type:        recType,
			Key:         string(data[:keyLen]),
			Value:       string(data[keyLen:]),
			Offset:      offset,
			CommitIndex: commitIndex,
		})
		offset += HeaderSize + totalLen
	}

	s.offset = offset
	if _, err := s.file.Seek(offset, io.SeekStart); err != nil {
		return offset, err
	}
	return offset, nil
}

// Append writes one record and returns the offset at which it starts.
// Keys over 65535 bytes and values over 4 GiB are rejected, because their
// lengths would not fit the header. Put values are also cached while the
// cache has room.
//
// The write position is reserved atomically. If the write fails, the
// reservation is released again, provided no later append has reserved
// space after it. This way a failed write does not leave a gap in the log
// that would stop Scan.
func (s *Storage) Append(key, val string, recType byte, commitIndex uint64) (int64, error) {
	keyLen, valLen := len(key), len(val)
	if keyLen > 65535 {
		return 0, fmt.Errorf("key too large")
	}
	if uint64(valLen) > 1<<32-1 {
		return 0, fmt.Errorf("value too large")
	}

	buf := make([]byte, HeaderSize+keyLen+valLen)

	// 1. Header (the CRC field at buf[0:4] is filled in last).
	buf[4] = recType
	binary.LittleEndian.PutUint16(buf[5:7], uint16(keyLen))
	binary.LittleEndian.PutUint32(buf[7:11], uint32(valLen))
	binary.LittleEndian.PutUint64(buf[11:19], commitIndex)

	// 2. Payload; copy accepts a string source, so no []byte conversion is needed.
	copy(buf[HeaderSize:], key)
	copy(buf[HeaderSize+keyLen:], val)

	// 3. CRC over everything after the CRC field.
	binary.LittleEndian.PutUint32(buf[0:4], crc32.ChecksumIEEE(buf[4:]))

	// 4. Reserve space and write.
	size := int64(len(buf))
	end := atomic.AddInt64(&s.offset, size)
	writeOffset := end - size
	if _, err := s.file.WriteAt(buf, writeOffset); err != nil {
		atomic.CompareAndSwapInt64(&s.offset, end, writeOffset)
		return 0, err
	}

	if recType == RecordTypePut {
		s.cacheMu.Lock()
		if len(s.cache) < MaxCacheSize {
			s.cache[writeOffset] = val
		}
		s.cacheMu.Unlock()
	}
	return writeOffset, nil
}

// Sync flushes writes to stable storage.
func (s *Storage) Sync() error {
	return s.file.Sync()
}

// ReadValue returns the value of the record starting at offset. Cached
// values are served from memory; otherwise the record is read from disk and
// its CRC is verified. When the cache is full, one arbitrary entry is evicted
// to make room for the value just read.
func (s *Storage) ReadValue(offset int64) (string, error) {
	s.cacheMu.RLock()
	val, ok := s.cache[offset]
	s.cacheMu.RUnlock()
	if ok {
		return val, nil
	}

	header := make([]byte, HeaderSize)
	if _, err := s.file.ReadAt(header, offset); err != nil {
		return "", err
	}
	keyLen := int(binary.LittleEndian.Uint16(header[5:7]))
	valLen := int64(binary.LittleEndian.Uint32(header[7:11]))

	data := make([]byte, int64(keyLen)+valLen)
	if _, err := s.file.ReadAt(data, offset+HeaderSize); err != nil {
		return "", err
	}

	crc := crc32.Update(crc32.ChecksumIEEE(header[4:]), crc32.IEEETable, data)
	if crc != binary.LittleEndian.Uint32(header[0:4]) {
		return "", fmt.Errorf("data corruption detected at offset %d", offset)
	}
	val = string(data[keyLen:])

	s.cacheMu.Lock()
	if len(s.cache) >= MaxCacheSize {
		for k := range s.cache {
			delete(s.cache, k)
			break
		}
	}
	s.cache[offset] = val
	s.cacheMu.Unlock()
	return val, nil
}

// Close closes the log file.
func (s *Storage) Close() error {
	return s.file.Close()
}
// EngineOption adjusts an EngineConfig; options are passed to NewEngine.
type EngineOption func(*EngineConfig)

// EngineConfig holds the tunable settings of an Engine.
type EngineConfig struct {
	// EnableValueIndex turns on the full-text index over values.
	EnableValueIndex bool
}

// WithValueIndex returns an option that sets EngineConfig.EnableValueIndex,
// i.e. enables or disables the value (full-text) index.
func WithValueIndex(enable bool) EngineOption {
	opt := func(cfg *EngineConfig) {
		cfg.EnableValueIndex = enable
	}
	return opt
}
- // Engine is the core storage engine.
- type Engine struct {
- Index *FlatIndex // Flattened memory structure
- Storage *Storage
- FTIndex *FullTextIndex // Can be nil if disabled
- KeyLocks StripedLock
- LastCommitIndex uint64
- commitMu sync.Mutex
- dataDir string
-
- writeMu sync.Mutex
- config EngineConfig
- }
- func NewEngine(dataDir string, opts ...EngineOption) (*Engine, error) {
- if err := os.MkdirAll(dataDir, 0755); err != nil {
- return nil, err
- }
- config := EngineConfig{
- EnableValueIndex: false, // Default to false (key-only)
- }
- for _, opt := range opts {
- opt(&config)
- }
- store, err := NewStorage(dataDir + "/values.data")
- if err != nil {
- return nil, err
- }
- e := &Engine{
- Index: NewFlatIndex(),
- Storage: store,
- FTIndex: nil,
- dataDir: dataDir,
- config: config,
- }
-
- if config.EnableValueIndex {
- e.FTIndex = NewFullTextIndex()
- }
- // Rebuild Index from Disk
- _, err = store.Scan(func(rec Record) {
- if rec.Type == RecordTypePut {
- e.Index.Insert(rec.Key, IndexEntry{
- ValueOffset: rec.Offset,
- CommitIndex: rec.CommitIndex,
- })
- if e.FTIndex != nil {
- e.FTIndex.Add(rec.Key, rec.Value)
- }
-
- // Update LastCommitIndex
- if rec.CommitIndex > e.LastCommitIndex {
- e.LastCommitIndex = rec.CommitIndex
- }
-
- } else if rec.Type == RecordTypeDelete {
- // Cleanup FTIndex using old value if possible
- e.removeValueFromFTIndex(rec.Key)
- e.Index.Delete(rec.Key)
-
- if rec.CommitIndex > e.LastCommitIndex {
- e.LastCommitIndex = rec.CommitIndex
- }
- }
- })
-
- return e, nil
- }
- func (e *Engine) removeValueFromFTIndex(key string) {
- if e.FTIndex == nil {
- return
- }
- if entry, ok := e.Index.Get(key); ok {
- val, err := e.Storage.ReadValue(entry.ValueOffset)
- if err == nil {
- e.FTIndex.Remove(key, val)
- }
- }
- }
- func (e *Engine) Close() error {
- return e.Storage.Close()
- }
- func (e *Engine) Set(key, value string, commitIndex uint64) error {
- e.writeMu.Lock()
- defer e.writeMu.Unlock()
-
- e.removeValueFromFTIndex(key)
- offset, err := e.Storage.Append(key, value, RecordTypePut, commitIndex)
- if err != nil {
- return err
- }
- e.Index.Insert(key, IndexEntry{
- ValueOffset: offset,
- CommitIndex: commitIndex,
- })
-
- if e.FTIndex != nil {
- e.FTIndex.Add(key, value)
- }
-
- e.commitMu.Lock()
- if commitIndex > e.LastCommitIndex {
- e.LastCommitIndex = commitIndex
- }
- e.commitMu.Unlock()
-
- return nil
- }
- func (e *Engine) Delete(key string, commitIndex uint64) error {
- e.writeMu.Lock()
- defer e.writeMu.Unlock()
-
- e.removeValueFromFTIndex(key)
-
- _, err := e.Storage.Append(key, "", RecordTypeDelete, commitIndex)
- if err != nil {
- return err
- }
-
- e.Index.Delete(key)
-
- e.commitMu.Lock()
- if commitIndex > e.LastCommitIndex {
- e.LastCommitIndex = commitIndex
- }
- e.commitMu.Unlock()
-
- return nil
- }
- // Sync flushes underlying storage to disk
- func (e *Engine) Sync() error {
- return e.Storage.Sync()
- }
- func (e *Engine) Get(key string) (string, bool) {
- entry, ok := e.Index.Get(key)
- if !ok {
- return "", false
- }
- val, err := e.Storage.ReadValue(entry.ValueOffset)
- if err != nil {
- return "", false
- }
- return val, true
- }
// QueryResult is one row returned by Engine.Query. The struct tags define
// its JSON field names.
type QueryResult struct {
	Key         string `json:"key"`          // the row's key
	Value       string `json:"value"`        // the stored value
	CommitIndex uint64 `json:"commit_index"` // commit index recorded for the value's write
}
// WildcardMatch reports whether str matches pattern, rune by rune. In the
// pattern, '*' matches any run of runes (including none) and '?' matches
// exactly one rune. Matching uses a single backtrack point at the most
// recent '*'. A pattern rune that equals the text rune is consumed as a
// literal before being treated as '*'.
func WildcardMatch(str, pattern string) bool {
	text, pat := []rune(str), []rune(pattern)
	ti, pi := 0, 0
	lastStar, resume := -1, -1

	for ti < len(text) {
		switch {
		case pi < len(pat) && (pat[pi] == '?' || pat[pi] == text[ti]):
			ti++
			pi++
		case pi < len(pat) && pat[pi] == '*':
			// Tentatively let '*' match nothing; remember where to retry.
			lastStar, resume = pi, ti
			pi++
		case lastStar >= 0:
			// Let the last '*' swallow one more rune and retry after it.
			resume++
			ti, pi = resume, lastStar+1
		default:
			return false
		}
	}

	// Trailing '*'s can match the empty remainder.
	for pi < len(pat) && pat[pi] == '*' {
		pi++
	}
	return pi == len(pat)
}
- func (e *Engine) Count(sql string) (int, error) {
- return e.execute(sql, true)
- }
- func (e *Engine) execute(sql string, countOnly bool) (int, error) {
- sql = strings.TrimSpace(sql)
-
- reLimit := regexp.MustCompile(`(?i)\s+LIMIT\s+(\d+)`)
- reOffset := regexp.MustCompile(`(?i)\s+OFFSET\s+(\d+)`)
-
- limit := -1
- offset := 0
-
- if match := reLimit.FindStringSubmatch(sql); len(match) > 0 {
- limit, _ = strconv.Atoi(match[1])
- sql = reLimit.ReplaceAllString(sql, "")
- }
- if match := reOffset.FindStringSubmatch(sql); len(match) > 0 {
- offset, _ = strconv.Atoi(match[1])
- sql = reOffset.ReplaceAllString(sql, "")
- }
- re := regexp.MustCompile(`(key|value|CommitIndex)\s*(=|like|>=|<=|>|<)\s*("[^"]*"|\d+)`)
- matches := re.FindAllStringSubmatch(sql, -1)
- if len(matches) == 0 {
- return 0, fmt.Errorf("invalid query")
- }
- extractString := func(s string) string {
- return strings.Trim(strings.TrimSpace(s), "\"")
- }
- // 1. Optimize for Point Lookup
- for _, match := range matches {
- if match[1] == "key" && match[2] == "=" {
- targetKey := extractString(match[3])
- entry, ok := e.Index.Get(targetKey)
- if !ok {
- return 0, nil
- }
-
- needValue := false
- for _, m := range matches {
- if m[1] == "value" { needValue = true; break }
- }
-
- var val string
- if needValue {
- v, err := e.Storage.ReadValue(entry.ValueOffset)
- if err != nil { return 0, nil }
- val = v
- }
-
- matchAll := true
- for _, m := range matches {
- field, op, valRaw := m[1], m[2], m[3]
- switch field {
- case "CommitIndex":
- num, _ := strconv.ParseUint(valRaw, 10, 64)
- switch op {
- case ">": if !(entry.CommitIndex > num) { matchAll = false }
- case "<": if !(entry.CommitIndex < num) { matchAll = false }
- case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
- case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
- case "=": if !(entry.CommitIndex == num) { matchAll = false }
- }
- case "value":
- t := extractString(valRaw)
- switch op {
- case "=": if val != t { matchAll = false }
- case "like": if !WildcardMatch(val, t) { matchAll = false }
- }
- }
- if !matchAll { break }
- }
- if matchAll {
- return 1, nil
- }
- return 0, nil
- }
- }
- var candidates map[string]bool
- var useFTIndex bool = false
-
- // 2. Try to use FT Index (if enabled)
- if e.FTIndex != nil {
- for _, match := range matches {
- if match[1] == "value" && match[2] == "like" {
- pattern := extractString(match[3])
- clean := strings.Trim(pattern, "*")
- if len(clean) > 0 && !strings.Contains(clean, "*") && !strings.Contains(clean, "?") {
- matches := e.FTIndex.Search(pattern)
- if matches != nil {
- currentSet := make(map[string]bool)
- for _, k := range matches {
- currentSet[k] = true
- }
- if !useFTIndex {
- candidates = currentSet
- useFTIndex = true
- } else {
- newSet := make(map[string]bool)
- for k := range candidates {
- if currentSet[k] { newSet[k] = true }
- }
- candidates = newSet
- }
- } else {
- return 0, nil
- }
- }
- }
- }
- } else {
- for _, match := range matches {
- if match[1] == "value" && match[2] == "like" {
- // Value search requested but index disabled -> return 0
- return 0, nil
- }
- }
- }
-
- var iterator func(func(string, IndexEntry) bool)
-
- if useFTIndex {
- iterator = func(cb func(string, IndexEntry) bool) {
- keys := make([]string, 0, len(candidates))
- for k := range candidates { keys = append(keys, k) }
- sort.Strings(keys)
- for _, k := range keys {
- if entry, ok := e.Index.Get(k); ok {
- if !cb(k, entry) { return }
- }
- }
- }
- } else {
- // Use FlatIndex Prefix Search
- var prefix string = ""
- var usePrefix bool = false
- for _, match := range matches {
- if match[1] == "key" && match[2] == "like" {
- pattern := extractString(match[3])
- // Flat Index supports simple prefix match
- if strings.HasSuffix(pattern, "*") {
- clean := pattern[:len(pattern)-1]
- if !strings.ContainsAny(clean, "*?") {
- prefix = clean
- usePrefix = true
- break
- }
- }
- }
- }
-
- iterator = func(cb func(string, IndexEntry) bool) {
- if usePrefix {
- e.Index.WalkPrefix(prefix, cb)
- } else {
- e.Index.WalkPrefix("", cb)
- }
- }
- }
- matchedCount := 0
-
- iterator(func(key string, entry IndexEntry) bool {
- var valStr string
- var valLoaded bool
-
- matchAll := true
- for _, match := range matches {
- field, op, valRaw := match[1], match[2], match[3]
- switch field {
- case "CommitIndex":
- num, _ := strconv.ParseUint(valRaw, 10, 64)
- switch op {
- case ">": if !(entry.CommitIndex > num) { matchAll = false }
- case "<": if !(entry.CommitIndex < num) { matchAll = false }
- case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
- case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
- case "=": if !(entry.CommitIndex == num) { matchAll = false }
- }
- case "key":
- target := extractString(valRaw)
- switch op {
- case "=": if key != target { matchAll = false }
- case "like": if !WildcardMatch(key, target) { matchAll = false }
- }
- case "value":
- if e.FTIndex == nil && op == "like" {
- matchAll = false
- break
- }
- if !valLoaded {
- v, err := e.Storage.ReadValue(entry.ValueOffset)
- if err != nil { matchAll = false; break }
- valStr = v
- valLoaded = true
- }
- target := extractString(valRaw)
- switch op {
- case "=": if valStr != target { matchAll = false }
- case "like": if !WildcardMatch(valStr, target) { matchAll = false }
- }
- }
- if !matchAll { break }
- }
-
- if matchAll {
- matchedCount++
- if limit > 0 && offset == 0 && matchedCount >= limit {
- return false
- }
- }
- return true
- })
- if offset > 0 {
- if offset >= matchedCount {
- return 0, nil
- }
- matchedCount -= offset
- }
- if limit >= 0 {
- if limit < matchedCount {
- matchedCount = limit
- }
- }
- return matchedCount, nil
- }
- func (e *Engine) Query(sql string) ([]QueryResult, error) {
- return e.queryInternal(sql)
- }
- func (e *Engine) queryInternal(sql string) ([]QueryResult, error) {
- sql = strings.TrimSpace(sql)
-
- reLimit := regexp.MustCompile(`(?i)\s+LIMIT\s+(\d+)`)
- reOffset := regexp.MustCompile(`(?i)\s+OFFSET\s+(\d+)`)
-
- limit := -1
- offset := 0
-
- if match := reLimit.FindStringSubmatch(sql); len(match) > 0 {
- limit, _ = strconv.Atoi(match[1])
- sql = reLimit.ReplaceAllString(sql, "")
- }
- if match := reOffset.FindStringSubmatch(sql); len(match) > 0 {
- offset, _ = strconv.Atoi(match[1])
- sql = reOffset.ReplaceAllString(sql, "")
- }
- re := regexp.MustCompile(`(key|value|CommitIndex)\s*(=|like|>=|<=|>|<)\s*("[^"]*"|\d+)`)
- matches := re.FindAllStringSubmatch(sql, -1)
- if len(matches) == 0 {
- return nil, fmt.Errorf("invalid query")
- }
- extractString := func(s string) string {
- return strings.Trim(strings.TrimSpace(s), "\"")
- }
- for _, match := range matches {
- if match[1] == "key" && match[2] == "=" {
- targetKey := extractString(match[3])
- entry, ok := e.Index.Get(targetKey)
- if !ok {
- return []QueryResult{}, nil
- }
-
- val, err := e.Storage.ReadValue(entry.ValueOffset)
- if err != nil {
- return []QueryResult{}, nil
- }
-
- matchAll := true
- for _, m := range matches {
- field, op, valRaw := m[1], m[2], m[3]
- switch field {
- case "CommitIndex":
- num, _ := strconv.ParseUint(valRaw, 10, 64)
- switch op {
- case ">": if !(entry.CommitIndex > num) { matchAll = false }
- case "<": if !(entry.CommitIndex < num) { matchAll = false }
- case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
- case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
- case "=": if !(entry.CommitIndex == num) { matchAll = false }
- }
- case "value":
- t := extractString(valRaw)
- switch op {
- case "=": if val != t { matchAll = false }
- case "like": if !WildcardMatch(val, t) { matchAll = false }
- }
- }
- if !matchAll { break }
- }
- if matchAll {
- return []QueryResult{{Key: targetKey, Value: val, CommitIndex: entry.CommitIndex}}, nil
- }
- return []QueryResult{}, nil
- }
- }
- var candidates map[string]bool
- var useFTIndex bool = false
-
- if e.FTIndex != nil {
- for _, match := range matches {
- if match[1] == "value" && match[2] == "like" {
- pattern := extractString(match[3])
- clean := strings.Trim(pattern, "*")
- if len(clean) > 0 && !strings.Contains(clean, "*") && !strings.Contains(clean, "?") {
- matches := e.FTIndex.Search(pattern)
- if matches != nil {
- currentSet := make(map[string]bool)
- for _, k := range matches {
- currentSet[k] = true
- }
-
- if !useFTIndex {
- candidates = currentSet
- useFTIndex = true
- } else {
- newSet := make(map[string]bool)
- for k := range candidates {
- if currentSet[k] {
- newSet[k] = true
- }
- }
- candidates = newSet
- }
- } else {
- return []QueryResult{}, nil
- }
- }
- }
- }
- } else {
- for _, match := range matches {
- if match[1] == "value" && match[2] == "like" {
- return []QueryResult{}, nil
- }
- }
- }
-
- var iterator func(func(string, IndexEntry) bool)
-
- if useFTIndex {
- iterator = func(cb func(string, IndexEntry) bool) {
- keys := make([]string, 0, len(candidates))
- for k := range candidates {
- keys = append(keys, k)
- }
- sort.Strings(keys)
-
- for _, k := range keys {
- if entry, ok := e.Index.Get(k); ok {
- if !cb(k, entry) {
- return
- }
- }
- }
- }
- } else {
- var prefix string = ""
- var usePrefix bool = false
- for _, match := range matches {
- if match[1] == "key" && match[2] == "like" {
- pattern := extractString(match[3])
- if strings.HasSuffix(pattern, "*") {
- clean := pattern[:len(pattern)-1]
- if !strings.ContainsAny(clean, "*?") {
- prefix = clean
- usePrefix = true
- break
- }
- }
- }
- }
-
- iterator = func(cb func(string, IndexEntry) bool) {
- if usePrefix {
- e.Index.WalkPrefix(prefix, cb)
- } else {
- e.Index.WalkPrefix("", cb)
- }
- }
- }
- var results []QueryResult
- iterator(func(key string, entry IndexEntry) bool {
- var valStr string
- var valLoaded bool
-
- matchAll := true
- for _, match := range matches {
- field, op, valRaw := match[1], match[2], match[3]
- switch field {
- case "CommitIndex":
- num, _ := strconv.ParseUint(valRaw, 10, 64)
- switch op {
- case ">": if !(entry.CommitIndex > num) { matchAll = false }
- case "<": if !(entry.CommitIndex < num) { matchAll = false }
- case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
- case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
- case "=": if !(entry.CommitIndex == num) { matchAll = false }
- }
- case "key":
- target := extractString(valRaw)
- switch op {
- case "=": if key != target { matchAll = false }
- case "like": if !WildcardMatch(key, target) { matchAll = false }
- }
- case "value":
- if e.FTIndex == nil && op == "like" {
- matchAll = false
- break
- }
- if !valLoaded {
- v, err := e.Storage.ReadValue(entry.ValueOffset)
- if err != nil { matchAll = false; break }
- valStr = v
- valLoaded = true
- }
- target := extractString(valRaw)
- switch op {
- case "=": if valStr != target { matchAll = false }
- case "like": if !WildcardMatch(valStr, target) { matchAll = false }
- }
- }
- if !matchAll { break }
- }
-
- if matchAll {
- if !valLoaded {
- v, err := e.Storage.ReadValue(entry.ValueOffset)
- if err == nil { valStr = v }
- }
- results = append(results, QueryResult{
- Key: key,
- Value: valStr,
- CommitIndex: entry.CommitIndex,
- })
- needed := limit + offset
- if limit > 0 && len(results) >= needed {
- return false
- }
- }
- return true
- })
- if offset > 0 {
- if offset >= len(results) {
- return []QueryResult{}, nil
- }
- results = results[offset:]
- }
- if limit >= 0 {
- if limit < len(results) {
- results = results[:limit]
- }
- }
- return results, nil
- }
- func (e *Engine) Snapshot() ([]byte, error) {
- // Not implemented for Flat Index yet
- return nil, nil
- }
- func (e *Engine) Restore(data []byte) error {
- return nil
- }
- func (e *Engine) DebugClearAuxiliary() {
- e.Storage.cacheMu.Lock()
- e.Storage.cache = make(map[int64]string)
- e.Storage.cacheMu.Unlock()
-
- if e.FTIndex != nil {
- e.FTIndex.mu.Lock()
- e.FTIndex.index = make(map[string]map[string]bool)
- e.FTIndex.mu.Unlock()
- }
- }
|