watcher.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. package raft
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "net/http"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. )
  10. // WatchHandler defines the function signature for watching key changes
  11. type WatchHandler func(key, value string, eventType KVCommandType)
  12. // Watcher interface for specialized handling
  13. type Watcher interface {
  14. OnWait(key, value string, eventType KVCommandType)
  15. }
  16. // WatcherWrapper adapts a Watcher interface to a WatchHandler function
  17. func WatcherWrapper(w Watcher) WatchHandler {
  18. return func(key, value string, eventType KVCommandType) {
  19. w.OnWait(key, value, eventType)
  20. }
  21. }
  22. // WebHookNotification represents the payload sent to the callback URL
  23. type WebHookNotification struct {
  24. Key string `json:"key"`
  25. Value string `json:"value"`
  26. EventType KVCommandType `json:"event_type"` // 0=Set, 1=Del
  27. Timestamp int64 `json:"timestamp"`
  28. }
  29. // NotificationTask represents a pending notification
  30. type NotificationTask struct {
  31. URL string
  32. Notification WebHookNotification
  33. RetryCount int
  34. MaxRetries int
  35. }
  36. // WatcherMetrics holds runtime stats for the watcher
  37. type WatcherMetrics struct {
  38. QueueSize int64
  39. DroppedNotifications uint64
  40. SentNotifications uint64
  41. FailedNotifications uint64
  42. }
  43. // WebHookWatcher manages network callbacks for key changes
  44. type WebHookWatcher struct {
  45. mu sync.RWMutex
  46. subscribers map[string][]string // Key -> []URL
  47. taskCh chan NotificationTask
  48. stopCh chan struct{}
  49. wg sync.WaitGroup
  50. client *http.Client
  51. maxRetries int
  52. logger Logger
  53. // Metrics
  54. metrics WatcherMetrics
  55. // Log throttling
  56. lastDropLogTime int64 // unix timestamp
  57. }
  58. // NewWebHookWatcher creates a new WebHookWatcher
  59. func NewWebHookWatcher(workers int, maxRetries int, logger Logger) *WebHookWatcher {
  60. w := &WebHookWatcher{
  61. subscribers: make(map[string][]string),
  62. taskCh: make(chan NotificationTask, 10000), // Large buffer to prevent blocking
  63. stopCh: make(chan struct{}),
  64. maxRetries: maxRetries,
  65. client: &http.Client{
  66. Timeout: 5 * time.Second,
  67. },
  68. logger: logger,
  69. }
  70. // Start workers
  71. for i := 0; i < workers; i++ {
  72. w.wg.Add(1)
  73. go w.worker()
  74. }
  75. return w
  76. }
  77. // Subscribe adds a callback URL for a specific key
  78. func (w *WebHookWatcher) Subscribe(key, url string) {
  79. w.mu.Lock()
  80. defer w.mu.Unlock()
  81. urls := w.subscribers[key]
  82. for _, u := range urls {
  83. if u == url {
  84. return // Already subscribed
  85. }
  86. }
  87. w.subscribers[key] = append(urls, url)
  88. w.logger.Info("Subscribed %s to key %s", url, key)
  89. }
  90. // Unsubscribe removes a callback URL for a specific key
  91. func (w *WebHookWatcher) Unsubscribe(key, url string) {
  92. w.mu.Lock()
  93. defer w.mu.Unlock()
  94. urls := w.subscribers[key]
  95. newURLs := make([]string, 0, len(urls))
  96. for _, u := range urls {
  97. if u != url {
  98. newURLs = append(newURLs, u)
  99. }
  100. }
  101. if len(newURLs) == 0 {
  102. delete(w.subscribers, key)
  103. } else {
  104. w.subscribers[key] = newURLs
  105. }
  106. }
  107. // Notify queues notifications for a key change
  108. func (w *WebHookWatcher) Notify(key, value string, eventType KVCommandType) {
  109. w.mu.RLock()
  110. urls := w.subscribers[key]
  111. // Copy to avoid holding lock
  112. targetURLs := make([]string, len(urls))
  113. copy(targetURLs, urls)
  114. w.mu.RUnlock()
  115. if len(targetURLs) == 0 {
  116. return
  117. }
  118. payload := WebHookNotification{
  119. Key: key,
  120. Value: value,
  121. EventType: eventType,
  122. Timestamp: time.Now().UnixNano(),
  123. }
  124. for _, url := range targetURLs {
  125. select {
  126. case w.taskCh <- NotificationTask{
  127. URL: url,
  128. Notification: payload,
  129. MaxRetries: w.maxRetries,
  130. }:
  131. atomic.AddInt64(&w.metrics.QueueSize, 1)
  132. default:
  133. // Queue full - Drop notification
  134. newDropped := atomic.AddUint64(&w.metrics.DroppedNotifications, 1)
  135. // Log Throttling: Log only once per second to avoid IO blocking
  136. now := time.Now().Unix()
  137. last := atomic.LoadInt64(&w.lastDropLogTime)
  138. if now > last {
  139. if atomic.CompareAndSwapInt64(&w.lastDropLogTime, last, now) {
  140. w.logger.Warn("Notification queue full! Dropped %d notifications total. (Key: %s -> %s)", newDropped, key, url)
  141. }
  142. }
  143. }
  144. }
  145. }
  146. // worker processes notification tasks
  147. func (w *WebHookWatcher) worker() {
  148. defer w.wg.Done()
  149. for {
  150. select {
  151. case <-w.stopCh:
  152. return
  153. case task := <-w.taskCh:
  154. atomic.AddInt64(&w.metrics.QueueSize, -1)
  155. w.sendNotification(task)
  156. }
  157. }
  158. }
  159. // sendNotification sends the HTTP request with retry logic
  160. func (w *WebHookWatcher) sendNotification(task NotificationTask) {
  161. data, err := json.Marshal(task.Notification)
  162. if err != nil {
  163. w.logger.Error("Failed to marshal notification: %v", err)
  164. return
  165. }
  166. // Exponential backoff for retries
  167. // 100ms, 200ms, 400ms, ...
  168. backoff := 100 * time.Millisecond
  169. for i := 0; i <= task.MaxRetries; i++ {
  170. if i > 0 {
  171. time.Sleep(backoff)
  172. backoff *= 2
  173. }
  174. resp, err := w.client.Post(task.URL, "application/json", bytes.NewBuffer(data))
  175. if err == nil {
  176. resp.Body.Close()
  177. if resp.StatusCode >= 200 && resp.StatusCode < 300 {
  178. // Success
  179. atomic.AddUint64(&w.metrics.SentNotifications, 1)
  180. return
  181. }
  182. w.logger.Debug("Notification to %s failed with status %d (attempt %d/%d)",
  183. task.URL, resp.StatusCode, i+1, task.MaxRetries+1)
  184. } else {
  185. w.logger.Debug("Notification to %s failed: %v (attempt %d/%d)",
  186. task.URL, err, i+1, task.MaxRetries+1)
  187. }
  188. }
  189. atomic.AddUint64(&w.metrics.FailedNotifications, 1)
  190. w.logger.Warn("Failed to deliver notification for key %s to %s after %d attempts",
  191. task.Notification.Key, task.URL, task.MaxRetries+1)
  192. }
  193. // Stop stops the watcher and waits for workers
  194. func (w *WebHookWatcher) Stop() {
  195. close(w.stopCh)
  196. w.wg.Wait()
  197. }
  198. // GetMetrics returns a copy of current metrics
  199. func (w *WebHookWatcher) GetMetrics() WatcherMetrics {
  200. return WatcherMetrics{
  201. QueueSize: atomic.LoadInt64(&w.metrics.QueueSize),
  202. DroppedNotifications: atomic.LoadUint64(&w.metrics.DroppedNotifications),
  203. SentNotifications: atomic.LoadUint64(&w.metrics.SentNotifications),
  204. FailedNotifications: atomic.LoadUint64(&w.metrics.FailedNotifications),
  205. }
  206. }