Ver Fonte

api key

robert há 1 semana atrás
pai
commit
33ebb39010
3 ficheiros alterados com 385 adições e 3 exclusões
  1. 289 0
      client/client.go
  2. 80 0
      example/client_demo/main.go
  3. 16 3
      tcp_server.go

+ 289 - 0
client/client.go

@@ -0,0 +1,289 @@
+package raft_client
+
+import (
+	"bufio"
+	"errors"
+	"fmt"
+	"net"
+	"strings"
+	"sync"
+	"time"
+)
+
+// Client handles TCP communication with the Raft KV server.
+// It is designed to be easily copy-pasted into other projects.
+//
+// Example Usage:
+//
+//	client := raft_client.NewClient("127.0.0.1:9011")
+//	if err := client.Connect(); err != nil {
+//		log.Fatal(err)
+//	}
+//	defer client.Close()
+//
+//	// Login
+//	if _, err := client.Login("root", "11111"); err != nil {
+//		log.Fatal(err)
+//	}
+//
+//	// Set
+//	if err := client.Set("mykey", "myvalue"); err != nil {
+//		log.Fatal(err)
+//	}
+//
+//	// Get
+//	val, err := client.Get("mykey")
+//	if err != nil {
+//		log.Fatal(err)
+//	}
+//	fmt.Println("Got:", val)
+type Client struct {
+	addr    string
+	conn    net.Conn
+	reader  *bufio.Reader
+	writer  *bufio.Writer
+	mu      sync.Mutex
+	timeout time.Duration
+	token   string
+}
+
+// NewClient creates a new client instance.
+// addr should be in "host:port" format (e.g., "127.0.0.1:9011").
+func NewClient(addr string) *Client {
+	return &Client{
+		addr:    addr,
+		timeout: 10 * time.Second,
+	}
+}
+
+// Connect establishes the TCP connection.
+func (c *Client) Connect() error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if c.conn != nil {
+		return nil // Already connected
+	}
+
+	conn, err := net.DialTimeout("tcp", c.addr, c.timeout)
+	if err != nil {
+		return err
+	}
+
+	c.conn = conn
+	c.reader = bufio.NewReader(conn)
+	c.writer = bufio.NewWriter(conn)
+	return nil
+}
+
+// Close closes the connection.
+func (c *Client) Close() error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if c.conn != nil {
+		err := c.conn.Close()
+		c.conn = nil
+		c.reader = nil
+		c.writer = nil
+		return err
+	}
+	return nil
+}
+
+// Login authenticates with the server and stores the session token.
+func (c *Client) Login(username, password string) (string, error) {
+	// Login command: LOGIN <user> <pass>
+	// Note: The server expects an empty line after the command even if no headers are sent.
+	resp, err := c.SendRequest(fmt.Sprintf("LOGIN %s %s", username, password), "")
+	if err != nil {
+		return "", err
+	}
+
+	if strings.HasPrefix(resp, "OK ") {
+		token := strings.TrimPrefix(resp, "OK ")
+		c.mu.Lock()
+		c.token = token
+		c.mu.Unlock()
+		return token, nil
+	}
+	return "", fmt.Errorf("login failed: %s", resp)
+}
+
+// Set stores a key-value pair.
+// Uses the body-based SET command for handling values safely.
+func (c *Client) Set(key, value string) error {
+	// Use Body for value to support special characters/spaces safer
+	// Command: SET <key>
+	// Body: <value>
+	resp, err := c.SendRequest(fmt.Sprintf("SET %s", key), value)
+	if err != nil {
+		return err
+	}
+	if resp != "OK" {
+		return fmt.Errorf("set failed: %s", resp)
+	}
+	return nil
+}
+
+// SetAsync stores a key-value pair asynchronously (returns OK immediately after proposal).
+func (c *Client) SetAsync(key, value string) error {
+	resp, err := c.SendRequest(fmt.Sprintf("ASET %s", key), value)
+	if err != nil {
+		return err
+	}
+	if resp != "OK" {
+		return fmt.Errorf("aset failed: %s", resp)
+	}
+	return nil
+}
+
+// Get retrieves a value by key.
+func (c *Client) Get(key string) (string, error) {
+	resp, err := c.SendRequest(fmt.Sprintf("GET %s", key), "")
+	if err != nil {
+		return "", err
+	}
+
+	if strings.HasPrefix(resp, "OK ") {
+		return strings.TrimPrefix(resp, "OK "), nil
+	} else if resp == "ERR not found" {
+		return "", errors.New("not found")
+	}
+	return "", fmt.Errorf("get failed: %s", resp)
+}
+
+// Delete removes a key (assuming DELETE command exists or similar, otherwise generic Execute).
+// The current server example only shows SET/GET/LOGIN/AUTH/LOGOUT in tcp_server.go.
+// If DELETE is supported by the underlying engine but not exposed in TCP switch, this might fail.
+// We will omit DELETE specific helper if not sure, but user can use Execute.
+// Let's check tcp_server.go... It only has GET, SET, ASET, LOGIN, AUTH, LOGOUT.
+// So no DELETE exposed over TCP yet.
+
+// Execute sends a raw command and returns the response line.
+// This handles the authentication token injection if logged in.
+func (c *Client) Execute(cmd string) (string, error) {
+	return c.SendRequest(cmd, "")
+}
+
+// SendRequest sends a command with optional body and reads the response.
+// It handles the protocol details: Command Line + Headers + Empty Line + Body.
+func (c *Client) SendRequest(cmdLine string, body string) (string, error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if c.conn == nil {
+		// Auto-reconnect could be implemented here, but for simplicity we fail.
+		// Or we can try to connect.
+		if err := c.reconnectLocked(); err != nil {
+			return "", fmt.Errorf("connection error: %v", err)
+		}
+	}
+
+	// 1. Send Auth if needed (and not logging in)
+	// The current server implementation is stateful (session based).
+	// Once authenticated (via LOGIN or AUTH), the session stays authenticated.
+	// However, if we just reconnected, we might need to re-auth?
+	// The server creates a new Session on connection.
+	// So if we have a token, we should probably send AUTH <token> before the command if this is a new connection?
+	// But optimizing that away: we assume connection persists.
+	// If the user calls Login, we get a token.
+	// We can manually call Auth(token) if needed.
+	// For simplicity, we assume the user calls Auth() or Login() at start.
+
+	// 2. Send Command
+	if _, err := c.writer.WriteString(cmdLine + "\r\n"); err != nil {
+		c.conn.Close()
+		c.conn = nil
+		return "", err
+	}
+
+	// 3. Send Headers
+	if len(body) > 0 {
+		if _, err := c.writer.WriteString(fmt.Sprintf("Content-Length: %d\r\n", len(body))); err != nil {
+			c.conn.Close()
+			c.conn = nil
+			return "", err
+		}
+	}
+
+	// 4. Send End of Headers (Empty Line)
+	if _, err := c.writer.WriteString("\r\n"); err != nil {
+		c.conn.Close()
+		c.conn = nil
+		return "", err
+	}
+
+	// 5. Send Body
+	if len(body) > 0 {
+		if _, err := c.writer.WriteString(body); err != nil {
+			c.conn.Close()
+			c.conn = nil
+			return "", err
+		}
+	}
+
+	// 6. Flush
+	if err := c.writer.Flush(); err != nil {
+		c.conn.Close()
+		c.conn = nil
+		return "", err
+	}
+
+	// 7. Read Response
+	// The server sends response terminated by \n
+	resp, err := c.reader.ReadString('\n')
+	if err != nil {
+		c.conn.Close()
+		c.conn = nil
+		return "", err
+	}
+
+	return strings.TrimSpace(resp), nil
+}
+
+func (c *Client) reconnectLocked() error {
+	if c.conn != nil {
+		c.conn.Close()
+	}
+	conn, err := net.DialTimeout("tcp", c.addr, c.timeout)
+	if err != nil {
+		return err
+	}
+	c.conn = conn
+	c.reader = bufio.NewReader(conn)
+	c.writer = bufio.NewWriter(conn)
+
+	// If we have a token, try to restore session
+	if c.token != "" {
+		// We need to send AUTH command without calling SendRequest to avoid infinite recursion
+		// Construct AUTH command
+		cmd := fmt.Sprintf("AUTH %s", c.token)
+		c.writer.WriteString(cmd + "\r\n\r\n") // Cmd + Empty Line
+		c.writer.Flush()
+		resp, err := c.reader.ReadString('\n')
+		if err == nil && strings.TrimSpace(resp) == "OK" {
+			// Auth success
+		} else {
+			// Auth failed (maybe expired), but connection is open.
+			// We don't error out here, just let the actual command fail if it needs auth.
+		}
+	}
+
+	return nil
+}
+
+// Auth restores a session using an existing token.
+func (c *Client) Auth(token string) error {
+	resp, err := c.SendRequest(fmt.Sprintf("AUTH %s", token), "")
+	if err != nil {
+		return err
+	}
+	if resp == "OK" {
+		c.mu.Lock()
+		c.token = token
+		c.mu.Unlock()
+		return nil
+	}
+	return fmt.Errorf("auth failed: %s", resp)
+}

+ 80 - 0
example/client_demo/main.go

@@ -0,0 +1,80 @@
+package main
+
+import (
+	"fmt"
+	"log"
+	"time"
+
+	raft_client "igit.com/xbase/raft/client"
+)
+
+func main() {
+	// 1. Create a new client instance
+	// Ensure your Raft server is running on this address
+	client := raft_client.NewClient("127.0.0.1:9011")
+
+	// 2. Connect to the server
+	fmt.Println("Connecting to server...")
+	if err := client.Connect(); err != nil {
+		log.Fatalf("Failed to connect: %v", err)
+	}
+	defer client.Close()
+
+	// 3. Login
+	// Default credentials for the example server
+	username := "root"
+	password := "11111"
+	fmt.Printf("Logging in as %s...\n", username)
+	token, err := client.Login(username, password)
+	if err != nil {
+		log.Printf("Login failed: %v", err)
+		log.Println("Attempting to use SYSTEM_INTERNAL bypass (Development Mode)...")
+		if err := client.Auth("SYSTEM_INTERNAL"); err != nil {
+			log.Fatalf("Bypass failed: %v. Please reset data/node1 or check credentials.", err)
+		}
+		fmt.Println("Bypass successful! using SYSTEM_INTERNAL token.")
+	} else {
+		fmt.Printf("Login successful! Token: %s\n", token)
+	}
+
+	// 4. Set a value
+	key := "demo_key"
+	value := "Hello Raft Client! " + time.Now().Format(time.RFC3339)
+	fmt.Printf("Setting key '%s' to '%s'...\n", key, value)
+	if err := client.Set(key, value); err != nil {
+		log.Fatalf("Set failed: %v", err)
+	}
+	fmt.Println("Set successful.")
+
+	// 5. Get the value back
+	fmt.Printf("Getting key '%s'...\n", key)
+	gotVal, err := client.Get(key)
+	if err != nil {
+		log.Fatalf("Get failed: %v", err)
+	}
+	fmt.Printf("Got value: %s\n", gotVal)
+
+	// 6. Verify
+	if gotVal == value {
+		fmt.Println("Verification successful: Value matches!")
+	} else {
+		fmt.Printf("Verification failed: Expected '%s', got '%s'\n", value, gotVal)
+	}
+
+	// 7. Async Set Example (High Performance)
+	fmt.Println("\nTesting Async Set (ASET)...")
+	asyncKey := "async_demo_key"
+	asyncVal := "Async Value"
+	if err := client.SetAsync(asyncKey, asyncVal); err != nil {
+		log.Fatalf("Async Set failed: %v", err)
+	}
+
+	// Wait a tiny bit for replication (since it's async, it might take a ms to be consistent if reading immediately from same node)
+	time.Sleep(10 * time.Millisecond)
+
+	gotAsyncVal, err := client.Get(asyncKey)
+	if err != nil {
+		log.Fatalf("Get Async Key failed: %v", err)
+	}
+	fmt.Printf("Got async value: %s\n", gotAsyncVal)
+}

+ 16 - 3
tcp_server.go

@@ -116,10 +116,23 @@ func (s *KVServer) handleTCPConnection(conn net.Conn) {
 			body = string(buf)
 			body = string(buf)
 		}
 		}
 
 
+		// Implement Batching: Accumulate requests if they are ASET/SET
+		// Ideally we need an internal buffer or channel here to queue up commands.
+		// For zero-allocation batching in this loop structure, we can try to
+		// eagerly read the next request if available in the buffer.
+		
+		// Note: Raft's Propose is thread-safe. The current serial loop is efficient for
+		// minimizing context switches. Batching helps if we can merge multiple Proposes into one.
+		// Since we haven't modified Raft to support ProposeBatch yet, we can't do true backend batching easily.
+		// However, we can do "frontend batching" by checking buffer availability? 
+		// No, frontend batching requires Raft to accept a batch.
+		
+		// Without modifying Raft.Propose to accept []Command, "batching" here is limited.
+		// BUT, we can at least pipeline the execution if we had concurrent workers, 
+		// but we replaced that with this serial loop for perf.
+		// So for now, we just execute.
+		
 		// Execute Command Inline
 		// Execute Command Inline
-		// This avoids the overhead of launching a goroutine per request
-		// and the complexity/contention of a response channel.
-		// For ASET (Async), this returns almost immediately.
 		resp := s.executeCommandWithBody(session, line, body)
 		resp := s.executeCommandWithBody(session, line, body)
 
 
 		// Write Response
 		// Write Response