tcp_server.go 19 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803
  1. package raft
  2. import (
  3. "bufio"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "net"
  8. "runtime"
  9. "strconv"
  10. "strings"
  11. "time"
  12. )
// TCPClientSession holds the per-connection state of one TCP API client:
// the socket with its buffered reader/writer, the server that accepted it,
// and the identity established by the LOGIN or AUTH commands.
type TCPClientSession struct {
	conn       net.Conn      // underlying client connection
	server     *KVServer     // server that accepted the connection
	token      string        // session token; set by LOGIN/AUTH, cleared by LOGOUT
	username   string        // user bound to token; empty while unauthenticated
	reader     *bufio.Reader // buffered reader wrapping conn
	writer     *bufio.Writer // buffered writer wrapping conn
	remoteAddr string        // conn.RemoteAddr().String(), captured when the session is created
}
  23. func (s *KVServer) StartTCPServer(addr string) error {
  24. listener, err := net.Listen("tcp", addr)
  25. if err != nil {
  26. return err
  27. }
  28. s.Raft.config.Logger.Info("TCP API server listening on %s", addr)
  29. go func() {
  30. defer listener.Close()
  31. for {
  32. conn, err := listener.Accept()
  33. if err != nil {
  34. if s.stopCh != nil {
  35. select {
  36. case <-s.stopCh:
  37. return
  38. default:
  39. }
  40. }
  41. s.Raft.config.Logger.Error("TCP Accept error: %v", err)
  42. continue
  43. }
  44. go s.handleTCPConnection(conn)
  45. }
  46. }()
  47. return nil
  48. }
  49. func (s *KVServer) handleTCPConnection(conn net.Conn) {
  50. defer conn.Close()
  51. // Use larger buffers for high throughput
  52. reader := bufio.NewReaderSize(conn, 64*1024)
  53. writer := bufio.NewWriterSize(conn, 64*1024)
  54. session := &TCPClientSession{
  55. conn: conn,
  56. server: s,
  57. reader: reader,
  58. writer: writer,
  59. remoteAddr: conn.RemoteAddr().String(),
  60. }
  61. for {
  62. // Set Keep-Alive Deadline
  63. // Using a longer deadline avoids frequent syscalls if traffic is continuous
  64. conn.SetReadDeadline(time.Now().Add(60 * time.Second))
  65. // Read Request Line
  66. line, err := reader.ReadString('\n')
  67. if err != nil {
  68. return
  69. }
  70. line = strings.TrimSpace(line)
  71. if line == "" {
  72. continue
  73. }
  74. // Parse Headers
  75. var contentLength int
  76. for {
  77. hLine, err := reader.ReadString('\n')
  78. if err != nil {
  79. return
  80. }
  81. hLine = strings.TrimSpace(hLine)
  82. if hLine == "" {
  83. break
  84. }
  85. // Optimized Content-Length check
  86. // "Content-Length: 123"
  87. lowerHLine := strings.ToLower(hLine)
  88. if strings.HasPrefix(lowerHLine, "content-length:") {
  89. valStr := strings.TrimSpace(hLine[15:])
  90. if l, err := strconv.Atoi(valStr); err == nil {
  91. contentLength = l
  92. }
  93. }
  94. }
  95. // Read Body
  96. var body string
  97. if contentLength > 0 {
  98. // Optimization: Avoid allocation for small bodies if possible,
  99. // but Raft commands need to be passed as bytes/string anyway.
  100. buf := make([]byte, contentLength)
  101. if _, err := io.ReadFull(reader, buf); err != nil {
  102. return
  103. }
  104. body = string(buf)
  105. }
  106. // Implement Batching: Accumulate requests if they are ASET/SET
  107. // Ideally we need an internal buffer or channel here to queue up commands.
  108. // For zero-allocation batching in this loop structure, we can try to
  109. // eagerly read the next request if available in the buffer.
  110. // Note: Raft's Propose is thread-safe. The current serial loop is efficient for
  111. // minimizing context switches. Batching helps if we can merge multiple Proposes into one.
  112. // Since we haven't modified Raft to support ProposeBatch yet, we can't do true backend batching easily.
  113. // However, we can do "frontend batching" by checking buffer availability?
  114. // No, frontend batching requires Raft to accept a batch.
  115. // Without modifying Raft.Propose to accept []Command, "batching" here is limited.
  116. // BUT, we can at least pipeline the execution if we had concurrent workers,
  117. // but we replaced that with this serial loop for perf.
  118. // So for now, we just execute.
  119. // Execute Command Inline
  120. // s.Raft.config.Logger.Debug("TCP Request: %s", line)
  121. resp := s.executeCommandWithBody(session, line, body)
  122. // Write Response
  123. if _, err := writer.WriteString(resp + "\n"); err != nil {
  124. return
  125. }
  126. // Flush Optimization:
  127. // Only flush if there is no more data in the read buffer.
  128. // This automatically batches responses when requests are pipelined.
  129. if reader.Buffered() == 0 {
  130. if err := writer.Flush(); err != nil {
  131. return
  132. }
  133. }
  134. }
  135. }
  136. // Helper to bridge old logic
  137. func (s *KVServer) executeCommandWithBody(session *TCPClientSession, line string, body string) string {
  138. parts := strings.Fields(line)
  139. if len(parts) == 0 {
  140. return ""
  141. }
  142. cmd := strings.ToUpper(parts[0])
  143. // If body is present, it overrides the value part of the command
  144. // We handle specific commands that use body
  145. if body != "" {
  146. switch cmd {
  147. case "SET", "ASET":
  148. if len(parts) < 2 {
  149. return "ERR usage: SET <key> (value in body)"
  150. }
  151. key := parts[1]
  152. // Value is body
  153. if cmd == "SET" {
  154. if err := s.SetAuthenticated(key, body, session.token); err != nil {
  155. return fmt.Sprintf("ERR %v", err)
  156. }
  157. return "OK"
  158. } else {
  159. if err := s.SetAuthenticatedAsync(key, body, session.token); err != nil {
  160. return fmt.Sprintf("ERR %v", err)
  161. }
  162. return "OK"
  163. }
  164. }
  165. }
  166. // Fallback to legacy parsing if no body or command doesn't use body
  167. return s.executeCommand(session, line)
  168. }
  169. func (s *KVServer) executeCommand(session *TCPClientSession, line string) string {
  170. parts := strings.Fields(line)
  171. if len(parts) == 0 {
  172. return ""
  173. }
  174. cmd := strings.ToUpper(parts[0])
  175. var resp string
  176. switch cmd {
  177. case "LOGIN":
  178. if len(parts) < 3 {
  179. resp = "ERR usage: LOGIN <username> <password> [otp]"
  180. } else {
  181. user := parts[1]
  182. pass := parts[2]
  183. otp := ""
  184. if len(parts) > 3 {
  185. otp = parts[3]
  186. }
  187. // Extract IP
  188. ip := session.remoteAddr
  189. if host, _, err := net.SplitHostPort(ip); err == nil {
  190. ip = host
  191. }
  192. token, err := s.AuthManager.Login(user, pass, otp, ip)
  193. if err != nil {
  194. resp = fmt.Sprintf("ERR %v", err)
  195. } else {
  196. session.token = token
  197. session.username = user
  198. resp = fmt.Sprintf("OK %s", token)
  199. }
  200. }
  201. case "AUTH":
  202. if len(parts) < 2 {
  203. resp = "ERR usage: AUTH <token>"
  204. } else {
  205. token := parts[1]
  206. // Verify token
  207. sess, err := s.AuthManager.GetSession(token)
  208. if err != nil {
  209. resp = fmt.Sprintf("ERR %v", err)
  210. } else {
  211. session.token = token
  212. session.username = sess.Username
  213. resp = "OK"
  214. }
  215. }
  216. case "LOGOUT":
  217. if session.token != "" {
  218. s.AuthManager.Logout(session.token)
  219. session.token = ""
  220. session.username = ""
  221. }
  222. resp = "OK"
  223. case "GET":
  224. if len(parts) < 2 {
  225. resp = "ERR usage: GET <key>"
  226. } else {
  227. key := parts[1]
  228. val, found, err := s.GetLinearAuthenticated(key, session.token)
  229. if err != nil {
  230. resp = fmt.Sprintf("ERR %v", err)
  231. } else if !found {
  232. resp = "ERR not found"
  233. } else {
  234. resp = fmt.Sprintf("OK %s", val)
  235. }
  236. }
  237. case "SET":
  238. if len(parts) < 3 {
  239. resp = "ERR usage: SET <key> <value>"
  240. } else {
  241. key := parts[1]
  242. // Value might contain spaces, join the rest
  243. val := strings.Join(parts[2:], " ")
  244. // Use SetAuthenticated (Sync) by default for safety
  245. err := s.SetAuthenticated(key, val, session.token)
  246. if err != nil {
  247. resp = fmt.Sprintf("ERR %v", err)
  248. } else {
  249. resp = "OK"
  250. }
  251. }
  252. case "ASET":
  253. // Async SET for high performance
  254. if len(parts) < 3 {
  255. resp = "ERR usage: ASET <key> <value>"
  256. } else {
  257. key := parts[1]
  258. val := strings.Join(parts[2:], " ")
  259. err := s.SetAuthenticatedAsync(key, val, session.token)
  260. if err != nil {
  261. resp = fmt.Sprintf("ERR %v", err)
  262. } else {
  263. resp = "OK"
  264. }
  265. }
  266. case "DEL":
  267. if len(parts) < 2 {
  268. resp = "ERR usage: DEL <key>"
  269. } else {
  270. key := parts[1]
  271. err := s.DelAuthenticated(key, session.token)
  272. if err != nil {
  273. resp = fmt.Sprintf("ERR %v", err)
  274. } else {
  275. resp = "OK"
  276. }
  277. }
  278. case "MFA-GENERATE":
  279. if session.token == "" {
  280. resp = "ERR not authenticated"
  281. } else {
  282. secret, err := GenerateMFASecret()
  283. if err != nil {
  284. resp = fmt.Sprintf("ERR %v", err)
  285. } else {
  286. url := GenerateOTPAuthURL(session.username, secret, "RaftKV")
  287. resp = fmt.Sprintf("OK %s %s", secret, url)
  288. }
  289. }
  290. case "MFA-ENABLE":
  291. if len(parts) < 3 {
  292. resp = "ERR usage: MFA-ENABLE <secret> <code>"
  293. } else if session.token == "" {
  294. resp = "ERR not authenticated"
  295. } else {
  296. secret := parts[1]
  297. code := parts[2]
  298. if !ValidateTOTP(secret, code) {
  299. resp = "ERR invalid code"
  300. } else {
  301. if err := s.SetUserMFA(session.username, secret, true, session.token); err != nil {
  302. resp = fmt.Sprintf("ERR %v", err)
  303. } else {
  304. resp = "OK"
  305. }
  306. }
  307. }
  308. case "MFA-DISABLE":
  309. if session.token == "" {
  310. resp = "ERR not authenticated"
  311. } else {
  312. if err := s.SetUserMFA(session.username, "", false, session.token); err != nil {
  313. resp = fmt.Sprintf("ERR %v", err)
  314. } else {
  315. resp = "OK"
  316. }
  317. }
  318. case "MFA-STATUS":
  319. if session.token == "" {
  320. resp = "ERR not authenticated"
  321. } else {
  322. user, err := s.AuthManager.GetUser(session.username)
  323. if err != nil {
  324. resp = fmt.Sprintf("ERR %v", err)
  325. } else {
  326. status := "disabled"
  327. if user.MFAEnabled {
  328. status = "enabled"
  329. }
  330. resp = fmt.Sprintf("OK %s", status)
  331. }
  332. }
  333. case "WHOAMI":
  334. if session.username == "" {
  335. resp = "Guest"
  336. } else {
  337. resp = session.username
  338. }
  339. case "HELP":
  340. helpText := `Available Commands:
  341. GET <key> - Get value
  342. SET <key> <value> - Set value
  343. DEL <key> - Delete value
  344. SEARCH <pattern> [limit] - Search keys (e.g. user.*)
  345. COUNT <pattern> - Count keys
  346. INFO - Show system stats
  347. WHOAMI - Show current user
  348. JOIN <id> <addr> - Add node (Root only)
  349. LEAVE <id> - Remove node (Root only)
  350. USER_LIST - List users (Admin)
  351. ROLE_LIST - List roles (Admin)
  352. LOGIN/LOGOUT/EXIT`
  353. resp = "OK " + helpText
  354. case "INFO":
  355. // Check permission (Admin only if auth enabled)
  356. if s.AuthManager.IsEnabled() {
  357. // Allow if admin OR if root (HasFullAccess)
  358. // But IsAdmin is basically check for "admin" on "*"
  359. // Let's relax it slightly for dashboard if we want read-only dashboard?
  360. // For now, strict: Admin access required.
  361. if !s.IsAdmin(session.token) {
  362. resp = "ERR Permission Denied: Admin access required"
  363. break
  364. }
  365. }
  366. // Gather stats
  367. stats := s.GetStats()
  368. health := s.HealthCheck()
  369. dbSize := s.GetDBSize()
  370. logSize := s.GetLogSize()
  371. var m runtime.MemStats
  372. runtime.ReadMemStats(&m)
  373. // Construct JSON response
  374. info := map[string]interface{}{
  375. "node": map[string]interface{}{
  376. "id": health.NodeID,
  377. "state": health.State,
  378. "term": health.Term,
  379. "leader": health.LeaderID,
  380. "healthy": health.IsHealthy,
  381. },
  382. "storage": map[string]interface{}{
  383. "db_size": dbSize,
  384. "log_size": logSize,
  385. "mem_alloc": m.Alloc,
  386. "mem_sys": m.Sys,
  387. "num_gc": m.NumGC,
  388. },
  389. "indices": map[string]interface{}{
  390. "commit_index": stats.CommitIndex,
  391. "applied_index": stats.LastApplied,
  392. "last_log_index": stats.LastLogIndex,
  393. "db_applied": s.DB.GetLastAppliedIndex(),
  394. },
  395. "cluster": stats.ClusterNodes,
  396. "cluster_size": stats.ClusterSize,
  397. }
  398. data, err := json.Marshal(info)
  399. if err != nil {
  400. resp = fmt.Sprintf("ERR %v", err)
  401. } else {
  402. resp = "OK " + string(data)
  403. }
  404. case "SEARCH":
  405. // Usage: SEARCH <pattern> [limit] [offset]
  406. if len(parts) < 2 {
  407. resp = "ERR usage: SEARCH <pattern> [limit] [offset]"
  408. } else {
  409. pattern := parts[1]
  410. limit := 20
  411. offset := 0
  412. if len(parts) >= 3 {
  413. if l, err := strconv.Atoi(parts[2]); err == nil {
  414. limit = l
  415. }
  416. }
  417. if len(parts) >= 4 {
  418. if o, err := strconv.Atoi(parts[3]); err == nil {
  419. offset = o
  420. }
  421. }
  422. results, err := s.SearchAuthenticated(pattern, limit, offset, session.token)
  423. if err != nil {
  424. resp = fmt.Sprintf("ERR %v", err)
  425. } else {
  426. data, _ := json.Marshal(results)
  427. resp = "OK " + string(data)
  428. }
  429. }
  430. case "COUNT":
  431. // Usage: COUNT <pattern>
  432. if len(parts) < 2 {
  433. resp = "ERR usage: COUNT <pattern>"
  434. } else {
  435. pattern := parts[1]
  436. count, err := s.CountAuthenticated(pattern, session.token)
  437. if err != nil {
  438. resp = fmt.Sprintf("ERR %v", err)
  439. } else {
  440. resp = fmt.Sprintf("OK %d", count)
  441. }
  442. }
  443. case "JOIN":
  444. // Usage: JOIN <id> <addr>
  445. // Admin only
  446. if s.AuthManager.IsEnabled() {
  447. if !s.IsAdmin(session.token) {
  448. resp = "ERR Permission Denied: Admin access required"
  449. break
  450. }
  451. }
  452. if len(parts) < 3 {
  453. resp = "ERR usage: JOIN <id> <addr>"
  454. } else {
  455. err := s.Join(parts[1], parts[2])
  456. if err != nil {
  457. resp = fmt.Sprintf("ERR %v", err)
  458. } else {
  459. resp = "OK Join request sent"
  460. }
  461. }
  462. case "LEAVE":
  463. // Usage: LEAVE <id>
  464. // Admin only
  465. if s.AuthManager.IsEnabled() {
  466. if !s.IsAdmin(session.token) {
  467. resp = "ERR Permission Denied: Admin access required"
  468. break
  469. }
  470. }
  471. if len(parts) < 2 {
  472. resp = "ERR usage: LEAVE <id>"
  473. } else {
  474. err := s.Leave(parts[1])
  475. if err != nil {
  476. resp = fmt.Sprintf("ERR %v", err)
  477. } else {
  478. resp = "OK Leave request sent"
  479. }
  480. }
  481. // --- Admin Commands ---
  482. case "USER_LIST":
  483. users, err := s.ListUsers(session.token)
  484. if err != nil {
  485. resp = fmt.Sprintf("ERR %v", err)
  486. } else {
  487. data, err := json.Marshal(users)
  488. if err != nil {
  489. resp = fmt.Sprintf("ERR %v", err)
  490. } else {
  491. // Ensure it's a single line for TCP protocol simplicity
  492. jsonStr := string(data)
  493. resp = fmt.Sprintf("OK %s", jsonStr)
  494. }
  495. }
  496. case "ROLE_LIST":
  497. roles, err := s.ListRoles(session.token)
  498. if err != nil {
  499. resp = fmt.Sprintf("ERR %v", err)
  500. } else {
  501. data, err := json.Marshal(roles)
  502. if err != nil {
  503. resp = fmt.Sprintf("ERR %v", err)
  504. } else {
  505. resp = fmt.Sprintf("OK %s", string(data))
  506. }
  507. }
  508. case "USER_CREATE":
  509. // Usage: USER_CREATE <username> <password> <role1,role2>
  510. if len(parts) < 3 {
  511. resp = "ERR usage: USER_CREATE <user> <pass> [roles]"
  512. } else {
  513. u := parts[1]
  514. p := parts[2]
  515. var roles []string
  516. if len(parts) > 3 {
  517. roles = strings.Split(parts[3], ",")
  518. }
  519. // Use Server method which performs permission check
  520. err := s.CreateUser(u, p, roles, session.token)
  521. if err != nil {
  522. resp = fmt.Sprintf("ERR %v", err)
  523. } else {
  524. resp = "OK"
  525. }
  526. }
  527. case "USER_UPDATE":
  528. // Usage: USER_UPDATE <username> <new_pass|-> <new_roles|->
  529. if len(parts) < 4 {
  530. resp = "ERR usage: USER_UPDATE <username> <new_pass|-> <new_roles|->"
  531. } else {
  532. username := parts[1]
  533. newPass := parts[2]
  534. newRolesStr := parts[3]
  535. // Get existing user to preserve other fields
  536. userPtr, err := s.AuthManager.GetUser(username)
  537. if err != nil {
  538. resp = fmt.Sprintf("ERR %v", err)
  539. } else {
  540. // Create copy to modify
  541. user := *userPtr
  542. // Update Password if requested
  543. if newPass != "-" {
  544. salt := fmt.Sprintf("%d", time.Now().UnixNano())
  545. user.Salt = salt
  546. user.PasswordHash = HashPassword(newPass, salt)
  547. }
  548. // Update Roles if requested
  549. if newRolesStr != "-" {
  550. if newRolesStr == "" {
  551. user.Roles = []string{}
  552. } else {
  553. user.Roles = strings.Split(newRolesStr, ",")
  554. }
  555. }
  556. // Use Server method which performs permission check
  557. err := s.UpdateUser(user, session.token)
  558. if err != nil {
  559. resp = fmt.Sprintf("ERR %v", err)
  560. } else {
  561. resp = "OK"
  562. }
  563. }
  564. }
  565. case "ROLE_CREATE":
  566. // Usage: ROLE_CREATE <name>
  567. if len(parts) < 2 {
  568. resp = "ERR usage: ROLE_CREATE <name>"
  569. } else {
  570. name := parts[1]
  571. // Use Server method which performs permission check
  572. err := s.CreateRole(name, session.token)
  573. if err != nil {
  574. resp = fmt.Sprintf("ERR %v", err)
  575. } else {
  576. resp = "OK"
  577. }
  578. }
  579. case "ROLE_PERMISSION_ADD":
  580. // Usage: ROLE_PERMISSION_ADD <role> <pattern> <actions> [min] [max]
  581. // Actions: comma separated list of actions (read,write,admin,*)
  582. // Min/Max: optional numeric constraints for write operations ("-" for no constraint)
  583. if len(parts) < 4 {
  584. resp = "ERR usage: ROLE_PERMISSION_ADD <role> <pattern> <actions> [min] [max]"
  585. } else {
  586. roleName := parts[1]
  587. pattern := parts[2]
  588. actionsStr := parts[3]
  589. actions := strings.Split(actionsStr, ",")
  590. var minVal, maxVal *float64
  591. if len(parts) > 4 {
  592. if parts[4] != "-" && parts[4] != "null" {
  593. if v, err := strconv.ParseFloat(parts[4], 64); err == nil {
  594. minVal = &v
  595. } else {
  596. resp = "ERR invalid min value"
  597. break
  598. }
  599. }
  600. }
  601. if len(parts) > 5 {
  602. if parts[5] != "-" && parts[5] != "null" {
  603. if v, err := strconv.ParseFloat(parts[5], 64); err == nil {
  604. maxVal = &v
  605. } else {
  606. resp = "ERR invalid max value"
  607. break
  608. }
  609. }
  610. }
  611. // Check auth logic inside UpdateRole call?
  612. // No, UpdateRole in server.go handles Auth check now.
  613. // We just need to construct the object.
  614. rolePtr, err := s.AuthManager.GetRole(roleName)
  615. if err != nil {
  616. resp = fmt.Sprintf("ERR %v", err)
  617. } else {
  618. // Create a copy to modify
  619. role := *rolePtr
  620. // Deep copy permissions
  621. originalPerms := role.Permissions
  622. role.Permissions = make([]Permission, len(originalPerms))
  623. copy(role.Permissions, originalPerms)
  624. newPerm := Permission{
  625. KeyPattern: pattern,
  626. Actions: actions,
  627. }
  628. if minVal != nil || maxVal != nil {
  629. newPerm.Constraint = &Constraint{
  630. Min: minVal,
  631. Max: maxVal,
  632. }
  633. }
  634. // Upsert logic
  635. found := false
  636. for i, p := range role.Permissions {
  637. if p.KeyPattern == pattern {
  638. role.Permissions[i] = newPerm
  639. found = true
  640. break
  641. }
  642. }
  643. if !found {
  644. role.Permissions = append(role.Permissions, newPerm)
  645. }
  646. // Use server.UpdateRole which performs Delegation Check
  647. err := s.UpdateRole(role, session.token)
  648. if err != nil {
  649. resp = fmt.Sprintf("ERR %v", err)
  650. } else {
  651. resp = "OK"
  652. }
  653. }
  654. }
  655. case "ROLE_PERMISSION_REMOVE":
  656. // Usage: ROLE_PERMISSION_REMOVE <role> <pattern>
  657. if len(parts) < 3 {
  658. resp = "ERR usage: ROLE_PERMISSION_REMOVE <role> <pattern>"
  659. } else {
  660. roleName := parts[1]
  661. pattern := parts[2]
  662. rolePtr, err := s.AuthManager.GetRole(roleName)
  663. if err != nil {
  664. resp = fmt.Sprintf("ERR %v", err)
  665. } else {
  666. role := *rolePtr
  667. originalPerms := role.Permissions
  668. newPerms := make([]Permission, 0, len(originalPerms))
  669. found := false
  670. for _, p := range originalPerms {
  671. if p.KeyPattern == pattern {
  672. found = true
  673. continue
  674. }
  675. newPerms = append(newPerms, p)
  676. }
  677. if !found {
  678. resp = "ERR permission not found"
  679. } else {
  680. role.Permissions = newPerms
  681. // Use server.UpdateRole which performs Delegation Check
  682. err := s.UpdateRole(role, session.token)
  683. if err != nil {
  684. resp = fmt.Sprintf("ERR %v", err)
  685. } else {
  686. resp = "OK"
  687. }
  688. }
  689. }
  690. }
  691. case "USER_UNLOCK":
  692. // Usage: USER_UNLOCK <username>
  693. if s.AuthManager.IsEnabled() {
  694. if !s.IsAdmin(session.token) {
  695. resp = "ERR Permission Denied: Admin access required"
  696. break
  697. }
  698. }
  699. if len(parts) < 2 {
  700. resp = "ERR usage: USER_UNLOCK <username>"
  701. } else {
  702. // Manually clear the lock key
  703. userToUnlock := parts[1]
  704. // We use DelSync to ensure the lock is removed before returning
  705. err := s.DelSync("system.lock." + userToUnlock)
  706. if err != nil {
  707. resp = fmt.Sprintf("ERR %v", err)
  708. } else {
  709. resp = "OK"
  710. }
  711. }
  712. case "EXIT", "QUIT":
  713. resp = "BYE"
  714. // Need signal to close connection after write
  715. // For simplicity, handle it in handleTCPConnection loop break,
  716. // but here we just return the string.
  717. // Actually, BYE handling is tricky in async writer.
  718. // Let's keep connection open or let client close it.
  719. // Or send special signal?
  720. // For now, simple return. Client will read BYE and close.
  721. default:
  722. s.Raft.config.Logger.Warn("Unknown command received: %s (parts: %v)", cmd, parts)
  723. resp = fmt.Sprintf("ERR unknown command: %s", cmd)
  724. }
  725. return resp
  726. }