// server.go — distributed key-value server built on top of Raft.
  1. package raft
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "sort"
  7. "strings"
  8. "sync"
  9. "time"
  10. "igit.com/xbase/raft/db"
  11. )
// KVServer wraps Raft to provide a distributed key-value store.
// Writes are replicated through Raft and applied to the local DB engine
// by the apply loop; reads can be served locally (stale) or linearizably.
type KVServer struct {
	Raft *Raft      // consensus core; replicates commands and cluster membership
	DB   *db.Engine // local storage engine holding the applied state
	CLI  *CLI       // interactive CLI; started by Start only when config.EnableCLI is set

	stopCh   chan struct{}  // closed by Stop to halt background goroutines (maintenance loop, snapshot wait)
	wg       sync.WaitGroup // waits for the maintenance loop on shutdown
	stopOnce sync.Once      // makes Stop idempotent
}
// NewKVServer creates a new KV server: it opens the DB engine, wires the
// snapshot/read callbacks into config, constructs the Raft node, and starts
// the apply and maintenance goroutines. The caller must still call Start.
func NewKVServer(config *Config) (*KVServer, error) {
	// Initialize DB Engine.
	// Use a subdirectory for DB to avoid conflict with Raft logs if they share DataDir.
	dbPath := config.DataDir + "/kv_engine"
	engine, err := db.NewEngine(dbPath)
	if err != nil {
		return nil, fmt.Errorf("failed to create db engine: %w", err)
	}
	// Initialize LastAppliedIndex from DB to prevent re-applying entries
	// after a restart (the apply loop skips indexes at or below this).
	config.LastAppliedIndex = engine.GetLastAppliedIndex()
	// Create stop channel early: the snapshot provider closure below needs it
	// so a pending snapshot wait can be aborted on shutdown.
	stopCh := make(chan struct{})
	// Configure snapshot provider.
	config.SnapshotProvider = func(minIncludeIndex uint64) ([]byte, error) {
		// Wait (poll every 10ms) for DB to catch up to the requested index.
		// This is critical for data integrity during compaction: the snapshot
		// must cover every log entry that is about to be discarded.
		for engine.GetLastAppliedIndex() < minIncludeIndex {
			select {
			case <-stopCh:
				return nil, fmt.Errorf("server stopping")
			default:
				time.Sleep(10 * time.Millisecond)
			}
		}
		// Force sync to disk to ensure data durability before compaction.
		// This prevents data loss if Raft logs are compacted but DB data is only in OS cache.
		if err := engine.Sync(); err != nil {
			return nil, fmt.Errorf("failed to sync engine before snapshot: %w", err)
		}
		return engine.Snapshot()
	}
	// Configure get handler for remote reads (used when followers forward Gets).
	config.GetHandler = func(key string) (string, bool) {
		return engine.Get(key)
	}
	applyCh := make(chan ApplyMsg, 1000) // Increase buffer for async processing
	transport := NewTCPTransport(config.ListenAddr, 10, config.Logger)
	r, err := NewRaft(config, transport, applyCh)
	if err != nil {
		// Raft failed to start; release the engine we opened above.
		engine.Close()
		return nil, err
	}
	s := &KVServer{
		Raft:   r,
		DB:     engine,
		CLI:    nil,
		stopCh: stopCh,
	}
	// Initialize CLI (started lazily in Start when EnableCLI is set).
	s.CLI = NewCLI(s)
	// Start applying entries.
	// NOTE(review): this goroutine is not tracked by s.wg; it exits when
	// applyCh is closed — presumably by Raft.Stop. Confirm that contract.
	go s.runApplyLoop(applyCh)
	// Start background maintenance loop (node-info publishing / auto-rejoin).
	s.wg.Add(1)
	go s.maintenanceLoop()
	return s, nil
}
// Start launches the optional CLI and then starts the underlying Raft node.
// It returns whatever Raft.Start returns.
func (s *KVServer) Start() error {
	// Start CLI if enabled; it runs on its own goroutine for the server's lifetime.
	if s.Raft.config.EnableCLI {
		go s.CLI.Start()
	}
	return s.Raft.Start()
}
  86. func (s *KVServer) Stop() error {
  87. var err error
  88. s.stopOnce.Do(func() {
  89. // Stop maintenance loop
  90. if s.stopCh != nil {
  91. close(s.stopCh)
  92. s.wg.Wait()
  93. }
  94. // Stop Raft first
  95. if errRaft := s.Raft.Stop(); errRaft != nil {
  96. err = errRaft
  97. }
  98. // Close DB
  99. if s.DB != nil {
  100. if errDB := s.DB.Close(); errDB != nil {
  101. // Combine errors if both fail
  102. if err != nil {
  103. err = fmt.Errorf("raft stop error: %v, db close error: %v", err, errDB)
  104. } else {
  105. err = errDB
  106. }
  107. }
  108. }
  109. })
  110. return err
  111. }
  112. func (s *KVServer) runApplyLoop(applyCh chan ApplyMsg) {
  113. for msg := range applyCh {
  114. if msg.CommandValid {
  115. // Optimization: Skip if already applied
  116. // We check this here to avoid unmarshalling and locking DB for known duplicates
  117. if msg.CommandIndex <= s.DB.GetLastAppliedIndex() {
  118. continue
  119. }
  120. var cmd KVCommand
  121. if err := json.Unmarshal(msg.Command, &cmd); err != nil {
  122. s.Raft.config.Logger.Error("Failed to unmarshal command: %v", err)
  123. continue
  124. }
  125. var err error
  126. switch cmd.Type {
  127. case KVSet:
  128. err = s.DB.Set(cmd.Key, cmd.Value, msg.CommandIndex)
  129. case KVDel:
  130. err = s.DB.Delete(cmd.Key, msg.CommandIndex)
  131. default:
  132. s.Raft.config.Logger.Error("Unknown command type: %d", cmd.Type)
  133. }
  134. if err != nil {
  135. s.Raft.config.Logger.Error("DB Apply failed: %v", err)
  136. }
  137. } else if msg.SnapshotValid {
  138. if err := s.DB.Restore(msg.Snapshot); err != nil {
  139. s.Raft.config.Logger.Error("DB Restore failed: %v", err)
  140. }
  141. }
  142. }
  143. }
  144. // Set sets a key-value pair
  145. func (s *KVServer) Set(key, value string) error {
  146. cmd := KVCommand{
  147. Type: KVSet,
  148. Key: key,
  149. Value: value,
  150. }
  151. data, err := json.Marshal(cmd)
  152. if err != nil {
  153. return err
  154. }
  155. _, _, err = s.Raft.ProposeWithForward(data)
  156. return err
  157. }
  158. // Del deletes a key
  159. func (s *KVServer) Del(key string) error {
  160. cmd := KVCommand{
  161. Type: KVDel,
  162. Key: key,
  163. }
  164. data, err := json.Marshal(cmd)
  165. if err != nil {
  166. return err
  167. }
  168. _, _, err = s.Raft.ProposeWithForward(data)
  169. return err
  170. }
// Get gets a value (local read, can be stale).
// It reads straight from the local DB without consulting the leader,
// so it may miss writes committed but not yet applied here.
// For linearizable reads, use GetLinear instead.
func (s *KVServer) Get(key string) (string, bool) {
	return s.DB.Get(key)
}
  176. // GetLinear gets a value with linearizable consistency
  177. // This ensures the read sees all writes committed before the read started
  178. func (s *KVServer) GetLinear(key string) (string, bool, error) {
  179. // First, ensure we have up-to-date data via ReadIndex
  180. _, err := s.Raft.ReadIndex()
  181. if err != nil {
  182. // If we're not leader, try forwarding
  183. if errors.Is(err, ErrNotLeader) {
  184. return s.forwardGet(key)
  185. }
  186. return "", false, err
  187. }
  188. val, ok := s.DB.Get(key)
  189. return val, ok, nil
  190. }
// forwardGet forwards a get request to the leader.
// Thin delegate to Raft.ForwardGet; used by GetLinear on followers.
func (s *KVServer) forwardGet(key string) (string, bool, error) {
	return s.Raft.ForwardGet(key)
}
// Join joins an existing cluster by adding nodeID/addr to the membership.
// The request is forwarded to the leader if this node is a follower.
func (s *KVServer) Join(nodeID, addr string) error {
	return s.Raft.AddNodeWithForward(nodeID, addr)
}
// Leave leaves the cluster by removing nodeID from the membership.
// The request is forwarded to the leader if this node is a follower.
func (s *KVServer) Leave(nodeID string) error {
	return s.Raft.RemoveNodeWithForward(nodeID)
}
  203. // WaitForLeader waits until a leader is elected
  204. func (s *KVServer) WaitForLeader(timeout time.Duration) error {
  205. deadline := time.Now().Add(timeout)
  206. for time.Now().Before(deadline) {
  207. leader := s.Raft.GetLeaderID()
  208. if leader != "" {
  209. return nil
  210. }
  211. time.Sleep(100 * time.Millisecond)
  212. }
  213. return fmt.Errorf("timeout waiting for leader")
  214. }
// HealthCheck returns the health status of this server,
// as reported by the underlying Raft node.
func (s *KVServer) HealthCheck() HealthStatus {
	return s.Raft.HealthCheck()
}
// GetStats returns runtime statistics from the underlying Raft node.
func (s *KVServer) GetStats() Stats {
	return s.Raft.GetStats()
}
// GetMetrics returns runtime metrics from the underlying Raft node.
func (s *KVServer) GetMetrics() Metrics {
	return s.Raft.GetMetrics()
}
// TransferLeadership transfers leadership to the specified node.
// Delegates to the underlying Raft implementation.
func (s *KVServer) TransferLeadership(targetID string) error {
	return s.Raft.TransferLeadership(targetID)
}
// GetClusterNodes returns current cluster membership as a
// node-ID -> address map.
func (s *KVServer) GetClusterNodes() map[string]string {
	return s.Raft.GetClusterNodes()
}
// IsLeader returns true if this node is the leader.
func (s *KVServer) IsLeader() bool {
	// GetState's first return (term) is not needed here.
	_, isLeader := s.Raft.GetState()
	return isLeader
}
// GetLeaderID returns the current leader ID ("" if unknown, per WaitForLeader's usage).
func (s *KVServer) GetLeaderID() string {
	return s.Raft.GetLeaderID()
}
// GetLogSize returns the raft log size in bytes
// (NOTE(review): units presumed from the int64 type — confirm against log.GetLogSize).
func (s *KVServer) GetLogSize() int64 {
	return s.Raft.log.GetLogSize()
}
// GetDBSize returns the db size
// (NOTE(review): units presumed bytes — confirm against db.Engine.GetDBSize).
func (s *KVServer) GetDBSize() int64 {
	return s.DB.GetDBSize()
}
// WatchAll registers a watcher for all keys.
// Currently a no-op: the previous FSM-based watcher was removed and the
// DB-engine replacement is not implemented yet.
func (s *KVServer) WatchAll(handler WatchHandler) {
	// s.FSM.WatchAll(handler)
	// TODO: Implement Watcher for DB
}
// Watch registers a watcher for a key.
// Currently a no-op: the previous FSM-based watcher was removed and the
// DB-engine replacement is not implemented yet.
func (s *KVServer) Watch(key string, handler WatchHandler) {
	// s.FSM.Watch(key, handler)
	// TODO: Implement Watcher for DB
}
// Unwatch removes watchers for a key.
// Currently a no-op: the previous FSM-based watcher was removed and the
// DB-engine replacement is not implemented yet.
func (s *KVServer) Unwatch(key string) {
	// s.FSM.Unwatch(key)
	// TODO: Implement Watcher for DB
}
  267. func (s *KVServer) maintenanceLoop() {
  268. defer s.wg.Done()
  269. // Check every 1 second for faster reaction
  270. ticker := time.NewTicker(1 * time.Second)
  271. defer ticker.Stop()
  272. for {
  273. select {
  274. case <-s.stopCh:
  275. return
  276. case <-ticker.C:
  277. s.updateNodeInfo()
  278. s.checkConnections()
  279. }
  280. }
  281. }
// updateNodeInfo publishes this node's address under "CreateNode/<NodeID>"
// and, on the leader, maintains an aggregated "RaftNode" key holding
// "id=addr;id=addr;..." for every node ever seen. Called once per
// maintenance tick.
func (s *KVServer) updateNodeInfo() {
	// 1. Ensure "CreateNode/<NodeID>" is set to self address
	// We do this via Propose (Set) so it's replicated
	myID := s.Raft.config.NodeID
	myAddr := s.Raft.config.ListenAddr
	key := fmt.Sprintf("CreateNode/%s", myID)
	// Check if we need to update (avoid spamming logs/proposals)
	val, exists := s.Get(key)
	if !exists || val != myAddr {
		// Run in goroutine to avoid blocking the maintenance tick on the
		// propose round-trip.
		// NOTE(review): several ticks can launch this concurrently before
		// the first Set is applied; the duplicates are idempotent Sets,
		// but confirm the redundant proposals are acceptable.
		go func() {
			if err := s.Set(key, myAddr); err != nil {
				s.Raft.config.Logger.Debug("Failed to update node info: %v", err)
			}
		}()
	}
	// 2. Only leader updates RaftNode aggregation
	if s.IsLeader() {
		// Read current RaftNode to preserve history (nodes are merged in,
		// never removed, so departed nodes remain recorded).
		currentVal, _ := s.Get("RaftNode")
		knownNodes := make(map[string]string)
		if currentVal != "" {
			// Format: "id=addr;id=addr;..." — skip empty/malformed segments.
			parts := strings.Split(currentVal, ";")
			for _, part := range parts {
				if part == "" { continue }
				kv := strings.SplitN(part, "=", 2)
				if len(kv) == 2 {
					knownNodes[kv[0]] = kv[1]
				}
			}
		}
		// Merge current cluster nodes
		changed := false
		currentCluster := s.GetClusterNodes()
		for id, addr := range currentCluster {
			if knownNodes[id] != addr {
				knownNodes[id] = addr
				changed = true
			}
		}
		// If changed, update RaftNode
		if changed {
			var peers []string
			for id, addr := range knownNodes {
				peers = append(peers, fmt.Sprintf("%s=%s", id, addr))
			}
			// Sort for a deterministic encoding so equal sets compare equal.
			sort.Strings(peers)
			newVal := strings.Join(peers, ";")
			// Check again if we need to write to avoid loops if Get returned stale
			if newVal != currentVal {
				go func(k, v string) {
					if err := s.Set(k, v); err != nil {
						s.Raft.config.Logger.Warn("Failed to update RaftNode key: %v", err)
					}
				}("RaftNode", newVal)
			}
		}
	}
}
// checkConnections (leader only) compares the replicated "RaftNode" record
// against live cluster membership and asynchronously re-adds any node that
// is recorded there but missing from the current configuration.
func (s *KVServer) checkConnections() {
	if !s.IsLeader() {
		return
	}
	// Read RaftNode key to find potential members that are missing
	val, ok := s.Get("RaftNode")
	if !ok || val == "" {
		return
	}
	// Parse saved nodes (format "id=addr;id=addr;...", written by updateNodeInfo)
	savedParts := strings.Split(val, ";")
	currentNodes := s.GetClusterNodes()
	// Invert currentNodes for address check: membership is compared by
	// address, not node ID.
	currentAddrs := make(map[string]bool)
	for _, addr := range currentNodes {
		currentAddrs[addr] = true
	}
	for _, part := range savedParts {
		if part == "" {
			continue
		}
		// Expect id=addr
		kv := strings.SplitN(part, "=", 2)
		if len(kv) != 2 {
			continue
		}
		id, addr := kv[0], kv[1]
		// Skip invalid addresses
		// NOTE(review): the "." prefix check presumably rejects garbage
		// entries; the ":" check requires a host:port form — confirm intent.
		if strings.HasPrefix(addr, ".") || !strings.Contains(addr, ":") {
			continue
		}
		if !currentAddrs[addr] {
			// Found a node that was previously in the cluster but is now missing
			// Try to add it back
			// We use AddNodeWithForward which handles non-blocking internally somewhat,
			// but we should run this in goroutine to not block the loop
			go func(nodeID, nodeAddr string) {
				// Try to add node
				s.Raft.config.Logger.Info("Auto-rejoining node found in RaftNode: %s (%s)", nodeID, nodeAddr)
				if err := s.Join(nodeID, nodeAddr); err != nil {
					s.Raft.config.Logger.Debug("Failed to auto-rejoin node %s: %v", nodeID, err)
				}
			}(id, addr)
		}
	}
}