main.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. "os"
  6. "path/filepath"
  7. "time"
  8. "igit.com/xbase/raft"
  9. )
  10. func main() {
  11. // Configuration
  12. nodeID := "node1"
  13. addr := "localhost:5001"
  14. dataDir := "data/node1"
  15. // Clean start to ensure clean test
  16. os.RemoveAll(dataDir)
  17. // Cluster configuration - Single Node for this test
  18. clusterNodes := map[string]string{
  19. "node1": "localhost:5001",
  20. }
  21. config := raft.DefaultConfig()
  22. config.NodeID = nodeID
  23. config.ListenAddr = addr
  24. config.DataDir = dataDir
  25. config.ClusterNodes = clusterNodes
  26. // Log level: 0=DEBUG, 1=INFO, 2=WARN, 3=ERROR
  27. config.Logger = raft.NewConsoleLogger(nodeID, 1)
  28. // Stress Test Configuration
  29. // Trigger compaction at 40000, retain 20000.
  30. config.SnapshotThreshold = 40000
  31. config.SnapshotMinRetention = 20000
  32. // Ensure data directory exists
  33. if err := os.MkdirAll(dataDir, 0755); err != nil {
  34. log.Fatalf("Failed to create data directory: %v", err)
  35. }
  36. // Create KV Server
  37. server, err := raft.NewKVServer(config)
  38. if err != nil {
  39. log.Fatalf("Failed to create server: %v", err)
  40. }
  41. // Start server
  42. if err := server.Start(); err != nil {
  43. log.Fatalf("Failed to start server: %v", err)
  44. }
  45. fmt.Printf("Node %s started. Beginning Log Compaction Safety Test.\n", nodeID)
  46. // Wait for leader election
  47. fmt.Println("Waiting for leader election...")
  48. time.Sleep(2 * time.Second)
  49. // Verify Leader
  50. if !server.IsLeader() {
  51. // In single node, it should become leader quickly
  52. fmt.Println("Warning: Node is not leader yet. Waiting...")
  53. time.Sleep(2 * time.Second)
  54. }
  55. // Write loop
  56. totalKeys := 50000 // Enough to trigger 40k threshold
  57. logPath := filepath.Join(dataDir, "log.bin")
  58. var initialLogSize int64
  59. fmt.Printf("Starting write workload of %d keys...\n", totalKeys)
  60. for i := 0; i < totalKeys; i++ {
  61. // Ensure leader
  62. if i%1000 == 0 {
  63. if !server.IsLeader() {
  64. fmt.Println("Lost leadership, waiting...")
  65. time.Sleep(1 * time.Second)
  66. i--
  67. continue
  68. }
  69. }
  70. key := fmt.Sprintf("k-%d", i)
  71. val := fmt.Sprintf("v-%d", i)
  72. // Retry logic
  73. for retry := 0; retry < 3; retry++ {
  74. if err := server.Set(key, val); err != nil {
  75. if retry == 2 {
  76. log.Printf("Failed to set key %s after retries: %v", key, err)
  77. }
  78. time.Sleep(10 * time.Millisecond)
  79. continue
  80. }
  81. break
  82. }
  83. // Check log size just before expected compaction point (approx)
  84. if i == 35000 {
  85. info, err := os.Stat(logPath)
  86. if err == nil {
  87. initialLogSize = info.Size()
  88. fmt.Printf("Log size at 35k entries: %d bytes\n", initialLogSize)
  89. }
  90. }
  91. if i > 0 && i % 5000 == 0 {
  92. stats := server.GetStats()
  93. fmt.Printf("Progress: %d/%d. Log Indices: [%d, %d]\n", i, totalKeys, stats.FirstLogIndex, stats.LastLogIndex)
  94. }
  95. }
  96. // Give time for compaction to trigger (async in background usually, but called after Apply)
  97. fmt.Println("Workload finished. Waiting for compaction to settle...")
  98. time.Sleep(5 * time.Second)
  99. // Check log size
  100. info, err := os.Stat(logPath)
  101. if err != nil {
  102. log.Fatalf("Failed to stat log file: %v", err)
  103. }
  104. finalLogSize := info.Size()
  105. stats := server.GetStats()
  106. fmt.Printf("Final Stats - Indices: [%d, %d]\n", stats.FirstLogIndex, stats.LastLogIndex)
  107. fmt.Printf("Final Log Size: %d bytes\n", finalLogSize)
  108. if initialLogSize > 0 {
  109. if finalLogSize < initialLogSize {
  110. fmt.Printf("\033[32mSUCCESS: Log file size decreased (Compaction worked physically). %d -> %d\033[0m\n", initialLogSize, finalLogSize)
  111. } else {
  112. fmt.Printf("\033[33mWARNING: Log file size did not decrease. %d -> %d. (Compaction might not have triggered or physically rewritten)\033[0m\n", initialLogSize, finalLogSize)
  113. }
  114. }
  115. // Verify Data Integrity
  116. fmt.Println("Verifying Data Integrity (Reading samples)...")
  117. errors := 0
  118. // Check specifically keys that might have been compacted (old keys)
  119. // and keys that are new.
  120. checkIndices := []int{0, 100, 1000, 10000, 20000, 30000, 40000, 49999}
  121. for _, i := range checkIndices {
  122. key := fmt.Sprintf("k-%d", i)
  123. val, ok, err := server.GetLinear(key) // Should hit DB
  124. if err != nil {
  125. log.Printf("Read error for %s: %v", key, err)
  126. errors++
  127. continue
  128. }
  129. if !ok {
  130. log.Printf("Key %s missing!", key)
  131. errors++
  132. continue
  133. }
  134. expected := fmt.Sprintf("v-%d", i)
  135. if val != expected {
  136. log.Printf("Value mismatch for %s. Got %s, want %s", key, val, expected)
  137. errors++
  138. } else {
  139. // fmt.Printf("Verified %s OK\n", key)
  140. }
  141. }
  142. if errors == 0 {
  143. fmt.Printf("\033[32mData Integrity Verified. All samples readable.\033[0m\n")
  144. } else {
  145. fmt.Printf("\033[31mFound %d data integrity errors.\033[0m\n", errors)
  146. }
  147. fmt.Println("Test Complete. Shutting down...")
  148. server.Stop()
  149. }