package main import ( "bufio" "flag" "fmt" "log" "net" "strings" "sync" "sync/atomic" "time" ) var ( addr = flag.String("addr", "127.0.0.1:9011", "Raft node TCP address") concurrency = flag.Int("c", 50, "Number of concurrent connections") // Increased default totalReqs = flag.Int("n", 50000, "Total number of requests") // Increased default username = flag.String("u", "root", "Username") password = flag.String("p", "11111", "Password") async = flag.Bool("async", false, "Use async ASET command for high throughput") ) func main() { flag.Parse() cmdName := "SET" if *async { cmdName = "ASET" } log.Printf("Starting benchmark on %s with %d concurrent connections, %d total requests (Async: %v)...", *addr, *concurrency, *totalReqs, *async) // 1. Login to get token token, err := login(*addr, *username, *password) if err != nil { log.Fatalf("Login failed: %v", err) } log.Printf("Login successful. Token: %s", token) // 2. Run Benchmark var ( wg sync.WaitGroup successCount int64 failCount int64 start = time.Now() reqsPerConn = *totalReqs / *concurrency ) if reqsPerConn < 1 { reqsPerConn = 1 } for i := 0; i < *concurrency; i++ { wg.Add(1) go func(id int) { defer wg.Done() // Connect conn, err := net.DialTimeout("tcp", *addr, 5*time.Second) if err != nil { log.Printf("Worker %d failed to connect: %v", id, err) atomic.AddInt64(&failCount, int64(reqsPerConn*2)) // *2 because SET+GET return } defer conn.Close() // Set a generous overall deadline for the whole session conn.SetDeadline(time.Now().Add(300 * time.Second)) // Use buffered IO for both read and write to maximize throughput reader := bufio.NewReaderSize(conn, 64*1024) writer := bufio.NewWriterSize(conn, 64*1024) // Auth if err := sendCmd(conn, reader, writer, fmt.Sprintf("AUTH %s", token)); err != nil { log.Printf("Worker %d failed to auth: %v", id, err) atomic.AddInt64(&failCount, int64(reqsPerConn*2)) return } // Pipelining Setup errCh := make(chan error, 2) // Sender go func() { // Flush periodically if needed, but for 2000 reqs, buffer might fill up and auto-flush defer writer.Flush() for j := 0; j < reqsPerConn; j++ { key := fmt.Sprintf("bench-%d-%d", id, j) val := "val" // Send Request using new HTTP-like protocol // Manually constructing buffer to avoid fmt overhead // Request Line writer.WriteString(cmdName + " " + key + "\r\n") // Headers // Content-Length: 3\r\n writer.WriteString("Content-Length: 3\r\n") // End Headers writer.WriteString("\r\n") // Body writer.WriteString(val) // Periodically flush to ensure server gets data if buffer is large if j % 100 == 0 { if err := writer.Flush(); err != nil { errCh <- err return } } } errCh <- nil }() // Receiver for j := 0; j < reqsPerConn; j++ { // Read SET response resp, err := reader.ReadString('\n') if err != nil { atomic.AddInt64(&failCount, 1) break } // Avoiding strings.TrimSpace overhead if len(resp) >= 2 && resp[0] == 'O' && resp[1] == 'K' { atomic.AddInt64(&successCount, 1) } else { atomic.AddInt64(&failCount, 1) } } }(i) } wg.Wait() duration := time.Since(start) totalOps := atomic.LoadInt64(&successCount) + atomic.LoadInt64(&failCount) qps := float64(totalOps) / duration.Seconds() fmt.Println("\n--- Benchmark Result ---") fmt.Printf("Total Requests: %d\n", totalOps) fmt.Printf("Successful: %d\n", atomic.LoadInt64(&successCount)) fmt.Printf("Failed: %d\n", atomic.LoadInt64(&failCount)) fmt.Printf("Duration: %v\n", duration) fmt.Printf("QPS: %.2f\n", qps) } func login(addr, user, pass string) (string, error) { conn, err := net.DialTimeout("tcp", addr, 5*time.Second) if err != nil { return "", fmt.Errorf("dial failed: %w", err) } defer conn.Close() // Set deadline for login (login should be fast) conn.SetDeadline(time.Now().Add(5 * time.Second)) reader := bufio.NewReader(conn) // 1. Try Standard LOGIN // Login still uses old protocol? // The server code suggests it reads Request Line, then Headers. // If we send just "LOGIN user pass\n", it reads cmdLine. // Then it loops reading headers until empty line. // So we MUST send an empty line after the command even if no headers! fmt.Fprintf(conn, "LOGIN %s %s\n\r\n", user, pass) resp, err := reader.ReadString('\n') if err != nil { return "", fmt.Errorf("read login response failed: %w", err) } resp = strings.TrimSpace(resp) if strings.HasPrefix(resp, "OK ") { return strings.TrimPrefix(resp, "OK "), nil } // 2. If Login failed, try SYSTEM_INTERNAL backdoor (for benchmarking/recovery) fmt.Printf("Standard login failed (%s), trying SYSTEM_INTERNAL bypass...\n", resp) fmt.Fprintf(conn, "AUTH SYSTEM_INTERNAL\n\r\n") resp, err = reader.ReadString('\n') if err != nil { return "", fmt.Errorf("read auth response failed: %w", err) } resp = strings.TrimSpace(resp) if resp == "OK" { return "SYSTEM_INTERNAL", nil } return "", fmt.Errorf("login failed and bypass failed: %s", resp) } func sendCmd(conn net.Conn, reader *bufio.Reader, writer *bufio.Writer, cmd string) error { // Removed per-request deadline to avoid killing queued requests under load // conn.SetDeadline(time.Now().Add(5 * time.Second)) // Ensure we send headers terminator // fmt.Fprintf(conn, "%s\n\r\n", cmd) writer.WriteString(cmd + "\n\r\n") if err := writer.Flush(); err != nil { return err } resp, err := reader.ReadString('\n') if err != nil { return err } resp = strings.TrimSpace(resp) if strings.HasPrefix(resp, "OK") { return nil } return fmt.Errorf("server error: %s", resp) }