main.go 5.3 KB


  1. package main
  2. import (
  3. "bufio"
  4. "fmt"
  5. "log"
  6. "os"
  7. "os/signal"
  8. "strings"
  9. "syscall"
  10. "time"
  11. "igit.com/xbase/raft"
  12. )
  13. const (
  14. ColorReset = "\033[0m"
  15. ColorDim = "\033[90m" // Dark Gray
  16. ColorRed = "\033[31m"
  17. ColorGreen = "\033[32m"
  18. ColorYellow = "\033[33m"
  19. ColorBlue = "\033[34m"
  20. ColorCyan = "\033[36m"
  21. )
  22. func main() {
  23. // Configuration
  24. nodeID := "node3"
  25. addr := "localhost:5003"
  26. dataDir := "data/node3"
  27. // Cluster configuration
  28. clusterNodes := map[string]string{
  29. "node1": "localhost:5001",
  30. "node2": "localhost:5002",
  31. "node3": "localhost:5003",
  32. }
  33. config := raft.DefaultConfig()
  34. config.NodeID = nodeID
  35. config.ListenAddr = addr
  36. config.DataDir = dataDir
  37. config.ClusterNodes = clusterNodes
  38. // Log level: 0=DEBUG, 1=INFO, 2=WARN, 3=ERROR
  39. config.Logger = raft.NewConsoleLogger(nodeID, 1)
  40. // Ensure data directory exists
  41. if err := os.MkdirAll(dataDir, 0755); err != nil {
  42. log.Fatalf("Failed to create data directory: %v", err)
  43. }
  44. // Create KV Server
  45. server, err := raft.NewKVServer(config)
  46. if err != nil {
  47. log.Fatalf("Failed to create server: %v", err)
  48. }
  49. // Start server
  50. if err := server.Start(); err != nil {
  51. log.Fatalf("Failed to start server: %v", err)
  52. }
  53. fmt.Printf("Node %s%s%s started on %s\n", ColorGreen, nodeID, ColorReset, addr)
  54. fmt.Println("Commands: set <key> <val>, get <key>, del <key>")
  55. // State Monitor Loop (Real-time status updates)
  56. go func() {
  57. var lastState string
  58. var lastTerm uint64
  59. // Initial state
  60. stats := server.GetStats()
  61. lastState = stats.State
  62. lastTerm = stats.Term
  63. ticker := time.NewTicker(100 * time.Millisecond)
  64. defer ticker.Stop()
  65. for range ticker.C {
  66. stats := server.GetStats()
  67. if stats.State != lastState || stats.Term != lastTerm {
  68. fmt.Printf("\n%s[State Change] %s (Term %d) -> %s (Term %d)%s\n> ",
  69. ColorYellow, lastState, lastTerm, stats.State, stats.Term, ColorReset)
  70. lastState = stats.State
  71. lastTerm = stats.Term
  72. }
  73. }
  74. }()
  75. // Simple test loop
  76. go func() {
  77. ticker := time.NewTicker(10 * time.Second)
  78. defer ticker.Stop()
  79. for range ticker.C {
  80. fmt.Println() // Extra newline
  81. if server.IsLeader() {
  82. fmt.Printf("--- I am the %sLEADER%s ---\n", ColorGreen, ColorReset)
  83. } else {
  84. leaderID := server.GetLeaderID()
  85. fmt.Printf("--- I am %sFOLLOWER%s (Leader: %s%s%s) ---\n",
  86. ColorYellow, ColorReset, ColorCyan, leaderID, ColorReset)
  87. }
  88. // We always try to Set and Get, letting the server handle routing
  89. // Write a value
  90. key := fmt.Sprintf("key-%d", time.Now().Unix())
  91. val := fmt.Sprintf("val-%s", nodeID)
  92. err := server.Set(key, val)
  93. if server.IsLeader() {
  94. if err != nil {
  95. fmt.Printf("%sSet failed:%s %v\n", ColorRed, ColorReset, err)
  96. } else {
  97. fmt.Printf("%sSet%s %s%s%s%s%s=%s%s%s%s\n",
  98. ColorGreen, ColorReset,
  99. ColorCyan, key, ColorReset,
  100. ColorDim, ":", ColorReset,
  101. ColorYellow, val, ColorReset)
  102. }
  103. // Read it back
  104. if v, ok, err := server.GetLinear(key); err != nil {
  105. fmt.Printf("%sGetLinear failed:%s %v\n", ColorRed, ColorReset, err)
  106. } else if ok {
  107. fmt.Printf("%sGet%s %s%s%s%s%s=%s%s%s%s\n",
  108. ColorGreen, ColorReset,
  109. ColorCyan, key, ColorReset,
  110. ColorDim, ":", ColorReset,
  111. ColorYellow, v, ColorReset)
  112. }
  113. }
  114. // Print stats
  115. metrics := server.GetMetrics()
  116. health := server.HealthCheck()
  117. fmt.Printf("Term: %s%d%s (M:%d), CommitIndex: %s%d%s, Applied: %s%d%s\n",
  118. ColorBlue, health.Term, ColorReset, metrics.Term,
  119. ColorGreen, health.CommitIndex, ColorReset,
  120. ColorGreen, health.LastApplied, ColorReset)
  121. fmt.Print("> ") // Prompt
  122. }
  123. }()
  124. // CLI Loop
  125. go func() {
  126. scanner := bufio.NewScanner(os.Stdin)
  127. fmt.Print("> ")
  128. for scanner.Scan() {
  129. text := strings.TrimSpace(scanner.Text())
  130. if text == "" {
  131. fmt.Print("> ")
  132. continue
  133. }
  134. parts := strings.Fields(text)
  135. cmd := strings.ToLower(parts[0])
  136. switch cmd {
  137. case "set":
  138. if len(parts) != 3 {
  139. fmt.Println("Usage: set <key> <value>")
  140. break
  141. }
  142. key, val := parts[1], parts[2]
  143. if err := server.Set(key, val); err != nil {
  144. fmt.Printf("%sError:%s %v\n", ColorRed, ColorReset, err)
  145. } else {
  146. fmt.Printf("%sOK%s\n", ColorGreen, ColorReset)
  147. }
  148. case "get":
  149. if len(parts) != 2 {
  150. fmt.Println("Usage: get <key>")
  151. break
  152. }
  153. key := parts[1]
  154. if val, ok, err := server.GetLinear(key); err != nil {
  155. fmt.Printf("%sError:%s %v\n", ColorRed, ColorReset, err)
  156. } else if !ok {
  157. fmt.Printf("%sNot Found%s\n", ColorYellow, ColorReset)
  158. } else {
  159. // Fixed: 7 verbs for 7 args
  160. fmt.Printf("%s%s%s%s%s=%s%s\n", ColorDim, key, ColorReset, ColorDim, ":", ColorReset, val)
  161. }
  162. case "del", "delete":
  163. if len(parts) != 2 {
  164. fmt.Println("Usage: del <key>")
  165. break
  166. }
  167. key := parts[1]
  168. if err := server.Del(key); err != nil {
  169. fmt.Printf("%sError:%s %v\n", ColorRed, ColorReset, err)
  170. } else {
  171. fmt.Printf("%sDeleted%s\n", ColorGreen, ColorReset)
  172. }
  173. case "help":
  174. fmt.Println("Commands: set <key> <val>, get <key>, del <key>")
  175. default:
  176. fmt.Println("Unknown command")
  177. }
  178. fmt.Print("> ")
  179. }
  180. }()
  181. // Handle shutdown
  182. sigCh := make(chan os.Signal, 1)
  183. signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
  184. <-sigCh
  185. fmt.Println("\nShutting down...")
  186. server.Stop()
  187. }