| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634 |
- package db
import (
	"bufio"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"regexp"
	"sort"
	"strconv"
	"strings"
	"sync"
	"unicode"
)
// FreeList tracks free (deleted) storage slots in memory, grouped by slot
// capacity, and hands them out with a best-fit policy. All methods lock fl.mu,
// so a FreeList may be shared between goroutines.
type FreeList struct {
	mu sync.Mutex
	// buckets maps a slot capacity to the offsets of free slots of exactly
	// that capacity, used as a LIFO stack. A capacity is a key only while its
	// stack is non-empty.
	buckets map[uint32][]int64
	// sortedCaps holds the keys of buckets in ascending order so Pop can
	// binary-search for the smallest capacity that fits.
	sortedCaps []uint32
}

// NewFreeList returns an empty FreeList.
func NewFreeList() *FreeList {
	return &FreeList{
		buckets: make(map[uint32][]int64),
	}
}

// Add records a free slot of the given capacity located at offset.
func (fl *FreeList) Add(capacity uint32, offset int64) {
	fl.mu.Lock()
	defer fl.mu.Unlock()

	offsets, exists := fl.buckets[capacity]
	fl.buckets[capacity] = append(offsets, offset)
	if exists {
		return
	}

	// First slot of this capacity: insert the capacity at its sorted position
	// (binary search + one shift) instead of re-sorting the whole slice.
	pos := sort.Search(len(fl.sortedCaps), func(j int) bool {
		return fl.sortedCaps[j] >= capacity
	})
	fl.sortedCaps = append(fl.sortedCaps, 0)
	copy(fl.sortedCaps[pos+1:], fl.sortedCaps[pos:])
	fl.sortedCaps[pos] = capacity
}

// Pop removes and returns a free slot chosen by best fit: among the slots
// with the smallest capacity >= targetCap, the most recently added one.
// It returns the slot's offset, its ACTUAL capacity (which may be larger than
// targetCap), and whether any suitable slot was found.
func (fl *FreeList) Pop(targetCap uint32) (int64, uint32, bool) {
	fl.mu.Lock()
	defer fl.mu.Unlock()

	idx := sort.Search(len(fl.sortedCaps), func(j int) bool {
		return fl.sortedCaps[j] >= targetCap
	})

	for idx < len(fl.sortedCaps) {
		foundCap := fl.sortedCaps[idx]
		offsets := fl.buckets[foundCap]

		if len(offsets) == 0 {
			// Defensive: Add/Pop never leave an empty bucket listed, but if one
			// appears, drop it and try the next larger capacity rather than
			// failing (which would block this capacity range forever).
			delete(fl.buckets, foundCap)
			fl.sortedCaps = append(fl.sortedCaps[:idx], fl.sortedCaps[idx+1:]...)
			continue
		}

		last := len(offsets) - 1
		offset := offsets[last]
		if last == 0 {
			// Bucket is now empty: remove the capacity from both structures.
			// The slice removal is O(number of distinct capacities).
			delete(fl.buckets, foundCap)
			fl.sortedCaps = append(fl.sortedCaps[:idx], fl.sortedCaps[idx+1:]...)
		} else {
			fl.buckets[foundCap] = offsets[:last]
		}
		return offset, foundCap, true
	}

	return 0, 0, false
}
- // Storage (Table 2) manages disk storage for values.
- // It uses a slot-based format: [Flag][Capacity][Length][Data...]
- // Flag: 1 byte (0=Deleted, 1=Valid)
- // Capacity: 4 bytes (Allocated size)
- // Length: 4 bytes (Actual used size)
- // Data: Capacity bytes
- type Storage struct {
- file *os.File
- filename string
- offset int64 // Current end of file
- freeList *FreeList
- mu sync.RWMutex
- }
// Slot flag values, stored in the first byte of each slot header.
const (
	FlagDeleted = 0x00 // slot is free and may be reused
	FlagValid   = 0x01 // slot holds a live value
)

// Slot layout parameters.
const (
	HeaderSize = 1 + 4 + 4 // flag + capacity + length
	AlignSize  = 16        // slot capacities are rounded up to a multiple of this
)
- func NewStorage(filename string) (*Storage, error) {
- f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
- if err != nil {
- return nil, err
- }
- st := &Storage{
- file: f,
- filename: filename,
- freeList: NewFreeList(),
- }
- // Scan file to build free list and find end offset
- if err := st.scan(); err != nil {
- f.Close()
- return nil, err
- }
- return st, nil
- }
- // scan iterates over the file to reconstruct FreeList and find EOF.
- func (s *Storage) scan() error {
- offset := int64(0)
-
- if _, err := s.file.Seek(0, 0); err != nil {
- return err
- }
-
- reader := bufio.NewReader(s.file)
-
- for {
- header := make([]byte, HeaderSize)
- n, err := io.ReadFull(reader, header)
- if err == io.EOF {
- break
- }
- if err == io.ErrUnexpectedEOF && n == 0 {
- break
- }
- if err != nil {
- return err
- }
-
- flag := header[0]
- cap := binary.LittleEndian.Uint32(header[1:])
-
- if flag == FlagDeleted {
- s.freeList.Add(cap, offset)
- }
-
- discardLen := int(cap)
- if _, err := reader.Discard(discardLen); err != nil {
- buf := make([]byte, discardLen)
- if _, err := io.ReadFull(reader, buf); err != nil {
- return err
- }
- }
-
- offset += int64(HeaderSize + discardLen)
- }
-
- s.offset = offset
- return nil
- }
- // alignCapacity calculates the capacity needed aligned to 16 bytes.
- func alignCapacity(length int) uint32 {
- if length == 0 {
- return AlignSize
- }
- cap := (length + AlignSize - 1) / AlignSize * AlignSize
- return uint32(cap)
- }
- // WriteValue writes a value to storage.
- // If oldOffset >= 0, it tries to update in-place.
- // Returns the new offset (or oldOffset if updated in-place) and error.
- func (s *Storage) WriteValue(val string, oldOffset int64) (int64, error) {
- s.mu.Lock()
- defer s.mu.Unlock()
- valLen := len(val)
- // Try in-place update first
- if oldOffset >= 0 {
- capBuf := make([]byte, 4)
- if _, err := s.file.ReadAt(capBuf, oldOffset+1); err == nil {
- oldCap := binary.LittleEndian.Uint32(capBuf)
-
- if uint32(valLen) <= oldCap {
- // Perfect, update in place!
- lenBuf := make([]byte, 4)
- binary.LittleEndian.PutUint32(lenBuf, uint32(valLen))
- if _, err := s.file.WriteAt(lenBuf, oldOffset+5); err != nil {
- return 0, err
- }
- if _, err := s.file.WriteAt([]byte(val), oldOffset+HeaderSize); err != nil {
- return 0, err
- }
- return oldOffset, nil
- }
-
- // Mark old slot as deleted and add to FreeList.
- if _, err := s.file.WriteAt([]byte{FlagDeleted}, oldOffset); err != nil {
- return 0, err
- }
- s.freeList.Add(oldCap, oldOffset)
- }
- }
- // Calculate needed capacity
- newCap := alignCapacity(valLen)
- // Try to reuse space from FreeList using BEST FIT
- if reusedOffset, actualCap, ok := s.freeList.Pop(newCap); ok {
- // Write Header + Data
- // Header: [Flag=Valid][Cap=actualCap][Len=valLen]
- // NOTE: We MUST write 'actualCap' back to the header, NOT 'newCap'.
- // The physical slot size on disk is 'actualCap', and we cannot shrink it physically
- // without moving subsequent data.
- // So we effectively waste (actualCap - newCap) bytes of padding.
-
- buf := make([]byte, HeaderSize+int(actualCap))
- buf[0] = FlagValid
- binary.LittleEndian.PutUint32(buf[1:], actualCap) // Must be actualCap
- binary.LittleEndian.PutUint32(buf[5:], uint32(valLen))
- copy(buf[HeaderSize:], []byte(val))
-
- if _, err := s.file.WriteAt(buf, reusedOffset); err != nil {
- return 0, err
- }
- return reusedOffset, nil
- }
-
- // If no reuse, Append new slot
- newOffset := s.offset
-
- totalSize := HeaderSize + int(newCap)
- buf := make([]byte, totalSize)
-
- buf[0] = FlagValid
- binary.LittleEndian.PutUint32(buf[1:], newCap)
- binary.LittleEndian.PutUint32(buf[5:], uint32(valLen))
- copy(buf[HeaderSize:], []byte(val))
-
- if _, err := s.file.WriteAt(buf, newOffset); err != nil {
- return 0, err
- }
-
- s.offset += int64(totalSize)
- return newOffset, nil
- }
- // ReadValue reads value at offset.
- func (s *Storage) ReadValue(offset int64) (string, error) {
- s.mu.RLock()
- defer s.mu.RUnlock()
- // Read Header
- header := make([]byte, HeaderSize)
- if _, err := s.file.ReadAt(header, offset); err != nil {
- return "", err
- }
- flag := header[0]
- if flag == FlagDeleted {
- return "", fmt.Errorf("record deleted")
- }
- length := binary.LittleEndian.Uint32(header[5:])
- // Read Data
- data := make([]byte, length)
- if _, err := s.file.ReadAt(data, offset+HeaderSize); err != nil {
- return "", err
- }
- return string(data), nil
- }
- func (s *Storage) Close() error {
- return s.file.Close()
- }
// StringTable is an empty placeholder kept from an earlier design that used a
// global deduplication map. It holds no state; values are stored by Storage
// and keys are tracked by KeyMap.
type StringTable struct{}
// IndexEntry is one row of the key index ("Table 1"). It records where a
// key's current value lives in Storage and the commit index that wrote it.
type IndexEntry struct {
	ValueOffset int64  // offset of the value's slot in the Storage file
	CommitIndex uint64 // commitIndex passed to the Set that wrote this entry
}
// InvertedIndex ("Table 3") maps tokens to the KeyIDs whose key or value
// contains them. Key tokens and value tokens are kept in separate maps.
type InvertedIndex struct {
	KeyTokens   map[string][]uint32
	ValueTokens map[string][]uint32
}

// NewInvertedIndex returns an InvertedIndex with both token maps allocated
// and empty.
func NewInvertedIndex() *InvertedIndex {
	idx := new(InvertedIndex)
	idx.KeyTokens = map[string][]uint32{}
	idx.ValueTokens = map[string][]uint32{}
	return idx
}
// KeyMap is a two-way mapping between key strings and compact uint32 KeyIDs.
// NextID is the ID the next new key will receive. KeyMap has no lock of its
// own; callers must serialize access.
type KeyMap struct {
	StrToID map[string]uint32
	IDToStr map[uint32]string
	NextID  uint32
}

// NewKeyMap returns an empty KeyMap whose first assigned ID will be 1.
func NewKeyMap() *KeyMap {
	km := &KeyMap{NextID: 1}
	km.StrToID = make(map[string]uint32)
	km.IDToStr = make(map[uint32]string)
	return km
}

// GetOrCreateID returns the ID already assigned to key. If key is new, it
// assigns NextID, records the mapping in both directions, and advances NextID.
func (km *KeyMap) GetOrCreateID(key string) uint32 {
	id, found := km.StrToID[key]
	if !found {
		id = km.NextID
		km.NextID = id + 1
		km.StrToID[key] = id
		km.IDToStr[id] = key
	}
	return id
}
- // Engine is the core storage engine.
- type Engine struct {
- mu sync.RWMutex
- // Table 1: KeyID -> Entry (ValueOffset + CommitIndex)
- Index map[uint32]IndexEntry
- // Key mapping (In-memory, rebuilt on start or snapshotted)
- Keys *KeyMap
- // Table 2: Disk Storage
- Storage *Storage
- // Table 3: Inverted Index
- SearchIndex *InvertedIndex
- LastCommitIndex uint64
- dataDir string
- }
- func NewEngine(dataDir string) (*Engine, error) {
- if err := os.MkdirAll(dataDir, 0755); err != nil {
- return nil, err
- }
- store, err := NewStorage(dataDir + "/values.data")
- if err != nil {
- return nil, err
- }
- e := &Engine{
- Index: make(map[uint32]IndexEntry),
- Keys: NewKeyMap(),
- Storage: store,
- SearchIndex: NewInvertedIndex(),
- dataDir: dataDir,
- }
- return e, nil
- }
- func (e *Engine) Close() error {
- return e.Storage.Close()
- }
- func (e *Engine) tokenizeKey(key string) []string {
- return strings.Split(key, ".")
- }
- func (e *Engine) tokenizeValue(val string) []string {
- f := func(c rune) bool {
- return !((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9'))
- }
- return strings.FieldsFunc(val, f)
- }
- func (e *Engine) Set(key, value string, commitIndex uint64) error {
- e.mu.Lock()
- defer e.mu.Unlock()
- if commitIndex > e.LastCommitIndex {
- e.LastCommitIndex = commitIndex
- }
- // 1. Get KeyID
- keyID := e.Keys.GetOrCreateID(key)
- // 2. Check existing entry for update
- var oldOffset int64 = -1
- if entry, ok := e.Index[keyID]; ok {
- oldOffset = entry.ValueOffset
- }
- // 3. Write Value (In-place or Append)
- newOffset, err := e.Storage.WriteValue(value, oldOffset)
- if err != nil {
- return err
- }
- // 4. Update Index (Table 1)
- e.Index[keyID] = IndexEntry{
- ValueOffset: newOffset,
- CommitIndex: commitIndex,
- }
- // 5. Update Inverted Index (Table 3)
- for _, token := range e.tokenizeKey(key) {
- e.addToken(e.SearchIndex.KeyTokens, token, keyID)
- }
- for _, token := range e.tokenizeValue(value) {
- e.addToken(e.SearchIndex.ValueTokens, token, keyID)
- }
- return nil
- }
- func (e *Engine) addToken(index map[string][]uint32, token string, id uint32) {
- ids := index[token]
- for _, existing := range ids {
- if existing == id {
- return
- }
- }
- index[token] = append(ids, id)
- }
- func (e *Engine) Get(key string) (string, bool) {
- e.mu.RLock()
- defer e.mu.RUnlock()
- keyID, ok := e.Keys.StrToID[key]
- if !ok {
- return "", false
- }
- entry, ok := e.Index[keyID]
- if !ok {
- return "", false
- }
- val, err := e.Storage.ReadValue(entry.ValueOffset)
- if err != nil {
- return "", false
- }
- return val, true
- }
// QueryResult is one row returned by Engine.Query: a key, its current value,
// and the commit index that wrote it.
type QueryResult struct {
	Key         string `json:"key"`
	Value       string `json:"value"`
	CommitIndex uint64 `json:"commit_index"`
}
// WildcardMatch reports whether str matches pattern. In the pattern, '*'
// matches any run of runes (including an empty one) and '?' matches exactly
// one rune; every other rune must match literally. Matching works on runes,
// so a multi-byte UTF-8 character counts as a single position.
//
// The algorithm is greedy with single-star backtracking. When a mismatch
// occurs after a '*', the star is made to absorb one more rune and matching
// resumes after it.
func WildcardMatch(str, pattern string) bool {
	text, pat := []rune(str), []rune(pattern)
	ti, pi := 0, 0
	// lastStar is the pattern index of the most recent '*'. resume is the text
	// index at which that star's current match ends.
	lastStar, resume := -1, -1

	for ti < len(text) {
		switch {
		case pi < len(pat) && (pat[pi] == '?' || pat[pi] == text[ti]):
			ti++
			pi++
		case pi < len(pat) && pat[pi] == '*':
			lastStar, resume = pi, ti
			pi++
		case lastStar >= 0:
			// Backtrack: let the last '*' swallow one more rune.
			resume++
			ti, pi = resume, lastStar+1
		default:
			return false
		}
	}

	// All text consumed; only trailing stars may remain in the pattern.
	for pi < len(pat) && pat[pi] == '*' {
		pi++
	}
	return pi == len(pat)
}
- func (e *Engine) Query(sql string) ([]QueryResult, error) {
- e.mu.RLock()
- defer e.mu.RUnlock()
- sql = strings.TrimSpace(sql)
- re := regexp.MustCompile(`(key|value|CommitIndex)\s*(=|like|>=|<=|>|<)\s*("[^"]*"|\d+)`)
- matches := re.FindAllStringSubmatch(sql, -1)
- if len(matches) == 0 {
- return nil, fmt.Errorf("invalid query")
- }
- extractString := func(s string) string {
- return strings.Trim(strings.TrimSpace(s), "\"")
- }
- var results []QueryResult
- filter := func(kID uint32, entry IndexEntry) (bool, string, string) {
- keyStr, ok := e.Keys.IDToStr[kID]
- if !ok { return false, "", "" }
-
- valStr, err := e.Storage.ReadValue(entry.ValueOffset)
- if err != nil { return false, "", "" }
- for _, match := range matches {
- field := match[1]
- op := match[2]
- valRaw := match[3]
- switch field {
- case "CommitIndex":
- num, _ := strconv.ParseUint(valRaw, 10, 64)
- switch op {
- case ">":
- if !(entry.CommitIndex > num) { return false, "", "" }
- case "<":
- if !(entry.CommitIndex < num) { return false, "", "" }
- case ">=":
- if !(entry.CommitIndex >= num) { return false, "", "" }
- case "<=":
- if !(entry.CommitIndex <= num) { return false, "", "" }
- case "=":
- if !(entry.CommitIndex == num) { return false, "", "" }
- }
- case "key":
- target := extractString(valRaw)
- switch op {
- case "=":
- if keyStr != target { return false, "", "" }
- case "like":
- if !WildcardMatch(keyStr, target) { return false, "", "" }
- }
- case "value":
- target := extractString(valRaw)
- switch op {
- case "=":
- if valStr != target { return false, "", "" }
- case "like":
- if !WildcardMatch(valStr, target) { return false, "", "" }
- }
- }
- }
- return true, keyStr, valStr
- }
- for kID, entry := range e.Index {
- ok, kStr, vStr := filter(kID, entry)
- if ok {
- results = append(results, QueryResult{
- Key: kStr,
- Value: vStr,
- CommitIndex: entry.CommitIndex,
- })
- }
- }
- sort.Slice(results, func(i, j int) bool {
- return results[i].Key < results[j].Key
- })
- return results, nil
- }
- func (e *Engine) Snapshot() ([]byte, error) {
- e.mu.RLock()
- defer e.mu.RUnlock()
- data := struct {
- Index map[uint32]IndexEntry
- Keys *KeyMap
- LastCommitIndex uint64
- SearchIndex *InvertedIndex
- }{
- Index: e.Index,
- Keys: e.Keys,
- LastCommitIndex: e.LastCommitIndex,
- SearchIndex: e.SearchIndex,
- }
- return json.Marshal(data)
- }
- func (e *Engine) Restore(data []byte) error {
- e.mu.Lock()
- defer e.mu.Unlock()
- var dump struct {
- Index map[uint32]IndexEntry
- Keys *KeyMap
- LastCommitIndex uint64
- SearchIndex *InvertedIndex
- }
- if err := json.Unmarshal(data, &dump); err != nil {
- return err
- }
- e.Index = dump.Index
- e.Keys = dump.Keys
- e.LastCommitIndex = dump.LastCommitIndex
- e.SearchIndex = dump.SearchIndex
- return nil
- }
|