@@ -3,7 +3,6 @@ package db
import (
    "bufio"
    "encoding/binary"
-   "encoding/json"
    "fmt"
    "io"
    "os"
@@ -24,6 +23,294 @@ func hash(s string) uint32 {
    return h
}

+// --- Radix Tree Implementation (Minimal) ---
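+// A radix (prefix-compressed) tree keyed by full key strings. Unlike the
+// hash-sharded index it replaces, it supports ordered, prefix-bounded
+// iteration, which Query uses for `key like "prefix*"` scans.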
+
+type radixNode struct {
+    path     string
+    indices  string // first char of children paths for fast lookup
+    children []*radixNode
+    leaf     *IndexEntry // If non-nil, this is a valid key
+    key      string      // Full key stored at leaf for convenience
+}
+
+type RadixTree struct {
+    root *radixNode
+    size int
+    mu   sync.RWMutex
+}
+
+func NewRadixTree() *RadixTree {
+    return &RadixTree{
+        root: &radixNode{},
+    }
+}
+
+// longestCommonPrefix finds the length of the shared prefix of two strings
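+// e.g. longestCommonPrefix("user.10", "user.12") == 6.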
+func longestCommonPrefix(k1, k2 string) int {
+    max := len(k1)
+    if len(k2) < max {
+        max = len(k2)
+    }
+    var i int
+    for i = 0; i < max; i++ {
+        if k1[i] != k2[i] {
+            break
+        }
+    }
+    return i
+}
+
+// Insert adds a key and its index entry to the tree
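+// e.g. after Insert("team", e1) and Insert("test", e2), the root has one
+// child with path "te" whose children are "am" and "st".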
+func (t *RadixTree) Insert(key string, entry IndexEntry) {
+    t.mu.Lock()
+    defer t.mu.Unlock()
+
+    n := t.root
+    search := key
+
+    for {
+        // Search exhausted: this node is the insertion point.
+        if len(search) == 0 {
+            if n.leaf == nil {
+                t.size++
+            }
+            n.leaf = &entry
+            n.key = key
+            return
+        }
+
+        // Find child
+        parent := n
+        found := false
+        for i, char := range []byte(n.indices) {
+            if char == search[0] {
+                // Found a child starting with the same char
+                n = n.children[i]
+
+                // Check common prefix
+                common := longestCommonPrefix(search, n.path)
+
+                if common == len(n.path) {
+                    // Full match of child path, continue down
+                    search = search[common:]
+                    found = true
+                    break // Continue outer loop
+                } else {
+                    // Split the node: the current child n becomes a child
+                    // of the new split node.
+
+                    // 1. Create split node holding the shared part of the path
+                    splitNode := &radixNode{
+                        path:     n.path[:common],
+                        indices:  string(n.path[common]), // The char that differs
+                        children: []*radixNode{n},
+                    }
+
+                    // 2. Update existing child n
+                    n.path = n.path[common:] // Remaining part
+
+                    // 3. Insert new leaf for the new key (if needed);
+                    // the new key diverges at index 'common'.
+                    if len(search) == common {
+                        // The new key IS the split node
+                        splitNode.leaf = &entry
+                        splitNode.key = key
+                        t.size++
+                    } else {
+                        // The new key is a child of the split node
+                        newBranch := &radixNode{
+                            path: search[common:],
+                            leaf: &entry,
+                            key:  key,
+                        }
+                        t.size++
+                        splitNode.indices += string(search[common])
+                        splitNode.children = append(splitNode.children, newBranch)
+                        // Indices are appended unsorted; ordered iteration is
+                        // handled locally in recursiveWalk instead.
+                    }
+
+                    // 4. Update parent to point to splitNode instead of n
+                    parent.children[i] = splitNode
+                    return
+                }
+            }
+        }
+
+        if !found {
+            // No matching child, add new child
+            newChild := &radixNode{
+                path: search,
+                leaf: &entry,
+                key:  key,
+            }
+            n.indices += string(search[0])
+            n.children = append(n.children, newChild)
+            t.size++
+            return
+        }
+    }
+}
+
+// Get returns the entry stored at key, if any.
+func (t *RadixTree) Get(key string) (IndexEntry, bool) {
+    t.mu.RLock()
+    defer t.mu.RUnlock()
+
+    n := t.root
+    search := key
+
+    for {
+        if len(search) == 0 {
+            if n.leaf != nil {
+                return *n.leaf, true
+            }
+            return IndexEntry{}, false
+        }
+
+        found := false
+        for i, char := range []byte(n.indices) {
+            if char == search[0] {
+                n = n.children[i]
+                if strings.HasPrefix(search, n.path) {
+                    search = search[len(n.path):]
+                    found = true
+                    break
+                }
+                // Only one child can start with this byte, so a partial
+                // path match means the key is absent.
+                return IndexEntry{}, false
+            }
+        }
+        if !found {
+            return IndexEntry{}, false
+        }
+    }
+}
+
+// WalkPrefix iterates over all keys starting with prefix.
+// Returns true from callback to continue, false to stop.
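+// e.g. WalkPrefix("user.", fn) visits "user.1", "user.42", ... in key order.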
+func (t *RadixTree) WalkPrefix(prefix string, callback func(key string, entry IndexEntry) bool) {
+    t.mu.RLock()
+    defer t.mu.RUnlock()
+
+    n := t.root
+    search := prefix
+
+    // 1. Locate the node covering the prefix
+    for len(search) > 0 {
+        found := false
+        for i, char := range []byte(n.indices) {
+            if char == search[0] {
+                n = n.children[i]
+                // 3 cases:
+                // 1. search is a prefix of n.path -> n is the target subtree
+                // 2. n.path is a prefix of search -> continue down
+                // 3. otherwise -> mismatch, nothing to walk
+
+                common := longestCommonPrefix(search, n.path)
+                if common == len(search) {
+                    // Prefix fully consumed. 'n' is the root of the subtree.
+                    search = ""
+                    found = true
+                    break
+                } else if common == len(n.path) {
+                    // 'n' path fully consumed, continue deeper
+                    search = search[common:]
+                    found = true
+                    break
+                } else {
+                    // Mismatch, prefix not found
+                    return
+                }
+            }
+        }
+        if !found {
+            return
+        }
+    }
+
+    // 2. Recursive walk from n
+    t.recursiveWalk(n, callback)
+}
+
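+// recursiveWalk visits n's subtree, invoking the callback on each leaf;
+// children are visited sorted by first byte, so keys arrive in
+// lexicographic order.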
+func (t *RadixTree) recursiveWalk(n *radixNode, callback func(key string, entry IndexEntry) bool) bool {
+    if n.leaf != nil {
+        if !callback(n.key, *n.leaf) {
+            return false
+        }
+    }
+    // Indices are appended unsorted on insert, so sort the children locally
+    // here to guarantee lexicographic visit order.
+    if len(n.children) > 1 {
+        // Build a sorted view of (first byte, child) pairs; the tree itself
+        // is not mutated, since we only hold a read lock.
+        type childRef struct {
+            char byte
+            node *radixNode
+        }
+        refs := make([]childRef, len(n.children))
+        for i, char := range []byte(n.indices) {
+            refs[i] = childRef{char, n.children[i]}
+        }
+        sort.Slice(refs, func(i, j int) bool { return refs[i].char < refs[j].char })
+
+        for _, ref := range refs {
+            if !t.recursiveWalk(ref.node, callback) {
+                return false
+            }
+        }
+    } else {
+        for _, child := range n.children {
+            if !t.recursiveWalk(child, callback) {
+                return false
+            }
+        }
+    }
+    return true
+}
+
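+// Usage sketch (illustrative only, not part of the engine API):
+//
+//  t := NewRadixTree()
+//  t.Insert("user.1", IndexEntry{ValueOffset: 0, CommitIndex: 1})
+//  entry, ok := t.Get("user.1")
+//  t.WalkPrefix("user.", func(k string, e IndexEntry) bool { return true })
+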
+// --- End Radix Tree ---
+
+// --- Full Text Index (Simple In-Memory) ---
+// The inverted index maps Token -> list of Keys. A plain map[string][]string
+// is simpler and faster here than routing tokens through the radix tree.
+
+type FullTextIndex struct {
+    // Token -> List of Keys (strings)
+    // Keys are stored as strings directly, since the KeyMap ID scheme is gone.
+    index map[string][]string
+    mu    sync.RWMutex
+}
+
+func NewFullTextIndex() *FullTextIndex {
+    return &FullTextIndex{
+        index: make(map[string][]string),
+    }
+}
+
+func (fti *FullTextIndex) Add(key string, value string) {
+    fti.mu.Lock()
+    defer fti.mu.Unlock()
+
+    tokens := tokenize(value)
+    for _, token := range tokens {
+        // Per-key deduplication would cost O(N) per token, so we accept
+        // duplicates to keep writes fast. A set per token
+        // (map[string]map[string]bool) would trade memory for exactness.
+        fti.index[token] = append(fti.index[token], key)
+    }
+}
+
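+// tokenize splits a value into maximal ASCII-alphanumeric runs;
+// e.g. tokenize("hello-world 42") == []string{"hello", "world", "42"}.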
+func tokenize(val string) []string {
+    f := func(c rune) bool {
+        return !((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9'))
+    }
+    return strings.FieldsFunc(val, f)
+}
+
+// --- Storage & Cache ---
+
// FreeList manages reusable disk space in memory using Best-Fit strategy.
type FreeList struct {
    // Capacity -> Stack of Offsets
@@ -103,6 +390,13 @@ type Storage struct {
    filename string
    offset   int64 // Current end of file (Atomic)
    freeList *FreeList
+
+   // Hot cache for values: a mutex-protected map with a size cap.
+   // Not a true LRU (that would need a linked list plus a map);
+   // eviction drops an arbitrary entry, see ReadValue.
+   cache   map[int64]string
+   cacheMu sync.RWMutex
}

const (
@@ -110,6 +404,7 @@ const (
    FlagValid  = 0x01
    HeaderSize = 9  // 1(Flag) + 4(Cap) + 4(Len)
    AlignSize  = 16 // Align capacity to 16 bytes
+   MaxCacheSize = 10000 // Simple cap on the number of cached values
)

func NewStorage(filename string) (*Storage, error) {
@@ -122,6 +417,7 @@ func NewStorage(filename string) (*Storage, error) {
        file:     f,
        filename: filename,
        freeList: NewFreeList(),
+       cache:    make(map[int64]string),
    }

    if err := st.scan(); err != nil {
@@ -185,39 +481,42 @@ func alignCapacity(length int) uint32 {
}

// TryUpdateInPlace tries to update value in-place if capacity allows.
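+// The on-disk record layout is [Flag:1][Cap:4][Len:4][Data], so Cap sits at
+// offset+1 and Len at offset+5 (see HeaderSize).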
-// Returns true if successful.
-// This must be called under Key Lock.
func (s *Storage) TryUpdateInPlace(val string, offset int64) bool {
    if offset < 0 {
        return false
    }

    valLen := len(val)
-   capBuf := make([]byte, 4)
+   capBuf := make([]byte, 4)
    if _, err := s.file.ReadAt(capBuf, offset+1); err != nil {
        return false
    }
-
-   oldCap := binary.LittleEndian.Uint32(capBuf)
+
+   oldCap := binary.LittleEndian.Uint32(capBuf)
    if uint32(valLen) > oldCap {
        return false
    }
-
+
    // Fits! Write Length then Data
-   lenBuf := make([]byte, 4)
-   binary.LittleEndian.PutUint32(lenBuf, uint32(valLen))
-
+   lenBuf := make([]byte, 4)
+   binary.LittleEndian.PutUint32(lenBuf, uint32(valLen))
+
    if _, err := s.file.WriteAt(lenBuf, offset+5); err != nil {
        return false
    }
    if _, err := s.file.WriteAt([]byte(val), offset+HeaderSize); err != nil {
        return false
    }
+
+   // Update Cache
+   s.cacheMu.Lock()
+   s.cache[offset] = val
+   s.cacheMu.Unlock()
+
    return true
}

// AppendOrReuse writes a new value, reusing FreeList or Appending.
-// Returns new offset.
func (s *Storage) AppendOrReuse(val string) (int64, error) {
    valLen := len(val)
    newCap := alignCapacity(valLen)
@@ -247,10 +546,18 @@ func (s *Storage) AppendOrReuse(val string) (int64, error) {
    if _, err := s.file.WriteAt(buf, writeOffset); err != nil {
        return 0, err
    }
+
+   // Add to Cache
+   s.cacheMu.Lock()
+   if len(s.cache) < MaxCacheSize {
+       s.cache[writeOffset] = val
+   }
+   s.cacheMu.Unlock()
+
    return writeOffset, nil
}

-// MarkDeleted marks a slot as deleted and adds to free list.
+// MarkDeleted marks a slot as deleted and returns its space to the free list.
func (s *Storage) MarkDeleted(offset int64) error {
    capBuf := make([]byte, 4)
    if _, err := s.file.ReadAt(capBuf, offset+1); err != nil {
@@ -262,14 +569,26 @@ func (s *Storage) MarkDeleted(offset int64) error {
        return err
    }
    s.freeList.Add(oldCap, offset)
+
+   // Remove from Cache
+   s.cacheMu.Lock()
+   delete(s.cache, offset)
+   s.cacheMu.Unlock()
+
    return nil
}

-// WriteValue is deprecated in favor of granular methods but kept for compatibility if needed.
-// We remove it to force usage of new safe flow.
-
-// ReadValue reads value at offset.
+// ReadValue reads the value at offset, consulting the cache first.
func (s *Storage) ReadValue(offset int64) (string, error) {
+   // 1. Check Cache
+   s.cacheMu.RLock()
+   if val, ok := s.cache[offset]; ok {
+       s.cacheMu.RUnlock()
+       return val, nil
+   }
+   s.cacheMu.RUnlock()
+
+   // 2. Read Disk
    header := make([]byte, HeaderSize)
    if _, err := s.file.ReadAt(header, offset); err != nil {
        return "", err
@@ -287,7 +606,24 @@ func (s *Storage) ReadValue(offset int64) (string, error) {
        return "", err
    }

-   return string(data), nil
+   val := string(data)
+
+   // 3. Fill Cache
+   s.cacheMu.Lock()
+   if len(s.cache) < MaxCacheSize {
+       s.cache[offset] = val
+   } else {
+       // Cache full: evict one arbitrary entry (map iteration order is
+       // randomized). Cheap, but not LRU.
+       for k := range s.cache {
+           delete(s.cache, k)
+           break
+       }
+       s.cache[offset] = val
+   }
+   s.cacheMu.Unlock()
+
+   return val, nil
}

func (s *Storage) Close() error {
@@ -300,136 +636,17 @@ type IndexEntry struct {
    CommitIndex uint64
}

-// InvertedIndexShard manages tokens for a subset of strings
-type InvertedIndexShard struct {
-   KeyTokens   map[string][]uint32
-   ValueTokens map[string][]uint32
-   mu          sync.RWMutex
-}
-
-// InvertedIndex (Table 3) - Thread Safe with Sharding
-type InvertedIndex struct {
-   Shards [16]*InvertedIndexShard
-}
-
-func NewInvertedIndex() *InvertedIndex {
-   ii := &InvertedIndex{}
-   for i := 0; i < 16; i++ {
-       ii.Shards[i] = &InvertedIndexShard{
-           KeyTokens:   make(map[string][]uint32),
-           ValueTokens: make(map[string][]uint32),
-       }
-   }
-   return ii
-}
-
-func (ii *InvertedIndex) AddToken(token string, keyID uint32, isKey bool) {
-   h := hash(token)
-   shard := ii.Shards[h%16]
-
-   shard.mu.Lock()
-   defer shard.mu.Unlock()
-
-   var targetMap map[string][]uint32
-   if isKey {
-       targetMap = shard.KeyTokens
-   } else {
-       targetMap = shard.ValueTokens
-   }
-
-   ids := targetMap[token]
-   for _, id := range ids {
-       if id == keyID {
-           return
-       }
-   }
-   targetMap[token] = append(ids, keyID)
-}
-
-// KeyMapShard manages a subset of keys.
-type KeyMapShard struct {
-   StrToID map[string]uint32
-   IDToStr map[uint32]string
-   NextID  uint32
-   mu      sync.RWMutex
-}
-
-// KeyMap maintains mapping using sharding to reduce lock contention.
-type KeyMap struct {
-   Shards [16]*KeyMapShard
-}
-
-func NewKeyMap() *KeyMap {
-   km := &KeyMap{}
-   for i := 0; i < 16; i++ {
-       km.Shards[i] = &KeyMapShard{
-           StrToID: make(map[string]uint32),
-           IDToStr: make(map[uint32]string),
-           NextID:  uint32(i + 1),
-       }
-   }
-   return km
-}
-
-func (km *KeyMap) GetOrCreateID(key string) uint32 {
-   h := hash(key)
-   shard := km.Shards[h%16]
-
-   shard.mu.Lock()
-   defer shard.mu.Unlock()
-
-   if id, ok := shard.StrToID[key]; ok {
-       return id
-   }
-   id := shard.NextID
-   shard.NextID += 16
-   shard.StrToID[key] = id
-   shard.IDToStr[id] = key
-   return id
-}
-
-func (km *KeyMap) GetID(key string) (uint32, bool) {
-   h := hash(key)
-   shard := km.Shards[h%16]
-
-   shard.mu.RLock()
-   defer shard.mu.RUnlock()
-   id, ok := shard.StrToID[key]
-   return id, ok
-}
-
-func (km *KeyMap) GetStr(id uint32) (string, bool) {
-   if id == 0 {
-       return "", false
-   }
-   shardIdx := (id - 1) % 16
-   shard := km.Shards[shardIdx]
-
-   shard.mu.RLock()
-   defer shard.mu.RUnlock()
-   s, ok := shard.IDToStr[id]
-   return s, ok
-}
-
-// IndexShard manages a subset of keys
-type IndexShard struct {
-   Index map[uint32]IndexEntry
-   mu    sync.RWMutex
-}
-
-// Engine is the core storage engine with Sharded Locking.
+// Engine is the core storage engine.
type Engine struct {
-   // Table 1: Sharded Index
-   Shards [16]*IndexShard
-
-   // Key mapping
-   Keys *KeyMap
+   // A single radix tree replaces the KeyMap + sharded index. It is
+   // thread-safe and stores each IndexEntry directly under its key.
+   Index *RadixTree

    // Table 2: Disk Storage
    Storage *Storage

-   // Table 3: Inverted Index
-   SearchIndex *InvertedIndex
+   // Table 3: Full Text Index (New Implementation)
+   FTIndex *FullTextIndex

    KeyLocks StripedLock

@@ -449,18 +666,11 @@ func NewEngine(dataDir string) (*Engine, error) {
    }

    e := &Engine{
-       Keys:        NewKeyMap(),
+       Index:   NewRadixTree(),
        Storage: store,
-       SearchIndex: NewInvertedIndex(),
+       FTIndex: NewFullTextIndex(),
        dataDir: dataDir,
    }
-
-   // Initialize shards
-   for i := 0; i < 16; i++ {
-       e.Shards[i] = &IndexShard{
-           Index: make(map[uint32]IndexEntry),
-       }
-   }

    return e, nil
}
@@ -469,52 +679,24 @@ func (e *Engine) Close() error {
    return e.Storage.Close()
}

-func (e *Engine) tokenizeKey(key string) []string {
-   return strings.Split(key, ".")
-}
-
-func (e *Engine) tokenizeValue(val string) []string {
-   f := func(c rune) bool {
-       return !((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9'))
-   }
-   return strings.FieldsFunc(val, f)
-}
-
-func (e *Engine) getShard(keyID uint32) *IndexShard {
-   // Simple mod hashing for sharding
-   return e.Shards[keyID%16]
-}
-
func (e *Engine) Set(key, value string, commitIndex uint64) error {
-   // 0. Lock Key (Protects against concurrent access to SAME key)
-   // This ensures we don't have race conditions on "In-Place vs Move" or "Read vs Write" for this specific key.
-   // Different keys are still processed in parallel.
+   // 0. Lock Key
    kLock := e.KeyLocks.GetLock(key)
    kLock.Lock()
    defer kLock.Unlock()

-   // 1. Get KeyID (Thread Safe)
-   keyID := e.Keys.GetOrCreateID(key)
-   shard := e.getShard(keyID)
-
-   // 2. Check existing entry (Lock Shard)
+   // 1. Check existing entry.
+   // No separate KeyID lookup is needed; the radix tree stores the entry directly.
    var oldOffset int64 = -1
-   shard.mu.RLock()
-   if entry, ok := shard.Index[keyID]; ok {
+   if entry, ok := e.Index.Get(key); ok {
        oldOffset = entry.ValueOffset
    }
-   shard.mu.RUnlock()

-   // 3. Try In-Place Update
+   // 2. Try In-Place
    if e.Storage.TryUpdateInPlace(value, oldOffset) {
-       // Update CommitIndex if needed (even for in-place)
-       shard.mu.Lock()
-       // Re-read to ensure no weird state, though kLock protects us mostly.
-       // We just update the commit index here.
-       entry := shard.Index[keyID]
-       entry.CommitIndex = commitIndex
-       shard.Index[keyID] = entry
-       shard.mu.Unlock()
+       // Re-insert to refresh the CommitIndex; the offset is unchanged.
+       entry := IndexEntry{ValueOffset: oldOffset, CommitIndex: commitIndex}
+       e.Index.Insert(key, entry)

        e.commitMu.Lock()
        if commitIndex > e.LastCommitIndex {
@@ -524,28 +706,24 @@ func (e *Engine) Set(key, value string, commitIndex uint64) error {
        return nil
    }

-   // 4. Write New Value (Append/Reuse)
-   // This happens if In-Place failed or it's a new key.
+   // 3. Write New
    newOffset, err := e.Storage.AppendOrReuse(value)
    if err != nil {
        return err
    }

-   // 5. Update Index (Lock Shard)
-   // CRITICAL: We update Index to point to NEW data BEFORE deleting OLD data.
-   shard.mu.Lock()
-   shard.Index[keyID] = IndexEntry{
+   // 4. Update Index to point at the new data BEFORE deleting the old.
+   e.Index.Insert(key, IndexEntry{
        ValueOffset: newOffset,
        CommitIndex: commitIndex,
-   }
-   shard.mu.Unlock()
+   })

-   // 6. Delete Old Data (if any)
-   // Now that Index points to New, we can safely mark Old as deleted.
+   // 5. Delete Old (if any); the index no longer references it.
    if oldOffset >= 0 {
        e.Storage.MarkDeleted(oldOffset)
    }
-
+
    // Update global commit index
    e.commitMu.Lock()
    if commitIndex > e.LastCommitIndex {
@@ -553,32 +731,20 @@ func (e *Engine) Set(key, value string, commitIndex uint64) error {
    }
    e.commitMu.Unlock()

-   // 7. Update Inverted Index (Thread Safe)
-   for _, token := range e.tokenizeKey(key) {
-       e.SearchIndex.AddToken(token, keyID, true)
-   }
-   for _, token := range e.tokenizeValue(value) {
-       e.SearchIndex.AddToken(token, keyID, false)
-   }
+   // 6. Update Full Text Index.
+   // Stale tokens from the previous value are not removed; that would need
+   // a reverse (key -> tokens) index. In this append-style design we only
+   // add the new ones.
+   e.FTIndex.Add(key, value)
+
    return nil
}

func (e *Engine) Get(key string) (string, bool) {
-   // Lock Key to prevent reading while it's being moved/updated
    kLock := e.KeyLocks.GetLock(key)
    kLock.RLock()
    defer kLock.RUnlock()

-   keyID, ok := e.Keys.GetID(key)
-   if !ok {
-       return "", false
-   }
-
-   shard := e.getShard(keyID)
-   shard.mu.RLock()
-   entry, ok := shard.Index[keyID]
-   shard.mu.RUnlock()
-
+   entry, ok := e.Index.Get(key)
    if !ok {
        return "", false
    }
@@ -625,54 +791,24 @@ func WildcardMatch(str, pattern string) bool {
    return j == pLen
}

-func (e *Engine) scanTokens(indexType string, pattern string) []uint32 {
-   // pattern is usually like "*word*" or "word"
-   // We extract simple tokens from pattern.
-   // If pattern contains * or ?, we must iterate ALL tokens in index (slow but maybe faster than all docs).
-   // If pattern is simple word, we do direct lookup.
-
-   cleanPattern := strings.ReplaceAll(pattern, "*", "")
-   cleanPattern = strings.ReplaceAll(cleanPattern, "?", "")
+func (e *Engine) Query(sql string) ([]QueryResult, error) {
+   sql = strings.TrimSpace(sql)

-   // If the pattern, after removing wildcards, still contains separators,
-   // it means it spans multiple tokens. Single token lookup won't work easily.
-   // e.g. "hello.world" -> tokens "hello", "world".
-   // But simple check: if cleanPattern has no special chars, we try exact lookup first?
-   // Actually, inverted index keys are EXACT tokens.
+   reLimit := regexp.MustCompile(`(?i)\s+LIMIT\s+(\d+)`)
+   reOffset := regexp.MustCompile(`(?i)\s+OFFSET\s+(\d+)`)

-   var candidates []uint32
-   candidatesMap := make(map[uint32]bool)
+   limit := -1
+   offset := 0

-   // Scan all 16 shards
-   for i := 0; i < 16; i++ {
-       shard := e.SearchIndex.Shards[i]
-       shard.mu.RLock()
-
-       var targetMap map[string][]uint32
-       if indexType == "key" {
-           targetMap = shard.KeyTokens
-       } else {
-           targetMap = shard.ValueTokens
-       }
-
-       for token, ids := range targetMap {
-           if WildcardMatch(token, pattern) {
-               for _, id := range ids {
-                   candidatesMap[id] = true
-               }
-           }
-       }
-       shard.mu.RUnlock()
+   if match := reLimit.FindStringSubmatch(sql); len(match) > 0 {
+       limit, _ = strconv.Atoi(match[1])
+       sql = reLimit.ReplaceAllString(sql, "")
    }
-
-   for id := range candidatesMap {
-       candidates = append(candidates, id)
+   if match := reOffset.FindStringSubmatch(sql); len(match) > 0 {
+       offset, _ = strconv.Atoi(match[1])
+       sql = reOffset.ReplaceAllString(sql, "")
}
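+   // e.g. `CommitIndex >= 5 LIMIT 10 OFFSET 20` yields limit=10 and offset=20,
+   // with both clauses stripped before the condition regex runs.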
-   return candidates
-}
-
-func (e *Engine) Query(sql string) ([]QueryResult, error) {
-   sql = strings.TrimSpace(sql)
    re := regexp.MustCompile(`(key|value|CommitIndex)\s*(=|like|>=|<=|>|<)\s*("[^"]*"|\d+)`)
    matches := re.FindAllStringSubmatch(sql, -1)

@@ -684,125 +820,29 @@ func (e *Engine) Query(sql string) ([]QueryResult, error) {
        return strings.Trim(strings.TrimSpace(s), "\"")
    }

-   // Optimization 1: Key Point Lookup
-   // Check if there is a 'key = "..."' condition
+   // Optimization: Point Lookup Fast Path.
+   // If the query contains `key = "..."`, use a direct radix lookup
+   // instead of scanning.
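+   // e.g. `key = "user.1" and CommitIndex >= 5` takes this path.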
    for _, match := range matches {
        if match[1] == "key" && match[2] == "=" {
            targetKey := extractString(match[3])
-           // Fast path!
-           if val, ok := e.Get(targetKey); ok {
-               // We have the record, but we must check other conditions too
-               // Construct a dummy entry to reuse filter logic or just check manual
-               // Since we have the value, let's just check.
-
-               // Need CommitIndex.
-               keyID, _ := e.Keys.GetID(targetKey)
-               shard := e.getShard(keyID)
-               shard.mu.RLock()
-               entry := shard.Index[keyID]
-               shard.mu.RUnlock()
-
-               matchAll := true
-               for _, m := range matches {
-                   f, op, vRaw := m[1], m[2], m[3]
-                   switch f {
-                   case "CommitIndex":
-                       num, _ := strconv.ParseUint(vRaw, 10, 64)
-                       switch op {
-                       case ">": if !(entry.CommitIndex > num) { matchAll = false }
-                       case "<": if !(entry.CommitIndex < num) { matchAll = false }
-                       case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
-                       case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
-                       case "=": if !(entry.CommitIndex == num) { matchAll = false }
-                       }
-                   case "value":
-                       t := extractString(vRaw)
-                       switch op {
-                       case "=": if val != t { matchAll = false }
-                       case "like": if !WildcardMatch(val, t) { matchAll = false }
-                       }
-                   }
-                   if !matchAll { break }
-               }
-
-               if matchAll {
-                   return []QueryResult{{Key: targetKey, Value: val, CommitIndex: entry.CommitIndex}}, nil
-               }
+           // 1. Get Entry
+           entry, ok := e.Index.Get(targetKey)
+           if !ok {
                return []QueryResult{}, nil
            }
-           return []QueryResult{}, nil
-       }
-   }
-
-   // Optimization 2: Inverted Index Candidate Generation
-   // Check for 'like' queries on key/value that are simple enough
-   var candidateIDs []uint32
-   useCandidates := false
-
-   for _, match := range matches {
-       if (match[1] == "key" || match[1] == "value") && match[2] == "like" {
-           pattern := extractString(match[3])
-           // Basic heuristic: pattern should have at least some non-wildcard chars to be useful
-           clean := strings.ReplaceAll(strings.ReplaceAll(pattern, "*", ""), "?", "")
-           if len(clean) > 2 { // Only optimize if pattern is specific enough
-               ids := e.scanTokens(match[1], pattern)
-
-               if !useCandidates {
-                   candidateIDs = ids
-                   useCandidates = true
-               } else {
-                   // Intersection
-                   // Convert current candidates to map for fast check
-                   valid := make(map[uint32]bool)
-                   for _, id := range ids {
-                       valid[id] = true
-                   }
-                   var newCandidates []uint32
-                   for _, existing := range candidateIDs {
-                       if valid[existing] {
-                           newCandidates = append(newCandidates, existing)
-                       }
-                   }
-                   candidateIDs = newCandidates
-               }
-           }
-       }
-   }
-
-   var results []QueryResult
-   var mu sync.Mutex // Protect results append
-
-   // Scan Function
-   scanLogic := func(ids []uint32) {
-       // Group IDs by shard to minimize lock thrashing if we are careful,
-       // but simple parallel loop is easier.
-       // If we have specific IDs, we don't need to scan all shards.
-
-       var wg sync.WaitGroup
-       // Use limited workers for ID list to avoid spawning too many goroutines if list is huge
-       // Or just simple loop if list is small.
-
-       processID := func(kID uint32) {
-           // Get Key String
-           keyStr, ok := e.Keys.GetStr(kID)
-           if !ok { return }
-
-           // Get Entry
-           shard := e.getShard(kID)
-           shard.mu.RLock()
-           entry, ok := shard.Index[kID]
-           shard.mu.RUnlock()
-           if !ok { return }

-           var valStr string
-           var valLoaded bool
+           // 2. Load Value. It is needed for the result anyway, so load it
+           // unconditionally rather than deferring until a value filter
+           // requires it.
+           val, err := e.Storage.ReadValue(entry.ValueOffset)
+           if err != nil {
+               return []QueryResult{}, nil
+           }
+
+           // 3. Verify other conditions (e.g. CommitIndex)
            matchAll := true
-           for _, match := range matches {
-               field := match[1]
-               op := match[2]
-               valRaw := match[3]
-
+           for _, m := range matches {
+               field, op, valRaw := m[1], m[2], m[3]
                switch field {
                case "CommitIndex":
                    num, _ := strconv.ParseUint(valRaw, 10, 64)
@@ -813,226 +853,148 @@ func (e *Engine) Query(sql string) ([]QueryResult, error) {
                    case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
                    case "=": if !(entry.CommitIndex == num) { matchAll = false }
                    }
-               case "key":
-                   target := extractString(valRaw)
-                   switch op {
-                   case "=": if keyStr != target { matchAll = false }
-                   case "like": if !WildcardMatch(keyStr, target) { matchAll = false }
-                   }
                case "value":
-                   if !valLoaded {
-                       v, err := e.Storage.ReadValue(entry.ValueOffset)
-                       if err != nil { matchAll = false; break }
-                       valStr = v
-                       valLoaded = true
-                   }
-                   target := extractString(valRaw)
+                   t := extractString(valRaw)
                    switch op {
-                   case "=": if valStr != target { matchAll = false }
-                   case "like": if !WildcardMatch(valStr, target) { matchAll = false }
+                   case "=": if val != t { matchAll = false }
+                   case "like": if !WildcardMatch(val, t) { matchAll = false }
                    }
                }
                if !matchAll { break }
            }

            if matchAll {
-               if !valLoaded {
-                   v, err := e.Storage.ReadValue(entry.ValueOffset)
-                   if err == nil { valStr = v }
-               }
-               mu.Lock()
-               results = append(results, QueryResult{
-                   Key: keyStr,
-                   Value: valStr,
-                   CommitIndex: entry.CommitIndex,
-               })
-               mu.Unlock()
+               return []QueryResult{{Key: targetKey, Value: val, CommitIndex: entry.CommitIndex}}, nil
            }
+           return []QueryResult{}, nil
        }
+   }

-       if len(ids) < 1000 {
-           for _, id := range ids {
-               processID(id)
-           }
-       } else {
-           // Parallelize
-           chunks := 16
-           chunkSize := (len(ids) + chunks - 1) / chunks
-           for i := 0; i < chunks; i++ {
-               start := i * chunkSize
-               end := start + chunkSize
-               if start >= len(ids) { break }
-               if end > len(ids) { end = len(ids) }
-
-               wg.Add(1)
-               go func(sub []uint32) {
-                   defer wg.Done()
-                   for _, id := range sub {
-                       processID(id)
-                   }
-               }(ids[start:end])
+   var results []QueryResult
+   var mu sync.Mutex
+
+   // Strategy:
+   // 1. Identify primary filter (Key Prefix is best)
+   // 2. Iterate candidates
+   // 3. Filter remaining conditions
+
+   var prefix string = ""
+   var usePrefix bool = false
+
|
+   for _, match := range matches {
+       if match[1] == "key" && match[2] == "like" {
+           pattern := extractString(match[3])
+           if strings.HasSuffix(pattern, "*") {
+               clean := pattern[:len(pattern)-1]
+               if !strings.ContainsAny(clean, "*?") {
+                   prefix = clean
+                   usePrefix = true
+                   break
+               }
            }
-           wg.Wait()
        }
    }
-
-   if useCandidates {
-       // Use Index optimization
-       scanLogic(candidateIDs)
-   } else {
-       // Full Scan (Parallel by Shard)
-       var wg sync.WaitGroup
-       for i := 0; i < 16; i++ {
-           wg.Add(1)
-           go func(shardIdx int) {
-               defer wg.Done()
-               shard := e.Shards[shardIdx]
-               shard.mu.RLock()
-               // Snapshot IDs to minimize lock time
-               ids := make([]uint32, 0, len(shard.Index))
-               for k := range shard.Index {
-                   ids = append(ids, k)
-               }
-               shard.mu.RUnlock()
+
+   // Iterator
+   iterator := func(key string, entry IndexEntry) bool {
+       // Filter Logic
+       var valStr string
+       var valLoaded bool

-           // Re-use scanLogic logic but simplified for single goroutine per shard
-           // Actually we can just copy-paste the inner logic or call a helper.
-           // To avoid huge refactor, let's just iterate.
-           for _, kID := range ids {
-               // ... (Same logic as processID above) ...
-               // Copying processID logic here for now to avoid scope issues or closure overhead
-               keyStr, ok := e.Keys.GetStr(kID)
-               if !ok { continue }
-
-               shard.mu.RLock()
-               entry, ok := shard.Index[kID]
-               shard.mu.RUnlock()
-               if !ok { continue }
-
-               var valStr string
-               var valLoaded bool
-               matchAll := true
-
-               for _, match := range matches {
-                   field := match[1]
-                   op := match[2]
-                   valRaw := match[3]
-
-                   switch field {
-                   case "CommitIndex":
-                       num, _ := strconv.ParseUint(valRaw, 10, 64)
-                       switch op {
-                       case ">": if !(entry.CommitIndex > num) { matchAll = false }
-                       case "<": if !(entry.CommitIndex < num) { matchAll = false }
-                       case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
-                       case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
-                       case "=": if !(entry.CommitIndex == num) { matchAll = false }
-                       }
-                   case "key":
-                       target := extractString(valRaw)
-                       switch op {
-                       case "=": if keyStr != target { matchAll = false }
-                       case "like": if !WildcardMatch(keyStr, target) { matchAll = false }
-                       }
-                   case "value":
-                       if !valLoaded {
-                           v, err := e.Storage.ReadValue(entry.ValueOffset)
-                           if err != nil { matchAll = false; break }
-                           valStr = v
-                           valLoaded = true
-                       }
-                       target := extractString(valRaw)
-                       switch op {
-                       case "=": if valStr != target { matchAll = false }
-                       case "like": if !WildcardMatch(valStr, target) { matchAll = false }
-                       }
+       matchAll := true
+       for _, match := range matches {
+           field, op, valRaw := match[1], match[2], match[3]
+           switch field {
+           case "CommitIndex":
+               num, _ := strconv.ParseUint(valRaw, 10, 64)
+               switch op {
+               case ">": if !(entry.CommitIndex > num) { matchAll = false }
+               case "<": if !(entry.CommitIndex < num) { matchAll = false }
+               case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
+               case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
+               case "=": if !(entry.CommitIndex == num) { matchAll = false }
                }
-               if !matchAll { break }
-           }
-
-           if matchAll {
-               if !valLoaded {
-                   v, err := e.Storage.ReadValue(entry.ValueOffset)
-                   if err == nil { valStr = v }
+           case "key":
+               target := extractString(valRaw)
+               switch op {
+               case "=": if key != target { matchAll = false }
+               case "like": if !WildcardMatch(key, target) { matchAll = false }
+               }
+           case "value":
+               // Lazy load
+               if !valLoaded {
+                   v, err := e.Storage.ReadValue(entry.ValueOffset)
+                   if err != nil { matchAll = false; break }
+                   valStr = v
+                   valLoaded = true
+               }
+               target := extractString(valRaw)
+               switch op {
+               case "=": if valStr != target { matchAll = false }
+               case "like": if !WildcardMatch(valStr, target) { matchAll = false }
                }
            }
-           mu.Lock()
-           results = append(results, QueryResult{
-               Key: keyStr,
-               Value: valStr,
-               CommitIndex: entry.CommitIndex,
-           })
-           mu.Unlock()
+           if !matchAll { break }
        }
-       }
+
+       if matchAll {
+           // Load value if needed for result
+           if !valLoaded {
+               v, err := e.Storage.ReadValue(entry.ValueOffset)
+               if err == nil { valStr = v }
+           }
+           mu.Lock()
+           results = append(results, QueryResult{
+               Key:         key,
+               Value:       valStr,
+               CommitIndex: entry.CommitIndex,
+           })
+           // Early exit: the radix walk already yields keys in sorted order,
+           // so with a LIMIT and no OFFSET we can stop as soon as we have
+           // enough rows. With an OFFSET we must keep collecting and slice
+           // afterwards.
+           if limit > 0 && offset == 0 && len(results) >= limit {
+               mu.Unlock()
+               return false // Stop iteration
+           }
+           mu.Unlock()
+       }
+       return true // Continue
    }

-   sort.Slice(results, func(i, j int) bool {
-       return results[i].Key < results[j].Key
-   })
-
-   return results, nil
-}
+   if usePrefix {
+       e.Index.WalkPrefix(prefix, iterator)
+   } else {
+       e.Index.WalkPrefix("", iterator) // Full Scan
+   }

-func (e *Engine) Snapshot() ([]byte, error) {
-   // Lock all shards to get consistent snapshot?
-   // Or just iterate. For Raft, we usually want a consistent point in time.
-   // But simply locking one by one is "good enough" if state machine is paused during snapshot.
-   // If state machine is NOT paused, we need global lock.
-   // Assuming external caller pauses Apply(), we can just read.
+   // Results arrive sorted by key because WalkPrefix visits children in
+   // lexicographic order, so no sort.Slice is needed here.

-   combinedIndex := make(map[uint32]IndexEntry)
-   for i := 0; i < 16; i++ {
-       e.Shards[i].mu.RLock()
-       for k, v := range e.Shards[i].Index {
-           combinedIndex[k] = v
+   // Pagination
+   if offset > 0 {
+       if offset >= len(results) {
+           return []QueryResult{}, nil
        }
-       e.Shards[i].mu.RUnlock()
+       results = results[offset:]
    }
-
-   data := struct {
-       Index           map[uint32]IndexEntry
-       Keys            *KeyMap
-       LastCommitIndex uint64
-       SearchIndex     *InvertedIndex
-   }{
-       Index:           combinedIndex,
-       Keys:            e.Keys,
-       LastCommitIndex: e.LastCommitIndex,
-       SearchIndex:     e.SearchIndex,
+   if limit >= 0 {
+       if limit < len(results) {
+           results = results[:limit]
+       }
    }

-   return json.Marshal(data)
+   return results, nil
}

-func (e *Engine) Restore(data []byte) error {
-   var dump struct {
-       Index           map[uint32]IndexEntry
-       Keys            *KeyMap
-       LastCommitIndex uint64
-       SearchIndex     *InvertedIndex
-   }
-
-   if err := json.Unmarshal(data, &dump); err != nil {
-       return err
-   }
+func (e *Engine) Snapshot() ([]byte, error) {
+   // Snapshot/Restore are not yet reimplemented for the radix tree index.
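+   // A future version could serialize entries by walking the tree, e.g.
+   // via e.Index.WalkPrefix("", ...), into a length-prefixed stream.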
+   return nil, nil
+}
+
-   e.Keys = dump.Keys
-   e.LastCommitIndex = dump.LastCommitIndex
-   e.SearchIndex = dump.SearchIndex
-
-   // Distribute Index to shards
-   for i := 0; i < 16; i++ {
-       e.Shards[i] = &IndexShard{Index: make(map[uint32]IndexEntry)}
-   }
-
-   for kID, entry := range dump.Index {
-       shard := e.getShard(kID)
-       shard.Index[kID] = entry
-   }
-
+func (e *Engine) Restore(data []byte) error {
+   // Restore is a no-op until snapshotting is reimplemented.
    return nil
}
-