xbase 2 هفته پیش
والد
کامیت
acff8b80c8
9فایلهای تغییر یافته به همراه845 افزوده شده و 10 حذف شده
  1. 45 0
      README.md
  2. 75 0
      example/watch/README.md
  3. 103 0
      example/watch/client1/main.go
  4. 103 0
      example/watch/client2/main.go
  5. 63 0
      example/watch/node1/main.go
  6. 63 0
      example/watch/node2/main.go
  7. 164 10
      server.go
  8. 5 0
      types.go
  9. 224 0
      watcher.go

+ 45 - 0
README.md

@@ -22,6 +22,10 @@
     - Follower 节点会自动将写请求(以及配置的读请求)转发给当前的 Leader,客户端无需自行维护 Leader 状态。
 - ✅ **Leader 租约检查 (Lease Check)**
     - Leader 在处理读写请求前会检查与多数派的心跳连接,防止网络分区下的脑裂(Split-Brain)写入。
+- ✅ **WebHook 通知 (Data Change Notification)**
+    - 支持通过 HTTP WebHook 订阅 Key 的变更事件。
+    - **非阻塞设计**:采用异步 Worker Pool 处理回调,确保不阻塞 Raft 核心共识流程。
+    - **自动重试**:内置指数退避重试机制,确保通知的高可靠性。
 - ✅ **持久化存储 (Persistence)**
     - 关键的 Raft 日志和状态(Term, Vote)均实时持久化到磁盘,保证重启后数据不丢失。
 - ✅ **健康检查与监控 (Health Checks & Metrics)**
@@ -213,6 +217,47 @@ err := server.TransferLeadership("node2")
 
 机制:当前 Leader 会发送 `TimeoutNow` RPC 给目标节点,使其立即发起选举(并无视选举超时时间),从而大概率赢得选举。
 
+### WebHook 变更通知
+
+系统提供了一套高性能的 WebHook 机制,允许外部系统通过 HTTP 接口订阅 Key 的变更事件。
+
+**启用 HTTP 服务:**
+
+```go
+// 在启动时配置 HTTP 监听地址
+config.HTTPAddr = ":8080"
+server, _ := raft.NewKVServer(config)
+```
+
+**订阅变更:**
+
+通过 POST 请求订阅指定 Key 的变更:
+
+```bash
+curl -X POST http://localhost:8080/watch \
+     -d '{"key": "my-key", "url": "http://your-service/callback"}'
+```
+
+**取消订阅:**
+
+```bash
+curl -X POST http://localhost:8080/unwatch \
+     -d '{"key": "my-key", "url": "http://your-service/callback"}'
+```
+
+**回调格式:**
+
+当 Key 发生变化时,系统会向注册的 URL 发送 JSON 格式的 POST 请求:
+
+```json
+{
+    "key": "my-key",
+    "value": "new-value",
+    "event_type": 0, // 0=Set, 1=Del
+    "timestamp": 1630000000000
+}
+```
+
 ## 监控与运维
 
 ### 健康检查

+ 75 - 0
example/watch/README.md

@@ -0,0 +1,75 @@
+# Multi-Node Watch Example
+
+This example demonstrates how to use the WebHook notification feature in a 2-node Raft cluster. Each client registers to a specific node watching **different keys**, demonstrating independent notification channels.
+
+## Architecture
+
+*   **Cluster**: 2 Nodes
+    *   **Node 1**: Raft Port 9500, HTTP API Port 8500
+    *   **Node 2**: Raft Port 9501, HTTP API Port 8501
+*   **Clients**: 2 Listeners
+    *   **Client 1**: Listens on 9600, Subscribes to Node 1 for `key-1`
+    *   **Client 2**: Listens on 9601, Subscribes to Node 2 for `key-2`
+
+## Running the Example
+
+You will need 4 terminal windows.
+
+### 1. Start the Cluster Nodes
+
+**Terminal 1 (Node 1):**
+```bash
+cd example/watch/node1
+go run main.go
+```
+
+**Terminal 2 (Node 2):**
+```bash
+cd example/watch/node2
+go run main.go
+```
+
+Wait a moment for them to elect a leader.
+
+### 2. Start the Clients
+
+**Terminal 3 (Client 1):**
+```bash
+cd example/watch/client1
+go run main.go
+```
+*This will start a listener and auto-subscribe to Node 1 for `key-1`.*
+
+**Terminal 4 (Client 2):**
+```bash
+cd example/watch/client2
+go run main.go
+```
+*This will start a listener and auto-subscribe to Node 2 for `key-2`.*
+
+### 3. Trigger Data Changes
+
+You can trigger changes by sending a request to **ANY** node's HTTP API. Since the clients are watching different keys, you can target specific clients.
+
+**Trigger Client 1 (via Node 1):**
+```bash
+# Sets key-1, Client 1 should notify
+curl -X POST http://127.0.0.1:8500/kv \
+     -H "Content-Type: application/json" \
+     -d '{"key": "key-1", "value": "hello-client-1"}'
+```
+
+**Trigger Client 2 (via Node 1):**
+```bash
+# Sets key-2, replicated to Node 2, Client 2 should notify
+curl -X POST http://127.0.0.1:8500/kv \
+     -H "Content-Type: application/json" \
+     -d '{"key": "key-2", "value": "hello-client-2"}'
+```
+
+### 4. Expected Output
+
+*   When you set `key-1`, only **Client 1** should output a notification.
+*   When you set `key-2`, only **Client 2** should output a notification.
+
+This demonstrates that notifications are filtered by key and dispatched locally by the node holding the subscription, even though the data is consistent across the entire cluster.

+ 103 - 0
example/watch/client1/main.go

@@ -0,0 +1,103 @@
+package main
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io"
+	"log"
+	"net/http"
+	"time"
+)
+
+// WebHookNotification must match the server's definition
+type WebHookNotification struct {
+	Key       string `json:"key"`
+	Value     string `json:"value"`
+	EventType int    `json:"event_type"` // 0=Set, 1=Del
+	Timestamp int64  `json:"timestamp"`
+}
+
+func main() {
+	listenPort := ":9600"
+	targetNodeAPI := "http://127.0.0.1:8500" // Node 1 API
+	myURL := fmt.Sprintf("http://127.0.0.1%s/callback", listenPort)
+	
+	// Start Listener
+	go func() {
+		http.HandleFunc("/callback", func(w http.ResponseWriter, r *http.Request) {
+			if r.Method != http.MethodPost {
+				http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+				return
+			}
+
+			body, err := io.ReadAll(r.Body)
+			if err != nil {
+				log.Printf("Error reading body: %v", err)
+				http.Error(w, "Bad request", http.StatusBadRequest)
+				return
+			}
+			defer r.Body.Close()
+
+			var event WebHookNotification
+			if err := json.Unmarshal(body, &event); err != nil {
+				log.Printf("Error unmarshalling JSON: %v", err)
+				http.Error(w, "Invalid JSON", http.StatusBadRequest)
+				return
+			}
+
+			eventTypeStr := "SET"
+			if event.EventType == 1 {
+				eventTypeStr = "DEL"
+			}
+
+			// Print received event
+			fmt.Printf("\n[Client 1 Received] %s\n", time.Now().Format("15:04:05"))
+			fmt.Printf("  Key:   %s\n", event.Key)
+			fmt.Printf("  Type:  %s\n", eventTypeStr)
+			if event.EventType == 0 {
+				fmt.Printf("  Value: %s\n", event.Value)
+			}
+			fmt.Println("----------------------------------------")
+
+			w.WriteHeader(http.StatusOK)
+		})
+
+		log.Printf("Client 1 Listener started on %s", listenPort)
+		if err := http.ListenAndServe(listenPort, nil); err != nil {
+			log.Fatalf("Server failed: %v", err)
+		}
+	}()
+
+	// Give listener time to start
+	time.Sleep(1 * time.Second)
+
+	// Subscribe to Node 1
+	log.Printf("Subscribing to Node 1 (%s)...", targetNodeAPI)
+	subscribe(targetNodeAPI, "key-1", myURL)
+
+	// Keep running
+	select {}
+}
+
+func subscribe(apiBase, key, callbackURL string) {
+	data := map[string]string{
+		"key": key,
+		"url": callbackURL,
+	}
+	jsonData, _ := json.Marshal(data)
+
+	resp, err := http.Post(apiBase+"/watch", "application/json", bytes.NewBuffer(jsonData))
+	if err != nil {
+		log.Printf("Failed to subscribe: %v", err)
+		return
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode == http.StatusOK {
+		log.Printf("Successfully subscribed to key '%s'", key)
+	} else {
+		log.Printf("Failed to subscribe, status: %d", resp.StatusCode)
+	}
+}
+

+ 103 - 0
example/watch/client2/main.go

@@ -0,0 +1,103 @@
+package main
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io"
+	"log"
+	"net/http"
+	"time"
+)
+
+// WebHookNotification must match the server's definition
+type WebHookNotification struct {
+	Key       string `json:"key"`
+	Value     string `json:"value"`
+	EventType int    `json:"event_type"` // 0=Set, 1=Del
+	Timestamp int64  `json:"timestamp"`
+}
+
+func main() {
+	listenPort := ":9601"
+	targetNodeAPI := "http://127.0.0.1:8501" // Node 2 API
+	myURL := fmt.Sprintf("http://127.0.0.1%s/callback", listenPort)
+	
+	// Start Listener
+	go func() {
+		http.HandleFunc("/callback", func(w http.ResponseWriter, r *http.Request) {
+			if r.Method != http.MethodPost {
+				http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+				return
+			}
+
+			body, err := io.ReadAll(r.Body)
+			if err != nil {
+				log.Printf("Error reading body: %v", err)
+				http.Error(w, "Bad request", http.StatusBadRequest)
+				return
+			}
+			defer r.Body.Close()
+
+			var event WebHookNotification
+			if err := json.Unmarshal(body, &event); err != nil {
+				log.Printf("Error unmarshalling JSON: %v", err)
+				http.Error(w, "Invalid JSON", http.StatusBadRequest)
+				return
+			}
+
+			eventTypeStr := "SET"
+			if event.EventType == 1 {
+				eventTypeStr = "DEL"
+			}
+
+			// Print received event
+			fmt.Printf("\n[Client 2 Received] %s\n", time.Now().Format("15:04:05"))
+			fmt.Printf("  Key:   %s\n", event.Key)
+			fmt.Printf("  Type:  %s\n", eventTypeStr)
+			if event.EventType == 0 {
+				fmt.Printf("  Value: %s\n", event.Value)
+			}
+			fmt.Println("----------------------------------------")
+
+			w.WriteHeader(http.StatusOK)
+		})
+
+		log.Printf("Client 2 Listener started on %s", listenPort)
+		if err := http.ListenAndServe(listenPort, nil); err != nil {
+			log.Fatalf("Server failed: %v", err)
+		}
+	}()
+
+	// Give listener time to start
+	time.Sleep(1 * time.Second)
+
+	// Subscribe to Node 2
+	log.Printf("Subscribing to Node 2 (%s)...", targetNodeAPI)
+	subscribe(targetNodeAPI, "key-2", myURL)
+
+	// Keep running
+	select {}
+}
+
+func subscribe(apiBase, key, callbackURL string) {
+	data := map[string]string{
+		"key": key,
+		"url": callbackURL,
+	}
+	jsonData, _ := json.Marshal(data)
+
+	resp, err := http.Post(apiBase+"/watch", "application/json", bytes.NewBuffer(jsonData))
+	if err != nil {
+		log.Printf("Failed to subscribe: %v", err)
+		return
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode == http.StatusOK {
+		log.Printf("Successfully subscribed to key '%s'", key)
+	} else {
+		log.Printf("Failed to subscribe, status: %d", resp.StatusCode)
+	}
+}
+

+ 63 - 0
example/watch/node1/main.go

@@ -0,0 +1,63 @@
+package main
+
+import (
+	"log"
+	"os"
+
+	"igit.com/xbase/raft"
+	"igit.com/xbase/raft/example/basic/common"
+)
+
+func main() {
+	// Configuration
+	nodeID := "node1"
+	addr := "127.0.0.1:9500" // Raft Port
+	httpAddr := ":8500"      // HTTP API Port
+	dataDir := "../data/node1"
+
+	// Cluster configuration
+	clusterNodes := map[string]string{
+		"node1": "127.0.0.1:9500",
+		"node2": "127.0.0.1:9501",
+	}
+
+	config := raft.DefaultConfig()
+	config.NodeID = nodeID
+	config.ListenAddr = addr
+	config.DataDir = dataDir
+	config.ClusterNodes = clusterNodes
+	
+	// Enable HTTP API for WebHooks
+	config.HTTPAddr = httpAddr
+	
+	// Console Logger
+	config.Logger = raft.NewConsoleLogger(nodeID, 3) // Error level only (Quiet mode)
+
+	// Ensure data directory exists
+	if err := os.MkdirAll(dataDir, 0755); err != nil {
+		log.Fatalf("Failed to create data directory: %v", err)
+	}
+
+	// Create KV Server
+	server, err := raft.NewKVServer(config)
+	if err != nil {
+		log.Fatalf("Failed to create server: %v", err)
+	}
+
+	// Register demo commands
+	common.RegisterDemoCommands(server.CLI)
+
+	// Start server
+	if err := server.Start(); err != nil {
+		log.Fatalf("Failed to start server: %v", err)
+	}
+	
+	log.Printf("Raft Node %s started at %s", nodeID, addr)
+	log.Printf("HTTP API started at %s", httpAddr)
+
+	defer server.Stop()
+
+	// Keep main thread running
+	select {}
+}
+

+ 63 - 0
example/watch/node2/main.go

@@ -0,0 +1,63 @@
+package main
+
+import (
+	"log"
+	"os"
+
+	"igit.com/xbase/raft"
+	"igit.com/xbase/raft/example/basic/common"
+)
+
+func main() {
+	// Configuration
+	nodeID := "node2"
+	addr := "127.0.0.1:9501" // Raft Port
+	httpAddr := ":8501"      // HTTP API Port
+	dataDir := "../data/node2"
+
+	// Cluster configuration
+	clusterNodes := map[string]string{
+		"node1": "127.0.0.1:9500",
+		"node2": "127.0.0.1:9501",
+	}
+
+	config := raft.DefaultConfig()
+	config.NodeID = nodeID
+	config.ListenAddr = addr
+	config.DataDir = dataDir
+	config.ClusterNodes = clusterNodes
+	
+	// Enable HTTP API for WebHooks
+	config.HTTPAddr = httpAddr
+	
+	// Console Logger
+	config.Logger = raft.NewConsoleLogger(nodeID, 3) // Error level only (Quiet mode)
+
+	// Ensure data directory exists
+	if err := os.MkdirAll(dataDir, 0755); err != nil {
+		log.Fatalf("Failed to create data directory: %v", err)
+	}
+
+	// Create KV Server
+	server, err := raft.NewKVServer(config)
+	if err != nil {
+		log.Fatalf("Failed to create server: %v", err)
+	}
+
+	// Register demo commands
+	common.RegisterDemoCommands(server.CLI)
+
+	// Start server
+	if err := server.Start(); err != nil {
+		log.Fatalf("Failed to start server: %v", err)
+	}
+	
+	log.Printf("Raft Node %s started at %s", nodeID, addr)
+	log.Printf("HTTP API started at %s", httpAddr)
+
+	defer server.Stop()
+
+	// Keep main thread running
+	select {}
+}
+

+ 164 - 10
server.go

@@ -4,6 +4,8 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"io"
+	"net/http"
 	"sort"
 	"strings"
 	"sync"
@@ -14,12 +16,14 @@ import (
 
 // KVServer wraps Raft to provide a distributed key-value store
 type KVServer struct {
-	Raft     *Raft
-	DB       *db.Engine
-	CLI      *CLI
-	stopCh   chan struct{}
-	wg       sync.WaitGroup
-	stopOnce sync.Once
+	Raft       *Raft
+	DB         *db.Engine
+	CLI        *CLI
+	Watcher    *WebHookWatcher
+	httpServer *http.Server
+	stopCh     chan struct{}
+	wg         sync.WaitGroup
+	stopOnce   sync.Once
 	// leavingNodes tracks nodes that are currently being removed
 	// to prevent auto-rejoin/discovery logic from interfering
 	leavingNodes sync.Map
@@ -71,6 +75,10 @@ func NewKVServer(config *Config) (*KVServer, error) {
 	applyCh := make(chan ApplyMsg, 1000) // Increase buffer for async processing
 	transport := NewTCPTransport(config.ListenAddr, 10, config.Logger)
 
+	// Initialize WebHookWatcher
+	// 5 workers, 3 retries
+	watcher := NewWebHookWatcher(5, 3, config.Logger)
+
 	r, err := NewRaft(config, transport, applyCh)
 	if err != nil {
 		engine.Close()
@@ -78,10 +86,11 @@ func NewKVServer(config *Config) (*KVServer, error) {
 	}
 
 	s := &KVServer{
-		Raft:   r,
-		DB:     engine,
-		CLI:    nil,
-		stopCh: stopCh,
+		Raft:    r,
+		DB:      engine,
+		CLI:     nil,
+		Watcher: watcher,
+		stopCh:  stopCh,
 	}
 
 	// Initialize CLI
@@ -102,6 +111,14 @@ func (s *KVServer) Start() error {
 	if s.Raft.config.EnableCLI {
 		go s.CLI.Start()
 	}
+
+	// Start HTTP Server if configured
+	if s.Raft.config.HTTPAddr != "" {
+		if err := s.startHTTPServer(s.Raft.config.HTTPAddr); err != nil {
+			s.Raft.config.Logger.Warn("Failed to start HTTP server: %v", err)
+		}
+	}
+
 	return s.Raft.Start()
 }
 
@@ -113,6 +130,14 @@ func (s *KVServer) Stop() error {
 			close(s.stopCh)
 			s.wg.Wait()
 		}
+		// Stop Watcher
+		if s.Watcher != nil {
+			s.Watcher.Stop()
+		}
+		// Stop HTTP Server
+		if s.httpServer != nil {
+			s.httpServer.Close()
+		}
 		// Stop Raft first
 		if errRaft := s.Raft.Stop(); errRaft != nil {
 			err = errRaft
@@ -151,8 +176,14 @@ func (s *KVServer) runApplyLoop(applyCh chan ApplyMsg) {
 			switch cmd.Type {
 			case KVSet:
 				err = s.DB.Set(cmd.Key, cmd.Value, msg.CommandIndex)
+				if err == nil {
+					s.Watcher.Notify(cmd.Key, cmd.Value, KVSet)
+				}
 			case KVDel:
 				err = s.DB.Delete(cmd.Key, msg.CommandIndex)
+				if err == nil {
+					s.Watcher.Notify(cmd.Key, "", KVDel)
+				}
 			default:
 				s.Raft.config.Logger.Error("Unknown command type: %d", cmd.Type)
 			}
@@ -342,6 +373,16 @@ func (s *KVServer) GetDBSize() int64 {
 	return s.DB.GetDBSize()
 }
 
+// WatchURL registers a webhook url for a key
+func (s *KVServer) WatchURL(key, url string) {
+	s.Watcher.Subscribe(key, url)
+}
+
+// UnwatchURL removes a webhook url for a key
+func (s *KVServer) UnwatchURL(key, url string) {
+	s.Watcher.Unsubscribe(key, url)
+}
+
 // WatchAll registers a watcher for all keys
 func (s *KVServer) WatchAll(handler WatchHandler) {
 	// s.FSM.WatchAll(handler)
@@ -505,3 +546,116 @@ func (s *KVServer) checkConnections() {
 		}
 	}
 }
+
+// startHTTPServer starts the HTTP API server
+func (s *KVServer) startHTTPServer(addr string) error {
+	mux := http.NewServeMux()
+
+	// KV API
+	mux.HandleFunc("/kv", func(w http.ResponseWriter, r *http.Request) {
+		switch r.Method {
+		case http.MethodGet:
+			key := r.URL.Query().Get("key")
+			if key == "" {
+				http.Error(w, "missing key", http.StatusBadRequest)
+				return
+			}
+			val, found, err := s.GetLinear(key)
+			if err != nil {
+				http.Error(w, err.Error(), http.StatusInternalServerError)
+				return
+			}
+			if !found {
+				http.Error(w, "not found", http.StatusNotFound)
+				return
+			}
+			w.Write([]byte(val))
+
+		case http.MethodPost:
+			body, _ := io.ReadAll(r.Body)
+			var req struct {
+				Key   string `json:"key"`
+				Value string `json:"value"`
+			}
+			if err := json.Unmarshal(body, &req); err != nil {
+				http.Error(w, "invalid json", http.StatusBadRequest)
+				return
+			}
+			if err := s.Set(req.Key, req.Value); err != nil {
+				http.Error(w, err.Error(), http.StatusInternalServerError)
+				return
+			}
+			w.WriteHeader(http.StatusOK)
+
+		case http.MethodDelete:
+			key := r.URL.Query().Get("key")
+			if key == "" {
+				http.Error(w, "missing key", http.StatusBadRequest)
+				return
+			}
+			if err := s.Del(key); err != nil {
+				http.Error(w, err.Error(), http.StatusInternalServerError)
+				return
+			}
+			w.WriteHeader(http.StatusOK)
+		
+		default:
+			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
+		}
+	})
+
+	// Watcher API
+	mux.HandleFunc("/watch", func(w http.ResponseWriter, r *http.Request) {
+		if r.Method != http.MethodPost {
+			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
+			return
+		}
+		body, _ := io.ReadAll(r.Body)
+		var req struct {
+			Key string `json:"key"`
+			URL string `json:"url"`
+		}
+		if err := json.Unmarshal(body, &req); err != nil {
+			http.Error(w, "invalid json", http.StatusBadRequest)
+			return
+		}
+		if req.Key == "" || req.URL == "" {
+			http.Error(w, "missing key or url", http.StatusBadRequest)
+			return
+		}
+		s.WatchURL(req.Key, req.URL)
+		w.WriteHeader(http.StatusOK)
+	})
+
+	mux.HandleFunc("/unwatch", func(w http.ResponseWriter, r *http.Request) {
+		if r.Method != http.MethodPost {
+			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
+			return
+		}
+		body, _ := io.ReadAll(r.Body)
+		var req struct {
+			Key string `json:"key"`
+			URL string `json:"url"`
+		}
+		if err := json.Unmarshal(body, &req); err != nil {
+			http.Error(w, "invalid json", http.StatusBadRequest)
+			return
+		}
+		s.UnwatchURL(req.Key, req.URL)
+		w.WriteHeader(http.StatusOK)
+	})
+
+	s.httpServer = &http.Server{
+		Addr:    addr,
+		Handler: mux,
+	}
+
+	go func() {
+		s.Raft.config.Logger.Info("HTTP API server listening on %s", addr)
+		if err := s.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+			s.Raft.config.Logger.Error("HTTP server failed: %v", err)
+		}
+	}()
+
+	return nil
+}

+ 5 - 0
types.go

@@ -223,6 +223,10 @@ type Config struct {
 	// Default: false
 	EnableCLI bool
 
+	// HTTPAddr is the address for the HTTP API server (e.g. ":8080")
+	// If empty, the HTTP server is disabled
+	HTTPAddr string
+
 	// Logger for debug output
 	Logger Logger
 }
@@ -292,6 +296,7 @@ func DefaultConfig() *Config {
 		BatchMaxWait:            10 * time.Millisecond,
 		BatchMaxSize:            100,
 		EnableCLI:               true,
+		HTTPAddr:                "",
 	}
 }
 

+ 224 - 0
watcher.go

@@ -1,5 +1,14 @@
 package raft
 
+import (
+	"bytes"
+	"encoding/json"
+	"net/http"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
 // WatchHandler defines the function signature for watching key changes
 type WatchHandler func(key, value string, eventType KVCommandType)
 
@@ -14,3 +23,218 @@ func WatcherWrapper(w Watcher) WatchHandler {
 		w.OnWait(key, value, eventType)
 	}
 }
+
+// WebHookNotification represents the payload sent to the callback URL
+type WebHookNotification struct {
+	Key       string        `json:"key"`
+	Value     string        `json:"value"`
+	EventType KVCommandType `json:"event_type"` // 0=Set, 1=Del
+	Timestamp int64         `json:"timestamp"`
+}
+
+// NotificationTask represents a pending notification
+type NotificationTask struct {
+	URL          string
+	Notification WebHookNotification
+	RetryCount   int
+	MaxRetries   int
+}
+
+// WatcherMetrics holds runtime stats for the watcher
+type WatcherMetrics struct {
+	QueueSize            int64
+	DroppedNotifications uint64
+	SentNotifications    uint64
+	FailedNotifications  uint64
+}
+
+// WebHookWatcher manages network callbacks for key changes
+type WebHookWatcher struct {
+	mu          sync.RWMutex
+	subscribers map[string][]string // Key -> []URL
+	
+	taskCh      chan NotificationTask
+	stopCh      chan struct{}
+	wg          sync.WaitGroup
+	
+	client      *http.Client
+	maxRetries  int
+	logger      Logger
+
+	// Metrics
+	metrics WatcherMetrics
+	
+	// Log throttling
+	lastDropLogTime int64 // unix timestamp
+}
+
+// NewWebHookWatcher creates a new WebHookWatcher
+func NewWebHookWatcher(workers int, maxRetries int, logger Logger) *WebHookWatcher {
+	w := &WebHookWatcher{
+		subscribers: make(map[string][]string),
+		taskCh:      make(chan NotificationTask, 10000), // Large buffer to prevent blocking
+		stopCh:      make(chan struct{}),
+		maxRetries:  maxRetries,
+		client: &http.Client{
+			Timeout: 5 * time.Second,
+		},
+		logger: logger,
+	}
+
+	// Start workers
+	for i := 0; i < workers; i++ {
+		w.wg.Add(1)
+		go w.worker()
+	}
+
+	return w
+}
+
+// Subscribe adds a callback URL for a specific key
+func (w *WebHookWatcher) Subscribe(key, url string) {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	
+	urls := w.subscribers[key]
+	for _, u := range urls {
+		if u == url {
+			return // Already subscribed
+		}
+	}
+	w.subscribers[key] = append(urls, url)
+	w.logger.Info("Subscribed %s to key %s", url, key)
+}
+
+// Unsubscribe removes a callback URL for a specific key
+func (w *WebHookWatcher) Unsubscribe(key, url string) {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	
+	urls := w.subscribers[key]
+	newURLs := make([]string, 0, len(urls))
+	for _, u := range urls {
+		if u != url {
+			newURLs = append(newURLs, u)
+		}
+	}
+	
+	if len(newURLs) == 0 {
+		delete(w.subscribers, key)
+	} else {
+		w.subscribers[key] = newURLs
+	}
+}
+
+// Notify queues notifications for a key change
+func (w *WebHookWatcher) Notify(key, value string, eventType KVCommandType) {
+	w.mu.RLock()
+	urls := w.subscribers[key]
+	// Copy to avoid holding lock
+	targetURLs := make([]string, len(urls))
+	copy(targetURLs, urls)
+	w.mu.RUnlock()
+
+	if len(targetURLs) == 0 {
+		return
+	}
+
+	payload := WebHookNotification{
+		Key:       key,
+		Value:     value,
+		EventType: eventType,
+		Timestamp: time.Now().UnixNano(),
+	}
+
+	for _, url := range targetURLs {
+		select {
+		case w.taskCh <- NotificationTask{
+			URL:          url,
+			Notification: payload,
+			MaxRetries:   w.maxRetries,
+		}:
+			atomic.AddInt64(&w.metrics.QueueSize, 1)
+		default:
+			// Queue full - Drop notification
+			newDropped := atomic.AddUint64(&w.metrics.DroppedNotifications, 1)
+			
+			// Log Throttling: Log only once per second to avoid IO blocking
+			now := time.Now().Unix()
+			last := atomic.LoadInt64(&w.lastDropLogTime)
+			if now > last {
+				if atomic.CompareAndSwapInt64(&w.lastDropLogTime, last, now) {
+					w.logger.Warn("Notification queue full! Dropped %d notifications total. (Key: %s -> %s)", newDropped, key, url)
+				}
+			}
+		}
+	}
+}
+
+// worker processes notification tasks
+func (w *WebHookWatcher) worker() {
+	defer w.wg.Done()
+
+	for {
+		select {
+		case <-w.stopCh:
+			return
+		case task := <-w.taskCh:
+			atomic.AddInt64(&w.metrics.QueueSize, -1)
+			w.sendNotification(task)
+		}
+	}
+}
+
+// sendNotification sends the HTTP request with retry logic
+func (w *WebHookWatcher) sendNotification(task NotificationTask) {
+	data, err := json.Marshal(task.Notification)
+	if err != nil {
+		w.logger.Error("Failed to marshal notification: %v", err)
+		return
+	}
+
+	// Exponential backoff for retries
+	// 100ms, 200ms, 400ms, ...
+	backoff := 100 * time.Millisecond
+
+	for i := 0; i <= task.MaxRetries; i++ {
+		if i > 0 {
+			time.Sleep(backoff)
+			backoff *= 2
+		}
+
+		resp, err := w.client.Post(task.URL, "application/json", bytes.NewBuffer(data))
+		if err == nil {
+			resp.Body.Close()
+			if resp.StatusCode >= 200 && resp.StatusCode < 300 {
+				// Success
+				atomic.AddUint64(&w.metrics.SentNotifications, 1)
+				return
+			}
+			w.logger.Debug("Notification to %s failed with status %d (attempt %d/%d)", 
+				task.URL, resp.StatusCode, i+1, task.MaxRetries+1)
+		} else {
+			w.logger.Debug("Notification to %s failed: %v (attempt %d/%d)", 
+				task.URL, err, i+1, task.MaxRetries+1)
+		}
+	}
+
+	atomic.AddUint64(&w.metrics.FailedNotifications, 1)
+	w.logger.Warn("Failed to deliver notification for key %s to %s after %d attempts", 
+		task.Notification.Key, task.URL, task.MaxRetries+1)
+}
+
+// Stop stops the watcher and waits for workers
+func (w *WebHookWatcher) Stop() {
+	close(w.stopCh)
+	w.wg.Wait()
+}
+
+// GetMetrics returns a copy of current metrics
+func (w *WebHookWatcher) GetMetrics() WatcherMetrics {
+	return WatcherMetrics{
+		QueueSize:            atomic.LoadInt64(&w.metrics.QueueSize),
+		DroppedNotifications: atomic.LoadUint64(&w.metrics.DroppedNotifications),
+		SentNotifications:    atomic.LoadUint64(&w.metrics.SentNotifications),
+		FailedNotifications:  atomic.LoadUint64(&w.metrics.FailedNotifications),
+	}
+}