server.go 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887
  1. package raft
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "net"
  8. "net/http"
  9. "sort"
  10. "strings"
  11. "sync"
  12. "time"
  13. "igit.com/xbase/raft/db"
  14. )
// KVServer wraps Raft to provide a distributed key-value store.
//
// Writes (Set/Del) are proposed through Raft and applied to the local
// db.Engine by the apply loop; plain reads (Get) are served from the local
// engine, and GetLinear first performs a Raft ReadIndex. The server also owns
// the auth layer, the webhook watcher, the optional CLI and the optional
// HTTP API.
type KVServer struct {
	// Raft is the consensus node all writes are proposed through.
	Raft *Raft
	// DB is the local storage engine that committed commands are applied to.
	DB *db.Engine
	// CLI is the interactive command line; Start launches it when EnableCLI is set.
	CLI *CLI
	// AuthManager performs token/permission checks and caches auth data.
	AuthManager *AuthManager
	// Watcher delivers webhook notifications for successfully applied Set/Del.
	Watcher *WebHookWatcher
	// httpServer is the HTTP API server; nil unless startHTTPServer ran.
	httpServer *http.Server
	// stopCh is closed by Stop to tell background work to exit.
	stopCh chan struct{}
	// wg tracks maintenanceLoop so Stop can wait for it.
	wg sync.WaitGroup
	// stopOnce makes Stop idempotent.
	stopOnce sync.Once
	// leavingNodes tracks nodes that are currently being removed
	// to prevent auto-rejoin/discovery logic from interfering.
	// Keys are node IDs; Leave stores the time.Time of the call as the value.
	leavingNodes sync.Map
}
  30. // NewKVServer creates a new KV server
  31. func NewKVServer(config *Config) (*KVServer, error) {
  32. // Initialize DB Engine
  33. // Use a subdirectory for DB to avoid conflict with Raft logs if they share DataDir
  34. dbPath := config.DataDir + "/kv_engine"
  35. engine, err := db.NewEngine(dbPath)
  36. if err != nil {
  37. return nil, fmt.Errorf("failed to create db engine: %w", err)
  38. }
  39. // Initialize LastAppliedIndex from DB to prevent re-applying entries
  40. config.LastAppliedIndex = engine.GetLastAppliedIndex()
  41. // Create stop channel early for use in callbacks
  42. stopCh := make(chan struct{})
  43. // Configure snapshot provider
  44. config.SnapshotProvider = func(minIncludeIndex uint64) ([]byte, error) {
  45. // Wait for DB to catch up to the requested index
  46. // This is critical for data integrity during compaction
  47. for engine.GetLastAppliedIndex() < minIncludeIndex {
  48. select {
  49. case <-stopCh:
  50. return nil, fmt.Errorf("server stopping")
  51. default:
  52. time.Sleep(10 * time.Millisecond)
  53. }
  54. }
  55. // Force sync to disk to ensure data durability before compaction
  56. // This prevents data loss if Raft logs are compacted but DB data is only in OS cache
  57. if err := engine.Sync(); err != nil {
  58. return nil, fmt.Errorf("failed to sync engine before snapshot: %w", err)
  59. }
  60. return engine.Snapshot()
  61. }
  62. // Configure get handler for remote reads
  63. config.GetHandler = func(key string) (string, bool) {
  64. return engine.Get(key)
  65. }
  66. applyCh := make(chan ApplyMsg, 1000) // Increase buffer for async processing
  67. transport := NewTCPTransport(config.ListenAddr, 10, config.Logger)
  68. // Initialize WebHookWatcher
  69. // 5 workers, 3 retries
  70. watcher := NewWebHookWatcher(5, 3, config.Logger)
  71. r, err := NewRaft(config, transport, applyCh)
  72. if err != nil {
  73. engine.Close()
  74. return nil, err
  75. }
  76. s := &KVServer{
  77. Raft: r,
  78. DB: engine,
  79. CLI: nil,
  80. AuthManager: nil, // initialized below
  81. Watcher: watcher,
  82. stopCh: stopCh,
  83. }
  84. // Initialize AuthManager
  85. s.AuthManager = NewAuthManager(s)
  86. // Load Auth Data from DB (if any)
  87. if err := s.AuthManager.LoadFromDB(); err != nil {
  88. s.Raft.config.Logger.Warn("Failed to load auth data from DB: %v", err)
  89. }
  90. // Initialize CLI
  91. s.CLI = NewCLI(s)
  92. // Start applying entries
  93. go s.runApplyLoop(applyCh)
  94. // Start background maintenance loop
  95. s.wg.Add(1)
  96. go s.maintenanceLoop()
  97. return s, nil
  98. }
  99. func (s *KVServer) Start() error {
  100. // Start CLI if enabled
  101. if s.Raft.config.EnableCLI {
  102. go s.CLI.Start()
  103. }
  104. // Start HTTP Server if configured
  105. if s.Raft.config.HTTPAddr != "" {
  106. if err := s.startHTTPServer(s.Raft.config.HTTPAddr); err != nil {
  107. s.Raft.config.Logger.Warn("Failed to start HTTP server: %v", err)
  108. }
  109. }
  110. return s.Raft.Start()
  111. }
  112. func (s *KVServer) Stop() error {
  113. var err error
  114. s.stopOnce.Do(func() {
  115. // Stop maintenance loop
  116. if s.stopCh != nil {
  117. close(s.stopCh)
  118. s.wg.Wait()
  119. }
  120. // Stop Watcher
  121. if s.Watcher != nil {
  122. s.Watcher.Stop()
  123. }
  124. // Stop HTTP Server
  125. if s.httpServer != nil {
  126. s.httpServer.Close()
  127. }
  128. // Stop Raft first
  129. if errRaft := s.Raft.Stop(); errRaft != nil {
  130. err = errRaft
  131. }
  132. // Close DB
  133. if s.DB != nil {
  134. if errDB := s.DB.Close(); errDB != nil {
  135. // Combine errors if both fail
  136. if err != nil {
  137. err = fmt.Errorf("raft stop error: %v, db close error: %v", err, errDB)
  138. } else {
  139. err = errDB
  140. }
  141. }
  142. }
  143. })
  144. return err
  145. }
  146. func (s *KVServer) runApplyLoop(applyCh chan ApplyMsg) {
  147. for msg := range applyCh {
  148. if msg.CommandValid {
  149. // Optimization: Skip if already applied
  150. // We check this here to avoid unmarshalling and locking DB for known duplicates
  151. if msg.CommandIndex <= s.DB.GetLastAppliedIndex() {
  152. continue
  153. }
  154. var cmd KVCommand
  155. if err := json.Unmarshal(msg.Command, &cmd); err != nil {
  156. s.Raft.config.Logger.Error("Failed to unmarshal command: %v", err)
  157. continue
  158. }
  159. var err error
  160. switch cmd.Type {
  161. case KVSet:
  162. // Update Auth Cache for system keys
  163. if strings.HasPrefix(cmd.Key, SystemKeyPrefix) {
  164. s.AuthManager.UpdateCache(cmd.Key, cmd.Value, false)
  165. }
  166. err = s.DB.Set(cmd.Key, cmd.Value, msg.CommandIndex)
  167. if err == nil {
  168. s.Watcher.Notify(cmd.Key, cmd.Value, KVSet)
  169. }
  170. case KVDel:
  171. // Update Auth Cache for system keys
  172. if strings.HasPrefix(cmd.Key, SystemKeyPrefix) {
  173. s.AuthManager.UpdateCache(cmd.Key, "", true)
  174. }
  175. err = s.DB.Delete(cmd.Key, msg.CommandIndex)
  176. if err == nil {
  177. s.Watcher.Notify(cmd.Key, "", KVDel)
  178. }
  179. default:
  180. s.Raft.config.Logger.Error("Unknown command type: %d", cmd.Type)
  181. }
  182. if err != nil {
  183. s.Raft.config.Logger.Error("DB Apply failed: %v", err)
  184. }
  185. } else if msg.SnapshotValid {
  186. if err := s.DB.Restore(msg.Snapshot); err != nil {
  187. s.Raft.config.Logger.Error("DB Restore failed: %v", err)
  188. }
  189. }
  190. }
  191. }
  192. // SetAuthenticated sets a key-value pair with permission check
  193. func (s *KVServer) SetAuthenticated(key, value, token string) error {
  194. if err := s.AuthManager.CheckPermission(token, key, ActionWrite, value); err != nil {
  195. return err
  196. }
  197. return s.Set(key, value)
  198. }
  199. // DelAuthenticated deletes a key with permission check
  200. func (s *KVServer) DelAuthenticated(key, token string) error {
  201. if err := s.AuthManager.CheckPermission(token, key, ActionWrite, ""); err != nil {
  202. return err
  203. }
  204. return s.Del(key)
  205. }
  206. // Set sets a key-value pair
  207. func (s *KVServer) Set(key, value string) error {
  208. cmd := KVCommand{
  209. Type: KVSet,
  210. Key: key,
  211. Value: value,
  212. }
  213. data, err := json.Marshal(cmd)
  214. if err != nil {
  215. return err
  216. }
  217. _, _, err = s.Raft.ProposeWithForward(data)
  218. return err
  219. }
  220. // Del deletes a key
  221. func (s *KVServer) Del(key string) error {
  222. cmd := KVCommand{
  223. Type: KVDel,
  224. Key: key,
  225. }
  226. data, err := json.Marshal(cmd)
  227. if err != nil {
  228. return err
  229. }
  230. _, _, err = s.Raft.ProposeWithForward(data)
  231. return err
  232. }
  233. // GetAuthenticated gets a value with permission check (local read)
  234. func (s *KVServer) GetAuthenticated(key, token string) (string, bool, error) {
  235. if err := s.AuthManager.CheckPermission(token, key, ActionRead, ""); err != nil {
  236. return "", false, err
  237. }
  238. val, ok := s.Get(key)
  239. return val, ok, nil
  240. }
  241. // Get gets a value (local read, can be stale)
  242. // For linearizable reads, use GetLinear instead
  243. func (s *KVServer) Get(key string) (string, bool) {
  244. return s.DB.Get(key)
  245. }
  246. // GetLinearAuthenticated gets a value with linearizable consistency and permission check
  247. func (s *KVServer) GetLinearAuthenticated(key, token string) (string, bool, error) {
  248. if err := s.AuthManager.CheckPermission(token, key, ActionRead, ""); err != nil {
  249. return "", false, err
  250. }
  251. return s.GetLinear(key)
  252. }
  253. // Logout invalidates the current session
  254. func (s *KVServer) Logout(token string) error {
  255. return s.AuthManager.Logout(token)
  256. }
  257. // GetSessionInfo returns the session details
  258. func (s *KVServer) GetSessionInfo(token string) (*Session, error) {
  259. return s.AuthManager.GetSession(token)
  260. }
  261. // IsRoot checks if the token belongs to the root user
  262. func (s *KVServer) IsRoot(token string) bool {
  263. sess, err := s.AuthManager.GetSession(token)
  264. if err != nil {
  265. return false
  266. }
  267. return sess.Username == "root"
  268. }
  269. // SearchAuthenticated searches keys with permission checks
  270. func (s *KVServer) SearchAuthenticated(pattern string, limit, offset int, token string) ([]db.QueryResult, error) {
  271. // Optimization: If user has full access (*), delegate to DB with limit/offset
  272. if s.AuthManager.HasFullAccess(token) {
  273. sql := fmt.Sprintf("key like \"%s\" LIMIT %d OFFSET %d", pattern, limit, offset)
  274. return s.DB.Query(sql)
  275. }
  276. // Slow path: Fetch all potential matches and filter
  277. // We construct a SQL query that retrieves CANDIDATES.
  278. // We cannot safely apply LIMIT/OFFSET in SQL because we might filter out some results.
  279. // So we must fetch ALL matches.
  280. // WARNING: This can be slow and memory intensive.
  281. // If pattern is wildcard, we might fetch everything.
  282. sql := fmt.Sprintf("key like \"%s\"", pattern)
  283. results, err := s.DB.Query(sql)
  284. if err != nil {
  285. return nil, err
  286. }
  287. filtered := make([]db.QueryResult, 0, len(results))
  288. // Apply Filtering
  289. for _, r := range results {
  290. if err := s.AuthManager.CheckPermission(token, r.Key, ActionRead, ""); err == nil {
  291. filtered = append(filtered, r)
  292. }
  293. }
  294. // Apply Pagination on Filtered Results
  295. if offset > len(filtered) {
  296. return []db.QueryResult{}, nil
  297. }
  298. start := offset
  299. end := offset + limit
  300. if end > len(filtered) {
  301. end = len(filtered)
  302. }
  303. return filtered[start:end], nil
  304. }
  305. // CountAuthenticated counts keys with permission checks
  306. func (s *KVServer) CountAuthenticated(pattern string, token string) (int, error) {
  307. // Optimization: If user has full access
  308. if s.AuthManager.HasFullAccess(token) {
  309. sql := ""
  310. if pattern == "*" {
  311. sql = "*"
  312. } else {
  313. sql = fmt.Sprintf("key like \"%s\"", pattern)
  314. }
  315. return s.DB.Count(sql)
  316. }
  317. // Slow path: Iterate and check
  318. // We use DB.Query to get keys (Query returns values too, which is wasteful but DB engine doesn't expose keys-only Query yet)
  319. // Actually, we can access s.DB.Index.WalkPrefix if we want to be faster and avoid value read,
  320. // but s.DB.Index is inside 'db' package. We can access it if it's exported.
  321. // 'db' package exports 'Engine' and 'FlatIndex'.
  322. // So s.DB.Index IS accessible.
  323. count := 0
  324. // Determine prefix from pattern
  325. prefix := ""
  326. if strings.HasSuffix(pattern, "*") {
  327. prefix = strings.TrimSuffix(pattern, "*")
  328. } else if pattern == "*" {
  329. prefix = ""
  330. } else {
  331. // Exact match check
  332. if err := s.AuthManager.CheckPermission(token, pattern, ActionRead, ""); err == nil {
  333. // Check if exists
  334. if _, ok := s.DB.Get(pattern); ok {
  335. return 1, nil
  336. }
  337. return 0, nil
  338. }
  339. return 0, nil // No perm or not found
  340. }
  341. // Walk
  342. // Note: WalkPrefix locks the DB Index (Read Lock).
  343. // Calling CheckPermission inside might involve some logic but it is memory-only and usually fast.
  344. // However, if CheckPermission takes time, we hold DB lock.
  345. s.DB.Index.WalkPrefix(prefix, func(key string, entry db.IndexEntry) bool {
  346. // Check pattern match first (WalkPrefix is just prefix, pattern might be more complex like "user.*.name")
  347. if !db.WildcardMatch(key, pattern) {
  348. return true
  349. }
  350. if err := s.AuthManager.CheckPermission(token, key, ActionRead, ""); err == nil {
  351. count++
  352. }
  353. return true
  354. })
  355. return count, nil
  356. }
  357. // GetLinear gets a value with linearizable consistency
  358. // This ensures the read sees all writes committed before the read started
  359. func (s *KVServer) GetLinear(key string) (string, bool, error) {
  360. // First, ensure we have up-to-date data via ReadIndex
  361. _, err := s.Raft.ReadIndex()
  362. if err != nil {
  363. // If we're not leader, try forwarding
  364. if errors.Is(err, ErrNotLeader) {
  365. return s.forwardGet(key)
  366. }
  367. return "", false, err
  368. }
  369. val, ok := s.DB.Get(key)
  370. return val, ok, nil
  371. }
  372. // forwardGet forwards a get request to the leader
  373. func (s *KVServer) forwardGet(key string) (string, bool, error) {
  374. return s.Raft.ForwardGet(key)
  375. }
  376. // Join joins an existing cluster
  377. func (s *KVServer) Join(nodeID, addr string) error {
  378. return s.Raft.AddNodeWithForward(nodeID, addr)
  379. }
  380. // Leave leaves the cluster
  381. func (s *KVServer) Leave(nodeID string) error {
  382. // Mark node as leaving to prevent auto-rejoin
  383. s.leavingNodes.Store(nodeID, time.Now())
  384. // Auto-expire the leaving flag after a while
  385. go func() {
  386. time.Sleep(30 * time.Second)
  387. s.leavingNodes.Delete(nodeID)
  388. }()
  389. // Remove from RaftNode discovery key first to prevent auto-rejoin
  390. if err := s.removeNodeFromDiscovery(nodeID); err != nil {
  391. s.Raft.config.Logger.Warn("Failed to remove node from discovery key: %v", err)
  392. // Continue anyway, as the main goal is to leave the cluster
  393. }
  394. return s.Raft.RemoveNodeWithForward(nodeID)
  395. }
  396. // removeNodeFromDiscovery removes a node from the RaftNode key to prevent auto-rejoin
  397. func (s *KVServer) removeNodeFromDiscovery(targetID string) error {
  398. val, ok := s.Get("RaftNode")
  399. if !ok || val == "" {
  400. return nil
  401. }
  402. parts := strings.Split(val, ";")
  403. var newParts []string
  404. changed := false
  405. for _, part := range parts {
  406. if part == "" {
  407. continue
  408. }
  409. kv := strings.SplitN(part, "=", 2)
  410. if len(kv) == 2 {
  411. if kv[0] == targetID {
  412. changed = true
  413. continue // Skip this node
  414. }
  415. newParts = append(newParts, part)
  416. }
  417. }
  418. if changed {
  419. newVal := strings.Join(newParts, ";")
  420. return s.Set("RaftNode", newVal)
  421. }
  422. return nil
  423. }
  424. // WaitForLeader waits until a leader is elected
  425. func (s *KVServer) WaitForLeader(timeout time.Duration) error {
  426. deadline := time.Now().Add(timeout)
  427. for time.Now().Before(deadline) {
  428. leader := s.Raft.GetLeaderID()
  429. if leader != "" {
  430. return nil
  431. }
  432. time.Sleep(100 * time.Millisecond)
  433. }
  434. return fmt.Errorf("timeout waiting for leader")
  435. }
  436. // HealthCheck returns the health status of this server
  437. func (s *KVServer) HealthCheck() HealthStatus {
  438. return s.Raft.HealthCheck()
  439. }
  440. // GetStats returns runtime statistics
  441. func (s *KVServer) GetStats() Stats {
  442. return s.Raft.GetStats()
  443. }
  444. // GetMetrics returns runtime metrics
  445. func (s *KVServer) GetMetrics() Metrics {
  446. return s.Raft.GetMetrics()
  447. }
  448. // TransferLeadership transfers leadership to the specified node
  449. func (s *KVServer) TransferLeadership(targetID string) error {
  450. return s.Raft.TransferLeadership(targetID)
  451. }
  452. // GetClusterNodes returns current cluster membership
  453. func (s *KVServer) GetClusterNodes() map[string]string {
  454. return s.Raft.GetClusterNodes()
  455. }
  456. // IsLeader returns true if this node is the leader
  457. func (s *KVServer) IsLeader() bool {
  458. _, isLeader := s.Raft.GetState()
  459. return isLeader
  460. }
  461. // GetLeaderID returns the current leader ID
  462. func (s *KVServer) GetLeaderID() string {
  463. return s.Raft.GetLeaderID()
  464. }
  465. // GetLogSize returns the raft log size
  466. func (s *KVServer) GetLogSize() int64 {
  467. return s.Raft.log.GetLogSize()
  468. }
  469. // GetDBSize returns the db size
  470. func (s *KVServer) GetDBSize() int64 {
  471. return s.DB.GetDBSize()
  472. }
  473. // WatchURL registers a webhook url for a key
  474. func (s *KVServer) WatchURL(key, url string) {
  475. s.Watcher.Subscribe(key, url)
  476. }
  477. // UnwatchURL removes a webhook url for a key
  478. func (s *KVServer) UnwatchURL(key, url string) {
  479. s.Watcher.Unsubscribe(key, url)
  480. }
// WatchAll is intended to register handler for changes to every key.
//
// NOTE(review): not implemented. The body is empty, so handler is never
// invoked. URL-based watching via WatchURL is the working mechanism.
func (s *KVServer) WatchAll(handler WatchHandler) {
	// s.FSM.WatchAll(handler)
	// TODO: Implement Watcher for DB
}
// Watch is intended to register handler for changes to key.
//
// NOTE(review): not implemented. The body is empty, so handler is never
// invoked. URL-based watching via WatchURL is the working mechanism.
func (s *KVServer) Watch(key string, handler WatchHandler) {
	// s.FSM.Watch(key, handler)
	// TODO: Implement Watcher for DB
}
// Unwatch is intended to remove the handlers registered for key.
//
// NOTE(review): not implemented. The body is empty and does nothing. Use
// UnwatchURL for URL-based watches.
func (s *KVServer) Unwatch(key string) {
	// s.FSM.Unwatch(key)
	// TODO: Implement Watcher for DB
}
  496. func (s *KVServer) maintenanceLoop() {
  497. defer s.wg.Done()
  498. // Check every 1 second for faster reaction
  499. ticker := time.NewTicker(1 * time.Second)
  500. defer ticker.Stop()
  501. for {
  502. select {
  503. case <-s.stopCh:
  504. return
  505. case <-ticker.C:
  506. s.updateNodeInfo()
  507. s.checkConnections()
  508. }
  509. }
  510. }
  511. func (s *KVServer) updateNodeInfo() {
  512. // 1. Ensure "CreateNode/<NodeID>" is set to self address
  513. // We do this via Propose (Set) so it's replicated
  514. myID := s.Raft.config.NodeID
  515. myAddr := s.Raft.config.ListenAddr
  516. key := fmt.Sprintf("CreateNode/%s", myID)
  517. // Check if we need to update (avoid spamming logs/proposals)
  518. val, exists := s.Get(key)
  519. if !exists || val != myAddr {
  520. // Run in goroutine to avoid blocking
  521. go func() {
  522. if err := s.Set(key, myAddr); err != nil {
  523. s.Raft.config.Logger.Debug("Failed to update node info: %v", err)
  524. }
  525. }()
  526. }
  527. // 2. Only leader updates RaftNode aggregation
  528. if s.IsLeader() {
  529. // Read current RaftNode to preserve history
  530. currentVal, _ := s.Get("RaftNode")
  531. knownNodes := make(map[string]string)
  532. if currentVal != "" {
  533. parts := strings.Split(currentVal, ";")
  534. for _, part := range parts {
  535. if part == "" { continue }
  536. kv := strings.SplitN(part, "=", 2)
  537. if len(kv) == 2 {
  538. knownNodes[kv[0]] = kv[1]
  539. }
  540. }
  541. }
  542. // Merge current cluster nodes
  543. changed := false
  544. currentCluster := s.GetClusterNodes()
  545. for id, addr := range currentCluster {
  546. // Skip nodes that are marked as leaving
  547. if _, leaving := s.leavingNodes.Load(id); leaving {
  548. continue
  549. }
  550. if knownNodes[id] != addr {
  551. knownNodes[id] = addr
  552. changed = true
  553. }
  554. }
  555. // If changed, update RaftNode
  556. if changed {
  557. var peers []string
  558. for id, addr := range knownNodes {
  559. peers = append(peers, fmt.Sprintf("%s=%s", id, addr))
  560. }
  561. sort.Strings(peers)
  562. newVal := strings.Join(peers, ";")
  563. // Check again if we need to write to avoid loops if Get returned stale
  564. if newVal != currentVal {
  565. go func(k, v string) {
  566. if err := s.Set(k, v); err != nil {
  567. s.Raft.config.Logger.Warn("Failed to update RaftNode key: %v", err)
  568. }
  569. }("RaftNode", newVal)
  570. }
  571. }
  572. }
  573. }
  574. func (s *KVServer) checkConnections() {
  575. if !s.IsLeader() {
  576. return
  577. }
  578. // Read RaftNode key to find potential members that are missing
  579. val, ok := s.Get("RaftNode")
  580. if !ok || val == "" {
  581. return
  582. }
  583. // Parse saved nodes
  584. savedParts := strings.Split(val, ";")
  585. currentNodes := s.GetClusterNodes()
  586. // Invert currentNodes for address check
  587. currentAddrs := make(map[string]bool)
  588. for _, addr := range currentNodes {
  589. currentAddrs[addr] = true
  590. }
  591. for _, part := range savedParts {
  592. if part == "" {
  593. continue
  594. }
  595. // Expect id=addr
  596. kv := strings.SplitN(part, "=", 2)
  597. if len(kv) != 2 {
  598. continue
  599. }
  600. id, addr := kv[0], kv[1]
  601. // Skip invalid addresses
  602. if strings.HasPrefix(addr, ".") || !strings.Contains(addr, ":") {
  603. continue
  604. }
  605. if !currentAddrs[addr] {
  606. // Skip nodes that are marked as leaving
  607. if _, leaving := s.leavingNodes.Load(id); leaving {
  608. continue
  609. }
  610. // Found a node that was previously in the cluster but is now missing
  611. // Try to add it back
  612. // We use AddNodeWithForward which handles non-blocking internally somewhat,
  613. // but we should run this in goroutine to not block the loop
  614. go func(nodeID, nodeAddr string) {
  615. // Try to add node
  616. s.Raft.config.Logger.Info("Auto-rejoining node found in RaftNode: %s (%s)", nodeID, nodeAddr)
  617. if err := s.Join(nodeID, nodeAddr); err != nil {
  618. s.Raft.config.Logger.Debug("Failed to auto-rejoin node %s: %v", nodeID, err)
  619. }
  620. }(id, addr)
  621. }
  622. }
  623. }
  624. // startHTTPServer starts the HTTP API server
  625. func (s *KVServer) startHTTPServer(addr string) error {
  626. mux := http.NewServeMux()
  627. // KV API
  628. mux.HandleFunc("/kv", func(w http.ResponseWriter, r *http.Request) {
  629. token := r.Header.Get("X-Raft-Token")
  630. switch r.Method {
  631. case http.MethodGet:
  632. key := r.URL.Query().Get("key")
  633. if key == "" {
  634. http.Error(w, "missing key", http.StatusBadRequest)
  635. return
  636. }
  637. // Use Authenticated method
  638. val, found, err := s.GetLinearAuthenticated(key, token)
  639. if err != nil {
  640. // Distinguish auth error vs raft error?
  641. if strings.Contains(err.Error(), "permission") || strings.Contains(err.Error(), "unauthorized") {
  642. http.Error(w, err.Error(), http.StatusForbidden)
  643. } else {
  644. http.Error(w, err.Error(), http.StatusInternalServerError)
  645. }
  646. return
  647. }
  648. if !found {
  649. http.Error(w, "not found", http.StatusNotFound)
  650. return
  651. }
  652. w.Write([]byte(val))
  653. case http.MethodPost:
  654. body, _ := io.ReadAll(r.Body)
  655. var req struct {
  656. Key string `json:"key"`
  657. Value string `json:"value"`
  658. }
  659. if err := json.Unmarshal(body, &req); err != nil {
  660. http.Error(w, "invalid json", http.StatusBadRequest)
  661. return
  662. }
  663. if err := s.SetAuthenticated(req.Key, req.Value, token); err != nil {
  664. if strings.Contains(err.Error(), "permission") || strings.Contains(err.Error(), "unauthorized") {
  665. http.Error(w, err.Error(), http.StatusForbidden)
  666. } else {
  667. http.Error(w, err.Error(), http.StatusInternalServerError)
  668. }
  669. return
  670. }
  671. w.WriteHeader(http.StatusOK)
  672. case http.MethodDelete:
  673. key := r.URL.Query().Get("key")
  674. if key == "" {
  675. http.Error(w, "missing key", http.StatusBadRequest)
  676. return
  677. }
  678. if err := s.DelAuthenticated(key, token); err != nil {
  679. if strings.Contains(err.Error(), "permission") || strings.Contains(err.Error(), "unauthorized") {
  680. http.Error(w, err.Error(), http.StatusForbidden)
  681. } else {
  682. http.Error(w, err.Error(), http.StatusInternalServerError)
  683. }
  684. return
  685. }
  686. w.WriteHeader(http.StatusOK)
  687. default:
  688. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  689. }
  690. })
  691. // Auth API
  692. mux.HandleFunc("/auth/login", func(w http.ResponseWriter, r *http.Request) {
  693. if r.Method != http.MethodPost {
  694. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  695. return
  696. }
  697. var req struct {
  698. Username string `json:"username"`
  699. Password string `json:"password"`
  700. Code string `json:"code"`
  701. }
  702. body, _ := io.ReadAll(r.Body)
  703. if err := json.Unmarshal(body, &req); err != nil {
  704. http.Error(w, "invalid json", http.StatusBadRequest)
  705. return
  706. }
  707. ip := r.RemoteAddr
  708. if host, _, err := net.SplitHostPort(r.RemoteAddr); err == nil {
  709. ip = host
  710. }
  711. token, err := s.AuthManager.Login(req.Username, req.Password, req.Code, ip)
  712. if err != nil {
  713. http.Error(w, err.Error(), http.StatusUnauthorized)
  714. return
  715. }
  716. resp := struct {
  717. Token string `json:"token"`
  718. }{Token: token}
  719. json.NewEncoder(w).Encode(resp)
  720. })
  721. // Watcher API
  722. mux.HandleFunc("/watch", func(w http.ResponseWriter, r *http.Request) {
  723. if r.Method != http.MethodPost {
  724. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  725. return
  726. }
  727. body, _ := io.ReadAll(r.Body)
  728. var req struct {
  729. Key string `json:"key"`
  730. URL string `json:"url"`
  731. }
  732. if err := json.Unmarshal(body, &req); err != nil {
  733. http.Error(w, "invalid json", http.StatusBadRequest)
  734. return
  735. }
  736. if req.Key == "" || req.URL == "" {
  737. http.Error(w, "missing key or url", http.StatusBadRequest)
  738. return
  739. }
  740. s.WatchURL(req.Key, req.URL)
  741. w.WriteHeader(http.StatusOK)
  742. })
  743. mux.HandleFunc("/unwatch", func(w http.ResponseWriter, r *http.Request) {
  744. if r.Method != http.MethodPost {
  745. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  746. return
  747. }
  748. body, _ := io.ReadAll(r.Body)
  749. var req struct {
  750. Key string `json:"key"`
  751. URL string `json:"url"`
  752. }
  753. if err := json.Unmarshal(body, &req); err != nil {
  754. http.Error(w, "invalid json", http.StatusBadRequest)
  755. return
  756. }
  757. s.UnwatchURL(req.Key, req.URL)
  758. w.WriteHeader(http.StatusOK)
  759. })
  760. s.httpServer = &http.Server{
  761. Addr: addr,
  762. Handler: mux,
  763. }
  764. go func() {
  765. s.Raft.config.Logger.Info("HTTP API server listening on %s", addr)
  766. if err := s.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  767. s.Raft.config.Logger.Error("HTTP server failed: %v", err)
  768. }
  769. }()
  770. return nil
  771. }