package raft

import (
	"encoding/json"
	"errors"
	"os"
	"testing"
	"time"
)

// TestCompactionBasic appends 100 entries to a HybridStorage, truncates the
// log before index 50, and checks that the first index moves to 50, that an
// older entry reports ErrCompacted, and that a newer entry is still readable.
func TestCompactionBasic(t *testing.T) {
	dir, err := os.MkdirTemp("", "raft-compaction-test")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(dir)

	// NOTE(review): the second argument is presumably the in-memory capacity;
	// 200 exceeds the 100 entries appended below, so this test is not expected
	// to exercise file reads — confirm against NewHybridStorage.
	storage, err := NewHybridStorage(dir, 200, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer storage.Close()

	// Append 100 entries with indices 1..100, all in term 1.
	entries := make([]LogEntry, 100)
	for i := range entries {
		entries[i] = LogEntry{
			Index:   uint64(i + 1),
			Term:    1,
			Type:    EntryNormal,
			Command: []byte("test"),
		}
	}
	if err := storage.AppendEntries(entries); err != nil {
		t.Fatal(err)
	}

	// Verify entries exist.
	if last := storage.GetLastIndex(); last != 100 {
		t.Fatalf("expected lastIndex=100, got %d", last)
	}

	// Verify an entry is readable before compaction.
	entry, err := storage.GetEntry(50)
	if err != nil {
		t.Fatalf("should be able to read entry 50 before compaction: %v", err)
	}
	if entry.Index != 50 {
		t.Fatalf("expected entry with index=50, got index=%d", entry.Index)
	}

	// Compact up to index 50.
	if err := storage.TruncateBefore(50); err != nil {
		t.Fatal(err)
	}

	// Verify firstIndex was updated.
	if first := storage.GetFirstIndex(); first != 50 {
		t.Fatalf("expected firstIndex=50, got %d", first)
	}

	// Verify old entries are inaccessible. errors.Is keeps the check valid
	// even if the storage layer wraps the sentinel.
	if _, err = storage.GetEntry(49); !errors.Is(err, ErrCompacted) {
		t.Fatalf("expected ErrCompacted for index 49, got %v", err)
	}

	// Verify an entry after the compaction point is still accessible.
	entry, err = storage.GetEntry(51)
	if err != nil {
		t.Fatalf("should be able to read entry 51: %v", err)
	}
	if entry.Index != 51 {
		t.Fatalf("expected index=51, got %d", entry.Index)
	}
}

// TestSnapshotSaveAndLoad saves a snapshot, closes the storage, reopens it
// from the same directory, and checks that the snapshot payload and its
// index/term metadata are read back unchanged.
func TestSnapshotSaveAndLoad(t *testing.T) {
	dir, err := os.MkdirTemp("", "raft-snapshot-test")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(dir)

	storage, err := NewHybridStorage(dir, 1000, nil)
	if err != nil {
		t.Fatal(err)
	}

	// Create snapshot data.
	data := mustMarshal(map[string]string{
		"key1": "value1",
		"key2": "value2",
	})

	// Save the snapshot. The storage cannot be closed with defer because it
	// has to be closed before the directory is reopened below, so it is
	// closed explicitly on the failure path as well to avoid leaking it.
	if err := storage.SaveSnapshot(data, 100, 5); err != nil {
		storage.Close()
		t.Fatal(err)
	}
	storage.Close()

	// Reopen storage and verify the snapshot.
	storage2, err := NewHybridStorage(dir, 1000, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer storage2.Close()

	loadedData, lastIndex, lastTerm, err := storage2.GetSnapshot()
	if err != nil {
		t.Fatal(err)
	}
	if lastIndex != 100 {
		t.Fatalf("expected lastIndex=100, got %d", lastIndex)
	}
	if lastTerm != 5 {
		t.Fatalf("expected lastTerm=5, got %d", lastTerm)
	}

	var loadedSnapshot map[string]string
	if err := json.Unmarshal(loadedData, &loadedSnapshot); err != nil {
		t.Fatal(err)
	}
	if loadedSnapshot["key1"] != "value1" || loadedSnapshot["key2"] != "value2" {
		t.Fatalf("snapshot data mismatch: %v", loadedSnapshot)
	}
}

// TestCompactionWithKVServer starts a single-node KVServer with a low
// snapshot threshold, applies the same set command 200 times, and checks that
// the state machine holds the expected value afterwards.
func TestCompactionWithKVServer(t *testing.T) {
	dir, err := os.MkdirTemp("", "raft-kv-compaction-test")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(dir)

	config := DefaultConfig()
	config.NodeID = "test-node"
	config.ListenAddr = "127.0.0.1:19001"
	config.DataDir = dir
	config.ClusterNodes = map[string]string{
		"test-node": "127.0.0.1:19001",
	}
	// Lower thresholds for testing.
	config.SnapshotThreshold = 100
	config.SnapshotMinRetention = 10

	server, err := NewKVServer(config)
	if err != nil {
		t.Fatal(err)
	}
	if err := server.Start(); err != nil {
		t.Fatal(err)
	}
	defer server.Stop()

	// Wait for leader election.
	time.Sleep(500 * time.Millisecond)

	// Set the same key 200 times.
	// NOTE(review): the commands are applied to server.FSM directly rather
	// than proposed through server.Raft, so this loop presumably does not grow
	// the Raft log or trigger compaction — confirm whether Propose was intended.
	for i := 0; i < 200; i++ {
		server.FSM.Apply(mustMarshal(KVCommand{Type: KVSet, Key: "key", Value: "value"}))
	}

	// Check that the FSM has the correct value.
	val, ok := server.FSM.Get("key")
	if !ok {
		t.Fatal("key not found")
	}
	if val != "value" {
		t.Fatalf("expected 'value', got '%s'", val)
	}
}

// TestDataConsistencyAfterCompaction applies a sequence of set/delete
// commands to a KV state machine, snapshots it, restores the snapshot into a
// fresh state machine, and checks that both agree on the final contents.
func TestDataConsistencyAfterCompaction(t *testing.T) {
	// Create the KV state machine.
	fsm := NewKVStateMachine()

	// Apply a series of operations.
	operations := []KVCommand{
		{Type: KVSet, Key: "counter", Value: "1"},
		{Type: KVSet, Key: "counter", Value: "2"},
		{Type: KVSet, Key: "counter", Value: "3"},
		{Type: KVSet, Key: "name", Value: "alice"},
		{Type: KVSet, Key: "counter", Value: "4"},
		{Type: KVDel, Key: "name"},
		{Type: KVSet, Key: "counter", Value: "5"},
	}
	for _, op := range operations {
		fsm.Apply(mustMarshal(op))
	}

	// Take a snapshot.
	snapshot, err := fsm.Snapshot()
	if err != nil {
		t.Fatal(err)
	}

	// Create a new FSM and restore it from the snapshot.
	fsm2 := NewKVStateMachine()
	if err := fsm2.Restore(snapshot); err != nil {
		t.Fatal(err)
	}

	// Verify data consistency between the original and the restored FSM.
	val1, ok1 := fsm.Get("counter")
	val2, ok2 := fsm2.Get("counter")
	if val1 != val2 || ok1 != ok2 {
		t.Fatalf("counter mismatch: original=%s(%v), restored=%s(%v)", val1, ok1, val2, ok2)
	}
	if val1 != "5" {
		t.Fatalf("expected counter=5, got %s", val1)
	}

	// name should not exist in the restored FSM (it was deleted).
	if _, ok := fsm2.Get("name"); ok {
		t.Fatal("name should have been deleted")
	}
}

// TestCompactionDoesNotLoseData writes 1000 entries, saves a snapshot built
// from the first 500, truncates the log before index 500, and checks that
// entries 501..1000 remain readable while entries 1..499 report ErrCompacted.
func TestCompactionDoesNotLoseData(t *testing.T) {
	dir, err := os.MkdirTemp("", "raft-no-loss-test")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(dir)

	// Use a capacity argument larger than the number of entries written.
	storage, err := NewHybridStorage(dir, 2000, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer storage.Close()

	// Write 1000 entries whose values cycle through the letters A..Z.
	entries := make([]LogEntry, 1000)
	for i := range entries {
		cmd := KVCommand{Type: KVSet, Key: "key", Value: string(rune('A' + i%26))}
		entries[i] = LogEntry{
			Index:   uint64(i + 1),
			Term:    1,
			Type:    EntryNormal,
			Command: mustMarshal(cmd),
		}
	}
	if err := storage.AppendEntries(entries); err != nil {
		t.Fatal(err)
	}

	// Build the state as of index 500 and save it as a snapshot.
	fsm := NewKVStateMachine()
	for i := 0; i < 500; i++ {
		fsm.Apply(entries[i].Command)
	}
	snapshot, err := fsm.Snapshot()
	if err != nil {
		t.Fatal(err)
	}
	if err := storage.SaveSnapshot(snapshot, 500, 1); err != nil {
		t.Fatal(err)
	}

	// Compact the log.
	if err := storage.TruncateBefore(500); err != nil {
		t.Fatal(err)
	}

	// Verify entries 501-1000 are still accessible (500 is the compaction point).
	for i := 501; i <= 1000; i++ {
		entry, err := storage.GetEntry(uint64(i))
		if err != nil {
			t.Fatalf("entry %d should be accessible: %v", i, err)
		}
		if entry.Index != uint64(i) {
			t.Fatalf("entry index mismatch at %d", i)
		}
	}

	// Verify entries before 500 return ErrCompacted.
	for i := 1; i < 500; i++ {
		if _, err := storage.GetEntry(uint64(i)); !errors.Is(err, ErrCompacted) {
			t.Fatalf("entry %d should return ErrCompacted, got: %v", i, err)
		}
	}
}

// mustMarshal returns the JSON encoding of v. It panics if encoding fails, so
// a broken fixture surfaces immediately instead of yielding nil bytes.
func mustMarshal(v interface{}) []byte {
	data, err := json.Marshal(v)
	if err != nil {
		panic("mustMarshal: " + err.Error())
	}
	return data
}

// TestDynamicCompactionThreshold starts a single-node KVServer, proposes 150
// commands, and checks that nextCompactionThreshold, once it becomes non-zero,
// is not below the configured SnapshotThreshold.
func TestDynamicCompactionThreshold(t *testing.T) {
	dir, err := os.MkdirTemp("", "raft-dynamic-threshold-test")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(dir)

	config := DefaultConfig()
	config.NodeID = "test-node"
	config.ListenAddr = "127.0.0.1:19002"
	config.DataDir = dir
	config.ClusterNodes = map[string]string{
		"test-node": "127.0.0.1:19002",
	}
	// Set low thresholds for testing.
	config.SnapshotThreshold = 100
	config.SnapshotMinRetention = 10
	config.Logger = nil // Suppress logs

	server, err := NewKVServer(config)
	if err != nil {
		t.Fatal(err)
	}
	if err := server.Start(); err != nil {
		t.Fatal(err)
	}
	defer server.Stop()

	// Wait for leader election.
	time.Sleep(500 * time.Millisecond)

	// Get the initial threshold (should be 0, meaning use SnapshotThreshold).
	// NOTE(review): this field is read without any visible synchronization
	// while the server is running; confirm it is safe under -race.
	initialThreshold := server.Raft.nextCompactionThreshold
	if initialThreshold != 0 {
		t.Fatalf("expected initial threshold to be 0, got %d", initialThreshold)
	}

	// Propose more entries than SnapshotThreshold (100) so that a first
	// compaction can be triggered.
	// NOTE(review): whatever Propose returns is discarded here; confirm
	// whether a failed proposal should fail the test.
	for i := 0; i < 150; i++ {
		server.Raft.Propose(mustMarshal(KVCommand{Type: KVSet, Key: "key", Value: "value"}))
	}

	// Wait for apply and potential compaction.
	time.Sleep(500 * time.Millisecond)

	// The threshold may still be 0 if no compaction ran. When it has been
	// set, it must be at least SnapshotThreshold to prevent thrashing.
	newThreshold := server.Raft.nextCompactionThreshold
	if newThreshold > 0 && newThreshold < config.SnapshotThreshold {
		t.Fatalf("dynamic threshold %d should not be less than initial threshold %d", newThreshold, config.SnapshotThreshold)
	}

	t.Logf("Dynamic threshold after first compaction: %d (initial: %d)", newThreshold, config.SnapshotThreshold)
}