kv.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package raft
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "sync"
  6. )
  // KVCommandType identifies which operation a KVCommand performs.
  //
  // A KVCommand carries it in its "type" JSON field, where it is encoded as a
  // plain integer; the values below are therefore spelled out explicitly.
  type KVCommandType int

  // Operations understood by KVStateMachine.Apply.
  const (
  	// KVSet stores a value under a key.
  	KVSet KVCommandType = 0
  	// KVDel removes a key.
  	KVDel KVCommandType = 1
  )
  13. // KVCommand represents a key-value operation
  14. type KVCommand struct {
  15. Type KVCommandType `json:"type"`
  16. Key string `json:"key"`
  17. Value string `json:"value,omitempty"`
  18. }
  19. // KVStateMachine implements a simple key-value store state machine
  20. type KVStateMachine struct {
  21. mu sync.RWMutex
  22. data map[string]string
  23. watchers map[string][]WatchHandler
  24. globalWatchers []WatchHandler
  25. }
  26. func NewKVStateMachine() *KVStateMachine {
  27. return &KVStateMachine{
  28. data: make(map[string]string),
  29. watchers: make(map[string][]WatchHandler),
  30. }
  31. }
  32. // WatchAll registers a handler for all keys
  33. func (sm *KVStateMachine) WatchAll(handler WatchHandler) {
  34. sm.mu.Lock()
  35. defer sm.mu.Unlock()
  36. sm.globalWatchers = append(sm.globalWatchers, handler)
  37. }
  38. // Watch registers a handler for a specific key
  39. func (sm *KVStateMachine) Watch(key string, handler WatchHandler) {
  40. sm.mu.Lock()
  41. defer sm.mu.Unlock()
  42. if sm.watchers == nil {
  43. sm.watchers = make(map[string][]WatchHandler)
  44. }
  45. sm.watchers[key] = append(sm.watchers[key], handler)
  46. }
  47. // Unwatch removes a handler for a specific key
  48. // Note: This is a simple implementation that removes all handlers for the key
  49. // or would need a way to identify specific handlers (e.g., via pointer or ID)
  50. // For now, let's implement removing all handlers for a key
  51. func (sm *KVStateMachine) Unwatch(key string) {
  52. sm.mu.Lock()
  53. defer sm.mu.Unlock()
  54. delete(sm.watchers, key)
  55. }
  56. // Apply applies a command to the state machine
  57. func (sm *KVStateMachine) Apply(command []byte) (interface{}, error) {
  58. var cmd KVCommand
  59. if err := json.Unmarshal(command, &cmd); err != nil {
  60. return nil, err
  61. }
  62. // We'll capture handlers to call after releasing the lock
  63. // to avoid potential deadlocks if handlers try to lock again
  64. var handlersToCall []WatchHandler
  65. sm.mu.Lock()
  66. switch cmd.Type {
  67. case KVSet:
  68. sm.data[cmd.Key] = cmd.Value
  69. case KVDel:
  70. delete(sm.data, cmd.Key)
  71. default:
  72. sm.mu.Unlock()
  73. return nil, fmt.Errorf("unknown command type: %d", cmd.Type)
  74. }
  75. // Check for watchers
  76. if handlers, ok := sm.watchers[cmd.Key]; ok && len(handlers) > 0 {
  77. handlersToCall = make([]WatchHandler, len(handlers))
  78. copy(handlersToCall, handlers)
  79. }
  80. // Add global watchers
  81. if len(sm.globalWatchers) > 0 {
  82. handlersToCall = append(handlersToCall, sm.globalWatchers...)
  83. }
  84. sm.mu.Unlock()
  85. // Notify watchers (outside the lock)
  86. // Note: We only notify for new events applied via log, not during snapshot restore
  87. for _, handler := range handlersToCall {
  88. // Run in a safe way, potentially recovering from panics if needed
  89. // or just calling directly. Since we want sequential consistency for hooks,
  90. // we call them synchronously here.
  91. handler(cmd.Key, cmd.Value, cmd.Type)
  92. }
  93. return nil, nil
  94. }
  95. // Get gets a value from the state machine
  96. func (sm *KVStateMachine) Get(key string) (string, bool) {
  97. sm.mu.RLock()
  98. defer sm.mu.RUnlock()
  99. val, ok := sm.data[key]
  100. return val, ok
  101. }
  102. // Snapshot returns a snapshot of the current state
  103. func (sm *KVStateMachine) Snapshot() ([]byte, error) {
  104. sm.mu.RLock()
  105. defer sm.mu.RUnlock()
  106. return json.Marshal(sm.data)
  107. }
  108. // Restore restores the state machine from a snapshot
  109. func (sm *KVStateMachine) Restore(snapshot []byte) error {
  110. var data map[string]string
  111. if err := json.Unmarshal(snapshot, &data); err != nil {
  112. return err
  113. }
  114. sm.mu.Lock()
  115. defer sm.mu.Unlock()
  116. sm.data = data
  117. // Note: We do NOT trigger watchers during restore
  118. return nil
  119. }