package raft import ( "bytes" "encoding/json" "net/http" "sync" "sync/atomic" "time" ) // WatchHandler defines the function signature for watching key changes type WatchHandler func(key, value string, eventType KVCommandType) // Watcher interface for specialized handling type Watcher interface { OnWait(key, value string, eventType KVCommandType) } // WatcherWrapper adapts a Watcher interface to a WatchHandler function func WatcherWrapper(w Watcher) WatchHandler { return func(key, value string, eventType KVCommandType) { w.OnWait(key, value, eventType) } } // WebHookNotification represents the payload sent to the callback URL type WebHookNotification struct { Key string `json:"key"` Value string `json:"value"` EventType KVCommandType `json:"event_type"` // 0=Set, 1=Del Timestamp int64 `json:"timestamp"` } // NotificationTask represents a pending notification type NotificationTask struct { URL string Notification WebHookNotification RetryCount int MaxRetries int } // WatcherMetrics holds runtime stats for the watcher type WatcherMetrics struct { QueueSize int64 DroppedNotifications uint64 SentNotifications uint64 FailedNotifications uint64 } // WebHookWatcher manages network callbacks for key changes type WebHookWatcher struct { mu sync.RWMutex subscribers map[string][]string // Key -> []URL taskCh chan NotificationTask stopCh chan struct{} wg sync.WaitGroup client *http.Client maxRetries int logger Logger // Metrics metrics WatcherMetrics // Log throttling lastDropLogTime int64 // unix timestamp } // NewWebHookWatcher creates a new WebHookWatcher func NewWebHookWatcher(workers int, maxRetries int, logger Logger) *WebHookWatcher { w := &WebHookWatcher{ subscribers: make(map[string][]string), taskCh: make(chan NotificationTask, 10000), // Large buffer to prevent blocking stopCh: make(chan struct{}), maxRetries: maxRetries, client: &http.Client{ Timeout: 5 * time.Second, }, logger: logger, } // Start workers for i := 0; i < workers; i++ { w.wg.Add(1) go w.worker() } return w } // Subscribe adds a callback URL for a specific key func (w *WebHookWatcher) Subscribe(key, url string) { w.mu.Lock() defer w.mu.Unlock() urls := w.subscribers[key] for _, u := range urls { if u == url { return // Already subscribed } } w.subscribers[key] = append(urls, url) w.logger.Info("Subscribed %s to key %s", url, key) } // Unsubscribe removes a callback URL for a specific key func (w *WebHookWatcher) Unsubscribe(key, url string) { w.mu.Lock() defer w.mu.Unlock() urls := w.subscribers[key] newURLs := make([]string, 0, len(urls)) for _, u := range urls { if u != url { newURLs = append(newURLs, u) } } if len(newURLs) == 0 { delete(w.subscribers, key) } else { w.subscribers[key] = newURLs } } // Notify queues notifications for a key change func (w *WebHookWatcher) Notify(key, value string, eventType KVCommandType) { w.mu.RLock() urls := w.subscribers[key] // Copy to avoid holding lock targetURLs := make([]string, len(urls)) copy(targetURLs, urls) w.mu.RUnlock() if len(targetURLs) == 0 { return } payload := WebHookNotification{ Key: key, Value: value, EventType: eventType, Timestamp: time.Now().UnixNano(), } for _, url := range targetURLs { select { case w.taskCh <- NotificationTask{ URL: url, Notification: payload, MaxRetries: w.maxRetries, }: atomic.AddInt64(&w.metrics.QueueSize, 1) default: // Queue full - Drop notification newDropped := atomic.AddUint64(&w.metrics.DroppedNotifications, 1) // Log Throttling: Log only once per second to avoid IO blocking now := time.Now().Unix() last := atomic.LoadInt64(&w.lastDropLogTime) if now > last { if atomic.CompareAndSwapInt64(&w.lastDropLogTime, last, now) { w.logger.Warn("Notification queue full! Dropped %d notifications total. (Key: %s -> %s)", newDropped, key, url) } } } } } // worker processes notification tasks func (w *WebHookWatcher) worker() { defer w.wg.Done() for { select { case <-w.stopCh: return case task := <-w.taskCh: atomic.AddInt64(&w.metrics.QueueSize, -1) w.sendNotification(task) } } } // sendNotification sends the HTTP request with retry logic func (w *WebHookWatcher) sendNotification(task NotificationTask) { data, err := json.Marshal(task.Notification) if err != nil { w.logger.Error("Failed to marshal notification: %v", err) return } // Exponential backoff for retries // 100ms, 200ms, 400ms, ... backoff := 100 * time.Millisecond for i := 0; i <= task.MaxRetries; i++ { if i > 0 { time.Sleep(backoff) backoff *= 2 } resp, err := w.client.Post(task.URL, "application/json", bytes.NewBuffer(data)) if err == nil { resp.Body.Close() if resp.StatusCode >= 200 && resp.StatusCode < 300 { // Success atomic.AddUint64(&w.metrics.SentNotifications, 1) return } w.logger.Debug("Notification to %s failed with status %d (attempt %d/%d)", task.URL, resp.StatusCode, i+1, task.MaxRetries+1) } else { w.logger.Debug("Notification to %s failed: %v (attempt %d/%d)", task.URL, err, i+1, task.MaxRetries+1) } } atomic.AddUint64(&w.metrics.FailedNotifications, 1) w.logger.Warn("Failed to deliver notification for key %s to %s after %d attempts", task.Notification.Key, task.URL, task.MaxRetries+1) } // Stop stops the watcher and waits for workers func (w *WebHookWatcher) Stop() { close(w.stopCh) w.wg.Wait() } // GetMetrics returns a copy of current metrics func (w *WebHookWatcher) GetMetrics() WatcherMetrics { return WatcherMetrics{ QueueSize: atomic.LoadInt64(&w.metrics.QueueSize), DroppedNotifications: atomic.LoadUint64(&w.metrics.DroppedNotifications), SentNotifications: atomic.LoadUint64(&w.metrics.SentNotifications), FailedNotifications: atomic.LoadUint64(&w.metrics.FailedNotifications), } }