engine.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634
  1. package db
  2. import (
  3. "bufio"
  4. "encoding/binary"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "os"
  9. "regexp"
  10. "sort"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. )
  15. // FreeList manages reusable disk space in memory using Best-Fit strategy.
  16. type FreeList struct {
  17. // Capacity -> Stack of Offsets
  18. buckets map[uint32][]int64
  19. // Sorted list of capacities available in buckets for fast Best-Fit lookup
  20. sortedCaps []uint32
  21. mu sync.Mutex
  22. }
  23. func NewFreeList() *FreeList {
  24. return &FreeList{
  25. buckets: make(map[uint32][]int64),
  26. }
  27. }
  28. // Add adds an offset to the free list for a given capacity.
  29. func (fl *FreeList) Add(cap uint32, offset int64) {
  30. fl.mu.Lock()
  31. defer fl.mu.Unlock()
  32. if _, exists := fl.buckets[cap]; !exists {
  33. fl.buckets[cap] = []int64{offset}
  34. // Maintain sortedCaps
  35. fl.sortedCaps = append(fl.sortedCaps, cap)
  36. sort.Slice(fl.sortedCaps, func(i, j int) bool { return fl.sortedCaps[i] < fl.sortedCaps[j] })
  37. } else {
  38. fl.buckets[cap] = append(fl.buckets[cap], offset)
  39. }
  40. }
  41. // Pop tries to get an available offset using Best-Fit strategy.
  42. // It finds the smallest available capacity >= targetCap.
  43. // Returns offset and the ACTUAL capacity of the slot found (which might be larger than targetCap).
  44. func (fl *FreeList) Pop(targetCap uint32) (int64, uint32, bool) {
  45. fl.mu.Lock()
  46. defer fl.mu.Unlock()
  47. // Binary search for smallest capacity >= targetCap
  48. idx := sort.Search(len(fl.sortedCaps), func(i int) bool {
  49. return fl.sortedCaps[i] >= targetCap
  50. })
  51. // If idx is out of bounds, no suitable slot found
  52. if idx >= len(fl.sortedCaps) {
  53. return 0, 0, false
  54. }
  55. // Found a suitable capacity bucket
  56. foundCap := fl.sortedCaps[idx]
  57. offsets := fl.buckets[foundCap]
  58. if len(offsets) == 0 {
  59. // This should technically not happen if we maintain sortedCaps correctly (remove empty caps)
  60. // But for safety, let's handle it.
  61. // In a rigorous impl, we would remove empty bucket from sortedCaps.
  62. return 0, 0, false
  63. }
  64. // Pop from end (Stack LIFO)
  65. lastIdx := len(offsets) - 1
  66. offset := offsets[lastIdx]
  67. // Update bucket
  68. if lastIdx == 0 {
  69. // Bucket becomes empty
  70. delete(fl.buckets, foundCap)
  71. // Remove from sortedCaps to keep search efficient
  72. // This is O(N) copy but N (number of distinct capacities) is usually small (< 100 for typical payload range)
  73. fl.sortedCaps = append(fl.sortedCaps[:idx], fl.sortedCaps[idx+1:]...)
  74. } else {
  75. fl.buckets[foundCap] = offsets[:lastIdx]
  76. }
  77. return offset, foundCap, true
  78. }
// Storage (Table 2) manages disk storage for values.
// It uses a slot-based format: [Flag][Capacity][Length][Data...]
// Flag: 1 byte (0=Deleted, 1=Valid)
// Capacity: 4 bytes (Allocated size)
// Length: 4 bytes (Actual used size)
// Data: Capacity bytes
type Storage struct {
	file     *os.File  // Backing data file, opened read/write by NewStorage
	filename string    // Path the file was opened with
	offset   int64     // Current end of file; new slots are appended here
	freeList *FreeList // Deleted slots available for reuse
	mu       sync.RWMutex // Write-locked by WriteValue, read-locked by ReadValue
}
// Slot header constants for the on-disk value format.
const (
	FlagDeleted = 0x00 // Flag byte of a slot whose space can be reused
	FlagValid   = 0x01 // Flag byte of a slot holding a live value
	HeaderSize  = 9    // 1(Flag) + 4(Cap) + 4(Len)
	AlignSize   = 16   // Align capacity to 16 bytes
)
  98. func NewStorage(filename string) (*Storage, error) {
  99. f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
  100. if err != nil {
  101. return nil, err
  102. }
  103. st := &Storage{
  104. file: f,
  105. filename: filename,
  106. freeList: NewFreeList(),
  107. }
  108. // Scan file to build free list and find end offset
  109. if err := st.scan(); err != nil {
  110. f.Close()
  111. return nil, err
  112. }
  113. return st, nil
  114. }
  115. // scan iterates over the file to reconstruct FreeList and find EOF.
  116. func (s *Storage) scan() error {
  117. offset := int64(0)
  118. if _, err := s.file.Seek(0, 0); err != nil {
  119. return err
  120. }
  121. reader := bufio.NewReader(s.file)
  122. for {
  123. header := make([]byte, HeaderSize)
  124. n, err := io.ReadFull(reader, header)
  125. if err == io.EOF {
  126. break
  127. }
  128. if err == io.ErrUnexpectedEOF && n == 0 {
  129. break
  130. }
  131. if err != nil {
  132. return err
  133. }
  134. flag := header[0]
  135. cap := binary.LittleEndian.Uint32(header[1:])
  136. if flag == FlagDeleted {
  137. s.freeList.Add(cap, offset)
  138. }
  139. discardLen := int(cap)
  140. if _, err := reader.Discard(discardLen); err != nil {
  141. buf := make([]byte, discardLen)
  142. if _, err := io.ReadFull(reader, buf); err != nil {
  143. return err
  144. }
  145. }
  146. offset += int64(HeaderSize + discardLen)
  147. }
  148. s.offset = offset
  149. return nil
  150. }
  151. // alignCapacity calculates the capacity needed aligned to 16 bytes.
  152. func alignCapacity(length int) uint32 {
  153. if length == 0 {
  154. return AlignSize
  155. }
  156. cap := (length + AlignSize - 1) / AlignSize * AlignSize
  157. return uint32(cap)
  158. }
  159. // WriteValue writes a value to storage.
  160. // If oldOffset >= 0, it tries to update in-place.
  161. // Returns the new offset (or oldOffset if updated in-place) and error.
  162. func (s *Storage) WriteValue(val string, oldOffset int64) (int64, error) {
  163. s.mu.Lock()
  164. defer s.mu.Unlock()
  165. valLen := len(val)
  166. // Try in-place update first
  167. if oldOffset >= 0 {
  168. capBuf := make([]byte, 4)
  169. if _, err := s.file.ReadAt(capBuf, oldOffset+1); err == nil {
  170. oldCap := binary.LittleEndian.Uint32(capBuf)
  171. if uint32(valLen) <= oldCap {
  172. // Perfect, update in place!
  173. lenBuf := make([]byte, 4)
  174. binary.LittleEndian.PutUint32(lenBuf, uint32(valLen))
  175. if _, err := s.file.WriteAt(lenBuf, oldOffset+5); err != nil {
  176. return 0, err
  177. }
  178. if _, err := s.file.WriteAt([]byte(val), oldOffset+HeaderSize); err != nil {
  179. return 0, err
  180. }
  181. return oldOffset, nil
  182. }
  183. // Mark old slot as deleted and add to FreeList.
  184. if _, err := s.file.WriteAt([]byte{FlagDeleted}, oldOffset); err != nil {
  185. return 0, err
  186. }
  187. s.freeList.Add(oldCap, oldOffset)
  188. }
  189. }
  190. // Calculate needed capacity
  191. newCap := alignCapacity(valLen)
  192. // Try to reuse space from FreeList using BEST FIT
  193. if reusedOffset, actualCap, ok := s.freeList.Pop(newCap); ok {
  194. // Write Header + Data
  195. // Header: [Flag=Valid][Cap=actualCap][Len=valLen]
  196. // NOTE: We MUST write 'actualCap' back to the header, NOT 'newCap'.
  197. // The physical slot size on disk is 'actualCap', and we cannot shrink it physically
  198. // without moving subsequent data.
  199. // So we effectively waste (actualCap - newCap) bytes of padding.
  200. buf := make([]byte, HeaderSize+int(actualCap))
  201. buf[0] = FlagValid
  202. binary.LittleEndian.PutUint32(buf[1:], actualCap) // Must be actualCap
  203. binary.LittleEndian.PutUint32(buf[5:], uint32(valLen))
  204. copy(buf[HeaderSize:], []byte(val))
  205. if _, err := s.file.WriteAt(buf, reusedOffset); err != nil {
  206. return 0, err
  207. }
  208. return reusedOffset, nil
  209. }
  210. // If no reuse, Append new slot
  211. newOffset := s.offset
  212. totalSize := HeaderSize + int(newCap)
  213. buf := make([]byte, totalSize)
  214. buf[0] = FlagValid
  215. binary.LittleEndian.PutUint32(buf[1:], newCap)
  216. binary.LittleEndian.PutUint32(buf[5:], uint32(valLen))
  217. copy(buf[HeaderSize:], []byte(val))
  218. if _, err := s.file.WriteAt(buf, newOffset); err != nil {
  219. return 0, err
  220. }
  221. s.offset += int64(totalSize)
  222. return newOffset, nil
  223. }
  224. // ReadValue reads value at offset.
  225. func (s *Storage) ReadValue(offset int64) (string, error) {
  226. s.mu.RLock()
  227. defer s.mu.RUnlock()
  228. // Read Header
  229. header := make([]byte, HeaderSize)
  230. if _, err := s.file.ReadAt(header, offset); err != nil {
  231. return "", err
  232. }
  233. flag := header[0]
  234. if flag == FlagDeleted {
  235. return "", fmt.Errorf("record deleted")
  236. }
  237. length := binary.LittleEndian.Uint32(header[5:])
  238. // Read Data
  239. data := make([]byte, length)
  240. if _, err := s.file.ReadAt(data, offset+HeaderSize); err != nil {
  241. return "", err
  242. }
  243. return string(data), nil
  244. }
  245. func (s *Storage) Close() error {
  246. return s.file.Close()
  247. }
// StringTable (Table 2) is now just a wrapper for Storage + Key Management.
// The global deduplication map was removed, so the type currently has no
// fields.
type StringTable struct {
}
// IndexEntry (Table 1) is the per-key record kept in Engine.Index.
type IndexEntry struct {
	ValueOffset int64  // Offset of the value's slot in Storage (replaces ValueID)
	CommitIndex uint64 // Commit index passed to the Set that last wrote the key
}
  257. // InvertedIndex (Table 3)
  258. type InvertedIndex struct {
  259. KeyTokens map[string][]uint32
  260. ValueTokens map[string][]uint32
  261. }
  262. func NewInvertedIndex() *InvertedIndex {
  263. return &InvertedIndex{
  264. KeyTokens: make(map[string][]uint32),
  265. ValueTokens: make(map[string][]uint32),
  266. }
  267. }
  268. // KeyMap maintains the mapping between KeyString and an internal KeyID (uint32).
  269. type KeyMap struct {
  270. StrToID map[string]uint32
  271. IDToStr map[uint32]string
  272. NextID uint32
  273. }
  274. func NewKeyMap() *KeyMap {
  275. return &KeyMap{
  276. StrToID: make(map[string]uint32),
  277. IDToStr: make(map[uint32]string),
  278. NextID: 1,
  279. }
  280. }
  281. func (km *KeyMap) GetOrCreateID(key string) uint32 {
  282. if id, ok := km.StrToID[key]; ok {
  283. return id
  284. }
  285. id := km.NextID
  286. km.NextID++
  287. km.StrToID[key] = id
  288. km.IDToStr[id] = key
  289. return id
  290. }
// Engine is the core storage engine.
type Engine struct {
	// mu guards the engine: write-locked by Set and Restore, read-locked by
	// Get, Query and Snapshot.
	mu sync.RWMutex
	// Table 1: KeyID -> Entry (ValueOffset + CommitIndex)
	Index map[uint32]IndexEntry
	// Key mapping (In-memory, rebuilt on start or snapshotted)
	Keys *KeyMap
	// Table 2: Disk Storage
	Storage *Storage
	// Table 3: Inverted Index
	SearchIndex *InvertedIndex
	// LastCommitIndex is the highest commit index passed to Set so far.
	LastCommitIndex uint64
	// dataDir is the directory passed to NewEngine; the value file lives in it.
	dataDir string
}
  305. func NewEngine(dataDir string) (*Engine, error) {
  306. if err := os.MkdirAll(dataDir, 0755); err != nil {
  307. return nil, err
  308. }
  309. store, err := NewStorage(dataDir + "/values.data")
  310. if err != nil {
  311. return nil, err
  312. }
  313. e := &Engine{
  314. Index: make(map[uint32]IndexEntry),
  315. Keys: NewKeyMap(),
  316. Storage: store,
  317. SearchIndex: NewInvertedIndex(),
  318. dataDir: dataDir,
  319. }
  320. return e, nil
  321. }
  322. func (e *Engine) Close() error {
  323. return e.Storage.Close()
  324. }
  325. func (e *Engine) tokenizeKey(key string) []string {
  326. return strings.Split(key, ".")
  327. }
  328. func (e *Engine) tokenizeValue(val string) []string {
  329. f := func(c rune) bool {
  330. return !((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9'))
  331. }
  332. return strings.FieldsFunc(val, f)
  333. }
  334. func (e *Engine) Set(key, value string, commitIndex uint64) error {
  335. e.mu.Lock()
  336. defer e.mu.Unlock()
  337. if commitIndex > e.LastCommitIndex {
  338. e.LastCommitIndex = commitIndex
  339. }
  340. // 1. Get KeyID
  341. keyID := e.Keys.GetOrCreateID(key)
  342. // 2. Check existing entry for update
  343. var oldOffset int64 = -1
  344. if entry, ok := e.Index[keyID]; ok {
  345. oldOffset = entry.ValueOffset
  346. }
  347. // 3. Write Value (In-place or Append)
  348. newOffset, err := e.Storage.WriteValue(value, oldOffset)
  349. if err != nil {
  350. return err
  351. }
  352. // 4. Update Index (Table 1)
  353. e.Index[keyID] = IndexEntry{
  354. ValueOffset: newOffset,
  355. CommitIndex: commitIndex,
  356. }
  357. // 5. Update Inverted Index (Table 3)
  358. for _, token := range e.tokenizeKey(key) {
  359. e.addToken(e.SearchIndex.KeyTokens, token, keyID)
  360. }
  361. for _, token := range e.tokenizeValue(value) {
  362. e.addToken(e.SearchIndex.ValueTokens, token, keyID)
  363. }
  364. return nil
  365. }
  366. func (e *Engine) addToken(index map[string][]uint32, token string, id uint32) {
  367. ids := index[token]
  368. for _, existing := range ids {
  369. if existing == id {
  370. return
  371. }
  372. }
  373. index[token] = append(ids, id)
  374. }
  375. func (e *Engine) Get(key string) (string, bool) {
  376. e.mu.RLock()
  377. defer e.mu.RUnlock()
  378. keyID, ok := e.Keys.StrToID[key]
  379. if !ok {
  380. return "", false
  381. }
  382. entry, ok := e.Index[keyID]
  383. if !ok {
  384. return "", false
  385. }
  386. val, err := e.Storage.ReadValue(entry.ValueOffset)
  387. if err != nil {
  388. return "", false
  389. }
  390. return val, true
  391. }
// QueryResult is one row returned by Engine.Query.
type QueryResult struct {
	Key         string `json:"key"`          // Key string of the matching entry
	Value       string `json:"value"`        // Value read from storage for that key
	CommitIndex uint64 `json:"commit_index"` // Commit index stored in the key's index entry
}
  397. // WildcardMatch is kept as is
  398. func WildcardMatch(str, pattern string) bool {
  399. s := []rune(str)
  400. p := []rune(pattern)
  401. sLen, pLen := len(s), len(p)
  402. i, j := 0, 0
  403. starIdx, matchIdx := -1, -1
  404. for i < sLen {
  405. if j < pLen && (p[j] == '?' || p[j] == s[i]) {
  406. i++
  407. j++
  408. } else if j < pLen && p[j] == '*' {
  409. starIdx = j
  410. matchIdx = i
  411. j++
  412. } else if starIdx != -1 {
  413. j = starIdx + 1
  414. matchIdx++
  415. i = matchIdx
  416. } else {
  417. return false
  418. }
  419. }
  420. for j < pLen && p[j] == '*' {
  421. j++
  422. }
  423. return j == pLen
  424. }
  425. func (e *Engine) Query(sql string) ([]QueryResult, error) {
  426. e.mu.RLock()
  427. defer e.mu.RUnlock()
  428. sql = strings.TrimSpace(sql)
  429. re := regexp.MustCompile(`(key|value|CommitIndex)\s*(=|like|>=|<=|>|<)\s*("[^"]*"|\d+)`)
  430. matches := re.FindAllStringSubmatch(sql, -1)
  431. if len(matches) == 0 {
  432. return nil, fmt.Errorf("invalid query")
  433. }
  434. extractString := func(s string) string {
  435. return strings.Trim(strings.TrimSpace(s), "\"")
  436. }
  437. var results []QueryResult
  438. filter := func(kID uint32, entry IndexEntry) (bool, string, string) {
  439. keyStr, ok := e.Keys.IDToStr[kID]
  440. if !ok { return false, "", "" }
  441. valStr, err := e.Storage.ReadValue(entry.ValueOffset)
  442. if err != nil { return false, "", "" }
  443. for _, match := range matches {
  444. field := match[1]
  445. op := match[2]
  446. valRaw := match[3]
  447. switch field {
  448. case "CommitIndex":
  449. num, _ := strconv.ParseUint(valRaw, 10, 64)
  450. switch op {
  451. case ">":
  452. if !(entry.CommitIndex > num) { return false, "", "" }
  453. case "<":
  454. if !(entry.CommitIndex < num) { return false, "", "" }
  455. case ">=":
  456. if !(entry.CommitIndex >= num) { return false, "", "" }
  457. case "<=":
  458. if !(entry.CommitIndex <= num) { return false, "", "" }
  459. case "=":
  460. if !(entry.CommitIndex == num) { return false, "", "" }
  461. }
  462. case "key":
  463. target := extractString(valRaw)
  464. switch op {
  465. case "=":
  466. if keyStr != target { return false, "", "" }
  467. case "like":
  468. if !WildcardMatch(keyStr, target) { return false, "", "" }
  469. }
  470. case "value":
  471. target := extractString(valRaw)
  472. switch op {
  473. case "=":
  474. if valStr != target { return false, "", "" }
  475. case "like":
  476. if !WildcardMatch(valStr, target) { return false, "", "" }
  477. }
  478. }
  479. }
  480. return true, keyStr, valStr
  481. }
  482. for kID, entry := range e.Index {
  483. ok, kStr, vStr := filter(kID, entry)
  484. if ok {
  485. results = append(results, QueryResult{
  486. Key: kStr,
  487. Value: vStr,
  488. CommitIndex: entry.CommitIndex,
  489. })
  490. }
  491. }
  492. sort.Slice(results, func(i, j int) bool {
  493. return results[i].Key < results[j].Key
  494. })
  495. return results, nil
  496. }
  497. func (e *Engine) Snapshot() ([]byte, error) {
  498. e.mu.RLock()
  499. defer e.mu.RUnlock()
  500. data := struct {
  501. Index map[uint32]IndexEntry
  502. Keys *KeyMap
  503. LastCommitIndex uint64
  504. SearchIndex *InvertedIndex
  505. }{
  506. Index: e.Index,
  507. Keys: e.Keys,
  508. LastCommitIndex: e.LastCommitIndex,
  509. SearchIndex: e.SearchIndex,
  510. }
  511. return json.Marshal(data)
  512. }
  513. func (e *Engine) Restore(data []byte) error {
  514. e.mu.Lock()
  515. defer e.mu.Unlock()
  516. var dump struct {
  517. Index map[uint32]IndexEntry
  518. Keys *KeyMap
  519. LastCommitIndex uint64
  520. SearchIndex *InvertedIndex
  521. }
  522. if err := json.Unmarshal(data, &dump); err != nil {
  523. return err
  524. }
  525. e.Index = dump.Index
  526. e.Keys = dump.Keys
  527. e.LastCommitIndex = dump.LastCommitIndex
  528. e.SearchIndex = dump.SearchIndex
  529. return nil
  530. }