package raft_client import ( "bufio" "errors" "fmt" "net" "strings" "sync" "time" ) // Client handles TCP communication with the Raft KV server. // It is designed to be easily copy-pasted into other projects. // // Example Usage: // // client := raft_client.NewClient("127.0.0.1:9011") // if err := client.Connect(); err != nil { // log.Fatal(err) // } // defer client.Close() // // // Login // if _, err := client.Login("root", "11111"); err != nil { // log.Fatal(err) // } // // // Set // if err := client.Set("mykey", "myvalue"); err != nil { // log.Fatal(err) // } // // // Get // val, err := client.Get("mykey") // if err != nil { // log.Fatal(err) // } // fmt.Println("Got:", val) type Client struct { addr string conn net.Conn reader *bufio.Reader writer *bufio.Writer mu sync.Mutex timeout time.Duration token string } // NewClient creates a new client instance. // addr should be in "host:port" format (e.g., "127.0.0.1:9011"). func NewClient(addr string) *Client { return &Client{ addr: addr, timeout: 10 * time.Second, } } // Connect establishes the TCP connection. func (c *Client) Connect() error { c.mu.Lock() defer c.mu.Unlock() if c.conn != nil { return nil // Already connected } conn, err := net.DialTimeout("tcp", c.addr, c.timeout) if err != nil { return err } c.conn = conn c.reader = bufio.NewReader(conn) c.writer = bufio.NewWriter(conn) return nil } // Close closes the connection. func (c *Client) Close() error { c.mu.Lock() defer c.mu.Unlock() if c.conn != nil { err := c.conn.Close() c.conn = nil c.reader = nil c.writer = nil return err } return nil } // Login authenticates with the server and stores the session token. func (c *Client) Login(username, password string) (string, error) { // Login command: LOGIN // Note: The server expects an empty line after the command even if no headers are sent. resp, err := c.SendRequest(fmt.Sprintf("LOGIN %s %s", username, password), "") if err != nil { return "", err } if strings.HasPrefix(resp, "OK ") { token := strings.TrimPrefix(resp, "OK ") c.mu.Lock() c.token = token c.mu.Unlock() return token, nil } return "", fmt.Errorf("login failed: %s", resp) } // Set stores a key-value pair. // Uses the body-based SET command for handling values safely. func (c *Client) Set(key, value string) error { // Use Body for value to support special characters/spaces safer // Command: SET // Body: resp, err := c.SendRequest(fmt.Sprintf("SET %s", key), value) if err != nil { return err } if resp != "OK" { return fmt.Errorf("set failed: %s", resp) } return nil } // SetAsync stores a key-value pair asynchronously (returns OK immediately after proposal). func (c *Client) SetAsync(key, value string) error { resp, err := c.SendRequest(fmt.Sprintf("ASET %s", key), value) if err != nil { return err } if resp != "OK" { return fmt.Errorf("aset failed: %s", resp) } return nil } // Get retrieves a value by key. func (c *Client) Get(key string) (string, error) { resp, err := c.SendRequest(fmt.Sprintf("GET %s", key), "") if err != nil { return "", err } if strings.HasPrefix(resp, "OK ") { return strings.TrimPrefix(resp, "OK "), nil } else if resp == "ERR not found" { return "", errors.New("not found") } return "", fmt.Errorf("get failed: %s", resp) } // Delete removes a key (assuming DELETE command exists or similar, otherwise generic Execute). // The current server example only shows SET/GET/LOGIN/AUTH/LOGOUT in tcp_server.go. // If DELETE is supported by the underlying engine but not exposed in TCP switch, this might fail. // We will omit DELETE specific helper if not sure, but user can use Execute. // Let's check tcp_server.go... It only has GET, SET, ASET, LOGIN, AUTH, LOGOUT. // So no DELETE exposed over TCP yet. // Execute sends a raw command and returns the response line. // This handles the authentication token injection if logged in. func (c *Client) Execute(cmd string) (string, error) { return c.SendRequest(cmd, "") } // SendRequest sends a command with optional body and reads the response. // It handles the protocol details: Command Line + Headers + Empty Line + Body. func (c *Client) SendRequest(cmdLine string, body string) (string, error) { c.mu.Lock() defer c.mu.Unlock() if c.conn == nil { // Auto-reconnect could be implemented here, but for simplicity we fail. // Or we can try to connect. if err := c.reconnectLocked(); err != nil { return "", fmt.Errorf("connection error: %v", err) } } // 1. Send Auth if needed (and not logging in) // The current server implementation is stateful (session based). // Once authenticated (via LOGIN or AUTH), the session stays authenticated. // However, if we just reconnected, we might need to re-auth? // The server creates a new Session on connection. // So if we have a token, we should probably send AUTH before the command if this is a new connection? // But optimizing that away: we assume connection persists. // If the user calls Login, we get a token. // We can manually call Auth(token) if needed. // For simplicity, we assume the user calls Auth() or Login() at start. // 2. Send Command if _, err := c.writer.WriteString(cmdLine + "\r\n"); err != nil { c.conn.Close() c.conn = nil return "", err } // 3. Send Headers if len(body) > 0 { if _, err := c.writer.WriteString(fmt.Sprintf("Content-Length: %d\r\n", len(body))); err != nil { c.conn.Close() c.conn = nil return "", err } } // 4. Send End of Headers (Empty Line) if _, err := c.writer.WriteString("\r\n"); err != nil { c.conn.Close() c.conn = nil return "", err } // 5. Send Body if len(body) > 0 { if _, err := c.writer.WriteString(body); err != nil { c.conn.Close() c.conn = nil return "", err } } // 6. Flush if err := c.writer.Flush(); err != nil { c.conn.Close() c.conn = nil return "", err } // 7. Read Response // The server sends response terminated by \n resp, err := c.reader.ReadString('\n') if err != nil { c.conn.Close() c.conn = nil return "", err } return strings.TrimSpace(resp), nil } func (c *Client) reconnectLocked() error { if c.conn != nil { c.conn.Close() } conn, err := net.DialTimeout("tcp", c.addr, c.timeout) if err != nil { return err } c.conn = conn c.reader = bufio.NewReader(conn) c.writer = bufio.NewWriter(conn) // If we have a token, try to restore session if c.token != "" { // We need to send AUTH command without calling SendRequest to avoid infinite recursion // Construct AUTH command cmd := fmt.Sprintf("AUTH %s", c.token) c.writer.WriteString(cmd + "\r\n\r\n") // Cmd + Empty Line c.writer.Flush() resp, err := c.reader.ReadString('\n') if err == nil && strings.TrimSpace(resp) == "OK" { // Auth success } else { // Auth failed (maybe expired), but connection is open. // We don't error out here, just let the actual command fail if it needs auth. } } return nil } // Auth restores a session using an existing token. func (c *Client) Auth(token string) error { resp, err := c.SendRequest(fmt.Sprintf("AUTH %s", token), "") if err != nil { return err } if resp == "OK" { c.mu.Lock() c.token = token c.mu.Unlock() return nil } return fmt.Errorf("auth failed: %s", resp) }