package db

import (
	"bufio"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"regexp"
	"sort"
	"strconv"
	"strings"
	"sync"
)

// FreeList manages reusable disk space in memory using a best-fit strategy.
type FreeList struct {
	// buckets maps capacity -> stack of offsets with that capacity.
	buckets map[uint32][]int64
	// sortedCaps is the sorted list of capacities present in buckets,
	// kept for fast best-fit lookup via binary search.
	sortedCaps []uint32
	mu         sync.Mutex
}

func NewFreeList() *FreeList {
	return &FreeList{
		buckets: make(map[uint32][]int64),
	}
}

// Add records a free slot of the given capacity at offset.
func (fl *FreeList) Add(capacity uint32, offset int64) {
	fl.mu.Lock()
	defer fl.mu.Unlock()
	if _, exists := fl.buckets[capacity]; !exists {
		fl.buckets[capacity] = []int64{offset}
		// Maintain sortedCaps.
		fl.sortedCaps = append(fl.sortedCaps, capacity)
		sort.Slice(fl.sortedCaps, func(i, j int) bool { return fl.sortedCaps[i] < fl.sortedCaps[j] })
	} else {
		fl.buckets[capacity] = append(fl.buckets[capacity], offset)
	}
}

// Pop tries to get an available offset using the best-fit strategy:
// it finds the smallest available capacity >= targetCap.
// It returns the offset and the ACTUAL capacity of the slot found,
// which may be larger than targetCap.
func (fl *FreeList) Pop(targetCap uint32) (int64, uint32, bool) {
	fl.mu.Lock()
	defer fl.mu.Unlock()

	// Binary search for the smallest capacity >= targetCap.
	idx := sort.Search(len(fl.sortedCaps), func(i int) bool { return fl.sortedCaps[i] >= targetCap })

	// If idx is out of bounds, no suitable slot exists.
	if idx >= len(fl.sortedCaps) {
		return 0, 0, false
	}

	// Found a suitable capacity bucket.
	foundCap := fl.sortedCaps[idx]
	offsets := fl.buckets[foundCap]
	if len(offsets) == 0 {
		// This should not happen as long as empty buckets are removed from
		// sortedCaps (see below), but handle it defensively.
		return 0, 0, false
	}

	// Pop from the end (stack, LIFO).
	lastIdx := len(offsets) - 1
	offset := offsets[lastIdx]

	// Update the bucket.
	if lastIdx == 0 {
		// The bucket becomes empty.
		delete(fl.buckets, foundCap)
		// Remove the capacity from sortedCaps to keep the search efficient.
		// This is an O(N) copy, but N (the number of distinct capacities) is
		// usually small (< 100 for a typical payload range).
		fl.sortedCaps = append(fl.sortedCaps[:idx], fl.sortedCaps[idx+1:]...)
	} else {
		fl.buckets[foundCap] = offsets[:lastIdx]
	}

	return offset, foundCap, true
}

// Storage (Table 2) manages disk storage for values.
// It uses a slot-based format: [Flag][Capacity][Length][Data...]
//   Flag:     1 byte  (0 = Deleted, 1 = Valid)
//   Capacity: 4 bytes (allocated size)
//   Length:   4 bytes (actual used size)
//   Data:     Capacity bytes
type Storage struct {
	file     *os.File
	filename string
	offset   int64 // Current end of file
	freeList *FreeList
	mu       sync.RWMutex
}

const (
	FlagDeleted = 0x00
	FlagValid   = 0x01
	HeaderSize  = 9  // 1 (Flag) + 4 (Capacity) + 4 (Length)
	AlignSize   = 16 // Align capacity to 16 bytes
)

func NewStorage(filename string) (*Storage, error) {
	f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
	if err != nil {
		return nil, err
	}
	st := &Storage{
		file:     f,
		filename: filename,
		freeList: NewFreeList(),
	}
	// Scan the file to rebuild the free list and find the end offset.
	if err := st.scan(); err != nil {
		f.Close()
		return nil, err
	}
	return st, nil
}
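// Illustrative sketch, not part of the engine: how the best-fit free list
// behaves. The function name is made up for illustration. Popping with
// targetCap=20 skips the 16-byte bucket and returns the 32-byte slot; the
// caller receives the slot's ACTUAL capacity (32), not the requested one.
func exampleFreeListBestFit() {
	fl := NewFreeList()
	fl.Add(16, 100) // 16-byte slot at offset 100
	fl.Add(32, 200) // 32-byte slot at offset 200
	fl.Add(64, 300) // 64-byte slot at offset 300

	offset, actualCap, ok := fl.Pop(20) // smallest capacity >= 20 is 32
	fmt.Println(offset, actualCap, ok)  // 200 32 true

	offset, actualCap, ok = fl.Pop(20) // 32-bucket is now empty; next fit is 64
	fmt.Println(offset, actualCap, ok) // 300 64 true
}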
// scan iterates over the file to reconstruct the FreeList and find EOF.
func (s *Storage) scan() error {
	offset := int64(0)
	if _, err := s.file.Seek(0, 0); err != nil {
		return err
	}
	reader := bufio.NewReader(s.file)
	for {
		header := make([]byte, HeaderSize)
		n, err := io.ReadFull(reader, header)
		if err == io.EOF {
			break
		}
		if err == io.ErrUnexpectedEOF && n == 0 {
			break
		}
		if err != nil {
			return err
		}
		flag := header[0]
		capacity := binary.LittleEndian.Uint32(header[1:])
		if flag == FlagDeleted {
			s.freeList.Add(capacity, offset)
		}
		// Skip the slot's data region. If Discard fails, the file ended
		// mid-slot (truncated/corrupt), so surface the error; re-reading the
		// same byte count would double-consume whatever Discard already took.
		discardLen := int(capacity)
		if _, err := reader.Discard(discardLen); err != nil {
			return err
		}
		offset += int64(HeaderSize + discardLen)
	}
	s.offset = offset
	return nil
}

// alignCapacity rounds length up to the next multiple of AlignSize (16 bytes).
func alignCapacity(length int) uint32 {
	if length == 0 {
		return AlignSize
	}
	capacity := (length + AlignSize - 1) / AlignSize * AlignSize
	return uint32(capacity)
}

// WriteValue writes a value to storage.
// If oldOffset >= 0, it first tries to update in place.
// It returns the new offset (or oldOffset if updated in place).
func (s *Storage) WriteValue(val string, oldOffset int64) (int64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	valLen := len(val)

	// Try an in-place update first.
	if oldOffset >= 0 {
		capBuf := make([]byte, 4)
		if _, err := s.file.ReadAt(capBuf, oldOffset+1); err == nil {
			oldCap := binary.LittleEndian.Uint32(capBuf)
			if uint32(valLen) <= oldCap {
				// The new value fits: update in place.
				lenBuf := make([]byte, 4)
				binary.LittleEndian.PutUint32(lenBuf, uint32(valLen))
				if _, err := s.file.WriteAt(lenBuf, oldOffset+5); err != nil {
					return 0, err
				}
				if _, err := s.file.WriteAt([]byte(val), oldOffset+HeaderSize); err != nil {
					return 0, err
				}
				return oldOffset, nil
			}
			// Too big to fit: mark the old slot as deleted and add it to the FreeList.
			if _, err := s.file.WriteAt([]byte{FlagDeleted}, oldOffset); err != nil {
				return 0, err
			}
			s.freeList.Add(oldCap, oldOffset)
		}
	}

	// Calculate the needed capacity.
	newCap := alignCapacity(valLen)

	// Try to reuse space from the FreeList using best fit.
	if reusedOffset, actualCap, ok := s.freeList.Pop(newCap); ok {
		// Write Header + Data.
		// Header: [Flag=Valid][Cap=actualCap][Len=valLen]
		// NOTE: We MUST write 'actualCap' back to the header, NOT 'newCap'.
		// The physical slot size on disk is 'actualCap', and we cannot shrink
		// it physically without moving subsequent data, so we effectively
		// waste (actualCap - newCap) bytes of padding.
		buf := make([]byte, HeaderSize+int(actualCap))
		buf[0] = FlagValid
		binary.LittleEndian.PutUint32(buf[1:], actualCap) // must be actualCap
		binary.LittleEndian.PutUint32(buf[5:], uint32(valLen))
		copy(buf[HeaderSize:], val)
		if _, err := s.file.WriteAt(buf, reusedOffset); err != nil {
			return 0, err
		}
		return reusedOffset, nil
	}

	// No reusable slot: append a new one at the end of the file.
	newOffset := s.offset
	totalSize := HeaderSize + int(newCap)
	buf := make([]byte, totalSize)
	buf[0] = FlagValid
	binary.LittleEndian.PutUint32(buf[1:], newCap)
	binary.LittleEndian.PutUint32(buf[5:], uint32(valLen))
	copy(buf[HeaderSize:], val)
	if _, err := s.file.WriteAt(buf, newOffset); err != nil {
		return 0, err
	}
	s.offset += int64(totalSize)
	return newOffset, nil
}
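// Illustrative sketch, not part of the engine (the function name is made up):
// alignCapacity rounds payload lengths up to 16-byte slot capacities, so a
// later update whose length stays within the rounded capacity is rewritten
// in place without moving the slot.
func exampleSlotSizing() {
	fmt.Println(alignCapacity(0))  // 16 (empty values still reserve one unit)
	fmt.Println(alignCapacity(1))  // 16
	fmt.Println(alignCapacity(16)) // 16
	fmt.Println(alignCapacity(17)) // 32
	// A value of length 17 occupies HeaderSize (9) + 32 = 41 bytes on disk;
	// any subsequent value of length <= 32 can reuse that slot in place.
}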
// ReadValue reads the value stored at offset.
func (s *Storage) ReadValue(offset int64) (string, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	// Read the header.
	header := make([]byte, HeaderSize)
	if _, err := s.file.ReadAt(header, offset); err != nil {
		return "", err
	}
	flag := header[0]
	if flag == FlagDeleted {
		return "", fmt.Errorf("record deleted")
	}
	length := binary.LittleEndian.Uint32(header[5:])

	// Read the data (only the used Length, not the full Capacity).
	data := make([]byte, length)
	if _, err := s.file.ReadAt(data, offset+HeaderSize); err != nil {
		return "", err
	}
	return string(data), nil
}

func (s *Storage) Close() error {
	return s.file.Close()
}

// StringTable is retained as a placeholder: Table 2 is now handled by
// Storage plus key management, and the global deduplication map was removed.
type StringTable struct{}

// IndexEntry (Table 1)
type IndexEntry struct {
	ValueOffset int64 // Replaces ValueID
	CommitIndex uint64
}

// InvertedIndex (Table 3)
type InvertedIndex struct {
	KeyTokens   map[string][]uint32
	ValueTokens map[string][]uint32
}

func NewInvertedIndex() *InvertedIndex {
	return &InvertedIndex{
		KeyTokens:   make(map[string][]uint32),
		ValueTokens: make(map[string][]uint32),
	}
}

// KeyMap maintains the mapping between a key string and an internal KeyID (uint32).
type KeyMap struct {
	StrToID map[string]uint32
	IDToStr map[uint32]string
	NextID  uint32
}

func NewKeyMap() *KeyMap {
	return &KeyMap{
		StrToID: make(map[string]uint32),
		IDToStr: make(map[uint32]string),
		NextID:  1,
	}
}

func (km *KeyMap) GetOrCreateID(key string) uint32 {
	if id, ok := km.StrToID[key]; ok {
		return id
	}
	id := km.NextID
	km.NextID++
	km.StrToID[key] = id
	km.IDToStr[id] = key
	return id
}

// Engine is the core storage engine.
type Engine struct {
	mu sync.RWMutex

	// Table 1: KeyID -> Entry (ValueOffset + CommitIndex)
	Index map[uint32]IndexEntry

	// Key mapping (in memory; rebuilt on start or restored from a snapshot).
	Keys *KeyMap

	// Table 2: disk storage.
	Storage *Storage

	// Table 3: inverted index.
	SearchIndex *InvertedIndex

	LastCommitIndex uint64
	dataDir         string
}

func NewEngine(dataDir string) (*Engine, error) {
	if err := os.MkdirAll(dataDir, 0755); err != nil {
		return nil, err
	}
	store, err := NewStorage(dataDir + "/values.data")
	if err != nil {
		return nil, err
	}
	e := &Engine{
		Index:       make(map[uint32]IndexEntry),
		Keys:        NewKeyMap(),
		Storage:     store,
		SearchIndex: NewInvertedIndex(),
		dataDir:     dataDir,
	}
	return e, nil
}

func (e *Engine) Close() error {
	return e.Storage.Close()
}

// tokenizeKey splits a key on '.' separators.
func (e *Engine) tokenizeKey(key string) []string {
	return strings.Split(key, ".")
}

// tokenizeValue splits a value into alphanumeric runs, dropping everything else.
func (e *Engine) tokenizeValue(val string) []string {
	f := func(c rune) bool {
		return !((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9'))
	}
	return strings.FieldsFunc(val, f)
}
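// Illustrative sketch, not part of the engine (the function name is made up):
// how keys and values are tokenized before entering the inverted index.
// Keys split on dots; values split into alphanumeric runs, so punctuation
// and whitespace are discarded.
func exampleTokenization() {
	e := &Engine{}
	fmt.Println(e.tokenizeKey("app.server.port"))         // [app server port]
	fmt.Println(e.tokenizeValue("listen on :8080, ipv4")) // [listen on 8080 ipv4]
}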
// Set writes key=value at the given commit index, updating all three tables.
func (e *Engine) Set(key, value string, commitIndex uint64) error {
	e.mu.Lock()
	defer e.mu.Unlock()

	if commitIndex > e.LastCommitIndex {
		e.LastCommitIndex = commitIndex
	}

	// 1. Get the KeyID.
	keyID := e.Keys.GetOrCreateID(key)

	// 2. Check the existing entry for an update.
	var oldOffset int64 = -1
	if entry, ok := e.Index[keyID]; ok {
		oldOffset = entry.ValueOffset
	}

	// 3. Write the value (in place or appended).
	newOffset, err := e.Storage.WriteValue(value, oldOffset)
	if err != nil {
		return err
	}

	// 4. Update the index (Table 1).
	e.Index[keyID] = IndexEntry{
		ValueOffset: newOffset,
		CommitIndex: commitIndex,
	}

	// 5. Update the inverted index (Table 3). Tokens from a previous value
	// are not removed here, so ValueTokens may keep stale postings until the
	// index is rebuilt.
	for _, token := range e.tokenizeKey(key) {
		e.addToken(e.SearchIndex.KeyTokens, token, keyID)
	}
	for _, token := range e.tokenizeValue(value) {
		e.addToken(e.SearchIndex.ValueTokens, token, keyID)
	}
	return nil
}

// addToken appends id to the token's posting list if it is not already present.
func (e *Engine) addToken(index map[string][]uint32, token string, id uint32) {
	ids := index[token]
	for _, existing := range ids {
		if existing == id {
			return
		}
	}
	index[token] = append(ids, id)
}

func (e *Engine) Get(key string) (string, bool) {
	e.mu.RLock()
	defer e.mu.RUnlock()

	keyID, ok := e.Keys.StrToID[key]
	if !ok {
		return "", false
	}
	entry, ok := e.Index[keyID]
	if !ok {
		return "", false
	}
	val, err := e.Storage.ReadValue(entry.ValueOffset)
	if err != nil {
		return "", false
	}
	return val, true
}

type QueryResult struct {
	Key         string `json:"key"`
	Value       string `json:"value"`
	CommitIndex uint64 `json:"commit_index"`
}

// WildcardMatch reports whether str matches pattern, where '*' matches any
// (possibly empty) sequence of runes and '?' matches exactly one rune.
func WildcardMatch(str, pattern string) bool {
	s := []rune(str)
	p := []rune(pattern)
	sLen, pLen := len(s), len(p)
	i, j := 0, 0
	starIdx, matchIdx := -1, -1
	for i < sLen {
		if j < pLen && (p[j] == '?' || p[j] == s[i]) {
			i++
			j++
		} else if j < pLen && p[j] == '*' {
			starIdx = j
			matchIdx = i
			j++
		} else if starIdx != -1 {
			j = starIdx + 1
			matchIdx++
			i = matchIdx
		} else {
			return false
		}
	}
	for j < pLen && p[j] == '*' {
		j++
	}
	return j == pLen
}

// Query evaluates a conjunctive filter expression against all entries.
// Supported fields: key and value (operators = and like), and
// CommitIndex (operators =, >, <, >=, <=).
func (e *Engine) Query(sql string) ([]QueryResult, error) {
	e.mu.RLock()
	defer e.mu.RUnlock()

	sql = strings.TrimSpace(sql)
	re := regexp.MustCompile(`(key|value|CommitIndex)\s*(=|like|>=|<=|>|<)\s*("[^"]*"|\d+)`)
	matches := re.FindAllStringSubmatch(sql, -1)
	if len(matches) == 0 {
		return nil, fmt.Errorf("invalid query")
	}

	extractString := func(s string) string {
		return strings.Trim(strings.TrimSpace(s), "\"")
	}

	var results []QueryResult

	// filter reports whether the entry satisfies ALL conditions, returning
	// the resolved key and value strings on success.
	filter := func(kID uint32, entry IndexEntry) (bool, string, string) {
		keyStr, ok := e.Keys.IDToStr[kID]
		if !ok {
			return false, "", ""
		}
		valStr, err := e.Storage.ReadValue(entry.ValueOffset)
		if err != nil {
			return false, "", ""
		}
		for _, match := range matches {
			field := match[1]
			op := match[2]
			valRaw := match[3]
			switch field {
			case "CommitIndex":
				num, _ := strconv.ParseUint(valRaw, 10, 64)
				switch op {
				case ">":
					if !(entry.CommitIndex > num) {
						return false, "", ""
					}
				case "<":
					if !(entry.CommitIndex < num) {
						return false, "", ""
					}
				case ">=":
					if !(entry.CommitIndex >= num) {
						return false, "", ""
					}
				case "<=":
					if !(entry.CommitIndex <= num) {
						return false, "", ""
					}
				case "=":
					if entry.CommitIndex != num {
						return false, "", ""
					}
				}
			case "key":
				target := extractString(valRaw)
				switch op {
				case "=":
					if keyStr != target {
						return false, "", ""
					}
				case "like":
					if !WildcardMatch(keyStr, target) {
						return false, "", ""
					}
				}
			case "value":
				target := extractString(valRaw)
				switch op {
				case "=":
					if valStr != target {
						return false, "", ""
					}
				case "like":
					if !WildcardMatch(valStr, target) {
						return false, "", ""
					}
				}
			}
		}
		return true, keyStr, valStr
	}

	for kID, entry := range e.Index {
		ok, kStr, vStr := filter(kID, entry)
		if ok {
			results = append(results, QueryResult{
				Key:         kStr,
				Value:       vStr,
				CommitIndex: entry.CommitIndex,
			})
		}
	}

	sort.Slice(results, func(i, j int) bool { return results[i].Key < results[j].Key })
	return results, nil
}

// Snapshot serializes the in-memory tables (Table 1, the key map, Table 3)
// to JSON. Value bytes live in the Storage file and are not included.
func (e *Engine) Snapshot() ([]byte, error) {
	e.mu.RLock()
	defer e.mu.RUnlock()

	data := struct {
		Index           map[uint32]IndexEntry
		Keys            *KeyMap
		LastCommitIndex uint64
		SearchIndex     *InvertedIndex
	}{
		Index:           e.Index,
		Keys:            e.Keys,
		LastCommitIndex: e.LastCommitIndex,
		SearchIndex:     e.SearchIndex,
	}
	return json.Marshal(data)
}
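// Illustrative sketch, not part of the engine (the function name is made up):
// WildcardMatch semantics, as used by Query's "like" operator.
func exampleWildcardMatch() {
	fmt.Println(WildcardMatch("app.server.port", "app.*"))    // true
	fmt.Println(WildcardMatch("app.server.port", "*.port"))   // true
	fmt.Println(WildcardMatch("app.server.port", "app.??.*")) // false ("server" is 6 runes, not 2)
	fmt.Println(WildcardMatch("cache", "c?che"))               // true
}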
// Restore loads a snapshot produced by Snapshot, replacing the in-memory tables.
func (e *Engine) Restore(data []byte) error {
	e.mu.Lock()
	defer e.mu.Unlock()

	var dump struct {
		Index           map[uint32]IndexEntry
		Keys            *KeyMap
		LastCommitIndex uint64
		SearchIndex     *InvertedIndex
	}
	if err := json.Unmarshal(data, &dump); err != nil {
		return err
	}
	e.Index = dump.Index
	e.Keys = dump.Keys
	e.LastCommitIndex = dump.LastCommitIndex
	e.SearchIndex = dump.SearchIndex
	return nil
}
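// Illustrative sketch, not part of the engine: a typical lifecycle. The
// function name and the data-directory handling are made up for illustration;
// Set errors are ignored for brevity.
func exampleEngineUsage() {
	dir, err := os.MkdirTemp("", "db-example")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	e, err := NewEngine(dir)
	if err != nil {
		panic(err)
	}
	defer e.Close()

	// Writes are tagged with monotonically increasing commit indexes.
	_ = e.Set("app.server.port", "8080", 1)
	_ = e.Set("app.server.host", "0.0.0.0", 2)
	_ = e.Set("app.server.port", "9090", 3) // same capacity: updated in place

	if v, ok := e.Get("app.server.port"); ok {
		fmt.Println(v) // 9090
	}

	// Conditions are extracted by regexp and combined conjunctively.
	results, _ := e.Query(`key like "app.server.*" CommitIndex >= 2`)
	for _, r := range results {
		fmt.Println(r.Key, r.Value, r.CommitIndex)
	}

	// Snapshot captures the in-memory tables; Restore loads them back.
	snap, _ := e.Snapshot()
	_ = e.Restore(snap)
}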