// Command main serves a small web admin page and proxies JSON API requests to
// a line-based TCP backend, reusing backend connections through a simple pool.
package main

import (
	"bufio"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"strings"
	"sync"
)

// Configuration.
const (
	// TargetTCPAddr is the address of the backend TCP node commands are sent to.
	TargetTCPAddr = "127.0.0.1:9011"
	// WebPort is the listen address of the HTTP server.
	WebPort = ":8088"

	// maxRequestBody caps how many bytes of an API request body are read.
	maxRequestBody = 1 << 20
	// maxIdleConns caps how many idle backend connections the pool keeps.
	maxIdleConns = 32
)

// errSessionExpired reports that the backend did not answer AUTH with "OK".
// The text is sent to the HTTP client verbatim, so it keeps its original
// capitalization instead of following the lowercase error-string convention.
var errSessionExpired = errors.New("Session Expired")

// APIRequest is the JSON body accepted by /api/proxy.
type APIRequest struct {
	Command string   `json:"command"`
	Args    []string `json:"args"`
}

// APIResponse is the JSON envelope written for every /api/proxy request.
type APIResponse struct {
	Status string `json:"status"`
	Data   any    `json:"data,omitempty"`
	Error  string `json:"error,omitempty"`
}

// TCPPool implements a simple connection pool for a single backend address.
// It is safe for concurrent use.
type TCPPool struct {
	mu    sync.Mutex
	conns []net.Conn
	addr  string
}

// NewTCPPool returns an empty pool that dials addr on demand.
func NewTCPPool(addr string) *TCPPool {
	return &TCPPool{addr: addr}
}

// Get returns the most recently pooled connection, or dials a new one when the
// pool is empty. A pooled connection may have been closed by the peer in the
// meantime; callers are expected to retry on a fresh connection if it fails.
func (p *TCPPool) Get() (net.Conn, error) {
	p.mu.Lock()
	if n := len(p.conns); n > 0 {
		conn := p.conns[n-1]
		p.conns[n-1] = nil // do not keep the connection reachable from the backing array
		p.conns = p.conns[:n-1]
		p.mu.Unlock()
		return conn, nil
	}
	p.mu.Unlock()
	return p.Dial()
}

// Dial always opens a new connection to the pool's address, bypassing any
// idle connections.
func (p *TCPPool) Dial() (net.Conn, error) {
	return net.Dial("tcp", p.addr)
}

// Put hands a healthy connection back for reuse. A nil connection is ignored,
// and a connection that would exceed maxIdleConns is closed instead of kept.
func (p *TCPPool) Put(conn net.Conn) {
	if conn == nil {
		return
	}
	p.mu.Lock()
	keep := len(p.conns) < maxIdleConns
	if keep {
		p.conns = append(p.conns, conn)
	}
	p.mu.Unlock()
	if !keep {
		conn.Close() // best effort: the connection is being dropped anyway
	}
}

// Discard closes a connection that must not be reused. A nil connection is
// ignored.
func (p *TCPPool) Discard(conn net.Conn) {
	if conn != nil {
		conn.Close() // best effort: the connection is already considered broken
	}
}

// pool is the shared backend connection pool; main initializes it before the
// HTTP server starts.
var pool *TCPPool

func main() {
	// Initialize the connection pool.
	pool = NewTCPPool(TargetTCPAddr)

	// Serve the admin page.
	http.HandleFunc("/", handleIndex)

	// API endpoint.
	http.HandleFunc("/api/proxy", handleAPI)

	fmt.Printf("Web Admin started at http://localhost%s\n", WebPort)
	log.Fatal(http.ListenAndServe(WebPort, nil))
}

// handleIndex serves the admin page for "/" and answers 404 for every other
// path routed to it.
func handleIndex(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path != "/" {
		http.NotFound(w, r)
		return
	}
	// The page is read from disk, relative to the working directory.
	http.ServeFile(w, r, "example/web_admin/index.html")
}

// handleAPI accepts a POSTed APIRequest, forwards it to the backend as a
// single text line (preceded by "AUTH <token>" when the X-Session-Token header
// is set and the command is not LOGIN) and writes the backend's reply as an
// APIResponse. Apart from the 405 for non-POST methods, failures are reported
// in the JSON envelope rather than through the HTTP status code.
func handleAPI(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// 1. Parse the request. The body is size-limited so that a client cannot
	// make the server buffer an arbitrarily large payload.
	body, err := io.ReadAll(http.MaxBytesReader(w, r.Body, maxRequestBody))
	if err != nil {
		jsonResponse(w, APIResponse{Status: "error", Error: "Failed to read request body"})
		return
	}
	var req APIRequest
	if err := json.Unmarshal(body, &req); err != nil {
		jsonResponse(w, APIResponse{Status: "error", Error: "Invalid JSON"})
		return
	}

	// 2. Build the backend command line, refusing anything that would break
	// the one-line-per-command framing.
	cmdLine, err := buildCommandLine(req)
	if err != nil {
		jsonResponse(w, APIResponse{Status: "error", Error: "Invalid request: " + err.Error()})
		return
	}

	// 3. Get the token from the header. LOGIN is never preceded by AUTH.
	token := r.Header.Get("X-Session-Token")
	if strings.ContainsAny(token, "\r\n") {
		jsonResponse(w, APIResponse{Status: "error", Error: "Invalid request: session token must not contain line breaks"})
		return
	}
	isLogin := req.Command == "LOGIN"
	authToken := token
	if isLogin {
		authToken = ""
	}

	// 4. Connect to the backend. A request that sends neither AUTH nor LOGIN
	// gets a fresh connection: a pooled one may still carry the session that
	// an earlier request established on it.
	// NOTE(review): this assumes the backend binds sessions to connections,
	// as the AUTH handling suggests - confirm against the backend protocol.
	var conn net.Conn
	if authToken == "" && !isLogin {
		conn, err = pool.Dial()
	} else {
		conn, err = pool.Get()
	}
	if err != nil {
		jsonResponse(w, APIResponse{Status: "error", Error: "Connection failed: 无法连接到后端节点 (" + TargetTCPAddr + ")。请确保 Raft 服务已启动。 (" + err.Error() + ")"})
		return
	}

	// 5. Execute, retrying once on a new connection after a network error
	// (the usual cause being a pooled connection that the peer has closed).
	// NOTE(review): if the first attempt failed after the command was sent,
	// the retry executes the command a second time.
	resp, err := roundTrip(conn, authToken, cmdLine)
	if err != nil {
		// A rejected token is a logic error, not a network error: the
		// connection is fine, so keep it and do not retry.
		if errors.Is(err, errSessionExpired) {
			pool.Put(conn)
			jsonResponse(w, APIResponse{Status: "error", Error: errSessionExpired.Error()})
			return
		}

		pool.Discard(conn)
		conn, err = pool.Dial()
		if err != nil {
			jsonResponse(w, APIResponse{Status: "error", Error: "Connection failed: 无法连接到后端节点 (" + TargetTCPAddr + ")。请确保 Raft 服务已启动。 (" + err.Error() + ")"})
			return
		}

		resp, err = roundTrip(conn, authToken, cmdLine)
		if err != nil {
			pool.Discard(conn)
			errMsg := err.Error()
			if !errors.Is(err, errSessionExpired) {
				errMsg = "Connection failed: 无法连接到后端节点。请确保 Raft 服务已启动。 (" + errMsg + ")"
			}
			jsonResponse(w, APIResponse{Status: "error", Error: errMsg})
			return
		}
	}

	pool.Put(conn)
	jsonResponse(w, resp)
}

// buildCommandLine joins the command and its arguments with single spaces
// into the line sent to the backend. It returns an error when the command is
// empty or when any part contains a line break, because a line break would
// inject additional commands and leave their replies unread on a connection
// that is later reused for other requests.
func buildCommandLine(req APIRequest) (string, error) {
	if strings.TrimSpace(req.Command) == "" {
		return "", errors.New("command is empty")
	}
	if strings.ContainsAny(req.Command, "\r\n") {
		return "", errors.New("command must not contain line breaks")
	}
	for _, arg := range req.Args {
		if strings.ContainsAny(arg, "\r\n") {
			return "", errors.New("arguments must not contain line breaks")
		}
	}

	line := req.Command
	if len(req.Args) > 0 {
		line += " " + strings.Join(req.Args, " ")
	}
	return line, nil
}

// roundTrip sends cmdLine over c and returns the parsed reply. When authToken
// is non-empty it first sends "AUTH <authToken>" and returns errSessionExpired
// if the reply to that does not start with "OK". Any other returned error is
// an I/O error on c.
//
// NOTE(review): no read/write deadline is set on c (the original code had the
// SetDeadline call commented out), so a backend that never answers blocks the
// request indefinitely.
func roundTrip(c net.Conn, authToken, cmdLine string) (APIResponse, error) {
	reader := bufio.NewReader(c)

	if authToken != "" {
		if _, err := fmt.Fprintf(c, "AUTH %s\n", authToken); err != nil {
			return APIResponse{}, err
		}
		authResp, err := reader.ReadString('\n')
		if err != nil {
			return APIResponse{}, err
		}
		if !strings.HasPrefix(authResp, "OK") {
			return APIResponse{}, errSessionExpired
		}
	}

	if _, err := fmt.Fprintf(c, "%s\n", cmdLine); err != nil {
		return APIResponse{}, err
	}
	line, err := reader.ReadString('\n')
	if err != nil {
		return APIResponse{}, err
	}
	return parseBackendReply(strings.TrimSpace(line)), nil
}

// parseBackendReply converts one trimmed reply line into an APIResponse:
//
//   - "OK <payload>" yields status "ok"; a payload starting with "[" or "{"
//     that parses as JSON is returned decoded, anything else as a string.
//   - "ERR <message>" yields status "error" with that message.
//   - Any other line yields status "error" with an "Unknown response" message.
//
// Only the prefix followed by a space is stripped, so a bare "OK" or "ERR"
// line is returned unchanged as the payload or message.
func parseBackendReply(line string) APIResponse {
	switch {
	case strings.HasPrefix(line, "OK"):
		payload := strings.TrimPrefix(line, "OK ")
		if strings.HasPrefix(payload, "[") || strings.HasPrefix(payload, "{") {
			var data any
			if err := json.Unmarshal([]byte(payload), &data); err == nil {
				return APIResponse{Status: "ok", Data: data}
			}
		}
		return APIResponse{Status: "ok", Data: payload}
	case strings.HasPrefix(line, "ERR"):
		return APIResponse{Status: "error", Error: strings.TrimPrefix(line, "ERR ")}
	default:
		return APIResponse{Status: "error", Error: "Unknown response: " + line}
	}
}

// jsonResponse writes resp as JSON. The status code is left at its default;
// an encoding or write failure is logged because the response has already
// been started and can no longer be changed.
func jsonResponse(w http.ResponseWriter, resp APIResponse) {
	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(resp); err != nil {
		log.Printf("writing JSON response: %v", err)
	}
}