| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307 |
- package raft
- import (
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "net"
- "net/http"
- "sort"
- "strings"
- "sync"
- "time"
- "igit.com/xbase/raft/db"
- )
- // KVServer wraps Raft to provide a distributed key-value store
- type KVServer struct {
- Raft *Raft
- DB *db.Engine
- CLI *CLI
- AuthManager *AuthManager
- Watcher *WebHookWatcher
- httpServer *http.Server
- stopCh chan struct{}
- wg sync.WaitGroup
- stopOnce sync.Once
- // leavingNodes tracks nodes that are currently being removed
- // to prevent auto-rejoin/discovery logic from interfering
- leavingNodes sync.Map
-
- // Pending requests for synchronous operations (Read-Your-Writes)
- pendingRequests map[uint64]chan error
- pendingMu sync.Mutex
- }
- // NewKVServer creates a new KV server
- func NewKVServer(config *Config) (*KVServer, error) {
- // Initialize DB Engine
- // Use a subdirectory for DB to avoid conflict with Raft logs if they share DataDir
- dbPath := config.DataDir + "/kv_engine"
- engine, err := db.NewEngine(dbPath)
- if err != nil {
- return nil, fmt.Errorf("failed to create db engine: %w", err)
- }
- // Initialize LastAppliedIndex from DB to prevent re-applying entries
- config.LastAppliedIndex = engine.GetLastAppliedIndex()
- // Create stop channel early for use in callbacks
- stopCh := make(chan struct{})
- // Configure snapshot provider
- config.SnapshotProvider = func(minIncludeIndex uint64) ([]byte, error) {
- // Wait for DB to catch up to the requested index
- // This is critical for data integrity during compaction
- for engine.GetLastAppliedIndex() < minIncludeIndex {
- select {
- case <-stopCh:
- return nil, fmt.Errorf("server stopping")
- default:
- time.Sleep(10 * time.Millisecond)
- }
- }
- // Force sync to disk to ensure data durability before compaction
- // This prevents data loss if Raft logs are compacted but DB data is only in OS cache
- if err := engine.Sync(); err != nil {
- return nil, fmt.Errorf("failed to sync engine before snapshot: %w", err)
- }
- return engine.Snapshot()
- }
- // Configure get handler for remote reads
- config.GetHandler = func(key string) (string, bool) {
- return engine.Get(key)
- }
- applyCh := make(chan ApplyMsg, 1000) // Increase buffer for async processing
- transport := NewTCPTransport(config.ListenAddr, 10, config.Logger)
- // Initialize WebHookWatcher
- // 5 workers, 3 retries
- watcher := NewWebHookWatcher(5, 3, config.Logger)
- r, err := NewRaft(config, transport, applyCh)
- if err != nil {
- engine.Close()
- return nil, err
- }
- s := &KVServer{
- Raft: r,
- DB: engine,
- CLI: nil,
- AuthManager: nil, // initialized below
- Watcher: watcher,
- stopCh: stopCh,
- pendingRequests: make(map[uint64]chan error),
- }
- // Initialize AuthManager
- s.AuthManager = NewAuthManager(s)
- // Load Auth Data from DB (if any)
- if err := s.AuthManager.LoadFromDB(); err != nil {
- s.Raft.config.Logger.Warn("Failed to load auth data from DB: %v", err)
- }
- // Initialize CLI
- s.CLI = NewCLI(s)
- // Start applying entries
- go s.runApplyLoop(applyCh)
- // Start background maintenance loop
- s.wg.Add(1)
- go s.maintenanceLoop()
- return s, nil
- }
- func (s *KVServer) Start() error {
- // Start CLI if enabled
- if s.Raft.config.EnableCLI {
- go s.CLI.Start()
- }
- // Start HTTP Server if configured
- if s.Raft.config.HTTPAddr != "" {
- if err := s.startHTTPServer(s.Raft.config.HTTPAddr); err != nil {
- s.Raft.config.Logger.Warn("Failed to start HTTP server: %v", err)
- }
- }
-
- // Start TCP Server (Hardcoded offset +10 for demo or config?)
- // User didn't add TCPPort to config, so let's derive it or just fix it.
- // For demo, if ListenAddr is 127.0.0.1:9001, we try 9011.
- host, port, _ := net.SplitHostPort(s.Raft.config.ListenAddr)
- if port == "9001" {
- tcpAddr := fmt.Sprintf("%s:%s", host, "9011")
- if err := s.StartTCPServer(tcpAddr); err != nil {
- s.Raft.config.Logger.Warn("Failed to start TCP server: %v", err)
- }
- } else if port == "9002" {
- tcpAddr := fmt.Sprintf("%s:%s", host, "9012")
- s.StartTCPServer(tcpAddr)
- } else {
- // Fallback or ignore for other nodes in demo
- }
- return s.Raft.Start()
- }
- func (s *KVServer) Stop() error {
- var err error
- s.stopOnce.Do(func() {
- // Stop maintenance loop
- if s.stopCh != nil {
- close(s.stopCh)
- s.wg.Wait()
- }
- // Stop Watcher
- if s.Watcher != nil {
- s.Watcher.Stop()
- }
- // Stop HTTP Server
- if s.httpServer != nil {
- s.httpServer.Close()
- }
- // Stop Raft first
- if errRaft := s.Raft.Stop(); errRaft != nil {
- err = errRaft
- }
- // Close DB
- if s.DB != nil {
- if errDB := s.DB.Close(); errDB != nil {
- // Combine errors if both fail
- if err != nil {
- err = fmt.Errorf("raft stop error: %v, db close error: %v", err, errDB)
- } else {
- err = errDB
- }
- }
- }
- })
- return err
- }
- func (s *KVServer) runApplyLoop(applyCh chan ApplyMsg) {
- for msg := range applyCh {
- if msg.CommandValid {
- // Optimization: Skip if already applied
- // We check this here to avoid unmarshalling and locking DB for known duplicates
- if msg.CommandIndex <= s.DB.GetLastAppliedIndex() {
- // Even if duplicate, we might have someone waiting for it if they just proposed it
- // but this is unlikely for duplicates.
- // However, notify waiters just in case.
- s.notifyPending(msg.CommandIndex, nil)
- continue
- }
- var cmd KVCommand
- if err := json.Unmarshal(msg.Command, &cmd); err != nil {
- s.Raft.config.Logger.Error("Failed to unmarshal command: %v", err)
- s.notifyPending(msg.CommandIndex, err)
- continue
- }
- var err error
- switch cmd.Type {
- case KVSet:
- // Update Auth Cache for system keys
- // This includes AuthLockPrefix now
- if strings.HasPrefix(cmd.Key, SystemKeyPrefix) {
- s.AuthManager.UpdateCache(cmd.Key, cmd.Value, false)
- }
-
- err = s.DB.Set(cmd.Key, cmd.Value, msg.CommandIndex)
- if err == nil {
- s.Watcher.Notify(cmd.Key, cmd.Value, KVSet)
- }
- case KVDel:
- // Update Auth Cache for system keys
- if strings.HasPrefix(cmd.Key, SystemKeyPrefix) {
- s.AuthManager.UpdateCache(cmd.Key, "", true)
- }
- err = s.DB.Delete(cmd.Key, msg.CommandIndex)
- if err == nil {
- s.Watcher.Notify(cmd.Key, "", KVDel)
- }
- default:
- s.Raft.config.Logger.Error("Unknown command type: %d", cmd.Type)
- }
- if err != nil {
- s.Raft.config.Logger.Error("DB Apply failed: %v", err)
- }
-
- // Notify anyone waiting for this index
- s.notifyPending(msg.CommandIndex, err)
- } else if msg.SnapshotValid {
- if err := s.DB.Restore(msg.Snapshot); err != nil {
- s.Raft.config.Logger.Error("DB Restore failed: %v", err)
- }
- }
- }
- }
- // notifyPending notifies any waiting requests for the given index
- func (s *KVServer) notifyPending(index uint64, err error) {
- s.pendingMu.Lock()
- defer s.pendingMu.Unlock()
-
- if ch, ok := s.pendingRequests[index]; ok {
- // Non-blocking send in case listener is gone (buffered channel recommended)
- select {
- case ch <- err:
- default:
- }
- close(ch)
- delete(s.pendingRequests, index)
- }
- }
- // SetAuthenticatedAsync sets a key-value pair asynchronously with permission check
- func (s *KVServer) SetAuthenticatedAsync(key, value, token string) error {
- if err := s.AuthManager.CheckPermission(token, key, ActionWrite, value); err != nil {
- return err
- }
- return s.Set(key, value)
- }
- // SetAuthenticated sets a key-value pair with permission check
- func (s *KVServer) SetAuthenticated(key, value, token string) error {
- if err := s.AuthManager.CheckPermission(token, key, ActionWrite, value); err != nil {
- return err
- }
- // Use SetSync for consistency in CLI/API
- return s.SetSync(key, value)
- }
- // DelAuthenticated deletes a key with permission check
- func (s *KVServer) DelAuthenticated(key, token string) error {
- if err := s.AuthManager.CheckPermission(token, key, ActionWrite, ""); err != nil {
- return err
- }
- // Use SetSync (via Del which should be updated or create DelSync)
- // For simplicity, we implement DelSync logic here or update Del to be sync?
- // Let's implement DelSync
- return s.DelSync(key)
- }
- // Set sets a key-value pair (Async - eventually consistent)
- func (s *KVServer) Set(key, value string) error {
- cmd := KVCommand{
- Type: KVSet,
- Key: key,
- Value: value,
- }
- data, err := json.Marshal(cmd)
- if err != nil {
- return err
- }
- _, _, err = s.Raft.ProposeWithForward(data)
- return err
- }
- // SetSync sets a key-value pair and waits for it to be applied (Read-Your-Writes)
- func (s *KVServer) SetSync(key, value string) error {
- cmd := KVCommand{
- Type: KVSet,
- Key: key,
- Value: value,
- }
- data, err := json.Marshal(cmd)
- if err != nil {
- return err
- }
- index, _, err := s.Raft.ProposeWithForward(data)
- if err != nil {
- return err
- }
- // Wait for application
- ch := make(chan error, 1)
- s.pendingMu.Lock()
- s.pendingRequests[index] = ch
- s.pendingMu.Unlock()
- select {
- case applyErr := <-ch:
- return applyErr
- case <-time.After(5 * time.Second):
- s.pendingMu.Lock()
- delete(s.pendingRequests, index)
- s.pendingMu.Unlock()
- return fmt.Errorf("timeout waiting for apply")
- }
- }
- // Del deletes a key (Async)
- func (s *KVServer) Del(key string) error {
- cmd := KVCommand{
- Type: KVDel,
- Key: key,
- }
- data, err := json.Marshal(cmd)
- if err != nil {
- return err
- }
- _, _, err = s.Raft.ProposeWithForward(data)
- return err
- }
- // DelSync deletes a key and waits for it to be applied
- func (s *KVServer) DelSync(key string) error {
- cmd := KVCommand{
- Type: KVDel,
- Key: key,
- }
- data, err := json.Marshal(cmd)
- if err != nil {
- return err
- }
- index, _, err := s.Raft.ProposeWithForward(data)
- if err != nil {
- return err
- }
- // Wait for application
- ch := make(chan error, 1)
- s.pendingMu.Lock()
- s.pendingRequests[index] = ch
- s.pendingMu.Unlock()
- select {
- case applyErr := <-ch:
- return applyErr
- case <-time.After(5 * time.Second):
- s.pendingMu.Lock()
- delete(s.pendingRequests, index)
- s.pendingMu.Unlock()
- return fmt.Errorf("timeout waiting for apply")
- }
- }
- // GetAuthenticated gets a value with permission check (local read)
- func (s *KVServer) GetAuthenticated(key, token string) (string, bool, error) {
- if err := s.AuthManager.CheckPermission(token, key, ActionRead, ""); err != nil {
- return "", false, err
- }
- val, ok := s.Get(key)
- return val, ok, nil
- }
- // Get gets a value (local read, can be stale)
- // For linearizable reads, use GetLinear instead
- func (s *KVServer) Get(key string) (string, bool) {
- return s.DB.Get(key)
- }
- // GetLinearAuthenticated gets a value with linearizable consistency and permission check
- func (s *KVServer) GetLinearAuthenticated(key, token string) (string, bool, error) {
- if err := s.AuthManager.CheckPermission(token, key, ActionRead, ""); err != nil {
- return "", false, err
- }
- return s.GetLinear(key)
- }
- // Logout invalidates the current session
- func (s *KVServer) Logout(token string) error {
- return s.AuthManager.Logout(token)
- }
- // GetSessionInfo returns the session details
- func (s *KVServer) GetSessionInfo(token string) (*Session, error) {
- return s.AuthManager.GetSession(token)
- }
- // IsAdmin checks if the token belongs to a user with admin privileges
- // defined as having "admin" action on "*" key pattern
- func (s *KVServer) IsAdmin(token string) bool {
- if !s.AuthManager.IsEnabled() {
- return true // If auth disabled, everyone is admin (or handled by caller)
- }
-
- // Check internal system token
- if token == "SYSTEM_INTERNAL" {
- return true
- }
- return s.AuthManager.CheckPermission(token, "*", ActionAdmin, "") == nil
- }
- // IsRoot checks if the token belongs to the root user
- // Deprecated: Use IsAdmin instead
- func (s *KVServer) IsRoot(token string) bool {
- sess, err := s.AuthManager.GetSession(token)
- if err != nil {
- return false
- }
- return sess.Username == "root"
- }
- // CreateUser creates a new user (Delegated Admin)
- func (s *KVServer) CreateUser(username, password string, roles []string, token string) error {
- if s.AuthManager.IsEnabled() {
- // 1. Permission check: Must have write permission on system.user.<username>
- // This enables delegated administration (e.g., dept_admin can create users in their scope)
- // Or if global admin
- targetKey := AuthUserPrefix + username
- if err := s.AuthManager.CheckPermission(token, targetKey, ActionWrite, ""); err != nil {
- // Fallback to global admin check for backward compatibility or clearer error
- if !s.IsAdmin(token) {
- return fmt.Errorf("permission denied: cannot create user '%s'", username)
- }
- }
-
- // 2. Verify Delegation: Can I assign these roles?
- // To assign a role, I must have ALL permissions that the role contains.
- for _, roleName := range roles {
- // Let's implement a helper in AuthManager to get Role Effective Permissions
- rolePerms, err := s.AuthManager.GetRoleEffectivePermissions(roleName)
- if err != nil {
- return err
- }
-
- if !s.AuthManager.IsSubset(token, rolePerms) {
- return fmt.Errorf("permission denied: cannot assign role '%s' (exceeds your permissions)", roleName)
- }
- }
- }
- // Use RegisterUserSync
- return s.AuthManager.RegisterUser(username, password, roles)
- }
- // DeleteUser deletes a user (Delegated Admin)
- func (s *KVServer) DeleteUser(username string, token string) error {
- if s.AuthManager.IsEnabled() {
- // 1. Permission Check: Must have write permission on system.user.<username>
- targetKey := AuthUserPrefix + username
- if err := s.AuthManager.CheckPermission(token, targetKey, ActionWrite, ""); err != nil {
- if !s.IsAdmin(token) {
- return fmt.Errorf("permission denied: cannot delete user '%s'", username)
- }
- }
- // 2. Verify Delegation: Can I delete this user?
- // I can only delete a user if I *could* have created them (i.e. I dominate their permissions).
- targetUser, err := s.AuthManager.GetUser(username)
- if err != nil {
- return err
- }
-
- if !s.AuthManager.IsSubset(token, targetUser.EffectivePermissions) {
- return fmt.Errorf("permission denied: cannot delete user '%s' (has permissions exceeding yours)", username)
- }
- }
-
- // Check if user exists
- if _, err := s.AuthManager.GetUser(username); err != nil {
- return err
- }
- if username == "root" {
- return fmt.Errorf("cannot delete root user")
- }
- // Use DelSync
- return s.DelSync(AuthUserPrefix + username)
- }
- // UpdateUser updates generic user fields (Delegated Admin)
- func (s *KVServer) UpdateUser(user User, token string) error {
- if s.AuthManager.IsEnabled() {
- // 1. Permission Check
- targetKey := AuthUserPrefix + user.Username
- if err := s.AuthManager.CheckPermission(token, targetKey, ActionWrite, ""); err != nil {
- if !s.IsAdmin(token) {
- return fmt.Errorf("permission denied: cannot modify user '%s'", user.Username)
- }
- }
-
- // 2. Check if I can modify this user (Target Check)
- oldUser, err := s.AuthManager.GetUser(user.Username)
- if err != nil {
- return err
- }
- if !s.AuthManager.IsSubset(token, oldUser.EffectivePermissions) {
- return fmt.Errorf("permission denied: cannot modify user '%s' (has permissions exceeding yours)", user.Username)
- }
-
- // 3. Check if the NEW state is valid (Delegation Check)
- for _, roleName := range user.Roles {
- rolePerms, err := s.AuthManager.GetRoleEffectivePermissions(roleName)
- if err != nil {
- return err
- }
- if !s.AuthManager.IsSubset(token, rolePerms) {
- return fmt.Errorf("permission denied: cannot assign role '%s'", roleName)
- }
- }
- // Also check specific AllowPermissions if updated
- if !s.AuthManager.IsSubset(token, user.AllowPermissions) {
- return fmt.Errorf("permission denied: cannot assign specific permissions")
- }
- }
-
- // Check if user exists
- if _, err := s.AuthManager.GetUser(user.Username); err != nil {
- return err
- }
- return s.AuthManager.UpdateUser(user)
- }
- // ChangeUserPassword changes a user's password (Admin or Self)
- func (s *KVServer) ChangeUserPassword(username, newPassword string, token string) error {
- if s.AuthManager.IsEnabled() {
- session, err := s.AuthManager.GetSession(token)
- if err != nil {
- return err
- }
-
- // Allow if Self
- if session.Username == username {
- // OK
- } else {
- // Or has write permission on target user
- targetKey := AuthUserPrefix + username
- if err := s.AuthManager.CheckPermission(token, targetKey, ActionWrite, ""); err != nil {
- if !s.IsAdmin(token) {
- return fmt.Errorf("permission denied")
- }
- }
- }
- }
- // Use ChangePassword (which should use SetSync)
- return s.AuthManager.ChangePassword(username, newPassword)
- }
- // SetUserMFA updates a user's MFA settings (Admin or Self)
- func (s *KVServer) SetUserMFA(username string, secret string, enabled bool, token string) error {
- if s.AuthManager.IsEnabled() {
- session, err := s.AuthManager.GetSession(token)
- if err != nil {
- return err
- }
- // Allow if Self
- if session.Username == username {
- // OK
- } else {
- // Or has write permission on target user
- targetKey := AuthUserPrefix + username
- if err := s.AuthManager.CheckPermission(token, targetKey, ActionWrite, ""); err != nil {
- if !s.IsAdmin(token) {
- return fmt.Errorf("permission denied: cannot modify user '%s'", username)
- }
- }
- }
- }
- user, err := s.AuthManager.GetUser(username)
- if err != nil {
- return err
- }
- // Create a copy to modify
- newUser := *user
- newUser.MFASecret = secret
- newUser.MFAEnabled = enabled
-
- // Use underlying UpdateUser (which syncs to Raft)
- return s.AuthManager.UpdateUser(newUser)
- }
- // Role Management Helpers
- // CreateRole creates a new role (Delegated Admin)
- func (s *KVServer) CreateRole(name string, token string) error {
- if s.AuthManager.IsEnabled() {
- // Permission Check: Write on system.role.<name>
- targetKey := AuthRolePrefix + name
- if err := s.AuthManager.CheckPermission(token, targetKey, ActionWrite, ""); err != nil {
- if !s.IsAdmin(token) {
- return fmt.Errorf("permission denied: cannot create role '%s'", name)
- }
- }
- }
- // Creating an empty role is safe?
- // Yes, permissions are added later.
- return s.AuthManager.CreateRole(name)
- }
- // DeleteRole deletes a role (Delegated Admin)
- func (s *KVServer) DeleteRole(name string, token string) error {
- if s.AuthManager.IsEnabled() {
- // Permission Check
- targetKey := AuthRolePrefix + name
- if err := s.AuthManager.CheckPermission(token, targetKey, ActionWrite, ""); err != nil {
- if !s.IsAdmin(token) {
- return fmt.Errorf("permission denied: cannot delete role '%s'", name)
- }
- }
- // Verify: Can I delete this role?
- // I must dominate the role's permissions.
- rolePerms, err := s.AuthManager.GetRoleEffectivePermissions(name)
- if err != nil {
- return err
- }
- if !s.AuthManager.IsSubset(token, rolePerms) {
- return fmt.Errorf("permission denied: cannot delete role '%s' (exceeds your permissions)", name)
- }
- }
- return s.AuthManager.DeleteRole(name)
- }
- // UpdateRole updates a role (Delegated Admin)
- func (s *KVServer) UpdateRole(role Role, token string) error {
- if s.AuthManager.IsEnabled() {
- // Permission Check
- targetKey := AuthRolePrefix + role.Name
- if err := s.AuthManager.CheckPermission(token, targetKey, ActionWrite, ""); err != nil {
- if !s.IsAdmin(token) {
- return fmt.Errorf("permission denied: cannot modify role '%s'", role.Name)
- }
- }
-
- // 1. Check Old State (Can I modify it?)
- oldPerms, err := s.AuthManager.GetRoleEffectivePermissions(role.Name)
- if err != nil {
- return err
- }
- if !s.AuthManager.IsSubset(token, oldPerms) {
- return fmt.Errorf("permission denied: cannot modify role '%s' (current permissions exceed yours)", role.Name)
- }
-
- // 2. Check New State (Can I assign these new permissions?)
- // Check Parents
- for _, p := range role.ParentRoles {
- pPerms, err := s.AuthManager.GetRoleEffectivePermissions(p)
- if err != nil {
- return err
- }
- if !s.AuthManager.IsSubset(token, pPerms) {
- return fmt.Errorf("permission denied: cannot inherit from role '%s'", p)
- }
- }
- // Check Permissions
- if !s.AuthManager.IsSubset(token, role.Permissions) {
- return fmt.Errorf("permission denied: cannot assign specified permissions")
- }
- }
- return s.AuthManager.UpdateRole(role)
- }
- // ListUsers lists all users (Admin only)
- func (s *KVServer) ListUsers(token string) ([]*User, error) {
- if s.AuthManager.IsEnabled() {
- // Check for general read permission on users
- // Ideally checking system.user.*
- if err := s.AuthManager.CheckPermission(token, AuthUserPrefix + "*", ActionRead, ""); err != nil {
- if !s.IsAdmin(token) {
- return nil, fmt.Errorf("permission denied: admin access required")
- }
- }
- }
- return s.AuthManager.ListUsers(), nil
- }
- // ListRoles lists all roles (Admin only)
- func (s *KVServer) ListRoles(token string) ([]*Role, error) {
- if s.AuthManager.IsEnabled() {
- // Check for general read permission on roles
- if err := s.AuthManager.CheckPermission(token, AuthRolePrefix + "*", ActionRead, ""); err != nil {
- if !s.IsAdmin(token) {
- return nil, fmt.Errorf("permission denied: admin access required")
- }
- }
- }
- return s.AuthManager.ListRoles(), nil
- }
- // SearchAuthenticated searches keys with permission checks
- func (s *KVServer) SearchAuthenticated(pattern string, limit, offset int, token string) ([]db.QueryResult, error) {
- // Optimization: If user has full access (*), delegate to DB with limit/offset
- if s.AuthManager.HasFullAccess(token) {
- sql := fmt.Sprintf("key like \"%s\" LIMIT %d OFFSET %d", pattern, limit, offset)
- return s.DB.Query(sql)
- }
- // Slow path: Fetch all potential matches and filter
- // We construct a SQL query that retrieves CANDIDATES.
- // We cannot safely apply LIMIT/OFFSET in SQL because we might filter out some results.
- // So we must fetch ALL matches.
- // WARNING: This can be slow and memory intensive.
-
- // If pattern is wildcard, we might fetch everything.
- sql := fmt.Sprintf("key like \"%s\"", pattern)
- results, err := s.DB.Query(sql)
- if err != nil {
- return nil, err
- }
- filtered := make([]db.QueryResult, 0, len(results))
-
- // Apply Filtering
- for _, r := range results {
- if err := s.AuthManager.CheckPermission(token, r.Key, ActionRead, ""); err == nil {
- filtered = append(filtered, r)
- }
- }
- // Apply Pagination on Filtered Results
- if offset > len(filtered) {
- return []db.QueryResult{}, nil
- }
-
- start := offset
- end := offset + limit
- if end > len(filtered) {
- end = len(filtered)
- }
-
- return filtered[start:end], nil
- }
- // CountAuthenticated counts keys with permission checks
- func (s *KVServer) CountAuthenticated(pattern string, token string) (int, error) {
- // Optimization: If user has full access
- if s.AuthManager.HasFullAccess(token) {
- sql := ""
- if pattern == "*" {
- sql = "*"
- } else {
- sql = fmt.Sprintf("key like \"%s\"", pattern)
- }
- return s.DB.Count(sql)
- }
- // Slow path: Iterate and check
- // We use DB.Query to get keys (Query returns values too, which is wasteful but DB engine doesn't expose keys-only Query yet)
- // Actually, we can access s.DB.Index.WalkPrefix if we want to be faster and avoid value read,
- // but s.DB.Index is inside 'db' package. We can access it if it's exported.
- // 'db' package exports 'Engine' and 'FlatIndex'.
- // So s.DB.Index IS accessible.
-
- count := 0
-
- // Determine prefix from pattern
- prefix := ""
- if strings.HasSuffix(pattern, "*") {
- prefix = strings.TrimSuffix(pattern, "*")
- } else if pattern == "*" {
- prefix = ""
- } else {
- // Exact match check
- if err := s.AuthManager.CheckPermission(token, pattern, ActionRead, ""); err == nil {
- // Check if exists
- if _, ok := s.DB.Get(pattern); ok {
- return 1, nil
- }
- return 0, nil
- }
- return 0, nil // No perm or not found
- }
- // Walk
- // Note: WalkPrefix locks the DB Index (Read Lock).
- // Calling CheckPermission inside might involve some logic but it is memory-only and usually fast.
- // However, if CheckPermission takes time, we hold DB lock.
- s.DB.Index.WalkPrefix(prefix, func(key string, entry db.IndexEntry) bool {
- // Check pattern match first (WalkPrefix is just prefix, pattern might be more complex like "user.*.name")
- if !db.WildcardMatch(key, pattern) {
- return true
- }
-
- if err := s.AuthManager.CheckPermission(token, key, ActionRead, ""); err == nil {
- count++
- }
- return true
- })
- return count, nil
- }
- // GetLinear gets a value with linearizable consistency
- // This ensures the read sees all writes committed before the read started
- func (s *KVServer) GetLinear(key string) (string, bool, error) {
- // First, ensure we have up-to-date data via ReadIndex
- _, err := s.Raft.ReadIndex()
- if err != nil {
- // If we're not leader, try forwarding
- if errors.Is(err, ErrNotLeader) {
- return s.forwardGet(key)
- }
- return "", false, err
- }
- val, ok := s.DB.Get(key)
- return val, ok, nil
- }
- // forwardGet forwards a get request to the leader
- func (s *KVServer) forwardGet(key string) (string, bool, error) {
- return s.Raft.ForwardGet(key)
- }
- // Join joins an existing cluster
- func (s *KVServer) Join(nodeID, addr string) error {
- return s.Raft.AddNodeWithForward(nodeID, addr)
- }
- // Leave leaves the cluster
- func (s *KVServer) Leave(nodeID string) error {
- // Mark node as leaving to prevent auto-rejoin
- s.leavingNodes.Store(nodeID, time.Now())
-
- // Auto-expire the leaving flag after a while
- go func() {
- time.Sleep(30 * time.Second)
- s.leavingNodes.Delete(nodeID)
- }()
- // Remove from RaftNode discovery key first to prevent auto-rejoin
- if err := s.removeNodeFromDiscovery(nodeID); err != nil {
- s.Raft.config.Logger.Warn("Failed to remove node from discovery key: %v", err)
- // Continue anyway, as the main goal is to leave the cluster
- }
- return s.Raft.RemoveNodeWithForward(nodeID)
- }
- // removeNodeFromDiscovery removes a node from the RaftNode key to prevent auto-rejoin
- func (s *KVServer) removeNodeFromDiscovery(targetID string) error {
- val, ok := s.Get("RaftNode")
- if !ok || val == "" {
- return nil
- }
- parts := strings.Split(val, ";")
- var newParts []string
- changed := false
- for _, part := range parts {
- if part == "" {
- continue
- }
- kv := strings.SplitN(part, "=", 2)
- if len(kv) == 2 {
- if kv[0] == targetID {
- changed = true
- continue // Skip this node
- }
- newParts = append(newParts, part)
- }
- }
- if changed {
- newVal := strings.Join(newParts, ";")
- return s.Set("RaftNode", newVal)
- }
- return nil
- }
- // WaitForLeader waits until a leader is elected
- func (s *KVServer) WaitForLeader(timeout time.Duration) error {
- deadline := time.Now().Add(timeout)
- for time.Now().Before(deadline) {
- leader := s.Raft.GetLeaderID()
- if leader != "" {
- return nil
- }
- time.Sleep(100 * time.Millisecond)
- }
- return fmt.Errorf("timeout waiting for leader")
- }
- // HealthCheck returns the health status of this server
- func (s *KVServer) HealthCheck() HealthStatus {
- return s.Raft.HealthCheck()
- }
- // GetStats returns runtime statistics
- func (s *KVServer) GetStats() Stats {
- return s.Raft.GetStats()
- }
- // GetMetrics returns runtime metrics
- func (s *KVServer) GetMetrics() Metrics {
- return s.Raft.GetMetrics()
- }
- // TransferLeadership transfers leadership to the specified node
- func (s *KVServer) TransferLeadership(targetID string) error {
- return s.Raft.TransferLeadership(targetID)
- }
- // GetClusterNodes returns current cluster membership
- func (s *KVServer) GetClusterNodes() map[string]string {
- return s.Raft.GetClusterNodes()
- }
- // IsLeader returns true if this node is the leader
- func (s *KVServer) IsLeader() bool {
- _, isLeader := s.Raft.GetState()
- return isLeader
- }
- // GetLeaderID returns the current leader ID
- func (s *KVServer) GetLeaderID() string {
- return s.Raft.GetLeaderID()
- }
- // GetLogSize returns the raft log size
- func (s *KVServer) GetLogSize() int64 {
- return s.Raft.log.GetLogSize()
- }
- // GetDBSize returns the db size
- func (s *KVServer) GetDBSize() int64 {
- return s.DB.GetDBSize()
- }
- // WatchURL registers a webhook url for a key
- func (s *KVServer) WatchURL(key, url string) {
- s.Watcher.Subscribe(key, url)
- }
- // UnwatchURL removes a webhook url for a key
- func (s *KVServer) UnwatchURL(key, url string) {
- s.Watcher.Unsubscribe(key, url)
- }
- // WatchAll registers a watcher for all keys
- func (s *KVServer) WatchAll(handler WatchHandler) {
- // s.FSM.WatchAll(handler)
- // TODO: Implement Watcher for DB
- }
- // Watch registers a watcher for a key
- func (s *KVServer) Watch(key string, handler WatchHandler) {
- // s.FSM.Watch(key, handler)
- // TODO: Implement Watcher for DB
- }
- // Unwatch removes watchers for a key
- func (s *KVServer) Unwatch(key string) {
- // s.FSM.Unwatch(key)
- // TODO: Implement Watcher for DB
- }
- func (s *KVServer) maintenanceLoop() {
- defer s.wg.Done()
- // Check every 1 second for faster reaction
- ticker := time.NewTicker(1 * time.Second)
- defer ticker.Stop()
- for {
- select {
- case <-s.stopCh:
- return
- case <-ticker.C:
- s.updateNodeInfo()
- s.checkConnections()
- }
- }
- }
- func (s *KVServer) updateNodeInfo() {
- // 1. Ensure "CreateNode/<NodeID>" is set to self address
- // We do this via Propose (Set) so it's replicated
- myID := s.Raft.config.NodeID
- myAddr := s.Raft.config.ListenAddr
- key := fmt.Sprintf("CreateNode/%s", myID)
- // Check if we need to update (avoid spamming logs/proposals)
- val, exists := s.Get(key)
- if !exists || val != myAddr {
- // Run in goroutine to avoid blocking
- go func() {
- if err := s.Set(key, myAddr); err != nil {
- s.Raft.config.Logger.Debug("Failed to update node info: %v", err)
- }
- }()
- }
- // 2. Only leader updates RaftNode aggregation
- if s.IsLeader() {
- // Read current RaftNode to preserve history
- currentVal, _ := s.Get("RaftNode")
-
- knownNodes := make(map[string]string)
- if currentVal != "" {
- parts := strings.Split(currentVal, ";")
- for _, part := range parts {
- if part == "" { continue }
- kv := strings.SplitN(part, "=", 2)
- if len(kv) == 2 {
- knownNodes[kv[0]] = kv[1]
- }
- }
- }
- // Merge current cluster nodes
- changed := false
- currentCluster := s.GetClusterNodes()
- for id, addr := range currentCluster {
- // Skip nodes that are marked as leaving
- if _, leaving := s.leavingNodes.Load(id); leaving {
- continue
- }
- if knownNodes[id] != addr {
- knownNodes[id] = addr
- changed = true
- }
- }
- // If changed, update RaftNode
- if changed {
- var peers []string
- for id, addr := range knownNodes {
- peers = append(peers, fmt.Sprintf("%s=%s", id, addr))
- }
- sort.Strings(peers)
- newVal := strings.Join(peers, ";")
- // Check again if we need to write to avoid loops if Get returned stale
- if newVal != currentVal {
- go func(k, v string) {
- if err := s.Set(k, v); err != nil {
- s.Raft.config.Logger.Warn("Failed to update RaftNode key: %v", err)
- }
- }("RaftNode", newVal)
- }
- }
- }
- }
- func (s *KVServer) checkConnections() {
- if !s.IsLeader() {
- return
- }
- // Read RaftNode key to find potential members that are missing
- val, ok := s.Get("RaftNode")
- if !ok || val == "" {
- return
- }
- // Parse saved nodes
- savedParts := strings.Split(val, ";")
- currentNodes := s.GetClusterNodes()
- // Invert currentNodes for address check
- currentAddrs := make(map[string]bool)
- for _, addr := range currentNodes {
- currentAddrs[addr] = true
- }
- for _, part := range savedParts {
- if part == "" {
- continue
- }
- // Expect id=addr
- kv := strings.SplitN(part, "=", 2)
- if len(kv) != 2 {
- continue
- }
- id, addr := kv[0], kv[1]
- // Skip invalid addresses
- if strings.HasPrefix(addr, ".") || !strings.Contains(addr, ":") {
- continue
- }
- if !currentAddrs[addr] {
- // Skip nodes that are marked as leaving
- if _, leaving := s.leavingNodes.Load(id); leaving {
- continue
- }
- // Found a node that was previously in the cluster but is now missing
- // Try to add it back
- // We use AddNodeWithForward which handles non-blocking internally somewhat,
- // but we should run this in goroutine to not block the loop
- go func(nodeID, nodeAddr string) {
- // Try to add node
- s.Raft.config.Logger.Info("Auto-rejoining node found in RaftNode: %s (%s)", nodeID, nodeAddr)
- if err := s.Join(nodeID, nodeAddr); err != nil {
- s.Raft.config.Logger.Debug("Failed to auto-rejoin node %s: %v", nodeID, err)
- }
- }(id, addr)
- }
- }
- }
- // startHTTPServer starts the HTTP API server
- func (s *KVServer) startHTTPServer(addr string) error {
- mux := http.NewServeMux()
- // KV API
- mux.HandleFunc("/kv", func(w http.ResponseWriter, r *http.Request) {
- token := r.Header.Get("X-Raft-Token")
-
- switch r.Method {
- case http.MethodGet:
- key := r.URL.Query().Get("key")
- if key == "" {
- http.Error(w, "missing key", http.StatusBadRequest)
- return
- }
- // Use Authenticated method
- val, found, err := s.GetLinearAuthenticated(key, token)
- if err != nil {
- // Distinguish auth error vs raft error?
- if strings.Contains(err.Error(), "permission") || strings.Contains(err.Error(), "unauthorized") {
- http.Error(w, err.Error(), http.StatusForbidden)
- } else {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- }
- return
- }
- if !found {
- http.Error(w, "not found", http.StatusNotFound)
- return
- }
- w.Write([]byte(val))
- case http.MethodPost:
- body, _ := io.ReadAll(r.Body)
- var req struct {
- Key string `json:"key"`
- Value string `json:"value"`
- }
- if err := json.Unmarshal(body, &req); err != nil {
- http.Error(w, "invalid json", http.StatusBadRequest)
- return
- }
- if err := s.SetAuthenticated(req.Key, req.Value, token); err != nil {
- if strings.Contains(err.Error(), "permission") || strings.Contains(err.Error(), "unauthorized") {
- http.Error(w, err.Error(), http.StatusForbidden)
- } else {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- }
- return
- }
- w.WriteHeader(http.StatusOK)
- case http.MethodDelete:
- key := r.URL.Query().Get("key")
- if key == "" {
- http.Error(w, "missing key", http.StatusBadRequest)
- return
- }
- if err := s.DelAuthenticated(key, token); err != nil {
- if strings.Contains(err.Error(), "permission") || strings.Contains(err.Error(), "unauthorized") {
- http.Error(w, err.Error(), http.StatusForbidden)
- } else {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- }
- return
- }
- w.WriteHeader(http.StatusOK)
-
- default:
- http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
- }
- })
- // Auth API
- mux.HandleFunc("/auth/login", func(w http.ResponseWriter, r *http.Request) {
- if r.Method != http.MethodPost {
- http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
- return
- }
- var req struct {
- Username string `json:"username"`
- Password string `json:"password"`
- Code string `json:"code"`
- }
- body, _ := io.ReadAll(r.Body)
- if err := json.Unmarshal(body, &req); err != nil {
- http.Error(w, "invalid json", http.StatusBadRequest)
- return
- }
-
- ip := r.RemoteAddr
- if host, _, err := net.SplitHostPort(r.RemoteAddr); err == nil {
- ip = host
- }
-
- token, err := s.AuthManager.Login(req.Username, req.Password, req.Code, ip)
- if err != nil {
- http.Error(w, err.Error(), http.StatusUnauthorized)
- return
- }
-
- resp := struct {
- Token string `json:"token"`
- }{Token: token}
-
- json.NewEncoder(w).Encode(resp)
- })
- // Watcher API
- mux.HandleFunc("/watch", func(w http.ResponseWriter, r *http.Request) {
- if r.Method != http.MethodPost {
- http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
- return
- }
- body, _ := io.ReadAll(r.Body)
- var req struct {
- Key string `json:"key"`
- URL string `json:"url"`
- }
- if err := json.Unmarshal(body, &req); err != nil {
- http.Error(w, "invalid json", http.StatusBadRequest)
- return
- }
- if req.Key == "" || req.URL == "" {
- http.Error(w, "missing key or url", http.StatusBadRequest)
- return
- }
- s.WatchURL(req.Key, req.URL)
- w.WriteHeader(http.StatusOK)
- })
- mux.HandleFunc("/unwatch", func(w http.ResponseWriter, r *http.Request) {
- if r.Method != http.MethodPost {
- http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
- return
- }
- body, _ := io.ReadAll(r.Body)
- var req struct {
- Key string `json:"key"`
- URL string `json:"url"`
- }
- if err := json.Unmarshal(body, &req); err != nil {
- http.Error(w, "invalid json", http.StatusBadRequest)
- return
- }
- s.UnwatchURL(req.Key, req.URL)
- w.WriteHeader(http.StatusOK)
- })
- s.httpServer = &http.Server{
- Addr: addr,
- Handler: mux,
- }
- go func() {
- s.Raft.config.Logger.Info("HTTP API server listening on %s", addr)
- if err := s.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
- s.Raft.config.Logger.Error("HTTP server failed: %v", err)
- }
- }()
- return nil
- }
|