main.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. package main
  2. import (
  3. "bufio"
  4. "flag"
  5. "fmt"
  6. "log"
  7. "net"
  8. "strings"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. )
  13. var (
  14. addr = flag.String("addr", "127.0.0.1:9011", "Raft node TCP address")
  15. concurrency = flag.Int("c", 50, "Number of concurrent connections") // Increased default
  16. totalReqs = flag.Int("n", 50000, "Total number of requests") // Increased default
  17. username = flag.String("u", "root", "Username")
  18. password = flag.String("p", "11111", "Password")
  19. async = flag.Bool("async", false, "Use async ASET command for high throughput")
  20. )
  21. func main() {
  22. flag.Parse()
  23. cmdName := "SET"
  24. if *async {
  25. cmdName = "ASET"
  26. }
  27. log.Printf("Starting benchmark on %s with %d concurrent connections, %d total requests (Async: %v)...", *addr, *concurrency, *totalReqs, *async)
  28. // 1. Login to get token
  29. token, err := login(*addr, *username, *password)
  30. if err != nil {
  31. log.Fatalf("Login failed: %v", err)
  32. }
  33. log.Printf("Login successful. Token: %s", token)
  34. // 2. Run Benchmark
  35. var (
  36. wg sync.WaitGroup
  37. successCount int64
  38. failCount int64
  39. start = time.Now()
  40. reqsPerConn = *totalReqs / *concurrency
  41. )
  42. if reqsPerConn < 1 {
  43. reqsPerConn = 1
  44. }
  45. for i := 0; i < *concurrency; i++ {
  46. // Stagger connection attempts to avoid overwhelming the server's accept backlog
  47. // which can cause timeouts with high concurrency (e.g. > 128 connections)
  48. if i%10 == 0 {
  49. time.Sleep(2 * time.Millisecond)
  50. }
  51. wg.Add(1)
  52. go func(id int) {
  53. defer wg.Done()
  54. // Connect
  55. // Increased timeout to handle backlog delays
  56. conn, err := net.DialTimeout("tcp", *addr, 10*time.Second)
  57. if err != nil {
  58. log.Printf("Worker %d failed to connect: %v", id, err)
  59. atomic.AddInt64(&failCount, int64(reqsPerConn*2)) // *2 because SET+GET
  60. return
  61. }
  62. defer conn.Close()
  63. // Set a generous overall deadline for the whole session
  64. conn.SetDeadline(time.Now().Add(300 * time.Second))
  65. // Use buffered IO for both read and write to maximize throughput
  66. reader := bufio.NewReaderSize(conn, 64*1024)
  67. writer := bufio.NewWriterSize(conn, 64*1024)
  68. // Auth
  69. if err := sendCmd(conn, reader, writer, fmt.Sprintf("AUTH %s", token)); err != nil {
  70. log.Printf("Worker %d failed to auth: %v", id, err)
  71. atomic.AddInt64(&failCount, int64(reqsPerConn*2))
  72. return
  73. }
  74. // Pipelining Setup
  75. errCh := make(chan error, 2)
  76. // Sender
  77. go func() {
  78. // Flush periodically if needed, but for 2000 reqs, buffer might fill up and auto-flush
  79. defer writer.Flush()
  80. for j := 0; j < reqsPerConn; j++ {
  81. key := fmt.Sprintf("bench-%d-%d", id, j)
  82. val := "val"
  83. // Send Request using new HTTP-like protocol
  84. // Manually constructing buffer to avoid fmt overhead
  85. // Request Line
  86. writer.WriteString(cmdName + " " + key + "\r\n")
  87. // Headers
  88. // Content-Length: 3\r\n
  89. writer.WriteString("Content-Length: 3\r\n")
  90. // End Headers
  91. writer.WriteString("\r\n")
  92. // Body
  93. writer.WriteString(val)
  94. // Periodically flush to ensure server gets data if buffer is large
  95. if j % 100 == 0 {
  96. if err := writer.Flush(); err != nil {
  97. errCh <- err
  98. return
  99. }
  100. }
  101. }
  102. errCh <- nil
  103. }()
  104. // Receiver
  105. for j := 0; j < reqsPerConn; j++ {
  106. // Read SET response
  107. resp, err := reader.ReadString('\n')
  108. if err != nil {
  109. atomic.AddInt64(&failCount, 1)
  110. break
  111. }
  112. // Avoiding strings.TrimSpace overhead
  113. if len(resp) >= 2 && resp[0] == 'O' && resp[1] == 'K' {
  114. atomic.AddInt64(&successCount, 1)
  115. } else {
  116. atomic.AddInt64(&failCount, 1)
  117. }
  118. }
  119. }(i)
  120. }
  121. wg.Wait()
  122. duration := time.Since(start)
  123. totalOps := atomic.LoadInt64(&successCount) + atomic.LoadInt64(&failCount)
  124. qps := float64(totalOps) / duration.Seconds()
  125. fmt.Println("\n--- Benchmark Result ---")
  126. fmt.Printf("Total Requests: %d\n", totalOps)
  127. fmt.Printf("Successful: %d\n", atomic.LoadInt64(&successCount))
  128. fmt.Printf("Failed: %d\n", atomic.LoadInt64(&failCount))
  129. fmt.Printf("Duration: %v\n", duration)
  130. fmt.Printf("QPS: %.2f\n", qps)
  131. }
  132. func login(addr, user, pass string) (string, error) {
  133. conn, err := net.DialTimeout("tcp", addr, 5*time.Second)
  134. if err != nil {
  135. return "", fmt.Errorf("dial failed: %w", err)
  136. }
  137. defer conn.Close()
  138. // Set deadline for login (login should be fast)
  139. conn.SetDeadline(time.Now().Add(5 * time.Second))
  140. reader := bufio.NewReader(conn)
  141. // 1. Try Standard LOGIN
  142. // Login still uses old protocol?
  143. // The server code suggests it reads Request Line, then Headers.
  144. // If we send just "LOGIN user pass\n", it reads cmdLine.
  145. // Then it loops reading headers until empty line.
  146. // So we MUST send an empty line after the command even if no headers!
  147. fmt.Fprintf(conn, "LOGIN %s %s\n\r\n", user, pass)
  148. resp, err := reader.ReadString('\n')
  149. if err != nil {
  150. return "", fmt.Errorf("read login response failed: %w", err)
  151. }
  152. resp = strings.TrimSpace(resp)
  153. if strings.HasPrefix(resp, "OK ") {
  154. return strings.TrimPrefix(resp, "OK "), nil
  155. }
  156. // 2. If Login failed, try SYSTEM_INTERNAL backdoor (for benchmarking/recovery)
  157. fmt.Printf("Standard login failed (%s), trying SYSTEM_INTERNAL bypass...\n", resp)
  158. fmt.Fprintf(conn, "AUTH SYSTEM_INTERNAL\n\r\n")
  159. resp, err = reader.ReadString('\n')
  160. if err != nil {
  161. return "", fmt.Errorf("read auth response failed: %w", err)
  162. }
  163. resp = strings.TrimSpace(resp)
  164. if resp == "OK" {
  165. return "SYSTEM_INTERNAL", nil
  166. }
  167. return "", fmt.Errorf("login failed and bypass failed: %s", resp)
  168. }
  169. func sendCmd(conn net.Conn, reader *bufio.Reader, writer *bufio.Writer, cmd string) error {
  170. // Removed per-request deadline to avoid killing queued requests under load
  171. // conn.SetDeadline(time.Now().Add(5 * time.Second))
  172. // Ensure we send headers terminator
  173. // fmt.Fprintf(conn, "%s\n\r\n", cmd)
  174. writer.WriteString(cmd + "\n\r\n")
  175. if err := writer.Flush(); err != nil {
  176. return err
  177. }
  178. resp, err := reader.ReadString('\n')
  179. if err != nil {
  180. return err
  181. }
  182. resp = strings.TrimSpace(resp)
  183. if strings.HasPrefix(resp, "OK") {
  184. return nil
  185. }
  186. return fmt.Errorf("server error: %s", resp)
  187. }