| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104 |
- package db
- import (
- "bufio"
- "encoding/binary"
- "fmt"
- "io"
- "os"
- "regexp"
- "sort"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- )
// hash returns the 32-bit FNV-1a hash of s (offset basis 2166136261,
// prime 16777619). FNV-1a XORs each byte in *before* multiplying, so every
// byte, including the last one, is diffused by at least one multiplication.
func hash(s string) uint32 {
	h := uint32(2166136261)
	for i := 0; i < len(s); i++ {
		h ^= uint32(s[i])
		h *= 16777619
	}
	return h
}
- // --- Radix Tree Implementation (Minimal) ---
- type radixNode struct {
- path string
- indices string // first char of children paths for fast lookup
- children []*radixNode
- leaf *IndexEntry // If non-nil, this is a valid key
- key string // Full key stored at leaf for convenience
- }
- type RadixTree struct {
- root *radixNode
- size int
- mu sync.RWMutex
- }
- func NewRadixTree() *RadixTree {
- return &RadixTree{
- root: &radixNode{},
- }
- }
- // longestCommonPrefix finds the length of the shared prefix of two strings
- func longestCommonPrefix(k1, k2 string) int {
- max := len(k1)
- if len(k2) < max {
- max = len(k2)
- }
- var i int
- for i = 0; i < max; i++ {
- if k1[i] != k2[i] {
- break
- }
- }
- return i
- }
- // Insert adds a key and its index entry to the tree
- func (t *RadixTree) Insert(key string, entry IndexEntry) {
- t.mu.Lock()
- defer t.mu.Unlock()
- n := t.root
- search := key
- for {
- // Handle empty search string (shouldn't happen in recursion usually)
- if len(search) == 0 {
- if n.leaf == nil {
- t.size++
- }
- n.leaf = &entry
- n.key = key
- return
- }
- // Find child
- parent := n
- found := false
- for i, char := range []byte(n.indices) {
- if char == search[0] {
- // Found a child starting with the same char
- n = n.children[i]
-
- // Check common prefix
- common := longestCommonPrefix(search, n.path)
-
- if common == len(n.path) {
- // Full match of child path, continue down
- search = search[common:]
- found = true
- break // Continue outer loop
- } else {
- // Split the node
- // Current child n becomes a child of the new split node
-
- // 1. Create split node with part of path
- splitNode := &radixNode{
- path: n.path[:common],
- indices: string(n.path[common]), // The char that differs
- children: []*radixNode{n},
- }
-
- // 2. Update existing child n
- n.path = n.path[common:] // Remaining part
-
- // 3. Insert new leaf for the new key (if needed)
- // The new key diverges at 'common' index
- if len(search) == common {
- // The new key IS the split node
- splitNode.leaf = &entry
- splitNode.key = key
- t.size++
- } else {
- // The new key is a child of the split node
- newBranch := &radixNode{
- path: search[common:],
- leaf: &entry,
- key: key,
- }
- t.size++
- splitNode.indices += string(search[common])
- splitNode.children = append(splitNode.children, newBranch)
- // Keep indices sorted? Not strictly necessary for correctness but good for determinism
- }
-
- // 4. Update parent to point to splitNode instead of n
- parent.children[i] = splitNode
- return
- }
- }
- }
-
- if !found {
- // No matching child, add new child
- newChild := &radixNode{
- path: search,
- leaf: &entry,
- key: key,
- }
- n.indices += string(search[0])
- n.children = append(n.children, newChild)
- t.size++
- return
- }
- }
- }
- // Get retrieves an entry
- func (t *RadixTree) Get(key string) (IndexEntry, bool) {
- t.mu.RLock()
- defer t.mu.RUnlock()
- n := t.root
- search := key
- for {
- if len(search) == 0 {
- if n.leaf != nil {
- return *n.leaf, true
- }
- return IndexEntry{}, false
- }
- found := false
- for i, char := range []byte(n.indices) {
- if char == search[0] {
- n = n.children[i]
- if strings.HasPrefix(search, n.path) {
- search = search[len(n.path):]
- found = true
- break
- }
- return IndexEntry{}, false
- }
- }
- if !found {
- return IndexEntry{}, false
- }
- }
- }
- // WalkPrefix iterates over all keys starting with prefix.
- // Returns true from callback to continue, false to stop.
- func (t *RadixTree) WalkPrefix(prefix string, callback func(key string, entry IndexEntry) bool) {
- t.mu.RLock()
- defer t.mu.RUnlock()
- n := t.root
- search := prefix
- // 1. Locate the node covering the prefix
- for len(search) > 0 {
- found := false
- for i, char := range []byte(n.indices) {
- if char == search[0] {
- n = n.children[i]
- // 3 cases:
- // 1. n.path == search (Exact match or consume all search) -> found target node
- // 2. n.path starts with search -> found target node (it's inside n)
- // 3. search starts with n.path -> continue down
-
- common := longestCommonPrefix(search, n.path)
- if common == len(search) {
- // Prefix fully consumed. 'n' is the root of the subtree.
- search = ""
- found = true
- break
- } else if common == len(n.path) {
- // 'n' path fully consumed, continue deeper
- search = search[common:]
- found = true
- break
- } else {
- // Mismatch, prefix not found
- return
- }
- }
- }
- if !found {
- return
- }
- }
- // 2. Recursive walk from n
- t.recursiveWalk(n, callback)
- }
- func (t *RadixTree) recursiveWalk(n *radixNode, callback func(key string, entry IndexEntry) bool) bool {
- if n.leaf != nil {
- if !callback(n.key, *n.leaf) {
- return false
- }
- }
- // To iterate in order, we should ideally keep indices sorted.
- // For now, we just iterate. If order matters, we sort children locally.
- // Let's do a quick sort of indices to ensure lexicographical order.
- if len(n.children) > 1 {
- // Optimization: clone to avoid mutating tree during read lock?
- // Actually indices string is immutable. We need to know the permutation.
- // A simple way is to build a list of children sorted by first char.
- type childRef struct {
- char byte
- node *radixNode
- }
- refs := make([]childRef, len(n.children))
- for i, char := range []byte(n.indices) {
- refs[i] = childRef{char, n.children[i]}
- }
- sort.Slice(refs, func(i, j int) bool { return refs[i].char < refs[j].char })
-
- for _, ref := range refs {
- if !t.recursiveWalk(ref.node, callback) {
- return false
- }
- }
- } else {
- for _, child := range n.children {
- if !t.recursiveWalk(child, callback) {
- return false
- }
- }
- }
- return true
- }
- // --- End Radix Tree ---
// --- Full Text Index (Simple In-Memory) ---

// FullTextIndex is an in-memory inverted index: each token (as produced by
// tokenize) maps to the keys whose values contained it. A plain map is used
// rather than a RadixTree, so exact token lookups are a single map access while
// wildcard lookups have to scan every token.
type FullTextIndex struct {
	// index maps token -> keys that contained the token. Keys are stored as
	// strings directly; there is no separate key-ID table.
	index map[string][]string
	mu    sync.RWMutex // guards index
}

// NewFullTextIndex returns an empty, ready-to-use FullTextIndex.
func NewFullTextIndex() *FullTextIndex {
	fti := new(FullTextIndex)
	fti.index = make(map[string][]string)
	return fti
}
- func (fti *FullTextIndex) Add(key string, value string) {
- fti.mu.Lock()
- defer fti.mu.Unlock()
-
- tokens := tokenize(value)
- for _, token := range tokens {
- // Deduplication check per key is expensive O(N).
- // For high perf, we might accept duplicates or use a set per token (map[string]map[string]bool).
- // Let's use simple append for now, optimized for write.
- fti.index[token] = append(fti.index[token], key)
- }
- }
- // Search returns keys containing the token.
- // Supports wildcards: "token", "prefix*", "*suffix", "*contains*"
- // For simplicity in this demo, we implement prefix matching via iteration if wildcard used.
- // Exact match is O(1).
- func (fti *FullTextIndex) Search(tokenPattern string) []string {
- fti.mu.RLock()
- defer fti.mu.RUnlock()
- // 1. Exact Match
- if !strings.Contains(tokenPattern, "*") {
- if keys, ok := fti.index[tokenPattern]; ok {
- // Return copy to avoid race
- res := make([]string, len(keys))
- copy(res, keys)
- return res
- }
- return nil
- }
- // 2. Wildcard Scan (In-Memory Map Scan)
- // For production, we'd use a RadixTree for tokens too!
- // But let's keep it simple for now, just iterating map keys.
- // Optimization: If map is large, this is slow.
- var results []string
- seen := make(map[string]bool)
- for token, keys := range fti.index {
- if WildcardMatch(token, tokenPattern) {
- for _, k := range keys {
- if !seen[k] {
- results = append(results, k)
- seen[k] = true
- }
- }
- }
- }
- return results
- }
// tokenize splits val into maximal runs of ASCII letters and digits. Every
// other rune (whitespace, punctuation, '_', any non-ASCII rune) separates
// tokens. Case is preserved, so index lookups are case-sensitive.
func tokenize(val string) []string {
	isSeparator := func(r rune) bool {
		switch {
		case 'a' <= r && r <= 'z', 'A' <= r && r <= 'Z', '0' <= r && r <= '9':
			return false
		default:
			return true
		}
	}
	return strings.FieldsFunc(val, isSeparator)
}
- // --- Storage & Cache ---
// FreeList manages reusable disk space in memory using a best-fit strategy:
// Pop hands out a slot from the smallest capacity that is at least the
// requested size. All methods are safe for concurrent use.
type FreeList struct {
	// buckets maps a slot capacity to a LIFO stack of file offsets.
	buckets map[uint32][]int64
	// sortedCaps lists, ascending and without duplicates, every capacity that
	// currently has a non-empty bucket, so Pop can binary-search for a fit.
	sortedCaps []uint32
	mu         sync.Mutex
}

// NewFreeList returns an empty FreeList.
func NewFreeList() *FreeList {
	return &FreeList{buckets: make(map[uint32][]int64)}
}

// Add records that the slot at offset, whose capacity is capacity, is free.
func (fl *FreeList) Add(capacity uint32, offset int64) {
	fl.mu.Lock()
	defer fl.mu.Unlock()

	if offsets, ok := fl.buckets[capacity]; ok {
		fl.buckets[capacity] = append(offsets, offset)
		return
	}
	fl.buckets[capacity] = []int64{offset}

	// New capacity: insert it at its sorted position (binary search + shift)
	// instead of appending and re-sorting the whole slice each time.
	i := sort.Search(len(fl.sortedCaps), func(j int) bool { return fl.sortedCaps[j] >= capacity })
	fl.sortedCaps = append(fl.sortedCaps, 0)
	copy(fl.sortedCaps[i+1:], fl.sortedCaps[i:])
	fl.sortedCaps[i] = capacity
}

// Pop removes and returns a free slot whose capacity is at least targetCap.
// It picks the smallest such capacity (best fit) and, within it, the most
// recently added offset. It returns the slot's offset, its actual capacity and
// true, or false if no free slot is large enough.
func (fl *FreeList) Pop(targetCap uint32) (int64, uint32, bool) {
	fl.mu.Lock()
	defer fl.mu.Unlock()

	i := sort.Search(len(fl.sortedCaps), func(j int) bool { return fl.sortedCaps[j] >= targetCap })
	if i == len(fl.sortedCaps) {
		return 0, 0, false
	}
	capacity := fl.sortedCaps[i]
	offsets := fl.buckets[capacity]
	if len(offsets) == 0 {
		// Not expected while Add/Pop keep buckets and sortedCaps in sync.
		return 0, 0, false
	}

	last := len(offsets) - 1
	offset := offsets[last]
	if last == 0 {
		// Bucket exhausted: drop it so sortedCaps only lists usable capacities.
		delete(fl.buckets, capacity)
		fl.sortedCaps = append(fl.sortedCaps[:i], fl.sortedCaps[i+1:]...)
	} else {
		fl.buckets[capacity] = offsets[:last]
	}
	return offset, capacity, true
}
- // StripedLock provides hashed locks for keys
- type StripedLock struct {
- locks [1024]sync.RWMutex
- }
- func (sl *StripedLock) GetLock(key string) *sync.RWMutex {
- h := hash(key)
- return &sl.locks[h%1024]
- }
- // Storage (Table 2) manages disk storage for values.
- type Storage struct {
- file *os.File
- filename string
- offset int64 // Current end of file (Atomic)
- freeList *FreeList
-
- // Simple LRU Cache for Values
- // Using sync.Map for simplicity in this iteration, though it's not LRU.
- // For true LRU we'd need a list+map protected by mutex.
- // Let's use a Mutex protected Map as a "Hot Cache".
- cache map[int64]string
- cacheMu sync.RWMutex
- }
// On-disk record layout and cache limit used by Storage.
//
// Each record is: flag (1 byte) | capacity (uint32, little-endian) |
// length (uint32, little-endian) | payload, padded to capacity bytes.
const (
	FlagDeleted = 0x00 // slot is free and may be reused
	FlagValid   = 0x01 // slot holds a live value

	HeaderSize   = 1 + 4 + 4 // flag + capacity + length
	AlignSize    = 16        // capacities are rounded up to a multiple of this
	MaxCacheSize = 10000     // entry limit checked before Storage adds to its value cache
)
- func NewStorage(filename string) (*Storage, error) {
- f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
- if err != nil {
- return nil, err
- }
- st := &Storage{
- file: f,
- filename: filename,
- freeList: NewFreeList(),
- cache: make(map[int64]string),
- }
- if err := st.scan(); err != nil {
- f.Close()
- return nil, err
- }
- return st, nil
- }
- func (s *Storage) scan() error {
- offset := int64(0)
-
- if _, err := s.file.Seek(0, 0); err != nil {
- return err
- }
-
- reader := bufio.NewReader(s.file)
-
- for {
- header := make([]byte, HeaderSize)
- n, err := io.ReadFull(reader, header)
- if err == io.EOF {
- break
- }
- if err == io.ErrUnexpectedEOF && n == 0 {
- break
- }
- if err != nil {
- return err
- }
-
- flag := header[0]
- cap := binary.LittleEndian.Uint32(header[1:])
-
- if flag == FlagDeleted {
- s.freeList.Add(cap, offset)
- }
-
- discardLen := int(cap)
- if _, err := reader.Discard(discardLen); err != nil {
- buf := make([]byte, discardLen)
- if _, err := io.ReadFull(reader, buf); err != nil {
- return err
- }
- }
-
- offset += int64(HeaderSize + discardLen)
- }
-
- s.offset = offset
- return nil
- }
- func alignCapacity(length int) uint32 {
- if length == 0 {
- return AlignSize
- }
- cap := (length + AlignSize - 1) / AlignSize * AlignSize
- return uint32(cap)
- }
- // TryUpdateInPlace tries to update value in-place if capacity allows.
- func (s *Storage) TryUpdateInPlace(val string, offset int64) bool {
- if offset < 0 {
- return false
- }
-
- valLen := len(val)
- capBuf := make([]byte, 4)
- if _, err := s.file.ReadAt(capBuf, offset+1); err != nil {
- return false
- }
- oldCap := binary.LittleEndian.Uint32(capBuf)
- if uint32(valLen) > oldCap {
- return false
- }
-
- // Fits! Write Length then Data
- lenBuf := make([]byte, 4)
- binary.LittleEndian.PutUint32(lenBuf, uint32(valLen))
-
- if _, err := s.file.WriteAt(lenBuf, offset+5); err != nil {
- return false
- }
- if _, err := s.file.WriteAt([]byte(val), offset+HeaderSize); err != nil {
- return false
- }
-
- // Update Cache
- s.cacheMu.Lock()
- s.cache[offset] = val
- s.cacheMu.Unlock()
-
- return true
- }
- // AppendOrReuse writes a new value, reusing FreeList or Appending.
- func (s *Storage) AppendOrReuse(val string) (int64, error) {
- valLen := len(val)
- newCap := alignCapacity(valLen)
-
- var writeOffset int64
- var actualCap uint32
-
- // Try Reuse
- reusedOffset, capFromFree, found := s.freeList.Pop(newCap)
- if found {
- writeOffset = reusedOffset
- actualCap = capFromFree
- } else {
- // Append
- totalSize := HeaderSize + int(newCap)
- newEnd := atomic.AddInt64(&s.offset, int64(totalSize))
- writeOffset = newEnd - int64(totalSize)
- actualCap = newCap
- }
- buf := make([]byte, HeaderSize+int(actualCap))
- buf[0] = FlagValid
- binary.LittleEndian.PutUint32(buf[1:], actualCap)
- binary.LittleEndian.PutUint32(buf[5:], uint32(valLen))
- copy(buf[HeaderSize:], []byte(val))
-
- if _, err := s.file.WriteAt(buf, writeOffset); err != nil {
- return 0, err
- }
-
- // Add to Cache
- s.cacheMu.Lock()
- if len(s.cache) < MaxCacheSize {
- s.cache[writeOffset] = val
- }
- s.cacheMu.Unlock()
- return writeOffset, nil
- }
- // MarkDeleted marks a slot as deleted.
- func (s *Storage) MarkDeleted(offset int64) error {
- capBuf := make([]byte, 4)
- if _, err := s.file.ReadAt(capBuf, offset+1); err != nil {
- return err
- }
- oldCap := binary.LittleEndian.Uint32(capBuf)
- if _, err := s.file.WriteAt([]byte{FlagDeleted}, offset); err != nil {
- return err
- }
- s.freeList.Add(oldCap, offset)
-
- // Remove from Cache
- s.cacheMu.Lock()
- delete(s.cache, offset)
- s.cacheMu.Unlock()
-
- return nil
- }
- // ReadValue reads value at offset with Caching.
- func (s *Storage) ReadValue(offset int64) (string, error) {
- // 1. Check Cache
- s.cacheMu.RLock()
- if val, ok := s.cache[offset]; ok {
- s.cacheMu.RUnlock()
- return val, nil
- }
- s.cacheMu.RUnlock()
- // 2. Read Disk
- header := make([]byte, HeaderSize)
- if _, err := s.file.ReadAt(header, offset); err != nil {
- return "", err
- }
- flag := header[0]
- if flag == FlagDeleted {
- return "", fmt.Errorf("record deleted")
- }
- length := binary.LittleEndian.Uint32(header[5:])
- data := make([]byte, length)
- if _, err := s.file.ReadAt(data, offset+HeaderSize); err != nil {
- return "", err
- }
- val := string(data)
-
- // 3. Fill Cache
- s.cacheMu.Lock()
- if len(s.cache) < MaxCacheSize {
- s.cache[offset] = val
- } else {
- // Random eviction (map iteration order is random)
- // This is a poor man's eviction, but fast.
- for k := range s.cache {
- delete(s.cache, k)
- break
- }
- s.cache[offset] = val
- }
- s.cacheMu.Unlock()
- return val, nil
- }
- func (s *Storage) Close() error {
- return s.file.Close()
- }
// IndexEntry (Table 1) is the per-key record kept in the Engine's RadixTree
// index.
type IndexEntry struct {
	// ValueOffset is the byte offset of the key's value record in the Storage
	// data file.
	ValueOffset int64
	// CommitIndex is the commitIndex passed to the Engine.Set call that last
	// wrote this key.
	CommitIndex uint64
}
// Engine is the core storage engine. It combines three parts:
//   - an in-memory key index (RadixTree);
//   - on-disk value storage (Storage);
//   - an in-memory full-text index (FullTextIndex).
//
// Engine contains mutexes (KeyLocks, commitMu) and must not be copied after
// first use.
type Engine struct {
	// Index maps each key to its IndexEntry. It replaces the earlier
	// KeyMap + sharded-index design, and RadixTree synchronizes internally.
	Index *RadixTree
	// Storage is Table 2: the on-disk value store.
	Storage *Storage
	// FTIndex is Table 3: the token -> keys full-text index.
	FTIndex *FullTextIndex

	// KeyLocks holds the per-key striped locks. Set takes the write lock of a
	// key's stripe and Get the read lock; Query does not use them.
	KeyLocks StripedLock
	// LastCommitIndex is the highest commitIndex seen by Set.
	LastCommitIndex uint64
	commitMu        sync.Mutex // Protects LastCommitIndex
	dataDir         string     // directory holding values.data
}
- func NewEngine(dataDir string) (*Engine, error) {
- if err := os.MkdirAll(dataDir, 0755); err != nil {
- return nil, err
- }
- store, err := NewStorage(dataDir + "/values.data")
- if err != nil {
- return nil, err
- }
- e := &Engine{
- Index: NewRadixTree(),
- Storage: store,
- FTIndex: NewFullTextIndex(),
- dataDir: dataDir,
- }
- return e, nil
- }
- func (e *Engine) Close() error {
- return e.Storage.Close()
- }
- func (e *Engine) Set(key, value string, commitIndex uint64) error {
- // 0. Lock Key
- kLock := e.KeyLocks.GetLock(key)
- kLock.Lock()
- defer kLock.Unlock()
- // 1. Check existing
- // We no longer need separate KeyID lookup. Radix Tree stores it all.
- var oldOffset int64 = -1
- if entry, ok := e.Index.Get(key); ok {
- oldOffset = entry.ValueOffset
- }
- // 2. Try In-Place
- if e.Storage.TryUpdateInPlace(value, oldOffset) {
- // Update Index (CommitIndex might change)
- entry := IndexEntry{ValueOffset: oldOffset, CommitIndex: commitIndex}
- e.Index.Insert(key, entry)
-
- e.commitMu.Lock()
- if commitIndex > e.LastCommitIndex {
- e.LastCommitIndex = commitIndex
- }
- e.commitMu.Unlock()
- return nil
- }
- // 3. Write New
- newOffset, err := e.Storage.AppendOrReuse(value)
- if err != nil {
- return err
- }
- // 4. Update Index
- // Write New Entry
- e.Index.Insert(key, IndexEntry{
- ValueOffset: newOffset,
- CommitIndex: commitIndex,
- })
-
- // 5. Delete Old
- if oldOffset >= 0 {
- e.Storage.MarkDeleted(oldOffset)
- }
-
- // Update global commit index
- e.commitMu.Lock()
- if commitIndex > e.LastCommitIndex {
- e.LastCommitIndex = commitIndex
- }
- e.commitMu.Unlock()
- // 6. Update Full Text Index
- // Ideally we remove old tokens too, but that's expensive without reverse index.
- // For append-only log structured db, we just add new ones.
- e.FTIndex.Add(key, value)
-
- return nil
- }
- func (e *Engine) Get(key string) (string, bool) {
- kLock := e.KeyLocks.GetLock(key)
- kLock.RLock()
- defer kLock.RUnlock()
- entry, ok := e.Index.Get(key)
- if !ok {
- return "", false
- }
- val, err := e.Storage.ReadValue(entry.ValueOffset)
- if err != nil {
- return "", false
- }
- return val, true
- }
// QueryResult is one row returned by Engine.Query. The JSON tags define the
// serialized field names.
type QueryResult struct {
	Key         string `json:"key"`
	Value       string `json:"value"`
	CommitIndex uint64 `json:"commit_index"` // CommitIndex from the key's IndexEntry
}
// WildcardMatch reports whether str matches pattern as a whole. In pattern,
// '*' matches any run of runes (including none) and '?' matches exactly one
// rune. All other runes match literally. Matching is rune-based, so
// multi-byte UTF-8 characters count as one.
//
// It uses a greedy scan that, on a mismatch, backtracks to just after the most
// recent '*' and lets that star absorb one more rune.
func WildcardMatch(str, pattern string) bool {
	text := []rune(str)
	pat := []rune(pattern)
	ti, pi := 0, 0
	lastStar, resumeAt := -1, -1

	for ti < len(text) {
		switch {
		case pi < len(pat) && (pat[pi] == '?' || pat[pi] == text[ti]):
			ti++
			pi++
		case pi < len(pat) && pat[pi] == '*':
			lastStar, resumeAt = pi, ti
			pi++
		case lastStar != -1:
			resumeAt++
			ti, pi = resumeAt, lastStar+1
		default:
			return false
		}
	}
	// Only trailing stars may remain unconsumed.
	for pi < len(pat) && pat[pi] == '*' {
		pi++
	}
	return pi == len(pat)
}
- func (e *Engine) Query(sql string) ([]QueryResult, error) {
- sql = strings.TrimSpace(sql)
-
- reLimit := regexp.MustCompile(`(?i)\s+LIMIT\s+(\d+)`)
- reOffset := regexp.MustCompile(`(?i)\s+OFFSET\s+(\d+)`)
-
- limit := -1
- offset := 0
-
- if match := reLimit.FindStringSubmatch(sql); len(match) > 0 {
- limit, _ = strconv.Atoi(match[1])
- sql = reLimit.ReplaceAllString(sql, "")
- }
- if match := reOffset.FindStringSubmatch(sql); len(match) > 0 {
- offset, _ = strconv.Atoi(match[1])
- sql = reOffset.ReplaceAllString(sql, "")
- }
- re := regexp.MustCompile(`(key|value|CommitIndex)\s*(=|like|>=|<=|>|<)\s*("[^"]*"|\d+)`)
- matches := re.FindAllStringSubmatch(sql, -1)
- if len(matches) == 0 {
- return nil, fmt.Errorf("invalid query")
- }
- extractString := func(s string) string {
- return strings.Trim(strings.TrimSpace(s), "\"")
- }
- // Optimization: Point Lookup Fast Path
- // If query is exactly `key = "..."`, utilize Hash/Radix Lookup O(1)
- for _, match := range matches {
- if match[1] == "key" && match[2] == "=" {
- targetKey := extractString(match[3])
- // 1. Get Entry
- entry, ok := e.Index.Get(targetKey)
- if !ok {
- return []QueryResult{}, nil
- }
-
- // 2. Load Value (if needed by other filters, but here we load anyway for result)
- val, err := e.Storage.ReadValue(entry.ValueOffset)
- if err != nil {
- return []QueryResult{}, nil
- }
-
- // 3. Verify other conditions
- matchAll := true
- for _, m := range matches {
- field, op, valRaw := m[1], m[2], m[3]
- switch field {
- case "CommitIndex":
- num, _ := strconv.ParseUint(valRaw, 10, 64)
- switch op {
- case ">": if !(entry.CommitIndex > num) { matchAll = false }
- case "<": if !(entry.CommitIndex < num) { matchAll = false }
- case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
- case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
- case "=": if !(entry.CommitIndex == num) { matchAll = false }
- }
- case "value":
- t := extractString(valRaw)
- switch op {
- case "=": if val != t { matchAll = false }
- case "like": if !WildcardMatch(val, t) { matchAll = false }
- }
- }
- if !matchAll { break }
- }
- if matchAll {
- return []QueryResult{{Key: targetKey, Value: val, CommitIndex: entry.CommitIndex}}, nil
- }
- return []QueryResult{}, nil
- }
- }
- // Optimization: Inverted Index for Value Queries
- // Strategy:
- // 1. Extract potential tokens from `value like "..."`
- // e.g. `value like "*keyword*"` -> token "keyword"
- // 2. Look up candidates from FTIndex
- // 3. Intersect/Union candidates (if multiple)
- // 4. Fallback to Scan if no tokens found or complex query
-
- var candidates map[string]bool
- var useFTIndex bool = false
-
- for _, match := range matches {
- if match[1] == "value" && match[2] == "like" {
- pattern := extractString(match[3])
- // Extract a core token: remove * from ends
- // Simplistic extraction: find longest sequence of alphanumeric?
- // For now, assume pattern is like "*token*" or "token*"
- clean := strings.Trim(pattern, "*")
- if len(clean) > 0 && !strings.Contains(clean, "*") && !strings.Contains(clean, "?") {
- // We have a candidate token "clean"
- // FTIndex stores partial tokens? No, exact tokens.
- // If query is *partial*, we need FTI scan.
- // Our FTI.Search handles wildcards!
-
- matches := e.FTIndex.Search(pattern) // Pass original pattern to FTI
- if matches != nil {
- // We found candidates!
- currentSet := make(map[string]bool)
- for _, k := range matches {
- currentSet[k] = true
- }
-
- if !useFTIndex {
- candidates = currentSet
- useFTIndex = true
- } else {
- // Intersect
- newSet := make(map[string]bool)
- for k := range candidates {
- if currentSet[k] {
- newSet[k] = true
- }
- }
- candidates = newSet
- }
- } else {
- // Pattern produced NO matches -> Empty Result
- return []QueryResult{}, nil
- }
- }
- }
- }
-
- // Prepare Iterator
- var iterator func(func(string, IndexEntry) bool)
-
- if useFTIndex {
- // Iterate ONLY candidates
- iterator = func(cb func(string, IndexEntry) bool) {
- // Iterate candidates sorted for deterministic output (and better cache locality?)
- // Sort candidates keys
- keys := make([]string, 0, len(candidates))
- for k := range candidates {
- keys = append(keys, k)
- }
- sort.Strings(keys)
-
- for _, k := range keys {
- if entry, ok := e.Index.Get(k); ok {
- if !cb(k, entry) {
- return
- }
- }
- }
- }
- } else {
- // Full Scan or Prefix Scan
- var prefix string = ""
- var usePrefix bool = false
- for _, match := range matches {
- if match[1] == "key" && match[2] == "like" {
- pattern := extractString(match[3])
- if strings.HasSuffix(pattern, "*") {
- clean := pattern[:len(pattern)-1]
- if !strings.ContainsAny(clean, "*?") {
- prefix = clean
- usePrefix = true
- break
- }
- }
- }
- }
-
- iterator = func(cb func(string, IndexEntry) bool) {
- if usePrefix {
- e.Index.WalkPrefix(prefix, cb)
- } else {
- e.Index.WalkPrefix("", cb)
- }
- }
- }
- var results []QueryResult
- var mu sync.Mutex
- // Execution
- iterator(func(key string, entry IndexEntry) bool {
- var valStr string
- var valLoaded bool
-
- matchAll := true
- for _, match := range matches {
- field, op, valRaw := match[1], match[2], match[3]
- switch field {
- case "CommitIndex":
- num, _ := strconv.ParseUint(valRaw, 10, 64)
- switch op {
- case ">": if !(entry.CommitIndex > num) { matchAll = false }
- case "<": if !(entry.CommitIndex < num) { matchAll = false }
- case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
- case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
- case "=": if !(entry.CommitIndex == num) { matchAll = false }
- }
- case "key":
- target := extractString(valRaw)
- switch op {
- case "=": if key != target { matchAll = false }
- case "like": if !WildcardMatch(key, target) { matchAll = false }
- }
- case "value":
- // Optimization: If using FTIndex, we know token matches, but pattern might be complex.
- // However, FTI.Search already filtered by pattern!
- // So if we trusted FTI, we could skip this check IF it was the only value check.
- // But let's be safe and re-check, especially if multiple value conditions exist.
-
- if !valLoaded {
- v, err := e.Storage.ReadValue(entry.ValueOffset)
- if err != nil { matchAll = false; break }
- valStr = v
- valLoaded = true
- }
- target := extractString(valRaw)
- switch op {
- case "=": if valStr != target { matchAll = false }
- case "like": if !WildcardMatch(valStr, target) { matchAll = false }
- }
- }
- if !matchAll { break }
- }
-
- if matchAll {
- if !valLoaded {
- v, err := e.Storage.ReadValue(entry.ValueOffset)
- if err == nil { valStr = v }
- }
- mu.Lock()
- results = append(results, QueryResult{
- Key: key,
- Value: valStr,
- CommitIndex: entry.CommitIndex,
- })
- // Optimization: Early termination for LIMIT
- // Current Logic: Only optimizes if offset == 0
- // Improvement: Optimize for Offset too.
- // If we have collected enough results (Limit + Offset), we can stop.
- // Radix Walk is ordered.
- needed := limit + offset
- if limit > 0 && len(results) >= needed {
- mu.Unlock()
- return false
- }
- mu.Unlock()
- }
- return true
- })
- // Pagination
- if offset > 0 {
- if offset >= len(results) {
- return []QueryResult{}, nil
- }
- results = results[offset:]
- }
- if limit >= 0 {
- if limit < len(results) {
- results = results[:limit]
- }
- }
- return results, nil
- }
|