package raft import ( "encoding/json" "errors" "fmt" "io" "net" "net/http" "sort" "strings" "sync" "time" "igit.com/xbase/raft/db" ) // KVServer wraps Raft to provide a distributed key-value store type KVServer struct { Raft *Raft DB *db.Engine CLI *CLI AuthManager *AuthManager Watcher *WebHookWatcher httpServer *http.Server stopCh chan struct{} wg sync.WaitGroup stopOnce sync.Once // leavingNodes tracks nodes that are currently being removed // to prevent auto-rejoin/discovery logic from interfering leavingNodes sync.Map // Pending requests for synchronous operations (Read-Your-Writes) pendingRequests map[uint64]chan error pendingMu sync.Mutex } // NewKVServer creates a new KV server func NewKVServer(config *Config) (*KVServer, error) { // Initialize DB Engine // Use a subdirectory for DB to avoid conflict with Raft logs if they share DataDir dbPath := config.DataDir + "/kv_engine" engine, err := db.NewEngine(dbPath) if err != nil { return nil, fmt.Errorf("failed to create db engine: %w", err) } // Initialize LastAppliedIndex from DB to prevent re-applying entries config.LastAppliedIndex = engine.GetLastAppliedIndex() // Create stop channel early for use in callbacks stopCh := make(chan struct{}) // Configure snapshot provider config.SnapshotProvider = func(minIncludeIndex uint64) ([]byte, error) { // Wait for DB to catch up to the requested index // This is critical for data integrity during compaction for engine.GetLastAppliedIndex() < minIncludeIndex { select { case <-stopCh: return nil, fmt.Errorf("server stopping") default: time.Sleep(10 * time.Millisecond) } } // Force sync to disk to ensure data durability before compaction // This prevents data loss if Raft logs are compacted but DB data is only in OS cache if err := engine.Sync(); err != nil { return nil, fmt.Errorf("failed to sync engine before snapshot: %w", err) } return engine.Snapshot() } // Configure get handler for remote reads config.GetHandler = func(key string) (string, bool) { return engine.Get(key) } applyCh := make(chan ApplyMsg, 1000) // Increase buffer for async processing transport := NewTCPTransport(config.ListenAddr, 10, config.Logger) // Initialize WebHookWatcher // 5 workers, 3 retries watcher := NewWebHookWatcher(5, 3, config.Logger) r, err := NewRaft(config, transport, applyCh) if err != nil { engine.Close() return nil, err } s := &KVServer{ Raft: r, DB: engine, CLI: nil, AuthManager: nil, // initialized below Watcher: watcher, stopCh: stopCh, pendingRequests: make(map[uint64]chan error), } // Initialize AuthManager s.AuthManager = NewAuthManager(s) // Load Auth Data from DB (if any) if err := s.AuthManager.LoadFromDB(); err != nil { s.Raft.config.Logger.Warn("Failed to load auth data from DB: %v", err) } // Initialize CLI s.CLI = NewCLI(s) // Start applying entries go s.runApplyLoop(applyCh) // Start background maintenance loop s.wg.Add(1) go s.maintenanceLoop() return s, nil } func (s *KVServer) Start() error { // Start CLI if enabled if s.Raft.config.EnableCLI { go s.CLI.Start() } // Start HTTP Server if configured if s.Raft.config.HTTPAddr != "" { if err := s.startHTTPServer(s.Raft.config.HTTPAddr); err != nil { s.Raft.config.Logger.Warn("Failed to start HTTP server: %v", err) } } // Start TCP Server (Hardcoded offset +10 for demo or config?) // User didn't add TCPPort to config, so let's derive it or just fix it. // For demo, if ListenAddr is 127.0.0.1:9001, we try 9011. host, port, _ := net.SplitHostPort(s.Raft.config.ListenAddr) if port == "9001" { tcpAddr := fmt.Sprintf("%s:%s", host, "9011") if err := s.StartTCPServer(tcpAddr); err != nil { s.Raft.config.Logger.Warn("Failed to start TCP server: %v", err) } } else if port == "9002" { tcpAddr := fmt.Sprintf("%s:%s", host, "9012") s.StartTCPServer(tcpAddr) } else { // Fallback or ignore for other nodes in demo } return s.Raft.Start() } func (s *KVServer) Stop() error { var err error s.stopOnce.Do(func() { // Stop maintenance loop if s.stopCh != nil { close(s.stopCh) s.wg.Wait() } // Stop Watcher if s.Watcher != nil { s.Watcher.Stop() } // Stop HTTP Server if s.httpServer != nil { s.httpServer.Close() } // Stop Raft first if errRaft := s.Raft.Stop(); errRaft != nil { err = errRaft } // Close DB if s.DB != nil { if errDB := s.DB.Close(); errDB != nil { // Combine errors if both fail if err != nil { err = fmt.Errorf("raft stop error: %v, db close error: %v", err, errDB) } else { err = errDB } } } }) return err } func (s *KVServer) runApplyLoop(applyCh chan ApplyMsg) { for msg := range applyCh { if msg.CommandValid { // Optimization: Skip if already applied // We check this here to avoid unmarshalling and locking DB for known duplicates if msg.CommandIndex <= s.DB.GetLastAppliedIndex() { // Even if duplicate, we might have someone waiting for it if they just proposed it // but this is unlikely for duplicates. // However, notify waiters just in case. s.notifyPending(msg.CommandIndex, nil) continue } var cmd KVCommand if err := json.Unmarshal(msg.Command, &cmd); err != nil { s.Raft.config.Logger.Error("Failed to unmarshal command: %v", err) s.notifyPending(msg.CommandIndex, err) continue } var err error switch cmd.Type { case KVSet: // Update Auth Cache for system keys // This includes AuthLockPrefix now if strings.HasPrefix(cmd.Key, SystemKeyPrefix) { s.AuthManager.UpdateCache(cmd.Key, cmd.Value, false) } err = s.DB.Set(cmd.Key, cmd.Value, msg.CommandIndex) if err == nil { s.Watcher.Notify(cmd.Key, cmd.Value, KVSet) } case KVDel: // Update Auth Cache for system keys if strings.HasPrefix(cmd.Key, SystemKeyPrefix) { s.AuthManager.UpdateCache(cmd.Key, "", true) } err = s.DB.Delete(cmd.Key, msg.CommandIndex) if err == nil { s.Watcher.Notify(cmd.Key, "", KVDel) } default: s.Raft.config.Logger.Error("Unknown command type: %d", cmd.Type) } if err != nil { s.Raft.config.Logger.Error("DB Apply failed: %v", err) } // Notify anyone waiting for this index s.notifyPending(msg.CommandIndex, err) } else if msg.SnapshotValid { if err := s.DB.Restore(msg.Snapshot); err != nil { s.Raft.config.Logger.Error("DB Restore failed: %v", err) } } } } // notifyPending notifies any waiting requests for the given index func (s *KVServer) notifyPending(index uint64, err error) { s.pendingMu.Lock() defer s.pendingMu.Unlock() if ch, ok := s.pendingRequests[index]; ok { // Non-blocking send in case listener is gone (buffered channel recommended) select { case ch <- err: default: } close(ch) delete(s.pendingRequests, index) } } // SetAuthenticatedAsync sets a key-value pair asynchronously with permission check func (s *KVServer) SetAuthenticatedAsync(key, value, token string) error { if err := s.AuthManager.CheckPermission(token, key, ActionWrite, value); err != nil { return err } return s.Set(key, value) } // SetAuthenticated sets a key-value pair with permission check func (s *KVServer) SetAuthenticated(key, value, token string) error { if err := s.AuthManager.CheckPermission(token, key, ActionWrite, value); err != nil { return err } // Use SetSync for consistency in CLI/API return s.SetSync(key, value) } // DelAuthenticated deletes a key with permission check func (s *KVServer) DelAuthenticated(key, token string) error { if err := s.AuthManager.CheckPermission(token, key, ActionWrite, ""); err != nil { return err } // Use SetSync (via Del which should be updated or create DelSync) // For simplicity, we implement DelSync logic here or update Del to be sync? // Let's implement DelSync return s.DelSync(key) } // Set sets a key-value pair (Async - eventually consistent) func (s *KVServer) Set(key, value string) error { cmd := KVCommand{ Type: KVSet, Key: key, Value: value, } data, err := json.Marshal(cmd) if err != nil { return err } _, _, err = s.Raft.ProposeWithForward(data) return err } // SetSync sets a key-value pair and waits for it to be applied (Read-Your-Writes) func (s *KVServer) SetSync(key, value string) error { cmd := KVCommand{ Type: KVSet, Key: key, Value: value, } data, err := json.Marshal(cmd) if err != nil { return err } index, _, err := s.Raft.ProposeWithForward(data) if err != nil { return err } // Wait for application ch := make(chan error, 1) s.pendingMu.Lock() s.pendingRequests[index] = ch s.pendingMu.Unlock() select { case applyErr := <-ch: return applyErr case <-time.After(5 * time.Second): s.pendingMu.Lock() delete(s.pendingRequests, index) s.pendingMu.Unlock() return fmt.Errorf("timeout waiting for apply") } } // Del deletes a key (Async) func (s *KVServer) Del(key string) error { cmd := KVCommand{ Type: KVDel, Key: key, } data, err := json.Marshal(cmd) if err != nil { return err } _, _, err = s.Raft.ProposeWithForward(data) return err } // DelSync deletes a key and waits for it to be applied func (s *KVServer) DelSync(key string) error { cmd := KVCommand{ Type: KVDel, Key: key, } data, err := json.Marshal(cmd) if err != nil { return err } index, _, err := s.Raft.ProposeWithForward(data) if err != nil { return err } // Wait for application ch := make(chan error, 1) s.pendingMu.Lock() s.pendingRequests[index] = ch s.pendingMu.Unlock() select { case applyErr := <-ch: return applyErr case <-time.After(5 * time.Second): s.pendingMu.Lock() delete(s.pendingRequests, index) s.pendingMu.Unlock() return fmt.Errorf("timeout waiting for apply") } } // GetAuthenticated gets a value with permission check (local read) func (s *KVServer) GetAuthenticated(key, token string) (string, bool, error) { if err := s.AuthManager.CheckPermission(token, key, ActionRead, ""); err != nil { return "", false, err } val, ok := s.Get(key) return val, ok, nil } // Get gets a value (local read, can be stale) // For linearizable reads, use GetLinear instead func (s *KVServer) Get(key string) (string, bool) { return s.DB.Get(key) } // GetLinearAuthenticated gets a value with linearizable consistency and permission check func (s *KVServer) GetLinearAuthenticated(key, token string) (string, bool, error) { if err := s.AuthManager.CheckPermission(token, key, ActionRead, ""); err != nil { return "", false, err } return s.GetLinear(key) } // Logout invalidates the current session func (s *KVServer) Logout(token string) error { return s.AuthManager.Logout(token) } // GetSessionInfo returns the session details func (s *KVServer) GetSessionInfo(token string) (*Session, error) { return s.AuthManager.GetSession(token) } // IsRoot checks if the token belongs to the root user func (s *KVServer) IsRoot(token string) bool { sess, err := s.AuthManager.GetSession(token) if err != nil { return false } return sess.Username == "root" } // CreateUser creates a new user (Root only) func (s *KVServer) CreateUser(username, password string, roles []string, token string) error { if s.AuthManager.IsEnabled() && !s.IsRoot(token) { return fmt.Errorf("permission denied: root access required") } // Use RegisterUserSync return s.AuthManager.RegisterUser(username, password, roles) } // DeleteUser deletes a user (Root only) func (s *KVServer) DeleteUser(username string, token string) error { if s.AuthManager.IsEnabled() && !s.IsRoot(token) { return fmt.Errorf("permission denied: root access required") } // Check if user exists if _, err := s.AuthManager.GetUser(username); err != nil { return err } if username == "root" { return fmt.Errorf("cannot delete root user") } // Use DelSync return s.DelSync(AuthUserPrefix + username) } // UpdateUser updates generic user fields (Root only) func (s *KVServer) UpdateUser(user User, token string) error { if s.AuthManager.IsEnabled() && !s.IsRoot(token) { return fmt.Errorf("permission denied: root access required") } // Check if user exists if _, err := s.AuthManager.GetUser(user.Username); err != nil { return err } return s.AuthManager.UpdateUser(user) } // ChangeUserPassword changes a user's password (Root or Self) func (s *KVServer) ChangeUserPassword(username, newPassword string, token string) error { if s.AuthManager.IsEnabled() { session, err := s.AuthManager.GetSession(token) if err != nil { return err } if session.Username != "root" && session.Username != username { return fmt.Errorf("permission denied") } } // Use ChangePassword (which should use SetSync) return s.AuthManager.ChangePassword(username, newPassword) } // Role Management Helpers // CreateRole creates a new role (Root only) func (s *KVServer) CreateRole(name string, token string) error { if s.AuthManager.IsEnabled() && !s.IsRoot(token) { return fmt.Errorf("permission denied: root access required") } return s.AuthManager.CreateRole(name) } // DeleteRole deletes a role (Root only) func (s *KVServer) DeleteRole(name string, token string) error { if s.AuthManager.IsEnabled() && !s.IsRoot(token) { return fmt.Errorf("permission denied: root access required") } return s.AuthManager.DeleteRole(name) } // UpdateRole updates a role (Root only) func (s *KVServer) UpdateRole(role Role, token string) error { if s.AuthManager.IsEnabled() && !s.IsRoot(token) { return fmt.Errorf("permission denied: root access required") } return s.AuthManager.UpdateRole(role) } // ListUsers lists all users (Root only) func (s *KVServer) ListUsers(token string) ([]*User, error) { if s.AuthManager.IsEnabled() && !s.IsRoot(token) { return nil, fmt.Errorf("permission denied: root access required") } return s.AuthManager.ListUsers(), nil } // ListRoles lists all roles (Root only) func (s *KVServer) ListRoles(token string) ([]*Role, error) { if s.AuthManager.IsEnabled() && !s.IsRoot(token) { return nil, fmt.Errorf("permission denied: root access required") } return s.AuthManager.ListRoles(), nil } // SearchAuthenticated searches keys with permission checks func (s *KVServer) SearchAuthenticated(pattern string, limit, offset int, token string) ([]db.QueryResult, error) { // Optimization: If user has full access (*), delegate to DB with limit/offset if s.AuthManager.HasFullAccess(token) { sql := fmt.Sprintf("key like \"%s\" LIMIT %d OFFSET %d", pattern, limit, offset) return s.DB.Query(sql) } // Slow path: Fetch all potential matches and filter // We construct a SQL query that retrieves CANDIDATES. // We cannot safely apply LIMIT/OFFSET in SQL because we might filter out some results. // So we must fetch ALL matches. // WARNING: This can be slow and memory intensive. // If pattern is wildcard, we might fetch everything. sql := fmt.Sprintf("key like \"%s\"", pattern) results, err := s.DB.Query(sql) if err != nil { return nil, err } filtered := make([]db.QueryResult, 0, len(results)) // Apply Filtering for _, r := range results { if err := s.AuthManager.CheckPermission(token, r.Key, ActionRead, ""); err == nil { filtered = append(filtered, r) } } // Apply Pagination on Filtered Results if offset > len(filtered) { return []db.QueryResult{}, nil } start := offset end := offset + limit if end > len(filtered) { end = len(filtered) } return filtered[start:end], nil } // CountAuthenticated counts keys with permission checks func (s *KVServer) CountAuthenticated(pattern string, token string) (int, error) { // Optimization: If user has full access if s.AuthManager.HasFullAccess(token) { sql := "" if pattern == "*" { sql = "*" } else { sql = fmt.Sprintf("key like \"%s\"", pattern) } return s.DB.Count(sql) } // Slow path: Iterate and check // We use DB.Query to get keys (Query returns values too, which is wasteful but DB engine doesn't expose keys-only Query yet) // Actually, we can access s.DB.Index.WalkPrefix if we want to be faster and avoid value read, // but s.DB.Index is inside 'db' package. We can access it if it's exported. // 'db' package exports 'Engine' and 'FlatIndex'. // So s.DB.Index IS accessible. count := 0 // Determine prefix from pattern prefix := "" if strings.HasSuffix(pattern, "*") { prefix = strings.TrimSuffix(pattern, "*") } else if pattern == "*" { prefix = "" } else { // Exact match check if err := s.AuthManager.CheckPermission(token, pattern, ActionRead, ""); err == nil { // Check if exists if _, ok := s.DB.Get(pattern); ok { return 1, nil } return 0, nil } return 0, nil // No perm or not found } // Walk // Note: WalkPrefix locks the DB Index (Read Lock). // Calling CheckPermission inside might involve some logic but it is memory-only and usually fast. // However, if CheckPermission takes time, we hold DB lock. s.DB.Index.WalkPrefix(prefix, func(key string, entry db.IndexEntry) bool { // Check pattern match first (WalkPrefix is just prefix, pattern might be more complex like "user.*.name") if !db.WildcardMatch(key, pattern) { return true } if err := s.AuthManager.CheckPermission(token, key, ActionRead, ""); err == nil { count++ } return true }) return count, nil } // GetLinear gets a value with linearizable consistency // This ensures the read sees all writes committed before the read started func (s *KVServer) GetLinear(key string) (string, bool, error) { // First, ensure we have up-to-date data via ReadIndex _, err := s.Raft.ReadIndex() if err != nil { // If we're not leader, try forwarding if errors.Is(err, ErrNotLeader) { return s.forwardGet(key) } return "", false, err } val, ok := s.DB.Get(key) return val, ok, nil } // forwardGet forwards a get request to the leader func (s *KVServer) forwardGet(key string) (string, bool, error) { return s.Raft.ForwardGet(key) } // Join joins an existing cluster func (s *KVServer) Join(nodeID, addr string) error { return s.Raft.AddNodeWithForward(nodeID, addr) } // Leave leaves the cluster func (s *KVServer) Leave(nodeID string) error { // Mark node as leaving to prevent auto-rejoin s.leavingNodes.Store(nodeID, time.Now()) // Auto-expire the leaving flag after a while go func() { time.Sleep(30 * time.Second) s.leavingNodes.Delete(nodeID) }() // Remove from RaftNode discovery key first to prevent auto-rejoin if err := s.removeNodeFromDiscovery(nodeID); err != nil { s.Raft.config.Logger.Warn("Failed to remove node from discovery key: %v", err) // Continue anyway, as the main goal is to leave the cluster } return s.Raft.RemoveNodeWithForward(nodeID) } // removeNodeFromDiscovery removes a node from the RaftNode key to prevent auto-rejoin func (s *KVServer) removeNodeFromDiscovery(targetID string) error { val, ok := s.Get("RaftNode") if !ok || val == "" { return nil } parts := strings.Split(val, ";") var newParts []string changed := false for _, part := range parts { if part == "" { continue } kv := strings.SplitN(part, "=", 2) if len(kv) == 2 { if kv[0] == targetID { changed = true continue // Skip this node } newParts = append(newParts, part) } } if changed { newVal := strings.Join(newParts, ";") return s.Set("RaftNode", newVal) } return nil } // WaitForLeader waits until a leader is elected func (s *KVServer) WaitForLeader(timeout time.Duration) error { deadline := time.Now().Add(timeout) for time.Now().Before(deadline) { leader := s.Raft.GetLeaderID() if leader != "" { return nil } time.Sleep(100 * time.Millisecond) } return fmt.Errorf("timeout waiting for leader") } // HealthCheck returns the health status of this server func (s *KVServer) HealthCheck() HealthStatus { return s.Raft.HealthCheck() } // GetStats returns runtime statistics func (s *KVServer) GetStats() Stats { return s.Raft.GetStats() } // GetMetrics returns runtime metrics func (s *KVServer) GetMetrics() Metrics { return s.Raft.GetMetrics() } // TransferLeadership transfers leadership to the specified node func (s *KVServer) TransferLeadership(targetID string) error { return s.Raft.TransferLeadership(targetID) } // GetClusterNodes returns current cluster membership func (s *KVServer) GetClusterNodes() map[string]string { return s.Raft.GetClusterNodes() } // IsLeader returns true if this node is the leader func (s *KVServer) IsLeader() bool { _, isLeader := s.Raft.GetState() return isLeader } // GetLeaderID returns the current leader ID func (s *KVServer) GetLeaderID() string { return s.Raft.GetLeaderID() } // GetLogSize returns the raft log size func (s *KVServer) GetLogSize() int64 { return s.Raft.log.GetLogSize() } // GetDBSize returns the db size func (s *KVServer) GetDBSize() int64 { return s.DB.GetDBSize() } // WatchURL registers a webhook url for a key func (s *KVServer) WatchURL(key, url string) { s.Watcher.Subscribe(key, url) } // UnwatchURL removes a webhook url for a key func (s *KVServer) UnwatchURL(key, url string) { s.Watcher.Unsubscribe(key, url) } // WatchAll registers a watcher for all keys func (s *KVServer) WatchAll(handler WatchHandler) { // s.FSM.WatchAll(handler) // TODO: Implement Watcher for DB } // Watch registers a watcher for a key func (s *KVServer) Watch(key string, handler WatchHandler) { // s.FSM.Watch(key, handler) // TODO: Implement Watcher for DB } // Unwatch removes watchers for a key func (s *KVServer) Unwatch(key string) { // s.FSM.Unwatch(key) // TODO: Implement Watcher for DB } func (s *KVServer) maintenanceLoop() { defer s.wg.Done() // Check every 1 second for faster reaction ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { case <-s.stopCh: return case <-ticker.C: s.updateNodeInfo() s.checkConnections() } } } func (s *KVServer) updateNodeInfo() { // 1. Ensure "CreateNode/" is set to self address // We do this via Propose (Set) so it's replicated myID := s.Raft.config.NodeID myAddr := s.Raft.config.ListenAddr key := fmt.Sprintf("CreateNode/%s", myID) // Check if we need to update (avoid spamming logs/proposals) val, exists := s.Get(key) if !exists || val != myAddr { // Run in goroutine to avoid blocking go func() { if err := s.Set(key, myAddr); err != nil { s.Raft.config.Logger.Debug("Failed to update node info: %v", err) } }() } // 2. Only leader updates RaftNode aggregation if s.IsLeader() { // Read current RaftNode to preserve history currentVal, _ := s.Get("RaftNode") knownNodes := make(map[string]string) if currentVal != "" { parts := strings.Split(currentVal, ";") for _, part := range parts { if part == "" { continue } kv := strings.SplitN(part, "=", 2) if len(kv) == 2 { knownNodes[kv[0]] = kv[1] } } } // Merge current cluster nodes changed := false currentCluster := s.GetClusterNodes() for id, addr := range currentCluster { // Skip nodes that are marked as leaving if _, leaving := s.leavingNodes.Load(id); leaving { continue } if knownNodes[id] != addr { knownNodes[id] = addr changed = true } } // If changed, update RaftNode if changed { var peers []string for id, addr := range knownNodes { peers = append(peers, fmt.Sprintf("%s=%s", id, addr)) } sort.Strings(peers) newVal := strings.Join(peers, ";") // Check again if we need to write to avoid loops if Get returned stale if newVal != currentVal { go func(k, v string) { if err := s.Set(k, v); err != nil { s.Raft.config.Logger.Warn("Failed to update RaftNode key: %v", err) } }("RaftNode", newVal) } } } } func (s *KVServer) checkConnections() { if !s.IsLeader() { return } // Read RaftNode key to find potential members that are missing val, ok := s.Get("RaftNode") if !ok || val == "" { return } // Parse saved nodes savedParts := strings.Split(val, ";") currentNodes := s.GetClusterNodes() // Invert currentNodes for address check currentAddrs := make(map[string]bool) for _, addr := range currentNodes { currentAddrs[addr] = true } for _, part := range savedParts { if part == "" { continue } // Expect id=addr kv := strings.SplitN(part, "=", 2) if len(kv) != 2 { continue } id, addr := kv[0], kv[1] // Skip invalid addresses if strings.HasPrefix(addr, ".") || !strings.Contains(addr, ":") { continue } if !currentAddrs[addr] { // Skip nodes that are marked as leaving if _, leaving := s.leavingNodes.Load(id); leaving { continue } // Found a node that was previously in the cluster but is now missing // Try to add it back // We use AddNodeWithForward which handles non-blocking internally somewhat, // but we should run this in goroutine to not block the loop go func(nodeID, nodeAddr string) { // Try to add node s.Raft.config.Logger.Info("Auto-rejoining node found in RaftNode: %s (%s)", nodeID, nodeAddr) if err := s.Join(nodeID, nodeAddr); err != nil { s.Raft.config.Logger.Debug("Failed to auto-rejoin node %s: %v", nodeID, err) } }(id, addr) } } } // startHTTPServer starts the HTTP API server func (s *KVServer) startHTTPServer(addr string) error { mux := http.NewServeMux() // KV API mux.HandleFunc("/kv", func(w http.ResponseWriter, r *http.Request) { token := r.Header.Get("X-Raft-Token") switch r.Method { case http.MethodGet: key := r.URL.Query().Get("key") if key == "" { http.Error(w, "missing key", http.StatusBadRequest) return } // Use Authenticated method val, found, err := s.GetLinearAuthenticated(key, token) if err != nil { // Distinguish auth error vs raft error? if strings.Contains(err.Error(), "permission") || strings.Contains(err.Error(), "unauthorized") { http.Error(w, err.Error(), http.StatusForbidden) } else { http.Error(w, err.Error(), http.StatusInternalServerError) } return } if !found { http.Error(w, "not found", http.StatusNotFound) return } w.Write([]byte(val)) case http.MethodPost: body, _ := io.ReadAll(r.Body) var req struct { Key string `json:"key"` Value string `json:"value"` } if err := json.Unmarshal(body, &req); err != nil { http.Error(w, "invalid json", http.StatusBadRequest) return } if err := s.SetAuthenticated(req.Key, req.Value, token); err != nil { if strings.Contains(err.Error(), "permission") || strings.Contains(err.Error(), "unauthorized") { http.Error(w, err.Error(), http.StatusForbidden) } else { http.Error(w, err.Error(), http.StatusInternalServerError) } return } w.WriteHeader(http.StatusOK) case http.MethodDelete: key := r.URL.Query().Get("key") if key == "" { http.Error(w, "missing key", http.StatusBadRequest) return } if err := s.DelAuthenticated(key, token); err != nil { if strings.Contains(err.Error(), "permission") || strings.Contains(err.Error(), "unauthorized") { http.Error(w, err.Error(), http.StatusForbidden) } else { http.Error(w, err.Error(), http.StatusInternalServerError) } return } w.WriteHeader(http.StatusOK) default: http.Error(w, "method not allowed", http.StatusMethodNotAllowed) } }) // Auth API mux.HandleFunc("/auth/login", func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return } var req struct { Username string `json:"username"` Password string `json:"password"` Code string `json:"code"` } body, _ := io.ReadAll(r.Body) if err := json.Unmarshal(body, &req); err != nil { http.Error(w, "invalid json", http.StatusBadRequest) return } ip := r.RemoteAddr if host, _, err := net.SplitHostPort(r.RemoteAddr); err == nil { ip = host } token, err := s.AuthManager.Login(req.Username, req.Password, req.Code, ip) if err != nil { http.Error(w, err.Error(), http.StatusUnauthorized) return } resp := struct { Token string `json:"token"` }{Token: token} json.NewEncoder(w).Encode(resp) }) // Watcher API mux.HandleFunc("/watch", func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return } body, _ := io.ReadAll(r.Body) var req struct { Key string `json:"key"` URL string `json:"url"` } if err := json.Unmarshal(body, &req); err != nil { http.Error(w, "invalid json", http.StatusBadRequest) return } if req.Key == "" || req.URL == "" { http.Error(w, "missing key or url", http.StatusBadRequest) return } s.WatchURL(req.Key, req.URL) w.WriteHeader(http.StatusOK) }) mux.HandleFunc("/unwatch", func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return } body, _ := io.ReadAll(r.Body) var req struct { Key string `json:"key"` URL string `json:"url"` } if err := json.Unmarshal(body, &req); err != nil { http.Error(w, "invalid json", http.StatusBadRequest) return } s.UnwatchURL(req.Key, req.URL) w.WriteHeader(http.StatusOK) }) s.httpServer = &http.Server{ Addr: addr, Handler: mux, } go func() { s.Raft.config.Logger.Info("HTTP API server listening on %s", addr) if err := s.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { s.Raft.config.Logger.Error("HTTP server failed: %v", err) } }() return nil }