瀏覽代碼

cli 内部集成

xbase 2 周之前
父節點
當前提交
c2ddcdb054
共有 13 個文件被更改,包括 677 次插入218 次删除
  1. 3 1
      .gitignore
  2. 22 0
      README.md
  3. 428 0
      cli.go
  4. 30 0
      db/engine.go
  5. 103 204
      example/basic/common/cli.go
  6. 6 3
      example/basic/node1/main.go
  7. 6 2
      example/basic/node2/main.go
  8. 6 2
      example/basic/node3/main.go
  9. 6 2
      example/basic/node4/main.go
  10. 20 3
      raft.go
  11. 16 0
      raft_ext.go
  12. 24 0
      server.go
  13. 7 1
      types.go

+ 3 - 1
.gitignore

@@ -23,6 +23,8 @@ _testmain.go
 *.exe
 *.test
 *.prof
-
+raft-cli
+node1_bin
 data/
+example/basic/data/
 bench_db_data/

+ 22 - 0
README.md

@@ -135,6 +135,28 @@ if err != nil {
 err := server1.Join("node3", "127.0.0.1:9003")
 ```
 
+## 命令行交互 (CLI)
+
+系统内置了一个强大的交互式命令行工具,默认随节点启动。它提供了直观的界面来管理数据、监控集群状态以及执行运维操作。
+
+**基础命令:**
+
+*   **数据操作**:
+    *   `set <key> <value>` / `get <key>` / `del <key>`:标准的 KV 操作。
+    *   `search <pattern> [limit] [offset]`:搜索 Key(支持 SQL `LIKE` 模式)。
+    *   `count <pattern>`:统计符合模式的 Key 数量。
+*   **集群管理**:
+    *   `join <id> <addr>`:将新节点加入集群。
+    *   `leave <id>`:将节点移出集群。
+*   **系统监控**:
+    *   `info`:实时展示节点健康状态、Raft 索引(Commit/Applied)、存储占用(DB/Log)、内存使用及集群成员连接状况。
+
+**扩展命令(示例):**
+
+CLI 设计为可扩展的。在 `example/basic` 示例项目中,我们展示了如何通过 `common.RegisterDemoCommands` 注册自定义命令:
+*   `demodata <n> <pattern>`:快速生成 n 条测试数据(例如 `demodata 1000 user-*`)。
+*   `deletedatas <n> <pattern>`:批量删除匹配模式的数据。
+
 ## 高级功能详解
 
 ### 动态成员变更

+ 428 - 0
cli.go

@@ -0,0 +1,428 @@
+package raft
+
+import (
+	"bufio"
+	"bytes"
+	"fmt"
+	"net"
+	"os"
+	"regexp"
+	"runtime"
+	"sort"
+	"strconv"
+	"strings"
+	"sync"
+	"text/tabwriter"
+	"time"
+)
+
+const (
+	ColorReset  = "\033[0m"
+	ColorDim    = "\033[90m" // Dark Gray
+	ColorRed    = "\033[31m"
+	ColorGreen  = "\033[32m"
+	ColorYellow = "\033[33m"
+	ColorBlue   = "\033[34m"
+	ColorCyan   = "\033[36m"
+)
+
+// CLICommand defines a CLI command handler
+type CLICommand struct {
+	Name        string
+	Description string
+	Handler     func(parts []string, server *KVServer)
+}
+
+// CLI manages the command line interface
+type CLI struct {
+	server      *KVServer
+	commands    map[string]CLICommand
+	mu          sync.RWMutex
+}
+
+// NewCLI creates a new CLI instance
+func NewCLI(server *KVServer) *CLI {
+	cli := &CLI{
+		server:   server,
+		commands: make(map[string]CLICommand),
+	}
+	cli.registerDefaultCommands()
+	return cli
+}
+
+// Helper to calculate visible length of string ignoring ANSI codes
+var ansiRegex = regexp.MustCompile(`\x1b\[[0-9;]*m`)
+
+func visibleLen(s string) int {
+	clean := ansiRegex.ReplaceAllString(s, "")
+	return len([]rune(clean))
+}
+
+func printBoxed(content string) {
+	lines := strings.Split(strings.TrimSpace(content), "\n")
+	maxWidth := 0
+	for _, line := range lines {
+		l := visibleLen(line)
+		if l > maxWidth {
+			maxWidth = l
+		}
+	}
+	
+	// Min width to look decent
+	if maxWidth < 20 {
+		maxWidth = 20
+	}
+
+	// Add padding
+	contentWidth := maxWidth + 2 
+
+	fmt.Println() // Start new line before box
+	// Top Border
+	fmt.Printf("%s╭%s╮%s\n", ColorDim, strings.Repeat("─", contentWidth), ColorReset)
+	
+	// Content
+	for _, line := range lines {
+		visLen := visibleLen(line)
+		padding := contentWidth - visLen - 1 // -1 for the space after │
+		// Ensure padding is not negative
+		if padding < 0 { padding = 0 }
+		
+		fmt.Printf("%s│%s %s%s%s│%s\n", ColorDim, ColorReset, line, strings.Repeat(" ", padding), ColorDim, ColorReset)
+	}
+	
+	// Bottom Border
+	fmt.Printf("%s╰%s╯%s\n", ColorDim, strings.Repeat("─", contentWidth), ColorReset)
+	fmt.Println() // End new line after box
+}
+
+func formatBytes(bytes int64) string {
+	const unit = 1024
+	if bytes < unit {
+		return fmt.Sprintf("%d B", bytes)
+	}
+	div, exp := int64(unit), 0
+	for n := bytes / unit; n >= unit; n /= unit {
+		div *= unit
+		exp++
+	}
+	return fmt.Sprintf("%.1f %cB", float64(bytes)/float64(div), "KMGTPE"[exp])
+}
+
+// Start starts the command line interface
+func (c *CLI) Start() {
+	fmt.Printf("Node %s%s%s CLI Started\n", ColorGreen, c.server.Raft.nodeID, ColorReset)
+	fmt.Println("Type 'help' for commands.")
+
+	// State Monitor Loop
+	go func() {
+		var lastState string
+		var lastTerm uint64
+		stats := c.server.GetStats()
+		lastState = stats.State
+		lastTerm = stats.Term
+
+		ticker := time.NewTicker(100 * time.Millisecond)
+		defer ticker.Stop()
+
+		for {
+			select {
+			case <-c.server.stopCh:
+				return
+			case <-ticker.C:
+				stats := c.server.GetStats()
+				if stats.State != lastState || stats.Term != lastTerm {
+					msg := fmt.Sprintf("%s[State Change] %s (Term %d) -> %s (Term %d)%s",
+						ColorYellow, lastState, lastTerm, stats.State, stats.Term, ColorReset)
+					fmt.Printf("\n%s\n> ", msg)
+					
+					lastState = stats.State
+					lastTerm = stats.Term
+				}
+			}
+		}
+	}()
+
+	scanner := bufio.NewScanner(os.Stdin)
+	fmt.Print("> ")
+	for scanner.Scan() {
+		text := strings.TrimSpace(scanner.Text())
+		if text == "" {
+			fmt.Print("> ")
+			continue
+		}
+		parts := strings.Fields(text)
+		cmdName := strings.ToLower(parts[0])
+
+		c.mu.RLock()
+		cmd, exists := c.commands[cmdName]
+		c.mu.RUnlock()
+
+		if exists {
+			cmd.Handler(parts, c.server)
+		} else {
+			// Check aliases
+			switch cmdName {
+			case "binlog", "db", "stats":
+				c.mu.RLock()
+				if infoCmd, ok := c.commands["info"]; ok {
+					infoCmd.Handler(parts, c.server)
+				}
+				c.mu.RUnlock()
+			default:
+				printBoxed("Unknown command. Type 'help'.")
+			}
+		}
+		fmt.Print("> ")
+	}
+}
+
+// RegisterCommand registers a new CLI command
+func (c *CLI) RegisterCommand(name, description string, handler func(parts []string, server *KVServer)) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.commands[strings.ToLower(name)] = CLICommand{
+		Name:        name,
+		Description: description,
+		Handler:     handler,
+	}
+}
+
+func (c *CLI) registerDefaultCommands() {
+	c.RegisterCommand("set", "Set value (set <key> <val>)", func(parts []string, server *KVServer) {
+		if len(parts) != 3 {
+			printBoxed("Usage: set <key> <value>")
+			return
+		}
+		key, val := parts[1], parts[2]
+		if err := server.Set(key, val); err != nil {
+			printBoxed(fmt.Sprintf("%sError:%s %v", ColorRed, ColorReset, err))
+		} else {
+			printBoxed(fmt.Sprintf("%sOK%s", ColorGreen, ColorReset))
+		}
+	})
+
+	c.RegisterCommand("get", "Get value (get <key>)", func(parts []string, server *KVServer) {
+		if len(parts) != 2 {
+			printBoxed("Usage: get <key>")
+			return
+		}
+		key := parts[1]
+		if val, ok, err := server.GetLinear(key); err != nil {
+			printBoxed(fmt.Sprintf("%sError:%s %v", ColorRed, ColorReset, err))
+		} else if !ok {
+			printBoxed(fmt.Sprintf("%sNot Found%s", ColorYellow, ColorReset))
+		} else {
+			printBoxed(fmt.Sprintf("%s%s%s = %s%s%s", ColorCyan, key, ColorReset, ColorYellow, val, ColorReset))
+		}
+	})
+
+	c.RegisterCommand("del", "Delete key (del <key>)", func(parts []string, server *KVServer) {
+		if len(parts) != 2 {
+			printBoxed("Usage: del <key>")
+			return
+		}
+		key := parts[1]
+		if err := server.Del(key); err != nil {
+			printBoxed(fmt.Sprintf("%sError:%s %v", ColorRed, ColorReset, err))
+		} else {
+			printBoxed(fmt.Sprintf("%sDeleted%s", ColorGreen, ColorReset))
+		}
+	})
+
+	c.RegisterCommand("join", "Add node to cluster (join <id> <addr>)", func(parts []string, server *KVServer) {
+		if len(parts) != 3 {
+			printBoxed("Usage: join <nodeID> <address>")
+			return
+		}
+		nodeID := parts[1]
+		addr := parts[2]
+		if err := server.Join(nodeID, addr); err != nil {
+			printBoxed(fmt.Sprintf("%sError joining node:%s %v", ColorRed, ColorReset, err))
+		} else {
+			printBoxed(fmt.Sprintf("%sJoin request sent for node %s%s", ColorGreen, nodeID, ColorReset))
+		}
+	})
+
+	c.RegisterCommand("leave", "Remove node from cluster (leave <id>)", func(parts []string, server *KVServer) {
+		if len(parts) != 2 {
+			printBoxed("Usage: leave <nodeID>")
+			return
+		}
+		nodeID := parts[1]
+		if err := server.Leave(nodeID); err != nil {
+			printBoxed(fmt.Sprintf("%sError leaving node:%s %v", ColorRed, ColorReset, err))
+		} else {
+			printBoxed(fmt.Sprintf("%sLeave request sent for node %s%s", ColorGreen, nodeID, ColorReset))
+		}
+	})
+
+	c.RegisterCommand("search", "Search keys (search <pat> [lim] [off])", func(parts []string, server *KVServer) {
+		if len(parts) < 2 {
+			printBoxed("Usage: search <pattern> [limit] [offset]")
+			return
+		}
+		pattern := parts[1]
+		limit := 20 // Default limit
+		offset := 0
+		
+		if len(parts) >= 3 {
+			if l, err := strconv.Atoi(parts[2]); err == nil {
+				limit = l
+			}
+		}
+		if len(parts) >= 4 {
+			if o, err := strconv.Atoi(parts[3]); err == nil {
+				offset = o
+			}
+		}
+
+		// Construct SQL for DB Engine
+		sql := fmt.Sprintf("key like \"%s\" LIMIT %d OFFSET %d", pattern, limit, offset)
+		results, err := server.DB.Query(sql)
+		if err != nil {
+			printBoxed(fmt.Sprintf("%sError:%s %v", ColorRed, ColorReset, err))
+			return
+		}
+
+		// Format Table to Buffer
+		var buf bytes.Buffer
+		w := tabwriter.NewWriter(&buf, 0, 0, 2, ' ', 0)
+		fmt.Fprintln(w, "Key\tValue\tCommitIndex")
+		fmt.Fprintln(w, "---\t-----\t-----------")
+		for _, r := range results {
+			fmt.Fprintf(w, "%s\t%s\t%d\n", r.Key, r.Value, r.CommitIndex)
+		}
+		w.Flush()
+		
+		summary := fmt.Sprintf("\n%sFound %d records (showing max %d)%s", ColorDim, len(results), limit, ColorReset)
+		printBoxed(buf.String() + summary)
+	})
+
+	c.RegisterCommand("count", "Count keys (count <pat>)", func(parts []string, server *KVServer) {
+		if len(parts) < 2 {
+			printBoxed("Usage: count <pattern> (e.g. 'count *', 'count user.*')")
+			return
+		}
+		pattern := parts[1]
+		
+		sql := ""
+		if pattern == "*" {
+			sql = "*"
+		} else {
+			sql = fmt.Sprintf("key like \"%s\"", pattern)
+		}
+		
+		start := time.Now()
+		count, err := server.DB.Count(sql)
+		duration := time.Since(start)
+		
+		if err != nil {
+			printBoxed(fmt.Sprintf("%sError:%s %v", ColorRed, ColorReset, err))
+		} else {
+			printBoxed(fmt.Sprintf("%sCount: %d%s (took %v)", ColorGreen, count, ColorReset, duration))
+		}
+	})
+
+	c.RegisterCommand("info", "Show all system stats", func(parts []string, server *KVServer) {
+		// 1. Gather all data
+		var m runtime.MemStats
+		runtime.ReadMemStats(&m)
+		
+		dbSize := server.GetDBSize()
+		logSize := server.GetLogSize()
+		
+		stats := server.GetStats()
+		health := server.HealthCheck()
+		dbApplied := server.DB.GetLastAppliedIndex()
+
+		// 2. Build Output
+		var sb strings.Builder
+		
+		// Section: Node Status
+		sb.WriteString(fmt.Sprintf("%sNode Status:%s\n", ColorCyan, ColorReset))
+		sb.WriteString(fmt.Sprintf("  ID:           %s%s%s\n", ColorGreen, health.NodeID, ColorReset))
+		sb.WriteString(fmt.Sprintf("  State:        %s\n", health.State))
+		sb.WriteString(fmt.Sprintf("  Term:         %d\n", health.Term))
+		sb.WriteString(fmt.Sprintf("  Leader:       %s\n", health.LeaderID))
+		sb.WriteString(fmt.Sprintf("  Healthy:      %v\n", health.IsHealthy))
+		sb.WriteString("\n")
+
+		// Section: Storage Stats
+		sb.WriteString(fmt.Sprintf("%sStorage & Memory:%s\n", ColorCyan, ColorReset))
+		sb.WriteString(fmt.Sprintf("  DB Size:      %s\n", formatBytes(dbSize)))
+		sb.WriteString(fmt.Sprintf("  Raft Log:     %s\n", formatBytes(logSize)))
+		sb.WriteString(fmt.Sprintf("  Mem Alloc:    %s\n", formatBytes(int64(m.Alloc))))
+		sb.WriteString(fmt.Sprintf("  Mem Sys:      %s\n", formatBytes(int64(m.Sys))))
+		sb.WriteString(fmt.Sprintf("  NumGC:        %d\n", m.NumGC))
+		sb.WriteString("\n")
+
+		// Section: Raft & DB Indices
+		sb.WriteString(fmt.Sprintf("%sConsensus Indices:%s\n", ColorCyan, ColorReset))
+		sb.WriteString(fmt.Sprintf("  Commit Index: %d\n", stats.CommitIndex))
+		sb.WriteString(fmt.Sprintf("  Applied Index:%d\n", stats.LastApplied))
+		sb.WriteString(fmt.Sprintf("  Last Log Idx: %d\n", stats.LastLogIndex))
+		sb.WriteString(fmt.Sprintf("  DB Applied:   %d\n", dbApplied))
+		sb.WriteString("\n")
+
+		// Section: Cluster Members
+		sb.WriteString(fmt.Sprintf("%sCluster Members (%d):%s", ColorCyan, stats.ClusterSize, ColorReset))
+		
+		for id, addr := range stats.ClusterNodes {
+			// Determine status color
+			statusColor := ColorDim
+			nodeID := server.Raft.nodeID // Assuming local nodeID is accessible
+			if id == nodeID {
+				statusColor = ColorCyan // Self
+			} else {
+				// Quick check
+				conn, err := net.DialTimeout("tcp", addr, 50*time.Millisecond)
+				if err == nil {
+					statusColor = ColorGreen
+					conn.Close()
+				}
+			}
+			
+			// Marker for Leader
+			marker := ""
+			if id == health.LeaderID {
+				marker = fmt.Sprintf(" %s(Leader)%s", ColorYellow, ColorReset)
+			}
+			
+			sb.WriteString(fmt.Sprintf("\n  - %s%s%s: %s%s", statusColor, id, ColorReset, addr, marker))
+		}
+		
+		printBoxed(sb.String())
+	})
+
+	c.RegisterCommand("help", "Show commands", func(parts []string, server *KVServer) {
+		var sb strings.Builder
+		sb.WriteString(fmt.Sprintf("%sCommands:%s\n", ColorGreen, ColorReset))
+		
+		// Sort commands
+		c.mu.RLock()
+		var cmds []CLICommand
+		for _, cmd := range c.commands {
+			cmds = append(cmds, cmd)
+		}
+		c.mu.RUnlock()
+		
+		sort.Slice(cmds, func(i, j int) bool {
+			return cmds[i].Name < cmds[j].Name
+		})
+
+		// Find max length for padding
+		maxNameLen := 0
+		for _, cmd := range cmds {
+			if len(cmd.Name) > maxNameLen {
+				maxNameLen = len(cmd.Name)
+			}
+		}
+
+		for _, cmd := range cmds {
+			padding := strings.Repeat(" ", maxNameLen-len(cmd.Name))
+			sb.WriteString(fmt.Sprintf("  %s%s%s%s  %s%s%s\n", ColorGreen, cmd.Name, ColorReset, padding, ColorDim, cmd.Description, ColorReset))
+		}
+		
+		printBoxed(sb.String())
+	})
+}

+ 30 - 0
db/engine.go

@@ -731,6 +731,17 @@ func (e *Engine) GetLastAppliedIndex() uint64 {
 	return e.LastCommitIndex
 }
 
+func (e *Engine) GetDBSize() int64 {
+	e.commitMu.Lock()
+	defer e.commitMu.Unlock()
+	
+	info, err := e.Storage.file.Stat()
+	if err == nil {
+		return info.Size()
+	}
+	return 0
+}
+
 func (e *Engine) Sync() error {
 	e.metaFile.Sync()
 	return e.Storage.Sync()
@@ -806,13 +817,32 @@ func (e *Engine) execute(sql string, countOnly bool) (int, error) {
 		sql = reOffset.ReplaceAllString(sql, "")
 	}
 
+	// 0. Handle Count All efficient case
+	// Check specifically for "*" or simple wildcard match
+	if sql == "*" || sql == "\"*\"" {
+		e.Index.mu.RLock()
+		count := len(e.Index.items)
+		e.Index.mu.RUnlock()
+		return count, nil
+	}
+
 	re := regexp.MustCompile(`(key|value|CommitIndex)\s*(=|like|>=|<=|>|<)\s*("[^"]*"|\d+)`)
 	matches := re.FindAllStringSubmatch(sql, -1)
 
 	if len(matches) == 0 {
+		// Fallback for when regex fails but user meant "count all" or similar via other syntax
+		// But strictly speaking, if not "*", it's invalid query per current parser
 		return 0, fmt.Errorf("invalid query")
 	}
 
+	// Also check for 'key like "*"' pattern which is equivalent to count all
+	if len(matches) == 1 && matches[0][1] == "key" && matches[0][2] == "like" && (matches[0][3] == "\"*\"" || matches[0][3] == "\"%\"") {
+		e.Index.mu.RLock()
+		count := len(e.Index.items)
+		e.Index.mu.RUnlock()
+		return count, nil
+	}
+
 	extractString := func(s string) string {
 		return strings.Trim(strings.TrimSpace(s), "\"")
 	}

+ 103 - 204
example/basic/common/cli.go

@@ -1,12 +1,10 @@
 package common
 
 import (
-	"bufio"
 	"fmt"
-	"os"
+	"regexp"
 	"strconv"
 	"strings"
-	"text/tabwriter"
 	"time"
 
 	"igit.com/xbase/raft"
@@ -22,216 +20,117 @@ const (
 	ColorCyan   = "\033[36m"
 )
 
-func StartCLI(server *raft.KVServer, nodeID string) {
-	fmt.Printf("Node %s%s%s CLI Started\n", ColorGreen, nodeID, ColorReset)
-	fmt.Println("Type 'help' for commands.")
+// Helper to calculate visible length of string ignoring ANSI codes
+var ansiRegex = regexp.MustCompile(`\x1b\[[0-9;]*m`)
 
-	// State Monitor Loop
-	go func() {
-		var lastState string
-		var lastTerm uint64
-		stats := server.GetStats()
-		lastState = stats.State
-		lastTerm = stats.Term
-
-		ticker := time.NewTicker(100 * time.Millisecond)
-		defer ticker.Stop()
+func visibleLen(s string) int {
+	clean := ansiRegex.ReplaceAllString(s, "")
+	return len([]rune(clean))
+}
 
-		for range ticker.C {
-			stats := server.GetStats()
-			if stats.State != lastState || stats.Term != lastTerm {
-				fmt.Printf("\n%s[State Change] %s (Term %d) -> %s (Term %d)%s\n> ",
-					ColorYellow, lastState, lastTerm, stats.State, stats.Term, ColorReset)
-				lastState = stats.State
-				lastTerm = stats.Term
-			}
+func printBoxed(content string) {
+	lines := strings.Split(strings.TrimSpace(content), "\n")
+	maxWidth := 0
+	for _, line := range lines {
+		l := visibleLen(line)
+		if l > maxWidth {
+			maxWidth = l
 		}
-	}()
+	}
+	
+	// Min width to look decent
+	if maxWidth < 20 {
+		maxWidth = 20
+	}
 
-	scanner := bufio.NewScanner(os.Stdin)
-	fmt.Print("> ")
-	for scanner.Scan() {
-		text := strings.TrimSpace(scanner.Text())
-		if text == "" {
-			fmt.Print("> ")
-			continue
-		}
-		parts := strings.Fields(text)
-		cmd := strings.ToLower(parts[0])
+	// Add padding
+	contentWidth := maxWidth + 2 
+
+	fmt.Println() // Start new line before box
+	// Top Border
+	fmt.Printf("%s╭%s╮%s\n", ColorDim, strings.Repeat("─", contentWidth), ColorReset)
+	
+	// Content
+	for _, line := range lines {
+		visLen := visibleLen(line)
+		padding := contentWidth - visLen - 1 // -1 for the space after │
+		// Ensure padding is not negative
+		if padding < 0 { padding = 0 }
+		
+		fmt.Printf("%s│%s %s%s%s│%s\n", ColorDim, ColorReset, line, strings.Repeat(" ", padding), ColorDim, ColorReset)
+	}
+	
+	// Bottom Border
+	fmt.Printf("%s╰%s╯%s\n", ColorDim, strings.Repeat("─", contentWidth), ColorReset)
+	fmt.Println() // End new line after box
+}
 
-		switch cmd {
-		case "set":
-			if len(parts) != 3 {
-				fmt.Println("Usage: set <key> <value>")
-				break
-			}
-			key, val := parts[1], parts[2]
+// RegisterDemoCommands registers the demo commands (demodata, deletedatas) to the CLI
+func RegisterDemoCommands(cli *raft.CLI) {
+	cli.RegisterCommand("demodata", "Generate n items (e.g. 'demodata 100 user.*')", func(parts []string, server *raft.KVServer) {
+		if len(parts) != 3 {
+			printBoxed("Usage: demodata <count> <pattern> (e.g. demodata 100 user.*)")
+			return
+		}
+		count, err := strconv.Atoi(parts[1])
+		if err != nil {
+			printBoxed(fmt.Sprintf("Invalid count: %v", err))
+			return
+		}
+		pattern := parts[2]
+		fmt.Printf("Generating %d items with pattern '%s'...\n", count, pattern)
+		
+		start := time.Now()
+		success := 0
+		for i := 1; i <= count; i++ {
+			key := strings.Replace(pattern, "*", strconv.Itoa(i), -1)
+			val := fmt.Sprintf("val-%s", key) // Simple value derivation
 			if err := server.Set(key, val); err != nil {
-				fmt.Printf("%sError:%s %v\n", ColorRed, ColorReset, err)
-			} else {
-				fmt.Printf("%sOK%s\n", ColorGreen, ColorReset)
-			}
-
-		case "get":
-			if len(parts) != 2 {
-				fmt.Println("Usage: get <key>")
-				break
-			}
-			key := parts[1]
-			if val, ok, err := server.GetLinear(key); err != nil {
-				fmt.Printf("%sError:%s %v\n", ColorRed, ColorReset, err)
-			} else if !ok {
-				fmt.Printf("%sNot Found%s\n", ColorYellow, ColorReset)
-			} else {
-				fmt.Printf("%s%s%s = %s%s%s\n", ColorCyan, key, ColorReset, ColorYellow, val, ColorReset)
-			}
-
-		case "del", "delete":
-			if len(parts) != 2 {
-				fmt.Println("Usage: del <key>")
-				break
-			}
-			key := parts[1]
-			if err := server.Del(key); err != nil {
-				fmt.Printf("%sError:%s %v\n", ColorRed, ColorReset, err)
-			} else {
-				fmt.Printf("%sDeleted%s\n", ColorGreen, ColorReset)
-			}
-
-		case "demodata":
-			if len(parts) != 3 {
-				fmt.Println("Usage: demodata <count> <pattern> (e.g. demodata 100 user.*)")
-				break
-			}
-			count, err := strconv.Atoi(parts[1])
-			if err != nil {
-				fmt.Printf("Invalid count: %v\n", err)
-				break
-			}
-			pattern := parts[2]
-			fmt.Printf("Generating %d items with pattern '%s'...\n", count, pattern)
-			
-			start := time.Now()
-			success := 0
-			for i := 1; i <= count; i++ {
-				key := strings.Replace(pattern, "*", strconv.Itoa(i), -1)
-				val := fmt.Sprintf("val-%s", key) // Simple value derivation
-				if err := server.Set(key, val); err != nil {
-					fmt.Printf("Failed at %d: %v\n", i, err)
-				} else {
-					success++
-				}
-				if i%100 == 0 {
-					fmt.Printf("Progress: %d/%d\r", i, count)
-				}
-			}
-			duration := time.Since(start)
-			fmt.Printf("\n%sDone!%s inserted %d/%d items in %v (Avg: %v/op)\n", 
-				ColorGreen, ColorReset, success, count, duration, duration/time.Duration(count))
-
-		case "search":
-			// search <pattern> [limit] [offset]
-			// Internally constructs: key like "<pattern>" LIMIT <limit> OFFSET <offset>
-			if len(parts) < 2 {
-				fmt.Println("Usage: search <pattern> [limit] [offset]")
-				break
-			}
-			pattern := parts[1]
-			limit := 20 // Default limit
-			offset := 0
-			
-			if len(parts) >= 3 {
-				if l, err := strconv.Atoi(parts[2]); err == nil {
-					limit = l
-				}
-			}
-			if len(parts) >= 4 {
-				if o, err := strconv.Atoi(parts[3]); err == nil {
-					offset = o
-				}
-			}
-
-			// Construct SQL for DB Engine
-			sql := fmt.Sprintf("key like \"%s\" LIMIT %d OFFSET %d", pattern, limit, offset)
-			results, err := server.DB.Query(sql)
-			if err != nil {
-				fmt.Printf("%sError:%s %v\n", ColorRed, ColorReset, err)
-				break
-			}
-
-			// Print Table
-			w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
-			fmt.Fprintln(w, "Key\tValue\tCommitIndex")
-			fmt.Fprintln(w, "---\t-----\t-----------")
-			for _, r := range results {
-				fmt.Fprintf(w, "%s\t%s\t%d\n", r.Key, r.Value, r.CommitIndex)
-			}
-			w.Flush()
-			fmt.Printf("%sFound %d records (showing max %d)%s\n", ColorDim, len(results), limit, ColorReset)
-
-		case "binlog":
-			// Show Raft Log Stats
-			stats := server.GetStats() // Basic stats
-			fmt.Printf("Raft Log CommitIndex: %s%d%s\n", ColorGreen, stats.CommitIndex, ColorReset)
-			fmt.Printf("Raft Log AppliedIndex: %s%d%s\n", ColorGreen, stats.LastApplied, ColorReset)
-			fmt.Printf("Raft LastLogIndex: %s%d%s\n", ColorGreen, stats.LastLogIndex, ColorReset)
-
-		case "db":
-			// Show DB Stats
-			idx := server.DB.GetLastAppliedIndex()
-			fmt.Printf("DB LastAppliedIndex: %s%d%s\n", ColorGreen, idx, ColorReset)
-
-		case "stats":
-			stats := server.GetStats()
-			health := server.HealthCheck()
-			fmt.Printf("Node: %s\nState: %s\nLeader: %s\nTerm: %d\nHealthy: %v\n", 
-				health.NodeID, health.State, health.LeaderID, health.Term, health.IsHealthy)
-			fmt.Printf("CommitIndex: %d, Applied: %d\n", stats.CommitIndex, stats.LastApplied)
-			fmt.Printf("Cluster Size: %d\n", stats.ClusterSize)
-			fmt.Println("Cluster Nodes:")
-			for id, addr := range stats.ClusterNodes {
-				fmt.Printf("  - %s: %s\n", id, addr)
-			}
-
-		case "join":
-			if len(parts) != 3 {
-				fmt.Println("Usage: join <nodeID> <addr>")
-				break
-			}
-			if err := server.Join(parts[1], parts[2]); err != nil {
-				fmt.Printf("%sError:%s %v\n", ColorRed, ColorReset, err)
+				fmt.Printf("Failed at %d: %v\n", i, err)
 			} else {
-				fmt.Printf("%sJoined node %s%s\n", ColorGreen, parts[1], ColorReset)
+				success++
 			}
-
-		case "leave":
-			if len(parts) != 2 {
-				fmt.Println("Usage: leave <nodeID>")
-				break
+			if i%100 == 0 {
+				fmt.Printf("Progress: %d/%d\r", i, count)
 			}
-			if err := server.Leave(parts[1]); err != nil {
-				fmt.Printf("%sError:%s %v\n", ColorRed, ColorReset, err)
-			} else {
-				fmt.Printf("%sRemoved node %s%s\n", ColorGreen, parts[1], ColorReset)
+		}
+		duration := time.Since(start)
+		printBoxed(fmt.Sprintf("%sDone!%s inserted %d/%d items in %v (Avg: %v/op)", 
+			ColorGreen, ColorReset, success, count, duration, duration/time.Duration(count)))
+	})
+
+	cli.RegisterCommand("deletedatas", "Batch delete n items matching pattern", func(parts []string, server *raft.KVServer) {
+		// deletedatas <n> <pattern>
+		if len(parts) != 3 {
+			printBoxed("Usage: deletedatas <n> <pattern>")
+			return
+		}
+		n, err := strconv.Atoi(parts[1])
+		if err != nil {
+			printBoxed("Invalid number: " + parts[1])
+			return
+		}
+		pattern := parts[2]
+		
+		// Replace * with sequential numbers 1..n
+		basePattern := strings.TrimSuffix(pattern, "*")
+		
+		successCount := 0
+		start := time.Now()
+		
+		fmt.Printf("Deleting up to %d keys matching %s...\n", n, pattern)
+		
+		for i := 1; i <= n; i++ {
+			key := fmt.Sprintf("%s%d", basePattern, i)
+			if err := server.Del(key); err == nil {
+				successCount++
+			}
+			if i % 100 == 0 {
+				fmt.Print(".")
 			}
-
-		case "help":
-			fmt.Println("Commands:")
-			fmt.Println("  set <key> <val>           Set value")
-			fmt.Println("  get <key>                 Get value")
-			fmt.Println("  del <key>                 Delete key")
-			fmt.Println("  demodata <n> <pat>        Generate n items (e.g. 'demodata 100 user.*')")
-			fmt.Println("  search <pat> [lim] [off]  Search keys (e.g. 'search user.* 10 0')")
-			fmt.Println("  binlog                    Show Raft log indices")
-			fmt.Println("  db                        Show DB applied index")
-			fmt.Println("  stats                     Show node stats")
-			fmt.Println("  join <id> <addr>          Add node to cluster")
-			fmt.Println("  leave <id>                Remove node from cluster")
-
-		default:
-			fmt.Println("Unknown command. Type 'help'.")
 		}
-		fmt.Print("> ")
-	}
+		fmt.Println()
+		
+		printBoxed(fmt.Sprintf("%sDeleted %d keys in %v%s", ColorGreen, successCount, time.Since(start), ColorReset))
+	})
 }
-

+ 6 - 3
example/basic/node1/main.go

@@ -12,7 +12,7 @@ func main() {
 	// Configuration
 	nodeID := "node1"
 	addr := "127.0.0.1:9001"
-	dataDir := "../../data/node1"
+	dataDir := "../data/node1"
 
 	// Initial Cluster configuration (Node 1 + Node 2)
 	clusterNodes := map[string]string{
@@ -41,13 +41,16 @@ func main() {
 		log.Fatalf("Failed to create server: %v", err)
 	}
 
+	// Register demo commands
+	common.RegisterDemoCommands(server.CLI)
+
 	// Start server
 	if err := server.Start(); err != nil {
 		log.Fatalf("Failed to start server: %v", err)
 	}
 	defer server.Stop()
 
-	// Start CLI
-	common.StartCLI(server, nodeID)
+	// Keep main thread running
+	select {}
 }
 

+ 6 - 2
example/basic/node2/main.go

@@ -12,7 +12,7 @@ func main() {
 	// Configuration
 	nodeID := "node2"
 	addr := "127.0.0.1:9002"
-	dataDir := "../../data/node2"
+	dataDir := "../data/node2"
 
 	// Initial Cluster configuration (Node 1 + Node 2)
 	clusterNodes := map[string]string{
@@ -38,11 +38,15 @@ func main() {
 		log.Fatalf("Failed to create server: %v", err)
 	}
 
+	// Register demo commands
+	common.RegisterDemoCommands(server.CLI)
+
 	if err := server.Start(); err != nil {
 		log.Fatalf("Failed to start server: %v", err)
 	}
 	defer server.Stop()
 
-	common.StartCLI(server, nodeID)
+	// Keep main thread running
+	select {}
 }
 

+ 6 - 2
example/basic/node3/main.go

@@ -12,7 +12,7 @@ func main() {
 	// Configuration
 	nodeID := "node3"
 	addr := "127.0.0.1:9003"
-	dataDir := "../../data/node3"
+	dataDir := "../data/node3"
 
 	// Standalone configuration (will be joined to cluster later)
 	clusterNodes := map[string]string{
@@ -37,11 +37,15 @@ func main() {
 		log.Fatalf("Failed to create server: %v", err)
 	}
 
+	// Register demo commands
+	common.RegisterDemoCommands(server.CLI)
+
 	if err := server.Start(); err != nil {
 		log.Fatalf("Failed to start server: %v", err)
 	}
 	defer server.Stop()
 
-	common.StartCLI(server, nodeID)
+	// Keep main thread running
+	select {}
 }
 

+ 6 - 2
example/basic/node4/main.go

@@ -12,7 +12,7 @@ func main() {
 	// Configuration
 	nodeID := "node4"
 	addr := "127.0.0.1:9004"
-	dataDir := "../../data/node4"
+	dataDir := "../data/node4"
 
 	// Standalone configuration
 	clusterNodes := map[string]string{
@@ -37,11 +37,15 @@ func main() {
 		log.Fatalf("Failed to create server: %v", err)
 	}
 
+	// Register demo commands
+	common.RegisterDemoCommands(server.CLI)
+
 	if err := server.Start(); err != nil {
 		log.Fatalf("Failed to start server: %v", err)
 	}
 	defer server.Stop()
 
-	common.StartCLI(server, nodeID)
+	// Keep main thread running
+	select {}
 }
 

+ 20 - 3
raft.go

@@ -2225,8 +2225,24 @@ func (r *Raft) RemoveNode(nodeID string) error {
 		return fmt.Errorf("node %s not found in cluster", nodeID)
 	}
 
-	// Cannot reduce cluster below 1 node
-	if len(r.clusterNodes) <= 1 {
+	// FORCE REMOVAL CHECK
+	// Check connectivity to the node being removed.
+	// If unreachable or invalid, we force remove it without standard consensus if needed,
+	// or at least bypass some strict checks.
+	// For Raft safety, we still need to commit a config change log entry so other nodes know about the removal.
+	// However, if the node is "invalid" (e.g. bad address), it won't respond to RPCs.
+	// We should allow removal even if we can't contact it.
+	
+	// Check validity of address (simple heuristic)
+	nodeAddr := r.clusterNodes[nodeID]
+	isValidNode := true
+	if nodeAddr == "" || strings.HasPrefix(nodeAddr, ".") || !strings.Contains(nodeAddr, ":") {
+		isValidNode = false
+		r.logger.Warn("Removing invalid node %s with address %s", nodeID, nodeAddr)
+	}
+
+	// Cannot reduce cluster below 1 node, unless it's an invalid node cleanup
+	if isValidNode && len(r.clusterNodes) <= 1 {
 		r.mu.Unlock()
 		return fmt.Errorf("cannot remove last node from cluster")
 	}
@@ -2368,7 +2384,8 @@ func (r *Raft) RemoveNodeWithForward(nodeID string) error {
 	}
 
 	// Forward to leader
-	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	// Short timeout for removal as it shouldn't block long
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
 	defer cancel()
 
 	args := &RemoveNodeArgs{NodeID: nodeID}

+ 16 - 0
raft_ext.go

@@ -0,0 +1,16 @@
+package raft
+
+// GetLogSize returns the current size of the log file in bytes
+func (lm *LogManager) GetLogSize() int64 {
+	lm.mu.RLock()
+	defer lm.mu.RUnlock()
+
+	if hs, ok := lm.storage.(*HybridStorage); ok {
+		info, err := hs.logFile.Stat()
+		if err == nil {
+			return info.Size()
+		}
+	}
+	return 0
+}
+

+ 24 - 0
server.go

@@ -16,6 +16,7 @@ import (
 type KVServer struct {
 	Raft     *Raft
 	DB       *db.Engine
+	CLI      *CLI
 	stopCh   chan struct{}
 	wg       sync.WaitGroup
 	stopOnce sync.Once
@@ -76,9 +77,13 @@ func NewKVServer(config *Config) (*KVServer, error) {
 	s := &KVServer{
 		Raft:   r,
 		DB:     engine,
+		CLI:    nil,
 		stopCh: stopCh,
 	}
 
+	// Initialize CLI
+	s.CLI = NewCLI(s)
+
 	// Start applying entries
 	go s.runApplyLoop(applyCh)
 
@@ -90,6 +95,10 @@ func NewKVServer(config *Config) (*KVServer, error) {
 }
 
 func (s *KVServer) Start() error {
+	// Start CLI if enabled
+	if s.Raft.config.EnableCLI {
+		go s.CLI.Start()
+	}
 	return s.Raft.Start()
 }
 
@@ -274,6 +283,16 @@ func (s *KVServer) GetLeaderID() string {
 	return s.Raft.GetLeaderID()
 }
 
+// GetLogSize returns the raft log size
+func (s *KVServer) GetLogSize() int64 {
+	return s.Raft.log.GetLogSize()
+}
+
+// GetDBSize returns the db size
+func (s *KVServer) GetDBSize() int64 {
+	return s.DB.GetDBSize()
+}
+
 // WatchAll registers a watcher for all keys
 func (s *KVServer) WatchAll(handler WatchHandler) {
 	// s.FSM.WatchAll(handler)
@@ -407,6 +426,11 @@ func (s *KVServer) checkConnections() {
 		}
 		id, addr := kv[0], kv[1]
 
+		// Skip invalid addresses
+		if strings.HasPrefix(addr, ".") || !strings.Contains(addr, ":") {
+			continue
+		}
+
 		if !currentAddrs[addr] {
 			// Found a node that was previously in the cluster but is now missing
 			// Try to add it back

+ 7 - 1
types.go

@@ -216,7 +216,12 @@ type Config struct {
 	// Batching configurations
 	BatchMinWait time.Duration // Minimum wait time for batching (Default: 1ms)
 	BatchMaxWait time.Duration // Maximum wait time for batching (Default: 10ms)
-	BatchMaxSize int           // Maximum batch size before forcing flush (Default: 100)
+	// BatchMaxSize is the maximum batch size before forcing flush (Default: 100)
+	BatchMaxSize int
+
+	// EnableCLI determines if the CLI should be started automatically
+	// Default: false
+	EnableCLI bool
 
 	// Logger for debug output
 	Logger Logger
@@ -286,6 +291,7 @@ func DefaultConfig() *Config {
 		BatchMinWait:            1 * time.Millisecond,
 		BatchMaxWait:            10 * time.Millisecond,
 		BatchMaxSize:            100,
+		EnableCLI:               true,
 	}
 }