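// Tests for Raft log compaction and snapshot persistence: basic truncation,
// snapshot save/load across a restart, FSM consistency after restore, data
// retention across compaction, and the dynamic compaction threshold.
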
package raft

import (
	"encoding/json"
	"os"
	"testing"
	"time"
)
// TestCompactionBasic tests basic log compaction.
func TestCompactionBasic(t *testing.T) {
	dir, err := os.MkdirTemp("", "raft-compaction-test")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(dir)

	// Create storage with a memory capacity of 200 entries; the 100 entries
	// appended below all fit in memory.
	storage, err := NewHybridStorage(dir, 200, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer storage.Close()

	// Append 100 entries.
	entries := make([]LogEntry, 100)
	for i := 0; i < 100; i++ {
		entries[i] = LogEntry{
			Index:   uint64(i + 1),
			Term:    1,
			Type:    EntryNormal,
			Command: []byte("test"),
		}
	}
	if err := storage.AppendEntries(entries); err != nil {
		t.Fatal(err)
	}

	// Verify the entries exist.
	if storage.GetLastIndex() != 100 {
		t.Fatalf("expected lastIndex=100, got %d", storage.GetLastIndex())
	}

	// Verify an entry is readable before compaction.
	entry, err := storage.GetEntry(50)
	if err != nil {
		t.Fatalf("should be able to read entry 50 before compaction: %v", err)
	}
	if entry.Index != 50 {
		t.Fatalf("expected entry with index=50, got index=%d", entry.Index)
	}

	// Compact up to index 50.
	if err := storage.TruncateBefore(50); err != nil {
		t.Fatal(err)
	}

	// Verify firstIndex was updated.
	if storage.GetFirstIndex() != 50 {
		t.Fatalf("expected firstIndex=50, got %d", storage.GetFirstIndex())
	}

	// Verify old entries are inaccessible.
	_, err = storage.GetEntry(49)
	if err != ErrCompacted {
		t.Fatalf("expected ErrCompacted for index 49, got %v", err)
	}

	// Verify entries after the compaction point remain accessible.
	entry, err = storage.GetEntry(51)
	if err != nil {
		t.Fatalf("should be able to read entry 51: %v", err)
	}
	if entry.Index != 51 {
		t.Fatalf("expected index=51, got %d", entry.Index)
	}
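
	// Boundary check: since TruncateBefore(50) sets firstIndex to 50 (assuming
	// inclusive retention, as the firstIndex assertion above implies), entry 50
	// itself should still be readable.
	if entry, err := storage.GetEntry(50); err != nil || entry.Index != 50 {
		t.Fatalf("entry 50 at the compaction point should remain readable: err=%v", err)
	}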
}
// TestSnapshotSaveAndLoad tests snapshot persistence.
func TestSnapshotSaveAndLoad(t *testing.T) {
	dir, err := os.MkdirTemp("", "raft-snapshot-test")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(dir)

	// Create storage.
	storage, err := NewHybridStorage(dir, 1000, nil)
	if err != nil {
		t.Fatal(err)
	}

	// Create snapshot data.
	snapshotData := map[string]string{
		"key1": "value1",
		"key2": "value2",
	}
	data, _ := json.Marshal(snapshotData)

	// Save the snapshot, then close the storage.
	if err := storage.SaveSnapshot(data, 100, 5); err != nil {
		t.Fatal(err)
	}
	storage.Close()

	// Reopen the storage and verify the snapshot survived.
	storage2, err := NewHybridStorage(dir, 1000, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer storage2.Close()

	loadedData, lastIndex, lastTerm, err := storage2.GetSnapshot()
	if err != nil {
		t.Fatal(err)
	}
	if lastIndex != 100 {
		t.Fatalf("expected lastIndex=100, got %d", lastIndex)
	}
	if lastTerm != 5 {
		t.Fatalf("expected lastTerm=5, got %d", lastTerm)
	}

	var loadedSnapshot map[string]string
	if err := json.Unmarshal(loadedData, &loadedSnapshot); err != nil {
		t.Fatal(err)
	}
	if loadedSnapshot["key1"] != "value1" || loadedSnapshot["key2"] != "value2" {
		t.Fatalf("snapshot data mismatch: %v", loadedSnapshot)
	}
}
// TestCompactionWithKVServer tests compaction through KVServer.
func TestCompactionWithKVServer(t *testing.T) {
	dir, err := os.MkdirTemp("", "raft-kv-compaction-test")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(dir)

	config := DefaultConfig()
	config.NodeID = "test-node"
	config.ListenAddr = "127.0.0.1:19001"
	config.DataDir = dir
	config.ClusterNodes = map[string]string{
		"test-node": "127.0.0.1:19001",
	}
	// Lower the thresholds for testing.
	config.SnapshotThreshold = 100
	config.SnapshotMinRetention = 10

	server, err := NewKVServer(config)
	if err != nil {
		t.Fatal(err)
	}
	if err := server.Start(); err != nil {
		t.Fatal(err)
	}
	defer server.Stop()

	// Wait for leader election.
	time.Sleep(500 * time.Millisecond)

	// Propose enough entries through Raft to trigger compaction. Setting the
	// same key 200 times suffices: each proposal appends a log entry.
	// (Applying directly to the FSM would bypass the log entirely.)
	for i := 0; i < 200; i++ {
		server.Raft.Propose(mustMarshal(KVCommand{Type: KVSet, Key: "key", Value: "value"}))
	}

	// Wait for the entries to be applied.
	time.Sleep(500 * time.Millisecond)

	// Check that the FSM holds the expected value.
	val, ok := server.FSM.Get("key")
	if !ok {
		t.Fatal("key not found")
	}
	if val != "value" {
		t.Fatalf("expected 'value', got '%s'", val)
	}
}
// TestDataConsistencyAfterCompaction tests that data is consistent after compaction.
func TestDataConsistencyAfterCompaction(t *testing.T) {
	// Create a KV state machine.
	fsm := NewKVStateMachine()

	// Apply a series of operations.
	operations := []KVCommand{
		{Type: KVSet, Key: "counter", Value: "1"},
		{Type: KVSet, Key: "counter", Value: "2"},
		{Type: KVSet, Key: "counter", Value: "3"},
		{Type: KVSet, Key: "name", Value: "alice"},
		{Type: KVSet, Key: "counter", Value: "4"},
		{Type: KVDel, Key: "name"},
		{Type: KVSet, Key: "counter", Value: "5"},
	}
	for _, op := range operations {
		data, _ := json.Marshal(op)
		fsm.Apply(data)
	}

	// Take a snapshot.
	snapshot, err := fsm.Snapshot()
	if err != nil {
		t.Fatal(err)
	}

	// Create a new FSM and restore it from the snapshot.
	fsm2 := NewKVStateMachine()
	if err := fsm2.Restore(snapshot); err != nil {
		t.Fatal(err)
	}

	// Verify data consistency.
	val1, ok1 := fsm.Get("counter")
	val2, ok2 := fsm2.Get("counter")
	if val1 != val2 || ok1 != ok2 {
		t.Fatalf("counter mismatch: original=%s(%v), restored=%s(%v)", val1, ok1, val2, ok2)
	}
	if val1 != "5" {
		t.Fatalf("expected counter=5, got %s", val1)
	}

	// "name" should not exist (it was deleted before the snapshot).
	if _, ok := fsm2.Get("name"); ok {
		t.Fatal("name should have been deleted")
	}
}
// TestCompactionDoesNotLoseData tests that no committed data is lost during compaction.
func TestCompactionDoesNotLoseData(t *testing.T) {
	dir, err := os.MkdirTemp("", "raft-no-loss-test")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(dir)

	// Use a larger memory capacity so all entries stay in memory.
	storage, err := NewHybridStorage(dir, 2000, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer storage.Close()

	// Write 1000 entries whose values cycle through the letters A-Z.
	entries := make([]LogEntry, 1000)
	for i := 0; i < 1000; i++ {
		cmd := KVCommand{Type: KVSet, Key: "key", Value: string(rune('A' + i%26))}
		data, _ := json.Marshal(cmd)
		entries[i] = LogEntry{
			Index:   uint64(i + 1),
			Term:    1,
			Type:    EntryNormal,
			Command: data,
		}
	}
	if err := storage.AppendEntries(entries); err != nil {
		t.Fatal(err)
	}

	// Apply the first 500 entries to an FSM and save a snapshot at index 500.
	fsm := NewKVStateMachine()
	for i := 0; i < 500; i++ {
		fsm.Apply(entries[i].Command)
	}
	snapshot, err := fsm.Snapshot()
	if err != nil {
		t.Fatal(err)
	}
	if err := storage.SaveSnapshot(snapshot, 500, 1); err != nil {
		t.Fatal(err)
	}

	// Compact the log.
	if err := storage.TruncateBefore(500); err != nil {
		t.Fatal(err)
	}
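
	// Sanity-check the index bookkeeping: after TruncateBefore(500), firstIndex
	// should be 500 and lastIndex should stay 1000 (assuming the same
	// GetFirstIndex semantics exercised in TestCompactionBasic above).
	if storage.GetFirstIndex() != 500 || storage.GetLastIndex() != 1000 {
		t.Fatalf("expected firstIndex=500 and lastIndex=1000, got %d and %d",
			storage.GetFirstIndex(), storage.GetLastIndex())
	}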
	// Verify entries 501-1000 are still accessible (500 is the compaction point).
	for i := 501; i <= 1000; i++ {
		entry, err := storage.GetEntry(uint64(i))
		if err != nil {
			t.Fatalf("entry %d should be accessible: %v", i, err)
		}
		if entry.Index != uint64(i) {
			t.Fatalf("entry index mismatch at %d", i)
		}
	}

	// Verify entries before 500 return ErrCompacted.
	for i := 1; i < 500; i++ {
		_, err := storage.GetEntry(uint64(i))
		if err != ErrCompacted {
			t.Fatalf("entry %d should return ErrCompacted, got: %v", i, err)
		}
	}
}
// mustMarshal JSON-encodes v and panics on failure, which is acceptable in
// tests where every payload is known to be marshalable.
func mustMarshal(v interface{}) []byte {
	data, err := json.Marshal(v)
	if err != nil {
		panic(err)
	}
	return data
}
// TestDynamicCompactionThreshold tests that the compaction threshold increases dynamically.
func TestDynamicCompactionThreshold(t *testing.T) {
	dir, err := os.MkdirTemp("", "raft-dynamic-threshold-test")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(dir)

	config := DefaultConfig()
	config.NodeID = "test-node"
	config.ListenAddr = "127.0.0.1:19002"
	config.DataDir = dir
	config.ClusterNodes = map[string]string{
		"test-node": "127.0.0.1:19002",
	}
	// Set low thresholds for testing.
	config.SnapshotThreshold = 100
	config.SnapshotMinRetention = 10
	config.Logger = nil // Suppress logs

	server, err := NewKVServer(config)
	if err != nil {
		t.Fatal(err)
	}
	if err := server.Start(); err != nil {
		t.Fatal(err)
	}
	defer server.Stop()

	// Wait for leader election.
	time.Sleep(500 * time.Millisecond)

	// The initial threshold should be 0, meaning SnapshotThreshold is used.
	initialThreshold := server.Raft.nextCompactionThreshold
	if initialThreshold != 0 {
		t.Fatalf("expected initial threshold to be 0, got %d", initialThreshold)
	}

	// Propose more than SnapshotThreshold (100) entries to trigger the first compaction.
	for i := 0; i < 150; i++ {
		cmd := KVCommand{Type: KVSet, Key: "key", Value: "value"}
		data, _ := json.Marshal(cmd)
		server.Raft.Propose(data)
	}

	// Wait for apply and a potential compaction.
	time.Sleep(500 * time.Millisecond)

	// After compacting ~150 entries with minRetention=10, roughly 10 entries
	// remain, so the raw dynamic threshold would be about 10 * 1.5 = 15. If
	// compaction ran, the threshold should now be set (> 0); if not, it stays
	// 0. Either way, a non-zero value must be at least SnapshotThreshold to
	// prevent compaction thrashing.
	newThreshold := server.Raft.nextCompactionThreshold
	if newThreshold > 0 && newThreshold < config.SnapshotThreshold {
		t.Fatalf("dynamic threshold %d should not be less than initial threshold %d",
			newThreshold, config.SnapshotThreshold)
	}
	t.Logf("Dynamic threshold after first compaction: %d (initial: %d)",
		newThreshold, config.SnapshotThreshold)
}
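
// waitFor polls cond until it returns true or the timeout elapses. It is a
// suggested test convenience (not part of the storage or server API) that
// could replace the fixed time.Sleep calls above with bounded polling, e.g.:
//
//	waitFor(time.Second, func() bool { _, ok := server.FSM.Get("key"); return ok })
func waitFor(timeout time.Duration, cond func() bool) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if cond() {
			return true
		}
		time.Sleep(10 * time.Millisecond)
	}
	return false
}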
|