| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210 |
- package main
- import (
- "bufio"
- "fmt"
- "log"
- "os"
- "os/signal"
- "strings"
- "syscall"
- "time"
- "igit.com/xbase/raft"
- )
- const (
- ColorReset = "\033[0m"
- ColorDim = "\033[90m" // Dark Gray
- ColorRed = "\033[31m"
- ColorGreen = "\033[32m"
- ColorYellow = "\033[33m"
- ColorBlue = "\033[34m"
- ColorCyan = "\033[36m"
- )
- func main() {
- // Configuration
- nodeID := "node2"
- addr := "localhost:5002"
- dataDir := "data/node2"
- // Cluster configuration
- clusterNodes := map[string]string{
- "node1": "localhost:5001",
- "node2": "localhost:5002",
- "node3": "localhost:5003",
- }
- config := raft.DefaultConfig()
- config.NodeID = nodeID
- config.ListenAddr = addr
- config.DataDir = dataDir
- config.ClusterNodes = clusterNodes
- // Log level: 0=DEBUG, 1=INFO, 2=WARN, 3=ERROR
- config.Logger = raft.NewConsoleLogger(nodeID, 1)
- // Ensure data directory exists
- if err := os.MkdirAll(dataDir, 0755); err != nil {
- log.Fatalf("Failed to create data directory: %v", err)
- }
- // Create KV Server
- server, err := raft.NewKVServer(config)
- if err != nil {
- log.Fatalf("Failed to create server: %v", err)
- }
- // Start server
- if err := server.Start(); err != nil {
- log.Fatalf("Failed to start server: %v", err)
- }
- fmt.Printf("Node %s%s%s started on %s\n", ColorGreen, nodeID, ColorReset, addr)
- fmt.Println("Commands: set <key> <val>, get <key>, del <key>")
- // State Monitor Loop (Real-time status updates)
- go func() {
- var lastState string
- var lastTerm uint64
- // Initial state
- stats := server.GetStats()
- lastState = stats.State
- lastTerm = stats.Term
- ticker := time.NewTicker(100 * time.Millisecond)
- defer ticker.Stop()
- for range ticker.C {
- stats := server.GetStats()
- if stats.State != lastState || stats.Term != lastTerm {
- fmt.Printf("\n%s[State Change] %s (Term %d) -> %s (Term %d)%s\n> ",
- ColorYellow, lastState, lastTerm, stats.State, stats.Term, ColorReset)
- lastState = stats.State
- lastTerm = stats.Term
- }
- }
- }()
- // Simple test loop
- go func() {
- ticker := time.NewTicker(10 * time.Second)
- defer ticker.Stop()
- for range ticker.C {
- fmt.Println() // Extra newline
- if server.IsLeader() {
- fmt.Printf("--- I am the %sLEADER%s ---\n", ColorGreen, ColorReset)
- } else {
- leaderID := server.GetLeaderID()
- fmt.Printf("--- I am %sFOLLOWER%s (Leader: %s%s%s) ---\n",
- ColorYellow, ColorReset, ColorCyan, leaderID, ColorReset)
- }
- // We always try to Set and Get, letting the server handle routing
- // Write a value
- key := fmt.Sprintf("key-%d", time.Now().Unix())
- val := fmt.Sprintf("val-%s", nodeID)
-
- err := server.Set(key, val)
-
- if server.IsLeader() {
- if err != nil {
- fmt.Printf("%sSet failed:%s %v\n", ColorRed, ColorReset, err)
- } else {
- fmt.Printf("%sSet%s %s%s%s%s%s=%s%s%s%s\n",
- ColorGreen, ColorReset,
- ColorCyan, key, ColorReset,
- ColorDim, ":", ColorReset,
- ColorYellow, val, ColorReset)
- }
-
- // Read it back
- if v, ok, err := server.GetLinear(key); err != nil {
- fmt.Printf("%sGetLinear failed:%s %v\n", ColorRed, ColorReset, err)
- } else if ok {
- fmt.Printf("%sGet%s %s%s%s%s%s=%s%s%s%s\n",
- ColorGreen, ColorReset,
- ColorCyan, key, ColorReset,
- ColorDim, ":", ColorReset,
- ColorYellow, v, ColorReset)
- }
- }
- // Print stats
- metrics := server.GetMetrics()
- health := server.HealthCheck()
- fmt.Printf("Term: %s%d%s (M:%d), CommitIndex: %s%d%s, Applied: %s%d%s\n",
- ColorBlue, health.Term, ColorReset, metrics.Term,
- ColorGreen, health.CommitIndex, ColorReset,
- ColorGreen, health.LastApplied, ColorReset)
- fmt.Print("> ") // Prompt
- }
- }()
- // CLI Loop
- go func() {
- scanner := bufio.NewScanner(os.Stdin)
- fmt.Print("> ")
- for scanner.Scan() {
- text := strings.TrimSpace(scanner.Text())
- if text == "" {
- fmt.Print("> ")
- continue
- }
- parts := strings.Fields(text)
- cmd := strings.ToLower(parts[0])
- switch cmd {
- case "set":
- if len(parts) != 3 {
- fmt.Println("Usage: set <key> <value>")
- break
- }
- key, val := parts[1], parts[2]
- if err := server.Set(key, val); err != nil {
- fmt.Printf("%sError:%s %v\n", ColorRed, ColorReset, err)
- } else {
- fmt.Printf("%sOK%s\n", ColorGreen, ColorReset)
- }
- case "get":
- if len(parts) != 2 {
- fmt.Println("Usage: get <key>")
- break
- }
- key := parts[1]
- if val, ok, err := server.GetLinear(key); err != nil {
- fmt.Printf("%sError:%s %v\n", ColorRed, ColorReset, err)
- } else if !ok {
- fmt.Printf("%sNot Found%s\n", ColorYellow, ColorReset)
- } else {
- // Fixed: 7 verbs for 7 args
- fmt.Printf("%s%s%s%s%s=%s%s\n", ColorDim, key, ColorReset, ColorDim, ":", ColorReset, val)
- }
- case "del", "delete":
- if len(parts) != 2 {
- fmt.Println("Usage: del <key>")
- break
- }
- key := parts[1]
- if err := server.Del(key); err != nil {
- fmt.Printf("%sError:%s %v\n", ColorRed, ColorReset, err)
- } else {
- fmt.Printf("%sDeleted%s\n", ColorGreen, ColorReset)
- }
- case "help":
- fmt.Println("Commands: set <key> <val>, get <key>, del <key>")
- default:
- fmt.Println("Unknown command")
- }
- fmt.Print("> ")
- }
- }()
- // Handle shutdown
- sigCh := make(chan os.Signal, 1)
- signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
- <-sigCh
- fmt.Println("\nShutting down...")
- server.Stop()
- }
|