robert 1 долоо хоног өмнө
parent
commit
ed360f22ef
4 өөрчлөгдсөн 707 нэмэгдсэн , 113 устгасан
  1. 143 0
      auth.go
  2. 359 103
      cli.go
  3. BIN
      raft_node
  4. 205 10
      server.go

+ 143 - 0
auth.go

@@ -199,6 +199,9 @@ func (am *AuthManager) UpdateCache(key, value string, isDelete bool) {
 			var user User
 			if err := json.Unmarshal([]byte(value), &user); err == nil {
 				am.users[username] = &user
+				am.server.Raft.config.Logger.Info("Updated user cache for %s (hash prefix: %s)", username, user.PasswordHash[:8])
+			} else {
+				am.server.Raft.config.Logger.Error("Failed to unmarshal user %s: %v", username, err)
 			}
 		}
 		return
@@ -526,6 +529,146 @@ func (am *AuthManager) GetSession(token string) (*Session, error) {
 	return session, nil
 }
 
+// GetUser returns user info (Public for internal use)
+func (am *AuthManager) GetUser(username string) (*User, error) {
+	am.mu.RLock()
+	defer am.mu.RUnlock()
+	user, ok := am.users[username]
+	if !ok {
+		return nil, fmt.Errorf("user not found")
+	}
+	return user, nil
+}
+
+// RegisterUser creates a new user
+func (am *AuthManager) RegisterUser(username, password string, roles []string) error {
+	am.mu.RLock()
+	if _, exists := am.users[username]; exists {
+		am.mu.RUnlock()
+		return fmt.Errorf("user already exists")
+	}
+	am.mu.RUnlock()
+
+	salt := fmt.Sprintf("%d", time.Now().UnixNano()) // Simple salt
+	user := User{
+		Username:     username,
+		Salt:         salt,
+		PasswordHash: hashPassword(password, salt),
+		Roles:        roles,
+	}
+	data, _ := json.Marshal(user)
+	// Use SetSync to ensure user is created before returning
+	return am.server.SetSync(AuthUserPrefix+username, string(data))
+}
+
+// ChangePassword updates a user's password
+func (am *AuthManager) ChangePassword(username, newPassword string) error {
+	am.mu.RLock()
+	user, exists := am.users[username]
+	am.mu.RUnlock()
+	
+	if !exists {
+		return fmt.Errorf("user not found")
+	}
+	
+	newUser := *user // Shallow copy to modify
+	newUser.Salt = fmt.Sprintf("%d", time.Now().UnixNano())
+	newUser.PasswordHash = hashPassword(newPassword, newUser.Salt)
+	
+	data, _ := json.Marshal(newUser)
+	// Use SetSync to ensure password is updated before returning
+	return am.server.SetSync(AuthUserPrefix+username, string(data))
+}
+
+// UpdateUser updates generic fields of a user (including roles)
+func (am *AuthManager) UpdateUser(user User) error {
+	am.mu.RLock()
+	_, exists := am.users[user.Username]
+	am.mu.RUnlock()
+	
+	if !exists {
+		return fmt.Errorf("user not found")
+	}
+	
+	// Ensure we don't overwrite passwordhash/salt blindly if they are empty
+	// But usually UpdateUser is called with a fully populated user object from GetUser + mods
+	// So we just save it.
+	
+	data, _ := json.Marshal(user)
+	return am.server.SetSync(AuthUserPrefix+user.Username, string(data))
+}
+
+// CreateRole creates a new role
+func (am *AuthManager) CreateRole(name string) error {
+	am.mu.RLock()
+	if _, exists := am.roles[name]; exists {
+		am.mu.RUnlock()
+		return fmt.Errorf("role already exists")
+	}
+	am.mu.RUnlock()
+
+	role := Role{Name: name}
+	data, _ := json.Marshal(role)
+	return am.server.SetSync(AuthRolePrefix+name, string(data))
+}
+
+// DeleteRole deletes a role
+func (am *AuthManager) DeleteRole(name string) error {
+	am.mu.RLock()
+	if _, exists := am.roles[name]; !exists {
+		am.mu.RUnlock()
+		return fmt.Errorf("role not found")
+	}
+	am.mu.RUnlock()
+	return am.server.DelSync(AuthRolePrefix + name)
+}
+
+// UpdateRole updates an existing role
+func (am *AuthManager) UpdateRole(role Role) error {
+	am.mu.RLock()
+	if _, exists := am.roles[role.Name]; !exists {
+		am.mu.RUnlock()
+		return fmt.Errorf("role not found")
+	}
+	am.mu.RUnlock()
+	
+	data, _ := json.Marshal(role)
+	return am.server.SetSync(AuthRolePrefix+role.Name, string(data))
+}
+
+// GetRole returns a role definition
+func (am *AuthManager) GetRole(name string) (*Role, error) {
+	am.mu.RLock()
+	defer am.mu.RUnlock()
+	role, ok := am.roles[name]
+	if !ok {
+		return nil, fmt.Errorf("role not found")
+	}
+	return role, nil
+}
+
+// ListUsers lists all users
+func (am *AuthManager) ListUsers() []*User {
+	am.mu.RLock()
+	defer am.mu.RUnlock()
+	users := make([]*User, 0, len(am.users))
+	for _, u := range am.users {
+		users = append(users, u)
+	}
+	return users
+}
+
+// ListRoles lists all roles
+func (am *AuthManager) ListRoles() []*Role {
+	am.mu.RLock()
+	defer am.mu.RUnlock()
+	roles := make([]*Role, 0, len(am.roles))
+	for _, r := range am.roles {
+		roles = append(roles, r)
+	}
+	return roles
+}
+
 // HasFullAccess checks if the user has "*" permission on all keys (superuser)
 // allowing optimizations like pushing limits to DB engine.
 func (am *AuthManager) HasFullAccess(token string) bool {

+ 359 - 103
cli.go

@@ -78,7 +78,7 @@ func printBoxed(content string) {
 
 	// Add padding
 	contentWidth := maxWidth + 2 
-
+	
 	fmt.Println() // Start new line before box
 	// Top Border
 	fmt.Printf("%s╭%s╮%s\n", ColorDim, strings.Repeat("─", contentWidth), ColorReset)
@@ -340,121 +340,377 @@ func (c *CLI) registerDefaultCommands() {
 		}
 	})
 
-	c.RegisterCommand("login", "Login to system (login <user> [code])", func(parts []string, server *KVServer) {
+	// User Command Group
+	c.RegisterCommand("user", "User management commands (type 'user help' for more)", func(parts []string, server *KVServer) {
 		if len(parts) < 2 {
-			printBoxed("Usage: login <username> [mfa_code]")
+			printBoxed("Usage: user <subcommand> [args...]\nType 'user help' for list of subcommands.")
 			return
 		}
-		username := parts[1]
-		code := ""
-		if len(parts) > 2 {
-			code = parts[2]
-		}
-
-		// Prompt for password
-		fmt.Print("Password: ")
-		password, err := readPassword()
-		fmt.Println() // Newline after input
-
-		if err != nil {
-			printBoxed(fmt.Sprintf("Error reading password: %v", err))
-			return
-		}
-
-		token, err := server.AuthManager.Login(username, password, code, "cli")
-		if err != nil {
-			printBoxed(fmt.Sprintf("%sLogin Failed:%s %v", ColorRed, ColorReset, err))
-		} else {
-			c.mu.Lock()
-			c.token = token
-			c.mu.Unlock()
-			printBoxed(fmt.Sprintf("%sLogin Success! Token saved.%s", ColorGreen, ColorReset))
-		}
-	})
-
-	c.RegisterCommand("logout", "Logout from system", func(parts []string, server *KVServer) {
+		
+		subCmd := strings.ToLower(parts[1])
+		args := parts[2:]
+		
+		// Common vars
 		c.mu.RLock()
 		token := c.token
 		c.mu.RUnlock()
-		
-		if token != "" {
-			if err := server.Logout(token); err != nil {
-				printBoxed(fmt.Sprintf("%sError logging out:%s %v", ColorRed, ColorReset, err))
+
+		switch subCmd {
+		case "init": // Was auth-init
+			if server.AuthManager.IsEnabled() && !server.IsRoot(token) {
+				printBoxed(fmt.Sprintf("%sPermission Denied: Auth already enabled. Login as root to re-init.%s", ColorRed, ColorReset))
+				return
+			}
+			fmt.Print("Enter root password: ")
+			password, err := readPassword()
+			fmt.Println()
+			if err != nil || password == "" {
+				printBoxed("Error reading password")
+				return
+			}
+			if err := server.AuthManager.CreateRootUser(password); err != nil {
+				printBoxed(fmt.Sprintf("Failed to create root user: %v", err))
+				return
+			}
+			if err := server.AuthManager.EnableAuth(); err != nil {
+				printBoxed(fmt.Sprintf("Failed to enable auth: %v", err))
+				return
+			}
+			printBoxed("Auth Initialized. Please login as 'root'.")
+
+		case "login": // Was login
+			if len(args) < 1 {
+				printBoxed("Usage: user login <username> [code]")
+				return
+			}
+			username := args[0]
+			code := ""
+			if len(args) > 1 {
+				code = args[1]
+			}
+			fmt.Print("Password: ")
+			password, err := readPassword()
+			fmt.Println()
+			if err != nil {
+				printBoxed("Error reading password")
+				return
+			}
+			t, err := server.AuthManager.Login(username, password, code, "cli")
+			if err != nil {
+				printBoxed(fmt.Sprintf("%sLogin Failed:%s %v", ColorRed, ColorReset, err))
 			} else {
+				c.mu.Lock()
+				c.token = t
+				c.mu.Unlock()
+				printBoxed(fmt.Sprintf("%sLogin Success!%s", ColorGreen, ColorReset))
+			}
+
+		case "logout": // Was logout
+			if token != "" {
+				server.Logout(token)
+				c.mu.Lock()
+				c.token = ""
+				c.mu.Unlock()
 				printBoxed("Logged out.")
+			} else {
+				printBoxed("Not logged in.")
 			}
-		} else {
-			printBoxed("Not logged in.")
-		}
-		
-		c.mu.Lock()
-		c.token = ""
-		c.mu.Unlock()
-	})
 
-	c.RegisterCommand("auth-init", "Initialize Auth System (auth-init)", func(parts []string, server *KVServer) {
-		if len(parts) > 1 {
-			printBoxed("Usage: auth-init (prompts for password securely)")
-			return
-		}
-		
-		// If auth enabled, require root
-		c.mu.RLock()
-		token := c.token
-		c.mu.RUnlock()
+		case "who": // Was whoami
+			if token == "" {
+				printBoxed("Not logged in.")
+				return
+			}
+			session, err := server.GetSessionInfo(token)
+			if err != nil {
+				printBoxed(fmt.Sprintf("Error: %v", err))
+				return
+			}
+			printBoxed(fmt.Sprintf("Logged in as: %s%s%s", ColorCyan, session.Username, ColorReset))
 
-		if server.AuthManager.IsEnabled() && !server.IsRoot(token) {
-			printBoxed(fmt.Sprintf("%sPermission Denied: Auth already enabled. Login as root to re-init.%s", ColorRed, ColorReset))
-			return
-		}
+		case "add": // Was user-add
+			if len(args) < 1 {
+				printBoxed("Usage: user add <username> [role1,role2...]")
+				return
+			}
+			username := args[0]
+			var roles []string
+			if len(args) > 1 {
+				roles = strings.Split(args[1], ",")
+			}
+			fmt.Printf("Enter password for %s: ", username)
+			password, err := readPassword()
+			fmt.Println()
+			if err != nil || password == "" {
+				printBoxed("Error reading password")
+				return
+			}
+			if err := server.CreateUser(username, password, roles, token); err != nil {
+				printBoxed(fmt.Sprintf("%sError:%s %v", ColorRed, ColorReset, err))
+			} else {
+				printBoxed(fmt.Sprintf("%sUser %s created%s", ColorGreen, username, ColorReset))
+			}
 
-		// Prompt for password
-		fmt.Print("Enter root password: ")
-		password, err := readPassword()
-		fmt.Println() // Newline after input
-		
-		if err != nil {
-			printBoxed(fmt.Sprintf("Error reading password: %v", err))
-			return
-		}
-		
-		if password == "" {
-			printBoxed("Error: Password cannot be empty")
-			return
-		}
-		
-		// 1. Create Root User
-		if err := server.AuthManager.CreateRootUser(password); err != nil {
-			printBoxed(fmt.Sprintf("Failed to create root user: %v", err))
-			return
-		}
-		
-		// 2. Enable Auth
-		if err := server.AuthManager.EnableAuth(); err != nil {
-			printBoxed(fmt.Sprintf("Failed to enable auth: %v", err))
-			return
-		}
-		
-		printBoxed("Auth Initialized. Please login as 'root'.")
-	})
-	
-	c.RegisterCommand("whoami", "Show current user info", func(parts []string, server *KVServer) {
-		c.mu.RLock()
-		token := c.token
-		c.mu.RUnlock()
-		
-		if token == "" {
-			printBoxed("Not logged in.")
-			return
-		}
-		
-		session, err := server.GetSessionInfo(token)
-		if err != nil {
-			printBoxed(fmt.Sprintf("Error: %v (Session might have expired)", err))
-			return
+		case "del": // Was user-del
+			if len(args) < 1 {
+				printBoxed("Usage: user del <username>")
+				return
+			}
+			username := args[0]
+			fmt.Printf("Delete user '%s'? (y/N): ", username)
+			var resp string
+			fmt.Scanln(&resp)
+			if strings.ToLower(resp) != "y" {
+				return
+			}
+			if err := server.DeleteUser(username, token); err != nil {
+				printBoxed(fmt.Sprintf("%sError:%s %v", ColorRed, ColorReset, err))
+			} else {
+				printBoxed(fmt.Sprintf("%sUser deleted%s", ColorGreen, ColorReset))
+			}
+
+		case "list":
+			users, err := server.ListUsers(token)
+			if err != nil {
+				printBoxed(fmt.Sprintf("%sError:%s %v", ColorRed, ColorReset, err))
+				return
+			}
+			var buf bytes.Buffer
+			w := tabwriter.NewWriter(&buf, 0, 0, 2, ' ', 0)
+			fmt.Fprintln(w, "Username\tRoles\tStatus")
+			fmt.Fprintln(w, "--------\t-----\t------")
+			for _, u := range users {
+				status := "Offline"
+				if u.IsOnline { status = "Online" }
+				fmt.Fprintf(w, "%s\t%s\t%s\n", u.Username, strings.Join(u.Roles, ","), status)
+			}
+			w.Flush()
+			printBoxed(buf.String())
+
+		case "update":
+			// user update <username> [roles=r1,r2]
+			// Currently only password update via prompt, maybe roles later
+			// The prompt implies a generic update but also mentions "user update"
+			// Let's implement password update here or allow changing roles.
+			// Ideally arguments: user update <username> roles=admin,dev
+			// If no args (other than username), prompt for password?
+			if len(args) < 1 {
+				printBoxed("Usage: user update <username> [roles=r1,r2...]")
+				return
+			}
+			username := args[0]
+			
+			// If args present, update roles
+			if len(args) > 1 {
+				// Parse options
+				updated := false
+				// user, err := server.AuthManager.GetUser(username) // Use internal GetUser helper or logic?
+				// We need authenticated way. server.ListUsers gives us access if admin.
+				// But we need the User object to modify.
+				// Let's rely on server.UpdateUser taking a struct.
+				// We first need to GET the user to avoid overwriting fields.
+				// Since we don't have GetUser exposed in server (only List), let's add one or iterate List.
+				// Iterating List is inefficient but works for now.
+				users, _ := server.ListUsers(token)
+				var targetUser *User
+				for _, u := range users {
+					if u.Username == username {
+						targetUser = u
+						break
+					}
+				}
+				if targetUser == nil {
+					printBoxed("User not found or permission denied")
+					return
+				}
+				
+				// Apply updates
+				for _, arg := range args[1:] {
+					if strings.HasPrefix(arg, "roles=") {
+						roleStr := strings.TrimPrefix(arg, "roles=")
+						targetUser.Roles = strings.Split(roleStr, ",")
+						updated = true
+					}
+				}
+				
+				if updated {
+					if err := server.UpdateUser(*targetUser, token); err != nil {
+						printBoxed(fmt.Sprintf("%sError:%s %v", ColorRed, ColorReset, err))
+					} else {
+						printBoxed(fmt.Sprintf("%sUser updated%s", ColorGreen, ColorReset))
+					}
+				}
+			} else {
+				// No args -> Change Password
+				fmt.Printf("Enter new password for %s: ", username)
+				pass, err := readPassword()
+				fmt.Println()
+				if err != nil || pass == "" {
+					printBoxed("Error reading password")
+					return
+				}
+				if err := server.ChangeUserPassword(username, pass, token); err != nil {
+					printBoxed(fmt.Sprintf("%sError:%s %v", ColorRed, ColorReset, err))
+				} else {
+					printBoxed(fmt.Sprintf("%sPassword updated%s", ColorGreen, ColorReset))
+				}
+			}
+
+		case "passwd": // Keep alias or subcommand
+			if len(args) < 1 {
+				printBoxed("Usage: user passwd <username>")
+				return
+			}
+			// Reuse logic
+			username := args[0]
+			fmt.Printf("Enter new password for %s: ", username)
+			pass, err := readPassword()
+			fmt.Println()
+			if err != nil || pass == "" {
+				printBoxed("Error reading password")
+				return
+			}
+			if err := server.ChangeUserPassword(username, pass, token); err != nil {
+				printBoxed(fmt.Sprintf("%sError:%s %v", ColorRed, ColorReset, err))
+			} else {
+				printBoxed(fmt.Sprintf("%sPassword updated%s", ColorGreen, ColorReset))
+			}
+
+		case "role":
+			if len(args) < 1 {
+				printBoxed("Usage: user role <list|add|delete|update> ...")
+				return
+			}
+			roleCmd := strings.ToLower(args[0])
+			roleArgs := args[1:]
+			
+			switch roleCmd {
+			case "list":
+				roles, err := server.ListRoles(token)
+				if err != nil {
+					printBoxed(fmt.Sprintf("%sError:%s %v", ColorRed, ColorReset, err))
+					return
+				}
+				var buf bytes.Buffer
+				w := tabwriter.NewWriter(&buf, 0, 0, 2, ' ', 0)
+				fmt.Fprintln(w, "Role\tParents\tPerms")
+				fmt.Fprintln(w, "----\t-------\t-----")
+				for _, r := range roles {
+					parents := strings.Join(r.ParentRoles, ",")
+					permCount := len(r.Permissions)
+					fmt.Fprintf(w, "%s\t%s\t%d rules\n", r.Name, parents, permCount)
+				}
+				w.Flush()
+				printBoxed(buf.String())
+				
+			case "add":
+				if len(roleArgs) < 1 {
+					printBoxed("Usage: user role add <name>")
+					return
+				}
+				if err := server.CreateRole(roleArgs[0], token); err != nil {
+					printBoxed(fmt.Sprintf("%sError:%s %v", ColorRed, ColorReset, err))
+				} else {
+					printBoxed(fmt.Sprintf("%sRole %s created%s", ColorGreen, roleArgs[0], ColorReset))
+				}
+				
+			case "delete":
+				if len(roleArgs) < 1 {
+					printBoxed("Usage: user role delete <name>")
+					return
+				}
+				if err := server.DeleteRole(roleArgs[0], token); err != nil {
+					printBoxed(fmt.Sprintf("%sError:%s %v", ColorRed, ColorReset, err))
+				} else {
+					printBoxed(fmt.Sprintf("%sRole %s deleted%s", ColorGreen, roleArgs[0], ColorReset))
+				}
+				
+			case "update":
+				// Simplistic update: user role update <name> parents=p1,p2
+				// Complex permissions editing is hard in CLI without JSON
+				if len(roleArgs) < 1 {
+					printBoxed("Usage: user role update <name> [parents=p1,p2]")
+					return
+				}
+				name := roleArgs[0]
+				roles, _ := server.ListRoles(token)
+				var targetRole *Role
+				for _, r := range roles {
+					if r.Name == name {
+						targetRole = r
+						break
+					}
+				}
+				if targetRole == nil {
+					printBoxed("Role not found")
+					return
+				}
+				
+				updated := false
+				for _, arg := range roleArgs[1:] {
+					if strings.HasPrefix(arg, "parents=") {
+						pStr := strings.TrimPrefix(arg, "parents=")
+						targetRole.ParentRoles = strings.Split(pStr, ",")
+						updated = true
+					}
+				}
+				
+				if updated {
+					if err := server.UpdateRole(*targetRole, token); err != nil {
+						printBoxed(fmt.Sprintf("%sError:%s %v", ColorRed, ColorReset, err))
+					} else {
+						printBoxed(fmt.Sprintf("%sRole updated%s", ColorGreen, ColorReset))
+					}
+				} else {
+					printBoxed("No changes specified.")
+				}
+				
+			default:
+				printBoxed("Unknown sub-command. Options: list, add, delete, update")
+			}
+
+		case "help":
+			var sb strings.Builder
+			sb.WriteString(fmt.Sprintf("%sUser Management Commands:%s\n", ColorCyan, ColorReset))
+			
+			// Define subcommands for formatting
+			type subCmd struct {
+				Name string
+				Desc string
+			}
+			cmds := []subCmd{
+				{"user init", "Initialize auth system (Root)"},
+				{"user login <user> [code]", "Login"},
+				{"user logout", "Logout"},
+				{"user who", "Current user info"},
+				{"user add <user> [roles]", "Create user (Root)"},
+				{"user del <user>", "Delete user (Root)"},
+				{"user update <user> [opts]", "Update user/pass (Root)"},
+				{"user list", "List users (Root)"},
+				{"user role list", "List roles (Root)"},
+				{"user role add <name>", "Create role (Root)"},
+				{"user role delete <name>", "Delete role (Root)"},
+				{"user role update <name> ...", "Update role (Root)"},
+			}
+			
+			// Find max length
+			maxLen := 0
+			for _, c := range cmds {
+				if len(c.Name) > maxLen {
+					maxLen = len(c.Name)
+				}
+			}
+			
+			for _, c := range cmds {
+				padding := strings.Repeat(" ", maxLen-len(c.Name))
+				sb.WriteString(fmt.Sprintf("  %s%s%s%s  %s%s%s\n", ColorGreen, c.Name, ColorReset, padding, ColorDim, c.Desc, ColorReset))
+			}
+			
+			printBoxed(sb.String())
+
+		default:
+			printBoxed(fmt.Sprintf("Unknown subcommand '%s'. Type 'user help'.", subCmd))
 		}
-		
-		printBoxed(fmt.Sprintf("Logged in as: %s%s%s (Token: %s...)", ColorCyan, session.Username, ColorReset, token[:8]))
 	})
 
 	c.RegisterCommand("join", "Add node to cluster (join <id> <addr>)", func(parts []string, server *KVServer) {

BIN
raft_node


+ 205 - 10
server.go

@@ -29,6 +29,10 @@ type KVServer struct {
 	// leavingNodes tracks nodes that are currently being removed
 	// to prevent auto-rejoin/discovery logic from interfering
 	leavingNodes sync.Map
+	
+	// Pending requests for synchronous operations (Read-Your-Writes)
+	pendingRequests map[uint64]chan error
+	pendingMu       sync.Mutex
 }
 
 // NewKVServer creates a new KV server
@@ -88,12 +92,13 @@ func NewKVServer(config *Config) (*KVServer, error) {
 	}
 
 	s := &KVServer{
-		Raft:        r,
-		DB:          engine,
-		CLI:         nil,
-		AuthManager: nil, // initialized below
-		Watcher:     watcher,
-		stopCh:      stopCh,
+		Raft:            r,
+		DB:              engine,
+		CLI:             nil,
+		AuthManager:     nil, // initialized below
+		Watcher:         watcher,
+		stopCh:          stopCh,
+		pendingRequests: make(map[uint64]chan error),
 	}
 
 	// Initialize AuthManager
@@ -173,12 +178,17 @@ func (s *KVServer) runApplyLoop(applyCh chan ApplyMsg) {
 			// Optimization: Skip if already applied
 			// We check this here to avoid unmarshalling and locking DB for known duplicates
 			if msg.CommandIndex <= s.DB.GetLastAppliedIndex() {
+				// Even if duplicate, we might have someone waiting for it if they just proposed it
+				// but this is unlikely for duplicates.
+				// However, notify waiters just in case.
+				s.notifyPending(msg.CommandIndex, nil)
 				continue
 			}
 
 			var cmd KVCommand
 			if err := json.Unmarshal(msg.Command, &cmd); err != nil {
 				s.Raft.config.Logger.Error("Failed to unmarshal command: %v", err)
+				s.notifyPending(msg.CommandIndex, err)
 				continue
 			}
 
@@ -211,6 +221,10 @@ func (s *KVServer) runApplyLoop(applyCh chan ApplyMsg) {
 			if err != nil {
 				s.Raft.config.Logger.Error("DB Apply failed: %v", err)
 			}
+			
+			// Notify anyone waiting for this index
+			s.notifyPending(msg.CommandIndex, err)
+
 		} else if msg.SnapshotValid {
 			if err := s.DB.Restore(msg.Snapshot); err != nil {
 				s.Raft.config.Logger.Error("DB Restore failed: %v", err)
@@ -219,12 +233,29 @@ func (s *KVServer) runApplyLoop(applyCh chan ApplyMsg) {
 	}
 }
 
+// notifyPending notifies any waiting requests for the given index
+func (s *KVServer) notifyPending(index uint64, err error) {
+	s.pendingMu.Lock()
+	defer s.pendingMu.Unlock()
+	
+	if ch, ok := s.pendingRequests[index]; ok {
+		// Non-blocking send in case listener is gone (buffered channel recommended)
+		select {
+		case ch <- err:
+		default:
+		}
+		close(ch)
+		delete(s.pendingRequests, index)
+	}
+}
+
 // SetAuthenticated sets a key-value pair with permission check
 func (s *KVServer) SetAuthenticated(key, value, token string) error {
 	if err := s.AuthManager.CheckPermission(token, key, ActionWrite, value); err != nil {
 		return err
 	}
-	return s.Set(key, value)
+	// Use SetSync for consistency in CLI/API
+	return s.SetSync(key, value)
 }
 
 // DelAuthenticated deletes a key with permission check
@@ -232,10 +263,13 @@ func (s *KVServer) DelAuthenticated(key, token string) error {
 	if err := s.AuthManager.CheckPermission(token, key, ActionWrite, ""); err != nil {
 		return err
 	}
-	return s.Del(key)
+	// Use SetSync (via Del which should be updated or create DelSync)
+	// For simplicity, we implement DelSync logic here or update Del to be sync?
+	// Let's implement DelSync
+	return s.DelSync(key)
 }
 
-// Set sets a key-value pair
+// Set sets a key-value pair (Async - eventually consistent)
 func (s *KVServer) Set(key, value string) error {
 	cmd := KVCommand{
 		Type:  KVSet,
@@ -251,7 +285,41 @@ func (s *KVServer) Set(key, value string) error {
 	return err
 }
 
-// Del deletes a key
+// SetSync sets a key-value pair and waits for it to be applied (Read-Your-Writes)
+func (s *KVServer) SetSync(key, value string) error {
+	cmd := KVCommand{
+		Type:  KVSet,
+		Key:   key,
+		Value: value,
+	}
+	data, err := json.Marshal(cmd)
+	if err != nil {
+		return err
+	}
+
+	index, _, err := s.Raft.ProposeWithForward(data)
+	if err != nil {
+		return err
+	}
+
+	// Wait for application
+	ch := make(chan error, 1)
+	s.pendingMu.Lock()
+	s.pendingRequests[index] = ch
+	s.pendingMu.Unlock()
+
+	select {
+	case applyErr := <-ch:
+		return applyErr
+	case <-time.After(5 * time.Second):
+		s.pendingMu.Lock()
+		delete(s.pendingRequests, index)
+		s.pendingMu.Unlock()
+		return fmt.Errorf("timeout waiting for apply")
+	}
+}
+
+// Del deletes a key (Async)
 func (s *KVServer) Del(key string) error {
 	cmd := KVCommand{
 		Type: KVDel,
@@ -266,6 +334,39 @@ func (s *KVServer) Del(key string) error {
 	return err
 }
 
+// DelSync deletes a key and waits for it to be applied
+func (s *KVServer) DelSync(key string) error {
+	cmd := KVCommand{
+		Type: KVDel,
+		Key:  key,
+	}
+	data, err := json.Marshal(cmd)
+	if err != nil {
+		return err
+	}
+
+	index, _, err := s.Raft.ProposeWithForward(data)
+	if err != nil {
+		return err
+	}
+
+	// Wait for application
+	ch := make(chan error, 1)
+	s.pendingMu.Lock()
+	s.pendingRequests[index] = ch
+	s.pendingMu.Unlock()
+
+	select {
+	case applyErr := <-ch:
+		return applyErr
+	case <-time.After(5 * time.Second):
+		s.pendingMu.Lock()
+		delete(s.pendingRequests, index)
+		s.pendingMu.Unlock()
+		return fmt.Errorf("timeout waiting for apply")
+	}
+}
+
 // GetAuthenticated gets a value with permission check (local read)
 func (s *KVServer) GetAuthenticated(key, token string) (string, bool, error) {
 	if err := s.AuthManager.CheckPermission(token, key, ActionRead, ""); err != nil {
@@ -308,6 +409,100 @@ func (s *KVServer) IsRoot(token string) bool {
 	return sess.Username == "root"
 }
 
+// CreateUser creates a new user (Root only)
+func (s *KVServer) CreateUser(username, password string, roles []string, token string) error {
+	if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
+		return fmt.Errorf("permission denied: root access required")
+	}
+	// Use RegisterUserSync
+	return s.AuthManager.RegisterUser(username, password, roles)
+}
+
+// DeleteUser deletes a user (Root only)
+func (s *KVServer) DeleteUser(username string, token string) error {
+	if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
+		return fmt.Errorf("permission denied: root access required")
+	}
+	// Check if user exists
+	if _, err := s.AuthManager.GetUser(username); err != nil {
+		return err
+	}
+	if username == "root" {
+		return fmt.Errorf("cannot delete root user")
+	}
+	// Use DelSync
+	return s.DelSync(AuthUserPrefix + username)
+}
+
+// UpdateUser updates generic user fields (Root only)
+func (s *KVServer) UpdateUser(user User, token string) error {
+	if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
+		return fmt.Errorf("permission denied: root access required")
+	}
+	// Check if user exists
+	if _, err := s.AuthManager.GetUser(user.Username); err != nil {
+		return err
+	}
+	return s.AuthManager.UpdateUser(user)
+}
+
+// ChangeUserPassword changes a user's password (Root or Self)
+func (s *KVServer) ChangeUserPassword(username, newPassword string, token string) error {
+	if s.AuthManager.IsEnabled() {
+		session, err := s.AuthManager.GetSession(token)
+		if err != nil {
+			return err
+		}
+		if session.Username != "root" && session.Username != username {
+			return fmt.Errorf("permission denied")
+		}
+	}
+	// Use ChangePassword (which should use SetSync)
+	return s.AuthManager.ChangePassword(username, newPassword)
+}
+
+// Role Management Helpers
+
+// CreateRole creates a new role (Root only)
+func (s *KVServer) CreateRole(name string, token string) error {
+	if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
+		return fmt.Errorf("permission denied: root access required")
+	}
+	return s.AuthManager.CreateRole(name)
+}
+
+// DeleteRole deletes a role (Root only)
+func (s *KVServer) DeleteRole(name string, token string) error {
+	if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
+		return fmt.Errorf("permission denied: root access required")
+	}
+	return s.AuthManager.DeleteRole(name)
+}
+
+// UpdateRole updates a role (Root only)
+func (s *KVServer) UpdateRole(role Role, token string) error {
+	if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
+		return fmt.Errorf("permission denied: root access required")
+	}
+	return s.AuthManager.UpdateRole(role)
+}
+
+// ListUsers lists all users (Root only)
+func (s *KVServer) ListUsers(token string) ([]*User, error) {
+	if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
+		return nil, fmt.Errorf("permission denied: root access required")
+	}
+	return s.AuthManager.ListUsers(), nil
+}
+
+// ListRoles lists all roles (Root only)
+func (s *KVServer) ListRoles(token string) ([]*Role, error) {
+	if s.AuthManager.IsEnabled() && !s.IsRoot(token) {
+		return nil, fmt.Errorf("permission denied: root access required")
+	}
+	return s.AuthManager.ListRoles(), nil
+}
+
 // SearchAuthenticated searches keys with permission checks
 func (s *KVServer) SearchAuthenticated(pattern string, limit, offset int, token string) ([]db.QueryResult, error) {
 	// Optimization: If user has full access (*), delegate to DB with limit/offset