server.go 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271
  1. package raft
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "net"
  8. "net/http"
  9. "sort"
  10. "strings"
  11. "sync"
  12. "time"
  13. "igit.com/xbase/raft/db"
  14. )
  15. // KVServer wraps Raft to provide a distributed key-value store
  16. type KVServer struct {
  17. Raft *Raft
  18. DB *db.Engine
  19. CLI *CLI
  20. AuthManager *AuthManager
  21. Watcher *WebHookWatcher
  22. httpServer *http.Server
  23. stopCh chan struct{}
  24. wg sync.WaitGroup
  25. stopOnce sync.Once
  26. // leavingNodes tracks nodes that are currently being removed
  27. // to prevent auto-rejoin/discovery logic from interfering
  28. leavingNodes sync.Map
  29. // Pending requests for synchronous operations (Read-Your-Writes)
  30. pendingRequests map[uint64]chan error
  31. pendingMu sync.Mutex
  32. }
  33. // NewKVServer creates a new KV server
  34. func NewKVServer(config *Config) (*KVServer, error) {
  35. // Initialize DB Engine
  36. // Use a subdirectory for DB to avoid conflict with Raft logs if they share DataDir
  37. dbPath := config.DataDir + "/kv_engine"
  38. engine, err := db.NewEngine(dbPath)
  39. if err != nil {
  40. return nil, fmt.Errorf("failed to create db engine: %w", err)
  41. }
  42. // Initialize LastAppliedIndex from DB to prevent re-applying entries
  43. config.LastAppliedIndex = engine.GetLastAppliedIndex()
  44. // Create stop channel early for use in callbacks
  45. stopCh := make(chan struct{})
  46. // Configure snapshot provider
  47. config.SnapshotProvider = func(minIncludeIndex uint64) ([]byte, error) {
  48. // Wait for DB to catch up to the requested index
  49. // This is critical for data integrity during compaction
  50. for engine.GetLastAppliedIndex() < minIncludeIndex {
  51. select {
  52. case <-stopCh:
  53. return nil, fmt.Errorf("server stopping")
  54. default:
  55. time.Sleep(10 * time.Millisecond)
  56. }
  57. }
  58. // Force sync to disk to ensure data durability before compaction
  59. // This prevents data loss if Raft logs are compacted but DB data is only in OS cache
  60. if err := engine.Sync(); err != nil {
  61. return nil, fmt.Errorf("failed to sync engine before snapshot: %w", err)
  62. }
  63. return engine.Snapshot()
  64. }
  65. // Configure get handler for remote reads
  66. config.GetHandler = func(key string) (string, bool) {
  67. return engine.Get(key)
  68. }
  69. applyCh := make(chan ApplyMsg, 1000) // Increase buffer for async processing
  70. transport := NewTCPTransport(config.ListenAddr, 10, config.Logger)
  71. // Initialize WebHookWatcher
  72. // 5 workers, 3 retries
  73. watcher := NewWebHookWatcher(5, 3, config.Logger)
  74. r, err := NewRaft(config, transport, applyCh)
  75. if err != nil {
  76. engine.Close()
  77. return nil, err
  78. }
  79. s := &KVServer{
  80. Raft: r,
  81. DB: engine,
  82. CLI: nil,
  83. AuthManager: nil, // initialized below
  84. Watcher: watcher,
  85. stopCh: stopCh,
  86. pendingRequests: make(map[uint64]chan error),
  87. }
  88. // Initialize AuthManager
  89. s.AuthManager = NewAuthManager(s)
  90. // Load Auth Data from DB (if any)
  91. if err := s.AuthManager.LoadFromDB(); err != nil {
  92. s.Raft.config.Logger.Warn("Failed to load auth data from DB: %v", err)
  93. }
  94. // Initialize CLI
  95. s.CLI = NewCLI(s)
  96. // Start applying entries
  97. go s.runApplyLoop(applyCh)
  98. // Start background maintenance loop
  99. s.wg.Add(1)
  100. go s.maintenanceLoop()
  101. return s, nil
  102. }
  103. func (s *KVServer) Start() error {
  104. // Start CLI if enabled
  105. if s.Raft.config.EnableCLI {
  106. go s.CLI.Start()
  107. }
  108. // Start HTTP Server if configured
  109. if s.Raft.config.HTTPAddr != "" {
  110. if err := s.startHTTPServer(s.Raft.config.HTTPAddr); err != nil {
  111. s.Raft.config.Logger.Warn("Failed to start HTTP server: %v", err)
  112. }
  113. }
  114. // Start TCP Server (Hardcoded offset +10 for demo or config?)
  115. // User didn't add TCPPort to config, so let's derive it or just fix it.
  116. // For demo, if ListenAddr is 127.0.0.1:9001, we try 9011.
  117. host, port, _ := net.SplitHostPort(s.Raft.config.ListenAddr)
  118. if port == "9001" {
  119. tcpAddr := fmt.Sprintf("%s:%s", host, "9011")
  120. if err := s.StartTCPServer(tcpAddr); err != nil {
  121. s.Raft.config.Logger.Warn("Failed to start TCP server: %v", err)
  122. }
  123. } else if port == "9002" {
  124. tcpAddr := fmt.Sprintf("%s:%s", host, "9012")
  125. s.StartTCPServer(tcpAddr)
  126. } else {
  127. // Fallback or ignore for other nodes in demo
  128. }
  129. return s.Raft.Start()
  130. }
  131. func (s *KVServer) Stop() error {
  132. var err error
  133. s.stopOnce.Do(func() {
  134. // Stop maintenance loop
  135. if s.stopCh != nil {
  136. close(s.stopCh)
  137. s.wg.Wait()
  138. }
  139. // Stop Watcher
  140. if s.Watcher != nil {
  141. s.Watcher.Stop()
  142. }
  143. // Stop HTTP Server
  144. if s.httpServer != nil {
  145. s.httpServer.Close()
  146. }
  147. // Stop Raft first
  148. if errRaft := s.Raft.Stop(); errRaft != nil {
  149. err = errRaft
  150. }
  151. // Close DB
  152. if s.DB != nil {
  153. if errDB := s.DB.Close(); errDB != nil {
  154. // Combine errors if both fail
  155. if err != nil {
  156. err = fmt.Errorf("raft stop error: %v, db close error: %v", err, errDB)
  157. } else {
  158. err = errDB
  159. }
  160. }
  161. }
  162. })
  163. return err
  164. }
  165. func (s *KVServer) runApplyLoop(applyCh chan ApplyMsg) {
  166. for msg := range applyCh {
  167. if msg.CommandValid {
  168. // Optimization: Skip if already applied
  169. // We check this here to avoid unmarshalling and locking DB for known duplicates
  170. if msg.CommandIndex <= s.DB.GetLastAppliedIndex() {
  171. // Even if duplicate, we might have someone waiting for it if they just proposed it
  172. // but this is unlikely for duplicates.
  173. // However, notify waiters just in case.
  174. s.notifyPending(msg.CommandIndex, nil)
  175. continue
  176. }
  177. var cmd KVCommand
  178. if err := json.Unmarshal(msg.Command, &cmd); err != nil {
  179. s.Raft.config.Logger.Error("Failed to unmarshal command: %v", err)
  180. s.notifyPending(msg.CommandIndex, err)
  181. continue
  182. }
  183. var err error
  184. switch cmd.Type {
  185. case KVSet:
  186. // Update Auth Cache for system keys
  187. // This includes AuthLockPrefix now
  188. if strings.HasPrefix(cmd.Key, SystemKeyPrefix) {
  189. s.AuthManager.UpdateCache(cmd.Key, cmd.Value, false)
  190. }
  191. err = s.DB.Set(cmd.Key, cmd.Value, msg.CommandIndex)
  192. if err == nil {
  193. s.Watcher.Notify(cmd.Key, cmd.Value, KVSet)
  194. }
  195. case KVDel:
  196. // Update Auth Cache for system keys
  197. if strings.HasPrefix(cmd.Key, SystemKeyPrefix) {
  198. s.AuthManager.UpdateCache(cmd.Key, "", true)
  199. }
  200. err = s.DB.Delete(cmd.Key, msg.CommandIndex)
  201. if err == nil {
  202. s.Watcher.Notify(cmd.Key, "", KVDel)
  203. }
  204. default:
  205. s.Raft.config.Logger.Error("Unknown command type: %d", cmd.Type)
  206. }
  207. if err != nil {
  208. s.Raft.config.Logger.Error("DB Apply failed: %v", err)
  209. }
  210. // Notify anyone waiting for this index
  211. s.notifyPending(msg.CommandIndex, err)
  212. } else if msg.SnapshotValid {
  213. if err := s.DB.Restore(msg.Snapshot); err != nil {
  214. s.Raft.config.Logger.Error("DB Restore failed: %v", err)
  215. }
  216. }
  217. }
  218. }
  219. // notifyPending notifies any waiting requests for the given index
  220. func (s *KVServer) notifyPending(index uint64, err error) {
  221. s.pendingMu.Lock()
  222. defer s.pendingMu.Unlock()
  223. if ch, ok := s.pendingRequests[index]; ok {
  224. // Non-blocking send in case listener is gone (buffered channel recommended)
  225. select {
  226. case ch <- err:
  227. default:
  228. }
  229. close(ch)
  230. delete(s.pendingRequests, index)
  231. }
  232. }
  233. // SetAuthenticatedAsync sets a key-value pair asynchronously with permission check
  234. func (s *KVServer) SetAuthenticatedAsync(key, value, token string) error {
  235. if err := s.AuthManager.CheckPermission(token, key, ActionWrite, value); err != nil {
  236. return err
  237. }
  238. return s.Set(key, value)
  239. }
  240. // SetAuthenticated sets a key-value pair with permission check
  241. func (s *KVServer) SetAuthenticated(key, value, token string) error {
  242. if err := s.AuthManager.CheckPermission(token, key, ActionWrite, value); err != nil {
  243. return err
  244. }
  245. // Use SetSync for consistency in CLI/API
  246. return s.SetSync(key, value)
  247. }
  248. // DelAuthenticated deletes a key with permission check
  249. func (s *KVServer) DelAuthenticated(key, token string) error {
  250. if err := s.AuthManager.CheckPermission(token, key, ActionWrite, ""); err != nil {
  251. return err
  252. }
  253. // Use SetSync (via Del which should be updated or create DelSync)
  254. // For simplicity, we implement DelSync logic here or update Del to be sync?
  255. // Let's implement DelSync
  256. return s.DelSync(key)
  257. }
  258. // Set sets a key-value pair (Async - eventually consistent)
  259. func (s *KVServer) Set(key, value string) error {
  260. cmd := KVCommand{
  261. Type: KVSet,
  262. Key: key,
  263. Value: value,
  264. }
  265. data, err := json.Marshal(cmd)
  266. if err != nil {
  267. return err
  268. }
  269. _, _, err = s.Raft.ProposeWithForward(data)
  270. return err
  271. }
  272. // SetSync sets a key-value pair and waits for it to be applied (Read-Your-Writes)
  273. func (s *KVServer) SetSync(key, value string) error {
  274. cmd := KVCommand{
  275. Type: KVSet,
  276. Key: key,
  277. Value: value,
  278. }
  279. data, err := json.Marshal(cmd)
  280. if err != nil {
  281. return err
  282. }
  283. index, _, err := s.Raft.ProposeWithForward(data)
  284. if err != nil {
  285. return err
  286. }
  287. // Wait for application
  288. ch := make(chan error, 1)
  289. s.pendingMu.Lock()
  290. s.pendingRequests[index] = ch
  291. s.pendingMu.Unlock()
  292. select {
  293. case applyErr := <-ch:
  294. return applyErr
  295. case <-time.After(5 * time.Second):
  296. s.pendingMu.Lock()
  297. delete(s.pendingRequests, index)
  298. s.pendingMu.Unlock()
  299. return fmt.Errorf("timeout waiting for apply")
  300. }
  301. }
  302. // Del deletes a key (Async)
  303. func (s *KVServer) Del(key string) error {
  304. cmd := KVCommand{
  305. Type: KVDel,
  306. Key: key,
  307. }
  308. data, err := json.Marshal(cmd)
  309. if err != nil {
  310. return err
  311. }
  312. _, _, err = s.Raft.ProposeWithForward(data)
  313. return err
  314. }
  315. // DelSync deletes a key and waits for it to be applied
  316. func (s *KVServer) DelSync(key string) error {
  317. cmd := KVCommand{
  318. Type: KVDel,
  319. Key: key,
  320. }
  321. data, err := json.Marshal(cmd)
  322. if err != nil {
  323. return err
  324. }
  325. index, _, err := s.Raft.ProposeWithForward(data)
  326. if err != nil {
  327. return err
  328. }
  329. // Wait for application
  330. ch := make(chan error, 1)
  331. s.pendingMu.Lock()
  332. s.pendingRequests[index] = ch
  333. s.pendingMu.Unlock()
  334. select {
  335. case applyErr := <-ch:
  336. return applyErr
  337. case <-time.After(5 * time.Second):
  338. s.pendingMu.Lock()
  339. delete(s.pendingRequests, index)
  340. s.pendingMu.Unlock()
  341. return fmt.Errorf("timeout waiting for apply")
  342. }
  343. }
  344. // GetAuthenticated gets a value with permission check (local read)
  345. func (s *KVServer) GetAuthenticated(key, token string) (string, bool, error) {
  346. if err := s.AuthManager.CheckPermission(token, key, ActionRead, ""); err != nil {
  347. return "", false, err
  348. }
  349. val, ok := s.Get(key)
  350. return val, ok, nil
  351. }
  352. // Get gets a value (local read, can be stale)
  353. // For linearizable reads, use GetLinear instead
  354. func (s *KVServer) Get(key string) (string, bool) {
  355. return s.DB.Get(key)
  356. }
  357. // GetLinearAuthenticated gets a value with linearizable consistency and permission check
  358. func (s *KVServer) GetLinearAuthenticated(key, token string) (string, bool, error) {
  359. if err := s.AuthManager.CheckPermission(token, key, ActionRead, ""); err != nil {
  360. return "", false, err
  361. }
  362. return s.GetLinear(key)
  363. }
  364. // Logout invalidates the current session
  365. func (s *KVServer) Logout(token string) error {
  366. return s.AuthManager.Logout(token)
  367. }
  368. // GetSessionInfo returns the session details
  369. func (s *KVServer) GetSessionInfo(token string) (*Session, error) {
  370. return s.AuthManager.GetSession(token)
  371. }
  372. // IsAdmin checks if the token belongs to a user with admin privileges
  373. // defined as having "admin" action on "*" key pattern
  374. func (s *KVServer) IsAdmin(token string) bool {
  375. if !s.AuthManager.IsEnabled() {
  376. return true // If auth disabled, everyone is admin (or handled by caller)
  377. }
  378. // Check internal system token
  379. if token == "SYSTEM_INTERNAL" {
  380. return true
  381. }
  382. return s.AuthManager.CheckPermission(token, "*", ActionAdmin, "") == nil
  383. }
  384. // IsRoot checks if the token belongs to the root user
  385. // Deprecated: Use IsAdmin instead
  386. func (s *KVServer) IsRoot(token string) bool {
  387. sess, err := s.AuthManager.GetSession(token)
  388. if err != nil {
  389. return false
  390. }
  391. return sess.Username == "root"
  392. }
  393. // CreateUser creates a new user (Delegated Admin)
  394. func (s *KVServer) CreateUser(username, password string, roles []string, token string) error {
  395. if s.AuthManager.IsEnabled() {
  396. // 1. Permission check: Must have write permission on system.user.<username>
  397. // This enables delegated administration (e.g., dept_admin can create users in their scope)
  398. // Or if global admin
  399. targetKey := AuthUserPrefix + username
  400. if err := s.AuthManager.CheckPermission(token, targetKey, ActionWrite, ""); err != nil {
  401. // Fallback to global admin check for backward compatibility or clearer error
  402. if !s.IsAdmin(token) {
  403. return fmt.Errorf("permission denied: cannot create user '%s'", username)
  404. }
  405. }
  406. // 2. Verify Delegation: Can I assign these roles?
  407. // To assign a role, I must have ALL permissions that the role contains.
  408. for _, roleName := range roles {
  409. // Let's implement a helper in AuthManager to get Role Effective Permissions
  410. rolePerms, err := s.AuthManager.GetRoleEffectivePermissions(roleName)
  411. if err != nil {
  412. return err
  413. }
  414. if !s.AuthManager.IsSubset(token, rolePerms) {
  415. return fmt.Errorf("permission denied: cannot assign role '%s' (exceeds your permissions)", roleName)
  416. }
  417. }
  418. }
  419. // Use RegisterUserSync
  420. return s.AuthManager.RegisterUser(username, password, roles)
  421. }
  422. // DeleteUser deletes a user (Delegated Admin)
  423. func (s *KVServer) DeleteUser(username string, token string) error {
  424. if s.AuthManager.IsEnabled() {
  425. // 1. Permission Check: Must have write permission on system.user.<username>
  426. targetKey := AuthUserPrefix + username
  427. if err := s.AuthManager.CheckPermission(token, targetKey, ActionWrite, ""); err != nil {
  428. if !s.IsAdmin(token) {
  429. return fmt.Errorf("permission denied: cannot delete user '%s'", username)
  430. }
  431. }
  432. // 2. Verify Delegation: Can I delete this user?
  433. // I can only delete a user if I *could* have created them (i.e. I dominate their permissions).
  434. targetUser, err := s.AuthManager.GetUser(username)
  435. if err != nil {
  436. return err
  437. }
  438. if !s.AuthManager.IsSubset(token, targetUser.EffectivePermissions) {
  439. return fmt.Errorf("permission denied: cannot delete user '%s' (has permissions exceeding yours)", username)
  440. }
  441. }
  442. // Check if user exists
  443. if _, err := s.AuthManager.GetUser(username); err != nil {
  444. return err
  445. }
  446. if username == "root" {
  447. return fmt.Errorf("cannot delete root user")
  448. }
  449. // Use DelSync
  450. return s.DelSync(AuthUserPrefix + username)
  451. }
  452. // UpdateUser updates generic user fields (Delegated Admin)
  453. func (s *KVServer) UpdateUser(user User, token string) error {
  454. if s.AuthManager.IsEnabled() {
  455. // 1. Permission Check
  456. targetKey := AuthUserPrefix + user.Username
  457. if err := s.AuthManager.CheckPermission(token, targetKey, ActionWrite, ""); err != nil {
  458. if !s.IsAdmin(token) {
  459. return fmt.Errorf("permission denied: cannot modify user '%s'", user.Username)
  460. }
  461. }
  462. // 2. Check if I can modify this user (Target Check)
  463. oldUser, err := s.AuthManager.GetUser(user.Username)
  464. if err != nil {
  465. return err
  466. }
  467. if !s.AuthManager.IsSubset(token, oldUser.EffectivePermissions) {
  468. return fmt.Errorf("permission denied: cannot modify user '%s' (has permissions exceeding yours)", user.Username)
  469. }
  470. // 3. Check if the NEW state is valid (Delegation Check)
  471. for _, roleName := range user.Roles {
  472. rolePerms, err := s.AuthManager.GetRoleEffectivePermissions(roleName)
  473. if err != nil {
  474. return err
  475. }
  476. if !s.AuthManager.IsSubset(token, rolePerms) {
  477. return fmt.Errorf("permission denied: cannot assign role '%s'", roleName)
  478. }
  479. }
  480. // Also check specific AllowPermissions if updated
  481. if !s.AuthManager.IsSubset(token, user.AllowPermissions) {
  482. return fmt.Errorf("permission denied: cannot assign specific permissions")
  483. }
  484. }
  485. // Check if user exists
  486. if _, err := s.AuthManager.GetUser(user.Username); err != nil {
  487. return err
  488. }
  489. return s.AuthManager.UpdateUser(user)
  490. }
  491. // ChangeUserPassword changes a user's password (Admin or Self)
  492. func (s *KVServer) ChangeUserPassword(username, newPassword string, token string) error {
  493. if s.AuthManager.IsEnabled() {
  494. session, err := s.AuthManager.GetSession(token)
  495. if err != nil {
  496. return err
  497. }
  498. // Allow if Self
  499. if session.Username == username {
  500. // OK
  501. } else {
  502. // Or has write permission on target user
  503. targetKey := AuthUserPrefix + username
  504. if err := s.AuthManager.CheckPermission(token, targetKey, ActionWrite, ""); err != nil {
  505. if !s.IsAdmin(token) {
  506. return fmt.Errorf("permission denied")
  507. }
  508. }
  509. }
  510. }
  511. // Use ChangePassword (which should use SetSync)
  512. return s.AuthManager.ChangePassword(username, newPassword)
  513. }
  514. // Role Management Helpers
  515. // CreateRole creates a new role (Delegated Admin)
  516. func (s *KVServer) CreateRole(name string, token string) error {
  517. if s.AuthManager.IsEnabled() {
  518. // Permission Check: Write on system.role.<name>
  519. targetKey := AuthRolePrefix + name
  520. if err := s.AuthManager.CheckPermission(token, targetKey, ActionWrite, ""); err != nil {
  521. if !s.IsAdmin(token) {
  522. return fmt.Errorf("permission denied: cannot create role '%s'", name)
  523. }
  524. }
  525. }
  526. // Creating an empty role is safe?
  527. // Yes, permissions are added later.
  528. return s.AuthManager.CreateRole(name)
  529. }
  530. // DeleteRole deletes a role (Delegated Admin)
  531. func (s *KVServer) DeleteRole(name string, token string) error {
  532. if s.AuthManager.IsEnabled() {
  533. // Permission Check
  534. targetKey := AuthRolePrefix + name
  535. if err := s.AuthManager.CheckPermission(token, targetKey, ActionWrite, ""); err != nil {
  536. if !s.IsAdmin(token) {
  537. return fmt.Errorf("permission denied: cannot delete role '%s'", name)
  538. }
  539. }
  540. // Verify: Can I delete this role?
  541. // I must dominate the role's permissions.
  542. rolePerms, err := s.AuthManager.GetRoleEffectivePermissions(name)
  543. if err != nil {
  544. return err
  545. }
  546. if !s.AuthManager.IsSubset(token, rolePerms) {
  547. return fmt.Errorf("permission denied: cannot delete role '%s' (exceeds your permissions)", name)
  548. }
  549. }
  550. return s.AuthManager.DeleteRole(name)
  551. }
  552. // UpdateRole updates a role (Delegated Admin)
  553. func (s *KVServer) UpdateRole(role Role, token string) error {
  554. if s.AuthManager.IsEnabled() {
  555. // Permission Check
  556. targetKey := AuthRolePrefix + role.Name
  557. if err := s.AuthManager.CheckPermission(token, targetKey, ActionWrite, ""); err != nil {
  558. if !s.IsAdmin(token) {
  559. return fmt.Errorf("permission denied: cannot modify role '%s'", role.Name)
  560. }
  561. }
  562. // 1. Check Old State (Can I modify it?)
  563. oldPerms, err := s.AuthManager.GetRoleEffectivePermissions(role.Name)
  564. if err != nil {
  565. return err
  566. }
  567. if !s.AuthManager.IsSubset(token, oldPerms) {
  568. return fmt.Errorf("permission denied: cannot modify role '%s' (current permissions exceed yours)", role.Name)
  569. }
  570. // 2. Check New State (Can I assign these new permissions?)
  571. // Check Parents
  572. for _, p := range role.ParentRoles {
  573. pPerms, err := s.AuthManager.GetRoleEffectivePermissions(p)
  574. if err != nil {
  575. return err
  576. }
  577. if !s.AuthManager.IsSubset(token, pPerms) {
  578. return fmt.Errorf("permission denied: cannot inherit from role '%s'", p)
  579. }
  580. }
  581. // Check Permissions
  582. if !s.AuthManager.IsSubset(token, role.Permissions) {
  583. return fmt.Errorf("permission denied: cannot assign specified permissions")
  584. }
  585. }
  586. return s.AuthManager.UpdateRole(role)
  587. }
  588. // ListUsers lists all users (Admin only)
  589. func (s *KVServer) ListUsers(token string) ([]*User, error) {
  590. if s.AuthManager.IsEnabled() {
  591. // Check for general read permission on users
  592. // Ideally checking system.user.*
  593. if err := s.AuthManager.CheckPermission(token, AuthUserPrefix + "*", ActionRead, ""); err != nil {
  594. if !s.IsAdmin(token) {
  595. return nil, fmt.Errorf("permission denied: admin access required")
  596. }
  597. }
  598. }
  599. return s.AuthManager.ListUsers(), nil
  600. }
  601. // ListRoles lists all roles (Admin only)
  602. func (s *KVServer) ListRoles(token string) ([]*Role, error) {
  603. if s.AuthManager.IsEnabled() {
  604. // Check for general read permission on roles
  605. if err := s.AuthManager.CheckPermission(token, AuthRolePrefix + "*", ActionRead, ""); err != nil {
  606. if !s.IsAdmin(token) {
  607. return nil, fmt.Errorf("permission denied: admin access required")
  608. }
  609. }
  610. }
  611. return s.AuthManager.ListRoles(), nil
  612. }
  613. // SearchAuthenticated searches keys with permission checks
  614. func (s *KVServer) SearchAuthenticated(pattern string, limit, offset int, token string) ([]db.QueryResult, error) {
  615. // Optimization: If user has full access (*), delegate to DB with limit/offset
  616. if s.AuthManager.HasFullAccess(token) {
  617. sql := fmt.Sprintf("key like \"%s\" LIMIT %d OFFSET %d", pattern, limit, offset)
  618. return s.DB.Query(sql)
  619. }
  620. // Slow path: Fetch all potential matches and filter
  621. // We construct a SQL query that retrieves CANDIDATES.
  622. // We cannot safely apply LIMIT/OFFSET in SQL because we might filter out some results.
  623. // So we must fetch ALL matches.
  624. // WARNING: This can be slow and memory intensive.
  625. // If pattern is wildcard, we might fetch everything.
  626. sql := fmt.Sprintf("key like \"%s\"", pattern)
  627. results, err := s.DB.Query(sql)
  628. if err != nil {
  629. return nil, err
  630. }
  631. filtered := make([]db.QueryResult, 0, len(results))
  632. // Apply Filtering
  633. for _, r := range results {
  634. if err := s.AuthManager.CheckPermission(token, r.Key, ActionRead, ""); err == nil {
  635. filtered = append(filtered, r)
  636. }
  637. }
  638. // Apply Pagination on Filtered Results
  639. if offset > len(filtered) {
  640. return []db.QueryResult{}, nil
  641. }
  642. start := offset
  643. end := offset + limit
  644. if end > len(filtered) {
  645. end = len(filtered)
  646. }
  647. return filtered[start:end], nil
  648. }
  649. // CountAuthenticated counts keys with permission checks
  650. func (s *KVServer) CountAuthenticated(pattern string, token string) (int, error) {
  651. // Optimization: If user has full access
  652. if s.AuthManager.HasFullAccess(token) {
  653. sql := ""
  654. if pattern == "*" {
  655. sql = "*"
  656. } else {
  657. sql = fmt.Sprintf("key like \"%s\"", pattern)
  658. }
  659. return s.DB.Count(sql)
  660. }
  661. // Slow path: Iterate and check
  662. // We use DB.Query to get keys (Query returns values too, which is wasteful but DB engine doesn't expose keys-only Query yet)
  663. // Actually, we can access s.DB.Index.WalkPrefix if we want to be faster and avoid value read,
  664. // but s.DB.Index is inside 'db' package. We can access it if it's exported.
  665. // 'db' package exports 'Engine' and 'FlatIndex'.
  666. // So s.DB.Index IS accessible.
  667. count := 0
  668. // Determine prefix from pattern
  669. prefix := ""
  670. if strings.HasSuffix(pattern, "*") {
  671. prefix = strings.TrimSuffix(pattern, "*")
  672. } else if pattern == "*" {
  673. prefix = ""
  674. } else {
  675. // Exact match check
  676. if err := s.AuthManager.CheckPermission(token, pattern, ActionRead, ""); err == nil {
  677. // Check if exists
  678. if _, ok := s.DB.Get(pattern); ok {
  679. return 1, nil
  680. }
  681. return 0, nil
  682. }
  683. return 0, nil // No perm or not found
  684. }
  685. // Walk
  686. // Note: WalkPrefix locks the DB Index (Read Lock).
  687. // Calling CheckPermission inside might involve some logic but it is memory-only and usually fast.
  688. // However, if CheckPermission takes time, we hold DB lock.
  689. s.DB.Index.WalkPrefix(prefix, func(key string, entry db.IndexEntry) bool {
  690. // Check pattern match first (WalkPrefix is just prefix, pattern might be more complex like "user.*.name")
  691. if !db.WildcardMatch(key, pattern) {
  692. return true
  693. }
  694. if err := s.AuthManager.CheckPermission(token, key, ActionRead, ""); err == nil {
  695. count++
  696. }
  697. return true
  698. })
  699. return count, nil
  700. }
  701. // GetLinear gets a value with linearizable consistency
  702. // This ensures the read sees all writes committed before the read started
  703. func (s *KVServer) GetLinear(key string) (string, bool, error) {
  704. // First, ensure we have up-to-date data via ReadIndex
  705. _, err := s.Raft.ReadIndex()
  706. if err != nil {
  707. // If we're not leader, try forwarding
  708. if errors.Is(err, ErrNotLeader) {
  709. return s.forwardGet(key)
  710. }
  711. return "", false, err
  712. }
  713. val, ok := s.DB.Get(key)
  714. return val, ok, nil
  715. }
  716. // forwardGet forwards a get request to the leader
  717. func (s *KVServer) forwardGet(key string) (string, bool, error) {
  718. return s.Raft.ForwardGet(key)
  719. }
  720. // Join joins an existing cluster
  721. func (s *KVServer) Join(nodeID, addr string) error {
  722. return s.Raft.AddNodeWithForward(nodeID, addr)
  723. }
  724. // Leave leaves the cluster
  725. func (s *KVServer) Leave(nodeID string) error {
  726. // Mark node as leaving to prevent auto-rejoin
  727. s.leavingNodes.Store(nodeID, time.Now())
  728. // Auto-expire the leaving flag after a while
  729. go func() {
  730. time.Sleep(30 * time.Second)
  731. s.leavingNodes.Delete(nodeID)
  732. }()
  733. // Remove from RaftNode discovery key first to prevent auto-rejoin
  734. if err := s.removeNodeFromDiscovery(nodeID); err != nil {
  735. s.Raft.config.Logger.Warn("Failed to remove node from discovery key: %v", err)
  736. // Continue anyway, as the main goal is to leave the cluster
  737. }
  738. return s.Raft.RemoveNodeWithForward(nodeID)
  739. }
  740. // removeNodeFromDiscovery removes a node from the RaftNode key to prevent auto-rejoin
  741. func (s *KVServer) removeNodeFromDiscovery(targetID string) error {
  742. val, ok := s.Get("RaftNode")
  743. if !ok || val == "" {
  744. return nil
  745. }
  746. parts := strings.Split(val, ";")
  747. var newParts []string
  748. changed := false
  749. for _, part := range parts {
  750. if part == "" {
  751. continue
  752. }
  753. kv := strings.SplitN(part, "=", 2)
  754. if len(kv) == 2 {
  755. if kv[0] == targetID {
  756. changed = true
  757. continue // Skip this node
  758. }
  759. newParts = append(newParts, part)
  760. }
  761. }
  762. if changed {
  763. newVal := strings.Join(newParts, ";")
  764. return s.Set("RaftNode", newVal)
  765. }
  766. return nil
  767. }
  768. // WaitForLeader waits until a leader is elected
  769. func (s *KVServer) WaitForLeader(timeout time.Duration) error {
  770. deadline := time.Now().Add(timeout)
  771. for time.Now().Before(deadline) {
  772. leader := s.Raft.GetLeaderID()
  773. if leader != "" {
  774. return nil
  775. }
  776. time.Sleep(100 * time.Millisecond)
  777. }
  778. return fmt.Errorf("timeout waiting for leader")
  779. }
  780. // HealthCheck returns the health status of this server
  781. func (s *KVServer) HealthCheck() HealthStatus {
  782. return s.Raft.HealthCheck()
  783. }
  784. // GetStats returns runtime statistics
  785. func (s *KVServer) GetStats() Stats {
  786. return s.Raft.GetStats()
  787. }
  788. // GetMetrics returns runtime metrics
  789. func (s *KVServer) GetMetrics() Metrics {
  790. return s.Raft.GetMetrics()
  791. }
  792. // TransferLeadership transfers leadership to the specified node
  793. func (s *KVServer) TransferLeadership(targetID string) error {
  794. return s.Raft.TransferLeadership(targetID)
  795. }
  796. // GetClusterNodes returns current cluster membership
  797. func (s *KVServer) GetClusterNodes() map[string]string {
  798. return s.Raft.GetClusterNodes()
  799. }
  800. // IsLeader returns true if this node is the leader
  801. func (s *KVServer) IsLeader() bool {
  802. _, isLeader := s.Raft.GetState()
  803. return isLeader
  804. }
  805. // GetLeaderID returns the current leader ID
  806. func (s *KVServer) GetLeaderID() string {
  807. return s.Raft.GetLeaderID()
  808. }
  809. // GetLogSize returns the raft log size
  810. func (s *KVServer) GetLogSize() int64 {
  811. return s.Raft.log.GetLogSize()
  812. }
  813. // GetDBSize returns the db size
  814. func (s *KVServer) GetDBSize() int64 {
  815. return s.DB.GetDBSize()
  816. }
  817. // WatchURL registers a webhook url for a key
  818. func (s *KVServer) WatchURL(key, url string) {
  819. s.Watcher.Subscribe(key, url)
  820. }
  821. // UnwatchURL removes a webhook url for a key
  822. func (s *KVServer) UnwatchURL(key, url string) {
  823. s.Watcher.Unsubscribe(key, url)
  824. }
  825. // WatchAll registers a watcher for all keys
  826. func (s *KVServer) WatchAll(handler WatchHandler) {
  827. // s.FSM.WatchAll(handler)
  828. // TODO: Implement Watcher for DB
  829. }
  830. // Watch registers a watcher for a key
  831. func (s *KVServer) Watch(key string, handler WatchHandler) {
  832. // s.FSM.Watch(key, handler)
  833. // TODO: Implement Watcher for DB
  834. }
  835. // Unwatch removes watchers for a key
  836. func (s *KVServer) Unwatch(key string) {
  837. // s.FSM.Unwatch(key)
  838. // TODO: Implement Watcher for DB
  839. }
  840. func (s *KVServer) maintenanceLoop() {
  841. defer s.wg.Done()
  842. // Check every 1 second for faster reaction
  843. ticker := time.NewTicker(1 * time.Second)
  844. defer ticker.Stop()
  845. for {
  846. select {
  847. case <-s.stopCh:
  848. return
  849. case <-ticker.C:
  850. s.updateNodeInfo()
  851. s.checkConnections()
  852. }
  853. }
  854. }
  855. func (s *KVServer) updateNodeInfo() {
  856. // 1. Ensure "CreateNode/<NodeID>" is set to self address
  857. // We do this via Propose (Set) so it's replicated
  858. myID := s.Raft.config.NodeID
  859. myAddr := s.Raft.config.ListenAddr
  860. key := fmt.Sprintf("CreateNode/%s", myID)
  861. // Check if we need to update (avoid spamming logs/proposals)
  862. val, exists := s.Get(key)
  863. if !exists || val != myAddr {
  864. // Run in goroutine to avoid blocking
  865. go func() {
  866. if err := s.Set(key, myAddr); err != nil {
  867. s.Raft.config.Logger.Debug("Failed to update node info: %v", err)
  868. }
  869. }()
  870. }
  871. // 2. Only leader updates RaftNode aggregation
  872. if s.IsLeader() {
  873. // Read current RaftNode to preserve history
  874. currentVal, _ := s.Get("RaftNode")
  875. knownNodes := make(map[string]string)
  876. if currentVal != "" {
  877. parts := strings.Split(currentVal, ";")
  878. for _, part := range parts {
  879. if part == "" { continue }
  880. kv := strings.SplitN(part, "=", 2)
  881. if len(kv) == 2 {
  882. knownNodes[kv[0]] = kv[1]
  883. }
  884. }
  885. }
  886. // Merge current cluster nodes
  887. changed := false
  888. currentCluster := s.GetClusterNodes()
  889. for id, addr := range currentCluster {
  890. // Skip nodes that are marked as leaving
  891. if _, leaving := s.leavingNodes.Load(id); leaving {
  892. continue
  893. }
  894. if knownNodes[id] != addr {
  895. knownNodes[id] = addr
  896. changed = true
  897. }
  898. }
  899. // If changed, update RaftNode
  900. if changed {
  901. var peers []string
  902. for id, addr := range knownNodes {
  903. peers = append(peers, fmt.Sprintf("%s=%s", id, addr))
  904. }
  905. sort.Strings(peers)
  906. newVal := strings.Join(peers, ";")
  907. // Check again if we need to write to avoid loops if Get returned stale
  908. if newVal != currentVal {
  909. go func(k, v string) {
  910. if err := s.Set(k, v); err != nil {
  911. s.Raft.config.Logger.Warn("Failed to update RaftNode key: %v", err)
  912. }
  913. }("RaftNode", newVal)
  914. }
  915. }
  916. }
  917. }
  918. func (s *KVServer) checkConnections() {
  919. if !s.IsLeader() {
  920. return
  921. }
  922. // Read RaftNode key to find potential members that are missing
  923. val, ok := s.Get("RaftNode")
  924. if !ok || val == "" {
  925. return
  926. }
  927. // Parse saved nodes
  928. savedParts := strings.Split(val, ";")
  929. currentNodes := s.GetClusterNodes()
  930. // Invert currentNodes for address check
  931. currentAddrs := make(map[string]bool)
  932. for _, addr := range currentNodes {
  933. currentAddrs[addr] = true
  934. }
  935. for _, part := range savedParts {
  936. if part == "" {
  937. continue
  938. }
  939. // Expect id=addr
  940. kv := strings.SplitN(part, "=", 2)
  941. if len(kv) != 2 {
  942. continue
  943. }
  944. id, addr := kv[0], kv[1]
  945. // Skip invalid addresses
  946. if strings.HasPrefix(addr, ".") || !strings.Contains(addr, ":") {
  947. continue
  948. }
  949. if !currentAddrs[addr] {
  950. // Skip nodes that are marked as leaving
  951. if _, leaving := s.leavingNodes.Load(id); leaving {
  952. continue
  953. }
  954. // Found a node that was previously in the cluster but is now missing
  955. // Try to add it back
  956. // We use AddNodeWithForward which handles non-blocking internally somewhat,
  957. // but we should run this in goroutine to not block the loop
  958. go func(nodeID, nodeAddr string) {
  959. // Try to add node
  960. s.Raft.config.Logger.Info("Auto-rejoining node found in RaftNode: %s (%s)", nodeID, nodeAddr)
  961. if err := s.Join(nodeID, nodeAddr); err != nil {
  962. s.Raft.config.Logger.Debug("Failed to auto-rejoin node %s: %v", nodeID, err)
  963. }
  964. }(id, addr)
  965. }
  966. }
  967. }
  968. // startHTTPServer starts the HTTP API server
  969. func (s *KVServer) startHTTPServer(addr string) error {
  970. mux := http.NewServeMux()
  971. // KV API
  972. mux.HandleFunc("/kv", func(w http.ResponseWriter, r *http.Request) {
  973. token := r.Header.Get("X-Raft-Token")
  974. switch r.Method {
  975. case http.MethodGet:
  976. key := r.URL.Query().Get("key")
  977. if key == "" {
  978. http.Error(w, "missing key", http.StatusBadRequest)
  979. return
  980. }
  981. // Use Authenticated method
  982. val, found, err := s.GetLinearAuthenticated(key, token)
  983. if err != nil {
  984. // Distinguish auth error vs raft error?
  985. if strings.Contains(err.Error(), "permission") || strings.Contains(err.Error(), "unauthorized") {
  986. http.Error(w, err.Error(), http.StatusForbidden)
  987. } else {
  988. http.Error(w, err.Error(), http.StatusInternalServerError)
  989. }
  990. return
  991. }
  992. if !found {
  993. http.Error(w, "not found", http.StatusNotFound)
  994. return
  995. }
  996. w.Write([]byte(val))
  997. case http.MethodPost:
  998. body, _ := io.ReadAll(r.Body)
  999. var req struct {
  1000. Key string `json:"key"`
  1001. Value string `json:"value"`
  1002. }
  1003. if err := json.Unmarshal(body, &req); err != nil {
  1004. http.Error(w, "invalid json", http.StatusBadRequest)
  1005. return
  1006. }
  1007. if err := s.SetAuthenticated(req.Key, req.Value, token); err != nil {
  1008. if strings.Contains(err.Error(), "permission") || strings.Contains(err.Error(), "unauthorized") {
  1009. http.Error(w, err.Error(), http.StatusForbidden)
  1010. } else {
  1011. http.Error(w, err.Error(), http.StatusInternalServerError)
  1012. }
  1013. return
  1014. }
  1015. w.WriteHeader(http.StatusOK)
  1016. case http.MethodDelete:
  1017. key := r.URL.Query().Get("key")
  1018. if key == "" {
  1019. http.Error(w, "missing key", http.StatusBadRequest)
  1020. return
  1021. }
  1022. if err := s.DelAuthenticated(key, token); err != nil {
  1023. if strings.Contains(err.Error(), "permission") || strings.Contains(err.Error(), "unauthorized") {
  1024. http.Error(w, err.Error(), http.StatusForbidden)
  1025. } else {
  1026. http.Error(w, err.Error(), http.StatusInternalServerError)
  1027. }
  1028. return
  1029. }
  1030. w.WriteHeader(http.StatusOK)
  1031. default:
  1032. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  1033. }
  1034. })
  1035. // Auth API
  1036. mux.HandleFunc("/auth/login", func(w http.ResponseWriter, r *http.Request) {
  1037. if r.Method != http.MethodPost {
  1038. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  1039. return
  1040. }
  1041. var req struct {
  1042. Username string `json:"username"`
  1043. Password string `json:"password"`
  1044. Code string `json:"code"`
  1045. }
  1046. body, _ := io.ReadAll(r.Body)
  1047. if err := json.Unmarshal(body, &req); err != nil {
  1048. http.Error(w, "invalid json", http.StatusBadRequest)
  1049. return
  1050. }
  1051. ip := r.RemoteAddr
  1052. if host, _, err := net.SplitHostPort(r.RemoteAddr); err == nil {
  1053. ip = host
  1054. }
  1055. token, err := s.AuthManager.Login(req.Username, req.Password, req.Code, ip)
  1056. if err != nil {
  1057. http.Error(w, err.Error(), http.StatusUnauthorized)
  1058. return
  1059. }
  1060. resp := struct {
  1061. Token string `json:"token"`
  1062. }{Token: token}
  1063. json.NewEncoder(w).Encode(resp)
  1064. })
  1065. // Watcher API
  1066. mux.HandleFunc("/watch", func(w http.ResponseWriter, r *http.Request) {
  1067. if r.Method != http.MethodPost {
  1068. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  1069. return
  1070. }
  1071. body, _ := io.ReadAll(r.Body)
  1072. var req struct {
  1073. Key string `json:"key"`
  1074. URL string `json:"url"`
  1075. }
  1076. if err := json.Unmarshal(body, &req); err != nil {
  1077. http.Error(w, "invalid json", http.StatusBadRequest)
  1078. return
  1079. }
  1080. if req.Key == "" || req.URL == "" {
  1081. http.Error(w, "missing key or url", http.StatusBadRequest)
  1082. return
  1083. }
  1084. s.WatchURL(req.Key, req.URL)
  1085. w.WriteHeader(http.StatusOK)
  1086. })
  1087. mux.HandleFunc("/unwatch", func(w http.ResponseWriter, r *http.Request) {
  1088. if r.Method != http.MethodPost {
  1089. http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
  1090. return
  1091. }
  1092. body, _ := io.ReadAll(r.Body)
  1093. var req struct {
  1094. Key string `json:"key"`
  1095. URL string `json:"url"`
  1096. }
  1097. if err := json.Unmarshal(body, &req); err != nil {
  1098. http.Error(w, "invalid json", http.StatusBadRequest)
  1099. return
  1100. }
  1101. s.UnwatchURL(req.Key, req.URL)
  1102. w.WriteHeader(http.StatusOK)
  1103. })
  1104. s.httpServer = &http.Server{
  1105. Addr: addr,
  1106. Handler: mux,
  1107. }
  1108. go func() {
  1109. s.Raft.config.Logger.Info("HTTP API server listening on %s", addr)
  1110. if err := s.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  1111. s.Raft.config.Logger.Error("HTTP server failed: %v", err)
  1112. }
  1113. }()
  1114. return nil
  1115. }