package raft import ( "bufio" "encoding/json" "fmt" "net" "runtime" "strconv" "strings" ) // TCPClientSession holds state for a TCP connection type TCPClientSession struct { conn net.Conn server *KVServer token string username string reader *bufio.Reader writer *bufio.Writer remoteAddr string } func (s *KVServer) StartTCPServer(addr string) error { listener, err := net.Listen("tcp", addr) if err != nil { return err } s.Raft.config.Logger.Info("TCP API server listening on %s", addr) go func() { defer listener.Close() for { conn, err := listener.Accept() if err != nil { if s.stopCh != nil { select { case <-s.stopCh: return default: } } s.Raft.config.Logger.Error("TCP Accept error: %v", err) continue } go s.handleTCPConnection(conn) } }() return nil } func (s *KVServer) handleTCPConnection(conn net.Conn) { session := &TCPClientSession{ conn: conn, server: s, reader: bufio.NewReader(conn), writer: bufio.NewWriter(conn), remoteAddr: conn.RemoteAddr().String(), } defer conn.Close() for { line, err := session.reader.ReadString('\n') if err != nil { return // Connection closed } line = strings.TrimSpace(line) if line == "" { continue } parts := strings.Fields(line) if len(parts) == 0 { continue } cmd := strings.ToUpper(parts[0]) var resp string switch cmd { case "LOGIN": if len(parts) < 3 { resp = "ERR usage: LOGIN [otp]" } else { user := parts[1] pass := parts[2] otp := "" if len(parts) > 3 { otp = parts[3] } // Extract IP ip := session.remoteAddr if host, _, err := net.SplitHostPort(ip); err == nil { ip = host } token, err := s.AuthManager.Login(user, pass, otp, ip) if err != nil { resp = fmt.Sprintf("ERR %v", err) } else { session.token = token session.username = user resp = fmt.Sprintf("OK %s", token) } } case "AUTH": if len(parts) < 2 { resp = "ERR usage: AUTH " } else { token := parts[1] // Verify token sess, err := s.AuthManager.GetSession(token) if err != nil { resp = fmt.Sprintf("ERR %v", err) } else { session.token = token session.username = sess.Username resp = "OK" } } case "LOGOUT": if session.token != "" { s.AuthManager.Logout(session.token) session.token = "" session.username = "" } resp = "OK" case "GET": if len(parts) < 2 { resp = "ERR usage: GET " } else { key := parts[1] val, found, err := s.GetLinearAuthenticated(key, session.token) if err != nil { resp = fmt.Sprintf("ERR %v", err) } else if !found { resp = "ERR not found" } else { resp = fmt.Sprintf("OK %s", val) } } case "SET": if len(parts) < 3 { resp = "ERR usage: SET " } else { key := parts[1] // Value might contain spaces, join the rest val := strings.Join(parts[2:], " ") err := s.SetAuthenticated(key, val, session.token) if err != nil { resp = fmt.Sprintf("ERR %v", err) } else { resp = "OK" } } case "DEL": if len(parts) < 2 { resp = "ERR usage: DEL " } else { key := parts[1] err := s.DelAuthenticated(key, session.token) if err != nil { resp = fmt.Sprintf("ERR %v", err) } else { resp = "OK" } } case "WHOAMI": if session.username == "" { resp = "Guest" } else { resp = session.username } case "HELP": helpText := `Available Commands: GET - Get value SET - Set value DEL - Delete value SEARCH [limit] - Search keys (e.g. user.*) COUNT - Count keys INFO - Show system stats WHOAMI - Show current user JOIN - Add node (Root only) LEAVE - Remove node (Root only) USER_LIST - List users (Admin) ROLE_LIST - List roles (Admin) LOGIN/LOGOUT/EXIT` resp = "OK " + helpText case "INFO": // Check permission (Root only if auth enabled) if s.AuthManager.IsEnabled() { sess, err := s.AuthManager.GetSession(session.token) if err != nil { resp = fmt.Sprintf("ERR %v", err) break } if sess.Username != "root" { resp = "ERR Permission Denied: Root access required" break } } // Gather stats stats := s.GetStats() health := s.HealthCheck() dbSize := s.GetDBSize() logSize := s.GetLogSize() var m runtime.MemStats runtime.ReadMemStats(&m) // Construct JSON response info := map[string]interface{}{ "node": map[string]interface{}{ "id": health.NodeID, "state": health.State, "term": health.Term, "leader": health.LeaderID, "healthy": health.IsHealthy, }, "storage": map[string]interface{}{ "db_size": dbSize, "log_size": logSize, "mem_alloc": m.Alloc, "mem_sys": m.Sys, "num_gc": m.NumGC, }, "indices": map[string]interface{}{ "commit_index": stats.CommitIndex, "applied_index": stats.LastApplied, "last_log_index": stats.LastLogIndex, "db_applied": s.DB.GetLastAppliedIndex(), }, "cluster": stats.ClusterNodes, "cluster_size": stats.ClusterSize, } data, err := json.Marshal(info) if err != nil { resp = fmt.Sprintf("ERR %v", err) } else { resp = "OK " + string(data) } case "SEARCH": // Usage: SEARCH [limit] [offset] if len(parts) < 2 { resp = "ERR usage: SEARCH [limit] [offset]" } else { pattern := parts[1] limit := 20 offset := 0 if len(parts) >= 3 { if l, err := strconv.Atoi(parts[2]); err == nil { limit = l } } if len(parts) >= 4 { if o, err := strconv.Atoi(parts[3]); err == nil { offset = o } } results, err := s.SearchAuthenticated(pattern, limit, offset, session.token) if err != nil { resp = fmt.Sprintf("ERR %v", err) } else { data, _ := json.Marshal(results) resp = "OK " + string(data) } } case "COUNT": // Usage: COUNT if len(parts) < 2 { resp = "ERR usage: COUNT " } else { pattern := parts[1] count, err := s.CountAuthenticated(pattern, session.token) if err != nil { resp = fmt.Sprintf("ERR %v", err) } else { resp = fmt.Sprintf("OK %d", count) } } case "JOIN": // Usage: JOIN // Admin only if s.AuthManager.IsEnabled() { sess, err := s.AuthManager.GetSession(session.token) if err != nil || sess.Username != "root" { resp = "ERR Permission Denied: Root access required" break } } if len(parts) < 3 { resp = "ERR usage: JOIN " } else { err := s.Join(parts[1], parts[2]) if err != nil { resp = fmt.Sprintf("ERR %v", err) } else { resp = "OK Join request sent" } } case "LEAVE": // Usage: LEAVE // Admin only if s.AuthManager.IsEnabled() { sess, err := s.AuthManager.GetSession(session.token) if err != nil || sess.Username != "root" { resp = "ERR Permission Denied: Root access required" break } } if len(parts) < 2 { resp = "ERR usage: LEAVE " } else { err := s.Leave(parts[1]) if err != nil { resp = fmt.Sprintf("ERR %v", err) } else { resp = "OK Leave request sent" } } // --- Admin Commands --- case "USER_LIST": users := s.AuthManager.ListUsers() data, err := json.Marshal(users) if err != nil { resp = fmt.Sprintf("ERR %v", err) } else { // Ensure it's a single line for TCP protocol simplicity jsonStr := string(data) // JSON marshal might include newlines if indentation was used (it's not by default, but safe to check) // However, standard json.Marshal does not indent. resp = fmt.Sprintf("OK %s", jsonStr) } case "ROLE_LIST": roles := s.AuthManager.ListRoles() data, err := json.Marshal(roles) if err != nil { resp = fmt.Sprintf("ERR %v", err) } else { resp = fmt.Sprintf("OK %s", string(data)) } case "USER_CREATE": // Usage: USER_CREATE if len(parts) < 3 { resp = "ERR usage: USER_CREATE [roles]" } else { u := parts[1] p := parts[2] var roles []string if len(parts) > 3 { roles = strings.Split(parts[3], ",") } // Use RegisterUser (sync) err := s.AuthManager.RegisterUser(u, p, roles) if err != nil { resp = fmt.Sprintf("ERR %v", err) } else { resp = "OK" } } case "ROLE_CREATE": // Usage: ROLE_CREATE if len(parts) < 2 { resp = "ERR usage: ROLE_CREATE " } else { name := parts[1] err := s.AuthManager.CreateRole(name) if err != nil { resp = fmt.Sprintf("ERR %v", err) } else { resp = "OK" } } case "ROLE_PERMISSION_ADD": // Usage: ROLE_PERMISSION_ADD // Actions: comma separated list of actions (read,write,admin,*) if len(parts) < 4 { resp = "ERR usage: ROLE_PERMISSION_ADD " } else { roleName := parts[1] pattern := parts[2] actionsStr := parts[3] actions := strings.Split(actionsStr, ",") rolePtr, err := s.AuthManager.GetRole(roleName) if err != nil { resp = fmt.Sprintf("ERR %v", err) } else { // Create a copy to modify role := *rolePtr // Deep copy permissions to avoid potential side effects on cached object originalPerms := role.Permissions role.Permissions = make([]Permission, len(originalPerms)) copy(role.Permissions, originalPerms) newPerm := Permission{ KeyPattern: pattern, Actions: actions, } role.Permissions = append(role.Permissions, newPerm) err := s.AuthManager.UpdateRole(role) if err != nil { resp = fmt.Sprintf("ERR %v", err) } else { resp = "OK" } } } case "USER_UNLOCK": // Usage: USER_UNLOCK if len(parts) < 2 { resp = "ERR usage: USER_UNLOCK " } else { // Manually clear the lock key // Note: accessing server.Set directly bypasses auth check which is fine here // as the TCP session itself should be authenticated as admin ideally. // For now we trust the connected client has rights or we check session. // In real impl, check if session.username is root or has admin perm. userToUnlock := parts[1] // We use Del to remove the lock key err := s.Del("system.lock." + userToUnlock) if err != nil { resp = fmt.Sprintf("ERR %v", err) } else { resp = "OK" } } case "EXIT", "QUIT": session.writer.WriteString("BYE\n") session.writer.Flush() return default: s.Raft.config.Logger.Warn("Unknown command received: %s (parts: %v)", cmd, parts) resp = fmt.Sprintf("ERR unknown command: %s", cmd) } session.writer.WriteString(resp + "\n") session.writer.Flush() } }