| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038 |
- package db
- import (
- "bufio"
- "encoding/binary"
- "encoding/json"
- "fmt"
- "io"
- "os"
- "regexp"
- "sort"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- )
// hash returns a 32-bit FNV-1 style hash of s: starting from the FNV offset
// basis, every byte is folded in by multiplying the running sum by the FNV
// prime and then XOR-ing the byte (multiply-then-XOR, i.e. FNV-1 rather than
// FNV-1a).
func hash(s string) uint32 {
	const (
		offsetBasis uint32 = 2166136261
		prime       uint32 = 16777619
	)
	sum := offsetBasis
	for _, b := range []byte(s) {
		sum *= prime
		sum ^= uint32(b)
	}
	return sum
}
// FreeList is an in-memory registry of reusable on-disk slots. Slots are
// grouped by capacity so that Pop can hand out the smallest slot that is
// large enough (best fit).
type FreeList struct {
	// buckets maps a slot capacity to a stack of file offsets that have
	// exactly that capacity.
	buckets map[uint32][]int64
	// sortedCaps lists, in ascending order, every capacity that currently
	// has a bucket; Pop binary-searches it.
	sortedCaps []uint32
	// mu serializes Add and Pop.
	mu sync.Mutex
}
- func NewFreeList() *FreeList {
- return &FreeList{
- buckets: make(map[uint32][]int64),
- }
- }
- // Add adds an offset to the free list for a given capacity.
- func (fl *FreeList) Add(cap uint32, offset int64) {
- fl.mu.Lock()
- defer fl.mu.Unlock()
-
- if _, exists := fl.buckets[cap]; !exists {
- fl.buckets[cap] = []int64{offset}
- // Maintain sortedCaps
- fl.sortedCaps = append(fl.sortedCaps, cap)
- sort.Slice(fl.sortedCaps, func(i, j int) bool { return fl.sortedCaps[i] < fl.sortedCaps[j] })
- } else {
- fl.buckets[cap] = append(fl.buckets[cap], offset)
- }
- }
- // Pop tries to get an available offset using Best-Fit strategy.
- func (fl *FreeList) Pop(targetCap uint32) (int64, uint32, bool) {
- fl.mu.Lock()
- defer fl.mu.Unlock()
-
- idx := sort.Search(len(fl.sortedCaps), func(i int) bool {
- return fl.sortedCaps[i] >= targetCap
- })
-
- if idx >= len(fl.sortedCaps) {
- return 0, 0, false
- }
-
- foundCap := fl.sortedCaps[idx]
- offsets := fl.buckets[foundCap]
-
- if len(offsets) == 0 {
- return 0, 0, false
- }
-
- lastIdx := len(offsets) - 1
- offset := offsets[lastIdx]
-
- if lastIdx == 0 {
- delete(fl.buckets, foundCap)
- fl.sortedCaps = append(fl.sortedCaps[:idx], fl.sortedCaps[idx+1:]...)
- } else {
- fl.buckets[foundCap] = offsets[:lastIdx]
- }
-
- return offset, foundCap, true
- }
// StripedLock is a fixed array of 1024 read/write mutexes. A key is mapped to
// one of them by hash, so unrelated keys usually lock different stripes.
type StripedLock struct {
	locks [1024]sync.RWMutex
}
- func (sl *StripedLock) GetLock(key string) *sync.RWMutex {
- h := hash(key)
- return &sl.locks[h%1024]
- }
- // Storage (Table 2) manages disk storage for values.
- type Storage struct {
- file *os.File
- filename string
- offset int64 // Current end of file (Atomic)
- freeList *FreeList
- }
// On-disk record layout: a HeaderSize-byte header followed by the slot's
// capacity bytes of payload.
const (
	// FlagDeleted is the header flag byte of a slot that has been freed.
	FlagDeleted = 0x00
	// FlagValid is the header flag byte of a slot holding a live value.
	FlagValid = 0x01
	// HeaderSize is the header length in bytes: flag (1) + capacity (4) +
	// length (4).
	HeaderSize = 1 + 4 + 4
	// AlignSize is the granularity, in bytes, that slot capacities are
	// rounded up to.
	AlignSize = 16
)
- func NewStorage(filename string) (*Storage, error) {
- f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
- if err != nil {
- return nil, err
- }
- st := &Storage{
- file: f,
- filename: filename,
- freeList: NewFreeList(),
- }
- if err := st.scan(); err != nil {
- f.Close()
- return nil, err
- }
- return st, nil
- }
- func (s *Storage) scan() error {
- offset := int64(0)
-
- if _, err := s.file.Seek(0, 0); err != nil {
- return err
- }
-
- reader := bufio.NewReader(s.file)
-
- for {
- header := make([]byte, HeaderSize)
- n, err := io.ReadFull(reader, header)
- if err == io.EOF {
- break
- }
- if err == io.ErrUnexpectedEOF && n == 0 {
- break
- }
- if err != nil {
- return err
- }
-
- flag := header[0]
- cap := binary.LittleEndian.Uint32(header[1:])
-
- if flag == FlagDeleted {
- s.freeList.Add(cap, offset)
- }
-
- discardLen := int(cap)
- if _, err := reader.Discard(discardLen); err != nil {
- buf := make([]byte, discardLen)
- if _, err := io.ReadFull(reader, buf); err != nil {
- return err
- }
- }
-
- offset += int64(HeaderSize + discardLen)
- }
-
- s.offset = offset
- return nil
- }
- func alignCapacity(length int) uint32 {
- if length == 0 {
- return AlignSize
- }
- cap := (length + AlignSize - 1) / AlignSize * AlignSize
- return uint32(cap)
- }
- // TryUpdateInPlace tries to update value in-place if capacity allows.
- // Returns true if successful.
- // This must be called under Key Lock.
- func (s *Storage) TryUpdateInPlace(val string, offset int64) bool {
- if offset < 0 {
- return false
- }
-
- valLen := len(val)
- capBuf := make([]byte, 4)
- if _, err := s.file.ReadAt(capBuf, offset+1); err != nil {
- return false
- }
-
- oldCap := binary.LittleEndian.Uint32(capBuf)
- if uint32(valLen) > oldCap {
- return false
- }
-
- // Fits! Write Length then Data
- lenBuf := make([]byte, 4)
- binary.LittleEndian.PutUint32(lenBuf, uint32(valLen))
-
- if _, err := s.file.WriteAt(lenBuf, offset+5); err != nil {
- return false
- }
- if _, err := s.file.WriteAt([]byte(val), offset+HeaderSize); err != nil {
- return false
- }
- return true
- }
- // AppendOrReuse writes a new value, reusing FreeList or Appending.
- // Returns new offset.
- func (s *Storage) AppendOrReuse(val string) (int64, error) {
- valLen := len(val)
- newCap := alignCapacity(valLen)
-
- var writeOffset int64
- var actualCap uint32
-
- // Try Reuse
- reusedOffset, capFromFree, found := s.freeList.Pop(newCap)
- if found {
- writeOffset = reusedOffset
- actualCap = capFromFree
- } else {
- // Append
- totalSize := HeaderSize + int(newCap)
- newEnd := atomic.AddInt64(&s.offset, int64(totalSize))
- writeOffset = newEnd - int64(totalSize)
- actualCap = newCap
- }
- buf := make([]byte, HeaderSize+int(actualCap))
- buf[0] = FlagValid
- binary.LittleEndian.PutUint32(buf[1:], actualCap)
- binary.LittleEndian.PutUint32(buf[5:], uint32(valLen))
- copy(buf[HeaderSize:], []byte(val))
-
- if _, err := s.file.WriteAt(buf, writeOffset); err != nil {
- return 0, err
- }
- return writeOffset, nil
- }
- // MarkDeleted marks a slot as deleted and adds to free list.
- func (s *Storage) MarkDeleted(offset int64) error {
- capBuf := make([]byte, 4)
- if _, err := s.file.ReadAt(capBuf, offset+1); err != nil {
- return err
- }
- oldCap := binary.LittleEndian.Uint32(capBuf)
- if _, err := s.file.WriteAt([]byte{FlagDeleted}, offset); err != nil {
- return err
- }
- s.freeList.Add(oldCap, offset)
- return nil
- }
- // WriteValue is deprecated in favor of granular methods but kept for compatibility if needed.
- // We remove it to force usage of new safe flow.
- // ReadValue reads value at offset.
- func (s *Storage) ReadValue(offset int64) (string, error) {
- header := make([]byte, HeaderSize)
- if _, err := s.file.ReadAt(header, offset); err != nil {
- return "", err
- }
- flag := header[0]
- if flag == FlagDeleted {
- return "", fmt.Errorf("record deleted")
- }
- length := binary.LittleEndian.Uint32(header[5:])
- data := make([]byte, length)
- if _, err := s.file.ReadAt(data, offset+HeaderSize); err != nil {
- return "", err
- }
- return string(data), nil
- }
- func (s *Storage) Close() error {
- return s.file.Close()
- }
// IndexEntry (Table 1) is what the primary index stores per key ID.
type IndexEntry struct {
	// ValueOffset is the file offset of the record holding the key's value.
	ValueOffset int64
	// CommitIndex is the commit index passed to the Set call that last wrote
	// the key.
	CommitIndex uint64
}
// InvertedIndexShard holds the posting lists for the tokens that hash to one
// shard of the inverted index.
type InvertedIndexShard struct {
	// KeyTokens maps a key token to the IDs of the keys containing it.
	KeyTokens map[string][]uint32
	// ValueTokens maps a value token to the IDs of the keys whose value
	// contained it.
	ValueTokens map[string][]uint32
	// mu guards both maps.
	mu sync.RWMutex
}
- // InvertedIndex (Table 3) - Thread Safe with Sharding
- type InvertedIndex struct {
- Shards [16]*InvertedIndexShard
- }
- func NewInvertedIndex() *InvertedIndex {
- ii := &InvertedIndex{}
- for i := 0; i < 16; i++ {
- ii.Shards[i] = &InvertedIndexShard{
- KeyTokens: make(map[string][]uint32),
- ValueTokens: make(map[string][]uint32),
- }
- }
- return ii
- }
- func (ii *InvertedIndex) AddToken(token string, keyID uint32, isKey bool) {
- h := hash(token)
- shard := ii.Shards[h%16]
- shard.mu.Lock()
- defer shard.mu.Unlock()
-
- var targetMap map[string][]uint32
- if isKey {
- targetMap = shard.KeyTokens
- } else {
- targetMap = shard.ValueTokens
- }
-
- ids := targetMap[token]
- for _, id := range ids {
- if id == keyID {
- return
- }
- }
- targetMap[token] = append(ids, keyID)
- }
// KeyMapShard is one shard of the key <-> ID mapping.
type KeyMapShard struct {
	// StrToID maps a key string to its numeric ID.
	StrToID map[string]uint32
	// IDToStr is the reverse mapping of StrToID.
	IDToStr map[uint32]string
	// NextID is the ID the shard will assign to the next new key.
	NextID uint32
	// mu guards all fields above.
	mu sync.RWMutex
}
- // KeyMap maintains mapping using sharding to reduce lock contention.
- type KeyMap struct {
- Shards [16]*KeyMapShard
- }
- func NewKeyMap() *KeyMap {
- km := &KeyMap{}
- for i := 0; i < 16; i++ {
- km.Shards[i] = &KeyMapShard{
- StrToID: make(map[string]uint32),
- IDToStr: make(map[uint32]string),
- NextID: uint32(i + 1),
- }
- }
- return km
- }
- func (km *KeyMap) GetOrCreateID(key string) uint32 {
- h := hash(key)
- shard := km.Shards[h%16]
- shard.mu.Lock()
- defer shard.mu.Unlock()
-
- if id, ok := shard.StrToID[key]; ok {
- return id
- }
- id := shard.NextID
- shard.NextID += 16
- shard.StrToID[key] = id
- shard.IDToStr[id] = key
- return id
- }
- func (km *KeyMap) GetID(key string) (uint32, bool) {
- h := hash(key)
- shard := km.Shards[h%16]
-
- shard.mu.RLock()
- defer shard.mu.RUnlock()
- id, ok := shard.StrToID[key]
- return id, ok
- }
- func (km *KeyMap) GetStr(id uint32) (string, bool) {
- if id == 0 {
- return "", false
- }
- shardIdx := (id - 1) % 16
- shard := km.Shards[shardIdx]
-
- shard.mu.RLock()
- defer shard.mu.RUnlock()
- s, ok := shard.IDToStr[id]
- return s, ok
- }
- // IndexShard manages a subset of keys
- type IndexShard struct {
- Index map[uint32]IndexEntry
- mu sync.RWMutex
- }
- // Engine is the core storage engine with Sharded Locking.
- type Engine struct {
- // Table 1: Sharded Index
- Shards [16]*IndexShard
- // Key mapping
- Keys *KeyMap
- // Table 2: Disk Storage
- Storage *Storage
- // Table 3: Inverted Index
- SearchIndex *InvertedIndex
-
- KeyLocks StripedLock
- LastCommitIndex uint64
- commitMu sync.Mutex // Protects LastCommitIndex
- dataDir string
- }
- func NewEngine(dataDir string) (*Engine, error) {
- if err := os.MkdirAll(dataDir, 0755); err != nil {
- return nil, err
- }
- store, err := NewStorage(dataDir + "/values.data")
- if err != nil {
- return nil, err
- }
- e := &Engine{
- Keys: NewKeyMap(),
- Storage: store,
- SearchIndex: NewInvertedIndex(),
- dataDir: dataDir,
- }
-
- // Initialize shards
- for i := 0; i < 16; i++ {
- e.Shards[i] = &IndexShard{
- Index: make(map[uint32]IndexEntry),
- }
- }
- return e, nil
- }
- func (e *Engine) Close() error {
- return e.Storage.Close()
- }
- func (e *Engine) tokenizeKey(key string) []string {
- return strings.Split(key, ".")
- }
- func (e *Engine) tokenizeValue(val string) []string {
- f := func(c rune) bool {
- return !((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9'))
- }
- return strings.FieldsFunc(val, f)
- }
- func (e *Engine) getShard(keyID uint32) *IndexShard {
- // Simple mod hashing for sharding
- return e.Shards[keyID%16]
- }
- func (e *Engine) Set(key, value string, commitIndex uint64) error {
- // 0. Lock Key (Protects against concurrent access to SAME key)
- // This ensures we don't have race conditions on "In-Place vs Move" or "Read vs Write" for this specific key.
- // Different keys are still processed in parallel.
- kLock := e.KeyLocks.GetLock(key)
- kLock.Lock()
- defer kLock.Unlock()
- // 1. Get KeyID (Thread Safe)
- keyID := e.Keys.GetOrCreateID(key)
- shard := e.getShard(keyID)
- // 2. Check existing entry (Lock Shard)
- var oldOffset int64 = -1
- shard.mu.RLock()
- if entry, ok := shard.Index[keyID]; ok {
- oldOffset = entry.ValueOffset
- }
- shard.mu.RUnlock()
- // 3. Try In-Place Update
- if e.Storage.TryUpdateInPlace(value, oldOffset) {
- // Update CommitIndex if needed (even for in-place)
- shard.mu.Lock()
- // Re-read to ensure no weird state, though kLock protects us mostly.
- // We just update the commit index here.
- entry := shard.Index[keyID]
- entry.CommitIndex = commitIndex
- shard.Index[keyID] = entry
- shard.mu.Unlock()
-
- e.commitMu.Lock()
- if commitIndex > e.LastCommitIndex {
- e.LastCommitIndex = commitIndex
- }
- e.commitMu.Unlock()
- return nil
- }
- // 4. Write New Value (Append/Reuse)
- // This happens if In-Place failed or it's a new key.
- newOffset, err := e.Storage.AppendOrReuse(value)
- if err != nil {
- return err
- }
- // 5. Update Index (Lock Shard)
- // CRITICAL: We update Index to point to NEW data BEFORE deleting OLD data.
- shard.mu.Lock()
- shard.Index[keyID] = IndexEntry{
- ValueOffset: newOffset,
- CommitIndex: commitIndex,
- }
- shard.mu.Unlock()
-
- // 6. Delete Old Data (if any)
- // Now that Index points to New, we can safely mark Old as deleted.
- if oldOffset >= 0 {
- e.Storage.MarkDeleted(oldOffset)
- }
- // Update global commit index
- e.commitMu.Lock()
- if commitIndex > e.LastCommitIndex {
- e.LastCommitIndex = commitIndex
- }
- e.commitMu.Unlock()
- // 7. Update Inverted Index (Thread Safe)
- for _, token := range e.tokenizeKey(key) {
- e.SearchIndex.AddToken(token, keyID, true)
- }
- for _, token := range e.tokenizeValue(value) {
- e.SearchIndex.AddToken(token, keyID, false)
- }
- return nil
- }
- func (e *Engine) Get(key string) (string, bool) {
- // Lock Key to prevent reading while it's being moved/updated
- kLock := e.KeyLocks.GetLock(key)
- kLock.RLock()
- defer kLock.RUnlock()
- keyID, ok := e.Keys.GetID(key)
- if !ok {
- return "", false
- }
-
- shard := e.getShard(keyID)
- shard.mu.RLock()
- entry, ok := shard.Index[keyID]
- shard.mu.RUnlock()
-
- if !ok {
- return "", false
- }
- val, err := e.Storage.ReadValue(entry.ValueOffset)
- if err != nil {
- return "", false
- }
- return val, true
- }
// QueryResult is one record returned by Engine.Query, with the JSON field
// names "key", "value" and "commit_index".
type QueryResult struct {
	// Key is the record's key string.
	Key string `json:"key"`
	// Value is the record's stored value.
	Value string `json:"value"`
	// CommitIndex is the commit index recorded for the key.
	CommitIndex uint64 `json:"commit_index"`
}
// WildcardMatch reports whether str matches pattern in full. In pattern, '*'
// matches any sequence of runes (including none) and '?' matches exactly one
// rune; every other rune matches itself. Matching is rune based and case
// sensitive.
//
// A '*' in pattern is always treated as a wildcard, even when str has a
// literal '*' at the same position. (Checking for a literal match first made
// e.g. WildcardMatch("*a", "*") fail.)
func WildcardMatch(str, pattern string) bool {
	s := []rune(str)
	p := []rune(pattern)
	sLen, pLen := len(s), len(p)

	i, j := 0, 0
	// starIdx is the position of the most recent '*' in p; matchIdx is the
	// position in s that this star is currently assumed to extend to.
	starIdx, matchIdx := -1, -1

	for i < sLen {
		switch {
		case j < pLen && p[j] == '*':
			// Remember the star and first try matching it against nothing.
			starIdx = j
			matchIdx = i
			j++
		case j < pLen && (p[j] == '?' || p[j] == s[i]):
			i++
			j++
		case starIdx != -1:
			// Mismatch: let the last star swallow one more rune and retry.
			j = starIdx + 1
			matchIdx++
			i = matchIdx
		default:
			return false
		}
	}

	// Only trailing stars may remain unmatched.
	for j < pLen && p[j] == '*' {
		j++
	}
	return j == pLen
}
- func (e *Engine) scanTokens(indexType string, pattern string) []uint32 {
- // pattern is usually like "*word*" or "word"
- // We extract simple tokens from pattern.
- // If pattern contains * or ?, we must iterate ALL tokens in index (slow but maybe faster than all docs).
- // If pattern is simple word, we do direct lookup.
-
- cleanPattern := strings.ReplaceAll(pattern, "*", "")
- cleanPattern = strings.ReplaceAll(cleanPattern, "?", "")
-
- // If the pattern, after removing wildcards, still contains separators,
- // it means it spans multiple tokens. Single token lookup won't work easily.
- // e.g. "hello.world" -> tokens "hello", "world".
- // But simple check: if cleanPattern has no special chars, we try exact lookup first?
- // Actually, inverted index keys are EXACT tokens.
-
- var candidates []uint32
- candidatesMap := make(map[uint32]bool)
-
- // Scan all 16 shards
- for i := 0; i < 16; i++ {
- shard := e.SearchIndex.Shards[i]
- shard.mu.RLock()
-
- var targetMap map[string][]uint32
- if indexType == "key" {
- targetMap = shard.KeyTokens
- } else {
- targetMap = shard.ValueTokens
- }
-
- for token, ids := range targetMap {
- if WildcardMatch(token, pattern) {
- for _, id := range ids {
- candidatesMap[id] = true
- }
- }
- }
- shard.mu.RUnlock()
- }
-
- for id := range candidatesMap {
- candidates = append(candidates, id)
- }
- return candidates
- }
- func (e *Engine) Query(sql string) ([]QueryResult, error) {
- sql = strings.TrimSpace(sql)
- re := regexp.MustCompile(`(key|value|CommitIndex)\s*(=|like|>=|<=|>|<)\s*("[^"]*"|\d+)`)
- matches := re.FindAllStringSubmatch(sql, -1)
- if len(matches) == 0 {
- return nil, fmt.Errorf("invalid query")
- }
- extractString := func(s string) string {
- return strings.Trim(strings.TrimSpace(s), "\"")
- }
- // Optimization 1: Key Point Lookup
- // Check if there is a 'key = "..."' condition
- for _, match := range matches {
- if match[1] == "key" && match[2] == "=" {
- targetKey := extractString(match[3])
- // Fast path!
- if val, ok := e.Get(targetKey); ok {
- // We have the record, but we must check other conditions too
- // Construct a dummy entry to reuse filter logic or just check manual
- // Since we have the value, let's just check.
-
- // Need CommitIndex.
- keyID, _ := e.Keys.GetID(targetKey)
- shard := e.getShard(keyID)
- shard.mu.RLock()
- entry := shard.Index[keyID]
- shard.mu.RUnlock()
-
- matchAll := true
- for _, m := range matches {
- f, op, vRaw := m[1], m[2], m[3]
- switch f {
- case "CommitIndex":
- num, _ := strconv.ParseUint(vRaw, 10, 64)
- switch op {
- case ">": if !(entry.CommitIndex > num) { matchAll = false }
- case "<": if !(entry.CommitIndex < num) { matchAll = false }
- case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
- case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
- case "=": if !(entry.CommitIndex == num) { matchAll = false }
- }
- case "value":
- t := extractString(vRaw)
- switch op {
- case "=": if val != t { matchAll = false }
- case "like": if !WildcardMatch(val, t) { matchAll = false }
- }
- }
- if !matchAll { break }
- }
-
- if matchAll {
- return []QueryResult{{Key: targetKey, Value: val, CommitIndex: entry.CommitIndex}}, nil
- }
- return []QueryResult{}, nil
- }
- return []QueryResult{}, nil
- }
- }
- // Optimization 2: Inverted Index Candidate Generation
- // Check for 'like' queries on key/value that are simple enough
- var candidateIDs []uint32
- useCandidates := false
-
- for _, match := range matches {
- if (match[1] == "key" || match[1] == "value") && match[2] == "like" {
- pattern := extractString(match[3])
- // Basic heuristic: pattern should have at least some non-wildcard chars to be useful
- clean := strings.ReplaceAll(strings.ReplaceAll(pattern, "*", ""), "?", "")
- if len(clean) > 2 { // Only optimize if pattern is specific enough
- ids := e.scanTokens(match[1], pattern)
-
- if !useCandidates {
- candidateIDs = ids
- useCandidates = true
- } else {
- // Intersection
- // Convert current candidates to map for fast check
- valid := make(map[uint32]bool)
- for _, id := range ids {
- valid[id] = true
- }
- var newCandidates []uint32
- for _, existing := range candidateIDs {
- if valid[existing] {
- newCandidates = append(newCandidates, existing)
- }
- }
- candidateIDs = newCandidates
- }
- }
- }
- }
- var results []QueryResult
- var mu sync.Mutex // Protect results append
- // Scan Function
- scanLogic := func(ids []uint32) {
- // Group IDs by shard to minimize lock thrashing if we are careful,
- // but simple parallel loop is easier.
- // If we have specific IDs, we don't need to scan all shards.
-
- var wg sync.WaitGroup
- // Use limited workers for ID list to avoid spawning too many goroutines if list is huge
- // Or just simple loop if list is small.
-
- processID := func(kID uint32) {
- // Get Key String
- keyStr, ok := e.Keys.GetStr(kID)
- if !ok { return }
-
- // Get Entry
- shard := e.getShard(kID)
- shard.mu.RLock()
- entry, ok := shard.Index[kID]
- shard.mu.RUnlock()
- if !ok { return }
-
- var valStr string
- var valLoaded bool
-
- matchAll := true
- for _, match := range matches {
- field := match[1]
- op := match[2]
- valRaw := match[3]
- switch field {
- case "CommitIndex":
- num, _ := strconv.ParseUint(valRaw, 10, 64)
- switch op {
- case ">": if !(entry.CommitIndex > num) { matchAll = false }
- case "<": if !(entry.CommitIndex < num) { matchAll = false }
- case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
- case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
- case "=": if !(entry.CommitIndex == num) { matchAll = false }
- }
- case "key":
- target := extractString(valRaw)
- switch op {
- case "=": if keyStr != target { matchAll = false }
- case "like": if !WildcardMatch(keyStr, target) { matchAll = false }
- }
- case "value":
- if !valLoaded {
- v, err := e.Storage.ReadValue(entry.ValueOffset)
- if err != nil { matchAll = false; break }
- valStr = v
- valLoaded = true
- }
- target := extractString(valRaw)
- switch op {
- case "=": if valStr != target { matchAll = false }
- case "like": if !WildcardMatch(valStr, target) { matchAll = false }
- }
- }
- if !matchAll { break }
- }
-
- if matchAll {
- if !valLoaded {
- v, err := e.Storage.ReadValue(entry.ValueOffset)
- if err == nil { valStr = v }
- }
- mu.Lock()
- results = append(results, QueryResult{
- Key: keyStr,
- Value: valStr,
- CommitIndex: entry.CommitIndex,
- })
- mu.Unlock()
- }
- }
- if len(ids) < 1000 {
- for _, id := range ids {
- processID(id)
- }
- } else {
- // Parallelize
- chunks := 16
- chunkSize := (len(ids) + chunks - 1) / chunks
- for i := 0; i < chunks; i++ {
- start := i * chunkSize
- end := start + chunkSize
- if start >= len(ids) { break }
- if end > len(ids) { end = len(ids) }
-
- wg.Add(1)
- go func(sub []uint32) {
- defer wg.Done()
- for _, id := range sub {
- processID(id)
- }
- }(ids[start:end])
- }
- wg.Wait()
- }
- }
- if useCandidates {
- // Use Index optimization
- scanLogic(candidateIDs)
- } else {
- // Full Scan (Parallel by Shard)
- var wg sync.WaitGroup
- for i := 0; i < 16; i++ {
- wg.Add(1)
- go func(shardIdx int) {
- defer wg.Done()
- shard := e.Shards[shardIdx]
- shard.mu.RLock()
- // Snapshot IDs to minimize lock time
- ids := make([]uint32, 0, len(shard.Index))
- for k := range shard.Index {
- ids = append(ids, k)
- }
- shard.mu.RUnlock()
-
- // Re-use scanLogic logic but simplified for single goroutine per shard
- // Actually we can just copy-paste the inner logic or call a helper.
- // To avoid huge refactor, let's just iterate.
- for _, kID := range ids {
- // ... (Same logic as processID above) ...
- // Copying processID logic here for now to avoid scope issues or closure overhead
- keyStr, ok := e.Keys.GetStr(kID)
- if !ok { continue }
-
- shard.mu.RLock()
- entry, ok := shard.Index[kID]
- shard.mu.RUnlock()
- if !ok { continue }
-
- var valStr string
- var valLoaded bool
- matchAll := true
-
- for _, match := range matches {
- field := match[1]
- op := match[2]
- valRaw := match[3]
- switch field {
- case "CommitIndex":
- num, _ := strconv.ParseUint(valRaw, 10, 64)
- switch op {
- case ">": if !(entry.CommitIndex > num) { matchAll = false }
- case "<": if !(entry.CommitIndex < num) { matchAll = false }
- case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
- case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
- case "=": if !(entry.CommitIndex == num) { matchAll = false }
- }
- case "key":
- target := extractString(valRaw)
- switch op {
- case "=": if keyStr != target { matchAll = false }
- case "like": if !WildcardMatch(keyStr, target) { matchAll = false }
- }
- case "value":
- if !valLoaded {
- v, err := e.Storage.ReadValue(entry.ValueOffset)
- if err != nil { matchAll = false; break }
- valStr = v
- valLoaded = true
- }
- target := extractString(valRaw)
- switch op {
- case "=": if valStr != target { matchAll = false }
- case "like": if !WildcardMatch(valStr, target) { matchAll = false }
- }
- }
- if !matchAll { break }
- }
-
- if matchAll {
- if !valLoaded {
- v, err := e.Storage.ReadValue(entry.ValueOffset)
- if err == nil { valStr = v }
- }
- mu.Lock()
- results = append(results, QueryResult{
- Key: keyStr,
- Value: valStr,
- CommitIndex: entry.CommitIndex,
- })
- mu.Unlock()
- }
- }
- }(i)
- }
- wg.Wait()
- }
- sort.Slice(results, func(i, j int) bool {
- return results[i].Key < results[j].Key
- })
- return results, nil
- }
- func (e *Engine) Snapshot() ([]byte, error) {
- // Lock all shards to get consistent snapshot?
- // Or just iterate. For Raft, we usually want a consistent point in time.
- // But simply locking one by one is "good enough" if state machine is paused during snapshot.
- // If state machine is NOT paused, we need global lock.
- // Assuming external caller pauses Apply(), we can just read.
-
- combinedIndex := make(map[uint32]IndexEntry)
- for i := 0; i < 16; i++ {
- e.Shards[i].mu.RLock()
- for k, v := range e.Shards[i].Index {
- combinedIndex[k] = v
- }
- e.Shards[i].mu.RUnlock()
- }
- data := struct {
- Index map[uint32]IndexEntry
- Keys *KeyMap
- LastCommitIndex uint64
- SearchIndex *InvertedIndex
- }{
- Index: combinedIndex,
- Keys: e.Keys,
- LastCommitIndex: e.LastCommitIndex,
- SearchIndex: e.SearchIndex,
- }
- return json.Marshal(data)
- }
- func (e *Engine) Restore(data []byte) error {
- var dump struct {
- Index map[uint32]IndexEntry
- Keys *KeyMap
- LastCommitIndex uint64
- SearchIndex *InvertedIndex
- }
- if err := json.Unmarshal(data, &dump); err != nil {
- return err
- }
- e.Keys = dump.Keys
- e.LastCommitIndex = dump.LastCommitIndex
- e.SearchIndex = dump.SearchIndex
-
- // Distribute Index to shards
- for i := 0; i < 16; i++ {
- e.Shards[i] = &IndexShard{Index: make(map[uint32]IndexEntry)}
- }
-
- for kID, entry := range dump.Index {
- shard := e.getShard(kID)
- shard.Index[kID] = entry
- }
-
- return nil
- }
|