| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- package raft_client
- import (
- "bufio"
- "errors"
- "fmt"
- "net"
- "strings"
- "sync"
- "time"
- )
- // Client handles TCP communication with the Raft KV server.
- // It is designed to be easily copy-pasted into other projects.
- //
- // Example Usage:
- //
- // client := raft_client.NewClient("127.0.0.1:9011")
- // if err := client.Connect(); err != nil {
- // log.Fatal(err)
- // }
- // defer client.Close()
- //
- // // Login
- // if _, err := client.Login("root", "11111"); err != nil {
- // log.Fatal(err)
- // }
- //
- // // Set
- // if err := client.Set("mykey", "myvalue"); err != nil {
- // log.Fatal(err)
- // }
- //
- // // Get
- // val, err := client.Get("mykey")
- // if err != nil {
- // log.Fatal(err)
- // }
- // fmt.Println("Got:", val)
- type Client struct {
- addr string
- conn net.Conn
- reader *bufio.Reader
- writer *bufio.Writer
- mu sync.Mutex
- timeout time.Duration
- token string
- }
- // NewClient creates a new client instance.
- // addr should be in "host:port" format (e.g., "127.0.0.1:9011").
- func NewClient(addr string) *Client {
- return &Client{
- addr: addr,
- timeout: 10 * time.Second,
- }
- }
- // Connect establishes the TCP connection.
- func (c *Client) Connect() error {
- c.mu.Lock()
- defer c.mu.Unlock()
- if c.conn != nil {
- return nil // Already connected
- }
- conn, err := net.DialTimeout("tcp", c.addr, c.timeout)
- if err != nil {
- return err
- }
- c.conn = conn
- c.reader = bufio.NewReader(conn)
- c.writer = bufio.NewWriter(conn)
- return nil
- }
- // Close closes the connection.
- func (c *Client) Close() error {
- c.mu.Lock()
- defer c.mu.Unlock()
- if c.conn != nil {
- err := c.conn.Close()
- c.conn = nil
- c.reader = nil
- c.writer = nil
- return err
- }
- return nil
- }
- // Login authenticates with the server and stores the session token.
- func (c *Client) Login(username, password string) (string, error) {
- // Login command: LOGIN <user> <pass>
- // Note: The server expects an empty line after the command even if no headers are sent.
- resp, err := c.SendRequest(fmt.Sprintf("LOGIN %s %s", username, password), "")
- if err != nil {
- return "", err
- }
- if strings.HasPrefix(resp, "OK ") {
- token := strings.TrimPrefix(resp, "OK ")
- c.mu.Lock()
- c.token = token
- c.mu.Unlock()
- return token, nil
- }
- return "", fmt.Errorf("login failed: %s", resp)
- }
- // Set stores a key-value pair.
- // Uses the body-based SET command for handling values safely.
- func (c *Client) Set(key, value string) error {
- // Use Body for value to support special characters/spaces safer
- // Command: SET <key>
- // Body: <value>
- resp, err := c.SendRequest(fmt.Sprintf("SET %s", key), value)
- if err != nil {
- return err
- }
- if resp != "OK" {
- return fmt.Errorf("set failed: %s", resp)
- }
- return nil
- }
- // SetAsync stores a key-value pair asynchronously (returns OK immediately after proposal).
- func (c *Client) SetAsync(key, value string) error {
- resp, err := c.SendRequest(fmt.Sprintf("ASET %s", key), value)
- if err != nil {
- return err
- }
- if resp != "OK" {
- return fmt.Errorf("aset failed: %s", resp)
- }
- return nil
- }
- // Get retrieves a value by key.
- func (c *Client) Get(key string) (string, error) {
- resp, err := c.SendRequest(fmt.Sprintf("GET %s", key), "")
- if err != nil {
- return "", err
- }
- if strings.HasPrefix(resp, "OK ") {
- return strings.TrimPrefix(resp, "OK "), nil
- } else if resp == "ERR not found" {
- return "", errors.New("not found")
- }
- return "", fmt.Errorf("get failed: %s", resp)
- }
- // Delete removes a key (assuming DELETE command exists or similar, otherwise generic Execute).
- // The current server example only shows SET/GET/LOGIN/AUTH/LOGOUT in tcp_server.go.
- // If DELETE is supported by the underlying engine but not exposed in TCP switch, this might fail.
- // We will omit DELETE specific helper if not sure, but user can use Execute.
- // Let's check tcp_server.go... It only has GET, SET, ASET, LOGIN, AUTH, LOGOUT.
- // So no DELETE exposed over TCP yet.
- // Execute sends a raw command and returns the response line.
- // This handles the authentication token injection if logged in.
- func (c *Client) Execute(cmd string) (string, error) {
- return c.SendRequest(cmd, "")
- }
- // SendRequest sends a command with optional body and reads the response.
- // It handles the protocol details: Command Line + Headers + Empty Line + Body.
- func (c *Client) SendRequest(cmdLine string, body string) (string, error) {
- c.mu.Lock()
- defer c.mu.Unlock()
- if c.conn == nil {
- // Auto-reconnect could be implemented here, but for simplicity we fail.
- // Or we can try to connect.
- if err := c.reconnectLocked(); err != nil {
- return "", fmt.Errorf("connection error: %v", err)
- }
- }
- // 1. Send Auth if needed (and not logging in)
- // The current server implementation is stateful (session based).
- // Once authenticated (via LOGIN or AUTH), the session stays authenticated.
- // However, if we just reconnected, we might need to re-auth?
- // The server creates a new Session on connection.
- // So if we have a token, we should probably send AUTH <token> before the command if this is a new connection?
- // But optimizing that away: we assume connection persists.
- // If the user calls Login, we get a token.
- // We can manually call Auth(token) if needed.
- // For simplicity, we assume the user calls Auth() or Login() at start.
- // 2. Send Command
- if _, err := c.writer.WriteString(cmdLine + "\r\n"); err != nil {
- c.conn.Close()
- c.conn = nil
- return "", err
- }
- // 3. Send Headers
- if len(body) > 0 {
- if _, err := c.writer.WriteString(fmt.Sprintf("Content-Length: %d\r\n", len(body))); err != nil {
- c.conn.Close()
- c.conn = nil
- return "", err
- }
- }
- // 4. Send End of Headers (Empty Line)
- if _, err := c.writer.WriteString("\r\n"); err != nil {
- c.conn.Close()
- c.conn = nil
- return "", err
- }
- // 5. Send Body
- if len(body) > 0 {
- if _, err := c.writer.WriteString(body); err != nil {
- c.conn.Close()
- c.conn = nil
- return "", err
- }
- }
- // 6. Flush
- if err := c.writer.Flush(); err != nil {
- c.conn.Close()
- c.conn = nil
- return "", err
- }
- // 7. Read Response
- // The server sends response terminated by \n
- resp, err := c.reader.ReadString('\n')
- if err != nil {
- c.conn.Close()
- c.conn = nil
- return "", err
- }
- return strings.TrimSpace(resp), nil
- }
- func (c *Client) reconnectLocked() error {
- if c.conn != nil {
- c.conn.Close()
- }
- conn, err := net.DialTimeout("tcp", c.addr, c.timeout)
- if err != nil {
- return err
- }
- c.conn = conn
- c.reader = bufio.NewReader(conn)
- c.writer = bufio.NewWriter(conn)
- // If we have a token, try to restore session
- if c.token != "" {
- // We need to send AUTH command without calling SendRequest to avoid infinite recursion
- // Construct AUTH command
- cmd := fmt.Sprintf("AUTH %s", c.token)
- c.writer.WriteString(cmd + "\r\n\r\n") // Cmd + Empty Line
- c.writer.Flush()
- resp, err := c.reader.ReadString('\n')
- if err == nil && strings.TrimSpace(resp) == "OK" {
- // Auth success
- } else {
- // Auth failed (maybe expired), but connection is open.
- // We don't error out here, just let the actual command fail if it needs auth.
- }
- }
- return nil
- }
- // Auth restores a session using an existing token.
- func (c *Client) Auth(token string) error {
- resp, err := c.SendRequest(fmt.Sprintf("AUTH %s", token), "")
- if err != nil {
- return err
- }
- if resp == "OK" {
- c.mu.Lock()
- c.token = token
- c.mu.Unlock()
- return nil
- }
- return fmt.Errorf("auth failed: %s", resp)
- }
|