| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190 |
- package main
- import (
- "flag"
- "fmt"
- "log"
- "os"
- "path/filepath"
- "time"
- "igit.com/xbase/raft"
- )
- func main() {
- // Command line flags
- snapshotThreshold := flag.Uint64("threshold", 10000, "Log count to trigger snapshot/compaction")
- minRetention := flag.Uint64("retention", 20000, "Minimum log entries to retain after compaction")
- totalKeys := flag.Int("keys", 100000, "Total number of keys to write")
- flag.Parse()
- // Configuration
- nodeID := "node1"
- addr := "localhost:5001"
- dataDir := "data/node1"
-
- // Clean start to ensure clean test
- os.RemoveAll(dataDir)
- // Cluster configuration - Single Node for this test
- clusterNodes := map[string]string{
- "node1": "localhost:5001",
- }
- config := raft.DefaultConfig()
- config.NodeID = nodeID
- config.ListenAddr = addr
- config.DataDir = dataDir
- config.ClusterNodes = clusterNodes
- // Log level: 0=DEBUG, 1=INFO, 2=WARN, 3=ERROR
- config.Logger = raft.NewConsoleLogger(nodeID, 1)
- // Stress Test Configuration
- config.SnapshotThreshold = *snapshotThreshold
- config.SnapshotMinRetention = *minRetention
-
- fmt.Printf("Configuration:\n")
- fmt.Printf("- Snapshot Threshold: %d\n", config.SnapshotThreshold)
- fmt.Printf("- Min Retention: %d\n", config.SnapshotMinRetention)
- fmt.Printf("- Total Keys: %d\n", *totalKeys)
- // Ensure data directory exists
- if err := os.MkdirAll(dataDir, 0755); err != nil {
- log.Fatalf("Failed to create data directory: %v", err)
- }
- // Create KV Server
- server, err := raft.NewKVServer(config)
- if err != nil {
- log.Fatalf("Failed to create server: %v", err)
- }
- // Start server
- if err := server.Start(); err != nil {
- log.Fatalf("Failed to start server: %v", err)
- }
- fmt.Printf("Node %s started. Beginning Log Compaction Safety Test.\n", nodeID)
-
- // Wait for leader election
- fmt.Println("Waiting for leader election...")
- time.Sleep(2 * time.Second)
-
- // Verify Leader
- if !server.IsLeader() {
- // In single node, it should become leader quickly
- fmt.Println("Warning: Node is not leader yet. Waiting...")
- time.Sleep(2 * time.Second)
- }
- // Write loop
- logPath := filepath.Join(dataDir, "log.bin")
-
- var initialLogSize int64
-
- fmt.Printf("Starting write workload of %d keys...\n", *totalKeys)
- for i := 0; i < *totalKeys; i++ {
- // Ensure leader
- if i%1000 == 0 {
- if !server.IsLeader() {
- fmt.Println("Lost leadership, waiting...")
- time.Sleep(1 * time.Second)
- i--
- continue
- }
- }
- key := fmt.Sprintf("k-%d", i)
- val := fmt.Sprintf("v-%d", i)
-
- // Retry logic
- for retry := 0; retry < 3; retry++ {
- if err := server.Set(key, val); err != nil {
- if retry == 2 {
- log.Printf("Failed to set key %s after retries: %v", key, err)
- }
- time.Sleep(10 * time.Millisecond)
- continue
- }
- break
- }
-
- // Check log size just before expected compaction point (approx)
- if i == 35000 {
- info, err := os.Stat(logPath)
- if err == nil {
- initialLogSize = info.Size()
- fmt.Printf("Log size at 35k entries: %d bytes\n", initialLogSize)
- }
- }
-
- if i > 0 && i % 5000 == 0 {
- stats := server.GetStats()
- fmt.Printf("Progress: %d/%d. Log Indices: [%d, %d]\n", i, *totalKeys, stats.FirstLogIndex, stats.LastLogIndex)
- }
- }
-
- // Give time for compaction to trigger (async in background usually, but called after Apply)
- fmt.Println("Workload finished. Waiting for compaction to settle...")
- time.Sleep(5 * time.Second)
-
- // Check log size
- info, err := os.Stat(logPath)
- if err != nil {
- log.Fatalf("Failed to stat log file: %v", err)
- }
- finalLogSize := info.Size()
-
- stats := server.GetStats()
- fmt.Printf("Final Stats - Indices: [%d, %d]\n", stats.FirstLogIndex, stats.LastLogIndex)
- fmt.Printf("Final Log Size: %d bytes\n", finalLogSize)
-
- if initialLogSize > 0 {
- if finalLogSize < initialLogSize {
- fmt.Printf("\033[32mSUCCESS: Log file size decreased (Compaction worked physically). %d -> %d\033[0m\n", initialLogSize, finalLogSize)
- } else {
- fmt.Printf("\033[33mWARNING: Log file size did not decrease. %d -> %d. (Compaction might not have triggered or physically rewritten)\033[0m\n", initialLogSize, finalLogSize)
- }
- }
-
- // Verify Data Integrity
- fmt.Println("Verifying Data Integrity (Reading samples)...")
- errors := 0
- // Check specifically keys that might have been compacted (old keys)
- // and keys that are new.
- checkIndices := []int{0, 100, 1000, 10000, 20000, 30000, 40000, 50000, 60000, 80000, 99999}
-
- for _, i := range checkIndices {
- key := fmt.Sprintf("k-%d", i)
- val, ok, err := server.GetLinear(key) // Should hit DB
-
- if err != nil {
- log.Printf("Read error for %s: %v", key, err)
- errors++
- continue
- }
- if !ok {
- log.Printf("Key %s missing!", key)
- errors++
- continue
- }
-
- expected := fmt.Sprintf("v-%d", i)
- if val != expected {
- log.Printf("Value mismatch for %s. Got %s, want %s", key, val, expected)
- errors++
- } else {
- // fmt.Printf("Verified %s OK\n", key)
- }
- }
-
- if errors == 0 {
- fmt.Printf("\033[32mData Integrity Verified. All samples readable.\033[0m\n")
- } else {
- fmt.Printf("\033[31mFound %d data integrity errors.\033[0m\n", errors)
- }
-
- fmt.Println("Test Complete. Shutting down...")
- server.Stop()
- }
|