package db import ( "bufio" "encoding/binary" "fmt" "hash/crc32" "io" "os" "regexp" "sort" "strconv" "strings" "sync" "sync/atomic" ) // hash returns a 32-bit FNV-1a hash of the string func hash(s string) uint32 { var h uint32 = 2166136261 for i := 0; i < len(s); i++ { h = (h * 16777619) ^ uint32(s[i]) } return h } // --- Flat Sorted Array Index Implementation --- // Extreme memory optimization: // 1. Removes all tree node overhead (pointers, structs, map buckets). // 2. Stores keys in a single monolithic []byte buffer (no string headers). // 3. Uses a sorted slice of offsets for O(log N) lookup and O(N) insertion. // IndexEntry type IndexEntry struct { ValueOffset int64 CommitIndex uint64 } type FlatIndex struct { // Monolithic buffer to store all key strings // Format: [key1 bytes][key2 bytes]... keyBuf []byte // Sorted list of index items items []flatItem mu sync.RWMutex } // flatItem is a compact struct (16 bytes aligned) type flatItem struct { keyOffset uint32 // 4 bytes: Offset into keyBuf (supports 4GB total key size) keyLen uint16 // 2 bytes: Length of key (max key len 65535) // Padding 2 bytes? No, Go struct alignment might add padding. // IndexEntry is 16 bytes (int64 + uint64). entry IndexEntry // 16 bytes } // Total per item: 4 + 2 + (2 padding) + 16 = 24 bytes per Key. // 100k keys => 2.4 MB. // Plus keyBuf: 100k * 18 bytes = 1.8 MB. // Total expected: ~4.2 MB. func NewFlatIndex() *FlatIndex { return &FlatIndex{ keyBuf: make([]byte, 0, 1024*1024), // 1MB initial cap items: make([]flatItem, 0, 10000), } } // getKey unsafe-ish helper to get string from buffer without alloc? // Standard conversion string(b) allocates. // For comparison in binary search, we ideally want to avoid allocation. // But Go 1.20+ optimization might handle string(bytes) well in map keys or comparisons if they don't escape. // Let's rely on standard string() conversion for safety first. func (fi *FlatIndex) getKey(idx int) string { item := fi.items[idx] return string(fi.keyBuf[item.keyOffset : item.keyOffset+uint32(item.keyLen)]) } // Compare helper to avoid string allocation func (fi *FlatIndex) compare(idx int, target string) int { item := fi.items[idx] keyBytes := fi.keyBuf[item.keyOffset : item.keyOffset+uint32(item.keyLen)] return strings.Compare(string(keyBytes), target) } func (fi *FlatIndex) Insert(key string, entry IndexEntry) { fi.mu.Lock() defer fi.mu.Unlock() // 1. Binary Search idx := sort.Search(len(fi.items), func(i int) bool { return fi.getKey(i) >= key }) // 2. Update existing if idx < len(fi.items) && fi.getKey(idx) == key { fi.items[idx].entry = entry return } // 3. Add new key offset := len(fi.keyBuf) fi.keyBuf = append(fi.keyBuf, key...) if len(key) > 65535 { // Should have been caught earlier, but just in case cap it or panic // We can panic or truncate. // panic("key too long") } newItem := flatItem{ keyOffset: uint32(offset), keyLen: uint16(len(key)), entry: entry, } // 4. Insert into sorted slice // Optimization: check if appending to end (common case for sequential inserts) if idx == len(fi.items) { fi.items = append(fi.items, newItem) } else { fi.items = append(fi.items, flatItem{}) copy(fi.items[idx+1:], fi.items[idx:]) fi.items[idx] = newItem } } func (fi *FlatIndex) Get(key string) (IndexEntry, bool) { fi.mu.RLock() defer fi.mu.RUnlock() idx := sort.Search(len(fi.items), func(i int) bool { return fi.getKey(i) >= key }) if idx < len(fi.items) && fi.getKey(idx) == key { return fi.items[idx].entry, true } return IndexEntry{}, false } func (fi *FlatIndex) Delete(key string) bool { fi.mu.Lock() defer fi.mu.Unlock() idx := sort.Search(len(fi.items), func(i int) bool { return fi.getKey(i) >= key }) if idx < len(fi.items) && fi.getKey(idx) == key { // Delete from slice fi.items = append(fi.items[:idx], fi.items[idx+1:]...) // Note: We don't reclaim space in keyBuf. It's append-only until compaction (not implemented). // This is fine for many cases, but heavy deletes might waste keyBuf space. return true } return false } // WalkPrefix iterates over keys starting with prefix. // Since keys are sorted, we find the first match and iterate until mismatch. func (fi *FlatIndex) WalkPrefix(prefix string, callback func(key string, entry IndexEntry) bool) { fi.mu.RLock() defer fi.mu.RUnlock() // Binary search for the start idx := sort.Search(len(fi.items), func(i int) bool { return fi.getKey(i) >= prefix }) for i := idx; i < len(fi.items); i++ { key := fi.getKey(i) // Optimization: Check prefix match if !strings.HasPrefix(key, prefix) { break } if !callback(key, fi.items[i].entry) { return } } } // --- End Flat Index --- // --- Full Text Index --- type FullTextIndex struct { // Token -> Set of Keys index map[string]map[string]bool mu sync.RWMutex } func NewFullTextIndex() *FullTextIndex { return &FullTextIndex{ index: make(map[string]map[string]bool), } } func (fti *FullTextIndex) Add(key string, value string) { fti.mu.Lock() defer fti.mu.Unlock() tokens := tokenize(value) for _, token := range tokens { if fti.index[token] == nil { fti.index[token] = make(map[string]bool) } fti.index[token][key] = true } } func (fti *FullTextIndex) Remove(key string, value string) { fti.mu.Lock() defer fti.mu.Unlock() tokens := tokenize(value) for _, token := range tokens { if set, ok := fti.index[token]; ok { delete(set, key) if len(set) == 0 { delete(fti.index, token) } } } } func (fti *FullTextIndex) Search(tokenPattern string) []string { fti.mu.RLock() defer fti.mu.RUnlock() // 1. Exact Match if !strings.Contains(tokenPattern, "*") { if keys, ok := fti.index[tokenPattern]; ok { res := make([]string, 0, len(keys)) for k := range keys { res = append(res, k) } return res } return nil } // 2. Wildcard Scan var results []string seen := make(map[string]bool) for token, keys := range fti.index { if WildcardMatch(token, tokenPattern) { for k := range keys { if !seen[k] { results = append(results, k) seen[k] = true } } } } return results } func tokenize(val string) []string { f := func(c rune) bool { return !((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9')) } return strings.FieldsFunc(val, f) } // --- Storage & Cache --- // StripedLock provides hashed locks for keys type StripedLock struct { locks [1024]sync.RWMutex } func (sl *StripedLock) GetLock(key string) *sync.RWMutex { h := hash(key) return &sl.locks[h%1024] } // Metadata handles the persistent state of the engine type Metadata struct { LastCommitIndex uint64 } // Storage manages disk storage using Append-Only Log. type Storage struct { file *os.File filename string offset int64 // Current end of file cache map[int64]string cacheMu sync.RWMutex } const ( RecordTypePut = 0x01 RecordTypeDelete = 0x02 // CRC(4) + Type(1) + KeyLen(2) + ValLen(4) + CommitIndex(8) HeaderSize = 4 + 1 + 2 + 4 + 8 MaxCacheSize = 10000 ) func NewStorage(filename string) (*Storage, error) { f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644) if err != nil { return nil, err } st := &Storage{ file: f, filename: filename, cache: make(map[int64]string), } return st, nil } // Record represents a log entry type Record struct { Type byte Key string Value string Offset int64 CommitIndex uint64 } // Scan iterates over the log file, validating CRCs and returning records. // It returns the valid offset after the last record. func (s *Storage) Scan(callback func(Record)) (int64, error) { offset := int64(0) if _, err := s.file.Seek(0, 0); err != nil { return 0, err } reader := bufio.NewReader(s.file) for { header := make([]byte, HeaderSize) n, err := io.ReadFull(reader, header) if err == io.EOF { break } if err == io.ErrUnexpectedEOF && n == 0 { break } if err != nil { fmt.Printf("Storage Scan Error at %d: %v\n", offset, err) break } storedCRC := binary.LittleEndian.Uint32(header[0:4]) recType := header[4] keyLen := binary.LittleEndian.Uint16(header[5:7]) valLen := binary.LittleEndian.Uint32(header[7:11]) commitIndex := binary.LittleEndian.Uint64(header[11:19]) totalLen := int(keyLen) + int(valLen) data := make([]byte, totalLen) if _, err := io.ReadFull(reader, data); err != nil { fmt.Printf("Storage Scan Data Error at %d: %v\n", offset, err) break } // Verify CRC crc := crc32.ChecksumIEEE(header[4:]) // Checksum of Type+Lengths+CommitIndex crc = crc32.Update(crc, crc32.IEEETable, data) // + Data if crc != storedCRC { fmt.Printf("CRC Mismatch at %d\n", offset) break } key := string(data[:keyLen]) val := string(data[keyLen:]) callback(Record{ Type: recType, Key: key, Value: val, Offset: offset, CommitIndex: commitIndex, }) offset += int64(HeaderSize + totalLen) } // Update storage offset s.offset = offset s.file.Seek(offset, 0) return offset, nil } // Append writes a new record func (s *Storage) Append(key, val string, recType byte, commitIndex uint64) (int64, error) { keyBytes := []byte(key) valBytes := []byte(val) keyLen := len(keyBytes) valLen := len(valBytes) if keyLen > 65535 { return 0, fmt.Errorf("key too large") } buf := make([]byte, HeaderSize+keyLen+valLen) // 1. Build Header Data (skip CRC) buf[4] = recType binary.LittleEndian.PutUint16(buf[5:7], uint16(keyLen)) binary.LittleEndian.PutUint32(buf[7:11], uint32(valLen)) binary.LittleEndian.PutUint64(buf[11:19], commitIndex) // 2. Copy Data copy(buf[HeaderSize:], keyBytes) copy(buf[HeaderSize+keyLen:], valBytes) // 3. Calc CRC crc := crc32.ChecksumIEEE(buf[4:]) // Everything after CRC field binary.LittleEndian.PutUint32(buf[0:4], crc) // 4. Write totalSize := int64(len(buf)) writeOffset := atomic.AddInt64(&s.offset, totalSize) - totalSize if _, err := s.file.WriteAt(buf, writeOffset); err != nil { return 0, err } // Add to Cache if Put if recType == RecordTypePut { s.cacheMu.Lock() if len(s.cache) < MaxCacheSize { s.cache[writeOffset] = val } s.cacheMu.Unlock() } return writeOffset, nil } // Sync flushes writes to stable storage func (s *Storage) Sync() error { return s.file.Sync() } // ReadValue reads value at offset func (s *Storage) ReadValue(offset int64) (string, error) { // 1. Check Cache s.cacheMu.RLock() if val, ok := s.cache[offset]; ok { s.cacheMu.RUnlock() return val, nil } s.cacheMu.RUnlock() // 2. Read Header header := make([]byte, HeaderSize) if _, err := s.file.ReadAt(header, offset); err != nil { return "", err } keyLen := binary.LittleEndian.Uint16(header[5:7]) valLen := binary.LittleEndian.Uint32(header[7:11]) totalLen := int(keyLen) + int(valLen) data := make([]byte, totalLen) if _, err := s.file.ReadAt(data, offset+HeaderSize); err != nil { return "", err } // Verify CRC on read for safety storedCRC := binary.LittleEndian.Uint32(header[0:4]) crc := crc32.ChecksumIEEE(header[4:]) crc = crc32.Update(crc, crc32.IEEETable, data) if crc != storedCRC { return "", fmt.Errorf("data corruption detected at offset %d", offset) } val := string(data[keyLen:]) // 3. Fill Cache s.cacheMu.Lock() if len(s.cache) < MaxCacheSize { s.cache[offset] = val } else { for k := range s.cache { delete(s.cache, k) break } s.cache[offset] = val } s.cacheMu.Unlock() return val, nil } func (s *Storage) Close() error { return s.file.Close() } // EngineOption defines a configuration option for the Engine type EngineOption func(*EngineConfig) type EngineConfig struct { EnableValueIndex bool } // WithValueIndex enables or disables the value index (Full Text Index) func WithValueIndex(enable bool) EngineOption { return func(c *EngineConfig) { c.EnableValueIndex = enable } } // Engine is the core storage engine. type Engine struct { Index *FlatIndex // Flattened memory structure Storage *Storage FTIndex *FullTextIndex // Can be nil if disabled KeyLocks StripedLock LastCommitIndex uint64 commitMu sync.Mutex dataDir string // Metadata state metaFile *os.File writeMu sync.Mutex config EngineConfig } func NewEngine(dataDir string, opts ...EngineOption) (*Engine, error) { if err := os.MkdirAll(dataDir, 0755); err != nil { return nil, err } config := EngineConfig{ EnableValueIndex: false, // Default to false (key-only) } for _, opt := range opts { opt(&config) } store, err := NewStorage(dataDir + "/values.data") if err != nil { return nil, err } // Open or create metadata file metaPath := dataDir + "/meta.state" metaFile, err := os.OpenFile(metaPath, os.O_RDWR|os.O_CREATE, 0644) if err != nil { store.Close() return nil, err } e := &Engine{ Index: NewFlatIndex(), Storage: store, FTIndex: nil, dataDir: dataDir, metaFile: metaFile, config: config, } if config.EnableValueIndex { e.FTIndex = NewFullTextIndex() } // Load Metadata stat, err := metaFile.Stat() if err == nil && stat.Size() >= 8 { b := make([]byte, 8) if _, err := metaFile.ReadAt(b, 0); err == nil { e.LastCommitIndex = binary.LittleEndian.Uint64(b) } } // Rebuild Index from Disk // Note: We still scan to rebuild the memory index, but LastCommitIndex is initialized from meta file // We can update LastCommitIndex if the log is ahead of the meta file (e.g. crash before meta update) _, err = store.Scan(func(rec Record) { if rec.Type == RecordTypePut { e.Index.Insert(rec.Key, IndexEntry{ ValueOffset: rec.Offset, CommitIndex: rec.CommitIndex, }) if e.FTIndex != nil { e.FTIndex.Add(rec.Key, rec.Value) } // Update LastCommitIndex if log is ahead if rec.CommitIndex > e.LastCommitIndex { e.LastCommitIndex = rec.CommitIndex } } else if rec.Type == RecordTypeDelete { // Cleanup FTIndex using old value if possible e.removeValueFromFTIndex(rec.Key) e.Index.Delete(rec.Key) if rec.CommitIndex > e.LastCommitIndex { e.LastCommitIndex = rec.CommitIndex } } }) return e, nil } func (e *Engine) removeValueFromFTIndex(key string) { if e.FTIndex == nil { return } if entry, ok := e.Index.Get(key); ok { val, err := e.Storage.ReadValue(entry.ValueOffset) if err == nil { e.FTIndex.Remove(key, val) } } } func (e *Engine) Close() error { e.saveMetadata() e.metaFile.Close() return e.Storage.Close() } func (e *Engine) saveMetadata() { b := make([]byte, 8) binary.LittleEndian.PutUint64(b, e.LastCommitIndex) e.metaFile.WriteAt(b, 0) } func (e *Engine) Set(key, value string, commitIndex uint64) error { e.writeMu.Lock() defer e.writeMu.Unlock() // Idempotency check: if this log entry has already been applied, skip it. // This handles Raft replay on restart. if commitIndex > 0 && commitIndex <= e.LastCommitIndex { return nil } e.removeValueFromFTIndex(key) offset, err := e.Storage.Append(key, value, RecordTypePut, commitIndex) if err != nil { return err } e.Index.Insert(key, IndexEntry{ ValueOffset: offset, CommitIndex: commitIndex, }) if e.FTIndex != nil { e.FTIndex.Add(key, value) } e.commitMu.Lock() if commitIndex > e.LastCommitIndex { e.LastCommitIndex = commitIndex // Update metadata on disk periodically or on every write? // For safety, let's update it. Since it's pwrite at offset 0, it's fast. e.saveMetadata() } e.commitMu.Unlock() return nil } func (e *Engine) Delete(key string, commitIndex uint64) error { e.writeMu.Lock() defer e.writeMu.Unlock() // Idempotency check if commitIndex > 0 && commitIndex <= e.LastCommitIndex { return nil } e.removeValueFromFTIndex(key) _, err := e.Storage.Append(key, "", RecordTypeDelete, commitIndex) if err != nil { return err } e.Index.Delete(key) e.commitMu.Lock() if commitIndex > e.LastCommitIndex { e.LastCommitIndex = commitIndex e.saveMetadata() } e.commitMu.Unlock() return nil } // Sync flushes underlying storage to disk // GetLastAppliedIndex returns the last Raft log index applied to the DB func (e *Engine) GetLastAppliedIndex() uint64 { e.commitMu.Lock() defer e.commitMu.Unlock() return e.LastCommitIndex } func (e *Engine) Sync() error { e.metaFile.Sync() return e.Storage.Sync() } func (e *Engine) Get(key string) (string, bool) { entry, ok := e.Index.Get(key) if !ok { return "", false } val, err := e.Storage.ReadValue(entry.ValueOffset) if err != nil { return "", false } return val, true } type QueryResult struct { Key string `json:"key"` Value string `json:"value"` CommitIndex uint64 `json:"commit_index"` } func WildcardMatch(str, pattern string) bool { s := []rune(str) p := []rune(pattern) sLen, pLen := len(s), len(p) i, j := 0, 0 starIdx, matchIdx := -1, -1 for i < sLen { if j < pLen && (p[j] == '?' || p[j] == s[i]) { i++ j++ } else if j < pLen && p[j] == '*' { starIdx = j matchIdx = i j++ } else if starIdx != -1 { j = starIdx + 1 matchIdx++ i = matchIdx } else { return false } } for j < pLen && p[j] == '*' { j++ } return j == pLen } func (e *Engine) Count(sql string) (int, error) { return e.execute(sql, true) } func (e *Engine) execute(sql string, countOnly bool) (int, error) { sql = strings.TrimSpace(sql) reLimit := regexp.MustCompile(`(?i)\s+LIMIT\s+(\d+)`) reOffset := regexp.MustCompile(`(?i)\s+OFFSET\s+(\d+)`) limit := -1 offset := 0 if match := reLimit.FindStringSubmatch(sql); len(match) > 0 { limit, _ = strconv.Atoi(match[1]) sql = reLimit.ReplaceAllString(sql, "") } if match := reOffset.FindStringSubmatch(sql); len(match) > 0 { offset, _ = strconv.Atoi(match[1]) sql = reOffset.ReplaceAllString(sql, "") } re := regexp.MustCompile(`(key|value|CommitIndex)\s*(=|like|>=|<=|>|<)\s*("[^"]*"|\d+)`) matches := re.FindAllStringSubmatch(sql, -1) if len(matches) == 0 { return 0, fmt.Errorf("invalid query") } extractString := func(s string) string { return strings.Trim(strings.TrimSpace(s), "\"") } // 1. Optimize for Point Lookup for _, match := range matches { if match[1] == "key" && match[2] == "=" { targetKey := extractString(match[3]) entry, ok := e.Index.Get(targetKey) if !ok { return 0, nil } needValue := false for _, m := range matches { if m[1] == "value" { needValue = true; break } } var val string if needValue { v, err := e.Storage.ReadValue(entry.ValueOffset) if err != nil { return 0, nil } val = v } matchAll := true for _, m := range matches { field, op, valRaw := m[1], m[2], m[3] switch field { case "CommitIndex": num, _ := strconv.ParseUint(valRaw, 10, 64) switch op { case ">": if !(entry.CommitIndex > num) { matchAll = false } case "<": if !(entry.CommitIndex < num) { matchAll = false } case ">=": if !(entry.CommitIndex >= num) { matchAll = false } case "<=": if !(entry.CommitIndex <= num) { matchAll = false } case "=": if !(entry.CommitIndex == num) { matchAll = false } } case "value": t := extractString(valRaw) switch op { case "=": if val != t { matchAll = false } case "like": if !WildcardMatch(val, t) { matchAll = false } } } if !matchAll { break } } if matchAll { return 1, nil } return 0, nil } } var candidates map[string]bool var useFTIndex bool = false // 2. Try to use FT Index (if enabled) if e.FTIndex != nil { for _, match := range matches { if match[1] == "value" && match[2] == "like" { pattern := extractString(match[3]) clean := strings.Trim(pattern, "*") if len(clean) > 0 && !strings.Contains(clean, "*") && !strings.Contains(clean, "?") { matches := e.FTIndex.Search(pattern) if matches != nil { currentSet := make(map[string]bool) for _, k := range matches { currentSet[k] = true } if !useFTIndex { candidates = currentSet useFTIndex = true } else { newSet := make(map[string]bool) for k := range candidates { if currentSet[k] { newSet[k] = true } } candidates = newSet } } else { return 0, nil } } } } } else { for _, match := range matches { if match[1] == "value" && match[2] == "like" { // Value search requested but index disabled -> return 0 return 0, nil } } } var iterator func(func(string, IndexEntry) bool) if useFTIndex { iterator = func(cb func(string, IndexEntry) bool) { keys := make([]string, 0, len(candidates)) for k := range candidates { keys = append(keys, k) } sort.Strings(keys) for _, k := range keys { if entry, ok := e.Index.Get(k); ok { if !cb(k, entry) { return } } } } } else { // Use FlatIndex Prefix Search var prefix string = "" var usePrefix bool = false for _, match := range matches { if match[1] == "key" && match[2] == "like" { pattern := extractString(match[3]) // Flat Index supports simple prefix match if strings.HasSuffix(pattern, "*") { clean := pattern[:len(pattern)-1] if !strings.ContainsAny(clean, "*?") { prefix = clean usePrefix = true break } } } } iterator = func(cb func(string, IndexEntry) bool) { if usePrefix { e.Index.WalkPrefix(prefix, cb) } else { e.Index.WalkPrefix("", cb) } } } matchedCount := 0 iterator(func(key string, entry IndexEntry) bool { var valStr string var valLoaded bool matchAll := true for _, match := range matches { field, op, valRaw := match[1], match[2], match[3] switch field { case "CommitIndex": num, _ := strconv.ParseUint(valRaw, 10, 64) switch op { case ">": if !(entry.CommitIndex > num) { matchAll = false } case "<": if !(entry.CommitIndex < num) { matchAll = false } case ">=": if !(entry.CommitIndex >= num) { matchAll = false } case "<=": if !(entry.CommitIndex <= num) { matchAll = false } case "=": if !(entry.CommitIndex == num) { matchAll = false } } case "key": target := extractString(valRaw) switch op { case "=": if key != target { matchAll = false } case "like": if !WildcardMatch(key, target) { matchAll = false } } case "value": if e.FTIndex == nil && op == "like" { matchAll = false break } if !valLoaded { v, err := e.Storage.ReadValue(entry.ValueOffset) if err != nil { matchAll = false; break } valStr = v valLoaded = true } target := extractString(valRaw) switch op { case "=": if valStr != target { matchAll = false } case "like": if !WildcardMatch(valStr, target) { matchAll = false } } } if !matchAll { break } } if matchAll { matchedCount++ if limit > 0 && offset == 0 && matchedCount >= limit { return false } } return true }) if offset > 0 { if offset >= matchedCount { return 0, nil } matchedCount -= offset } if limit >= 0 { if limit < matchedCount { matchedCount = limit } } return matchedCount, nil } func (e *Engine) Query(sql string) ([]QueryResult, error) { return e.queryInternal(sql) } func (e *Engine) queryInternal(sql string) ([]QueryResult, error) { sql = strings.TrimSpace(sql) reLimit := regexp.MustCompile(`(?i)\s+LIMIT\s+(\d+)`) reOffset := regexp.MustCompile(`(?i)\s+OFFSET\s+(\d+)`) limit := -1 offset := 0 if match := reLimit.FindStringSubmatch(sql); len(match) > 0 { limit, _ = strconv.Atoi(match[1]) sql = reLimit.ReplaceAllString(sql, "") } if match := reOffset.FindStringSubmatch(sql); len(match) > 0 { offset, _ = strconv.Atoi(match[1]) sql = reOffset.ReplaceAllString(sql, "") } re := regexp.MustCompile(`(key|value|CommitIndex)\s*(=|like|>=|<=|>|<)\s*("[^"]*"|\d+)`) matches := re.FindAllStringSubmatch(sql, -1) if len(matches) == 0 { return nil, fmt.Errorf("invalid query") } extractString := func(s string) string { return strings.Trim(strings.TrimSpace(s), "\"") } for _, match := range matches { if match[1] == "key" && match[2] == "=" { targetKey := extractString(match[3]) entry, ok := e.Index.Get(targetKey) if !ok { return []QueryResult{}, nil } val, err := e.Storage.ReadValue(entry.ValueOffset) if err != nil { return []QueryResult{}, nil } matchAll := true for _, m := range matches { field, op, valRaw := m[1], m[2], m[3] switch field { case "CommitIndex": num, _ := strconv.ParseUint(valRaw, 10, 64) switch op { case ">": if !(entry.CommitIndex > num) { matchAll = false } case "<": if !(entry.CommitIndex < num) { matchAll = false } case ">=": if !(entry.CommitIndex >= num) { matchAll = false } case "<=": if !(entry.CommitIndex <= num) { matchAll = false } case "=": if !(entry.CommitIndex == num) { matchAll = false } } case "value": t := extractString(valRaw) switch op { case "=": if val != t { matchAll = false } case "like": if !WildcardMatch(val, t) { matchAll = false } } } if !matchAll { break } } if matchAll { return []QueryResult{{Key: targetKey, Value: val, CommitIndex: entry.CommitIndex}}, nil } return []QueryResult{}, nil } } var candidates map[string]bool var useFTIndex bool = false if e.FTIndex != nil { for _, match := range matches { if match[1] == "value" && match[2] == "like" { pattern := extractString(match[3]) clean := strings.Trim(pattern, "*") if len(clean) > 0 && !strings.Contains(clean, "*") && !strings.Contains(clean, "?") { matches := e.FTIndex.Search(pattern) if matches != nil { currentSet := make(map[string]bool) for _, k := range matches { currentSet[k] = true } if !useFTIndex { candidates = currentSet useFTIndex = true } else { newSet := make(map[string]bool) for k := range candidates { if currentSet[k] { newSet[k] = true } } candidates = newSet } } else { return []QueryResult{}, nil } } } } } else { for _, match := range matches { if match[1] == "value" && match[2] == "like" { return []QueryResult{}, nil } } } var iterator func(func(string, IndexEntry) bool) if useFTIndex { iterator = func(cb func(string, IndexEntry) bool) { keys := make([]string, 0, len(candidates)) for k := range candidates { keys = append(keys, k) } sort.Strings(keys) for _, k := range keys { if entry, ok := e.Index.Get(k); ok { if !cb(k, entry) { return } } } } } else { var prefix string = "" var usePrefix bool = false for _, match := range matches { if match[1] == "key" && match[2] == "like" { pattern := extractString(match[3]) if strings.HasSuffix(pattern, "*") { clean := pattern[:len(pattern)-1] if !strings.ContainsAny(clean, "*?") { prefix = clean usePrefix = true break } } } } iterator = func(cb func(string, IndexEntry) bool) { if usePrefix { e.Index.WalkPrefix(prefix, cb) } else { e.Index.WalkPrefix("", cb) } } } var results []QueryResult iterator(func(key string, entry IndexEntry) bool { var valStr string var valLoaded bool matchAll := true for _, match := range matches { field, op, valRaw := match[1], match[2], match[3] switch field { case "CommitIndex": num, _ := strconv.ParseUint(valRaw, 10, 64) switch op { case ">": if !(entry.CommitIndex > num) { matchAll = false } case "<": if !(entry.CommitIndex < num) { matchAll = false } case ">=": if !(entry.CommitIndex >= num) { matchAll = false } case "<=": if !(entry.CommitIndex <= num) { matchAll = false } case "=": if !(entry.CommitIndex == num) { matchAll = false } } case "key": target := extractString(valRaw) switch op { case "=": if key != target { matchAll = false } case "like": if !WildcardMatch(key, target) { matchAll = false } } case "value": if e.FTIndex == nil && op == "like" { matchAll = false break } if !valLoaded { v, err := e.Storage.ReadValue(entry.ValueOffset) if err != nil { matchAll = false; break } valStr = v valLoaded = true } target := extractString(valRaw) switch op { case "=": if valStr != target { matchAll = false } case "like": if !WildcardMatch(valStr, target) { matchAll = false } } } if !matchAll { break } } if matchAll { if !valLoaded { v, err := e.Storage.ReadValue(entry.ValueOffset) if err == nil { valStr = v } } results = append(results, QueryResult{ Key: key, Value: valStr, CommitIndex: entry.CommitIndex, }) needed := limit + offset if limit > 0 && len(results) >= needed { return false } } return true }) if offset > 0 { if offset >= len(results) { return []QueryResult{}, nil } results = results[offset:] } if limit >= 0 { if limit < len(results) { results = results[:limit] } } return results, nil } // Snapshot creates a full consistency snapshot of the database. // Since the DB engine itself IS the state machine, this simply serializes // all current valid data. This snapshot can be used to restore a lagging node // that has fallen behind the Raft logs. func (e *Engine) Snapshot() ([]byte, error) { e.Index.mu.RLock() defer e.Index.mu.RUnlock() // Snapshot Format Version 1: // [Count U64] + [Record...] // Record: [KeyLen U16] [Key Bytes] [ValLen U32] [Val Bytes] [CommitIndex U64] // Pre-calculate size to reduce allocations // Estimate: Count * (AverageKeySize + AverageValSize + Overhead) // We start with a reasonable buffer size. buf := make([]byte, 0, 1024*1024) count := uint64(len(e.Index.items)) // Write Count tmp := make([]byte, 8) binary.LittleEndian.PutUint64(tmp, count) buf = append(buf, tmp...) // Buffer for encoding headers to avoid repeated small allocations headerBuf := make([]byte, 2+4+8) // KeyLen + ValLen + CommitIndex for i := range e.Index.items { // Get Key key := e.Index.getKey(i) entry := e.Index.items[i].entry // Get Value // We read directly from storage. Since we hold the read lock on index, // the offset is valid. The storage file is append-only, so old data exists. val, err := e.Storage.ReadValue(entry.ValueOffset) if err != nil { // If we can't read a value, it's a critical error for snapshot integrity. return nil, fmt.Errorf("snapshot failed reading key '%s' at offset %d: %v", key, entry.ValueOffset, err) } keyBytes := []byte(key) valBytes := []byte(val) // Encode Header binary.LittleEndian.PutUint16(headerBuf[0:2], uint16(len(keyBytes))) binary.LittleEndian.PutUint32(headerBuf[2:6], uint32(len(valBytes))) binary.LittleEndian.PutUint64(headerBuf[6:14], entry.CommitIndex) // Append to buffer buf = append(buf, headerBuf...) buf = append(buf, keyBytes...) buf = append(buf, valBytes...) } return buf, nil } // Restore completely replaces the database content with the provided snapshot. // This matches the "DB as Snapshot" design: we wipe the current state // and rebuild from the full image provided by the leader. func (e *Engine) Restore(data []byte) error { e.writeMu.Lock() defer e.writeMu.Unlock() // 1. Safety check & Parse Header if len(data) < 8 { if len(data) == 0 { // Empty snapshot implies empty DB. return e.resetToEmpty() } return fmt.Errorf("invalid snapshot data: too short") } count := binary.LittleEndian.Uint64(data[0:8]) offset := 8 // 2. Stop-the-World: Close current storage to safely wipe it e.Index.mu.Lock() if err := e.Storage.Close(); err != nil { e.Index.mu.Unlock() return fmt.Errorf("failed to close storage for restore: %v", err) } // 3. Truncate & Reset // We are replacing the entire DB state. // Reset In-Memory Index e.Index = NewFlatIndex() if e.config.EnableValueIndex { e.FTIndex = NewFullTextIndex() } // Clear Cache e.Storage.cache = make(map[int64]string) // Re-open storage in Truncate mode (Wipe data file) f, err := os.OpenFile(e.Storage.filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) if err != nil { e.Index.mu.Unlock() return fmt.Errorf("failed to reopen storage for restore: %v", err) } e.Storage.file = f e.Storage.offset = 0 // We hold the lock during the entire restore process to prevent any reads/writes // This is acceptable as Restore is a rare, heavyweight operation. defer e.Index.mu.Unlock() maxCommitIndex := uint64(0) // 4. Rebuild Data from Snapshot Stream for i := uint64(0); i < count; i++ { // Read KeyLen if offset+2 > len(data) { return fmt.Errorf("restore failed: truncated data at record %d (keylen)", i) } keyLen := int(binary.LittleEndian.Uint16(data[offset:])) offset += 2 // Read Key if offset+keyLen > len(data) { return fmt.Errorf("restore failed: truncated data at record %d (key)", i) } key := string(data[offset : offset+keyLen]) offset += keyLen // Read ValLen if offset+4 > len(data) { return fmt.Errorf("restore failed: truncated data at record %d (vallen)", i) } valLen := int(binary.LittleEndian.Uint32(data[offset:])) offset += 4 // Read Value if offset+valLen > len(data) { return fmt.Errorf("restore failed: truncated data at record %d (value)", i) } val := string(data[offset : offset+valLen]) offset += valLen // Read CommitIndex if offset+8 > len(data) { return fmt.Errorf("restore failed: truncated data at record %d (commitIndex)", i) } commitIndex := binary.LittleEndian.Uint64(data[offset:]) offset += 8 if commitIndex > maxCommitIndex { maxCommitIndex = commitIndex } // Direct Write to Storage (Internal Append) // We use the low-level Storage.Append directly since we already hold locks // and are bypassing the standard Set path. writeOffset, err := e.Storage.Append(key, val, RecordTypePut, commitIndex) if err != nil { return fmt.Errorf("restore failed appending record %d: %v", i, err) } // Update Memory Index // Accessing e.Index.items directly because we hold e.Index.mu // But we should use the helper to safely manage keyBuf e.Index.Insert(key, IndexEntry{ ValueOffset: writeOffset, CommitIndex: commitIndex, }) // Update Full Text Index if enabled if e.FTIndex != nil { e.FTIndex.Add(key, val) } } // 5. Update Metadata e.commitMu.Lock() e.LastCommitIndex = maxCommitIndex e.saveMetadata() e.commitMu.Unlock() // 6. Force Sync if err := e.Sync(); err != nil { return fmt.Errorf("failed to sync restored data: %v", err) } return nil } // resetToEmpty helper to wipe the DB cleanly func (e *Engine) resetToEmpty() error { e.Index.mu.Lock() defer e.Index.mu.Unlock() if err := e.Storage.Close(); err != nil { return err } e.Index = NewFlatIndex() if e.config.EnableValueIndex { e.FTIndex = NewFullTextIndex() } e.Storage.cache = make(map[int64]string) f, err := os.OpenFile(e.Storage.filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) if err != nil { return err } e.Storage.file = f e.Storage.offset = 0 e.commitMu.Lock() e.LastCommitIndex = 0 e.saveMetadata() e.commitMu.Unlock() return nil } func (e *Engine) DebugClearAuxiliary() { e.Storage.cacheMu.Lock() e.Storage.cache = make(map[int64]string) e.Storage.cacheMu.Unlock() if e.FTIndex != nil { e.FTIndex.mu.Lock() e.FTIndex.index = make(map[string]map[string]bool) e.FTIndex.mu.Unlock() } }