server.go 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107
  1. package raft
import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"time"

	"igit.com/xbase/raft/db"
)
  15. // KVServer wraps Raft to provide a distributed key-value store
  16. type KVServer struct {
  17. Raft *Raft
  18. DB *db.Engine
  19. CLI *CLI
  20. AuthManager *AuthManager
  21. Watcher *WebHookWatcher
  22. httpServer *http.Server
  23. stopCh chan struct{}
  24. wg sync.WaitGroup
  25. stopOnce sync.Once
  26. // leavingNodes tracks nodes that are currently being removed
  27. // to prevent auto-rejoin/discovery logic from interfering
  28. leavingNodes sync.Map
  29. // Pending requests for synchronous operations (Read-Your-Writes)
  30. pendingRequests map[uint64]chan error
  31. pendingMu sync.Mutex
  32. }
  33. // NewKVServer creates a new KV server
  34. func NewKVServer(config *Config) (*KVServer, error) {
  35. // Initialize DB Engine
  36. // Use a subdirectory for DB to avoid conflict with Raft logs if they share DataDir
  37. dbPath := config.DataDir + "/kv_engine"
  38. engine, err := db.NewEngine(dbPath)
  39. if err != nil {
  40. return nil, fmt.Errorf("failed to create db engine: %w", err)
  41. }
  42. // Initialize LastAppliedIndex from DB to prevent re-applying entries
  43. config.LastAppliedIndex = engine.GetLastAppliedIndex()
  44. // Create stop channel early for use in callbacks
  45. stopCh := make(chan struct{})
  46. // Configure snapshot provider
  47. config.SnapshotProvider = func(minIncludeIndex uint64) ([]byte, error) {
  48. // Wait for DB to catch up to the requested index
  49. // This is critical for data integrity during compaction
  50. for engine.GetLastAppliedIndex() < minIncludeIndex {
  51. select {
  52. case <-stopCh:
  53. return nil, fmt.Errorf("server stopping")
  54. default:
  55. time.Sleep(10 * time.Millisecond)
  56. }
  57. }
  58. // Force sync to disk to ensure data durability before compaction
  59. // This prevents data loss if Raft logs are compacted but DB data is only in OS cache
  60. if err := engine.Sync(); err != nil {
  61. return nil, fmt.Errorf("failed to sync engine before snapshot: %w", err)
  62. }
  63. return engine.Snapshot()
  64. }
  65. // Configure get handler for remote reads
  66. config.GetHandler = func(key string) (string, bool) {
  67. return engine.Get(key)
  68. }
  69. applyCh := make(chan ApplyMsg, 1000) // Increase buffer for async processing
  70. transport := NewTCPTransport(config.ListenAddr, 10, config.Logger)
  71. // Initialize WebHookWatcher
  72. // 5 workers, 3 retries
  73. watcher := NewWebHookWatcher(5, 3, config.Logger)
  74. r, err := NewRaft(config, transport, applyCh)
  75. if err != nil {
  76. engine.Close()
  77. return nil, err
  78. }
  79. s := &KVServer{
  80. Raft: r,
  81. DB: engine,
  82. CLI: nil,
  83. AuthManager: nil, // initialized below
  84. Watcher: watcher,
  85. stopCh: stopCh,
  86. pendingRequests: make(map[uint64]chan error),
  87. }
  88. // Initialize AuthManager
  89. s.AuthManager = NewAuthManager(s)
  90. // Load Auth Data from DB (if any)
  91. if err := s.AuthManager.LoadFromDB(); err != nil {
  92. s.Raft.config.Logger.Warn("Failed to load auth data from DB: %v", err)
  93. }
  94. // Initialize CLI
  95. s.CLI = NewCLI(s)
  96. // Start applying entries
  97. go s.runApplyLoop(applyCh)
  98. // Start background maintenance loop
  99. s.wg.Add(1)
  100. go s.maintenanceLoop()
  101. return s, nil
  102. }
  103. func (s *KVServer) Start() error {
  104. // Start CLI if enabled
  105. if s.Raft.config.EnableCLI {
  106. go s.CLI.Start()
  107. }
  108. // Start HTTP Server if configured
  109. if s.Raft.config.HTTPAddr != "" {
  110. if err := s.startHTTPServer(s.Raft.config.HTTPAddr); err != nil {
  111. s.Raft.config.Logger.Warn("Failed to start HTTP server: %v", err)
  112. }
  113. }
  114. // Start TCP Server (Hardcoded offset +10 for demo or config?)
  115. // User didn't add TCPPort to config, so let's derive it or just fix it.
  116. // For demo, if ListenAddr is 127.0.0.1:9001, we try 9011.
  117. host, port, _ := net.SplitHostPort(s.Raft.config.ListenAddr)
  118. if port == "9001" {
  119. tcpAddr := fmt.Sprintf("%s:%s", host, "9011")
  120. if err := s.StartTCPServer(tcpAddr); err != nil {
  121. s.Raft.config.Logger.Warn("Failed to start TCP server: %v", err)
  122. }
  123. } else if port == "9002" {
  124. tcpAddr := fmt.Sprintf("%s:%s", host, "9012")
  125. s.StartTCPServer(tcpAddr)
  126. } else {
  127. // Fallback or ignore for other nodes in demo
  128. }
  129. return s.Raft.Start()
  130. }
  131. func (s *KVServer) Stop() error {
  132. var err error
  133. s.stopOnce.Do(func() {
  134. // Stop maintenance loop
  135. if s.stopCh != nil {
  136. close(s.stopCh)
  137. s.wg.Wait()
  138. }
  139. // Stop Watcher
  140. if s.Watcher != nil {
  141. s.Watcher.Stop()
  142. }
  143. // Stop HTTP Server
  144. if s.httpServer != nil {
  145. s.httpServer.Close()
  146. }
  147. // Stop Raft first
  148. if errRaft := s.Raft.Stop(); errRaft != nil {
  149. err = errRaft
  150. }
  151. // Close DB
  152. if s.DB != nil {
  153. if errDB := s.DB.Close(); errDB != nil {
  154. // Combine errors if both fail
  155. if err != nil {
  156. err = fmt.Errorf("raft stop error: %v, db close error: %v", err, errDB)
  157. } else {
  158. err = errDB
  159. }
  160. }
  161. }
  162. })
  163. return err
  164. }
  165. func (s *KVServer) runApplyLoop(applyCh chan ApplyMsg) {
  166. for msg := range applyCh {
  167. if msg.CommandValid {
  168. // Optimization: Skip if already applied
  169. // We check this here to avoid unmarshalling and locking DB for known duplicates
  170. if msg.CommandIndex <= s.DB.GetLastAppliedIndex() {
  171. // Even if duplicate, we might have someone waiting for it if they just proposed it
  172. // but this is unlikely for duplicates.
  173. // However, notify waiters just in case.
  174. s.notifyPending(msg.CommandIndex, nil)
  175. continue
  176. }
  177. var cmd KVCommand
  178. if err := json.Unmarshal(msg.Command, &cmd); err != nil {
  179. s.Raft.config.Logger.Error("Failed to unmarshal command: %v", err)
  180. s.notifyPending(msg.CommandIndex, err)
  181. continue
  182. }
  183. var err error
  184. switch cmd.Type {
  185. case KVSet:
  186. // Update Auth Cache for system keys
  187. // This includes AuthLockPrefix now
  188. if strings.HasPrefix(cmd.Key, SystemKeyPrefix) {
  189. s.AuthManager.UpdateCache(cmd.Key, cmd.Value, false)
  190. }
  191. err = s.DB.Set(cmd.Key, cmd.Value, msg.CommandIndex)
  192. if err == nil {
  193. s.Watcher.Notify(cmd.Key, cmd.Value, KVSet)
  194. }
  195. case KVDel:
  196. // Update Auth Cache for system keys
  197. if strings.HasPrefix(cmd.Key, SystemKeyPrefix) {
  198. s.AuthManager.UpdateCache(cmd.Key, "", true)
  199. }
  200. err = s.DB.Delete(cmd.Key, msg.CommandIndex)
  201. if err == nil {
  202. s.Watcher.Notify(cmd.Key, "", KVDel)
  203. }
  204. default:
  205. s.Raft.config.Logger.Error("Unknown command type: %d", cmd.Type)
  206. }
  207. if err != nil {
  208. s.Raft.config.Logger.Error("DB Apply failed: %v", err)
  209. }
  210. // Notify anyone waiting for this index
  211. s.notifyPending(msg.CommandIndex, err)
  212. } else if msg.SnapshotValid {
  213. if err := s.DB.Restore(msg.Snapshot); err != nil {
  214. s.Raft.config.Logger.Error("DB Restore failed: %v", err)
  215. }
  216. }
  217. }
  218. }
  219. // notifyPending notifies any waiting requests for the given index
  220. func (s *KVServer) notifyPending(index uint64, err error) {
  221. s.pendingMu.Lock()
  222. defer s.pendingMu.Unlock()
  223. if ch, ok := s.pendingRequests[index]; ok {
  224. // Non-blocking send in case listener is gone (buffered channel recommended)
  225. select {
  226. case ch <- err:
  227. default:
  228. }
  229. close(ch)
  230. delete(s.pendingRequests, index)
  231. }
  232. }
  233. // SetAuthenticatedAsync sets a key-value pair asynchronously with permission check
  234. func (s *KVServer) SetAuthenticatedAsync(key, value, token string) error {
  235. if err := s.AuthManager.CheckPermission(token, key, ActionWrite, value); err != nil {
  236. return err
  237. }
  238. return s.Set(key, value)
  239. }
  240. // SetAuthenticated sets a key-value pair with permission check
  241. func (s *KVServer) SetAuthenticated(key, value, token string) error {
  242. if err := s.AuthManager.CheckPermission(token, key, ActionWrite, value); err != nil {
  243. return err
  244. }
  245. // Use SetSync for consistency in CLI/API
  246. return s.SetSync(key, value)
  247. }
  248. // DelAuthenticated deletes a key with permission check
  249. func (s *KVServer) DelAuthenticated(key, token string) error {
  250. if err := s.AuthManager.CheckPermission(token, key, ActionWrite, ""); err != nil {
  251. return err
  252. }
  253. // Use SetSync (via Del which should be updated or create DelSync)
  254. // For simplicity, we implement DelSync logic here or update Del to be sync?
  255. // Let's implement DelSync
  256. return s.DelSync(key)
  257. }
  258. // Set sets a key-value pair (Async - eventually consistent)
  259. func (s *KVServer) Set(key, value string) error {
  260. cmd := KVCommand{
  261. Type: KVSet,
  262. Key: key,
  263. Value: value,
  264. }
  265. data, err := json.Marshal(cmd)
  266. if err != nil {
  267. return err
  268. }
  269. _, _, err = s.Raft.ProposeWithForward(data)
  270. return err
  271. }
  272. // SetSync sets a key-value pair and waits for it to be applied (Read-Your-Writes)
  273. func (s *KVServer) SetSync(key, value string) error {
  274. cmd := KVCommand{
  275. Type: KVSet,
  276. Key: key,
  277. Value: value,
  278. }
  279. data, err := json.Marshal(cmd)
  280. if err != nil {
  281. return err
  282. }
  283. index, _, err := s.Raft.ProposeWithForward(data)
  284. if err != nil {
  285. return err
  286. }
  287. // Wait for application
  288. ch := make(chan error, 1)
  289. s.pendingMu.Lock()
  290. s.pendingRequests[index] = ch
  291. s.pendingMu.Unlock()
  292. select {
  293. case applyErr := <-ch:
  294. return applyErr
  295. case <-time.After(5 * time.Second):
  296. s.pendingMu.Lock()
  297. delete(s.pendingRequests, index)
  298. s.pendingMu.Unlock()
  299. return fmt.Errorf("timeout waiting for apply")
  300. }
  301. }
  302. // Del deletes a key (Async)
  303. func (s *KVServer) Del(key string) error {
  304. cmd := KVCommand{
  305. Type: KVDel,
  306. Key: key,
  307. }
  308. data, err := json.Marshal(cmd)
  309. if err != nil {
  310. return err
  311. }
  312. _, _, err = s.Raft.ProposeWithForward(data)
  313. return err
  314. }
  315. // DelSync deletes a key and waits for it to be applied
  316. func (s *KVServer) DelSync(key string) error {
  317. cmd := KVCommand{
  318. Type: KVDel,
  319. Key: key,
  320. }
  321. data, err := json.Marshal(cmd)
  322. if err != nil {
  323. return err
  324. }
  325. index, _, err := s.Raft.ProposeWithForward(data)
  326. if err != nil {
  327. return err
  328. }
  329. // Wait for application
  330. ch := make(chan error, 1)
  331. s.pendingMu.Lock()
  332. s.pendingRequests[index] = ch
  333. s.pendingMu.Unlock()
  334. select {
  335. case applyErr := <-ch:
  336. return applyErr
  337. case <-time.After(5 * time.Second):
  338. s.pendingMu.Lock()
  339. delete(s.pendingRequests, index)
  340. s.pendingMu.Unlock()
  341. return fmt.Errorf("timeout waiting for apply")
  342. }
  343. }
  344. // GetAuthenticated gets a value with permission check (local read)
  345. func (s *KVServer) GetAuthenticated(key, token string) (string, bool, error) {
  346. if err := s.AuthManager.CheckPermission(token, key, ActionRead, ""); err != nil {
  347. return "", false, err
  348. }
  349. val, ok := s.Get(key)
  350. return val, ok, nil
  351. }
  352. // Get gets a value (local read, can be stale)
  353. // For linearizable reads, use GetLinear instead
  354. func (s *KVServer) Get(key string) (string, bool) {
  355. return s.DB.Get(key)
  356. }
  357. // GetLinearAuthenticated gets a value with linearizable consistency and permission check
  358. func (s *KVServer) GetLinearAuthenticated(key, token string) (string, bool, error) {
  359. if err := s.AuthManager.CheckPermission(token, key, ActionRead, ""); err != nil {
  360. return "", false, err
  361. }
  362. return s.GetLinear(key)
  363. }
  364. // Logout invalidates the current session
  365. func (s *KVServer) Logout(token string) error {
  366. return s.AuthManager.Logout(token)
  367. }
  368. // GetSessionInfo returns the session details
  369. func (s *KVServer) GetSessionInfo(token string) (*Session, error) {
  370. return s.AuthManager.GetSession(token)
  371. }
  372. // IsRoot checks if the token belongs to the root user
  373. func (s *KVServer) IsRoot(token string) bool {
  374. sess, err := s.AuthManager.GetSession(token)
  375. if err != nil {
  376. return false
  377. }
  378. return sess.Username == "root"
  379. }
  380. // CreateUser creates a new user (Root only)
  381. func (s *KVServer) CreateUser(username, password string, roles []string, token string) error {
  382. if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
  383. return fmt.Errorf("permission denied: root access required")
  384. }
  385. // Use RegisterUserSync
  386. return s.AuthManager.RegisterUser(username, password, roles)
  387. }
  388. // DeleteUser deletes a user (Root only)
  389. func (s *KVServer) DeleteUser(username string, token string) error {
  390. if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
  391. return fmt.Errorf("permission denied: root access required")
  392. }
  393. // Check if user exists
  394. if _, err := s.AuthManager.GetUser(username); err != nil {
  395. return err
  396. }
  397. if username == "root" {
  398. return fmt.Errorf("cannot delete root user")
  399. }
  400. // Use DelSync
  401. return s.DelSync(AuthUserPrefix + username)
  402. }
  403. // UpdateUser updates generic user fields (Root only)
  404. func (s *KVServer) UpdateUser(user User, token string) error {
  405. if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
  406. return fmt.Errorf("permission denied: root access required")
  407. }
  408. // Check if user exists
  409. if _, err := s.AuthManager.GetUser(user.Username); err != nil {
  410. return err
  411. }
  412. return s.AuthManager.UpdateUser(user)
  413. }
  414. // ChangeUserPassword changes a user's password (Root or Self)
  415. func (s *KVServer) ChangeUserPassword(username, newPassword string, token string) error {
  416. if s.AuthManager.IsEnabled() {
  417. session, err := s.AuthManager.GetSession(token)
  418. if err != nil {
  419. return err
  420. }
  421. if session.Username != "root" && session.Username != username {
  422. return fmt.Errorf("permission denied")
  423. }
  424. }
  425. // Use ChangePassword (which should use SetSync)
  426. return s.AuthManager.ChangePassword(username, newPassword)
  427. }
  428. // Role Management Helpers
  429. // CreateRole creates a new role (Root only)
  430. func (s *KVServer) CreateRole(name string, token string) error {
  431. if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
  432. return fmt.Errorf("permission denied: root access required")
  433. }
  434. return s.AuthManager.CreateRole(name)
  435. }
  436. // DeleteRole deletes a role (Root only)
  437. func (s *KVServer) DeleteRole(name string, token string) error {
  438. if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
  439. return fmt.Errorf("permission denied: root access required")
  440. }
  441. return s.AuthManager.DeleteRole(name)
  442. }
  443. // UpdateRole updates a role (Root only)
  444. func (s *KVServer) UpdateRole(role Role, token string) error {
  445. if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
  446. return fmt.Errorf("permission denied: root access required")
  447. }
  448. return s.AuthManager.UpdateRole(role)
  449. }
  450. // ListUsers lists all users (Root only)
  451. func (s *KVServer) ListUsers(token string) ([]*User, error) {
  452. if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
  453. return nil, fmt.Errorf("permission denied: root access required")
  454. }
  455. return s.AuthManager.ListUsers(), nil
  456. }
  457. // ListRoles lists all roles (Root only)
  458. func (s *KVServer) ListRoles(token string) ([]*Role, error) {
  459. if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
  460. return nil, fmt.Errorf("permission denied: root access required")
  461. }
  462. return s.AuthManager.ListRoles(), nil
  463. }
  464. // SearchAuthenticated searches keys with permission checks
  465. func (s *KVServer) SearchAuthenticated(pattern string, limit, offset int, token string) ([]db.QueryResult, error) {
  466. // Optimization: If user has full access (*), delegate to DB with limit/offset
  467. if s.AuthManager.HasFullAccess(token) {
  468. sql := fmt.Sprintf("key like \"%s\" LIMIT %d OFFSET %d", pattern, limit, offset)
  469. return s.DB.Query(sql)
  470. }
  471. // Slow path: Fetch all potential matches and filter
  472. // We construct a SQL query that retrieves CANDIDATES.
  473. // We cannot safely apply LIMIT/OFFSET in SQL because we might filter out some results.
  474. // So we must fetch ALL matches.
  475. // WARNING: This can be slow and memory intensive.
  476. // If pattern is wildcard, we might fetch everything.
  477. sql := fmt.Sprintf("key like \"%s\"", pattern)
  478. results, err := s.DB.Query(sql)
  479. if err != nil {
  480. return nil, err
  481. }
  482. filtered := make([]db.QueryResult, 0, len(results))
  483. // Apply Filtering
  484. for _, r := range results {
  485. if err := s.AuthManager.CheckPermission(token, r.Key, ActionRead, ""); err == nil {
  486. filtered = append(filtered, r)
  487. }
  488. }
  489. // Apply Pagination on Filtered Results
  490. if offset > len(filtered) {
  491. return []db.QueryResult{}, nil
  492. }
  493. start := offset
  494. end := offset + limit
  495. if end > len(filtered) {
  496. end = len(filtered)
  497. }
  498. return filtered[start:end], nil
  499. }
  500. // CountAuthenticated counts keys with permission checks
  501. func (s *KVServer) CountAuthenticated(pattern string, token string) (int, error) {
  502. // Optimization: If user has full access
  503. if s.AuthManager.HasFullAccess(token) {
  504. sql := ""
  505. if pattern == "*" {
  506. sql = "*"
  507. } else {
  508. sql = fmt.Sprintf("key like \"%s\"", pattern)
  509. }
  510. return s.DB.Count(sql)
  511. }
  512. // Slow path: Iterate and check
  513. // We use DB.Query to get keys (Query returns values too, which is wasteful but DB engine doesn't expose keys-only Query yet)
  514. // Actually, we can access s.DB.Index.WalkPrefix if we want to be faster and avoid value read,
  515. // but s.DB.Index is inside 'db' package. We can access it if it's exported.
  516. // 'db' package exports 'Engine' and 'FlatIndex'.
  517. // So s.DB.Index IS accessible.
  518. count := 0
  519. // Determine prefix from pattern
  520. prefix := ""
  521. if strings.HasSuffix(pattern, "*") {
  522. prefix = strings.TrimSuffix(pattern, "*")
  523. } else if pattern == "*" {
  524. prefix = ""
  525. } else {
  526. // Exact match check
  527. if err := s.AuthManager.CheckPermission(token, pattern, ActionRead, ""); err == nil {
  528. // Check if exists
  529. if _, ok := s.DB.Get(pattern); ok {
  530. return 1, nil
  531. }
  532. return 0, nil
  533. }
  534. return 0, nil // No perm or not found
  535. }
  536. // Walk
  537. // Note: WalkPrefix locks the DB Index (Read Lock).
  538. // Calling CheckPermission inside might involve some logic but it is memory-only and usually fast.
  539. // However, if CheckPermission takes time, we hold DB lock.
  540. s.DB.Index.WalkPrefix(prefix, func(key string, entry db.IndexEntry) bool {
  541. // Check pattern match first (WalkPrefix is just prefix, pattern might be more complex like "user.*.name")
  542. if !db.WildcardMatch(key, pattern) {
  543. return true
  544. }
  545. if err := s.AuthManager.CheckPermission(token, key, ActionRead, ""); err == nil {
  546. count++
  547. }
  548. return true
  549. })
  550. return count, nil
  551. }
  552. // GetLinear gets a value with linearizable consistency
  553. // This ensures the read sees all writes committed before the read started
  554. func (s *KVServer) GetLinear(key string) (string, bool, error) {
  555. // First, ensure we have up-to-date data via ReadIndex
  556. _, err := s.Raft.ReadIndex()
  557. if err != nil {
  558. // If we're not leader, try forwarding
  559. if errors.Is(err, ErrNotLeader) {
  560. return s.forwardGet(key)
  561. }
  562. return "", false, err
  563. }
  564. val, ok := s.DB.Get(key)
  565. return val, ok, nil
  566. }
  567. // forwardGet forwards a get request to the leader
  568. func (s *KVServer) forwardGet(key string) (string, bool, error) {
  569. return s.Raft.ForwardGet(key)
  570. }
  571. // Join joins an existing cluster
  572. func (s *KVServer) Join(nodeID, addr string) error {
  573. return s.Raft.AddNodeWithForward(nodeID, addr)
  574. }
  575. // Leave leaves the cluster
  576. func (s *KVServer) Leave(nodeID string) error {
  577. // Mark node as leaving to prevent auto-rejoin
  578. s.leavingNodes.Store(nodeID, time.Now())
  579. // Auto-expire the leaving flag after a while
  580. go func() {
  581. time.Sleep(30 * time.Second)
  582. s.leavingNodes.Delete(nodeID)
  583. }()
  584. // Remove from RaftNode discovery key first to prevent auto-rejoin
  585. if err := s.removeNodeFromDiscovery(nodeID); err != nil {
  586. s.Raft.config.Logger.Warn("Failed to remove node from discovery key: %v", err)
  587. // Continue anyway, as the main goal is to leave the cluster
  588. }
  589. return s.Raft.RemoveNodeWithForward(nodeID)
  590. }
  591. // removeNodeFromDiscovery removes a node from the RaftNode key to prevent auto-rejoin
  592. func (s *KVServer) removeNodeFromDiscovery(targetID string) error {
  593. val, ok := s.Get("RaftNode")
  594. if !ok || val == "" {
  595. return nil
  596. }
  597. parts := strings.Split(val, ";")
  598. var newParts []string
  599. changed := false
  600. for _, part := range parts {
  601. if part == "" {
  602. continue
  603. }
  604. kv := strings.SplitN(part, "=", 2)
  605. if len(kv) == 2 {
  606. if kv[0] == targetID {
  607. changed = true
  608. continue // Skip this node
  609. }
  610. newParts = append(newParts, part)
  611. }
  612. }
  613. if changed {
  614. newVal := strings.Join(newParts, ";")
  615. return s.Set("RaftNode", newVal)
  616. }
  617. return nil
  618. }
  619. // WaitForLeader waits until a leader is elected
  620. func (s *KVServer) WaitForLeader(timeout time.Duration) error {
  621. deadline := time.Now().Add(timeout)
  622. for time.Now().Before(deadline) {
  623. leader := s.Raft.GetLeaderID()
  624. if leader != "" {
  625. return nil
  626. }
  627. time.Sleep(100 * time.Millisecond)
  628. }
  629. return fmt.Errorf("timeout waiting for leader")
  630. }
  631. // HealthCheck returns the health status of this server
  632. func (s *KVServer) HealthCheck() HealthStatus {
  633. return s.Raft.HealthCheck()
  634. }
  635. // GetStats returns runtime statistics
  636. func (s *KVServer) GetStats() Stats {
  637. return s.Raft.GetStats()
  638. }
  639. // GetMetrics returns runtime metrics
  640. func (s *KVServer) GetMetrics() Metrics {
  641. return s.Raft.GetMetrics()
  642. }
  643. // TransferLeadership transfers leadership to the specified node
  644. func (s *KVServer) TransferLeadership(targetID string) error {
  645. return s.Raft.TransferLeadership(targetID)
  646. }
  647. // GetClusterNodes returns current cluster membership
  648. func (s *KVServer) GetClusterNodes() map[string]string {
  649. return s.Raft.GetClusterNodes()
  650. }
  651. // IsLeader returns true if this node is the leader
  652. func (s *KVServer) IsLeader() bool {
  653. _, isLeader := s.Raft.GetState()
  654. return isLeader
  655. }
  656. // GetLeaderID returns the current leader ID
  657. func (s *KVServer) GetLeaderID() string {
  658. return s.Raft.GetLeaderID()
  659. }
  660. // GetLogSize returns the raft log size
  661. func (s *KVServer) GetLogSize() int64 {
  662. return s.Raft.log.GetLogSize()
  663. }
  664. // GetDBSize returns the db size
  665. func (s *KVServer) GetDBSize() int64 {
  666. return s.DB.GetDBSize()
  667. }
  668. // WatchURL registers a webhook url for a key
  669. func (s *KVServer) WatchURL(key, url string) {
  670. s.Watcher.Subscribe(key, url)
  671. }
  672. // UnwatchURL removes a webhook url for a key
  673. func (s *KVServer) UnwatchURL(key, url string) {
  674. s.Watcher.Unsubscribe(key, url)
  675. }
  676. // WatchAll registers a watcher for all keys
  677. func (s *KVServer) WatchAll(handler WatchHandler) {
  678. // s.FSM.WatchAll(handler)
  679. // TODO: Implement Watcher for DB
  680. }
  681. // Watch registers a watcher for a key
  682. func (s *KVServer) Watch(key string, handler WatchHandler) {
  683. // s.FSM.Watch(key, handler)
  684. // TODO: Implement Watcher for DB
  685. }
  686. // Unwatch removes watchers for a key
  687. func (s *KVServer) Unwatch(key string) {
  688. // s.FSM.Unwatch(key)
  689. // TODO: Implement Watcher for DB
  690. }
  691. func (s *KVServer) maintenanceLoop() {
  692. defer s.wg.Done()
  693. // Check every 1 second for faster reaction
  694. ticker := time.NewTicker(1 * time.Second)
  695. defer ticker.Stop()
  696. for {
  697. select {
  698. case <-s.stopCh:
  699. return
  700. case <-ticker.C:
  701. s.updateNodeInfo()
  702. s.checkConnections()
  703. }
  704. }
  705. }
  706. func (s *KVServer) updateNodeInfo() {
  707. // 1. Ensure "CreateNode/<NodeID>" is set to self address
  708. // We do this via Propose (Set) so it's replicated
  709. myID := s.Raft.config.NodeID
  710. myAddr := s.Raft.config.ListenAddr
  711. key := fmt.Sprintf("CreateNode/%s", myID)
  712. // Check if we need to update (avoid spamming logs/proposals)
  713. val, exists := s.Get(key)
  714. if !exists || val != myAddr {
  715. // Run in goroutine to avoid blocking
  716. go func() {
  717. if err := s.Set(key, myAddr); err != nil {
  718. s.Raft.config.Logger.Debug("Failed to update node info: %v", err)
  719. }
  720. }()
  721. }
  722. // 2. Only leader updates RaftNode aggregation
  723. if s.IsLeader() {
  724. // Read current RaftNode to preserve history
  725. currentVal, _ := s.Get("RaftNode")
  726. knownNodes := make(map[string]string)
  727. if currentVal != "" {
  728. parts := strings.Split(currentVal, ";")
  729. for _, part := range parts {
  730. if part == "" { continue }
  731. kv := strings.SplitN(part, "=", 2)
  732. if len(kv) == 2 {
  733. knownNodes[kv[0]] = kv[1]
  734. }
  735. }
  736. }
  737. // Merge current cluster nodes
  738. changed := false
  739. currentCluster := s.GetClusterNodes()
  740. for id, addr := range currentCluster {
  741. // Skip nodes that are marked as leaving
  742. if _, leaving := s.leavingNodes.Load(id); leaving {
  743. continue
  744. }
  745. if knownNodes[id] != addr {
  746. knownNodes[id] = addr
  747. changed = true
  748. }
  749. }
  750. // If changed, update RaftNode
  751. if changed {
  752. var peers []string
  753. for id, addr := range knownNodes {
  754. peers = append(peers, fmt.Sprintf("%s=%s", id, addr))
  755. }
  756. sort.Strings(peers)
  757. newVal := strings.Join(peers, ";")
  758. // Check again if we need to write to avoid loops if Get returned stale
  759. if newVal != currentVal {
  760. go func(k, v string) {
  761. if err := s.Set(k, v); err != nil {
  762. s.Raft.config.Logger.Warn("Failed to update RaftNode key: %v", err)
  763. }
  764. }("RaftNode", newVal)
  765. }
  766. }
  767. }
  768. }
  769. func (s *KVServer) checkConnections() {
  770. if !s.IsLeader() {
  771. return
  772. }
  773. // Read RaftNode key to find potential members that are missing
  774. val, ok := s.Get("RaftNode")
  775. if !ok || val == "" {
  776. return
  777. }
  778. // Parse saved nodes
  779. savedParts := strings.Split(val, ";")
  780. currentNodes := s.GetClusterNodes()
  781. // Invert currentNodes for address check
  782. currentAddrs := make(map[string]bool)
  783. for _, addr := range currentNodes {
  784. currentAddrs[addr] = true
  785. }
  786. for _, part := range savedParts {
  787. if part == "" {
  788. continue
  789. }
  790. // Expect id=addr
  791. kv := strings.SplitN(part, "=", 2)
  792. if len(kv) != 2 {
  793. continue
  794. }
  795. id, addr := kv[0], kv[1]
  796. // Skip invalid addresses
  797. if strings.HasPrefix(addr, ".") || !strings.Contains(addr, ":") {
  798. continue
  799. }
  800. if !currentAddrs[addr] {
  801. // Skip nodes that are marked as leaving
  802. if _, leaving := s.leavingNodes.Load(id); leaving {
  803. continue
  804. }
  805. // Found a node that was previously in the cluster but is now missing
  806. // Try to add it back
  807. // We use AddNodeWithForward which handles non-blocking internally somewhat,
  808. // but we should run this in goroutine to not block the loop
  809. go func(nodeID, nodeAddr string) {
  810. // Try to add node
  811. s.Raft.config.Logger.Info("Auto-rejoining node found in RaftNode: %s (%s)", nodeID, nodeAddr)
  812. if err := s.Join(nodeID, nodeAddr); err != nil {
  813. s.Raft.config.Logger.Debug("Failed to auto-rejoin node %s: %v", nodeID, err)
  814. }
  815. }(id, addr)
  816. }
  817. }
  818. }
  819. // startHTTPServer starts the HTTP API server
  820. func (s *KVServer) startHTTPServer(addr string) error {
  821. mux := http.NewServeMux()
  822. // KV API
  823. mux.HandleFunc("/kv", func(w http.ResponseWriter, r *http.Request) {
  824. token := r.Header.Get("X-Raft-Token")
  825. switch r.Method {
  826. case http.MethodGet:
  827. key := r.URL.Query().Get("key")
  828. if key == "" {
  829. http.Error(w, "missing key", http.StatusBadRequest)
  830. return
  831. }
  832. // Use Authenticated method
  833. val, found, err := s.GetLinearAuthenticated(key, token)
  834. if err != nil {
  835. // Distinguish auth error vs raft error?
  836. if strings.Contains(err.Error(), "permission") || strings.Contains(err.Error(), "unauthorized") {
  837. http.Error(w, err.Error(), http.StatusForbidden)
  838. } else {
  839. http.Error(w, err.Error(), http.StatusInternalServerError)
  840. }
  841. return
  842. }
  843. if !found {
  844. http.Error(w, "not found", http.StatusNotFound)
  845. return
  846. }
  847. w.Write([]byte(val))
  848. case http.MethodPost:
  849. body, _ := io.ReadAll(r.Body)
  850. var req struct {
  851. Key string `json:"key"`
  852. Value string `json:"value"`
  853. }
  854. if err := json.Unmarshal(body, &req); err != nil {
  855. http.Error(w, "invalid json", http.StatusBadRequest)
  856. return
  857. }
  858. if err := s.SetAuthenticated(req.Key, req.Value, token); err != nil {
  859. if strings.Contains(err.Error(), "permission") || strings.Contains(err.Error(), "unauthorized") {
  860. http.Error(w, err.Error(), http.StatusForbidden)
  861. } else {
  862. http.Error(w, err.Error(), http.StatusInternalServerError)
  863. }
  864. return
  865. }
  866. w.WriteHeader(http.StatusOK)
  867. case http.MethodDelete:
  868. key := r.URL.Query().Get("key")
  869. if key == "" {
  870. http.Error(w, "missing key", http.StatusBadRequest)
  871. return
  872. }
  873. if err := s.DelAuthenticated(key, token); err != nil {
  874. if strings.Contains(err.Error(), "permission") || strings.Contains(err.Error(), "unauthorized") {
  875. http.Error(w, err.Error(), http.StatusForbidden)
  876. } else {
  877. http.Error(w, err.Error(), http.StatusInternalServerError)
  878. }
  879. return
  880. }
  881. w.WriteHeader(http.StatusOK)
  882. default:
  883. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  884. }
  885. })
  886. // Auth API
  887. mux.HandleFunc("/auth/login", func(w http.ResponseWriter, r *http.Request) {
  888. if r.Method != http.MethodPost {
  889. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  890. return
  891. }
  892. var req struct {
  893. Username string `json:"username"`
  894. Password string `json:"password"`
  895. Code string `json:"code"`
  896. }
  897. body, _ := io.ReadAll(r.Body)
  898. if err := json.Unmarshal(body, &req); err != nil {
  899. http.Error(w, "invalid json", http.StatusBadRequest)
  900. return
  901. }
  902. ip := r.RemoteAddr
  903. if host, _, err := net.SplitHostPort(r.RemoteAddr); err == nil {
  904. ip = host
  905. }
  906. token, err := s.AuthManager.Login(req.Username, req.Password, req.Code, ip)
  907. if err != nil {
  908. http.Error(w, err.Error(), http.StatusUnauthorized)
  909. return
  910. }
  911. resp := struct {
  912. Token string `json:"token"`
  913. }{Token: token}
  914. json.NewEncoder(w).Encode(resp)
  915. })
  916. // Watcher API
  917. mux.HandleFunc("/watch", func(w http.ResponseWriter, r *http.Request) {
  918. if r.Method != http.MethodPost {
  919. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  920. return
  921. }
  922. body, _ := io.ReadAll(r.Body)
  923. var req struct {
  924. Key string `json:"key"`
  925. URL string `json:"url"`
  926. }
  927. if err := json.Unmarshal(body, &req); err != nil {
  928. http.Error(w, "invalid json", http.StatusBadRequest)
  929. return
  930. }
  931. if req.Key == "" || req.URL == "" {
  932. http.Error(w, "missing key or url", http.StatusBadRequest)
  933. return
  934. }
  935. s.WatchURL(req.Key, req.URL)
  936. w.WriteHeader(http.StatusOK)
  937. })
  938. mux.HandleFunc("/unwatch", func(w http.ResponseWriter, r *http.Request) {
  939. if r.Method != http.MethodPost {
  940. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  941. return
  942. }
  943. body, _ := io.ReadAll(r.Body)
  944. var req struct {
  945. Key string `json:"key"`
  946. URL string `json:"url"`
  947. }
  948. if err := json.Unmarshal(body, &req); err != nil {
  949. http.Error(w, "invalid json", http.StatusBadRequest)
  950. return
  951. }
  952. s.UnwatchURL(req.Key, req.URL)
  953. w.WriteHeader(http.StatusOK)
  954. })
  955. s.httpServer = &http.Server{
  956. Addr: addr,
  957. Handler: mux,
  958. }
  959. go func() {
  960. s.Raft.config.Logger.Info("HTTP API server listening on %s", addr)
  961. if err := s.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  962. s.Raft.config.Logger.Error("HTTP server failed: %v", err)
  963. }
  964. }()
  965. return nil
  966. }