// compaction_test.go — tests for Raft log compaction and snapshot persistence.
  1. package raft
  2. import (
  3. "encoding/json"
  4. "os"
  5. "testing"
  6. "time"
  7. )
  8. // TestCompactionBasic tests basic log compaction
  9. func TestCompactionBasic(t *testing.T) {
  10. dir, err := os.MkdirTemp("", "raft-compaction-test")
  11. if err != nil {
  12. t.Fatal(err)
  13. }
  14. defer os.RemoveAll(dir)
  15. // Create storage with small memory capacity to test file reads
  16. storage, err := NewHybridStorage(dir, 200, nil)
  17. if err != nil {
  18. t.Fatal(err)
  19. }
  20. defer storage.Close()
  21. // Append 100 entries - these will stay in memory
  22. entries := make([]LogEntry, 100)
  23. for i := 0; i < 100; i++ {
  24. entries[i] = LogEntry{
  25. Index: uint64(i + 1),
  26. Term: 1,
  27. Type: EntryNormal,
  28. Command: []byte("test"),
  29. }
  30. }
  31. if err := storage.AppendEntries(entries); err != nil {
  32. t.Fatal(err)
  33. }
  34. // Verify entries exist
  35. if storage.GetLastIndex() != 100 {
  36. t.Fatalf("expected lastIndex=100, got %d", storage.GetLastIndex())
  37. }
  38. // Verify entry before compaction
  39. entry, err := storage.GetEntry(50)
  40. if err != nil {
  41. t.Fatalf("should be able to read entry 50 before compaction: %v", err)
  42. }
  43. if entry.Index != 50 {
  44. t.Fatalf("expected entry with index=50, got index=%d", entry.Index)
  45. }
  46. // Compact up to index 50
  47. if err := storage.TruncateBefore(50); err != nil {
  48. t.Fatal(err)
  49. }
  50. // Verify firstIndex updated
  51. if storage.GetFirstIndex() != 50 {
  52. t.Fatalf("expected firstIndex=50, got %d", storage.GetFirstIndex())
  53. }
  54. // Verify old entries are inaccessible
  55. _, err = storage.GetEntry(49)
  56. if err != ErrCompacted {
  57. t.Fatalf("expected ErrCompacted for index 49, got %v", err)
  58. }
  59. // Verify entries at and after compaction point are accessible
  60. // Entry 50 should still be in memory after TruncateBefore
  61. entry, err = storage.GetEntry(51)
  62. if err != nil {
  63. t.Fatalf("should be able to read entry 51: %v", err)
  64. }
  65. if entry.Index != 51 {
  66. t.Fatalf("expected index=51, got %d", entry.Index)
  67. }
  68. }
  69. // TestSnapshotSaveAndLoad tests snapshot persistence
  70. func TestSnapshotSaveAndLoad(t *testing.T) {
  71. dir, err := os.MkdirTemp("", "raft-snapshot-test")
  72. if err != nil {
  73. t.Fatal(err)
  74. }
  75. defer os.RemoveAll(dir)
  76. // Create storage
  77. storage, err := NewHybridStorage(dir, 1000, nil)
  78. if err != nil {
  79. t.Fatal(err)
  80. }
  81. // Create snapshot data
  82. snapshotData := map[string]string{
  83. "key1": "value1",
  84. "key2": "value2",
  85. }
  86. data, _ := json.Marshal(snapshotData)
  87. // Save snapshot
  88. if err := storage.SaveSnapshot(data, 100, 5); err != nil {
  89. t.Fatal(err)
  90. }
  91. storage.Close()
  92. // Reopen storage and verify snapshot
  93. storage2, err := NewHybridStorage(dir, 1000, nil)
  94. if err != nil {
  95. t.Fatal(err)
  96. }
  97. defer storage2.Close()
  98. loadedData, lastIndex, lastTerm, err := storage2.GetSnapshot()
  99. if err != nil {
  100. t.Fatal(err)
  101. }
  102. if lastIndex != 100 {
  103. t.Fatalf("expected lastIndex=100, got %d", lastIndex)
  104. }
  105. if lastTerm != 5 {
  106. t.Fatalf("expected lastTerm=5, got %d", lastTerm)
  107. }
  108. var loadedSnapshot map[string]string
  109. if err := json.Unmarshal(loadedData, &loadedSnapshot); err != nil {
  110. t.Fatal(err)
  111. }
  112. if loadedSnapshot["key1"] != "value1" || loadedSnapshot["key2"] != "value2" {
  113. t.Fatalf("snapshot data mismatch: %v", loadedSnapshot)
  114. }
  115. }
  116. // TestCompactionWithKVServer tests compaction through KVServer
  117. func TestCompactionWithKVServer(t *testing.T) {
  118. dir, err := os.MkdirTemp("", "raft-kv-compaction-test")
  119. if err != nil {
  120. t.Fatal(err)
  121. }
  122. defer os.RemoveAll(dir)
  123. config := DefaultConfig()
  124. config.NodeID = "test-node"
  125. config.ListenAddr = "127.0.0.1:19001"
  126. config.DataDir = dir
  127. config.ClusterNodes = map[string]string{
  128. "test-node": "127.0.0.1:19001",
  129. }
  130. // Lower threshold for testing
  131. config.SnapshotThreshold = 100
  132. config.SnapshotMinRetention = 10
  133. server, err := NewKVServer(config)
  134. if err != nil {
  135. t.Fatal(err)
  136. }
  137. if err := server.Start(); err != nil {
  138. t.Fatal(err)
  139. }
  140. defer server.Stop()
  141. // Wait for leader election
  142. time.Sleep(500 * time.Millisecond)
  143. // Write enough entries to trigger compaction
  144. for i := 0; i < 200; i++ {
  145. key := "key"
  146. val := "value"
  147. // We just set the same key 200 times
  148. if err := server.Set(key, val); err != nil {
  149. t.Fatalf("failed to set key: %v", err)
  150. }
  151. }
  152. // Check that FSM has the correct value
  153. val, ok := server.Get("key")
  154. if !ok {
  155. t.Fatal("key not found")
  156. }
  157. if val != "value" {
  158. t.Fatalf("expected 'value', got '%s'", val)
  159. }
  160. }
  161. // TestDataConsistencyAfterCompaction tests that data is consistent after compaction
  162. func TestDataConsistencyAfterCompaction(t *testing.T) {
  163. dir, err := os.MkdirTemp("", "raft-consistency-test")
  164. if err != nil {
  165. t.Fatal(err)
  166. }
  167. defer os.RemoveAll(dir)
  168. // Create KV state machine
  169. fsm := NewKVStateMachine()
  170. // Apply a series of operations
  171. operations := []KVCommand{
  172. {Type: KVSet, Key: "counter", Value: "1"},
  173. {Type: KVSet, Key: "counter", Value: "2"},
  174. {Type: KVSet, Key: "counter", Value: "3"},
  175. {Type: KVSet, Key: "name", Value: "alice"},
  176. {Type: KVSet, Key: "counter", Value: "4"},
  177. {Type: KVDel, Key: "name"},
  178. {Type: KVSet, Key: "counter", Value: "5"},
  179. }
  180. for _, op := range operations {
  181. data, _ := json.Marshal(op)
  182. fsm.Apply(data)
  183. }
  184. // Take snapshot
  185. snapshot, err := fsm.Snapshot()
  186. if err != nil {
  187. t.Fatal(err)
  188. }
  189. // Create new FSM and restore from snapshot
  190. fsm2 := NewKVStateMachine()
  191. if err := fsm2.Restore(snapshot); err != nil {
  192. t.Fatal(err)
  193. }
  194. // Verify data consistency
  195. val1, ok1 := fsm.Get("counter")
  196. val2, ok2 := fsm2.Get("counter")
  197. if val1 != val2 || ok1 != ok2 {
  198. t.Fatalf("counter mismatch: original=%s(%v), restored=%s(%v)", val1, ok1, val2, ok2)
  199. }
  200. if val1 != "5" {
  201. t.Fatalf("expected counter=5, got %s", val1)
  202. }
  203. // name should not exist (was deleted)
  204. _, ok := fsm2.Get("name")
  205. if ok {
  206. t.Fatal("name should have been deleted")
  207. }
  208. }
  209. // TestCompactionDoesNotLoseData tests that no committed data is lost during compaction
  210. func TestCompactionDoesNotLoseData(t *testing.T) {
  211. dir, err := os.MkdirTemp("", "raft-no-loss-test")
  212. if err != nil {
  213. t.Fatal(err)
  214. }
  215. defer os.RemoveAll(dir)
  216. // Use larger memory capacity to keep all entries in memory
  217. storage, err := NewHybridStorage(dir, 2000, nil)
  218. if err != nil {
  219. t.Fatal(err)
  220. }
  221. defer storage.Close()
  222. // Write 1000 entries with unique values
  223. entries := make([]LogEntry, 1000)
  224. for i := 0; i < 1000; i++ {
  225. cmd := KVCommand{Type: KVSet, Key: "key", Value: string(rune('A' + i%26))}
  226. data, _ := json.Marshal(cmd)
  227. entries[i] = LogEntry{
  228. Index: uint64(i + 1),
  229. Term: 1,
  230. Type: EntryNormal,
  231. Command: data,
  232. }
  233. }
  234. if err := storage.AppendEntries(entries); err != nil {
  235. t.Fatal(err)
  236. }
  237. // Save snapshot at index 500
  238. fsm := NewKVStateMachine()
  239. for i := 0; i < 500; i++ {
  240. fsm.Apply(entries[i].Command)
  241. }
  242. snapshot, _ := fsm.Snapshot()
  243. if err := storage.SaveSnapshot(snapshot, 500, 1); err != nil {
  244. t.Fatal(err)
  245. }
  246. // Compact log
  247. if err := storage.TruncateBefore(500); err != nil {
  248. t.Fatal(err)
  249. }
  250. // Verify entries 501-1000 still accessible (500 is the compaction point)
  251. for i := 501; i <= 1000; i++ {
  252. entry, err := storage.GetEntry(uint64(i))
  253. if err != nil {
  254. t.Fatalf("entry %d should be accessible: %v", i, err)
  255. }
  256. if entry.Index != uint64(i) {
  257. t.Fatalf("entry index mismatch at %d", i)
  258. }
  259. }
  260. // Verify entries before 500 return ErrCompacted
  261. for i := 1; i < 500; i++ {
  262. _, err := storage.GetEntry(uint64(i))
  263. if err != ErrCompacted {
  264. t.Fatalf("entry %d should return ErrCompacted, got: %v", i, err)
  265. }
  266. }
  267. }
  268. func mustMarshal(v interface{}) []byte {
  269. data, _ := json.Marshal(v)
  270. return data
  271. }
  272. // TestDynamicCompactionThreshold tests that compaction threshold increases dynamically
  273. func TestDynamicCompactionThreshold(t *testing.T) {
  274. dir, err := os.MkdirTemp("", "raft-dynamic-threshold-test")
  275. if err != nil {
  276. t.Fatal(err)
  277. }
  278. defer os.RemoveAll(dir)
  279. config := DefaultConfig()
  280. config.NodeID = "test-node"
  281. config.ListenAddr = "127.0.0.1:19002"
  282. config.DataDir = dir
  283. config.ClusterNodes = map[string]string{
  284. "test-node": "127.0.0.1:19002",
  285. }
  286. // Set low thresholds for testing
  287. config.SnapshotThreshold = 100
  288. config.SnapshotMinRetention = 10
  289. config.Logger = nil // Suppress logs
  290. server, err := NewKVServer(config)
  291. if err != nil {
  292. t.Fatal(err)
  293. }
  294. if err := server.Start(); err != nil {
  295. t.Fatal(err)
  296. }
  297. defer server.Stop()
  298. // Wait for leader election
  299. time.Sleep(500 * time.Millisecond)
  300. // Get initial threshold (should be 0, meaning use SnapshotThreshold)
  301. initialThreshold := server.Raft.nextCompactionThreshold
  302. if initialThreshold != 0 {
  303. t.Fatalf("expected initial threshold to be 0, got %d", initialThreshold)
  304. }
  305. // Propose enough entries to trigger first compaction
  306. // We need > 100 entries to trigger compaction
  307. for i := 0; i < 150; i++ {
  308. cmd := KVCommand{Type: KVSet, Key: "key", Value: "value"}
  309. data, _ := json.Marshal(cmd)
  310. server.Raft.Propose(data)
  311. }
  312. // Wait for apply and potential compaction
  313. time.Sleep(500 * time.Millisecond)
  314. // Check that dynamic threshold was set after compaction
  315. // After compaction with 150 entries and minRetention=10, we should have ~10 entries
  316. // So next threshold should be around 10 * 1.5 = 15, but at least 100 (initial threshold)
  317. newThreshold := server.Raft.nextCompactionThreshold
  318. // The threshold should now be set (> 0) or remain at initial if compaction happened
  319. // Key point: it should be >= SnapshotThreshold to prevent thrashing
  320. if newThreshold > 0 && newThreshold < config.SnapshotThreshold {
  321. t.Fatalf("dynamic threshold %d should not be less than initial threshold %d",
  322. newThreshold, config.SnapshotThreshold)
  323. }
  324. t.Logf("Dynamic threshold after first compaction: %d (initial: %d)",
  325. newThreshold, config.SnapshotThreshold)
  326. }