// File: server.go

  1. package raft
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "net"
  8. "net/http"
  9. "sort"
  10. "strings"
  11. "sync"
  12. "time"
  13. "igit.com/xbase/raft/db"
  14. )
  15. // KVServer wraps Raft to provide a distributed key-value store
  16. type KVServer struct {
  17. Raft *Raft
  18. DB *db.Engine
  19. CLI *CLI
  20. AuthManager *AuthManager
  21. Watcher *WebHookWatcher
  22. httpServer *http.Server
  23. stopCh chan struct{}
  24. wg sync.WaitGroup
  25. stopOnce sync.Once
  26. // leavingNodes tracks nodes that are currently being removed
  27. // to prevent auto-rejoin/discovery logic from interfering
  28. leavingNodes sync.Map
  29. // Pending requests for synchronous operations (Read-Your-Writes)
  30. pendingRequests map[uint64]chan error
  31. pendingMu sync.Mutex
  32. }
  33. // NewKVServer creates a new KV server
  34. func NewKVServer(config *Config) (*KVServer, error) {
  35. // Initialize DB Engine
  36. // Use a subdirectory for DB to avoid conflict with Raft logs if they share DataDir
  37. dbPath := config.DataDir + "/kv_engine"
  38. engine, err := db.NewEngine(dbPath)
  39. if err != nil {
  40. return nil, fmt.Errorf("failed to create db engine: %w", err)
  41. }
  42. // Initialize LastAppliedIndex from DB to prevent re-applying entries
  43. config.LastAppliedIndex = engine.GetLastAppliedIndex()
  44. // Create stop channel early for use in callbacks
  45. stopCh := make(chan struct{})
  46. // Configure snapshot provider
  47. config.SnapshotProvider = func(minIncludeIndex uint64) ([]byte, error) {
  48. // Wait for DB to catch up to the requested index
  49. // This is critical for data integrity during compaction
  50. for engine.GetLastAppliedIndex() < minIncludeIndex {
  51. select {
  52. case <-stopCh:
  53. return nil, fmt.Errorf("server stopping")
  54. default:
  55. time.Sleep(10 * time.Millisecond)
  56. }
  57. }
  58. // Force sync to disk to ensure data durability before compaction
  59. // This prevents data loss if Raft logs are compacted but DB data is only in OS cache
  60. if err := engine.Sync(); err != nil {
  61. return nil, fmt.Errorf("failed to sync engine before snapshot: %w", err)
  62. }
  63. return engine.Snapshot()
  64. }
  65. // Configure get handler for remote reads
  66. config.GetHandler = func(key string) (string, bool) {
  67. return engine.Get(key)
  68. }
  69. applyCh := make(chan ApplyMsg, 1000) // Increase buffer for async processing
  70. transport := NewTCPTransport(config.ListenAddr, 10, config.Logger)
  71. // Initialize WebHookWatcher
  72. // 5 workers, 3 retries
  73. watcher := NewWebHookWatcher(5, 3, config.Logger)
  74. r, err := NewRaft(config, transport, applyCh)
  75. if err != nil {
  76. engine.Close()
  77. return nil, err
  78. }
  79. s := &KVServer{
  80. Raft: r,
  81. DB: engine,
  82. CLI: nil,
  83. AuthManager: nil, // initialized below
  84. Watcher: watcher,
  85. stopCh: stopCh,
  86. pendingRequests: make(map[uint64]chan error),
  87. }
  88. // Initialize AuthManager
  89. s.AuthManager = NewAuthManager(s)
  90. // Load Auth Data from DB (if any)
  91. if err := s.AuthManager.LoadFromDB(); err != nil {
  92. s.Raft.config.Logger.Warn("Failed to load auth data from DB: %v", err)
  93. }
  94. // Initialize CLI
  95. s.CLI = NewCLI(s)
  96. // Start applying entries
  97. go s.runApplyLoop(applyCh)
  98. // Start background maintenance loop
  99. s.wg.Add(1)
  100. go s.maintenanceLoop()
  101. return s, nil
  102. }
  103. func (s *KVServer) Start() error {
  104. // Start CLI if enabled
  105. if s.Raft.config.EnableCLI {
  106. go s.CLI.Start()
  107. }
  108. // Start HTTP Server if configured
  109. if s.Raft.config.HTTPAddr != "" {
  110. if err := s.startHTTPServer(s.Raft.config.HTTPAddr); err != nil {
  111. s.Raft.config.Logger.Warn("Failed to start HTTP server: %v", err)
  112. }
  113. }
  114. // Start TCP Server (Hardcoded offset +10 for demo or config?)
  115. // User didn't add TCPPort to config, so let's derive it or just fix it.
  116. // For demo, if ListenAddr is 127.0.0.1:9001, we try 9011.
  117. host, port, _ := net.SplitHostPort(s.Raft.config.ListenAddr)
  118. if port == "9001" {
  119. tcpAddr := fmt.Sprintf("%s:%s", host, "9011")
  120. if err := s.StartTCPServer(tcpAddr); err != nil {
  121. s.Raft.config.Logger.Warn("Failed to start TCP server: %v", err)
  122. }
  123. } else if port == "9002" {
  124. tcpAddr := fmt.Sprintf("%s:%s", host, "9012")
  125. s.StartTCPServer(tcpAddr)
  126. } else {
  127. // Fallback or ignore for other nodes in demo
  128. }
  129. return s.Raft.Start()
  130. }
  131. func (s *KVServer) Stop() error {
  132. var err error
  133. s.stopOnce.Do(func() {
  134. // Stop maintenance loop
  135. if s.stopCh != nil {
  136. close(s.stopCh)
  137. s.wg.Wait()
  138. }
  139. // Stop Watcher
  140. if s.Watcher != nil {
  141. s.Watcher.Stop()
  142. }
  143. // Stop HTTP Server
  144. if s.httpServer != nil {
  145. s.httpServer.Close()
  146. }
  147. // Stop Raft first
  148. if errRaft := s.Raft.Stop(); errRaft != nil {
  149. err = errRaft
  150. }
  151. // Close DB
  152. if s.DB != nil {
  153. if errDB := s.DB.Close(); errDB != nil {
  154. // Combine errors if both fail
  155. if err != nil {
  156. err = fmt.Errorf("raft stop error: %v, db close error: %v", err, errDB)
  157. } else {
  158. err = errDB
  159. }
  160. }
  161. }
  162. })
  163. return err
  164. }
  165. func (s *KVServer) runApplyLoop(applyCh chan ApplyMsg) {
  166. for msg := range applyCh {
  167. if msg.CommandValid {
  168. // Optimization: Skip if already applied
  169. // We check this here to avoid unmarshalling and locking DB for known duplicates
  170. if msg.CommandIndex <= s.DB.GetLastAppliedIndex() {
  171. // Even if duplicate, we might have someone waiting for it if they just proposed it
  172. // but this is unlikely for duplicates.
  173. // However, notify waiters just in case.
  174. s.notifyPending(msg.CommandIndex, nil)
  175. continue
  176. }
  177. var cmd KVCommand
  178. if err := json.Unmarshal(msg.Command, &cmd); err != nil {
  179. s.Raft.config.Logger.Error("Failed to unmarshal command: %v", err)
  180. s.notifyPending(msg.CommandIndex, err)
  181. continue
  182. }
  183. var err error
  184. switch cmd.Type {
  185. case KVSet:
  186. // Update Auth Cache for system keys
  187. // This includes AuthLockPrefix now
  188. if strings.HasPrefix(cmd.Key, SystemKeyPrefix) {
  189. s.AuthManager.UpdateCache(cmd.Key, cmd.Value, false)
  190. }
  191. err = s.DB.Set(cmd.Key, cmd.Value, msg.CommandIndex)
  192. if err == nil {
  193. s.Watcher.Notify(cmd.Key, cmd.Value, KVSet)
  194. }
  195. case KVDel:
  196. // Update Auth Cache for system keys
  197. if strings.HasPrefix(cmd.Key, SystemKeyPrefix) {
  198. s.AuthManager.UpdateCache(cmd.Key, "", true)
  199. }
  200. err = s.DB.Delete(cmd.Key, msg.CommandIndex)
  201. if err == nil {
  202. s.Watcher.Notify(cmd.Key, "", KVDel)
  203. }
  204. default:
  205. s.Raft.config.Logger.Error("Unknown command type: %d", cmd.Type)
  206. }
  207. if err != nil {
  208. s.Raft.config.Logger.Error("DB Apply failed: %v", err)
  209. }
  210. // Notify anyone waiting for this index
  211. s.notifyPending(msg.CommandIndex, err)
  212. } else if msg.SnapshotValid {
  213. if err := s.DB.Restore(msg.Snapshot); err != nil {
  214. s.Raft.config.Logger.Error("DB Restore failed: %v", err)
  215. }
  216. }
  217. }
  218. }
  219. // notifyPending notifies any waiting requests for the given index
  220. func (s *KVServer) notifyPending(index uint64, err error) {
  221. s.pendingMu.Lock()
  222. defer s.pendingMu.Unlock()
  223. if ch, ok := s.pendingRequests[index]; ok {
  224. // Non-blocking send in case listener is gone (buffered channel recommended)
  225. select {
  226. case ch <- err:
  227. default:
  228. }
  229. close(ch)
  230. delete(s.pendingRequests, index)
  231. }
  232. }
  233. // SetAuthenticated sets a key-value pair with permission check
  234. func (s *KVServer) SetAuthenticated(key, value, token string) error {
  235. if err := s.AuthManager.CheckPermission(token, key, ActionWrite, value); err != nil {
  236. return err
  237. }
  238. // Use SetSync for consistency in CLI/API
  239. return s.SetSync(key, value)
  240. }
  241. // DelAuthenticated deletes a key with permission check
  242. func (s *KVServer) DelAuthenticated(key, token string) error {
  243. if err := s.AuthManager.CheckPermission(token, key, ActionWrite, ""); err != nil {
  244. return err
  245. }
  246. // Use SetSync (via Del which should be updated or create DelSync)
  247. // For simplicity, we implement DelSync logic here or update Del to be sync?
  248. // Let's implement DelSync
  249. return s.DelSync(key)
  250. }
  251. // Set sets a key-value pair (Async - eventually consistent)
  252. func (s *KVServer) Set(key, value string) error {
  253. cmd := KVCommand{
  254. Type: KVSet,
  255. Key: key,
  256. Value: value,
  257. }
  258. data, err := json.Marshal(cmd)
  259. if err != nil {
  260. return err
  261. }
  262. _, _, err = s.Raft.ProposeWithForward(data)
  263. return err
  264. }
  265. // SetSync sets a key-value pair and waits for it to be applied (Read-Your-Writes)
  266. func (s *KVServer) SetSync(key, value string) error {
  267. cmd := KVCommand{
  268. Type: KVSet,
  269. Key: key,
  270. Value: value,
  271. }
  272. data, err := json.Marshal(cmd)
  273. if err != nil {
  274. return err
  275. }
  276. index, _, err := s.Raft.ProposeWithForward(data)
  277. if err != nil {
  278. return err
  279. }
  280. // Wait for application
  281. ch := make(chan error, 1)
  282. s.pendingMu.Lock()
  283. s.pendingRequests[index] = ch
  284. s.pendingMu.Unlock()
  285. select {
  286. case applyErr := <-ch:
  287. return applyErr
  288. case <-time.After(5 * time.Second):
  289. s.pendingMu.Lock()
  290. delete(s.pendingRequests, index)
  291. s.pendingMu.Unlock()
  292. return fmt.Errorf("timeout waiting for apply")
  293. }
  294. }
  295. // Del deletes a key (Async)
  296. func (s *KVServer) Del(key string) error {
  297. cmd := KVCommand{
  298. Type: KVDel,
  299. Key: key,
  300. }
  301. data, err := json.Marshal(cmd)
  302. if err != nil {
  303. return err
  304. }
  305. _, _, err = s.Raft.ProposeWithForward(data)
  306. return err
  307. }
  308. // DelSync deletes a key and waits for it to be applied
  309. func (s *KVServer) DelSync(key string) error {
  310. cmd := KVCommand{
  311. Type: KVDel,
  312. Key: key,
  313. }
  314. data, err := json.Marshal(cmd)
  315. if err != nil {
  316. return err
  317. }
  318. index, _, err := s.Raft.ProposeWithForward(data)
  319. if err != nil {
  320. return err
  321. }
  322. // Wait for application
  323. ch := make(chan error, 1)
  324. s.pendingMu.Lock()
  325. s.pendingRequests[index] = ch
  326. s.pendingMu.Unlock()
  327. select {
  328. case applyErr := <-ch:
  329. return applyErr
  330. case <-time.After(5 * time.Second):
  331. s.pendingMu.Lock()
  332. delete(s.pendingRequests, index)
  333. s.pendingMu.Unlock()
  334. return fmt.Errorf("timeout waiting for apply")
  335. }
  336. }
  337. // GetAuthenticated gets a value with permission check (local read)
  338. func (s *KVServer) GetAuthenticated(key, token string) (string, bool, error) {
  339. if err := s.AuthManager.CheckPermission(token, key, ActionRead, ""); err != nil {
  340. return "", false, err
  341. }
  342. val, ok := s.Get(key)
  343. return val, ok, nil
  344. }
  345. // Get gets a value (local read, can be stale)
  346. // For linearizable reads, use GetLinear instead
  347. func (s *KVServer) Get(key string) (string, bool) {
  348. return s.DB.Get(key)
  349. }
  350. // GetLinearAuthenticated gets a value with linearizable consistency and permission check
  351. func (s *KVServer) GetLinearAuthenticated(key, token string) (string, bool, error) {
  352. if err := s.AuthManager.CheckPermission(token, key, ActionRead, ""); err != nil {
  353. return "", false, err
  354. }
  355. return s.GetLinear(key)
  356. }
  357. // Logout invalidates the current session
  358. func (s *KVServer) Logout(token string) error {
  359. return s.AuthManager.Logout(token)
  360. }
  361. // GetSessionInfo returns the session details
  362. func (s *KVServer) GetSessionInfo(token string) (*Session, error) {
  363. return s.AuthManager.GetSession(token)
  364. }
  365. // IsRoot checks if the token belongs to the root user
  366. func (s *KVServer) IsRoot(token string) bool {
  367. sess, err := s.AuthManager.GetSession(token)
  368. if err != nil {
  369. return false
  370. }
  371. return sess.Username == "root"
  372. }
  373. // CreateUser creates a new user (Root only)
  374. func (s *KVServer) CreateUser(username, password string, roles []string, token string) error {
  375. if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
  376. return fmt.Errorf("permission denied: root access required")
  377. }
  378. // Use RegisterUserSync
  379. return s.AuthManager.RegisterUser(username, password, roles)
  380. }
  381. // DeleteUser deletes a user (Root only)
  382. func (s *KVServer) DeleteUser(username string, token string) error {
  383. if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
  384. return fmt.Errorf("permission denied: root access required")
  385. }
  386. // Check if user exists
  387. if _, err := s.AuthManager.GetUser(username); err != nil {
  388. return err
  389. }
  390. if username == "root" {
  391. return fmt.Errorf("cannot delete root user")
  392. }
  393. // Use DelSync
  394. return s.DelSync(AuthUserPrefix + username)
  395. }
  396. // UpdateUser updates generic user fields (Root only)
  397. func (s *KVServer) UpdateUser(user User, token string) error {
  398. if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
  399. return fmt.Errorf("permission denied: root access required")
  400. }
  401. // Check if user exists
  402. if _, err := s.AuthManager.GetUser(user.Username); err != nil {
  403. return err
  404. }
  405. return s.AuthManager.UpdateUser(user)
  406. }
  407. // ChangeUserPassword changes a user's password (Root or Self)
  408. func (s *KVServer) ChangeUserPassword(username, newPassword string, token string) error {
  409. if s.AuthManager.IsEnabled() {
  410. session, err := s.AuthManager.GetSession(token)
  411. if err != nil {
  412. return err
  413. }
  414. if session.Username != "root" && session.Username != username {
  415. return fmt.Errorf("permission denied")
  416. }
  417. }
  418. // Use ChangePassword (which should use SetSync)
  419. return s.AuthManager.ChangePassword(username, newPassword)
  420. }
  421. // Role Management Helpers
  422. // CreateRole creates a new role (Root only)
  423. func (s *KVServer) CreateRole(name string, token string) error {
  424. if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
  425. return fmt.Errorf("permission denied: root access required")
  426. }
  427. return s.AuthManager.CreateRole(name)
  428. }
  429. // DeleteRole deletes a role (Root only)
  430. func (s *KVServer) DeleteRole(name string, token string) error {
  431. if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
  432. return fmt.Errorf("permission denied: root access required")
  433. }
  434. return s.AuthManager.DeleteRole(name)
  435. }
  436. // UpdateRole updates a role (Root only)
  437. func (s *KVServer) UpdateRole(role Role, token string) error {
  438. if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
  439. return fmt.Errorf("permission denied: root access required")
  440. }
  441. return s.AuthManager.UpdateRole(role)
  442. }
  443. // ListUsers lists all users (Root only)
  444. func (s *KVServer) ListUsers(token string) ([]*User, error) {
  445. if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
  446. return nil, fmt.Errorf("permission denied: root access required")
  447. }
  448. return s.AuthManager.ListUsers(), nil
  449. }
  450. // ListRoles lists all roles (Root only)
  451. func (s *KVServer) ListRoles(token string) ([]*Role, error) {
  452. if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
  453. return nil, fmt.Errorf("permission denied: root access required")
  454. }
  455. return s.AuthManager.ListRoles(), nil
  456. }
  457. // SearchAuthenticated searches keys with permission checks
  458. func (s *KVServer) SearchAuthenticated(pattern string, limit, offset int, token string) ([]db.QueryResult, error) {
  459. // Optimization: If user has full access (*), delegate to DB with limit/offset
  460. if s.AuthManager.HasFullAccess(token) {
  461. sql := fmt.Sprintf("key like \"%s\" LIMIT %d OFFSET %d", pattern, limit, offset)
  462. return s.DB.Query(sql)
  463. }
  464. // Slow path: Fetch all potential matches and filter
  465. // We construct a SQL query that retrieves CANDIDATES.
  466. // We cannot safely apply LIMIT/OFFSET in SQL because we might filter out some results.
  467. // So we must fetch ALL matches.
  468. // WARNING: This can be slow and memory intensive.
  469. // If pattern is wildcard, we might fetch everything.
  470. sql := fmt.Sprintf("key like \"%s\"", pattern)
  471. results, err := s.DB.Query(sql)
  472. if err != nil {
  473. return nil, err
  474. }
  475. filtered := make([]db.QueryResult, 0, len(results))
  476. // Apply Filtering
  477. for _, r := range results {
  478. if err := s.AuthManager.CheckPermission(token, r.Key, ActionRead, ""); err == nil {
  479. filtered = append(filtered, r)
  480. }
  481. }
  482. // Apply Pagination on Filtered Results
  483. if offset > len(filtered) {
  484. return []db.QueryResult{}, nil
  485. }
  486. start := offset
  487. end := offset + limit
  488. if end > len(filtered) {
  489. end = len(filtered)
  490. }
  491. return filtered[start:end], nil
  492. }
  493. // CountAuthenticated counts keys with permission checks
  494. func (s *KVServer) CountAuthenticated(pattern string, token string) (int, error) {
  495. // Optimization: If user has full access
  496. if s.AuthManager.HasFullAccess(token) {
  497. sql := ""
  498. if pattern == "*" {
  499. sql = "*"
  500. } else {
  501. sql = fmt.Sprintf("key like \"%s\"", pattern)
  502. }
  503. return s.DB.Count(sql)
  504. }
  505. // Slow path: Iterate and check
  506. // We use DB.Query to get keys (Query returns values too, which is wasteful but DB engine doesn't expose keys-only Query yet)
  507. // Actually, we can access s.DB.Index.WalkPrefix if we want to be faster and avoid value read,
  508. // but s.DB.Index is inside 'db' package. We can access it if it's exported.
  509. // 'db' package exports 'Engine' and 'FlatIndex'.
  510. // So s.DB.Index IS accessible.
  511. count := 0
  512. // Determine prefix from pattern
  513. prefix := ""
  514. if strings.HasSuffix(pattern, "*") {
  515. prefix = strings.TrimSuffix(pattern, "*")
  516. } else if pattern == "*" {
  517. prefix = ""
  518. } else {
  519. // Exact match check
  520. if err := s.AuthManager.CheckPermission(token, pattern, ActionRead, ""); err == nil {
  521. // Check if exists
  522. if _, ok := s.DB.Get(pattern); ok {
  523. return 1, nil
  524. }
  525. return 0, nil
  526. }
  527. return 0, nil // No perm or not found
  528. }
  529. // Walk
  530. // Note: WalkPrefix locks the DB Index (Read Lock).
  531. // Calling CheckPermission inside might involve some logic but it is memory-only and usually fast.
  532. // However, if CheckPermission takes time, we hold DB lock.
  533. s.DB.Index.WalkPrefix(prefix, func(key string, entry db.IndexEntry) bool {
  534. // Check pattern match first (WalkPrefix is just prefix, pattern might be more complex like "user.*.name")
  535. if !db.WildcardMatch(key, pattern) {
  536. return true
  537. }
  538. if err := s.AuthManager.CheckPermission(token, key, ActionRead, ""); err == nil {
  539. count++
  540. }
  541. return true
  542. })
  543. return count, nil
  544. }
  545. // GetLinear gets a value with linearizable consistency
  546. // This ensures the read sees all writes committed before the read started
  547. func (s *KVServer) GetLinear(key string) (string, bool, error) {
  548. // First, ensure we have up-to-date data via ReadIndex
  549. _, err := s.Raft.ReadIndex()
  550. if err != nil {
  551. // If we're not leader, try forwarding
  552. if errors.Is(err, ErrNotLeader) {
  553. return s.forwardGet(key)
  554. }
  555. return "", false, err
  556. }
  557. val, ok := s.DB.Get(key)
  558. return val, ok, nil
  559. }
  560. // forwardGet forwards a get request to the leader
  561. func (s *KVServer) forwardGet(key string) (string, bool, error) {
  562. return s.Raft.ForwardGet(key)
  563. }
  564. // Join joins an existing cluster
  565. func (s *KVServer) Join(nodeID, addr string) error {
  566. return s.Raft.AddNodeWithForward(nodeID, addr)
  567. }
  568. // Leave leaves the cluster
  569. func (s *KVServer) Leave(nodeID string) error {
  570. // Mark node as leaving to prevent auto-rejoin
  571. s.leavingNodes.Store(nodeID, time.Now())
  572. // Auto-expire the leaving flag after a while
  573. go func() {
  574. time.Sleep(30 * time.Second)
  575. s.leavingNodes.Delete(nodeID)
  576. }()
  577. // Remove from RaftNode discovery key first to prevent auto-rejoin
  578. if err := s.removeNodeFromDiscovery(nodeID); err != nil {
  579. s.Raft.config.Logger.Warn("Failed to remove node from discovery key: %v", err)
  580. // Continue anyway, as the main goal is to leave the cluster
  581. }
  582. return s.Raft.RemoveNodeWithForward(nodeID)
  583. }
  584. // removeNodeFromDiscovery removes a node from the RaftNode key to prevent auto-rejoin
  585. func (s *KVServer) removeNodeFromDiscovery(targetID string) error {
  586. val, ok := s.Get("RaftNode")
  587. if !ok || val == "" {
  588. return nil
  589. }
  590. parts := strings.Split(val, ";")
  591. var newParts []string
  592. changed := false
  593. for _, part := range parts {
  594. if part == "" {
  595. continue
  596. }
  597. kv := strings.SplitN(part, "=", 2)
  598. if len(kv) == 2 {
  599. if kv[0] == targetID {
  600. changed = true
  601. continue // Skip this node
  602. }
  603. newParts = append(newParts, part)
  604. }
  605. }
  606. if changed {
  607. newVal := strings.Join(newParts, ";")
  608. return s.Set("RaftNode", newVal)
  609. }
  610. return nil
  611. }
  612. // WaitForLeader waits until a leader is elected
  613. func (s *KVServer) WaitForLeader(timeout time.Duration) error {
  614. deadline := time.Now().Add(timeout)
  615. for time.Now().Before(deadline) {
  616. leader := s.Raft.GetLeaderID()
  617. if leader != "" {
  618. return nil
  619. }
  620. time.Sleep(100 * time.Millisecond)
  621. }
  622. return fmt.Errorf("timeout waiting for leader")
  623. }
  624. // HealthCheck returns the health status of this server
  625. func (s *KVServer) HealthCheck() HealthStatus {
  626. return s.Raft.HealthCheck()
  627. }
  628. // GetStats returns runtime statistics
  629. func (s *KVServer) GetStats() Stats {
  630. return s.Raft.GetStats()
  631. }
  632. // GetMetrics returns runtime metrics
  633. func (s *KVServer) GetMetrics() Metrics {
  634. return s.Raft.GetMetrics()
  635. }
  636. // TransferLeadership transfers leadership to the specified node
  637. func (s *KVServer) TransferLeadership(targetID string) error {
  638. return s.Raft.TransferLeadership(targetID)
  639. }
  640. // GetClusterNodes returns current cluster membership
  641. func (s *KVServer) GetClusterNodes() map[string]string {
  642. return s.Raft.GetClusterNodes()
  643. }
  644. // IsLeader returns true if this node is the leader
  645. func (s *KVServer) IsLeader() bool {
  646. _, isLeader := s.Raft.GetState()
  647. return isLeader
  648. }
  649. // GetLeaderID returns the current leader ID
  650. func (s *KVServer) GetLeaderID() string {
  651. return s.Raft.GetLeaderID()
  652. }
  653. // GetLogSize returns the raft log size
  654. func (s *KVServer) GetLogSize() int64 {
  655. return s.Raft.log.GetLogSize()
  656. }
  657. // GetDBSize returns the db size
  658. func (s *KVServer) GetDBSize() int64 {
  659. return s.DB.GetDBSize()
  660. }
  661. // WatchURL registers a webhook url for a key
  662. func (s *KVServer) WatchURL(key, url string) {
  663. s.Watcher.Subscribe(key, url)
  664. }
  665. // UnwatchURL removes a webhook url for a key
  666. func (s *KVServer) UnwatchURL(key, url string) {
  667. s.Watcher.Unsubscribe(key, url)
  668. }
  669. // WatchAll registers a watcher for all keys
  670. func (s *KVServer) WatchAll(handler WatchHandler) {
  671. // s.FSM.WatchAll(handler)
  672. // TODO: Implement Watcher for DB
  673. }
  674. // Watch registers a watcher for a key
  675. func (s *KVServer) Watch(key string, handler WatchHandler) {
  676. // s.FSM.Watch(key, handler)
  677. // TODO: Implement Watcher for DB
  678. }
  679. // Unwatch removes watchers for a key
  680. func (s *KVServer) Unwatch(key string) {
  681. // s.FSM.Unwatch(key)
  682. // TODO: Implement Watcher for DB
  683. }
  684. func (s *KVServer) maintenanceLoop() {
  685. defer s.wg.Done()
  686. // Check every 1 second for faster reaction
  687. ticker := time.NewTicker(1 * time.Second)
  688. defer ticker.Stop()
  689. for {
  690. select {
  691. case <-s.stopCh:
  692. return
  693. case <-ticker.C:
  694. s.updateNodeInfo()
  695. s.checkConnections()
  696. }
  697. }
  698. }
  699. func (s *KVServer) updateNodeInfo() {
  700. // 1. Ensure "CreateNode/<NodeID>" is set to self address
  701. // We do this via Propose (Set) so it's replicated
  702. myID := s.Raft.config.NodeID
  703. myAddr := s.Raft.config.ListenAddr
  704. key := fmt.Sprintf("CreateNode/%s", myID)
  705. // Check if we need to update (avoid spamming logs/proposals)
  706. val, exists := s.Get(key)
  707. if !exists || val != myAddr {
  708. // Run in goroutine to avoid blocking
  709. go func() {
  710. if err := s.Set(key, myAddr); err != nil {
  711. s.Raft.config.Logger.Debug("Failed to update node info: %v", err)
  712. }
  713. }()
  714. }
  715. // 2. Only leader updates RaftNode aggregation
  716. if s.IsLeader() {
  717. // Read current RaftNode to preserve history
  718. currentVal, _ := s.Get("RaftNode")
  719. knownNodes := make(map[string]string)
  720. if currentVal != "" {
  721. parts := strings.Split(currentVal, ";")
  722. for _, part := range parts {
  723. if part == "" { continue }
  724. kv := strings.SplitN(part, "=", 2)
  725. if len(kv) == 2 {
  726. knownNodes[kv[0]] = kv[1]
  727. }
  728. }
  729. }
  730. // Merge current cluster nodes
  731. changed := false
  732. currentCluster := s.GetClusterNodes()
  733. for id, addr := range currentCluster {
  734. // Skip nodes that are marked as leaving
  735. if _, leaving := s.leavingNodes.Load(id); leaving {
  736. continue
  737. }
  738. if knownNodes[id] != addr {
  739. knownNodes[id] = addr
  740. changed = true
  741. }
  742. }
  743. // If changed, update RaftNode
  744. if changed {
  745. var peers []string
  746. for id, addr := range knownNodes {
  747. peers = append(peers, fmt.Sprintf("%s=%s", id, addr))
  748. }
  749. sort.Strings(peers)
  750. newVal := strings.Join(peers, ";")
  751. // Check again if we need to write to avoid loops if Get returned stale
  752. if newVal != currentVal {
  753. go func(k, v string) {
  754. if err := s.Set(k, v); err != nil {
  755. s.Raft.config.Logger.Warn("Failed to update RaftNode key: %v", err)
  756. }
  757. }("RaftNode", newVal)
  758. }
  759. }
  760. }
  761. }
  762. func (s *KVServer) checkConnections() {
  763. if !s.IsLeader() {
  764. return
  765. }
  766. // Read RaftNode key to find potential members that are missing
  767. val, ok := s.Get("RaftNode")
  768. if !ok || val == "" {
  769. return
  770. }
  771. // Parse saved nodes
  772. savedParts := strings.Split(val, ";")
  773. currentNodes := s.GetClusterNodes()
  774. // Invert currentNodes for address check
  775. currentAddrs := make(map[string]bool)
  776. for _, addr := range currentNodes {
  777. currentAddrs[addr] = true
  778. }
  779. for _, part := range savedParts {
  780. if part == "" {
  781. continue
  782. }
  783. // Expect id=addr
  784. kv := strings.SplitN(part, "=", 2)
  785. if len(kv) != 2 {
  786. continue
  787. }
  788. id, addr := kv[0], kv[1]
  789. // Skip invalid addresses
  790. if strings.HasPrefix(addr, ".") || !strings.Contains(addr, ":") {
  791. continue
  792. }
  793. if !currentAddrs[addr] {
  794. // Skip nodes that are marked as leaving
  795. if _, leaving := s.leavingNodes.Load(id); leaving {
  796. continue
  797. }
  798. // Found a node that was previously in the cluster but is now missing
  799. // Try to add it back
  800. // We use AddNodeWithForward which handles non-blocking internally somewhat,
  801. // but we should run this in goroutine to not block the loop
  802. go func(nodeID, nodeAddr string) {
  803. // Try to add node
  804. s.Raft.config.Logger.Info("Auto-rejoining node found in RaftNode: %s (%s)", nodeID, nodeAddr)
  805. if err := s.Join(nodeID, nodeAddr); err != nil {
  806. s.Raft.config.Logger.Debug("Failed to auto-rejoin node %s: %v", nodeID, err)
  807. }
  808. }(id, addr)
  809. }
  810. }
  811. }
  812. // startHTTPServer starts the HTTP API server
  813. func (s *KVServer) startHTTPServer(addr string) error {
  814. mux := http.NewServeMux()
  815. // KV API
  816. mux.HandleFunc("/kv", func(w http.ResponseWriter, r *http.Request) {
  817. token := r.Header.Get("X-Raft-Token")
  818. switch r.Method {
  819. case http.MethodGet:
  820. key := r.URL.Query().Get("key")
  821. if key == "" {
  822. http.Error(w, "missing key", http.StatusBadRequest)
  823. return
  824. }
  825. // Use Authenticated method
  826. val, found, err := s.GetLinearAuthenticated(key, token)
  827. if err != nil {
  828. // Distinguish auth error vs raft error?
  829. if strings.Contains(err.Error(), "permission") || strings.Contains(err.Error(), "unauthorized") {
  830. http.Error(w, err.Error(), http.StatusForbidden)
  831. } else {
  832. http.Error(w, err.Error(), http.StatusInternalServerError)
  833. }
  834. return
  835. }
  836. if !found {
  837. http.Error(w, "not found", http.StatusNotFound)
  838. return
  839. }
  840. w.Write([]byte(val))
  841. case http.MethodPost:
  842. body, _ := io.ReadAll(r.Body)
  843. var req struct {
  844. Key string `json:"key"`
  845. Value string `json:"value"`
  846. }
  847. if err := json.Unmarshal(body, &req); err != nil {
  848. http.Error(w, "invalid json", http.StatusBadRequest)
  849. return
  850. }
  851. if err := s.SetAuthenticated(req.Key, req.Value, token); err != nil {
  852. if strings.Contains(err.Error(), "permission") || strings.Contains(err.Error(), "unauthorized") {
  853. http.Error(w, err.Error(), http.StatusForbidden)
  854. } else {
  855. http.Error(w, err.Error(), http.StatusInternalServerError)
  856. }
  857. return
  858. }
  859. w.WriteHeader(http.StatusOK)
  860. case http.MethodDelete:
  861. key := r.URL.Query().Get("key")
  862. if key == "" {
  863. http.Error(w, "missing key", http.StatusBadRequest)
  864. return
  865. }
  866. if err := s.DelAuthenticated(key, token); err != nil {
  867. if strings.Contains(err.Error(), "permission") || strings.Contains(err.Error(), "unauthorized") {
  868. http.Error(w, err.Error(), http.StatusForbidden)
  869. } else {
  870. http.Error(w, err.Error(), http.StatusInternalServerError)
  871. }
  872. return
  873. }
  874. w.WriteHeader(http.StatusOK)
  875. default:
  876. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  877. }
  878. })
  879. // Auth API
  880. mux.HandleFunc("/auth/login", func(w http.ResponseWriter, r *http.Request) {
  881. if r.Method != http.MethodPost {
  882. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  883. return
  884. }
  885. var req struct {
  886. Username string `json:"username"`
  887. Password string `json:"password"`
  888. Code string `json:"code"`
  889. }
  890. body, _ := io.ReadAll(r.Body)
  891. if err := json.Unmarshal(body, &req); err != nil {
  892. http.Error(w, "invalid json", http.StatusBadRequest)
  893. return
  894. }
  895. ip := r.RemoteAddr
  896. if host, _, err := net.SplitHostPort(r.RemoteAddr); err == nil {
  897. ip = host
  898. }
  899. token, err := s.AuthManager.Login(req.Username, req.Password, req.Code, ip)
  900. if err != nil {
  901. http.Error(w, err.Error(), http.StatusUnauthorized)
  902. return
  903. }
  904. resp := struct {
  905. Token string `json:"token"`
  906. }{Token: token}
  907. json.NewEncoder(w).Encode(resp)
  908. })
  909. // Watcher API
  910. mux.HandleFunc("/watch", func(w http.ResponseWriter, r *http.Request) {
  911. if r.Method != http.MethodPost {
  912. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  913. return
  914. }
  915. body, _ := io.ReadAll(r.Body)
  916. var req struct {
  917. Key string `json:"key"`
  918. URL string `json:"url"`
  919. }
  920. if err := json.Unmarshal(body, &req); err != nil {
  921. http.Error(w, "invalid json", http.StatusBadRequest)
  922. return
  923. }
  924. if req.Key == "" || req.URL == "" {
  925. http.Error(w, "missing key or url", http.StatusBadRequest)
  926. return
  927. }
  928. s.WatchURL(req.Key, req.URL)
  929. w.WriteHeader(http.StatusOK)
  930. })
  931. mux.HandleFunc("/unwatch", func(w http.ResponseWriter, r *http.Request) {
  932. if r.Method != http.MethodPost {
  933. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  934. return
  935. }
  936. body, _ := io.ReadAll(r.Body)
  937. var req struct {
  938. Key string `json:"key"`
  939. URL string `json:"url"`
  940. }
  941. if err := json.Unmarshal(body, &req); err != nil {
  942. http.Error(w, "invalid json", http.StatusBadRequest)
  943. return
  944. }
  945. s.UnwatchURL(req.Key, req.URL)
  946. w.WriteHeader(http.StatusOK)
  947. })
  948. s.httpServer = &http.Server{
  949. Addr: addr,
  950. Handler: mux,
  951. }
  952. go func() {
  953. s.Raft.config.Logger.Info("HTTP API server listening on %s", addr)
  954. if err := s.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  955. s.Raft.config.Logger.Error("HTTP server failed: %v", err)
  956. }
  957. }()
  958. return nil
  959. }