| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240 |
- package raft
import (
	"bytes"
	"encoding/json"
	"io"
	"net/http"
	"sync"
	"sync/atomic"
	"time"
)
- // WatchHandler defines the function signature for watching key changes
- type WatchHandler func(key, value string, eventType KVCommandType)
- // Watcher interface for specialized handling
- type Watcher interface {
- OnWait(key, value string, eventType KVCommandType)
- }
- // WatcherWrapper adapts a Watcher interface to a WatchHandler function
- func WatcherWrapper(w Watcher) WatchHandler {
- return func(key, value string, eventType KVCommandType) {
- w.OnWait(key, value, eventType)
- }
- }
- // WebHookNotification represents the payload sent to the callback URL
- type WebHookNotification struct {
- Key string `json:"key"`
- Value string `json:"value"`
- EventType KVCommandType `json:"event_type"` // 0=Set, 1=Del
- Timestamp int64 `json:"timestamp"`
- }
- // NotificationTask represents a pending notification
- type NotificationTask struct {
- URL string
- Notification WebHookNotification
- RetryCount int
- MaxRetries int
- }
// WatcherMetrics is a set of runtime counters describing a watcher's
// delivery queue and delivery outcomes.
type WatcherMetrics struct {
	// QueueSize goes up when a task is queued and down when a worker takes it.
	QueueSize int64
	// DroppedNotifications counts tasks discarded because the queue was full.
	DroppedNotifications uint64
	// SentNotifications counts deliveries that received a 2xx response.
	SentNotifications uint64
	// FailedNotifications counts tasks that exhausted every attempt.
	FailedNotifications uint64
}
- // WebHookWatcher manages network callbacks for key changes
- type WebHookWatcher struct {
- mu sync.RWMutex
- subscribers map[string][]string // Key -> []URL
-
- taskCh chan NotificationTask
- stopCh chan struct{}
- wg sync.WaitGroup
-
- client *http.Client
- maxRetries int
- logger Logger
- // Metrics
- metrics WatcherMetrics
-
- // Log throttling
- lastDropLogTime int64 // unix timestamp
- }
- // NewWebHookWatcher creates a new WebHookWatcher
- func NewWebHookWatcher(workers int, maxRetries int, logger Logger) *WebHookWatcher {
- w := &WebHookWatcher{
- subscribers: make(map[string][]string),
- taskCh: make(chan NotificationTask, 10000), // Large buffer to prevent blocking
- stopCh: make(chan struct{}),
- maxRetries: maxRetries,
- client: &http.Client{
- Timeout: 5 * time.Second,
- },
- logger: logger,
- }
- // Start workers
- for i := 0; i < workers; i++ {
- w.wg.Add(1)
- go w.worker()
- }
- return w
- }
- // Subscribe adds a callback URL for a specific key
- func (w *WebHookWatcher) Subscribe(key, url string) {
- w.mu.Lock()
- defer w.mu.Unlock()
-
- urls := w.subscribers[key]
- for _, u := range urls {
- if u == url {
- return // Already subscribed
- }
- }
- w.subscribers[key] = append(urls, url)
- w.logger.Info("Subscribed %s to key %s", url, key)
- }
- // Unsubscribe removes a callback URL for a specific key
- func (w *WebHookWatcher) Unsubscribe(key, url string) {
- w.mu.Lock()
- defer w.mu.Unlock()
-
- urls := w.subscribers[key]
- newURLs := make([]string, 0, len(urls))
- for _, u := range urls {
- if u != url {
- newURLs = append(newURLs, u)
- }
- }
-
- if len(newURLs) == 0 {
- delete(w.subscribers, key)
- } else {
- w.subscribers[key] = newURLs
- }
- }
- // Notify queues notifications for a key change
- func (w *WebHookWatcher) Notify(key, value string, eventType KVCommandType) {
- w.mu.RLock()
- urls := w.subscribers[key]
- // Copy to avoid holding lock
- targetURLs := make([]string, len(urls))
- copy(targetURLs, urls)
- w.mu.RUnlock()
- if len(targetURLs) == 0 {
- return
- }
- payload := WebHookNotification{
- Key: key,
- Value: value,
- EventType: eventType,
- Timestamp: time.Now().UnixNano(),
- }
- for _, url := range targetURLs {
- select {
- case w.taskCh <- NotificationTask{
- URL: url,
- Notification: payload,
- MaxRetries: w.maxRetries,
- }:
- atomic.AddInt64(&w.metrics.QueueSize, 1)
- default:
- // Queue full - Drop notification
- newDropped := atomic.AddUint64(&w.metrics.DroppedNotifications, 1)
-
- // Log Throttling: Log only once per second to avoid IO blocking
- now := time.Now().Unix()
- last := atomic.LoadInt64(&w.lastDropLogTime)
- if now > last {
- if atomic.CompareAndSwapInt64(&w.lastDropLogTime, last, now) {
- w.logger.Warn("Notification queue full! Dropped %d notifications total. (Key: %s -> %s)", newDropped, key, url)
- }
- }
- }
- }
- }
- // worker processes notification tasks
- func (w *WebHookWatcher) worker() {
- defer w.wg.Done()
- for {
- select {
- case <-w.stopCh:
- return
- case task := <-w.taskCh:
- atomic.AddInt64(&w.metrics.QueueSize, -1)
- w.sendNotification(task)
- }
- }
- }
- // sendNotification sends the HTTP request with retry logic
- func (w *WebHookWatcher) sendNotification(task NotificationTask) {
- data, err := json.Marshal(task.Notification)
- if err != nil {
- w.logger.Error("Failed to marshal notification: %v", err)
- return
- }
- // Exponential backoff for retries
- // 100ms, 200ms, 400ms, ...
- backoff := 100 * time.Millisecond
- for i := 0; i <= task.MaxRetries; i++ {
- if i > 0 {
- time.Sleep(backoff)
- backoff *= 2
- }
- resp, err := w.client.Post(task.URL, "application/json", bytes.NewBuffer(data))
- if err == nil {
- resp.Body.Close()
- if resp.StatusCode >= 200 && resp.StatusCode < 300 {
- // Success
- atomic.AddUint64(&w.metrics.SentNotifications, 1)
- return
- }
- w.logger.Debug("Notification to %s failed with status %d (attempt %d/%d)",
- task.URL, resp.StatusCode, i+1, task.MaxRetries+1)
- } else {
- w.logger.Debug("Notification to %s failed: %v (attempt %d/%d)",
- task.URL, err, i+1, task.MaxRetries+1)
- }
- }
- atomic.AddUint64(&w.metrics.FailedNotifications, 1)
- w.logger.Warn("Failed to deliver notification for key %s to %s after %d attempts",
- task.Notification.Key, task.URL, task.MaxRetries+1)
- }
- // Stop stops the watcher and waits for workers
- func (w *WebHookWatcher) Stop() {
- close(w.stopCh)
- w.wg.Wait()
- }
- // GetMetrics returns a copy of current metrics
- func (w *WebHookWatcher) GetMetrics() WatcherMetrics {
- return WatcherMetrics{
- QueueSize: atomic.LoadInt64(&w.metrics.QueueSize),
- DroppedNotifications: atomic.LoadUint64(&w.metrics.DroppedNotifications),
- SentNotifications: atomic.LoadUint64(&w.metrics.SentNotifications),
- FailedNotifications: atomic.LoadUint64(&w.metrics.FailedNotifications),
- }
- }
|