client.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. package raft_client
  2. import (
  3. "bufio"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "strings"
  8. "sync"
  9. "time"
  10. )
  11. // Client handles TCP communication with the Raft KV server.
  12. // It is designed to be easily copy-pasted into other projects.
  13. //
  14. // Example Usage:
  15. //
  16. // client := raft_client.NewClient("127.0.0.1:9011")
  17. // if err := client.Connect(); err != nil {
  18. // log.Fatal(err)
  19. // }
  20. // defer client.Close()
  21. //
  22. // // Login
  23. // if _, err := client.Login("root", "11111"); err != nil {
  24. // log.Fatal(err)
  25. // }
  26. //
  27. // // Set
  28. // if err := client.Set("mykey", "myvalue"); err != nil {
  29. // log.Fatal(err)
  30. // }
  31. //
  32. // // Get
  33. // val, err := client.Get("mykey")
  34. // if err != nil {
  35. // log.Fatal(err)
  36. // }
  37. // fmt.Println("Got:", val)
  38. type Client struct {
  39. addr string
  40. conn net.Conn
  41. reader *bufio.Reader
  42. writer *bufio.Writer
  43. mu sync.Mutex
  44. timeout time.Duration
  45. token string
  46. }
  47. // NewClient creates a new client instance.
  48. // addr should be in "host:port" format (e.g., "127.0.0.1:9011").
  49. func NewClient(addr string) *Client {
  50. return &Client{
  51. addr: addr,
  52. timeout: 10 * time.Second,
  53. }
  54. }
  55. // Connect establishes the TCP connection.
  56. func (c *Client) Connect() error {
  57. c.mu.Lock()
  58. defer c.mu.Unlock()
  59. if c.conn != nil {
  60. return nil // Already connected
  61. }
  62. conn, err := net.DialTimeout("tcp", c.addr, c.timeout)
  63. if err != nil {
  64. return err
  65. }
  66. c.conn = conn
  67. c.reader = bufio.NewReader(conn)
  68. c.writer = bufio.NewWriter(conn)
  69. return nil
  70. }
  71. // Close closes the connection.
  72. func (c *Client) Close() error {
  73. c.mu.Lock()
  74. defer c.mu.Unlock()
  75. if c.conn != nil {
  76. err := c.conn.Close()
  77. c.conn = nil
  78. c.reader = nil
  79. c.writer = nil
  80. return err
  81. }
  82. return nil
  83. }
  84. // Login authenticates with the server and stores the session token.
  85. func (c *Client) Login(username, password string) (string, error) {
  86. // Login command: LOGIN <user> <pass>
  87. // Note: The server expects an empty line after the command even if no headers are sent.
  88. resp, err := c.SendRequest(fmt.Sprintf("LOGIN %s %s", username, password), "")
  89. if err != nil {
  90. return "", err
  91. }
  92. if strings.HasPrefix(resp, "OK ") {
  93. token := strings.TrimPrefix(resp, "OK ")
  94. c.mu.Lock()
  95. c.token = token
  96. c.mu.Unlock()
  97. return token, nil
  98. }
  99. return "", fmt.Errorf("login failed: %s", resp)
  100. }
  101. // Set stores a key-value pair.
  102. // Uses the body-based SET command for handling values safely.
  103. func (c *Client) Set(key, value string) error {
  104. // Use Body for value to support special characters/spaces safer
  105. // Command: SET <key>
  106. // Body: <value>
  107. resp, err := c.SendRequest(fmt.Sprintf("SET %s", key), value)
  108. if err != nil {
  109. return err
  110. }
  111. if resp != "OK" {
  112. return fmt.Errorf("set failed: %s", resp)
  113. }
  114. return nil
  115. }
  116. // SetAsync stores a key-value pair asynchronously (returns OK immediately after proposal).
  117. func (c *Client) SetAsync(key, value string) error {
  118. resp, err := c.SendRequest(fmt.Sprintf("ASET %s", key), value)
  119. if err != nil {
  120. return err
  121. }
  122. if resp != "OK" {
  123. return fmt.Errorf("aset failed: %s", resp)
  124. }
  125. return nil
  126. }
  127. // Get retrieves a value by key.
  128. func (c *Client) Get(key string) (string, error) {
  129. resp, err := c.SendRequest(fmt.Sprintf("GET %s", key), "")
  130. if err != nil {
  131. return "", err
  132. }
  133. if strings.HasPrefix(resp, "OK ") {
  134. return strings.TrimPrefix(resp, "OK "), nil
  135. } else if resp == "ERR not found" {
  136. return "", errors.New("not found")
  137. }
  138. return "", fmt.Errorf("get failed: %s", resp)
  139. }
// NOTE: there is intentionally no Delete helper. tcp_server.go currently
// exposes only GET, SET, ASET, LOGIN, AUTH, and LOGOUT over TCP, so even if
// the underlying engine supports deletion, a DELETE command sent over TCP
// may fail. Callers can use Execute to send raw commands if one is added.

  146. // Execute sends a raw command and returns the response line.
  147. // This handles the authentication token injection if logged in.
  148. func (c *Client) Execute(cmd string) (string, error) {
  149. return c.SendRequest(cmd, "")
  150. }
  151. // SendRequest sends a command with optional body and reads the response.
  152. // It handles the protocol details: Command Line + Headers + Empty Line + Body.
  153. func (c *Client) SendRequest(cmdLine string, body string) (string, error) {
  154. c.mu.Lock()
  155. defer c.mu.Unlock()
  156. if c.conn == nil {
  157. // Auto-reconnect could be implemented here, but for simplicity we fail.
  158. // Or we can try to connect.
  159. if err := c.reconnectLocked(); err != nil {
  160. return "", fmt.Errorf("connection error: %v", err)
  161. }
  162. }
  163. // 1. Send Auth if needed (and not logging in)
  164. // The current server implementation is stateful (session based).
  165. // Once authenticated (via LOGIN or AUTH), the session stays authenticated.
  166. // However, if we just reconnected, we might need to re-auth?
  167. // The server creates a new Session on connection.
  168. // So if we have a token, we should probably send AUTH <token> before the command if this is a new connection?
  169. // But optimizing that away: we assume connection persists.
  170. // If the user calls Login, we get a token.
  171. // We can manually call Auth(token) if needed.
  172. // For simplicity, we assume the user calls Auth() or Login() at start.
  173. // 2. Send Command
  174. if _, err := c.writer.WriteString(cmdLine + "\r\n"); err != nil {
  175. c.conn.Close()
  176. c.conn = nil
  177. return "", err
  178. }
  179. // 3. Send Headers
  180. if len(body) > 0 {
  181. if _, err := c.writer.WriteString(fmt.Sprintf("Content-Length: %d\r\n", len(body))); err != nil {
  182. c.conn.Close()
  183. c.conn = nil
  184. return "", err
  185. }
  186. }
  187. // 4. Send End of Headers (Empty Line)
  188. if _, err := c.writer.WriteString("\r\n"); err != nil {
  189. c.conn.Close()
  190. c.conn = nil
  191. return "", err
  192. }
  193. // 5. Send Body
  194. if len(body) > 0 {
  195. if _, err := c.writer.WriteString(body); err != nil {
  196. c.conn.Close()
  197. c.conn = nil
  198. return "", err
  199. }
  200. }
  201. // 6. Flush
  202. if err := c.writer.Flush(); err != nil {
  203. c.conn.Close()
  204. c.conn = nil
  205. return "", err
  206. }
  207. // 7. Read Response
  208. // The server sends response terminated by \n
  209. resp, err := c.reader.ReadString('\n')
  210. if err != nil {
  211. c.conn.Close()
  212. c.conn = nil
  213. return "", err
  214. }
  215. return strings.TrimSpace(resp), nil
  216. }
  217. func (c *Client) reconnectLocked() error {
  218. if c.conn != nil {
  219. c.conn.Close()
  220. }
  221. conn, err := net.DialTimeout("tcp", c.addr, c.timeout)
  222. if err != nil {
  223. return err
  224. }
  225. c.conn = conn
  226. c.reader = bufio.NewReader(conn)
  227. c.writer = bufio.NewWriter(conn)
  228. // If we have a token, try to restore session
  229. if c.token != "" {
  230. // We need to send AUTH command without calling SendRequest to avoid infinite recursion
  231. // Construct AUTH command
  232. cmd := fmt.Sprintf("AUTH %s", c.token)
  233. c.writer.WriteString(cmd + "\r\n\r\n") // Cmd + Empty Line
  234. c.writer.Flush()
  235. resp, err := c.reader.ReadString('\n')
  236. if err == nil && strings.TrimSpace(resp) == "OK" {
  237. // Auth success
  238. } else {
  239. // Auth failed (maybe expired), but connection is open.
  240. // We don't error out here, just let the actual command fail if it needs auth.
  241. }
  242. }
  243. return nil
  244. }
  245. // Auth restores a session using an existing token.
  246. func (c *Client) Auth(token string) error {
  247. resp, err := c.SendRequest(fmt.Sprintf("AUTH %s", token), "")
  248. if err != nil {
  249. return err
  250. }
  251. if resp == "OK" {
  252. c.mu.Lock()
  253. c.token = token
  254. c.mu.Unlock()
  255. return nil
  256. }
  257. return fmt.Errorf("auth failed: %s", resp)
  258. }