| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
2771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512 |
- package db
- import (
- "bufio"
- "encoding/binary"
- "fmt"
- "hash/crc32"
- "io"
- "os"
- "regexp"
- "sort"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- )
// hash returns the 32-bit FNV-1a hash of s.
//
// FNV-1a XORs each byte into the state and then multiplies by the FNV
// prime; the reverse order (multiply, then XOR) is plain FNV-1, which is
// what this function used to compute despite its documentation. In this
// file the result is only used to pick a StripedLock stripe.
func hash(s string) uint32 {
	const (
		offset32 = 2166136261
		prime32  = 16777619
	)
	h := uint32(offset32)
	for i := 0; i < len(s); i++ {
		h ^= uint32(s[i])
		h *= prime32
	}
	return h
}
// --- Flat Sorted Array Index Implementation ---
//
// Memory-oriented design:
//  1. No tree-node overhead (pointers, node structs, map buckets).
//  2. All key bytes live in a single append-only []byte buffer, so there is
//     no per-key string header.
//  3. A slice of fixed-size items kept sorted by key gives O(log N) lookup
//     and O(N) insertion/deletion (later items are shifted).
//
// Lookups compare stored bytes via string(b) only inside comparison
// expressions, which the Go compiler evaluates without allocating; a real
// string is materialized only when a key is handed to a caller (WalkPrefix).

// IndexEntry is the payload stored for each key.
type IndexEntry struct {
	ValueOffset int64  // log offset of the key's record (as returned by Storage.Append)
	CommitIndex uint64 // commit index the record was written with
}

// FlatIndex is an in-memory ordered map from key to IndexEntry, stored as a
// sorted slice of items over one shared key buffer. Its exported methods
// take mu themselves and are safe for concurrent use.
type FlatIndex struct {
	// keyBuf holds the bytes of every inserted key back to back
	// ([key1 bytes][key2 bytes]...). It is append-only: Delete does not
	// reclaim space (no compaction is implemented).
	keyBuf []byte

	// items is kept sorted by key bytes.
	items []flatItem
	mu    sync.RWMutex
}

// flatItem locates one key inside FlatIndex.keyBuf and carries its entry.
//
// On 64-bit platforms it occupies 24 bytes: keyOffset (4) + keyLen (2) +
// 2 bytes of padding + entry (16, 8-byte aligned). As a rough guide, 100k
// keys cost 2.4 MB of items plus their key bytes (about 1.8 MB at an
// average key length of 18 bytes).
type flatItem struct {
	keyOffset uint32     // offset into keyBuf; limits total key bytes to 4 GiB
	keyLen    uint16     // key length; limits a single key to 65535 bytes
	entry     IndexEntry // 16 bytes
}

// NewFlatIndex returns an empty index with room preallocated for 1 MiB of
// key bytes and 10000 items.
func NewFlatIndex() *FlatIndex {
	return &FlatIndex{
		keyBuf: make([]byte, 0, 1024*1024),
		items:  make([]flatItem, 0, 10000),
	}
}

// keyBytes returns the stored key of item idx. The slice aliases keyBuf:
// callers must hold fi.mu and must not modify or retain it.
func (fi *FlatIndex) keyBytes(idx int) []byte {
	it := &fi.items[idx]
	return fi.keyBuf[it.keyOffset : it.keyOffset+uint32(it.keyLen)]
}

// getKey returns a copy of the key of item idx as a string (this
// allocates). Callers must hold fi.mu.
func (fi *FlatIndex) getKey(idx int) string {
	return string(fi.keyBytes(idx))
}

// compare returns -1, 0 or +1 as the key of item idx sorts before, equal
// to, or after target, without allocating. Callers must hold fi.mu.
func (fi *FlatIndex) compare(idx int, target string) int {
	k := fi.keyBytes(idx)
	switch {
	case string(k) < target:
		return -1
	case string(k) > target:
		return 1
	default:
		return 0
	}
}

// lowerBound returns the index of the first item whose key is >= key, or
// len(fi.items) if there is none. Callers must hold fi.mu.
func (fi *FlatIndex) lowerBound(key string) int {
	return sort.Search(len(fi.items), func(i int) bool {
		return string(fi.keyBytes(i)) >= key
	})
}

// hasKeyAt reports whether item idx exists and holds exactly key.
// Callers must hold fi.mu.
func (fi *FlatIndex) hasKeyAt(idx int, key string) bool {
	return idx < len(fi.items) && string(fi.keyBytes(idx)) == key
}

// Insert adds key with entry, or replaces the entry if key already exists.
//
// It panics if key is longer than 65535 bytes or if the key buffer would
// grow past 4 GiB. Those limits come from the compact flatItem fields;
// exceeding them previously truncated the stored offset/length and
// silently corrupted the sorted index.
func (fi *FlatIndex) Insert(key string, entry IndexEntry) {
	fi.mu.Lock()
	defer fi.mu.Unlock()
	fi.insertLocked(key, entry)
}

// insertLocked implements Insert; callers must hold fi.mu for writing.
func (fi *FlatIndex) insertLocked(key string, entry IndexEntry) {
	idx := fi.lowerBound(key)
	if fi.hasKeyAt(idx, key) {
		fi.items[idx].entry = entry
		return
	}

	// Validate before mutating anything so a rejected key leaves the index
	// intact.
	const maxKeyLen = 1<<16 - 1
	const maxKeyBuf = 1<<32 - 1
	if len(key) > maxKeyLen {
		panic(fmt.Sprintf("db: FlatIndex key length %d exceeds %d bytes", len(key), maxKeyLen))
	}
	if uint64(len(fi.keyBuf))+uint64(len(key)) > maxKeyBuf {
		panic("db: FlatIndex key buffer would exceed 4 GiB")
	}

	item := flatItem{
		keyOffset: uint32(len(fi.keyBuf)),
		keyLen:    uint16(len(key)),
		entry:     entry,
	}
	fi.keyBuf = append(fi.keyBuf, key...)

	if idx == len(fi.items) {
		// Fast path: appending at the end (e.g. keys inserted in order).
		fi.items = append(fi.items, item)
		return
	}
	fi.items = append(fi.items, flatItem{})
	copy(fi.items[idx+1:], fi.items[idx:])
	fi.items[idx] = item
}

// Get returns the entry stored for key and whether it was found.
func (fi *FlatIndex) Get(key string) (IndexEntry, bool) {
	fi.mu.RLock()
	defer fi.mu.RUnlock()
	if idx := fi.lowerBound(key); fi.hasKeyAt(idx, key) {
		return fi.items[idx].entry, true
	}
	return IndexEntry{}, false
}

// Delete removes key and reports whether it was present. The key's bytes
// stay in keyBuf; heavy delete workloads therefore waste keyBuf space.
func (fi *FlatIndex) Delete(key string) bool {
	fi.mu.Lock()
	defer fi.mu.Unlock()
	idx := fi.lowerBound(key)
	if !fi.hasKeyAt(idx, key) {
		return false
	}
	fi.items = append(fi.items[:idx], fi.items[idx+1:]...)
	return true
}

// WalkPrefix calls callback, in ascending key order, for every key that
// starts with prefix (every key when prefix is ""), stopping early when
// callback returns false.
//
// The read lock is held for the whole walk, so callback must not modify fi,
// and should not read-lock it again (a waiting writer would deadlock a
// recursive RLock).
func (fi *FlatIndex) WalkPrefix(prefix string, callback func(key string, entry IndexEntry) bool) {
	fi.mu.RLock()
	defer fi.mu.RUnlock()
	for i := fi.lowerBound(prefix); i < len(fi.items); i++ {
		kb := fi.keyBytes(i)
		// Keys are sorted, so the first non-matching key ends the range.
		if len(kb) < len(prefix) || string(kb[:len(prefix)]) != prefix {
			break
		}
		if !callback(string(kb), fi.items[i].entry) {
			return
		}
	}
}

// --- End Flat Index ---
- // --- Full Text Index ---
- type FullTextIndex struct {
- // Token -> Set of Keys
- index map[string]map[string]bool
- mu sync.RWMutex
- }
- func NewFullTextIndex() *FullTextIndex {
- return &FullTextIndex{
- index: make(map[string]map[string]bool),
- }
- }
- func (fti *FullTextIndex) Add(key string, value string) {
- fti.mu.Lock()
- defer fti.mu.Unlock()
-
- tokens := tokenize(value)
- for _, token := range tokens {
- if fti.index[token] == nil {
- fti.index[token] = make(map[string]bool)
- }
- fti.index[token][key] = true
- }
- }
- func (fti *FullTextIndex) Remove(key string, value string) {
- fti.mu.Lock()
- defer fti.mu.Unlock()
-
- tokens := tokenize(value)
- for _, token := range tokens {
- if set, ok := fti.index[token]; ok {
- delete(set, key)
- if len(set) == 0 {
- delete(fti.index, token)
- }
- }
- }
- }
- func (fti *FullTextIndex) Search(tokenPattern string) []string {
- fti.mu.RLock()
- defer fti.mu.RUnlock()
- // 1. Exact Match
- if !strings.Contains(tokenPattern, "*") {
- if keys, ok := fti.index[tokenPattern]; ok {
- res := make([]string, 0, len(keys))
- for k := range keys {
- res = append(res, k)
- }
- return res
- }
- return nil
- }
- // 2. Wildcard Scan
- var results []string
- seen := make(map[string]bool)
- for token, keys := range fti.index {
- if WildcardMatch(token, tokenPattern) {
- for k := range keys {
- if !seen[k] {
- results = append(results, k)
- seen[k] = true
- }
- }
- }
- }
- return results
- }
// tokenize splits val into maximal runs of ASCII letters and digits.
// Every other character, including all non-ASCII runes, acts as a
// separator, and empty runs are dropped. Case is preserved.
func tokenize(val string) []string {
	isSeparator := func(r rune) bool {
		switch {
		case 'a' <= r && r <= 'z', 'A' <= r && r <= 'Z', '0' <= r && r <= '9':
			return false
		}
		return true
	}
	return strings.FieldsFunc(val, isSeparator)
}
- // --- Storage & Cache ---
- // StripedLock provides hashed locks for keys
- type StripedLock struct {
- locks [1024]sync.RWMutex
- }
- func (sl *StripedLock) GetLock(key string) *sync.RWMutex {
- h := hash(key)
- return &sl.locks[h%1024]
- }
// Metadata handles the persistent state of the engine.
//
// NOTE(review): not referenced by the visible code; Engine persists
// LastCommitIndex directly as 8 little-endian bytes in meta.state.
type Metadata struct {
	LastCommitIndex uint64
}
// Storage manages disk storage using an append-only log of checksummed
// records (see HeaderSize for the record layout).
type Storage struct {
	file     *os.File
	filename string
	// offset is the current end of the log, i.e. where the next record is
	// written. Append advances it with sync/atomic while Scan assigns it
	// directly, so Scan should not run concurrently with Append.
	offset int64 // Current end of file

	// cache maps a record's log offset to its value. Append fills it for
	// Put records and ReadValue on a miss; it is bounded by MaxCacheSize
	// and guarded by cacheMu.
	cache   map[int64]string
	cacheMu sync.RWMutex
}
const (
	// Record types, stored in the type byte of each record header.
	RecordTypePut    = 0x01
	RecordTypeDelete = 0x02

	// HeaderSize is the fixed record header length in bytes:
	// CRC(4) + Type(1) + KeyLen(2) + ValLen(4) + CommitIndex(8)
	// with all integers little-endian. The key and value bytes follow the
	// header; the IEEE CRC-32 covers everything after the CRC field.
	HeaderSize = 4 + 1 + 2 + 4 + 8

	// MaxCacheSize bounds the number of entries in Storage.cache.
	MaxCacheSize = 10000
)
- func NewStorage(filename string) (*Storage, error) {
- f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
- if err != nil {
- return nil, err
- }
- st := &Storage{
- file: f,
- filename: filename,
- cache: make(map[int64]string),
- }
-
- return st, nil
- }
// Record represents a log entry as decoded by Scan.
type Record struct {
	Type        byte   // header type byte (RecordTypePut / RecordTypeDelete as written by Append)
	Key         string
	Value       string // Engine.Delete writes delete records with an empty value
	Offset      int64  // byte offset of the record's header in the log
	CommitIndex uint64
}
- // Scan iterates over the log file, validating CRCs and returning records.
- // It returns the valid offset after the last record.
- func (s *Storage) Scan(callback func(Record)) (int64, error) {
- offset := int64(0)
-
- if _, err := s.file.Seek(0, 0); err != nil {
- return 0, err
- }
-
- reader := bufio.NewReader(s.file)
-
- for {
- header := make([]byte, HeaderSize)
- n, err := io.ReadFull(reader, header)
- if err == io.EOF {
- break
- }
- if err == io.ErrUnexpectedEOF && n == 0 {
- break
- }
- if err != nil {
- fmt.Printf("Storage Scan Error at %d: %v\n", offset, err)
- break
- }
-
- storedCRC := binary.LittleEndian.Uint32(header[0:4])
- recType := header[4]
- keyLen := binary.LittleEndian.Uint16(header[5:7])
- valLen := binary.LittleEndian.Uint32(header[7:11])
- commitIndex := binary.LittleEndian.Uint64(header[11:19])
-
- totalLen := int(keyLen) + int(valLen)
- data := make([]byte, totalLen)
-
- if _, err := io.ReadFull(reader, data); err != nil {
- fmt.Printf("Storage Scan Data Error at %d: %v\n", offset, err)
- break
- }
-
- // Verify CRC
- crc := crc32.ChecksumIEEE(header[4:]) // Checksum of Type+Lengths+CommitIndex
- crc = crc32.Update(crc, crc32.IEEETable, data) // + Data
-
- if crc != storedCRC {
- fmt.Printf("CRC Mismatch at %d\n", offset)
- break
- }
-
- key := string(data[:keyLen])
- val := string(data[keyLen:])
-
- callback(Record{
- Type: recType,
- Key: key,
- Value: val,
- Offset: offset,
- CommitIndex: commitIndex,
- })
-
- offset += int64(HeaderSize + totalLen)
- }
-
- // Update storage offset
- s.offset = offset
- s.file.Seek(offset, 0)
-
- return offset, nil
- }
- // Append writes a new record
- func (s *Storage) Append(key, val string, recType byte, commitIndex uint64) (int64, error) {
- keyBytes := []byte(key)
- valBytes := []byte(val)
-
- keyLen := len(keyBytes)
- valLen := len(valBytes)
-
- if keyLen > 65535 {
- return 0, fmt.Errorf("key too large")
- }
-
- buf := make([]byte, HeaderSize+keyLen+valLen)
-
- // 1. Build Header Data (skip CRC)
- buf[4] = recType
- binary.LittleEndian.PutUint16(buf[5:7], uint16(keyLen))
- binary.LittleEndian.PutUint32(buf[7:11], uint32(valLen))
- binary.LittleEndian.PutUint64(buf[11:19], commitIndex)
-
- // 2. Copy Data
- copy(buf[HeaderSize:], keyBytes)
- copy(buf[HeaderSize+keyLen:], valBytes)
-
- // 3. Calc CRC
- crc := crc32.ChecksumIEEE(buf[4:]) // Everything after CRC field
- binary.LittleEndian.PutUint32(buf[0:4], crc)
-
- // 4. Write
- totalSize := int64(len(buf))
- writeOffset := atomic.AddInt64(&s.offset, totalSize) - totalSize
-
- if _, err := s.file.WriteAt(buf, writeOffset); err != nil {
- return 0, err
- }
-
- // Add to Cache if Put
- if recType == RecordTypePut {
- s.cacheMu.Lock()
- if len(s.cache) < MaxCacheSize {
- s.cache[writeOffset] = val
- }
- s.cacheMu.Unlock()
- }
-
- return writeOffset, nil
- }
- // Sync flushes writes to stable storage
- func (s *Storage) Sync() error {
- return s.file.Sync()
- }
- // ReadValue reads value at offset
- func (s *Storage) ReadValue(offset int64) (string, error) {
- // 1. Check Cache
- s.cacheMu.RLock()
- if val, ok := s.cache[offset]; ok {
- s.cacheMu.RUnlock()
- return val, nil
- }
- s.cacheMu.RUnlock()
- // 2. Read Header
- header := make([]byte, HeaderSize)
- if _, err := s.file.ReadAt(header, offset); err != nil {
- return "", err
- }
-
- keyLen := binary.LittleEndian.Uint16(header[5:7])
- valLen := binary.LittleEndian.Uint32(header[7:11])
-
- totalLen := int(keyLen) + int(valLen)
- data := make([]byte, totalLen)
-
- if _, err := s.file.ReadAt(data, offset+HeaderSize); err != nil {
- return "", err
- }
-
- // Verify CRC on read for safety
- storedCRC := binary.LittleEndian.Uint32(header[0:4])
- crc := crc32.ChecksumIEEE(header[4:])
- crc = crc32.Update(crc, crc32.IEEETable, data)
-
- if crc != storedCRC {
- return "", fmt.Errorf("data corruption detected at offset %d", offset)
- }
-
- val := string(data[keyLen:])
-
- // 3. Fill Cache
- s.cacheMu.Lock()
- if len(s.cache) < MaxCacheSize {
- s.cache[offset] = val
- } else {
- for k := range s.cache {
- delete(s.cache, k)
- break
- }
- s.cache[offset] = val
- }
- s.cacheMu.Unlock()
- return val, nil
- }
- func (s *Storage) Close() error {
- return s.file.Close()
- }
// EngineOption mutates an EngineConfig; any number of them can be passed
// to NewEngine.
type EngineOption func(*EngineConfig)

// EngineConfig holds the tunable settings of an Engine.
type EngineConfig struct {
	// EnableValueIndex turns on the full-text index over values.
	EnableValueIndex bool
}

// WithValueIndex returns an option that sets EngineConfig.EnableValueIndex,
// enabling or disabling the full-text value index.
func WithValueIndex(enable bool) EngineOption {
	opt := func(cfg *EngineConfig) {
		cfg.EnableValueIndex = enable
	}
	return opt
}
// Engine is the core storage engine: an in-memory key index over an
// append-only value log, plus an optional full-text index over values.
type Engine struct {
	Index   *FlatIndex     // Flattened memory structure: key -> log offset + commit index
	Storage *Storage       // append-only value log (values.data)
	FTIndex *FullTextIndex // Can be nil if disabled (EngineConfig.EnableValueIndex)
	// NOTE(review): KeyLocks is not used by the code visible in this file.
	KeyLocks StripedLock
	// LastCommitIndex is the highest commit index applied. Set and Delete
	// write it while holding writeMu and commitMu; GetLastAppliedIndex
	// reads it under commitMu. It is persisted to metaFile.
	LastCommitIndex uint64
	commitMu        sync.Mutex
	dataDir         string

	// Metadata state: meta.state, holding LastCommitIndex as 8
	// little-endian bytes at offset 0.
	metaFile *os.File

	// writeMu serializes Set and Delete.
	writeMu sync.Mutex
	config  EngineConfig
}
- func NewEngine(dataDir string, opts ...EngineOption) (*Engine, error) {
- if err := os.MkdirAll(dataDir, 0755); err != nil {
- return nil, err
- }
- config := EngineConfig{
- EnableValueIndex: false, // Default to false (key-only)
- }
- for _, opt := range opts {
- opt(&config)
- }
- store, err := NewStorage(dataDir + "/values.data")
- if err != nil {
- return nil, err
- }
- // Open or create metadata file
- metaPath := dataDir + "/meta.state"
- metaFile, err := os.OpenFile(metaPath, os.O_RDWR|os.O_CREATE, 0644)
- if err != nil {
- store.Close()
- return nil, err
- }
- e := &Engine{
- Index: NewFlatIndex(),
- Storage: store,
- FTIndex: nil,
- dataDir: dataDir,
- metaFile: metaFile,
- config: config,
- }
-
- if config.EnableValueIndex {
- e.FTIndex = NewFullTextIndex()
- }
- // Load Metadata
- stat, err := metaFile.Stat()
- if err == nil && stat.Size() >= 8 {
- b := make([]byte, 8)
- if _, err := metaFile.ReadAt(b, 0); err == nil {
- e.LastCommitIndex = binary.LittleEndian.Uint64(b)
- }
- }
- // Rebuild Index from Disk
- // Note: We still scan to rebuild the memory index, but LastCommitIndex is initialized from meta file
- // We can update LastCommitIndex if the log is ahead of the meta file (e.g. crash before meta update)
- // We also correct LastCommitIndex if meta file is ahead of data (e.g. data flush lost during power failure)
-
- realMaxIndex := uint64(0)
-
- _, err = store.Scan(func(rec Record) {
- if rec.Type == RecordTypePut {
- e.Index.Insert(rec.Key, IndexEntry{
- ValueOffset: rec.Offset,
- CommitIndex: rec.CommitIndex,
- })
- if e.FTIndex != nil {
- e.FTIndex.Add(rec.Key, rec.Value)
- }
- } else if rec.Type == RecordTypeDelete {
- // Cleanup FTIndex using old value if possible
- e.removeValueFromFTIndex(rec.Key)
- e.Index.Delete(rec.Key)
- }
-
- // Track the actual max index present in the data file
- if rec.CommitIndex > realMaxIndex {
- realMaxIndex = rec.CommitIndex
- }
- })
-
- // Critical Safety Check:
- // 1. If Log > Meta: Update Meta (Normal crash recovery)
- // 2. If Meta > Log: Downgrade Meta (Data loss recovery - force Raft replay)
- if realMaxIndex > e.LastCommitIndex {
- e.LastCommitIndex = realMaxIndex
- } else if realMaxIndex < e.LastCommitIndex {
- // Detect inconsistency: Meta says we have more data than what's on disk.
- // This happens if Meta was flushed but Data tail was lost during power failure.
- // We MUST trust the actual data on disk.
- fmt.Printf("WARNING: Inconsistency detected! Meta LastCommitIndex (%d) > Data RealIndex (%d). Resetting to %d to force Raft replay.\n",
- e.LastCommitIndex, realMaxIndex, realMaxIndex)
- e.LastCommitIndex = realMaxIndex
- // Force save corrected metadata
- e.saveMetadata()
- }
-
- return e, nil
- }
- func (e *Engine) removeValueFromFTIndex(key string) {
- if e.FTIndex == nil {
- return
- }
- if entry, ok := e.Index.Get(key); ok {
- val, err := e.Storage.ReadValue(entry.ValueOffset)
- if err == nil {
- e.FTIndex.Remove(key, val)
- }
- }
- }
- func (e *Engine) Close() error {
- e.saveMetadata()
- e.metaFile.Close()
- return e.Storage.Close()
- }
- func (e *Engine) saveMetadata() {
- b := make([]byte, 8)
- binary.LittleEndian.PutUint64(b, e.LastCommitIndex)
- e.metaFile.WriteAt(b, 0)
- }
- func (e *Engine) Set(key, value string, commitIndex uint64) error {
- e.writeMu.Lock()
- defer e.writeMu.Unlock()
-
- // Idempotency check: if this log entry has already been applied, skip it.
- // This handles Raft replay on restart.
- if commitIndex > 0 && commitIndex <= e.LastCommitIndex {
- return nil
- }
- e.removeValueFromFTIndex(key)
- offset, err := e.Storage.Append(key, value, RecordTypePut, commitIndex)
- if err != nil {
- return err
- }
- e.Index.Insert(key, IndexEntry{
- ValueOffset: offset,
- CommitIndex: commitIndex,
- })
-
- if e.FTIndex != nil {
- e.FTIndex.Add(key, value)
- }
-
- e.commitMu.Lock()
- if commitIndex > e.LastCommitIndex {
- e.LastCommitIndex = commitIndex
- // Update metadata on disk periodically or on every write?
- // For safety, let's update it. Since it's pwrite at offset 0, it's fast.
- e.saveMetadata()
- }
- e.commitMu.Unlock()
-
- return nil
- }
- func (e *Engine) Delete(key string, commitIndex uint64) error {
- e.writeMu.Lock()
- defer e.writeMu.Unlock()
-
- // Idempotency check
- if commitIndex > 0 && commitIndex <= e.LastCommitIndex {
- return nil
- }
- e.removeValueFromFTIndex(key)
-
- _, err := e.Storage.Append(key, "", RecordTypeDelete, commitIndex)
- if err != nil {
- return err
- }
-
- e.Index.Delete(key)
-
- e.commitMu.Lock()
- if commitIndex > e.LastCommitIndex {
- e.LastCommitIndex = commitIndex
- e.saveMetadata()
- }
- e.commitMu.Unlock()
-
- return nil
- }
- // Sync flushes underlying storage to disk
- // GetLastAppliedIndex returns the last Raft log index applied to the DB
- func (e *Engine) GetLastAppliedIndex() uint64 {
- e.commitMu.Lock()
- defer e.commitMu.Unlock()
- return e.LastCommitIndex
- }
- func (e *Engine) GetDBSize() int64 {
- e.commitMu.Lock()
- defer e.commitMu.Unlock()
-
- info, err := e.Storage.file.Stat()
- if err == nil {
- return info.Size()
- }
- return 0
- }
- func (e *Engine) Sync() error {
- e.metaFile.Sync()
- return e.Storage.Sync()
- }
- func (e *Engine) Get(key string) (string, bool) {
- entry, ok := e.Index.Get(key)
- if !ok {
- return "", false
- }
- val, err := e.Storage.ReadValue(entry.ValueOffset)
- if err != nil {
- return "", false
- }
- return val, true
- }
// QueryResult is one row produced by Query: a key, its value, and the
// commit index recorded for it.
type QueryResult struct {
	Key         string `json:"key"`
	Value       string `json:"value"`
	CommitIndex uint64 `json:"commit_index"`
}
// WildcardMatch reports whether str matches pattern. In pattern, '*' matches
// any run of characters (including none), '?' matches exactly one
// character, and every other character must match literally. Matching
// works on runes, so '?' consumes one Unicode code point.
//
// It uses the greedy single-backtrack algorithm: on a mismatch, the most
// recent '*' absorbs one more character and matching resumes after it.
// A '*' in the pattern is always a wildcard. Previously the literal check
// ran first, so a '*' in str at the same position was consumed literally,
// and matches such as ("a*xb", "a*b") were wrongly rejected.
func WildcardMatch(str, pattern string) bool {
	text := []rune(str)
	pat := []rune(pattern)

	ti, pi := 0, 0
	star := -1 // index in pat of the most recent '*', or -1 if none yet
	mark := 0  // index in text just past the run that '*' currently absorbs
	for ti < len(text) {
		switch {
		case pi < len(pat) && pat[pi] == '*':
			star, mark = pi, ti
			pi++
		case pi < len(pat) && (pat[pi] == '?' || pat[pi] == text[ti]):
			ti++
			pi++
		case star >= 0:
			mark++
			ti, pi = mark, star+1
		default:
			return false
		}
	}
	// Trailing stars can match the empty remainder.
	for pi < len(pat) && pat[pi] == '*' {
		pi++
	}
	return pi == len(pat)
}
- func (e *Engine) Count(sql string) (int, error) {
- return e.execute(sql, true)
- }
- func (e *Engine) execute(sql string, countOnly bool) (int, error) {
- sql = strings.TrimSpace(sql)
-
- reLimit := regexp.MustCompile(`(?i)\s+LIMIT\s+(\d+)`)
- reOffset := regexp.MustCompile(`(?i)\s+OFFSET\s+(\d+)`)
-
- limit := -1
- offset := 0
-
- if match := reLimit.FindStringSubmatch(sql); len(match) > 0 {
- limit, _ = strconv.Atoi(match[1])
- sql = reLimit.ReplaceAllString(sql, "")
- }
- if match := reOffset.FindStringSubmatch(sql); len(match) > 0 {
- offset, _ = strconv.Atoi(match[1])
- sql = reOffset.ReplaceAllString(sql, "")
- }
- // 0. Handle Count All efficient case
- // Check specifically for "*" or simple wildcard match
- if sql == "*" || sql == "\"*\"" {
- e.Index.mu.RLock()
- count := len(e.Index.items)
- e.Index.mu.RUnlock()
- return count, nil
- }
- re := regexp.MustCompile(`(key|value|CommitIndex)\s*(=|like|>=|<=|>|<)\s*("[^"]*"|\d+)`)
- matches := re.FindAllStringSubmatch(sql, -1)
- if len(matches) == 0 {
- // Fallback for when regex fails but user meant "count all" or similar via other syntax
- // But strictly speaking, if not "*", it's invalid query per current parser
- return 0, fmt.Errorf("invalid query")
- }
- // Also check for 'key like "*"' pattern which is equivalent to count all
- if len(matches) == 1 && matches[0][1] == "key" && matches[0][2] == "like" && (matches[0][3] == "\"*\"" || matches[0][3] == "\"%\"") {
- e.Index.mu.RLock()
- count := len(e.Index.items)
- e.Index.mu.RUnlock()
- return count, nil
- }
- extractString := func(s string) string {
- return strings.Trim(strings.TrimSpace(s), "\"")
- }
- // 1. Optimize for Point Lookup
- for _, match := range matches {
- if match[1] == "key" && match[2] == "=" {
- targetKey := extractString(match[3])
- entry, ok := e.Index.Get(targetKey)
- if !ok {
- return 0, nil
- }
-
- needValue := false
- for _, m := range matches {
- if m[1] == "value" { needValue = true; break }
- }
-
- var val string
- if needValue {
- v, err := e.Storage.ReadValue(entry.ValueOffset)
- if err != nil { return 0, nil }
- val = v
- }
-
- matchAll := true
- for _, m := range matches {
- field, op, valRaw := m[1], m[2], m[3]
- switch field {
- case "CommitIndex":
- num, _ := strconv.ParseUint(valRaw, 10, 64)
- switch op {
- case ">": if !(entry.CommitIndex > num) { matchAll = false }
- case "<": if !(entry.CommitIndex < num) { matchAll = false }
- case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
- case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
- case "=": if !(entry.CommitIndex == num) { matchAll = false }
- }
- case "value":
- t := extractString(valRaw)
- switch op {
- case "=": if val != t { matchAll = false }
- case "like": if !WildcardMatch(val, t) { matchAll = false }
- }
- }
- if !matchAll { break }
- }
- if matchAll {
- return 1, nil
- }
- return 0, nil
- }
- }
- var candidates map[string]bool
- var useFTIndex bool = false
-
- // 2. Try to use FT Index (if enabled)
- if e.FTIndex != nil {
- for _, match := range matches {
- if match[1] == "value" && match[2] == "like" {
- pattern := extractString(match[3])
- clean := strings.Trim(pattern, "*")
- if len(clean) > 0 && !strings.Contains(clean, "*") && !strings.Contains(clean, "?") {
- matches := e.FTIndex.Search(pattern)
- if matches != nil {
- currentSet := make(map[string]bool)
- for _, k := range matches {
- currentSet[k] = true
- }
- if !useFTIndex {
- candidates = currentSet
- useFTIndex = true
- } else {
- newSet := make(map[string]bool)
- for k := range candidates {
- if currentSet[k] { newSet[k] = true }
- }
- candidates = newSet
- }
- } else {
- return 0, nil
- }
- }
- }
- }
- } else {
- for _, match := range matches {
- if match[1] == "value" && match[2] == "like" {
- // Value search requested but index disabled -> return 0
- return 0, nil
- }
- }
- }
-
- var iterator func(func(string, IndexEntry) bool)
-
- if useFTIndex {
- iterator = func(cb func(string, IndexEntry) bool) {
- keys := make([]string, 0, len(candidates))
- for k := range candidates { keys = append(keys, k) }
- sort.Strings(keys)
- for _, k := range keys {
- if entry, ok := e.Index.Get(k); ok {
- if !cb(k, entry) { return }
- }
- }
- }
- } else {
- // Use FlatIndex Prefix Search
- var prefix string = ""
- var usePrefix bool = false
- for _, match := range matches {
- if match[1] == "key" && match[2] == "like" {
- pattern := extractString(match[3])
- // Flat Index supports simple prefix match
- if strings.HasSuffix(pattern, "*") {
- clean := pattern[:len(pattern)-1]
- if !strings.ContainsAny(clean, "*?") {
- prefix = clean
- usePrefix = true
- break
- }
- }
- }
- }
-
- iterator = func(cb func(string, IndexEntry) bool) {
- if usePrefix {
- e.Index.WalkPrefix(prefix, cb)
- } else {
- e.Index.WalkPrefix("", cb)
- }
- }
- }
- matchedCount := 0
-
- iterator(func(key string, entry IndexEntry) bool {
- var valStr string
- var valLoaded bool
-
- matchAll := true
- for _, match := range matches {
- field, op, valRaw := match[1], match[2], match[3]
- switch field {
- case "CommitIndex":
- num, _ := strconv.ParseUint(valRaw, 10, 64)
- switch op {
- case ">": if !(entry.CommitIndex > num) { matchAll = false }
- case "<": if !(entry.CommitIndex < num) { matchAll = false }
- case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
- case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
- case "=": if !(entry.CommitIndex == num) { matchAll = false }
- }
- case "key":
- target := extractString(valRaw)
- switch op {
- case "=": if key != target { matchAll = false }
- case "like": if !WildcardMatch(key, target) { matchAll = false }
- }
- case "value":
- if e.FTIndex == nil && op == "like" {
- matchAll = false
- break
- }
- if !valLoaded {
- v, err := e.Storage.ReadValue(entry.ValueOffset)
- if err != nil { matchAll = false; break }
- valStr = v
- valLoaded = true
- }
- target := extractString(valRaw)
- switch op {
- case "=": if valStr != target { matchAll = false }
- case "like": if !WildcardMatch(valStr, target) { matchAll = false }
- }
- }
- if !matchAll { break }
- }
-
- if matchAll {
- matchedCount++
- if limit > 0 && offset == 0 && matchedCount >= limit {
- return false
- }
- }
- return true
- })
- if offset > 0 {
- if offset >= matchedCount {
- return 0, nil
- }
- matchedCount -= offset
- }
- if limit >= 0 {
- if limit < matchedCount {
- matchedCount = limit
- }
- }
- return matchedCount, nil
- }
- func (e *Engine) Query(sql string) ([]QueryResult, error) {
- return e.queryInternal(sql)
- }
- func (e *Engine) queryInternal(sql string) ([]QueryResult, error) {
- sql = strings.TrimSpace(sql)
-
- reLimit := regexp.MustCompile(`(?i)\s+LIMIT\s+(\d+)`)
- reOffset := regexp.MustCompile(`(?i)\s+OFFSET\s+(\d+)`)
-
- limit := -1
- offset := 0
-
- if match := reLimit.FindStringSubmatch(sql); len(match) > 0 {
- limit, _ = strconv.Atoi(match[1])
- sql = reLimit.ReplaceAllString(sql, "")
- }
- if match := reOffset.FindStringSubmatch(sql); len(match) > 0 {
- offset, _ = strconv.Atoi(match[1])
- sql = reOffset.ReplaceAllString(sql, "")
- }
- re := regexp.MustCompile(`(key|value|CommitIndex)\s*(=|like|>=|<=|>|<)\s*("[^"]*"|\d+)`)
- matches := re.FindAllStringSubmatch(sql, -1)
- if len(matches) == 0 {
- return nil, fmt.Errorf("invalid query")
- }
- extractString := func(s string) string {
- return strings.Trim(strings.TrimSpace(s), "\"")
- }
- for _, match := range matches {
- if match[1] == "key" && match[2] == "=" {
- targetKey := extractString(match[3])
- entry, ok := e.Index.Get(targetKey)
- if !ok {
- return []QueryResult{}, nil
- }
-
- val, err := e.Storage.ReadValue(entry.ValueOffset)
- if err != nil {
- return []QueryResult{}, nil
- }
-
- matchAll := true
- for _, m := range matches {
- field, op, valRaw := m[1], m[2], m[3]
- switch field {
- case "CommitIndex":
- num, _ := strconv.ParseUint(valRaw, 10, 64)
- switch op {
- case ">": if !(entry.CommitIndex > num) { matchAll = false }
- case "<": if !(entry.CommitIndex < num) { matchAll = false }
- case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
- case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
- case "=": if !(entry.CommitIndex == num) { matchAll = false }
- }
- case "value":
- t := extractString(valRaw)
- switch op {
- case "=": if val != t { matchAll = false }
- case "like": if !WildcardMatch(val, t) { matchAll = false }
- }
- }
- if !matchAll { break }
- }
- if matchAll {
- return []QueryResult{{Key: targetKey, Value: val, CommitIndex: entry.CommitIndex}}, nil
- }
- return []QueryResult{}, nil
- }
- }
- var candidates map[string]bool
- var useFTIndex bool = false
-
- if e.FTIndex != nil {
- for _, match := range matches {
- if match[1] == "value" && match[2] == "like" {
- pattern := extractString(match[3])
- clean := strings.Trim(pattern, "*")
- if len(clean) > 0 && !strings.Contains(clean, "*") && !strings.Contains(clean, "?") {
- matches := e.FTIndex.Search(pattern)
- if matches != nil {
- currentSet := make(map[string]bool)
- for _, k := range matches {
- currentSet[k] = true
- }
-
- if !useFTIndex {
- candidates = currentSet
- useFTIndex = true
- } else {
- newSet := make(map[string]bool)
- for k := range candidates {
- if currentSet[k] {
- newSet[k] = true
- }
- }
- candidates = newSet
- }
- } else {
- return []QueryResult{}, nil
- }
- }
- }
- }
- } else {
- for _, match := range matches {
- if match[1] == "value" && match[2] == "like" {
- return []QueryResult{}, nil
- }
- }
- }
-
- var iterator func(func(string, IndexEntry) bool)
-
- if useFTIndex {
- iterator = func(cb func(string, IndexEntry) bool) {
- keys := make([]string, 0, len(candidates))
- for k := range candidates {
- keys = append(keys, k)
- }
- sort.Strings(keys)
-
- for _, k := range keys {
- if entry, ok := e.Index.Get(k); ok {
- if !cb(k, entry) {
- return
- }
- }
- }
- }
- } else {
- var prefix string = ""
- var usePrefix bool = false
- for _, match := range matches {
- if match[1] == "key" && match[2] == "like" {
- pattern := extractString(match[3])
- if strings.HasSuffix(pattern, "*") {
- clean := pattern[:len(pattern)-1]
- if !strings.ContainsAny(clean, "*?") {
- prefix = clean
- usePrefix = true
- break
- }
- }
- }
- }
-
- iterator = func(cb func(string, IndexEntry) bool) {
- if usePrefix {
- e.Index.WalkPrefix(prefix, cb)
- } else {
- e.Index.WalkPrefix("", cb)
- }
- }
- }
- var results []QueryResult
- iterator(func(key string, entry IndexEntry) bool {
- var valStr string
- var valLoaded bool
-
- matchAll := true
- for _, match := range matches {
- field, op, valRaw := match[1], match[2], match[3]
- switch field {
- case "CommitIndex":
- num, _ := strconv.ParseUint(valRaw, 10, 64)
- switch op {
- case ">": if !(entry.CommitIndex > num) { matchAll = false }
- case "<": if !(entry.CommitIndex < num) { matchAll = false }
- case ">=": if !(entry.CommitIndex >= num) { matchAll = false }
- case "<=": if !(entry.CommitIndex <= num) { matchAll = false }
- case "=": if !(entry.CommitIndex == num) { matchAll = false }
- }
- case "key":
- target := extractString(valRaw)
- switch op {
- case "=": if key != target { matchAll = false }
- case "like": if !WildcardMatch(key, target) { matchAll = false }
- }
- case "value":
- if e.FTIndex == nil && op == "like" {
- matchAll = false
- break
- }
- if !valLoaded {
- v, err := e.Storage.ReadValue(entry.ValueOffset)
- if err != nil { matchAll = false; break }
- valStr = v
- valLoaded = true
- }
- target := extractString(valRaw)
- switch op {
- case "=": if valStr != target { matchAll = false }
- case "like": if !WildcardMatch(valStr, target) { matchAll = false }
- }
- }
- if !matchAll { break }
- }
-
- if matchAll {
- if !valLoaded {
- v, err := e.Storage.ReadValue(entry.ValueOffset)
- if err == nil { valStr = v }
- }
- results = append(results, QueryResult{
- Key: key,
- Value: valStr,
- CommitIndex: entry.CommitIndex,
- })
- needed := limit + offset
- if limit > 0 && len(results) >= needed {
- return false
- }
- }
- return true
- })
- if offset > 0 {
- if offset >= len(results) {
- return []QueryResult{}, nil
- }
- results = results[offset:]
- }
- if limit >= 0 {
- if limit < len(results) {
- results = results[:limit]
- }
- }
- return results, nil
- }
- // Snapshot creates a full consistency snapshot of the database.
- // Since the DB engine itself IS the state machine, this simply serializes
- // all current valid data. This snapshot can be used to restore a lagging node
- // that has fallen behind the Raft logs.
- func (e *Engine) Snapshot() ([]byte, error) {
- e.Index.mu.RLock()
- defer e.Index.mu.RUnlock()
- // Snapshot Format Version 1:
- // [Count U64] + [Record...]
- // Record: [KeyLen U16] [Key Bytes] [ValLen U32] [Val Bytes] [CommitIndex U64]
-
- // Pre-calculate size to reduce allocations
- // Estimate: Count * (AverageKeySize + AverageValSize + Overhead)
- // We start with a reasonable buffer size.
- buf := make([]byte, 0, 1024*1024)
-
- count := uint64(len(e.Index.items))
- // Write Count
- tmp := make([]byte, 8)
- binary.LittleEndian.PutUint64(tmp, count)
- buf = append(buf, tmp...)
- // Buffer for encoding headers to avoid repeated small allocations
- headerBuf := make([]byte, 2+4+8) // KeyLen + ValLen + CommitIndex
- for i := range e.Index.items {
- // Get Key
- key := e.Index.getKey(i)
- entry := e.Index.items[i].entry
- // Get Value
- // We read directly from storage. Since we hold the read lock on index,
- // the offset is valid. The storage file is append-only, so old data exists.
- val, err := e.Storage.ReadValue(entry.ValueOffset)
- if err != nil {
- // If we can't read a value, it's a critical error for snapshot integrity.
- return nil, fmt.Errorf("snapshot failed reading key '%s' at offset %d: %v", key, entry.ValueOffset, err)
- }
- keyBytes := []byte(key)
- valBytes := []byte(val)
- // Encode Header
- binary.LittleEndian.PutUint16(headerBuf[0:2], uint16(len(keyBytes)))
- binary.LittleEndian.PutUint32(headerBuf[2:6], uint32(len(valBytes)))
- binary.LittleEndian.PutUint64(headerBuf[6:14], entry.CommitIndex)
- // Append to buffer
- buf = append(buf, headerBuf...)
- buf = append(buf, keyBytes...)
- buf = append(buf, valBytes...)
- }
- return buf, nil
- }
- // Restore completely replaces the database content with the provided snapshot.
- // This matches the "DB as Snapshot" design: we wipe the current state
- // and rebuild from the full image provided by the leader.
- func (e *Engine) Restore(data []byte) error {
- e.writeMu.Lock()
- defer e.writeMu.Unlock()
-
- // 1. Safety check & Parse Header
- if len(data) < 8 {
- if len(data) == 0 {
- // Empty snapshot implies empty DB.
- return e.resetToEmpty()
- }
- return fmt.Errorf("invalid snapshot data: too short")
- }
- count := binary.LittleEndian.Uint64(data[0:8])
- offset := 8
- // 2. Stop-the-World: Close current storage to safely wipe it
- e.Index.mu.Lock()
-
- if err := e.Storage.Close(); err != nil {
- e.Index.mu.Unlock()
- return fmt.Errorf("failed to close storage for restore: %v", err)
- }
-
- // 3. Truncate & Reset
- // We are replacing the entire DB state.
-
- // Reset In-Memory Index by clearing slices (keeping underlying capacity)
- // We do NOT replace the pointer, so we keep the lock valid.
- e.Index.keyBuf = e.Index.keyBuf[:0]
- e.Index.items = e.Index.items[:0]
- if e.config.EnableValueIndex {
- // Recreate FTIndex since it's a map
- e.FTIndex = NewFullTextIndex()
- }
-
- // Clear Cache
- e.Storage.cache = make(map[int64]string)
-
- // Re-open storage in Truncate mode (Wipe data file)
- f, err := os.OpenFile(e.Storage.filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
- if err != nil {
- e.Index.mu.Unlock()
- return fmt.Errorf("failed to reopen storage for restore: %v", err)
- }
- e.Storage.file = f
- e.Storage.offset = 0
-
- // We hold the lock during the entire restore process to prevent any reads/writes
- // This is acceptable as Restore is a rare, heavyweight operation.
- defer e.Index.mu.Unlock()
- maxCommitIndex := uint64(0)
- // 4. Rebuild Data from Snapshot Stream
- for i := uint64(0); i < count; i++ {
- // Read Header (14 bytes)
- if offset+14 > len(data) {
- return fmt.Errorf("restore failed: truncated header at record %d", i)
- }
-
- keyLen := int(binary.LittleEndian.Uint16(data[offset : offset+2]))
- valLen := int(binary.LittleEndian.Uint32(data[offset+2 : offset+6]))
- commitIndex := binary.LittleEndian.Uint64(data[offset+6 : offset+14])
- offset += 14
-
- // Read Key
- if offset+keyLen > len(data) {
- return fmt.Errorf("restore failed: truncated data at record %d (key)", i)
- }
- key := string(data[offset : offset+keyLen])
- offset += keyLen
-
- // Read Value
- if offset+valLen > len(data) {
- return fmt.Errorf("restore failed: truncated data at record %d (value)", i)
- }
- val := string(data[offset : offset+valLen])
- offset += valLen
-
- if commitIndex > maxCommitIndex {
- maxCommitIndex = commitIndex
- }
- // Direct Write to Storage (Internal Append)
- // We use the low-level Storage.Append directly since we already hold locks
- // and are bypassing the standard Set path.
- writeOffset, err := e.Storage.Append(key, val, RecordTypePut, commitIndex)
- if err != nil {
- return fmt.Errorf("restore failed appending record %d: %v", i, err)
- }
- // Update Memory Index
- // Accessing e.Index.items directly because we hold e.Index.mu
- // But we should use the helper to safely manage keyBuf
- // Use insertLocked to avoid deadlock (we already hold e.Index.mu)
- e.Index.insertLocked(key, IndexEntry{
- ValueOffset: writeOffset,
- CommitIndex: commitIndex,
- })
-
- // Update Full Text Index if enabled
- if e.FTIndex != nil {
- e.FTIndex.Add(key, val)
- }
- }
-
- // 5. Update Metadata
- e.commitMu.Lock()
- e.LastCommitIndex = maxCommitIndex
- e.saveMetadata()
- e.commitMu.Unlock()
-
- // 6. Force Sync
- if err := e.Sync(); err != nil {
- return fmt.Errorf("failed to sync restored data: %v", err)
- }
-
- return nil
- }
- // resetToEmpty helper to wipe the DB cleanly
- func (e *Engine) resetToEmpty() error {
- e.Index.mu.Lock()
- defer e.Index.mu.Unlock()
-
- if err := e.Storage.Close(); err != nil {
- return err
- }
-
- e.Index = NewFlatIndex()
- if e.config.EnableValueIndex {
- e.FTIndex = NewFullTextIndex()
- }
- e.Storage.cache = make(map[int64]string)
-
- f, err := os.OpenFile(e.Storage.filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
- if err != nil {
- return err
- }
- e.Storage.file = f
- e.Storage.offset = 0
-
- e.commitMu.Lock()
- e.LastCommitIndex = 0
- e.saveMetadata()
- e.commitMu.Unlock()
-
- return nil
- }
- func (e *Engine) DebugClearAuxiliary() {
- e.Storage.cacheMu.Lock()
- e.Storage.cache = make(map[int64]string)
- e.Storage.cacheMu.Unlock()
-
- if e.FTIndex != nil {
- e.FTIndex.mu.Lock()
- e.FTIndex.index = make(map[string]map[string]bool)
- e.FTIndex.mu.Unlock()
- }
- }
|