main.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "log"
  6. "os"
  7. "path/filepath"
  8. "time"
  9. "igit.com/xbase/raft"
  10. )
  11. func main() {
  12. // Command line flags
  13. snapshotThreshold := flag.Uint64("threshold", 10000, "Log count to trigger snapshot/compaction")
  14. minRetention := flag.Uint64("retention", 20000, "Minimum log entries to retain after compaction")
  15. totalKeys := flag.Int("keys", 100000, "Total number of keys to write")
  16. flag.Parse()
  17. // Configuration
  18. nodeID := "node1"
  19. addr := "localhost:5001"
  20. dataDir := "data/node1"
  21. // Clean start to ensure clean test
  22. os.RemoveAll(dataDir)
  23. // Cluster configuration - Single Node for this test
  24. clusterNodes := map[string]string{
  25. "node1": "localhost:5001",
  26. }
  27. config := raft.DefaultConfig()
  28. config.NodeID = nodeID
  29. config.ListenAddr = addr
  30. config.DataDir = dataDir
  31. config.ClusterNodes = clusterNodes
  32. // Log level: 0=DEBUG, 1=INFO, 2=WARN, 3=ERROR
  33. config.Logger = raft.NewConsoleLogger(nodeID, 1)
  34. // Stress Test Configuration
  35. config.SnapshotThreshold = *snapshotThreshold
  36. config.SnapshotMinRetention = *minRetention
  37. fmt.Printf("Configuration:\n")
  38. fmt.Printf("- Snapshot Threshold: %d\n", config.SnapshotThreshold)
  39. fmt.Printf("- Min Retention: %d\n", config.SnapshotMinRetention)
  40. fmt.Printf("- Total Keys: %d\n", *totalKeys)
  41. // Ensure data directory exists
  42. if err := os.MkdirAll(dataDir, 0755); err != nil {
  43. log.Fatalf("Failed to create data directory: %v", err)
  44. }
  45. // Create KV Server
  46. server, err := raft.NewKVServer(config)
  47. if err != nil {
  48. log.Fatalf("Failed to create server: %v", err)
  49. }
  50. // Start server
  51. if err := server.Start(); err != nil {
  52. log.Fatalf("Failed to start server: %v", err)
  53. }
  54. fmt.Printf("Node %s started. Beginning Log Compaction Safety Test.\n", nodeID)
  55. // Wait for leader election
  56. fmt.Println("Waiting for leader election...")
  57. time.Sleep(2 * time.Second)
  58. // Verify Leader
  59. if !server.IsLeader() {
  60. // In single node, it should become leader quickly
  61. fmt.Println("Warning: Node is not leader yet. Waiting...")
  62. time.Sleep(2 * time.Second)
  63. }
  64. // Write loop
  65. logPath := filepath.Join(dataDir, "log.bin")
  66. var initialLogSize int64
  67. fmt.Printf("Starting write workload of %d keys...\n", *totalKeys)
  68. for i := 0; i < *totalKeys; i++ {
  69. // Ensure leader
  70. if i%1000 == 0 {
  71. if !server.IsLeader() {
  72. fmt.Println("Lost leadership, waiting...")
  73. time.Sleep(1 * time.Second)
  74. i--
  75. continue
  76. }
  77. }
  78. key := fmt.Sprintf("k-%d", i)
  79. val := fmt.Sprintf("v-%d", i)
  80. // Retry logic
  81. for retry := 0; retry < 3; retry++ {
  82. if err := server.Set(key, val); err != nil {
  83. if retry == 2 {
  84. log.Printf("Failed to set key %s after retries: %v", key, err)
  85. }
  86. time.Sleep(10 * time.Millisecond)
  87. continue
  88. }
  89. break
  90. }
  91. // Check log size just before expected compaction point (approx)
  92. if i == 35000 {
  93. info, err := os.Stat(logPath)
  94. if err == nil {
  95. initialLogSize = info.Size()
  96. fmt.Printf("Log size at 35k entries: %d bytes\n", initialLogSize)
  97. }
  98. }
  99. if i > 0 && i % 5000 == 0 {
  100. stats := server.GetStats()
  101. fmt.Printf("Progress: %d/%d. Log Indices: [%d, %d]\n", i, *totalKeys, stats.FirstLogIndex, stats.LastLogIndex)
  102. }
  103. }
  104. // Give time for compaction to trigger (async in background usually, but called after Apply)
  105. fmt.Println("Workload finished. Waiting for compaction to settle...")
  106. time.Sleep(5 * time.Second)
  107. // Check log size
  108. info, err := os.Stat(logPath)
  109. if err != nil {
  110. log.Fatalf("Failed to stat log file: %v", err)
  111. }
  112. finalLogSize := info.Size()
  113. stats := server.GetStats()
  114. fmt.Printf("Final Stats - Indices: [%d, %d]\n", stats.FirstLogIndex, stats.LastLogIndex)
  115. fmt.Printf("Final Log Size: %d bytes\n", finalLogSize)
  116. if initialLogSize > 0 {
  117. if finalLogSize < initialLogSize {
  118. fmt.Printf("\033[32mSUCCESS: Log file size decreased (Compaction worked physically). %d -> %d\033[0m\n", initialLogSize, finalLogSize)
  119. } else {
  120. fmt.Printf("\033[33mWARNING: Log file size did not decrease. %d -> %d. (Compaction might not have triggered or physically rewritten)\033[0m\n", initialLogSize, finalLogSize)
  121. }
  122. }
  123. // Verify Data Integrity
  124. fmt.Println("Verifying Data Integrity (Reading samples)...")
  125. errors := 0
  126. // Check specifically keys that might have been compacted (old keys)
  127. // and keys that are new.
  128. checkIndices := []int{0, 100, 1000, 10000, 20000, 30000, 40000, 50000, 60000, 80000, 99999}
  129. for _, i := range checkIndices {
  130. key := fmt.Sprintf("k-%d", i)
  131. val, ok, err := server.GetLinear(key) // Should hit DB
  132. if err != nil {
  133. log.Printf("Read error for %s: %v", key, err)
  134. errors++
  135. continue
  136. }
  137. if !ok {
  138. log.Printf("Key %s missing!", key)
  139. errors++
  140. continue
  141. }
  142. expected := fmt.Sprintf("v-%d", i)
  143. if val != expected {
  144. log.Printf("Value mismatch for %s. Got %s, want %s", key, val, expected)
  145. errors++
  146. } else {
  147. // fmt.Printf("Verified %s OK\n", key)
  148. }
  149. }
  150. if errors == 0 {
  151. fmt.Printf("\033[32mData Integrity Verified. All samples readable.\033[0m\n")
  152. } else {
  153. fmt.Printf("\033[31mFound %d data integrity errors.\033[0m\n", errors)
  154. }
  155. fmt.Println("Test Complete. Shutting down...")
  156. server.Stop()
  157. }