// Command main runs a single-node log compaction safety test against the raft
// KV server. It writes a configurable number of keys, checks whether the
// on-disk log file shrank after the workload, and reads back sample keys to
// verify that their values are intact.
package main

import (
	"flag"
	"fmt"
	"log"
	"os"
	"path/filepath"
	"time"

	"igit.com/xbase/raft"
)

// writeAttempts is how many times a single Set is tried before it is counted
// as a failed write.
const writeAttempts = 3

// writeRetryDelay is the pause between two attempts of the same Set.
const writeRetryDelay = 10 * time.Millisecond

// leaderCheckEvery is the number of writes between leadership checks.
const leaderCheckEvery = 1000

// progressEvery is the number of writes between progress reports.
const progressEvery = 5000

// logSizeSampleAt is the write index at which the baseline log file size is
// recorded (approximately just before the expected compaction point).
const logSizeSampleAt = 35000

// main parses the command line flags, starts a single-node KV server on a
// freshly cleaned data directory, runs the write workload, reports whether the
// log file shrank, verifies sample keys, and stops the server. The process
// exits with status 1 if any write failed after all retries or any sample key
// failed verification.
func main() {
	// Command line flags
	snapshotThreshold := flag.Uint64("threshold", 10000, "Log count to trigger snapshot/compaction")
	minRetention := flag.Uint64("retention", 20000, "Minimum log entries to retain after compaction")
	totalKeys := flag.Int("keys", 100000, "Total number of keys to write")
	flag.Parse()

	if *totalKeys <= 0 {
		log.Fatalf("-keys must be positive, got %d", *totalKeys)
	}

	// Configuration
	nodeID := "node1"
	addr := "localhost:5001"
	dataDir := "data/node1"

	// Start from an empty data directory so earlier runs cannot affect the test.
	if err := os.RemoveAll(dataDir); err != nil {
		log.Fatalf("Failed to clean data directory: %v", err)
	}

	// Cluster configuration - single node for this test.
	clusterNodes := map[string]string{
		nodeID: addr,
	}

	config := raft.DefaultConfig()
	config.NodeID = nodeID
	config.ListenAddr = addr
	config.DataDir = dataDir
	config.ClusterNodes = clusterNodes
	// Log level: 0=DEBUG, 1=INFO, 2=WARN, 3=ERROR
	config.Logger = raft.NewConsoleLogger(nodeID, 1)

	// Stress test configuration
	config.SnapshotThreshold = *snapshotThreshold
	config.SnapshotMinRetention = *minRetention

	fmt.Println("Configuration:")
	fmt.Printf("- Snapshot Threshold: %d\n", config.SnapshotThreshold)
	fmt.Printf("- Min Retention: %d\n", config.SnapshotMinRetention)
	fmt.Printf("- Total Keys: %d\n", *totalKeys)

	// Ensure data directory exists
	if err := os.MkdirAll(dataDir, 0755); err != nil {
		log.Fatalf("Failed to create data directory: %v", err)
	}

	// Create KV server
	server, err := raft.NewKVServer(config)
	if err != nil {
		log.Fatalf("Failed to create server: %v", err)
	}

	// Start server
	if err := server.Start(); err != nil {
		log.Fatalf("Failed to start server: %v", err)
	}

	fmt.Printf("Node %s started. Beginning Log Compaction Safety Test.\n", nodeID)

	// Wait for leader election. A single node is expected to become leader
	// quickly; the write loop re-checks leadership before writing, so a
	// timeout here is only a warning.
	fmt.Println("Waiting for leader election...")
	if !waitUntil(server.IsLeader, 4*time.Second, 100*time.Millisecond) {
		fmt.Println("Warning: Node is not leader yet. Waiting...")
	}

	// Write loop
	logPath := filepath.Join(dataDir, "log.bin")
	var initialLogSize int64
	failedWrites := 0

	fmt.Printf("Starting write workload of %d keys...\n", *totalKeys)

	for i := 0; i < *totalKeys; i++ {
		// Periodically make sure this node is still the leader before writing.
		if i%leaderCheckEvery == 0 {
			for !server.IsLeader() {
				fmt.Println("Lost leadership, waiting...")
				time.Sleep(1 * time.Second)
			}
		}

		key := fmt.Sprintf("k-%d", i)
		val := fmt.Sprintf("v-%d", i)

		if err := retry(writeAttempts, writeRetryDelay, func() error {
			return server.Set(key, val)
		}); err != nil {
			log.Printf("Failed to set key %s after retries: %v", key, err)
			failedWrites++
		}

		// Record the log size just before the expected compaction point (approx).
		if i == logSizeSampleAt {
			if info, err := os.Stat(logPath); err == nil {
				initialLogSize = info.Size()
				fmt.Printf("Log size at 35k entries: %d bytes\n", initialLogSize)
			} else {
				log.Printf("Could not stat log file for baseline size: %v", err)
			}
		}

		if i > 0 && i%progressEvery == 0 {
			stats := server.GetStats()
			fmt.Printf("Progress: %d/%d. Log Indices: [%d, %d]\n", i, *totalKeys, stats.FirstLogIndex, stats.LastLogIndex)
		}
	}

	// Give time for compaction to trigger (async in background usually, but called after Apply)
	fmt.Println("Workload finished. Waiting for compaction to settle...")
	time.Sleep(5 * time.Second)

	// Check log size
	info, err := os.Stat(logPath)
	if err != nil {
		log.Fatalf("Failed to stat log file: %v", err)
	}
	finalLogSize := info.Size()

	stats := server.GetStats()
	fmt.Printf("Final Stats - Indices: [%d, %d]\n", stats.FirstLogIndex, stats.LastLogIndex)
	fmt.Printf("Final Log Size: %d bytes\n", finalLogSize)

	switch {
	case initialLogSize == 0:
		// No baseline: the workload never reached the sample point, or the
		// log file could not be measured there.
		fmt.Println("Log size comparison skipped: no baseline log size was recorded.")
	case finalLogSize < initialLogSize:
		fmt.Printf("\033[32mSUCCESS: Log file size decreased (Compaction worked physically). %d -> %d\033[0m\n", initialLogSize, finalLogSize)
	default:
		fmt.Printf("\033[33mWARNING: Log file size did not decrease. %d -> %d. (Compaction might not have triggered or physically rewritten)\033[0m\n", initialLogSize, finalLogSize)
	}

	// Verify data integrity
	fmt.Println("Verifying Data Integrity (Reading samples)...")
	errCount := 0

	// Check both old keys (which might have been compacted) and new keys,
	// limited to keys that this run actually wrote.
	for _, i := range sampleIndices(*totalKeys) {
		key := fmt.Sprintf("k-%d", i)
		val, ok, err := server.GetLinear(key) // Should hit DB
		if err != nil {
			log.Printf("Read error for %s: %v", key, err)
			errCount++
			continue
		}
		if !ok {
			log.Printf("Key %s missing!", key)
			errCount++
			continue
		}
		expected := fmt.Sprintf("v-%d", i)
		if val != expected {
			log.Printf("Value mismatch for %s. Got %s, want %s", key, val, expected)
			errCount++
		}
	}

	if failedWrites > 0 {
		fmt.Printf("\033[31m%d writes failed after retries.\033[0m\n", failedWrites)
	}
	if errCount == 0 {
		fmt.Print("\033[32mData Integrity Verified. All samples readable.\033[0m\n")
	} else {
		fmt.Printf("\033[31mFound %d data integrity errors.\033[0m\n", errCount)
	}

	fmt.Println("Test Complete. Shutting down...")
	server.Stop()

	// Report failure through the exit status so scripts can detect it.
	if failedWrites > 0 || errCount > 0 {
		os.Exit(1)
	}
}

// sampleIndices returns the key indices to verify for a workload of total
// keys: a fixed spread of indices that are below the last key, followed by the
// last key itself (total-1). It returns an empty slice when total is not
// positive.
func sampleIndices(total int) []int {
	candidates := []int{0, 100, 1000, 10000, 20000, 30000, 40000, 50000, 60000, 80000}

	samples := make([]int, 0, len(candidates)+1)
	for _, idx := range candidates {
		if idx < total-1 {
			samples = append(samples, idx)
		}
	}
	if total > 0 {
		samples = append(samples, total-1)
	}
	return samples
}

// retry calls op up to attempts times, sleeping for delay between attempts.
// It returns nil as soon as op succeeds, otherwise the error from the last
// attempt (nil if attempts is not positive).
func retry(attempts int, delay time.Duration, op func() error) error {
	var err error
	for n := 0; n < attempts; n++ {
		if n > 0 {
			time.Sleep(delay)
		}
		if err = op(); err == nil {
			return nil
		}
	}
	return err
}

// waitUntil polls cond every interval until it returns true or timeout has
// elapsed. It reports whether cond became true.
func waitUntil(cond func() bool, timeout, interval time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for !cond() {
		if time.Now().After(deadline) {
			return false
		}
		time.Sleep(interval)
	}
	return true
}