package raft

import (
	"encoding/json"
	"os"
	"testing"
	"time"
)
// TestCompactionBasic tests basic log compaction
func TestCompactionBasic(t *testing.T) {
	dir, err := os.MkdirTemp("", "raft-compaction-test")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(dir)

	// Create storage with small memory capacity to test file reads
	storage, err := NewHybridStorage(dir, 200, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer storage.Close()

	// Append 100 entries - these will stay in memory
	entries := make([]LogEntry, 100)
	for i := 0; i < 100; i++ {
		entries[i] = LogEntry{
			Index:   uint64(i + 1),
			Term:    1,
			Type:    EntryNormal,
			Command: []byte("test"),
		}
	}
	if err := storage.AppendEntries(entries); err != nil {
		t.Fatal(err)
	}

	// Verify entries exist
	if storage.GetLastIndex() != 100 {
		t.Fatalf("expected lastIndex=100, got %d", storage.GetLastIndex())
	}

	// Verify entry before compaction
	entry, err := storage.GetEntry(50)
	if err != nil {
		t.Fatalf("should be able to read entry 50 before compaction: %v", err)
	}
	if entry.Index != 50 {
		t.Fatalf("expected entry with index=50, got index=%d", entry.Index)
	}

	// Compact up to index 50
	if err := storage.TruncateBefore(50); err != nil {
		t.Fatal(err)
	}

	// Verify firstIndex updated
	if storage.GetFirstIndex() != 50 {
		t.Fatalf("expected firstIndex=50, got %d", storage.GetFirstIndex())
	}

	// Verify old entries are inaccessible
	_, err = storage.GetEntry(49)
	if err != ErrCompacted {
		t.Fatalf("expected ErrCompacted for index 49, got %v", err)
	}
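
	// Added boundary check (not in the original test): the assertions above
	// show firstIndex is now 50 and index 49 is compacted, which implies the
	// entry at the compaction point itself is kept, so read it back directly.
	entry, err = storage.GetEntry(50)
	if err != nil {
		t.Fatalf("entry 50 at the compaction point should still be readable: %v", err)
	}
	if entry.Index != 50 {
		t.Fatalf("expected index=50, got %d", entry.Index)
	}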

	// Verify entries after the compaction point remain accessible
	entry, err = storage.GetEntry(51)
	if err != nil {
		t.Fatalf("should be able to read entry 51: %v", err)
	}
	if entry.Index != 51 {
		t.Fatalf("expected index=51, got %d", entry.Index)
	}
}

// TestSnapshotSaveAndLoad tests snapshot persistence
func TestSnapshotSaveAndLoad(t *testing.T) {
	dir, err := os.MkdirTemp("", "raft-snapshot-test")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(dir)

	// Create storage
	storage, err := NewHybridStorage(dir, 1000, nil)
	if err != nil {
		t.Fatal(err)
	}

	// Create snapshot data
	snapshotData := map[string]string{
		"key1": "value1",
		"key2": "value2",
	}
	data, _ := json.Marshal(snapshotData)

	// Save snapshot
	if err := storage.SaveSnapshot(data, 100, 5); err != nil {
		t.Fatal(err)
	}
	storage.Close()

	// Reopen storage and verify snapshot
	storage2, err := NewHybridStorage(dir, 1000, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer storage2.Close()

	loadedData, lastIndex, lastTerm, err := storage2.GetSnapshot()
	if err != nil {
		t.Fatal(err)
	}
	if lastIndex != 100 {
		t.Fatalf("expected lastIndex=100, got %d", lastIndex)
	}
	if lastTerm != 5 {
		t.Fatalf("expected lastTerm=5, got %d", lastTerm)
	}

	var loadedSnapshot map[string]string
	if err := json.Unmarshal(loadedData, &loadedSnapshot); err != nil {
		t.Fatal(err)
	}
	if loadedSnapshot["key1"] != "value1" || loadedSnapshot["key2"] != "value2" {
		t.Fatalf("snapshot data mismatch: %v", loadedSnapshot)
	}
}
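
// TestSnapshotOverwrite is a hedged sketch that was not in the original
// file. It assumes a later SaveSnapshot supersedes an earlier one, which
// matches GetSnapshot exposing a single (data, lastIndex, lastTerm) triple
// above; if the implementation keeps multiple snapshots, adjust accordingly.
func TestSnapshotOverwrite(t *testing.T) {
	dir, err := os.MkdirTemp("", "raft-snapshot-overwrite-test")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(dir)

	storage, err := NewHybridStorage(dir, 1000, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer storage.Close()

	// Save two snapshots; the second is assumed to win
	if err := storage.SaveSnapshot([]byte(`{"v":"old"}`), 10, 1); err != nil {
		t.Fatal(err)
	}
	if err := storage.SaveSnapshot([]byte(`{"v":"new"}`), 20, 2); err != nil {
		t.Fatal(err)
	}

	data, lastIndex, lastTerm, err := storage.GetSnapshot()
	if err != nil {
		t.Fatal(err)
	}
	if lastIndex != 20 || lastTerm != 2 {
		t.Fatalf("expected latest snapshot (index=20, term=2), got (%d, %d)", lastIndex, lastTerm)
	}
	if string(data) != `{"v":"new"}` {
		t.Fatalf("expected newest snapshot data, got %s", data)
	}
}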

// TestCompactionWithKVServer tests compaction through KVServer
func TestCompactionWithKVServer(t *testing.T) {
	dir, err := os.MkdirTemp("", "raft-kv-compaction-test")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(dir)

	config := DefaultConfig()
	config.NodeID = "test-node"
	config.ListenAddr = "127.0.0.1:19001"
	config.DataDir = dir
	config.ClusterNodes = map[string]string{
		"test-node": "127.0.0.1:19001",
	}
	// Lower thresholds for testing
	config.SnapshotThreshold = 100
	config.SnapshotMinRetention = 10

	server, err := NewKVServer(config)
	if err != nil {
		t.Fatal(err)
	}
	if err := server.Start(); err != nil {
		t.Fatal(err)
	}
	defer server.Stop()

	// Wait for leader election
	time.Sleep(500 * time.Millisecond)

	// Propose enough entries through Raft to trigger compaction. Applying
	// them directly to the FSM would bypass the log entirely, so nothing
	// would ever be compacted; the writes have to go through Propose.
	// We just set the same key 200 times.
	for i := 0; i < 200; i++ {
		server.Raft.Propose(mustMarshal(KVCommand{Type: KVSet, Key: "key", Value: "value"}))
	}

	// Wait for the entries to be applied (and possibly compacted)
	time.Sleep(500 * time.Millisecond)

	// Check that FSM has the correct value
	val, ok := server.FSM.Get("key")
	if !ok {
		t.Fatal("key not found")
	}
	if val != "value" {
		t.Fatalf("expected 'value', got '%s'", val)
	}
}

// TestDataConsistencyAfterCompaction tests that data is consistent after compaction
func TestDataConsistencyAfterCompaction(t *testing.T) {
	dir, err := os.MkdirTemp("", "raft-consistency-test")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(dir)

	// Create KV state machine
	fsm := NewKVStateMachine()

	// Apply a series of operations
	operations := []KVCommand{
		{Type: KVSet, Key: "counter", Value: "1"},
		{Type: KVSet, Key: "counter", Value: "2"},
		{Type: KVSet, Key: "counter", Value: "3"},
		{Type: KVSet, Key: "name", Value: "alice"},
		{Type: KVSet, Key: "counter", Value: "4"},
		{Type: KVDel, Key: "name"},
		{Type: KVSet, Key: "counter", Value: "5"},
	}
	for _, op := range operations {
		data, _ := json.Marshal(op)
		fsm.Apply(data)
	}

	// Take snapshot
	snapshot, err := fsm.Snapshot()
	if err != nil {
		t.Fatal(err)
	}

	// Create new FSM and restore from snapshot
	fsm2 := NewKVStateMachine()
	if err := fsm2.Restore(snapshot); err != nil {
		t.Fatal(err)
	}

	// Verify data consistency
	val1, ok1 := fsm.Get("counter")
	val2, ok2 := fsm2.Get("counter")
	if val1 != val2 || ok1 != ok2 {
		t.Fatalf("counter mismatch: original=%s(%v), restored=%s(%v)", val1, ok1, val2, ok2)
	}
	if val1 != "5" {
		t.Fatalf("expected counter=5, got %s", val1)
	}

	// name should not exist (was deleted)
	if _, ok := fsm2.Get("name"); ok {
		t.Fatal("name should have been deleted")
	}
}

// TestCompactionDoesNotLoseData tests that no committed data is lost during compaction
func TestCompactionDoesNotLoseData(t *testing.T) {
	dir, err := os.MkdirTemp("", "raft-no-loss-test")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(dir)

	// Use larger memory capacity to keep all entries in memory
	storage, err := NewHybridStorage(dir, 2000, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer storage.Close()

	// Write 1000 entries with varying values (cycling through A-Z)
	entries := make([]LogEntry, 1000)
	for i := 0; i < 1000; i++ {
		cmd := KVCommand{Type: KVSet, Key: "key", Value: string(rune('A' + i%26))}
		data, _ := json.Marshal(cmd)
		entries[i] = LogEntry{
			Index:   uint64(i + 1),
			Term:    1,
			Type:    EntryNormal,
			Command: data,
		}
	}
	if err := storage.AppendEntries(entries); err != nil {
		t.Fatal(err)
	}

	// Save snapshot at index 500
	fsm := NewKVStateMachine()
	for i := 0; i < 500; i++ {
		fsm.Apply(entries[i].Command)
	}
	snapshot, _ := fsm.Snapshot()
	if err := storage.SaveSnapshot(snapshot, 500, 1); err != nil {
		t.Fatal(err)
	}

	// Compact log
	if err := storage.TruncateBefore(500); err != nil {
		t.Fatal(err)
	}
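
	// Added check (not in the original test), mirroring TestCompactionBasic:
	// TruncateBefore(500) should advance firstIndex to the compaction point.
	if storage.GetFirstIndex() != 500 {
		t.Fatalf("expected firstIndex=500, got %d", storage.GetFirstIndex())
	}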

	// Verify entries 501-1000 still accessible (500 is the compaction point)
	for i := 501; i <= 1000; i++ {
		entry, err := storage.GetEntry(uint64(i))
		if err != nil {
			t.Fatalf("entry %d should be accessible: %v", i, err)
		}
		if entry.Index != uint64(i) {
			t.Fatalf("entry index mismatch at %d", i)
		}
	}

	// Verify entries before 500 return ErrCompacted
	for i := 1; i < 500; i++ {
		if _, err := storage.GetEntry(uint64(i)); err != ErrCompacted {
			t.Fatalf("entry %d should return ErrCompacted, got: %v", i, err)
		}
	}
}

// mustMarshal JSON-encodes v, ignoring the error; fine for test-only values.
func mustMarshal(v interface{}) []byte {
	data, _ := json.Marshal(v)
	return data
}
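
// waitFor is a hedged helper sketch that was not in the original file: the
// server tests above use fixed sleeps to wait for election and apply, which
// can be flaky on slow machines. Polling a condition with a deadline is a
// common alternative; this signature is an assumption, not an existing API.
func waitFor(t *testing.T, timeout time.Duration, cond func() bool) {
	t.Helper()
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if cond() {
			return
		}
		time.Sleep(10 * time.Millisecond)
	}
	t.Fatal("condition not met before deadline")
}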

// TestDynamicCompactionThreshold tests that the compaction threshold increases dynamically
func TestDynamicCompactionThreshold(t *testing.T) {
	dir, err := os.MkdirTemp("", "raft-dynamic-threshold-test")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(dir)

	config := DefaultConfig()
	config.NodeID = "test-node"
	config.ListenAddr = "127.0.0.1:19002"
	config.DataDir = dir
	config.ClusterNodes = map[string]string{
		"test-node": "127.0.0.1:19002",
	}
	// Set low thresholds for testing
	config.SnapshotThreshold = 100
	config.SnapshotMinRetention = 10
	config.Logger = nil // Suppress logs

	server, err := NewKVServer(config)
	if err != nil {
		t.Fatal(err)
	}
	if err := server.Start(); err != nil {
		t.Fatal(err)
	}
	defer server.Stop()

	// Wait for leader election
	time.Sleep(500 * time.Millisecond)

	// Get initial threshold (should be 0, meaning use SnapshotThreshold)
	initialThreshold := server.Raft.nextCompactionThreshold
	if initialThreshold != 0 {
		t.Fatalf("expected initial threshold to be 0, got %d", initialThreshold)
	}

	// Propose enough entries to trigger the first compaction;
	// we need > 100 entries to exceed SnapshotThreshold
	for i := 0; i < 150; i++ {
		cmd := KVCommand{Type: KVSet, Key: "key", Value: "value"}
		data, _ := json.Marshal(cmd)
		server.Raft.Propose(data)
	}

	// Wait for apply and a potential compaction
	time.Sleep(500 * time.Millisecond)

	// After compacting 150 entries with minRetention=10, about 10 entries
	// remain, which naively suggests a next threshold near 10 * 1.5 = 15.
	// The implementation is expected to clamp the dynamic threshold to at
	// least SnapshotThreshold so small logs don't trigger constant
	// re-compaction.
	newThreshold := server.Raft.nextCompactionThreshold

	// The threshold is either still 0 (compaction has not run yet) or, once
	// set, must be >= SnapshotThreshold to prevent thrashing.
	if newThreshold > 0 && newThreshold < config.SnapshotThreshold {
		t.Fatalf("dynamic threshold %d should not be less than initial threshold %d",
			newThreshold, config.SnapshotThreshold)
	}
	t.Logf("Dynamic threshold after first compaction: %d (initial: %d)",
		newThreshold, config.SnapshotThreshold)
}