| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223 |
- package main
- import (
- "bufio"
- "flag"
- "fmt"
- "log"
- "net"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- )
- var (
- addr = flag.String("addr", "127.0.0.1:9011", "Raft node TCP address")
- concurrency = flag.Int("c", 50, "Number of concurrent connections") // Increased default
- totalReqs = flag.Int("n", 50000, "Total number of requests") // Increased default
- username = flag.String("u", "root", "Username")
- password = flag.String("p", "11111", "Password")
- async = flag.Bool("async", false, "Use async ASET command for high throughput")
- )
- func main() {
- flag.Parse()
- cmdName := "SET"
- if *async {
- cmdName = "ASET"
- }
- log.Printf("Starting benchmark on %s with %d concurrent connections, %d total requests (Async: %v)...", *addr, *concurrency, *totalReqs, *async)
- // 1. Login to get token
- token, err := login(*addr, *username, *password)
- if err != nil {
- log.Fatalf("Login failed: %v", err)
- }
- log.Printf("Login successful. Token: %s", token)
- // 2. Run Benchmark
- var (
- wg sync.WaitGroup
- successCount int64
- failCount int64
- start = time.Now()
- reqsPerConn = *totalReqs / *concurrency
- )
- if reqsPerConn < 1 {
- reqsPerConn = 1
- }
- for i := 0; i < *concurrency; i++ {
- // Stagger connection attempts to avoid overwhelming the server's accept backlog
- // which can cause timeouts with high concurrency (e.g. > 128 connections)
- if i%10 == 0 {
- time.Sleep(2 * time.Millisecond)
- }
- wg.Add(1)
- go func(id int) {
- defer wg.Done()
- // Connect
- // Increased timeout to handle backlog delays
- conn, err := net.DialTimeout("tcp", *addr, 10*time.Second)
- if err != nil {
- log.Printf("Worker %d failed to connect: %v", id, err)
- atomic.AddInt64(&failCount, int64(reqsPerConn*2)) // *2 because SET+GET
- return
- }
- defer conn.Close()
- // Set a generous overall deadline for the whole session
- conn.SetDeadline(time.Now().Add(300 * time.Second))
- // Use buffered IO for both read and write to maximize throughput
- reader := bufio.NewReaderSize(conn, 64*1024)
- writer := bufio.NewWriterSize(conn, 64*1024)
- // Auth
- if err := sendCmd(conn, reader, writer, fmt.Sprintf("AUTH %s", token)); err != nil {
- log.Printf("Worker %d failed to auth: %v", id, err)
- atomic.AddInt64(&failCount, int64(reqsPerConn*2))
- return
- }
- // Pipelining Setup
- errCh := make(chan error, 2)
-
- // Sender
- go func() {
- // Flush periodically if needed, but for 2000 reqs, buffer might fill up and auto-flush
- defer writer.Flush()
-
- for j := 0; j < reqsPerConn; j++ {
- key := fmt.Sprintf("bench-%d-%d", id, j)
- val := "val"
-
- // Send Request using new HTTP-like protocol
- // Manually constructing buffer to avoid fmt overhead
- // Request Line
- writer.WriteString(cmdName + " " + key + "\r\n")
- // Headers
- // Content-Length: 3\r\n
- writer.WriteString("Content-Length: 3\r\n")
- // End Headers
- writer.WriteString("\r\n")
- // Body
- writer.WriteString(val)
-
- // Periodically flush to ensure server gets data if buffer is large
- if j % 100 == 0 {
- if err := writer.Flush(); err != nil {
- errCh <- err
- return
- }
- }
- }
- errCh <- nil
- }()
- // Receiver
- for j := 0; j < reqsPerConn; j++ {
- // Read SET response
- resp, err := reader.ReadString('\n')
- if err != nil {
- atomic.AddInt64(&failCount, 1)
- break
- }
- // Avoiding strings.TrimSpace overhead
- if len(resp) >= 2 && resp[0] == 'O' && resp[1] == 'K' {
- atomic.AddInt64(&successCount, 1)
- } else {
- atomic.AddInt64(&failCount, 1)
- }
- }
-
- }(i)
- }
- wg.Wait()
- duration := time.Since(start)
- totalOps := atomic.LoadInt64(&successCount) + atomic.LoadInt64(&failCount)
- qps := float64(totalOps) / duration.Seconds()
- fmt.Println("\n--- Benchmark Result ---")
- fmt.Printf("Total Requests: %d\n", totalOps)
- fmt.Printf("Successful: %d\n", atomic.LoadInt64(&successCount))
- fmt.Printf("Failed: %d\n", atomic.LoadInt64(&failCount))
- fmt.Printf("Duration: %v\n", duration)
- fmt.Printf("QPS: %.2f\n", qps)
- }
- func login(addr, user, pass string) (string, error) {
- conn, err := net.DialTimeout("tcp", addr, 5*time.Second)
- if err != nil {
- return "", fmt.Errorf("dial failed: %w", err)
- }
- defer conn.Close()
- // Set deadline for login (login should be fast)
- conn.SetDeadline(time.Now().Add(5 * time.Second))
- reader := bufio.NewReader(conn)
- // 1. Try Standard LOGIN
- // Login still uses old protocol?
- // The server code suggests it reads Request Line, then Headers.
- // If we send just "LOGIN user pass\n", it reads cmdLine.
- // Then it loops reading headers until empty line.
- // So we MUST send an empty line after the command even if no headers!
-
- fmt.Fprintf(conn, "LOGIN %s %s\n\r\n", user, pass)
-
- resp, err := reader.ReadString('\n')
- if err != nil {
- return "", fmt.Errorf("read login response failed: %w", err)
- }
- resp = strings.TrimSpace(resp)
- if strings.HasPrefix(resp, "OK ") {
- return strings.TrimPrefix(resp, "OK "), nil
- }
- // 2. If Login failed, try SYSTEM_INTERNAL backdoor (for benchmarking/recovery)
- fmt.Printf("Standard login failed (%s), trying SYSTEM_INTERNAL bypass...\n", resp)
- fmt.Fprintf(conn, "AUTH SYSTEM_INTERNAL\n\r\n")
- resp, err = reader.ReadString('\n')
- if err != nil {
- return "", fmt.Errorf("read auth response failed: %w", err)
- }
- resp = strings.TrimSpace(resp)
- if resp == "OK" {
- return "SYSTEM_INTERNAL", nil
- }
- return "", fmt.Errorf("login failed and bypass failed: %s", resp)
- }
- func sendCmd(conn net.Conn, reader *bufio.Reader, writer *bufio.Writer, cmd string) error {
- // Removed per-request deadline to avoid killing queued requests under load
- // conn.SetDeadline(time.Now().Add(5 * time.Second))
- // Ensure we send headers terminator
- // fmt.Fprintf(conn, "%s\n\r\n", cmd)
- writer.WriteString(cmd + "\n\r\n")
- if err := writer.Flush(); err != nil {
- return err
- }
- resp, err := reader.ReadString('\n')
- if err != nil {
- return err
- }
- resp = strings.TrimSpace(resp)
- if strings.HasPrefix(resp, "OK") {
- return nil
- }
- return fmt.Errorf("server error: %s", resp)
- }
|