| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- package raft
- import (
- "encoding/json"
- "fmt"
- "sync"
- )
- // KVCommandType defines the type of KV operation
- type KVCommandType int
- const (
- KVSet KVCommandType = iota
- KVDel
- )
- // KVCommand represents a key-value operation
- type KVCommand struct {
- Type KVCommandType `json:"type"`
- Key string `json:"key"`
- Value string `json:"value,omitempty"`
- }
- // KVStateMachine implements a simple key-value store state machine
- type KVStateMachine struct {
- mu sync.RWMutex
- data map[string]string
- watchers map[string][]WatchHandler
- globalWatchers []WatchHandler
- }
- func NewKVStateMachine() *KVStateMachine {
- return &KVStateMachine{
- data: make(map[string]string),
- watchers: make(map[string][]WatchHandler),
- }
- }
- // WatchAll registers a handler for all keys
- func (sm *KVStateMachine) WatchAll(handler WatchHandler) {
- sm.mu.Lock()
- defer sm.mu.Unlock()
- sm.globalWatchers = append(sm.globalWatchers, handler)
- }
- // Watch registers a handler for a specific key
- func (sm *KVStateMachine) Watch(key string, handler WatchHandler) {
- sm.mu.Lock()
- defer sm.mu.Unlock()
- if sm.watchers == nil {
- sm.watchers = make(map[string][]WatchHandler)
- }
- sm.watchers[key] = append(sm.watchers[key], handler)
- }
- // Unwatch removes a handler for a specific key
- // Note: This is a simple implementation that removes all handlers for the key
- // or would need a way to identify specific handlers (e.g., via pointer or ID)
- // For now, let's implement removing all handlers for a key
- func (sm *KVStateMachine) Unwatch(key string) {
- sm.mu.Lock()
- defer sm.mu.Unlock()
- delete(sm.watchers, key)
- }
- // Apply applies a command to the state machine
- func (sm *KVStateMachine) Apply(command []byte) (interface{}, error) {
- var cmd KVCommand
- if err := json.Unmarshal(command, &cmd); err != nil {
- return nil, err
- }
- // We'll capture handlers to call after releasing the lock
- // to avoid potential deadlocks if handlers try to lock again
- var handlersToCall []WatchHandler
- sm.mu.Lock()
- switch cmd.Type {
- case KVSet:
- sm.data[cmd.Key] = cmd.Value
- case KVDel:
- delete(sm.data, cmd.Key)
- default:
- sm.mu.Unlock()
- return nil, fmt.Errorf("unknown command type: %d", cmd.Type)
- }
- // Check for watchers
- if handlers, ok := sm.watchers[cmd.Key]; ok && len(handlers) > 0 {
- handlersToCall = make([]WatchHandler, len(handlers))
- copy(handlersToCall, handlers)
- }
- // Add global watchers
- if len(sm.globalWatchers) > 0 {
- handlersToCall = append(handlersToCall, sm.globalWatchers...)
- }
- sm.mu.Unlock()
- // Notify watchers (outside the lock)
- // Note: We only notify for new events applied via log, not during snapshot restore
- for _, handler := range handlersToCall {
- // Run in a safe way, potentially recovering from panics if needed
- // or just calling directly. Since we want sequential consistency for hooks,
- // we call them synchronously here.
- handler(cmd.Key, cmd.Value, cmd.Type)
- }
- return nil, nil
- }
- // Get gets a value from the state machine
- func (sm *KVStateMachine) Get(key string) (string, bool) {
- sm.mu.RLock()
- defer sm.mu.RUnlock()
- val, ok := sm.data[key]
- return val, ok
- }
- // Snapshot returns a snapshot of the current state
- func (sm *KVStateMachine) Snapshot() ([]byte, error) {
- sm.mu.RLock()
- defer sm.mu.RUnlock()
- return json.Marshal(sm.data)
- }
- // Restore restores the state machine from a snapshot
- func (sm *KVStateMachine) Restore(snapshot []byte) error {
- var data map[string]string
- if err := json.Unmarshal(snapshot, &data); err != nil {
- return err
- }
- sm.mu.Lock()
- defer sm.mu.Unlock()
- sm.data = data
- // Note: We do NOT trigger watchers during restore
- return nil
- }
|