package raft import ( "encoding/json" "fmt" "sync" ) // KVCommandType defines the type of KV operation type KVCommandType int const ( KVSet KVCommandType = iota KVDel ) // KVCommand represents a key-value operation type KVCommand struct { Type KVCommandType `json:"type"` Key string `json:"key"` Value string `json:"value,omitempty"` } // KVStateMachine implements a simple key-value store state machine type KVStateMachine struct { mu sync.RWMutex data map[string]string watchers map[string][]WatchHandler globalWatchers []WatchHandler } func NewKVStateMachine() *KVStateMachine { return &KVStateMachine{ data: make(map[string]string), watchers: make(map[string][]WatchHandler), } } // WatchAll registers a handler for all keys func (sm *KVStateMachine) WatchAll(handler WatchHandler) { sm.mu.Lock() defer sm.mu.Unlock() sm.globalWatchers = append(sm.globalWatchers, handler) } // Watch registers a handler for a specific key func (sm *KVStateMachine) Watch(key string, handler WatchHandler) { sm.mu.Lock() defer sm.mu.Unlock() if sm.watchers == nil { sm.watchers = make(map[string][]WatchHandler) } sm.watchers[key] = append(sm.watchers[key], handler) } // Unwatch removes a handler for a specific key // Note: This is a simple implementation that removes all handlers for the key // or would need a way to identify specific handlers (e.g., via pointer or ID) // For now, let's implement removing all handlers for a key func (sm *KVStateMachine) Unwatch(key string) { sm.mu.Lock() defer sm.mu.Unlock() delete(sm.watchers, key) } // Apply applies a command to the state machine func (sm *KVStateMachine) Apply(command []byte) (interface{}, error) { var cmd KVCommand if err := json.Unmarshal(command, &cmd); err != nil { return nil, err } // We'll capture handlers to call after releasing the lock // to avoid potential deadlocks if handlers try to lock again var handlersToCall []WatchHandler sm.mu.Lock() switch cmd.Type { case KVSet: sm.data[cmd.Key] = cmd.Value case KVDel: delete(sm.data, cmd.Key) default: sm.mu.Unlock() return nil, fmt.Errorf("unknown command type: %d", cmd.Type) } // Check for watchers if handlers, ok := sm.watchers[cmd.Key]; ok && len(handlers) > 0 { handlersToCall = make([]WatchHandler, len(handlers)) copy(handlersToCall, handlers) } // Add global watchers if len(sm.globalWatchers) > 0 { handlersToCall = append(handlersToCall, sm.globalWatchers...) } sm.mu.Unlock() // Notify watchers (outside the lock) // Note: We only notify for new events applied via log, not during snapshot restore for _, handler := range handlersToCall { // Run in a safe way, potentially recovering from panics if needed // or just calling directly. Since we want sequential consistency for hooks, // we call them synchronously here. handler(cmd.Key, cmd.Value, cmd.Type) } return nil, nil } // Get gets a value from the state machine func (sm *KVStateMachine) Get(key string) (string, bool) { sm.mu.RLock() defer sm.mu.RUnlock() val, ok := sm.data[key] return val, ok } // Snapshot returns a snapshot of the current state func (sm *KVStateMachine) Snapshot() ([]byte, error) { sm.mu.RLock() defer sm.mu.RUnlock() return json.Marshal(sm.data) } // Restore restores the state machine from a snapshot func (sm *KVStateMachine) Restore(snapshot []byte) error { var data map[string]string if err := json.Unmarshal(snapshot, &data); err != nil { return err } sm.mu.Lock() defer sm.mu.Unlock() sm.data = data // Note: We do NOT trigger watchers during restore return nil }