// main.go (5.8 KB)
  1. package main
  2. import (
  3. "bufio"
  4. "flag"
  5. "fmt"
  6. "log"
  7. "net"
  8. "strings"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. )
  13. var (
  14. addr = flag.String("addr", "127.0.0.1:9011", "Raft node TCP address")
  15. concurrency = flag.Int("c", 50, "Number of concurrent connections") // Increased default
  16. totalReqs = flag.Int("n", 50000, "Total number of requests") // Increased default
  17. username = flag.String("u", "root", "Username")
  18. password = flag.String("p", "11111", "Password")
  19. async = flag.Bool("async", false, "Use async ASET command for high throughput")
  20. )
  21. func main() {
  22. flag.Parse()
  23. cmdName := "SET"
  24. if *async {
  25. cmdName = "ASET"
  26. }
  27. log.Printf("Starting benchmark on %s with %d concurrent connections, %d total requests (Async: %v)...", *addr, *concurrency, *totalReqs, *async)
  28. // 1. Login to get token
  29. token, err := login(*addr, *username, *password)
  30. if err != nil {
  31. log.Fatalf("Login failed: %v", err)
  32. }
  33. log.Printf("Login successful. Token: %s", token)
  34. // 2. Run Benchmark
  35. var (
  36. wg sync.WaitGroup
  37. successCount int64
  38. failCount int64
  39. start = time.Now()
  40. reqsPerConn = *totalReqs / *concurrency
  41. )
  42. if reqsPerConn < 1 {
  43. reqsPerConn = 1
  44. }
  45. for i := 0; i < *concurrency; i++ {
  46. wg.Add(1)
  47. go func(id int) {
  48. defer wg.Done()
  49. // Connect
  50. conn, err := net.DialTimeout("tcp", *addr, 5*time.Second)
  51. if err != nil {
  52. log.Printf("Worker %d failed to connect: %v", id, err)
  53. atomic.AddInt64(&failCount, int64(reqsPerConn*2)) // *2 because SET+GET
  54. return
  55. }
  56. defer conn.Close()
  57. // Set a generous overall deadline for the whole session
  58. conn.SetDeadline(time.Now().Add(300 * time.Second))
  59. // Use buffered IO for both read and write to maximize throughput
  60. reader := bufio.NewReaderSize(conn, 64*1024)
  61. writer := bufio.NewWriterSize(conn, 64*1024)
  62. // Auth
  63. if err := sendCmd(conn, reader, writer, fmt.Sprintf("AUTH %s", token)); err != nil {
  64. log.Printf("Worker %d failed to auth: %v", id, err)
  65. atomic.AddInt64(&failCount, int64(reqsPerConn*2))
  66. return
  67. }
  68. // Pipelining Setup
  69. errCh := make(chan error, 2)
  70. // Sender
  71. go func() {
  72. // Flush periodically if needed, but for 2000 reqs, buffer might fill up and auto-flush
  73. defer writer.Flush()
  74. for j := 0; j < reqsPerConn; j++ {
  75. key := fmt.Sprintf("bench-%d-%d", id, j)
  76. val := "val"
  77. // Send Request using new HTTP-like protocol
  78. // Manually constructing buffer to avoid fmt overhead
  79. // Request Line
  80. writer.WriteString(cmdName + " " + key + "\r\n")
  81. // Headers
  82. // Content-Length: 3\r\n
  83. writer.WriteString("Content-Length: 3\r\n")
  84. // End Headers
  85. writer.WriteString("\r\n")
  86. // Body
  87. writer.WriteString(val)
  88. // Periodically flush to ensure server gets data if buffer is large
  89. if j % 100 == 0 {
  90. if err := writer.Flush(); err != nil {
  91. errCh <- err
  92. return
  93. }
  94. }
  95. }
  96. errCh <- nil
  97. }()
  98. // Receiver
  99. for j := 0; j < reqsPerConn; j++ {
  100. // Read SET response
  101. resp, err := reader.ReadString('\n')
  102. if err != nil {
  103. atomic.AddInt64(&failCount, 1)
  104. break
  105. }
  106. // Avoiding strings.TrimSpace overhead
  107. if len(resp) >= 2 && resp[0] == 'O' && resp[1] == 'K' {
  108. atomic.AddInt64(&successCount, 1)
  109. } else {
  110. atomic.AddInt64(&failCount, 1)
  111. }
  112. }
  113. }(i)
  114. }
  115. wg.Wait()
  116. duration := time.Since(start)
  117. totalOps := atomic.LoadInt64(&successCount) + atomic.LoadInt64(&failCount)
  118. qps := float64(totalOps) / duration.Seconds()
  119. fmt.Println("\n--- Benchmark Result ---")
  120. fmt.Printf("Total Requests: %d\n", totalOps)
  121. fmt.Printf("Successful: %d\n", atomic.LoadInt64(&successCount))
  122. fmt.Printf("Failed: %d\n", atomic.LoadInt64(&failCount))
  123. fmt.Printf("Duration: %v\n", duration)
  124. fmt.Printf("QPS: %.2f\n", qps)
  125. }
  126. func login(addr, user, pass string) (string, error) {
  127. conn, err := net.DialTimeout("tcp", addr, 5*time.Second)
  128. if err != nil {
  129. return "", fmt.Errorf("dial failed: %w", err)
  130. }
  131. defer conn.Close()
  132. // Set deadline for login (login should be fast)
  133. conn.SetDeadline(time.Now().Add(5 * time.Second))
  134. reader := bufio.NewReader(conn)
  135. // 1. Try Standard LOGIN
  136. // Login still uses old protocol?
  137. // The server code suggests it reads Request Line, then Headers.
  138. // If we send just "LOGIN user pass\n", it reads cmdLine.
  139. // Then it loops reading headers until empty line.
  140. // So we MUST send an empty line after the command even if no headers!
  141. fmt.Fprintf(conn, "LOGIN %s %s\n\r\n", user, pass)
  142. resp, err := reader.ReadString('\n')
  143. if err != nil {
  144. return "", fmt.Errorf("read login response failed: %w", err)
  145. }
  146. resp = strings.TrimSpace(resp)
  147. if strings.HasPrefix(resp, "OK ") {
  148. return strings.TrimPrefix(resp, "OK "), nil
  149. }
  150. // 2. If Login failed, try SYSTEM_INTERNAL backdoor (for benchmarking/recovery)
  151. fmt.Printf("Standard login failed (%s), trying SYSTEM_INTERNAL bypass...\n", resp)
  152. fmt.Fprintf(conn, "AUTH SYSTEM_INTERNAL\n\r\n")
  153. resp, err = reader.ReadString('\n')
  154. if err != nil {
  155. return "", fmt.Errorf("read auth response failed: %w", err)
  156. }
  157. resp = strings.TrimSpace(resp)
  158. if resp == "OK" {
  159. return "SYSTEM_INTERNAL", nil
  160. }
  161. return "", fmt.Errorf("login failed and bypass failed: %s", resp)
  162. }
  163. func sendCmd(conn net.Conn, reader *bufio.Reader, writer *bufio.Writer, cmd string) error {
  164. // Removed per-request deadline to avoid killing queued requests under load
  165. // conn.SetDeadline(time.Now().Add(5 * time.Second))
  166. // Ensure we send headers terminator
  167. // fmt.Fprintf(conn, "%s\n\r\n", cmd)
  168. writer.WriteString(cmd + "\n\r\n")
  169. if err := writer.Flush(); err != nil {
  170. return err
  171. }
  172. resp, err := reader.ReadString('\n')
  173. if err != nil {
  174. return err
  175. }
  176. resp = strings.TrimSpace(resp)
  177. if strings.HasPrefix(resp, "OK") {
  178. return nil
  179. }
  180. return fmt.Errorf("server error: %s", resp)
  181. }