server.go 19 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761
  1. package raft
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "net"
  8. "net/http"
  9. "sort"
  10. "strings"
  11. "sync"
  12. "time"
  13. "igit.com/xbase/raft/db"
  14. )
// KVServer wraps Raft to provide a distributed key-value store.
//
// Writes are proposed through Raft and applied to DB by runApplyLoop; reads
// are served from DB directly (Get) or after a ReadIndex round (GetLinear).
type KVServer struct {
	// Raft is the consensus node that replicates KV commands.
	Raft *Raft
	// DB is the storage engine that applied commands are written to.
	DB *db.Engine
	// CLI is the interactive console; Start launches it when EnableCLI is set.
	CLI *CLI
	// AuthManager performs the permission checks used by the *Authenticated methods.
	AuthManager *AuthManager
	// Watcher delivers webhook notifications for applied Set/Del commands.
	Watcher *WebHookWatcher

	// httpServer is set by startHTTPServer and closed by Stop; nil when no
	// HTTP address is configured.
	httpServer *http.Server
	// stopCh is closed by Stop to terminate the maintenance loop.
	stopCh chan struct{}
	// wg tracks the maintenance loop goroutine so Stop can wait for it.
	wg sync.WaitGroup
	// stopOnce makes Stop's shutdown sequence run at most once.
	stopOnce sync.Once

	// leavingNodes tracks nodes that are currently being removed
	// to prevent auto-rejoin/discovery logic from interfering.
	// Keys are node IDs; Leave stores the time of the call as the value.
	leavingNodes sync.Map
}
  30. // NewKVServer creates a new KV server
  31. func NewKVServer(config *Config) (*KVServer, error) {
  32. // Initialize DB Engine
  33. // Use a subdirectory for DB to avoid conflict with Raft logs if they share DataDir
  34. dbPath := config.DataDir + "/kv_engine"
  35. engine, err := db.NewEngine(dbPath)
  36. if err != nil {
  37. return nil, fmt.Errorf("failed to create db engine: %w", err)
  38. }
  39. // Initialize LastAppliedIndex from DB to prevent re-applying entries
  40. config.LastAppliedIndex = engine.GetLastAppliedIndex()
  41. // Create stop channel early for use in callbacks
  42. stopCh := make(chan struct{})
  43. // Configure snapshot provider
  44. config.SnapshotProvider = func(minIncludeIndex uint64) ([]byte, error) {
  45. // Wait for DB to catch up to the requested index
  46. // This is critical for data integrity during compaction
  47. for engine.GetLastAppliedIndex() < minIncludeIndex {
  48. select {
  49. case <-stopCh:
  50. return nil, fmt.Errorf("server stopping")
  51. default:
  52. time.Sleep(10 * time.Millisecond)
  53. }
  54. }
  55. // Force sync to disk to ensure data durability before compaction
  56. // This prevents data loss if Raft logs are compacted but DB data is only in OS cache
  57. if err := engine.Sync(); err != nil {
  58. return nil, fmt.Errorf("failed to sync engine before snapshot: %w", err)
  59. }
  60. return engine.Snapshot()
  61. }
  62. // Configure get handler for remote reads
  63. config.GetHandler = func(key string) (string, bool) {
  64. return engine.Get(key)
  65. }
  66. applyCh := make(chan ApplyMsg, 1000) // Increase buffer for async processing
  67. transport := NewTCPTransport(config.ListenAddr, 10, config.Logger)
  68. // Initialize WebHookWatcher
  69. // 5 workers, 3 retries
  70. watcher := NewWebHookWatcher(5, 3, config.Logger)
  71. r, err := NewRaft(config, transport, applyCh)
  72. if err != nil {
  73. engine.Close()
  74. return nil, err
  75. }
  76. s := &KVServer{
  77. Raft: r,
  78. DB: engine,
  79. CLI: nil,
  80. AuthManager: nil, // initialized below
  81. Watcher: watcher,
  82. stopCh: stopCh,
  83. }
  84. // Initialize AuthManager
  85. s.AuthManager = NewAuthManager(s)
  86. // Initialize CLI
  87. s.CLI = NewCLI(s)
  88. // Start applying entries
  89. go s.runApplyLoop(applyCh)
  90. // Start background maintenance loop
  91. s.wg.Add(1)
  92. go s.maintenanceLoop()
  93. return s, nil
  94. }
  95. func (s *KVServer) Start() error {
  96. // Start CLI if enabled
  97. if s.Raft.config.EnableCLI {
  98. go s.CLI.Start()
  99. }
  100. // Start HTTP Server if configured
  101. if s.Raft.config.HTTPAddr != "" {
  102. if err := s.startHTTPServer(s.Raft.config.HTTPAddr); err != nil {
  103. s.Raft.config.Logger.Warn("Failed to start HTTP server: %v", err)
  104. }
  105. }
  106. return s.Raft.Start()
  107. }
  108. func (s *KVServer) Stop() error {
  109. var err error
  110. s.stopOnce.Do(func() {
  111. // Stop maintenance loop
  112. if s.stopCh != nil {
  113. close(s.stopCh)
  114. s.wg.Wait()
  115. }
  116. // Stop Watcher
  117. if s.Watcher != nil {
  118. s.Watcher.Stop()
  119. }
  120. // Stop HTTP Server
  121. if s.httpServer != nil {
  122. s.httpServer.Close()
  123. }
  124. // Stop Raft first
  125. if errRaft := s.Raft.Stop(); errRaft != nil {
  126. err = errRaft
  127. }
  128. // Close DB
  129. if s.DB != nil {
  130. if errDB := s.DB.Close(); errDB != nil {
  131. // Combine errors if both fail
  132. if err != nil {
  133. err = fmt.Errorf("raft stop error: %v, db close error: %v", err, errDB)
  134. } else {
  135. err = errDB
  136. }
  137. }
  138. }
  139. })
  140. return err
  141. }
  142. func (s *KVServer) runApplyLoop(applyCh chan ApplyMsg) {
  143. for msg := range applyCh {
  144. if msg.CommandValid {
  145. // Optimization: Skip if already applied
  146. // We check this here to avoid unmarshalling and locking DB for known duplicates
  147. if msg.CommandIndex <= s.DB.GetLastAppliedIndex() {
  148. continue
  149. }
  150. var cmd KVCommand
  151. if err := json.Unmarshal(msg.Command, &cmd); err != nil {
  152. s.Raft.config.Logger.Error("Failed to unmarshal command: %v", err)
  153. continue
  154. }
  155. var err error
  156. switch cmd.Type {
  157. case KVSet:
  158. // Update Auth Cache for system keys
  159. if strings.HasPrefix(cmd.Key, SystemKeyPrefix) {
  160. s.AuthManager.UpdateCache(cmd.Key, cmd.Value, false)
  161. }
  162. err = s.DB.Set(cmd.Key, cmd.Value, msg.CommandIndex)
  163. if err == nil {
  164. s.Watcher.Notify(cmd.Key, cmd.Value, KVSet)
  165. }
  166. case KVDel:
  167. // Update Auth Cache for system keys
  168. if strings.HasPrefix(cmd.Key, SystemKeyPrefix) {
  169. s.AuthManager.UpdateCache(cmd.Key, "", true)
  170. }
  171. err = s.DB.Delete(cmd.Key, msg.CommandIndex)
  172. if err == nil {
  173. s.Watcher.Notify(cmd.Key, "", KVDel)
  174. }
  175. default:
  176. s.Raft.config.Logger.Error("Unknown command type: %d", cmd.Type)
  177. }
  178. if err != nil {
  179. s.Raft.config.Logger.Error("DB Apply failed: %v", err)
  180. }
  181. } else if msg.SnapshotValid {
  182. if err := s.DB.Restore(msg.Snapshot); err != nil {
  183. s.Raft.config.Logger.Error("DB Restore failed: %v", err)
  184. }
  185. }
  186. }
  187. }
  188. // SetAuthenticated sets a key-value pair with permission check
  189. func (s *KVServer) SetAuthenticated(key, value, token string) error {
  190. if err := s.AuthManager.CheckPermission(token, key, ActionWrite, value); err != nil {
  191. return err
  192. }
  193. return s.Set(key, value)
  194. }
  195. // DelAuthenticated deletes a key with permission check
  196. func (s *KVServer) DelAuthenticated(key, token string) error {
  197. if err := s.AuthManager.CheckPermission(token, key, ActionWrite, ""); err != nil {
  198. return err
  199. }
  200. return s.Del(key)
  201. }
  202. // Set sets a key-value pair
  203. func (s *KVServer) Set(key, value string) error {
  204. cmd := KVCommand{
  205. Type: KVSet,
  206. Key: key,
  207. Value: value,
  208. }
  209. data, err := json.Marshal(cmd)
  210. if err != nil {
  211. return err
  212. }
  213. _, _, err = s.Raft.ProposeWithForward(data)
  214. return err
  215. }
  216. // Del deletes a key
  217. func (s *KVServer) Del(key string) error {
  218. cmd := KVCommand{
  219. Type: KVDel,
  220. Key: key,
  221. }
  222. data, err := json.Marshal(cmd)
  223. if err != nil {
  224. return err
  225. }
  226. _, _, err = s.Raft.ProposeWithForward(data)
  227. return err
  228. }
  229. // GetAuthenticated gets a value with permission check (local read)
  230. func (s *KVServer) GetAuthenticated(key, token string) (string, bool, error) {
  231. if err := s.AuthManager.CheckPermission(token, key, ActionRead, ""); err != nil {
  232. return "", false, err
  233. }
  234. val, ok := s.Get(key)
  235. return val, ok, nil
  236. }
// Get returns the value stored under key in this node's DB and whether the
// key was found. It is a local read that does not go through Raft, so the
// result can be stale. For linearizable reads, use GetLinear instead.
func (s *KVServer) Get(key string) (string, bool) {
	return s.DB.Get(key)
}
  242. // GetLinearAuthenticated gets a value with linearizable consistency and permission check
  243. func (s *KVServer) GetLinearAuthenticated(key, token string) (string, bool, error) {
  244. if err := s.AuthManager.CheckPermission(token, key, ActionRead, ""); err != nil {
  245. return "", false, err
  246. }
  247. return s.GetLinear(key)
  248. }
  249. // GetLinear gets a value with linearizable consistency
  250. // This ensures the read sees all writes committed before the read started
  251. func (s *KVServer) GetLinear(key string) (string, bool, error) {
  252. // First, ensure we have up-to-date data via ReadIndex
  253. _, err := s.Raft.ReadIndex()
  254. if err != nil {
  255. // If we're not leader, try forwarding
  256. if errors.Is(err, ErrNotLeader) {
  257. return s.forwardGet(key)
  258. }
  259. return "", false, err
  260. }
  261. val, ok := s.DB.Get(key)
  262. return val, ok, nil
  263. }
// forwardGet forwards a get request for key to the leader via Raft.ForwardGet
// and returns its result unchanged.
func (s *KVServer) forwardGet(key string) (string, bool, error) {
	return s.Raft.ForwardGet(key)
}
// Join adds the node identified by nodeID, reachable at addr, to the cluster.
// It delegates to Raft.AddNodeWithForward and returns its error.
func (s *KVServer) Join(nodeID, addr string) error {
	return s.Raft.AddNodeWithForward(nodeID, addr)
}
  272. // Leave leaves the cluster
  273. func (s *KVServer) Leave(nodeID string) error {
  274. // Mark node as leaving to prevent auto-rejoin
  275. s.leavingNodes.Store(nodeID, time.Now())
  276. // Auto-expire the leaving flag after a while
  277. go func() {
  278. time.Sleep(30 * time.Second)
  279. s.leavingNodes.Delete(nodeID)
  280. }()
  281. // Remove from RaftNode discovery key first to prevent auto-rejoin
  282. if err := s.removeNodeFromDiscovery(nodeID); err != nil {
  283. s.Raft.config.Logger.Warn("Failed to remove node from discovery key: %v", err)
  284. // Continue anyway, as the main goal is to leave the cluster
  285. }
  286. return s.Raft.RemoveNodeWithForward(nodeID)
  287. }
  288. // removeNodeFromDiscovery removes a node from the RaftNode key to prevent auto-rejoin
  289. func (s *KVServer) removeNodeFromDiscovery(targetID string) error {
  290. val, ok := s.Get("RaftNode")
  291. if !ok || val == "" {
  292. return nil
  293. }
  294. parts := strings.Split(val, ";")
  295. var newParts []string
  296. changed := false
  297. for _, part := range parts {
  298. if part == "" {
  299. continue
  300. }
  301. kv := strings.SplitN(part, "=", 2)
  302. if len(kv) == 2 {
  303. if kv[0] == targetID {
  304. changed = true
  305. continue // Skip this node
  306. }
  307. newParts = append(newParts, part)
  308. }
  309. }
  310. if changed {
  311. newVal := strings.Join(newParts, ";")
  312. return s.Set("RaftNode", newVal)
  313. }
  314. return nil
  315. }
  316. // WaitForLeader waits until a leader is elected
  317. func (s *KVServer) WaitForLeader(timeout time.Duration) error {
  318. deadline := time.Now().Add(timeout)
  319. for time.Now().Before(deadline) {
  320. leader := s.Raft.GetLeaderID()
  321. if leader != "" {
  322. return nil
  323. }
  324. time.Sleep(100 * time.Millisecond)
  325. }
  326. return fmt.Errorf("timeout waiting for leader")
  327. }
// HealthCheck returns the health status of this server as reported by
// Raft.HealthCheck.
func (s *KVServer) HealthCheck() HealthStatus {
	return s.Raft.HealthCheck()
}
// GetStats returns runtime statistics as reported by Raft.GetStats.
func (s *KVServer) GetStats() Stats {
	return s.Raft.GetStats()
}
// GetMetrics returns runtime metrics as reported by Raft.GetMetrics.
func (s *KVServer) GetMetrics() Metrics {
	return s.Raft.GetMetrics()
}
// TransferLeadership asks Raft to transfer leadership to the node identified
// by targetID and returns Raft's error, if any.
func (s *KVServer) TransferLeadership(targetID string) error {
	return s.Raft.TransferLeadership(targetID)
}
// GetClusterNodes returns the current cluster membership as reported by Raft.
// Elsewhere in this file the map is iterated as node ID -> node address.
func (s *KVServer) GetClusterNodes() map[string]string {
	return s.Raft.GetClusterNodes()
}
// IsLeader returns true if this node is the leader, according to the second
// value returned by Raft.GetState.
func (s *KVServer) IsLeader() bool {
	_, isLeader := s.Raft.GetState()
	return isLeader
}
// GetLeaderID returns the current leader ID as reported by Raft. WaitForLeader
// treats an empty string as "no leader known yet".
func (s *KVServer) GetLeaderID() string {
	return s.Raft.GetLeaderID()
}
// GetLogSize returns the Raft log size as reported by the log's GetLogSize.
// NOTE(review): the unit is presumably bytes — confirm against the log
// implementation.
func (s *KVServer) GetLogSize() int64 {
	return s.Raft.log.GetLogSize()
}
// GetDBSize returns the DB size as reported by the engine's GetDBSize.
// NOTE(review): the unit is presumably bytes — confirm against db.Engine.
func (s *KVServer) GetDBSize() int64 {
	return s.DB.GetDBSize()
}
// WatchURL registers a webhook url for a key by subscribing it on the Watcher.
func (s *KVServer) WatchURL(key, url string) {
	s.Watcher.Subscribe(key, url)
}
// UnwatchURL removes a webhook url for a key by unsubscribing it on the
// Watcher.
func (s *KVServer) UnwatchURL(key, url string) {
	s.Watcher.Unsubscribe(key, url)
}
// WatchAll is intended to register a watcher for all keys.
//
// It is currently a no-op: handler is ignored and never invoked.
func (s *KVServer) WatchAll(handler WatchHandler) {
	// Previous implementation, kept for reference:
	// s.FSM.WatchAll(handler)
	// TODO: Implement Watcher for DB
}
// Watch is intended to register a watcher for a key.
//
// It is currently a no-op: key and handler are ignored and handler is never
// invoked.
func (s *KVServer) Watch(key string, handler WatchHandler) {
	// Previous implementation, kept for reference:
	// s.FSM.Watch(key, handler)
	// TODO: Implement Watcher for DB
}
// Unwatch is intended to remove the watchers for a key.
//
// It is currently a no-op: key is ignored.
func (s *KVServer) Unwatch(key string) {
	// Previous implementation, kept for reference:
	// s.FSM.Unwatch(key)
	// TODO: Implement Watcher for DB
}
  388. func (s *KVServer) maintenanceLoop() {
  389. defer s.wg.Done()
  390. // Check every 1 second for faster reaction
  391. ticker := time.NewTicker(1 * time.Second)
  392. defer ticker.Stop()
  393. for {
  394. select {
  395. case <-s.stopCh:
  396. return
  397. case <-ticker.C:
  398. s.updateNodeInfo()
  399. s.checkConnections()
  400. }
  401. }
  402. }
  403. func (s *KVServer) updateNodeInfo() {
  404. // 1. Ensure "CreateNode/<NodeID>" is set to self address
  405. // We do this via Propose (Set) so it's replicated
  406. myID := s.Raft.config.NodeID
  407. myAddr := s.Raft.config.ListenAddr
  408. key := fmt.Sprintf("CreateNode/%s", myID)
  409. // Check if we need to update (avoid spamming logs/proposals)
  410. val, exists := s.Get(key)
  411. if !exists || val != myAddr {
  412. // Run in goroutine to avoid blocking
  413. go func() {
  414. if err := s.Set(key, myAddr); err != nil {
  415. s.Raft.config.Logger.Debug("Failed to update node info: %v", err)
  416. }
  417. }()
  418. }
  419. // 2. Only leader updates RaftNode aggregation
  420. if s.IsLeader() {
  421. // Read current RaftNode to preserve history
  422. currentVal, _ := s.Get("RaftNode")
  423. knownNodes := make(map[string]string)
  424. if currentVal != "" {
  425. parts := strings.Split(currentVal, ";")
  426. for _, part := range parts {
  427. if part == "" { continue }
  428. kv := strings.SplitN(part, "=", 2)
  429. if len(kv) == 2 {
  430. knownNodes[kv[0]] = kv[1]
  431. }
  432. }
  433. }
  434. // Merge current cluster nodes
  435. changed := false
  436. currentCluster := s.GetClusterNodes()
  437. for id, addr := range currentCluster {
  438. // Skip nodes that are marked as leaving
  439. if _, leaving := s.leavingNodes.Load(id); leaving {
  440. continue
  441. }
  442. if knownNodes[id] != addr {
  443. knownNodes[id] = addr
  444. changed = true
  445. }
  446. }
  447. // If changed, update RaftNode
  448. if changed {
  449. var peers []string
  450. for id, addr := range knownNodes {
  451. peers = append(peers, fmt.Sprintf("%s=%s", id, addr))
  452. }
  453. sort.Strings(peers)
  454. newVal := strings.Join(peers, ";")
  455. // Check again if we need to write to avoid loops if Get returned stale
  456. if newVal != currentVal {
  457. go func(k, v string) {
  458. if err := s.Set(k, v); err != nil {
  459. s.Raft.config.Logger.Warn("Failed to update RaftNode key: %v", err)
  460. }
  461. }("RaftNode", newVal)
  462. }
  463. }
  464. }
  465. }
  466. func (s *KVServer) checkConnections() {
  467. if !s.IsLeader() {
  468. return
  469. }
  470. // Read RaftNode key to find potential members that are missing
  471. val, ok := s.Get("RaftNode")
  472. if !ok || val == "" {
  473. return
  474. }
  475. // Parse saved nodes
  476. savedParts := strings.Split(val, ";")
  477. currentNodes := s.GetClusterNodes()
  478. // Invert currentNodes for address check
  479. currentAddrs := make(map[string]bool)
  480. for _, addr := range currentNodes {
  481. currentAddrs[addr] = true
  482. }
  483. for _, part := range savedParts {
  484. if part == "" {
  485. continue
  486. }
  487. // Expect id=addr
  488. kv := strings.SplitN(part, "=", 2)
  489. if len(kv) != 2 {
  490. continue
  491. }
  492. id, addr := kv[0], kv[1]
  493. // Skip invalid addresses
  494. if strings.HasPrefix(addr, ".") || !strings.Contains(addr, ":") {
  495. continue
  496. }
  497. if !currentAddrs[addr] {
  498. // Skip nodes that are marked as leaving
  499. if _, leaving := s.leavingNodes.Load(id); leaving {
  500. continue
  501. }
  502. // Found a node that was previously in the cluster but is now missing
  503. // Try to add it back
  504. // We use AddNodeWithForward which handles non-blocking internally somewhat,
  505. // but we should run this in goroutine to not block the loop
  506. go func(nodeID, nodeAddr string) {
  507. // Try to add node
  508. s.Raft.config.Logger.Info("Auto-rejoining node found in RaftNode: %s (%s)", nodeID, nodeAddr)
  509. if err := s.Join(nodeID, nodeAddr); err != nil {
  510. s.Raft.config.Logger.Debug("Failed to auto-rejoin node %s: %v", nodeID, err)
  511. }
  512. }(id, addr)
  513. }
  514. }
  515. }
  516. // startHTTPServer starts the HTTP API server
  517. func (s *KVServer) startHTTPServer(addr string) error {
  518. mux := http.NewServeMux()
  519. // KV API
  520. mux.HandleFunc("/kv", func(w http.ResponseWriter, r *http.Request) {
  521. token := r.Header.Get("X-Raft-Token")
  522. switch r.Method {
  523. case http.MethodGet:
  524. key := r.URL.Query().Get("key")
  525. if key == "" {
  526. http.Error(w, "missing key", http.StatusBadRequest)
  527. return
  528. }
  529. // Use Authenticated method
  530. val, found, err := s.GetLinearAuthenticated(key, token)
  531. if err != nil {
  532. // Distinguish auth error vs raft error?
  533. if strings.Contains(err.Error(), "permission") || strings.Contains(err.Error(), "unauthorized") {
  534. http.Error(w, err.Error(), http.StatusForbidden)
  535. } else {
  536. http.Error(w, err.Error(), http.StatusInternalServerError)
  537. }
  538. return
  539. }
  540. if !found {
  541. http.Error(w, "not found", http.StatusNotFound)
  542. return
  543. }
  544. w.Write([]byte(val))
  545. case http.MethodPost:
  546. body, _ := io.ReadAll(r.Body)
  547. var req struct {
  548. Key string `json:"key"`
  549. Value string `json:"value"`
  550. }
  551. if err := json.Unmarshal(body, &req); err != nil {
  552. http.Error(w, "invalid json", http.StatusBadRequest)
  553. return
  554. }
  555. if err := s.SetAuthenticated(req.Key, req.Value, token); err != nil {
  556. if strings.Contains(err.Error(), "permission") || strings.Contains(err.Error(), "unauthorized") {
  557. http.Error(w, err.Error(), http.StatusForbidden)
  558. } else {
  559. http.Error(w, err.Error(), http.StatusInternalServerError)
  560. }
  561. return
  562. }
  563. w.WriteHeader(http.StatusOK)
  564. case http.MethodDelete:
  565. key := r.URL.Query().Get("key")
  566. if key == "" {
  567. http.Error(w, "missing key", http.StatusBadRequest)
  568. return
  569. }
  570. if err := s.DelAuthenticated(key, token); err != nil {
  571. if strings.Contains(err.Error(), "permission") || strings.Contains(err.Error(), "unauthorized") {
  572. http.Error(w, err.Error(), http.StatusForbidden)
  573. } else {
  574. http.Error(w, err.Error(), http.StatusInternalServerError)
  575. }
  576. return
  577. }
  578. w.WriteHeader(http.StatusOK)
  579. default:
  580. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  581. }
  582. })
  583. // Auth API
  584. mux.HandleFunc("/auth/login", func(w http.ResponseWriter, r *http.Request) {
  585. if r.Method != http.MethodPost {
  586. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  587. return
  588. }
  589. var req struct {
  590. Username string `json:"username"`
  591. Password string `json:"password"`
  592. Code string `json:"code"`
  593. }
  594. body, _ := io.ReadAll(r.Body)
  595. if err := json.Unmarshal(body, &req); err != nil {
  596. http.Error(w, "invalid json", http.StatusBadRequest)
  597. return
  598. }
  599. ip := r.RemoteAddr
  600. if host, _, err := net.SplitHostPort(r.RemoteAddr); err == nil {
  601. ip = host
  602. }
  603. token, err := s.AuthManager.Login(req.Username, req.Password, req.Code, ip)
  604. if err != nil {
  605. http.Error(w, err.Error(), http.StatusUnauthorized)
  606. return
  607. }
  608. resp := struct {
  609. Token string `json:"token"`
  610. }{Token: token}
  611. json.NewEncoder(w).Encode(resp)
  612. })
  613. // Watcher API
  614. mux.HandleFunc("/watch", func(w http.ResponseWriter, r *http.Request) {
  615. if r.Method != http.MethodPost {
  616. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  617. return
  618. }
  619. body, _ := io.ReadAll(r.Body)
  620. var req struct {
  621. Key string `json:"key"`
  622. URL string `json:"url"`
  623. }
  624. if err := json.Unmarshal(body, &req); err != nil {
  625. http.Error(w, "invalid json", http.StatusBadRequest)
  626. return
  627. }
  628. if req.Key == "" || req.URL == "" {
  629. http.Error(w, "missing key or url", http.StatusBadRequest)
  630. return
  631. }
  632. s.WatchURL(req.Key, req.URL)
  633. w.WriteHeader(http.StatusOK)
  634. })
  635. mux.HandleFunc("/unwatch", func(w http.ResponseWriter, r *http.Request) {
  636. if r.Method != http.MethodPost {
  637. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  638. return
  639. }
  640. body, _ := io.ReadAll(r.Body)
  641. var req struct {
  642. Key string `json:"key"`
  643. URL string `json:"url"`
  644. }
  645. if err := json.Unmarshal(body, &req); err != nil {
  646. http.Error(w, "invalid json", http.StatusBadRequest)
  647. return
  648. }
  649. s.UnwatchURL(req.Key, req.URL)
  650. w.WriteHeader(http.StatusOK)
  651. })
  652. s.httpServer = &http.Server{
  653. Addr: addr,
  654. Handler: mux,
  655. }
  656. go func() {
  657. s.Raft.config.Logger.Info("HTTP API server listening on %s", addr)
  658. if err := s.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  659. s.Raft.config.Logger.Error("HTTP server failed: %v", err)
  660. }
  661. }()
  662. return nil
  663. }