server.go 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082
  1. package raft
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "net"
  8. "net/http"
  9. "sort"
  10. "strings"
  11. "sync"
  12. "time"
  13. "igit.com/xbase/raft/db"
  14. )
  15. // KVServer wraps Raft to provide a distributed key-value store
  16. type KVServer struct {
  17. Raft *Raft
  18. DB *db.Engine
  19. CLI *CLI
  20. AuthManager *AuthManager
  21. Watcher *WebHookWatcher
  22. httpServer *http.Server
  23. stopCh chan struct{}
  24. wg sync.WaitGroup
  25. stopOnce sync.Once
  26. // leavingNodes tracks nodes that are currently being removed
  27. // to prevent auto-rejoin/discovery logic from interfering
  28. leavingNodes sync.Map
  29. // Pending requests for synchronous operations (Read-Your-Writes)
  30. pendingRequests map[uint64]chan error
  31. pendingMu sync.Mutex
  32. }
  33. // NewKVServer creates a new KV server
  34. func NewKVServer(config *Config) (*KVServer, error) {
  35. // Initialize DB Engine
  36. // Use a subdirectory for DB to avoid conflict with Raft logs if they share DataDir
  37. dbPath := config.DataDir + "/kv_engine"
  38. engine, err := db.NewEngine(dbPath)
  39. if err != nil {
  40. return nil, fmt.Errorf("failed to create db engine: %w", err)
  41. }
  42. // Initialize LastAppliedIndex from DB to prevent re-applying entries
  43. config.LastAppliedIndex = engine.GetLastAppliedIndex()
  44. // Create stop channel early for use in callbacks
  45. stopCh := make(chan struct{})
  46. // Configure snapshot provider
  47. config.SnapshotProvider = func(minIncludeIndex uint64) ([]byte, error) {
  48. // Wait for DB to catch up to the requested index
  49. // This is critical for data integrity during compaction
  50. for engine.GetLastAppliedIndex() < minIncludeIndex {
  51. select {
  52. case <-stopCh:
  53. return nil, fmt.Errorf("server stopping")
  54. default:
  55. time.Sleep(10 * time.Millisecond)
  56. }
  57. }
  58. // Force sync to disk to ensure data durability before compaction
  59. // This prevents data loss if Raft logs are compacted but DB data is only in OS cache
  60. if err := engine.Sync(); err != nil {
  61. return nil, fmt.Errorf("failed to sync engine before snapshot: %w", err)
  62. }
  63. return engine.Snapshot()
  64. }
  65. // Configure get handler for remote reads
  66. config.GetHandler = func(key string) (string, bool) {
  67. return engine.Get(key)
  68. }
  69. applyCh := make(chan ApplyMsg, 1000) // Increase buffer for async processing
  70. transport := NewTCPTransport(config.ListenAddr, 10, config.Logger)
  71. // Initialize WebHookWatcher
  72. // 5 workers, 3 retries
  73. watcher := NewWebHookWatcher(5, 3, config.Logger)
  74. r, err := NewRaft(config, transport, applyCh)
  75. if err != nil {
  76. engine.Close()
  77. return nil, err
  78. }
  79. s := &KVServer{
  80. Raft: r,
  81. DB: engine,
  82. CLI: nil,
  83. AuthManager: nil, // initialized below
  84. Watcher: watcher,
  85. stopCh: stopCh,
  86. pendingRequests: make(map[uint64]chan error),
  87. }
  88. // Initialize AuthManager
  89. s.AuthManager = NewAuthManager(s)
  90. // Load Auth Data from DB (if any)
  91. if err := s.AuthManager.LoadFromDB(); err != nil {
  92. s.Raft.config.Logger.Warn("Failed to load auth data from DB: %v", err)
  93. }
  94. // Initialize CLI
  95. s.CLI = NewCLI(s)
  96. // Start applying entries
  97. go s.runApplyLoop(applyCh)
  98. // Start background maintenance loop
  99. s.wg.Add(1)
  100. go s.maintenanceLoop()
  101. return s, nil
  102. }
  103. func (s *KVServer) Start() error {
  104. // Start CLI if enabled
  105. if s.Raft.config.EnableCLI {
  106. go s.CLI.Start()
  107. }
  108. // Start HTTP Server if configured
  109. if s.Raft.config.HTTPAddr != "" {
  110. if err := s.startHTTPServer(s.Raft.config.HTTPAddr); err != nil {
  111. s.Raft.config.Logger.Warn("Failed to start HTTP server: %v", err)
  112. }
  113. }
  114. return s.Raft.Start()
  115. }
  116. func (s *KVServer) Stop() error {
  117. var err error
  118. s.stopOnce.Do(func() {
  119. // Stop maintenance loop
  120. if s.stopCh != nil {
  121. close(s.stopCh)
  122. s.wg.Wait()
  123. }
  124. // Stop Watcher
  125. if s.Watcher != nil {
  126. s.Watcher.Stop()
  127. }
  128. // Stop HTTP Server
  129. if s.httpServer != nil {
  130. s.httpServer.Close()
  131. }
  132. // Stop Raft first
  133. if errRaft := s.Raft.Stop(); errRaft != nil {
  134. err = errRaft
  135. }
  136. // Close DB
  137. if s.DB != nil {
  138. if errDB := s.DB.Close(); errDB != nil {
  139. // Combine errors if both fail
  140. if err != nil {
  141. err = fmt.Errorf("raft stop error: %v, db close error: %v", err, errDB)
  142. } else {
  143. err = errDB
  144. }
  145. }
  146. }
  147. })
  148. return err
  149. }
  150. func (s *KVServer) runApplyLoop(applyCh chan ApplyMsg) {
  151. for msg := range applyCh {
  152. if msg.CommandValid {
  153. // Optimization: Skip if already applied
  154. // We check this here to avoid unmarshalling and locking DB for known duplicates
  155. if msg.CommandIndex <= s.DB.GetLastAppliedIndex() {
  156. // Even if duplicate, we might have someone waiting for it if they just proposed it
  157. // but this is unlikely for duplicates.
  158. // However, notify waiters just in case.
  159. s.notifyPending(msg.CommandIndex, nil)
  160. continue
  161. }
  162. var cmd KVCommand
  163. if err := json.Unmarshal(msg.Command, &cmd); err != nil {
  164. s.Raft.config.Logger.Error("Failed to unmarshal command: %v", err)
  165. s.notifyPending(msg.CommandIndex, err)
  166. continue
  167. }
  168. var err error
  169. switch cmd.Type {
  170. case KVSet:
  171. // Update Auth Cache for system keys
  172. if strings.HasPrefix(cmd.Key, SystemKeyPrefix) {
  173. s.AuthManager.UpdateCache(cmd.Key, cmd.Value, false)
  174. }
  175. err = s.DB.Set(cmd.Key, cmd.Value, msg.CommandIndex)
  176. if err == nil {
  177. s.Watcher.Notify(cmd.Key, cmd.Value, KVSet)
  178. }
  179. case KVDel:
  180. // Update Auth Cache for system keys
  181. if strings.HasPrefix(cmd.Key, SystemKeyPrefix) {
  182. s.AuthManager.UpdateCache(cmd.Key, "", true)
  183. }
  184. err = s.DB.Delete(cmd.Key, msg.CommandIndex)
  185. if err == nil {
  186. s.Watcher.Notify(cmd.Key, "", KVDel)
  187. }
  188. default:
  189. s.Raft.config.Logger.Error("Unknown command type: %d", cmd.Type)
  190. }
  191. if err != nil {
  192. s.Raft.config.Logger.Error("DB Apply failed: %v", err)
  193. }
  194. // Notify anyone waiting for this index
  195. s.notifyPending(msg.CommandIndex, err)
  196. } else if msg.SnapshotValid {
  197. if err := s.DB.Restore(msg.Snapshot); err != nil {
  198. s.Raft.config.Logger.Error("DB Restore failed: %v", err)
  199. }
  200. }
  201. }
  202. }
  203. // notifyPending notifies any waiting requests for the given index
  204. func (s *KVServer) notifyPending(index uint64, err error) {
  205. s.pendingMu.Lock()
  206. defer s.pendingMu.Unlock()
  207. if ch, ok := s.pendingRequests[index]; ok {
  208. // Non-blocking send in case listener is gone (buffered channel recommended)
  209. select {
  210. case ch <- err:
  211. default:
  212. }
  213. close(ch)
  214. delete(s.pendingRequests, index)
  215. }
  216. }
  217. // SetAuthenticated sets a key-value pair with permission check
  218. func (s *KVServer) SetAuthenticated(key, value, token string) error {
  219. if err := s.AuthManager.CheckPermission(token, key, ActionWrite, value); err != nil {
  220. return err
  221. }
  222. // Use SetSync for consistency in CLI/API
  223. return s.SetSync(key, value)
  224. }
  225. // DelAuthenticated deletes a key with permission check
  226. func (s *KVServer) DelAuthenticated(key, token string) error {
  227. if err := s.AuthManager.CheckPermission(token, key, ActionWrite, ""); err != nil {
  228. return err
  229. }
  230. // Use SetSync (via Del which should be updated or create DelSync)
  231. // For simplicity, we implement DelSync logic here or update Del to be sync?
  232. // Let's implement DelSync
  233. return s.DelSync(key)
  234. }
  235. // Set sets a key-value pair (Async - eventually consistent)
  236. func (s *KVServer) Set(key, value string) error {
  237. cmd := KVCommand{
  238. Type: KVSet,
  239. Key: key,
  240. Value: value,
  241. }
  242. data, err := json.Marshal(cmd)
  243. if err != nil {
  244. return err
  245. }
  246. _, _, err = s.Raft.ProposeWithForward(data)
  247. return err
  248. }
  249. // SetSync sets a key-value pair and waits for it to be applied (Read-Your-Writes)
  250. func (s *KVServer) SetSync(key, value string) error {
  251. cmd := KVCommand{
  252. Type: KVSet,
  253. Key: key,
  254. Value: value,
  255. }
  256. data, err := json.Marshal(cmd)
  257. if err != nil {
  258. return err
  259. }
  260. index, _, err := s.Raft.ProposeWithForward(data)
  261. if err != nil {
  262. return err
  263. }
  264. // Wait for application
  265. ch := make(chan error, 1)
  266. s.pendingMu.Lock()
  267. s.pendingRequests[index] = ch
  268. s.pendingMu.Unlock()
  269. select {
  270. case applyErr := <-ch:
  271. return applyErr
  272. case <-time.After(5 * time.Second):
  273. s.pendingMu.Lock()
  274. delete(s.pendingRequests, index)
  275. s.pendingMu.Unlock()
  276. return fmt.Errorf("timeout waiting for apply")
  277. }
  278. }
  279. // Del deletes a key (Async)
  280. func (s *KVServer) Del(key string) error {
  281. cmd := KVCommand{
  282. Type: KVDel,
  283. Key: key,
  284. }
  285. data, err := json.Marshal(cmd)
  286. if err != nil {
  287. return err
  288. }
  289. _, _, err = s.Raft.ProposeWithForward(data)
  290. return err
  291. }
  292. // DelSync deletes a key and waits for it to be applied
  293. func (s *KVServer) DelSync(key string) error {
  294. cmd := KVCommand{
  295. Type: KVDel,
  296. Key: key,
  297. }
  298. data, err := json.Marshal(cmd)
  299. if err != nil {
  300. return err
  301. }
  302. index, _, err := s.Raft.ProposeWithForward(data)
  303. if err != nil {
  304. return err
  305. }
  306. // Wait for application
  307. ch := make(chan error, 1)
  308. s.pendingMu.Lock()
  309. s.pendingRequests[index] = ch
  310. s.pendingMu.Unlock()
  311. select {
  312. case applyErr := <-ch:
  313. return applyErr
  314. case <-time.After(5 * time.Second):
  315. s.pendingMu.Lock()
  316. delete(s.pendingRequests, index)
  317. s.pendingMu.Unlock()
  318. return fmt.Errorf("timeout waiting for apply")
  319. }
  320. }
  321. // GetAuthenticated gets a value with permission check (local read)
  322. func (s *KVServer) GetAuthenticated(key, token string) (string, bool, error) {
  323. if err := s.AuthManager.CheckPermission(token, key, ActionRead, ""); err != nil {
  324. return "", false, err
  325. }
  326. val, ok := s.Get(key)
  327. return val, ok, nil
  328. }
  329. // Get gets a value (local read, can be stale)
  330. // For linearizable reads, use GetLinear instead
  331. func (s *KVServer) Get(key string) (string, bool) {
  332. return s.DB.Get(key)
  333. }
  334. // GetLinearAuthenticated gets a value with linearizable consistency and permission check
  335. func (s *KVServer) GetLinearAuthenticated(key, token string) (string, bool, error) {
  336. if err := s.AuthManager.CheckPermission(token, key, ActionRead, ""); err != nil {
  337. return "", false, err
  338. }
  339. return s.GetLinear(key)
  340. }
  341. // Logout invalidates the current session
  342. func (s *KVServer) Logout(token string) error {
  343. return s.AuthManager.Logout(token)
  344. }
  345. // GetSessionInfo returns the session details
  346. func (s *KVServer) GetSessionInfo(token string) (*Session, error) {
  347. return s.AuthManager.GetSession(token)
  348. }
  349. // IsRoot checks if the token belongs to the root user
  350. func (s *KVServer) IsRoot(token string) bool {
  351. sess, err := s.AuthManager.GetSession(token)
  352. if err != nil {
  353. return false
  354. }
  355. return sess.Username == "root"
  356. }
  357. // CreateUser creates a new user (Root only)
  358. func (s *KVServer) CreateUser(username, password string, roles []string, token string) error {
  359. if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
  360. return fmt.Errorf("permission denied: root access required")
  361. }
  362. // Use RegisterUserSync
  363. return s.AuthManager.RegisterUser(username, password, roles)
  364. }
  365. // DeleteUser deletes a user (Root only)
  366. func (s *KVServer) DeleteUser(username string, token string) error {
  367. if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
  368. return fmt.Errorf("permission denied: root access required")
  369. }
  370. // Check if user exists
  371. if _, err := s.AuthManager.GetUser(username); err != nil {
  372. return err
  373. }
  374. if username == "root" {
  375. return fmt.Errorf("cannot delete root user")
  376. }
  377. // Use DelSync
  378. return s.DelSync(AuthUserPrefix + username)
  379. }
  380. // UpdateUser updates generic user fields (Root only)
  381. func (s *KVServer) UpdateUser(user User, token string) error {
  382. if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
  383. return fmt.Errorf("permission denied: root access required")
  384. }
  385. // Check if user exists
  386. if _, err := s.AuthManager.GetUser(user.Username); err != nil {
  387. return err
  388. }
  389. return s.AuthManager.UpdateUser(user)
  390. }
  391. // ChangeUserPassword changes a user's password (Root or Self)
  392. func (s *KVServer) ChangeUserPassword(username, newPassword string, token string) error {
  393. if s.AuthManager.IsEnabled() {
  394. session, err := s.AuthManager.GetSession(token)
  395. if err != nil {
  396. return err
  397. }
  398. if session.Username != "root" && session.Username != username {
  399. return fmt.Errorf("permission denied")
  400. }
  401. }
  402. // Use ChangePassword (which should use SetSync)
  403. return s.AuthManager.ChangePassword(username, newPassword)
  404. }
  405. // Role Management Helpers
  406. // CreateRole creates a new role (Root only)
  407. func (s *KVServer) CreateRole(name string, token string) error {
  408. if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
  409. return fmt.Errorf("permission denied: root access required")
  410. }
  411. return s.AuthManager.CreateRole(name)
  412. }
  413. // DeleteRole deletes a role (Root only)
  414. func (s *KVServer) DeleteRole(name string, token string) error {
  415. if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
  416. return fmt.Errorf("permission denied: root access required")
  417. }
  418. return s.AuthManager.DeleteRole(name)
  419. }
  420. // UpdateRole updates a role (Root only)
  421. func (s *KVServer) UpdateRole(role Role, token string) error {
  422. if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
  423. return fmt.Errorf("permission denied: root access required")
  424. }
  425. return s.AuthManager.UpdateRole(role)
  426. }
  427. // ListUsers lists all users (Root only)
  428. func (s *KVServer) ListUsers(token string) ([]*User, error) {
  429. if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
  430. return nil, fmt.Errorf("permission denied: root access required")
  431. }
  432. return s.AuthManager.ListUsers(), nil
  433. }
  434. // ListRoles lists all roles (Root only)
  435. func (s *KVServer) ListRoles(token string) ([]*Role, error) {
  436. if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
  437. return nil, fmt.Errorf("permission denied: root access required")
  438. }
  439. return s.AuthManager.ListRoles(), nil
  440. }
  441. // SearchAuthenticated searches keys with permission checks
  442. func (s *KVServer) SearchAuthenticated(pattern string, limit, offset int, token string) ([]db.QueryResult, error) {
  443. // Optimization: If user has full access (*), delegate to DB with limit/offset
  444. if s.AuthManager.HasFullAccess(token) {
  445. sql := fmt.Sprintf("key like \"%s\" LIMIT %d OFFSET %d", pattern, limit, offset)
  446. return s.DB.Query(sql)
  447. }
  448. // Slow path: Fetch all potential matches and filter
  449. // We construct a SQL query that retrieves CANDIDATES.
  450. // We cannot safely apply LIMIT/OFFSET in SQL because we might filter out some results.
  451. // So we must fetch ALL matches.
  452. // WARNING: This can be slow and memory intensive.
  453. // If pattern is wildcard, we might fetch everything.
  454. sql := fmt.Sprintf("key like \"%s\"", pattern)
  455. results, err := s.DB.Query(sql)
  456. if err != nil {
  457. return nil, err
  458. }
  459. filtered := make([]db.QueryResult, 0, len(results))
  460. // Apply Filtering
  461. for _, r := range results {
  462. if err := s.AuthManager.CheckPermission(token, r.Key, ActionRead, ""); err == nil {
  463. filtered = append(filtered, r)
  464. }
  465. }
  466. // Apply Pagination on Filtered Results
  467. if offset > len(filtered) {
  468. return []db.QueryResult{}, nil
  469. }
  470. start := offset
  471. end := offset + limit
  472. if end > len(filtered) {
  473. end = len(filtered)
  474. }
  475. return filtered[start:end], nil
  476. }
  477. // CountAuthenticated counts keys with permission checks
  478. func (s *KVServer) CountAuthenticated(pattern string, token string) (int, error) {
  479. // Optimization: If user has full access
  480. if s.AuthManager.HasFullAccess(token) {
  481. sql := ""
  482. if pattern == "*" {
  483. sql = "*"
  484. } else {
  485. sql = fmt.Sprintf("key like \"%s\"", pattern)
  486. }
  487. return s.DB.Count(sql)
  488. }
  489. // Slow path: Iterate and check
  490. // We use DB.Query to get keys (Query returns values too, which is wasteful but DB engine doesn't expose keys-only Query yet)
  491. // Actually, we can access s.DB.Index.WalkPrefix if we want to be faster and avoid value read,
  492. // but s.DB.Index is inside 'db' package. We can access it if it's exported.
  493. // 'db' package exports 'Engine' and 'FlatIndex'.
  494. // So s.DB.Index IS accessible.
  495. count := 0
  496. // Determine prefix from pattern
  497. prefix := ""
  498. if strings.HasSuffix(pattern, "*") {
  499. prefix = strings.TrimSuffix(pattern, "*")
  500. } else if pattern == "*" {
  501. prefix = ""
  502. } else {
  503. // Exact match check
  504. if err := s.AuthManager.CheckPermission(token, pattern, ActionRead, ""); err == nil {
  505. // Check if exists
  506. if _, ok := s.DB.Get(pattern); ok {
  507. return 1, nil
  508. }
  509. return 0, nil
  510. }
  511. return 0, nil // No perm or not found
  512. }
  513. // Walk
  514. // Note: WalkPrefix locks the DB Index (Read Lock).
  515. // Calling CheckPermission inside might involve some logic but it is memory-only and usually fast.
  516. // However, if CheckPermission takes time, we hold DB lock.
  517. s.DB.Index.WalkPrefix(prefix, func(key string, entry db.IndexEntry) bool {
  518. // Check pattern match first (WalkPrefix is just prefix, pattern might be more complex like "user.*.name")
  519. if !db.WildcardMatch(key, pattern) {
  520. return true
  521. }
  522. if err := s.AuthManager.CheckPermission(token, key, ActionRead, ""); err == nil {
  523. count++
  524. }
  525. return true
  526. })
  527. return count, nil
  528. }
  529. // GetLinear gets a value with linearizable consistency
  530. // This ensures the read sees all writes committed before the read started
  531. func (s *KVServer) GetLinear(key string) (string, bool, error) {
  532. // First, ensure we have up-to-date data via ReadIndex
  533. _, err := s.Raft.ReadIndex()
  534. if err != nil {
  535. // If we're not leader, try forwarding
  536. if errors.Is(err, ErrNotLeader) {
  537. return s.forwardGet(key)
  538. }
  539. return "", false, err
  540. }
  541. val, ok := s.DB.Get(key)
  542. return val, ok, nil
  543. }
  544. // forwardGet forwards a get request to the leader
  545. func (s *KVServer) forwardGet(key string) (string, bool, error) {
  546. return s.Raft.ForwardGet(key)
  547. }
  548. // Join joins an existing cluster
  549. func (s *KVServer) Join(nodeID, addr string) error {
  550. return s.Raft.AddNodeWithForward(nodeID, addr)
  551. }
  552. // Leave leaves the cluster
  553. func (s *KVServer) Leave(nodeID string) error {
  554. // Mark node as leaving to prevent auto-rejoin
  555. s.leavingNodes.Store(nodeID, time.Now())
  556. // Auto-expire the leaving flag after a while
  557. go func() {
  558. time.Sleep(30 * time.Second)
  559. s.leavingNodes.Delete(nodeID)
  560. }()
  561. // Remove from RaftNode discovery key first to prevent auto-rejoin
  562. if err := s.removeNodeFromDiscovery(nodeID); err != nil {
  563. s.Raft.config.Logger.Warn("Failed to remove node from discovery key: %v", err)
  564. // Continue anyway, as the main goal is to leave the cluster
  565. }
  566. return s.Raft.RemoveNodeWithForward(nodeID)
  567. }
  568. // removeNodeFromDiscovery removes a node from the RaftNode key to prevent auto-rejoin
  569. func (s *KVServer) removeNodeFromDiscovery(targetID string) error {
  570. val, ok := s.Get("RaftNode")
  571. if !ok || val == "" {
  572. return nil
  573. }
  574. parts := strings.Split(val, ";")
  575. var newParts []string
  576. changed := false
  577. for _, part := range parts {
  578. if part == "" {
  579. continue
  580. }
  581. kv := strings.SplitN(part, "=", 2)
  582. if len(kv) == 2 {
  583. if kv[0] == targetID {
  584. changed = true
  585. continue // Skip this node
  586. }
  587. newParts = append(newParts, part)
  588. }
  589. }
  590. if changed {
  591. newVal := strings.Join(newParts, ";")
  592. return s.Set("RaftNode", newVal)
  593. }
  594. return nil
  595. }
  596. // WaitForLeader waits until a leader is elected
  597. func (s *KVServer) WaitForLeader(timeout time.Duration) error {
  598. deadline := time.Now().Add(timeout)
  599. for time.Now().Before(deadline) {
  600. leader := s.Raft.GetLeaderID()
  601. if leader != "" {
  602. return nil
  603. }
  604. time.Sleep(100 * time.Millisecond)
  605. }
  606. return fmt.Errorf("timeout waiting for leader")
  607. }
  608. // HealthCheck returns the health status of this server
  609. func (s *KVServer) HealthCheck() HealthStatus {
  610. return s.Raft.HealthCheck()
  611. }
  612. // GetStats returns runtime statistics
  613. func (s *KVServer) GetStats() Stats {
  614. return s.Raft.GetStats()
  615. }
  616. // GetMetrics returns runtime metrics
  617. func (s *KVServer) GetMetrics() Metrics {
  618. return s.Raft.GetMetrics()
  619. }
  620. // TransferLeadership transfers leadership to the specified node
  621. func (s *KVServer) TransferLeadership(targetID string) error {
  622. return s.Raft.TransferLeadership(targetID)
  623. }
  624. // GetClusterNodes returns current cluster membership
  625. func (s *KVServer) GetClusterNodes() map[string]string {
  626. return s.Raft.GetClusterNodes()
  627. }
  628. // IsLeader returns true if this node is the leader
  629. func (s *KVServer) IsLeader() bool {
  630. _, isLeader := s.Raft.GetState()
  631. return isLeader
  632. }
  633. // GetLeaderID returns the current leader ID
  634. func (s *KVServer) GetLeaderID() string {
  635. return s.Raft.GetLeaderID()
  636. }
  637. // GetLogSize returns the raft log size
  638. func (s *KVServer) GetLogSize() int64 {
  639. return s.Raft.log.GetLogSize()
  640. }
  641. // GetDBSize returns the db size
  642. func (s *KVServer) GetDBSize() int64 {
  643. return s.DB.GetDBSize()
  644. }
  645. // WatchURL registers a webhook url for a key
  646. func (s *KVServer) WatchURL(key, url string) {
  647. s.Watcher.Subscribe(key, url)
  648. }
  649. // UnwatchURL removes a webhook url for a key
  650. func (s *KVServer) UnwatchURL(key, url string) {
  651. s.Watcher.Unsubscribe(key, url)
  652. }
  653. // WatchAll registers a watcher for all keys
  654. func (s *KVServer) WatchAll(handler WatchHandler) {
  655. // s.FSM.WatchAll(handler)
  656. // TODO: Implement Watcher for DB
  657. }
  658. // Watch registers a watcher for a key
  659. func (s *KVServer) Watch(key string, handler WatchHandler) {
  660. // s.FSM.Watch(key, handler)
  661. // TODO: Implement Watcher for DB
  662. }
  663. // Unwatch removes watchers for a key
  664. func (s *KVServer) Unwatch(key string) {
  665. // s.FSM.Unwatch(key)
  666. // TODO: Implement Watcher for DB
  667. }
  668. func (s *KVServer) maintenanceLoop() {
  669. defer s.wg.Done()
  670. // Check every 1 second for faster reaction
  671. ticker := time.NewTicker(1 * time.Second)
  672. defer ticker.Stop()
  673. for {
  674. select {
  675. case <-s.stopCh:
  676. return
  677. case <-ticker.C:
  678. s.updateNodeInfo()
  679. s.checkConnections()
  680. }
  681. }
  682. }
  683. func (s *KVServer) updateNodeInfo() {
  684. // 1. Ensure "CreateNode/<NodeID>" is set to self address
  685. // We do this via Propose (Set) so it's replicated
  686. myID := s.Raft.config.NodeID
  687. myAddr := s.Raft.config.ListenAddr
  688. key := fmt.Sprintf("CreateNode/%s", myID)
  689. // Check if we need to update (avoid spamming logs/proposals)
  690. val, exists := s.Get(key)
  691. if !exists || val != myAddr {
  692. // Run in goroutine to avoid blocking
  693. go func() {
  694. if err := s.Set(key, myAddr); err != nil {
  695. s.Raft.config.Logger.Debug("Failed to update node info: %v", err)
  696. }
  697. }()
  698. }
  699. // 2. Only leader updates RaftNode aggregation
  700. if s.IsLeader() {
  701. // Read current RaftNode to preserve history
  702. currentVal, _ := s.Get("RaftNode")
  703. knownNodes := make(map[string]string)
  704. if currentVal != "" {
  705. parts := strings.Split(currentVal, ";")
  706. for _, part := range parts {
  707. if part == "" { continue }
  708. kv := strings.SplitN(part, "=", 2)
  709. if len(kv) == 2 {
  710. knownNodes[kv[0]] = kv[1]
  711. }
  712. }
  713. }
  714. // Merge current cluster nodes
  715. changed := false
  716. currentCluster := s.GetClusterNodes()
  717. for id, addr := range currentCluster {
  718. // Skip nodes that are marked as leaving
  719. if _, leaving := s.leavingNodes.Load(id); leaving {
  720. continue
  721. }
  722. if knownNodes[id] != addr {
  723. knownNodes[id] = addr
  724. changed = true
  725. }
  726. }
  727. // If changed, update RaftNode
  728. if changed {
  729. var peers []string
  730. for id, addr := range knownNodes {
  731. peers = append(peers, fmt.Sprintf("%s=%s", id, addr))
  732. }
  733. sort.Strings(peers)
  734. newVal := strings.Join(peers, ";")
  735. // Check again if we need to write to avoid loops if Get returned stale
  736. if newVal != currentVal {
  737. go func(k, v string) {
  738. if err := s.Set(k, v); err != nil {
  739. s.Raft.config.Logger.Warn("Failed to update RaftNode key: %v", err)
  740. }
  741. }("RaftNode", newVal)
  742. }
  743. }
  744. }
  745. }
  746. func (s *KVServer) checkConnections() {
  747. if !s.IsLeader() {
  748. return
  749. }
  750. // Read RaftNode key to find potential members that are missing
  751. val, ok := s.Get("RaftNode")
  752. if !ok || val == "" {
  753. return
  754. }
  755. // Parse saved nodes
  756. savedParts := strings.Split(val, ";")
  757. currentNodes := s.GetClusterNodes()
  758. // Invert currentNodes for address check
  759. currentAddrs := make(map[string]bool)
  760. for _, addr := range currentNodes {
  761. currentAddrs[addr] = true
  762. }
  763. for _, part := range savedParts {
  764. if part == "" {
  765. continue
  766. }
  767. // Expect id=addr
  768. kv := strings.SplitN(part, "=", 2)
  769. if len(kv) != 2 {
  770. continue
  771. }
  772. id, addr := kv[0], kv[1]
  773. // Skip invalid addresses
  774. if strings.HasPrefix(addr, ".") || !strings.Contains(addr, ":") {
  775. continue
  776. }
  777. if !currentAddrs[addr] {
  778. // Skip nodes that are marked as leaving
  779. if _, leaving := s.leavingNodes.Load(id); leaving {
  780. continue
  781. }
  782. // Found a node that was previously in the cluster but is now missing
  783. // Try to add it back
  784. // We use AddNodeWithForward which handles non-blocking internally somewhat,
  785. // but we should run this in goroutine to not block the loop
  786. go func(nodeID, nodeAddr string) {
  787. // Try to add node
  788. s.Raft.config.Logger.Info("Auto-rejoining node found in RaftNode: %s (%s)", nodeID, nodeAddr)
  789. if err := s.Join(nodeID, nodeAddr); err != nil {
  790. s.Raft.config.Logger.Debug("Failed to auto-rejoin node %s: %v", nodeID, err)
  791. }
  792. }(id, addr)
  793. }
  794. }
  795. }
  796. // startHTTPServer starts the HTTP API server
  797. func (s *KVServer) startHTTPServer(addr string) error {
  798. mux := http.NewServeMux()
  799. // KV API
  800. mux.HandleFunc("/kv", func(w http.ResponseWriter, r *http.Request) {
  801. token := r.Header.Get("X-Raft-Token")
  802. switch r.Method {
  803. case http.MethodGet:
  804. key := r.URL.Query().Get("key")
  805. if key == "" {
  806. http.Error(w, "missing key", http.StatusBadRequest)
  807. return
  808. }
  809. // Use Authenticated method
  810. val, found, err := s.GetLinearAuthenticated(key, token)
  811. if err != nil {
  812. // Distinguish auth error vs raft error?
  813. if strings.Contains(err.Error(), "permission") || strings.Contains(err.Error(), "unauthorized") {
  814. http.Error(w, err.Error(), http.StatusForbidden)
  815. } else {
  816. http.Error(w, err.Error(), http.StatusInternalServerError)
  817. }
  818. return
  819. }
  820. if !found {
  821. http.Error(w, "not found", http.StatusNotFound)
  822. return
  823. }
  824. w.Write([]byte(val))
  825. case http.MethodPost:
  826. body, _ := io.ReadAll(r.Body)
  827. var req struct {
  828. Key string `json:"key"`
  829. Value string `json:"value"`
  830. }
  831. if err := json.Unmarshal(body, &req); err != nil {
  832. http.Error(w, "invalid json", http.StatusBadRequest)
  833. return
  834. }
  835. if err := s.SetAuthenticated(req.Key, req.Value, token); err != nil {
  836. if strings.Contains(err.Error(), "permission") || strings.Contains(err.Error(), "unauthorized") {
  837. http.Error(w, err.Error(), http.StatusForbidden)
  838. } else {
  839. http.Error(w, err.Error(), http.StatusInternalServerError)
  840. }
  841. return
  842. }
  843. w.WriteHeader(http.StatusOK)
  844. case http.MethodDelete:
  845. key := r.URL.Query().Get("key")
  846. if key == "" {
  847. http.Error(w, "missing key", http.StatusBadRequest)
  848. return
  849. }
  850. if err := s.DelAuthenticated(key, token); err != nil {
  851. if strings.Contains(err.Error(), "permission") || strings.Contains(err.Error(), "unauthorized") {
  852. http.Error(w, err.Error(), http.StatusForbidden)
  853. } else {
  854. http.Error(w, err.Error(), http.StatusInternalServerError)
  855. }
  856. return
  857. }
  858. w.WriteHeader(http.StatusOK)
  859. default:
  860. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  861. }
  862. })
  863. // Auth API
  864. mux.HandleFunc("/auth/login", func(w http.ResponseWriter, r *http.Request) {
  865. if r.Method != http.MethodPost {
  866. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  867. return
  868. }
  869. var req struct {
  870. Username string `json:"username"`
  871. Password string `json:"password"`
  872. Code string `json:"code"`
  873. }
  874. body, _ := io.ReadAll(r.Body)
  875. if err := json.Unmarshal(body, &req); err != nil {
  876. http.Error(w, "invalid json", http.StatusBadRequest)
  877. return
  878. }
  879. ip := r.RemoteAddr
  880. if host, _, err := net.SplitHostPort(r.RemoteAddr); err == nil {
  881. ip = host
  882. }
  883. token, err := s.AuthManager.Login(req.Username, req.Password, req.Code, ip)
  884. if err != nil {
  885. http.Error(w, err.Error(), http.StatusUnauthorized)
  886. return
  887. }
  888. resp := struct {
  889. Token string `json:"token"`
  890. }{Token: token}
  891. json.NewEncoder(w).Encode(resp)
  892. })
  893. // Watcher API
  894. mux.HandleFunc("/watch", func(w http.ResponseWriter, r *http.Request) {
  895. if r.Method != http.MethodPost {
  896. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  897. return
  898. }
  899. body, _ := io.ReadAll(r.Body)
  900. var req struct {
  901. Key string `json:"key"`
  902. URL string `json:"url"`
  903. }
  904. if err := json.Unmarshal(body, &req); err != nil {
  905. http.Error(w, "invalid json", http.StatusBadRequest)
  906. return
  907. }
  908. if req.Key == "" || req.URL == "" {
  909. http.Error(w, "missing key or url", http.StatusBadRequest)
  910. return
  911. }
  912. s.WatchURL(req.Key, req.URL)
  913. w.WriteHeader(http.StatusOK)
  914. })
  915. mux.HandleFunc("/unwatch", func(w http.ResponseWriter, r *http.Request) {
  916. if r.Method != http.MethodPost {
  917. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  918. return
  919. }
  920. body, _ := io.ReadAll(r.Body)
  921. var req struct {
  922. Key string `json:"key"`
  923. URL string `json:"url"`
  924. }
  925. if err := json.Unmarshal(body, &req); err != nil {
  926. http.Error(w, "invalid json", http.StatusBadRequest)
  927. return
  928. }
  929. s.UnwatchURL(req.Key, req.URL)
  930. w.WriteHeader(http.StatusOK)
  931. })
  932. s.httpServer = &http.Server{
  933. Addr: addr,
  934. Handler: mux,
  935. }
  936. go func() {
  937. s.Raft.config.Logger.Info("HTTP API server listening on %s", addr)
  938. if err := s.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  939. s.Raft.config.Logger.Error("HTTP server failed: %v", err)
  940. }
  941. }()
  942. return nil
  943. }