package db

import (
	"bufio"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"regexp"
	"sort"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
)

// hash returns a 32-bit FNV-1 hash of the string
func hash(s string) uint32 {
	var h uint32 = 2166136261
	for i := 0; i < len(s); i++ {
		h = (h * 16777619) ^ uint32(s[i])
	}
	return h
}

// FreeList manages reusable disk space in memory using a Best-Fit strategy.
type FreeList struct {
	// Capacity -> Stack of Offsets
	buckets map[uint32][]int64
	// Sorted list of capacities available in buckets for fast Best-Fit lookup
	sortedCaps []uint32
	mu         sync.Mutex
}

func NewFreeList() *FreeList {
	return &FreeList{
		buckets: make(map[uint32][]int64),
	}
}

// Add adds an offset to the free list for a given capacity.
func (fl *FreeList) Add(cap uint32, offset int64) {
	fl.mu.Lock()
	defer fl.mu.Unlock()
	if _, exists := fl.buckets[cap]; !exists {
		fl.buckets[cap] = []int64{offset}
		// Maintain sortedCaps
		fl.sortedCaps = append(fl.sortedCaps, cap)
		sort.Slice(fl.sortedCaps, func(i, j int) bool {
			return fl.sortedCaps[i] < fl.sortedCaps[j]
		})
	} else {
		fl.buckets[cap] = append(fl.buckets[cap], offset)
	}
}

// Pop tries to get an available offset using the Best-Fit strategy.
func (fl *FreeList) Pop(targetCap uint32) (int64, uint32, bool) {
	fl.mu.Lock()
	defer fl.mu.Unlock()
	idx := sort.Search(len(fl.sortedCaps), func(i int) bool {
		return fl.sortedCaps[i] >= targetCap
	})
	if idx >= len(fl.sortedCaps) {
		return 0, 0, false
	}
	foundCap := fl.sortedCaps[idx]
	offsets := fl.buckets[foundCap]
	if len(offsets) == 0 {
		return 0, 0, false
	}
	lastIdx := len(offsets) - 1
	offset := offsets[lastIdx]
	if lastIdx == 0 {
		delete(fl.buckets, foundCap)
		fl.sortedCaps = append(fl.sortedCaps[:idx], fl.sortedCaps[idx+1:]...)
	} else {
		fl.buckets[foundCap] = offsets[:lastIdx]
	}
	return offset, foundCap, true
}

// StripedLock provides hashed locks for keys
type StripedLock struct {
	locks [1024]sync.RWMutex
}

func (sl *StripedLock) GetLock(key string) *sync.RWMutex {
	h := hash(key)
	return &sl.locks[h%1024]
}

// Storage (Table 2) manages disk storage for values.
type Storage struct {
	file     *os.File
	filename string
	offset   int64 // Current end of file (Atomic)
	freeList *FreeList
}

const (
	FlagDeleted = 0x00
	FlagValid   = 0x01
	HeaderSize  = 9  // 1 (Flag) + 4 (Cap) + 4 (Len)
	AlignSize   = 16 // Align capacity to 16 bytes
)

func NewStorage(filename string) (*Storage, error) {
	f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
	if err != nil {
		return nil, err
	}
	st := &Storage{
		file:     f,
		filename: filename,
		freeList: NewFreeList(),
	}
	if err := st.scan(); err != nil {
		f.Close()
		return nil, err
	}
	return st, nil
}

func (s *Storage) scan() error {
	offset := int64(0)
	if _, err := s.file.Seek(0, 0); err != nil {
		return err
	}
	reader := bufio.NewReader(s.file)
	for {
		header := make([]byte, HeaderSize)
		n, err := io.ReadFull(reader, header)
		if err == io.EOF {
			break
		}
		if err == io.ErrUnexpectedEOF && n == 0 {
			break
		}
		if err != nil {
			return err
		}
		flag := header[0]
		cap := binary.LittleEndian.Uint32(header[1:])
		if flag == FlagDeleted {
			s.freeList.Add(cap, offset)
		}
		discardLen := int(cap)
		if _, err := reader.Discard(discardLen); err != nil {
			buf := make([]byte, discardLen)
			if _, err := io.ReadFull(reader, buf); err != nil {
				return err
			}
		}
		offset += int64(HeaderSize + discardLen)
	}
	s.offset = offset
	return nil
}

func alignCapacity(length int) uint32 {
	if length == 0 {
		return AlignSize
	}
	cap := (length + AlignSize - 1) / AlignSize * AlignSize
	return uint32(cap)
}
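// Illustrative sketch (not part of the engine's API): how a value slot is laid
// out on disk and how alignCapacity rounds lengths up. The layout is taken
// from HeaderSize and the write/read paths below; the value string here is an
// assumption for illustration:
//
//	[1 byte flag][4 bytes capacity][4 bytes length][capacity bytes of data]
func exampleRecordLayout() {
	// A 20-byte value is stored in a 32-byte slot (rounded up to AlignSize).
	val := "this value is 20b..."
	capacity := alignCapacity(len(val)) // 32

	buf := make([]byte, HeaderSize+int(capacity))
	buf[0] = FlagValid
	binary.LittleEndian.PutUint32(buf[1:], capacity)         // capacity field
	binary.LittleEndian.PutUint32(buf[5:], uint32(len(val))) // length field
	copy(buf[HeaderSize:], val)

	fmt.Printf("slot size=%d cap=%d len=%d\n", len(buf), capacity, len(val))
	// Expected: slot size=41 cap=32 len=20
}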
// TryUpdateInPlace tries to update the value in-place if the slot capacity allows.
// Returns true if successful.
// This must be called under the Key Lock.
func (s *Storage) TryUpdateInPlace(val string, offset int64) bool {
	if offset < 0 {
		return false
	}
	valLen := len(val)
	capBuf := make([]byte, 4)
	if _, err := s.file.ReadAt(capBuf, offset+1); err != nil {
		return false
	}
	oldCap := binary.LittleEndian.Uint32(capBuf)
	if uint32(valLen) > oldCap {
		return false
	}
	// Fits! Write Length then Data
	lenBuf := make([]byte, 4)
	binary.LittleEndian.PutUint32(lenBuf, uint32(valLen))
	if _, err := s.file.WriteAt(lenBuf, offset+5); err != nil {
		return false
	}
	if _, err := s.file.WriteAt([]byte(val), offset+HeaderSize); err != nil {
		return false
	}
	return true
}

// AppendOrReuse writes a new value, reusing a FreeList slot or appending.
// Returns the new offset.
func (s *Storage) AppendOrReuse(val string) (int64, error) {
	valLen := len(val)
	newCap := alignCapacity(valLen)

	var writeOffset int64
	var actualCap uint32

	// Try Reuse
	reusedOffset, capFromFree, found := s.freeList.Pop(newCap)
	if found {
		writeOffset = reusedOffset
		actualCap = capFromFree
	} else {
		// Append
		totalSize := HeaderSize + int(newCap)
		newEnd := atomic.AddInt64(&s.offset, int64(totalSize))
		writeOffset = newEnd - int64(totalSize)
		actualCap = newCap
	}

	buf := make([]byte, HeaderSize+int(actualCap))
	buf[0] = FlagValid
	binary.LittleEndian.PutUint32(buf[1:], actualCap)
	binary.LittleEndian.PutUint32(buf[5:], uint32(valLen))
	copy(buf[HeaderSize:], []byte(val))
	if _, err := s.file.WriteAt(buf, writeOffset); err != nil {
		return 0, err
	}
	return writeOffset, nil
}

// MarkDeleted marks a slot as deleted and adds it to the free list.
func (s *Storage) MarkDeleted(offset int64) error {
	capBuf := make([]byte, 4)
	if _, err := s.file.ReadAt(capBuf, offset+1); err != nil {
		return err
	}
	oldCap := binary.LittleEndian.Uint32(capBuf)
	if _, err := s.file.WriteAt([]byte{FlagDeleted}, offset); err != nil {
		return err
	}
	s.freeList.Add(oldCap, offset)
	return nil
}

// WriteValue has been removed in favor of the granular methods above
// (TryUpdateInPlace, AppendOrReuse, MarkDeleted), which enforce the safe
// update flow.
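// Illustrative sketch of the Storage slot lifecycle (the temp file path is an
// assumption for illustration): append a value, mark it deleted, and observe
// that the next append of a same-capacity value reuses the freed slot via the
// free list.
func exampleStorageReuse() error {
	st, err := NewStorage(os.TempDir() + "/values_example.data")
	if err != nil {
		return err
	}
	defer st.Close()

	off1, err := st.AppendOrReuse("hello world") // 11 bytes -> 16-byte capacity slot
	if err != nil {
		return err
	}
	if err := st.MarkDeleted(off1); err != nil {
		return err
	}
	off2, err := st.AppendOrReuse("hello again") // same capacity -> reuses the freed slot
	if err != nil {
		return err
	}
	fmt.Println("reused:", off1 == off2) // expected: reused: true
	v, err := st.ReadValue(off2)
	fmt.Println(v, err) // expected: hello again <nil>
	return nil
}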
// ReadValue reads the value at offset.
func (s *Storage) ReadValue(offset int64) (string, error) {
	header := make([]byte, HeaderSize)
	if _, err := s.file.ReadAt(header, offset); err != nil {
		return "", err
	}
	flag := header[0]
	if flag == FlagDeleted {
		return "", fmt.Errorf("record deleted")
	}
	length := binary.LittleEndian.Uint32(header[5:])
	data := make([]byte, length)
	if _, err := s.file.ReadAt(data, offset+HeaderSize); err != nil {
		return "", err
	}
	return string(data), nil
}

func (s *Storage) Close() error {
	return s.file.Close()
}

// IndexEntry (Table 1)
type IndexEntry struct {
	ValueOffset int64
	CommitIndex uint64
}

// InvertedIndexShard manages tokens for a subset of strings
type InvertedIndexShard struct {
	KeyTokens   map[string][]uint32
	ValueTokens map[string][]uint32
	mu          sync.RWMutex
}

// InvertedIndex (Table 3) - Thread Safe with Sharding
type InvertedIndex struct {
	Shards [16]*InvertedIndexShard
}

func NewInvertedIndex() *InvertedIndex {
	ii := &InvertedIndex{}
	for i := 0; i < 16; i++ {
		ii.Shards[i] = &InvertedIndexShard{
			KeyTokens:   make(map[string][]uint32),
			ValueTokens: make(map[string][]uint32),
		}
	}
	return ii
}

func (ii *InvertedIndex) AddToken(token string, keyID uint32, isKey bool) {
	h := hash(token)
	shard := ii.Shards[h%16]
	shard.mu.Lock()
	defer shard.mu.Unlock()

	var targetMap map[string][]uint32
	if isKey {
		targetMap = shard.KeyTokens
	} else {
		targetMap = shard.ValueTokens
	}

	ids := targetMap[token]
	for _, id := range ids {
		if id == keyID {
			return
		}
	}
	targetMap[token] = append(ids, keyID)
}

// KeyMapShard manages a subset of keys.
type KeyMapShard struct {
	StrToID map[string]uint32
	IDToStr map[uint32]string
	NextID  uint32
	mu      sync.RWMutex
}

// KeyMap maintains the mapping using sharding to reduce lock contention.
type KeyMap struct {
	Shards [16]*KeyMapShard
}

func NewKeyMap() *KeyMap {
	km := &KeyMap{}
	for i := 0; i < 16; i++ {
		km.Shards[i] = &KeyMapShard{
			StrToID: make(map[string]uint32),
			IDToStr: make(map[uint32]string),
			NextID:  uint32(i + 1),
		}
	}
	return km
}

func (km *KeyMap) GetOrCreateID(key string) uint32 {
	h := hash(key)
	shard := km.Shards[h%16]
	shard.mu.Lock()
	defer shard.mu.Unlock()
	if id, ok := shard.StrToID[key]; ok {
		return id
	}
	id := shard.NextID
	shard.NextID += 16
	shard.StrToID[key] = id
	shard.IDToStr[id] = key
	return id
}

func (km *KeyMap) GetID(key string) (uint32, bool) {
	h := hash(key)
	shard := km.Shards[h%16]
	shard.mu.RLock()
	defer shard.mu.RUnlock()
	id, ok := shard.StrToID[key]
	return id, ok
}

func (km *KeyMap) GetStr(id uint32) (string, bool) {
	if id == 0 {
		return "", false
	}
	shardIdx := (id - 1) % 16
	shard := km.Shards[shardIdx]
	shard.mu.RLock()
	defer shard.mu.RUnlock()
	s, ok := shard.IDToStr[id]
	return s, ok
}
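// Illustrative sketch of the KeyMap ID scheme above: shard i hands out IDs
// i+1, i+17, i+33, ..., so GetStr can recover the owning shard from (id-1)%16
// without re-hashing the key string. The key used here is an assumption for
// illustration only.
func exampleKeyMapIDStriding() {
	km := NewKeyMap()
	id := km.GetOrCreateID("user.profile.name") // stored in shard hash(key)%16
	shardIdx := (id - 1) % 16                   // same shard index GetStr derives
	fmt.Printf("id=%d owned by shard %d\n", id, shardIdx)
}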
// IndexShard manages a subset of keys
type IndexShard struct {
	Index map[uint32]IndexEntry
	mu    sync.RWMutex
}

// Engine is the core storage engine with Sharded Locking.
type Engine struct {
	// Table 1: Sharded Index
	Shards [16]*IndexShard
	// Key mapping
	Keys *KeyMap
	// Table 2: Disk Storage
	Storage *Storage
	// Table 3: Inverted Index
	SearchIndex *InvertedIndex

	KeyLocks StripedLock

	LastCommitIndex uint64
	commitMu        sync.Mutex // Protects LastCommitIndex

	dataDir string
}

func NewEngine(dataDir string) (*Engine, error) {
	if err := os.MkdirAll(dataDir, 0755); err != nil {
		return nil, err
	}
	store, err := NewStorage(dataDir + "/values.data")
	if err != nil {
		return nil, err
	}
	e := &Engine{
		Keys:        NewKeyMap(),
		Storage:     store,
		SearchIndex: NewInvertedIndex(),
		dataDir:     dataDir,
	}
	// Initialize shards
	for i := 0; i < 16; i++ {
		e.Shards[i] = &IndexShard{
			Index: make(map[uint32]IndexEntry),
		}
	}
	return e, nil
}

func (e *Engine) Close() error {
	return e.Storage.Close()
}

func (e *Engine) tokenizeKey(key string) []string {
	return strings.Split(key, ".")
}

func (e *Engine) tokenizeValue(val string) []string {
	f := func(c rune) bool {
		return !((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9'))
	}
	return strings.FieldsFunc(val, f)
}

func (e *Engine) getShard(keyID uint32) *IndexShard {
	// Simple mod hashing for sharding
	return e.Shards[keyID%16]
}
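// Illustrative sketch of the tokenization rules above (the key and value
// strings are assumptions for illustration): keys split on '.', values split
// on any non-alphanumeric rune.
func exampleTokenization(e *Engine) {
	fmt.Println(e.tokenizeKey("user.42.email"))         // [user 42 email]
	fmt.Println(e.tokenizeValue("bob@example.com, #1")) // [bob example com 1]
}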
func (e *Engine) Set(key, value string, commitIndex uint64) error {
	// 0. Lock Key (protects against concurrent access to the SAME key).
	// This ensures we don't have race conditions on "In-Place vs Move" or
	// "Read vs Write" for this specific key. Different keys are still
	// processed in parallel.
	kLock := e.KeyLocks.GetLock(key)
	kLock.Lock()
	defer kLock.Unlock()

	// 1. Get KeyID (Thread Safe)
	keyID := e.Keys.GetOrCreateID(key)
	shard := e.getShard(keyID)

	// 2. Check existing entry (Lock Shard)
	var oldOffset int64 = -1
	shard.mu.RLock()
	if entry, ok := shard.Index[keyID]; ok {
		oldOffset = entry.ValueOffset
	}
	shard.mu.RUnlock()

	// 3. Try In-Place Update
	if e.Storage.TryUpdateInPlace(value, oldOffset) {
		// Even for an in-place write we still record the new CommitIndex.
		// The Key Lock already protects this key, so re-reading the entry
		// under the shard lock is only needed to update it consistently.
		shard.mu.Lock()
		entry := shard.Index[keyID]
		entry.CommitIndex = commitIndex
		shard.Index[keyID] = entry
		shard.mu.Unlock()

		e.commitMu.Lock()
		if commitIndex > e.LastCommitIndex {
			e.LastCommitIndex = commitIndex
		}
		e.commitMu.Unlock()
		return nil
	}

	// 4. Write New Value (Append/Reuse)
	// This happens if the in-place update failed or it's a new key.
	newOffset, err := e.Storage.AppendOrReuse(value)
	if err != nil {
		return err
	}

	// 5. Update Index (Lock Shard)
	// CRITICAL: we point the Index at the NEW data BEFORE deleting the OLD data.
	shard.mu.Lock()
	shard.Index[keyID] = IndexEntry{
		ValueOffset: newOffset,
		CommitIndex: commitIndex,
	}
	shard.mu.Unlock()

	// 6. Delete Old Data (if any)
	// Now that the Index points to the new slot, we can safely mark the old one as deleted.
	if oldOffset >= 0 {
		e.Storage.MarkDeleted(oldOffset)
	}

	// Update global commit index
	e.commitMu.Lock()
	if commitIndex > e.LastCommitIndex {
		e.LastCommitIndex = commitIndex
	}
	e.commitMu.Unlock()

	// 7. Update Inverted Index (Thread Safe)
	for _, token := range e.tokenizeKey(key) {
		e.SearchIndex.AddToken(token, keyID, true)
	}
	for _, token := range e.tokenizeValue(value) {
		e.SearchIndex.AddToken(token, keyID, false)
	}
	return nil
}

func (e *Engine) Get(key string) (string, bool) {
	// Lock Key to prevent reading while it's being moved/updated
	kLock := e.KeyLocks.GetLock(key)
	kLock.RLock()
	defer kLock.RUnlock()

	keyID, ok := e.Keys.GetID(key)
	if !ok {
		return "", false
	}
	shard := e.getShard(keyID)
	shard.mu.RLock()
	entry, ok := shard.Index[keyID]
	shard.mu.RUnlock()
	if !ok {
		return "", false
	}
	val, err := e.Storage.ReadValue(entry.ValueOffset)
	if err != nil {
		return "", false
	}
	return val, true
}

type QueryResult struct {
	Key         string `json:"key"`
	Value       string `json:"value"`
	CommitIndex uint64 `json:"commit_index"`
}

func WildcardMatch(str, pattern string) bool {
	s := []rune(str)
	p := []rune(pattern)
	sLen, pLen := len(s), len(p)
	i, j := 0, 0
	starIdx, matchIdx := -1, -1
	for i < sLen {
		if j < pLen && (p[j] == '?' || p[j] == s[i]) {
			i++
			j++
		} else if j < pLen && p[j] == '*' {
			starIdx = j
			matchIdx = i
			j++
		} else if starIdx != -1 {
			j = starIdx + 1
			matchIdx++
			i = matchIdx
		} else {
			return false
		}
	}
	for j < pLen && p[j] == '*' {
		j++
	}
	return j == pLen
}
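// Illustrative sketch of WildcardMatch semantics: '*' matches any run of
// characters (including the empty run), '?' matches exactly one character, and
// the pattern must consume the whole string.
func exampleWildcardMatch() {
	fmt.Println(WildcardMatch("user.profile.name", "user.*")) // true
	fmt.Println(WildcardMatch("user.profile.name", "*name"))  // true
	fmt.Println(WildcardMatch("cache", "c?che"))               // true
	fmt.Println(WildcardMatch("cache", "ca"))                  // false (must match fully)
}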
func (e *Engine) scanTokens(indexType string, pattern string) []uint32 {
	// The pattern is usually something like "*word*" or "word". Inverted index
	// keys are EXACT tokens, so a wildcard pattern cannot be answered with a
	// direct map lookup; instead we iterate over every token in the index and
	// collect the IDs of each matching token. This is slow, but usually still
	// faster than scanning all documents.
	//
	// Note: if the pattern, stripped of wildcards, still contains separators
	// (e.g. "hello.world"), it spans multiple tokens and a single-token lookup
	// would not work anyway; the full token scan below handles that case too.
	cleanPattern := strings.ReplaceAll(pattern, "*", "")
	cleanPattern = strings.ReplaceAll(cleanPattern, "?", "")

	var candidates []uint32
	candidatesMap := make(map[uint32]bool)

	// Scan all 16 shards
	for i := 0; i < 16; i++ {
		shard := e.SearchIndex.Shards[i]
		shard.mu.RLock()
		var targetMap map[string][]uint32
		if indexType == "key" {
			targetMap = shard.KeyTokens
		} else {
			targetMap = shard.ValueTokens
		}
		for token, ids := range targetMap {
			if WildcardMatch(token, pattern) {
				for _, id := range ids {
					candidatesMap[id] = true
				}
			}
		}
		shard.mu.RUnlock()
	}

	for id := range candidatesMap {
		candidates = append(candidates, id)
	}
	return candidates
}

func (e *Engine) Query(sql string) ([]QueryResult, error) {
	sql = strings.TrimSpace(sql)
	re := regexp.MustCompile(`(key|value|CommitIndex)\s*(=|like|>=|<=|>|<)\s*("[^"]*"|\d+)`)
	matches := re.FindAllStringSubmatch(sql, -1)
	if len(matches) == 0 {
		return nil, fmt.Errorf("invalid query")
	}

	extractString := func(s string) string {
		return strings.Trim(strings.TrimSpace(s), "\"")
	}

	// Optimization 1: Key Point Lookup.
	// If there is a 'key = "..."' condition, resolve the row directly.
	for _, match := range matches {
		if match[1] == "key" && match[2] == "=" {
			targetKey := extractString(match[3])
			// Fast path!
			if val, ok := e.Get(targetKey); ok {
				// We already have the value; fetch the CommitIndex and check
				// the remaining conditions against this single record.
				keyID, _ := e.Keys.GetID(targetKey)
				shard := e.getShard(keyID)
				shard.mu.RLock()
				entry := shard.Index[keyID]
				shard.mu.RUnlock()

				matchAll := true
				for _, m := range matches {
					f, op, vRaw := m[1], m[2], m[3]
					switch f {
					case "CommitIndex":
						num, _ := strconv.ParseUint(vRaw, 10, 64)
						switch op {
						case ">":
							if !(entry.CommitIndex > num) {
								matchAll = false
							}
						case "<":
							if !(entry.CommitIndex < num) {
								matchAll = false
							}
						case ">=":
							if !(entry.CommitIndex >= num) {
								matchAll = false
							}
						case "<=":
							if !(entry.CommitIndex <= num) {
								matchAll = false
							}
						case "=":
							if !(entry.CommitIndex == num) {
								matchAll = false
							}
						}
					case "value":
						t := extractString(vRaw)
						switch op {
						case "=":
							if val != t {
								matchAll = false
							}
						case "like":
							if !WildcardMatch(val, t) {
								matchAll = false
							}
						}
					}
					if !matchAll {
						break
					}
				}
				if matchAll {
					return []QueryResult{{Key: targetKey, Value: val, CommitIndex: entry.CommitIndex}}, nil
				}
				return []QueryResult{}, nil
			}
			return []QueryResult{}, nil
		}
	}

	// Optimization 2: Inverted Index Candidate Generation.
	// Check for 'like' conditions on key/value that are specific enough to use the inverted index.
	var candidateIDs []uint32
	useCandidates := false
	for _, match := range matches {
		if (match[1] == "key" || match[1] == "value") && match[2] == "like" {
			pattern := extractString(match[3])
			// Heuristic: the pattern should have enough non-wildcard characters to be useful.
			clean := strings.ReplaceAll(strings.ReplaceAll(pattern, "*", ""), "?", "")
			if len(clean) > 2 { // Only optimize if the pattern is specific enough
				ids := e.scanTokens(match[1], pattern)
				if !useCandidates {
					candidateIDs = ids
					useCandidates = true
				} else {
					// Intersect with the existing candidate set.
					valid := make(map[uint32]bool)
					for _, id := range ids {
						valid[id] = true
					}
					var newCandidates []uint32
					for _, existing := range candidateIDs {
						if valid[existing] {
							newCandidates = append(newCandidates, existing)
						}
					}
					candidateIDs = newCandidates
				}
			}
		}
	}

	var results []QueryResult
	var mu sync.Mutex // Protects results append

	// scanLogic evaluates every condition for the given key IDs and appends
	// matching rows to results. With a specific candidate list we do not need
	// to scan all shards. (Grouping IDs by shard would reduce lock thrashing,
	// but a simple parallel loop is easier.)
	scanLogic := func(ids []uint32) {
		var wg sync.WaitGroup
		// Small ID lists are handled with a plain loop below; huge lists are
		// split across a bounded number of worker goroutines to avoid spawning
		// one goroutine per ID.
		processID := func(kID uint32) {
			// Get Key String
			keyStr, ok := e.Keys.GetStr(kID)
			if !ok {
				return
			}
			// Get Entry
			shard := e.getShard(kID)
			shard.mu.RLock()
			entry, ok := shard.Index[kID]
			shard.mu.RUnlock()
			if !ok {
				return
			}

			var valStr string
			var valLoaded bool
			matchAll := true

			for _, match := range matches {
				field := match[1]
				op := match[2]
				valRaw := match[3]

				switch field {
				case "CommitIndex":
					num, _ := strconv.ParseUint(valRaw, 10, 64)
					switch op {
					case ">":
						if !(entry.CommitIndex > num) {
							matchAll = false
						}
					case "<":
						if !(entry.CommitIndex < num) {
							matchAll = false
						}
					case ">=":
						if !(entry.CommitIndex >= num) {
							matchAll = false
						}
					case "<=":
						if !(entry.CommitIndex <= num) {
							matchAll = false
						}
					case "=":
						if !(entry.CommitIndex == num) {
							matchAll = false
						}
					}
				case "key":
					target := extractString(valRaw)
					switch op {
					case "=":
						if keyStr != target {
							matchAll = false
						}
					case "like":
						if !WildcardMatch(keyStr, target) {
							matchAll = false
						}
					}
				case "value":
					if !valLoaded {
						v, err := e.Storage.ReadValue(entry.ValueOffset)
						if err != nil {
							matchAll = false
							break
						}
						valStr = v
						valLoaded = true
					}
					target := extractString(valRaw)
					switch op {
					case "=":
						if valStr != target {
							matchAll = false
						}
					case "like":
						if !WildcardMatch(valStr, target) {
							matchAll = false
						}
					}
				}
				if !matchAll {
					break
				}
			}

			if matchAll {
				if !valLoaded {
					v, err := e.Storage.ReadValue(entry.ValueOffset)
					if err == nil {
						valStr = v
					}
				}
				mu.Lock()
				results = append(results, QueryResult{
					Key:         keyStr,
					Value:       valStr,
					CommitIndex: entry.CommitIndex,
				})
				mu.Unlock()
			}
		}

		if len(ids) < 1000 {
			for _, id := range ids {
				processID(id)
			}
		} else {
			// Parallelize
			chunks := 16
			chunkSize := (len(ids) + chunks - 1) / chunks
			for i := 0; i < chunks; i++ {
				start := i * chunkSize
				end := start + chunkSize
				if start >= len(ids) {
					break
				}
				if end > len(ids) {
					end = len(ids)
				}
				wg.Add(1)
				go func(sub []uint32) {
					defer wg.Done()
					for _, id := range sub {
						processID(id)
					}
				}(ids[start:end])
			}
			wg.Wait()
		}
	}

	if useCandidates {
		// Use the inverted-index candidates.
		scanLogic(candidateIDs)
	} else {
		// Full Scan (Parallel by Shard)
		var wg sync.WaitGroup
		for i := 0; i < 16; i++ {
			wg.Add(1)
			go func(shardIdx int) {
				defer wg.Done()
				shard := e.Shards[shardIdx]
				shard.mu.RLock()
				// Snapshot IDs to minimize lock time
				ids := make([]uint32, 0, len(shard.Index))
				for k := range shard.Index {
					ids = append(ids, k)
				}
				shard.mu.RUnlock()

				// The filtering below mirrors processID above. It is duplicated
				// rather than shared so each per-shard goroutine stays
				// self-contained and avoids closure overhead.
				for _, kID := range ids {
					keyStr, ok := e.Keys.GetStr(kID)
					if !ok {
						continue
					}
					shard.mu.RLock()
					entry, ok := shard.Index[kID]
					shard.mu.RUnlock()
					if !ok {
						continue
					}

					var valStr string
					var valLoaded bool
					matchAll := true

					for _, match := range matches {
						field := match[1]
						op := match[2]
						valRaw := match[3]

						switch field {
						case "CommitIndex":
							num, _ := strconv.ParseUint(valRaw, 10, 64)
							switch op {
							case ">":
								if !(entry.CommitIndex > num) {
									matchAll = false
								}
							case "<":
								if !(entry.CommitIndex < num) {
									matchAll = false
								}
							case ">=":
								if !(entry.CommitIndex >= num) {
									matchAll = false
								}
							case "<=":
								if !(entry.CommitIndex <= num) {
									matchAll = false
								}
							case "=":
								if !(entry.CommitIndex == num) {
									matchAll = false
								}
							}
						case "key":
							target := extractString(valRaw)
							switch op {
							case "=":
								if keyStr != target {
									matchAll = false
								}
							case "like":
								if !WildcardMatch(keyStr, target) {
									matchAll = false
								}
							}
						case "value":
							if !valLoaded {
								v, err := e.Storage.ReadValue(entry.ValueOffset)
								if err != nil {
									matchAll = false
									break
								}
								valStr = v
								valLoaded = true
							}
							target := extractString(valRaw)
							switch op {
							case "=":
								if valStr != target {
									matchAll = false
								}
							case "like":
								if !WildcardMatch(valStr, target) {
									matchAll = false
								}
							}
						}
						if !matchAll {
							break
						}
					}

					if matchAll {
						if !valLoaded {
							v, err := e.Storage.ReadValue(entry.ValueOffset)
							if err == nil {
								valStr = v
							}
						}
						mu.Lock()
						results = append(results, QueryResult{
							Key:         keyStr,
							Value:       valStr,
							CommitIndex: entry.CommitIndex,
						})
						mu.Unlock()
					}
				}
			}(i)
		}
		wg.Wait()
	}

	sort.Slice(results, func(i, j int) bool {
		return results[i].Key < results[j].Key
	})
	return results, nil
}
func (e *Engine) Snapshot() ([]byte, error) {
	// For Raft we want a consistent point-in-time snapshot. Locking the shards
	// one by one is "good enough" only if the caller pauses Apply() while the
	// snapshot is taken; otherwise a global lock would be required. We assume
	// the caller pauses applies, so we simply read shard by shard.
	combinedIndex := make(map[uint32]IndexEntry)
	for i := 0; i < 16; i++ {
		e.Shards[i].mu.RLock()
		for k, v := range e.Shards[i].Index {
			combinedIndex[k] = v
		}
		e.Shards[i].mu.RUnlock()
	}

	data := struct {
		Index           map[uint32]IndexEntry
		Keys            *KeyMap
		LastCommitIndex uint64
		SearchIndex     *InvertedIndex
	}{
		Index:           combinedIndex,
		Keys:            e.Keys,
		LastCommitIndex: e.LastCommitIndex,
		SearchIndex:     e.SearchIndex,
	}
	return json.Marshal(data)
}

func (e *Engine) Restore(data []byte) error {
	var dump struct {
		Index           map[uint32]IndexEntry
		Keys            *KeyMap
		LastCommitIndex uint64
		SearchIndex     *InvertedIndex
	}
	if err := json.Unmarshal(data, &dump); err != nil {
		return err
	}
	e.Keys = dump.Keys
	e.LastCommitIndex = dump.LastCommitIndex
	e.SearchIndex = dump.SearchIndex

	// Distribute the Index to shards
	for i := 0; i < 16; i++ {
		e.Shards[i] = &IndexShard{Index: make(map[uint32]IndexEntry)}
	}
	for kID, entry := range dump.Index {
		shard := e.getShard(kID)
		shard.Index[kID] = entry
	}
	return nil
}
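// Illustrative sketch of a snapshot/restore round trip. Note that the snapshot
// only captures the in-memory tables (index, key map, inverted index, commit
// index); the restored engine must be opened over the same values.data file,
// since the index entries store offsets into it. Reopening the same data
// directory here is an assumption for illustration.
func exampleSnapshotRestore(e *Engine) error {
	snap, err := e.Snapshot()
	if err != nil {
		return err
	}
	// NewEngine re-scans values.data and rebuilds the free list; Restore then
	// rebuilds the in-memory index from the snapshot.
	e2, err := NewEngine(e.dataDir)
	if err != nil {
		return err
	}
	defer e2.Close()
	return e2.Restore(snap)
}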