log.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. package raft
  2. import (
  3. "sync"
  4. )
  5. // LogManager provides a high-level interface for managing Raft logs
  6. type LogManager struct {
  7. mu sync.RWMutex
  8. storage Storage
  9. logger Logger
  10. // Cached values for fast access
  11. firstIndex uint64
  12. lastIndex uint64
  13. lastTerm uint64
  14. }
  15. // NewLogManager creates a new log manager
  16. func NewLogManager(storage Storage, logger Logger) *LogManager {
  17. if logger == nil {
  18. logger = &NoopLogger{}
  19. }
  20. lm := &LogManager{
  21. storage: storage,
  22. logger: logger,
  23. }
  24. // Initialize cached values
  25. lm.firstIndex = storage.GetFirstIndex()
  26. lm.lastIndex = storage.GetLastIndex()
  27. if lm.lastIndex > 0 {
  28. if entry, err := storage.GetEntry(lm.lastIndex); err == nil {
  29. lm.lastTerm = entry.Term
  30. }
  31. }
  32. return lm
  33. }
  34. // FirstIndex returns the first index in the log
  35. func (lm *LogManager) FirstIndex() uint64 {
  36. lm.mu.RLock()
  37. defer lm.mu.RUnlock()
  38. return lm.firstIndex
  39. }
  40. // LastIndex returns the last index in the log
  41. func (lm *LogManager) LastIndex() uint64 {
  42. lm.mu.RLock()
  43. defer lm.mu.RUnlock()
  44. return lm.lastIndex
  45. }
  46. // LastTerm returns the term of the last entry
  47. func (lm *LogManager) LastTerm() uint64 {
  48. lm.mu.RLock()
  49. defer lm.mu.RUnlock()
  50. return lm.lastTerm
  51. }
  52. // LastIndexAndTerm returns both last index and term atomically
  53. func (lm *LogManager) LastIndexAndTerm() (uint64, uint64) {
  54. lm.mu.RLock()
  55. defer lm.mu.RUnlock()
  56. return lm.lastIndex, lm.lastTerm
  57. }
  58. // GetEntry retrieves a single entry
  59. func (lm *LogManager) GetEntry(index uint64) (*LogEntry, error) {
  60. return lm.storage.GetEntry(index)
  61. }
  62. // GetEntries retrieves a range of entries [start, end)
  63. func (lm *LogManager) GetEntries(start, end uint64) ([]LogEntry, error) {
  64. return lm.storage.GetEntries(start, end)
  65. }
  66. // GetTerm returns the term of the entry at the given index
  67. func (lm *LogManager) GetTerm(index uint64) (uint64, error) {
  68. if index == 0 {
  69. return 0, nil
  70. }
  71. lm.mu.RLock()
  72. // Fast path for last index
  73. if index == lm.lastIndex {
  74. term := lm.lastTerm
  75. lm.mu.RUnlock()
  76. return term, nil
  77. }
  78. // Check if index is in valid range
  79. if index < lm.firstIndex {
  80. lm.mu.RUnlock()
  81. return 0, ErrCompacted
  82. }
  83. if index > lm.lastIndex {
  84. lm.mu.RUnlock()
  85. return 0, ErrOutOfRange
  86. }
  87. lm.mu.RUnlock()
  88. entry, err := lm.storage.GetEntry(index)
  89. if err != nil {
  90. return 0, err
  91. }
  92. return entry.Term, nil
  93. }
  94. // Append adds new entries to the log
  95. func (lm *LogManager) Append(entries ...LogEntry) error {
  96. if len(entries) == 0 {
  97. return nil
  98. }
  99. lm.mu.Lock()
  100. defer lm.mu.Unlock()
  101. // Assign indices if not set
  102. for i := range entries {
  103. if entries[i].Index == 0 {
  104. entries[i].Index = lm.lastIndex + uint64(i) + 1
  105. }
  106. }
  107. if err := lm.storage.AppendEntries(entries); err != nil {
  108. return err
  109. }
  110. // Update cached values
  111. lastEntry := entries[len(entries)-1]
  112. lm.lastIndex = lastEntry.Index
  113. lm.lastTerm = lastEntry.Term
  114. if lm.firstIndex == 0 {
  115. lm.firstIndex = entries[0].Index
  116. }
  117. return nil
  118. }
  119. // AppendCommand creates and appends a new log entry with the given command
  120. func (lm *LogManager) AppendCommand(term uint64, command []byte) (uint64, error) {
  121. lm.mu.Lock()
  122. index := lm.lastIndex + 1
  123. lm.mu.Unlock()
  124. entry := LogEntry{
  125. Index: index,
  126. Term: term,
  127. Command: command,
  128. }
  129. if err := lm.Append(entry); err != nil {
  130. return 0, err
  131. }
  132. return index, nil
  133. }
  134. // TruncateAfter removes all entries after the given index
  135. func (lm *LogManager) TruncateAfter(index uint64) error {
  136. lm.mu.Lock()
  137. defer lm.mu.Unlock()
  138. if err := lm.storage.TruncateAfter(index); err != nil {
  139. return err
  140. }
  141. lm.lastIndex = index
  142. if index > 0 {
  143. if entry, err := lm.storage.GetEntry(index); err == nil {
  144. lm.lastTerm = entry.Term
  145. }
  146. } else {
  147. lm.lastTerm = 0
  148. }
  149. return nil
  150. }
  151. // MatchTerm checks if the entry at the given index has the given term
  152. func (lm *LogManager) MatchTerm(index, term uint64) bool {
  153. if index == 0 {
  154. return term == 0
  155. }
  156. entryTerm, err := lm.GetTerm(index)
  157. if err != nil {
  158. return false
  159. }
  160. return entryTerm == term
  161. }
  162. // FindConflict finds the first entry that conflicts with the given entries
  163. // Returns the index and term of the first conflicting entry, or 0, 0 if no conflict
  164. func (lm *LogManager) FindConflict(entries []LogEntry) (uint64, uint64) {
  165. for _, entry := range entries {
  166. if !lm.MatchTerm(entry.Index, entry.Term) {
  167. if entry.Index <= lm.LastIndex() {
  168. existingEntry, err := lm.GetEntry(entry.Index)
  169. if err == nil {
  170. return entry.Index, existingEntry.Term
  171. }
  172. }
  173. return entry.Index, 0
  174. }
  175. }
  176. return 0, 0
  177. }
  178. // AppendEntriesFromLeader handles entries received from the leader
  179. // This implements the log matching and conflict resolution logic
  180. func (lm *LogManager) AppendEntriesFromLeader(prevLogIndex, prevLogTerm uint64, entries []LogEntry) (bool, uint64, uint64) {
  181. lm.mu.Lock()
  182. defer lm.mu.Unlock()
  183. // Check if we have the entry at prevLogIndex with prevLogTerm
  184. if prevLogIndex > 0 {
  185. // If prevLogIndex is before our first index (compacted), we need snapshot
  186. if prevLogIndex < lm.firstIndex {
  187. // We've compacted past this point, tell leader we need snapshot
  188. // Return success for the heartbeat but don't process entries
  189. // The leader will detect via matchIndex that we need snapshot
  190. lm.logger.Debug("prevLogIndex %d is before firstIndex %d, need snapshot", prevLogIndex, lm.firstIndex)
  191. return false, lm.firstIndex, 0
  192. }
  193. if prevLogIndex > lm.lastIndex {
  194. // We don't have the entry at prevLogIndex
  195. return false, lm.lastIndex + 1, 0
  196. }
  197. entry, err := lm.storage.GetEntry(prevLogIndex)
  198. if err != nil {
  199. if err == ErrCompacted {
  200. return false, lm.firstIndex, 0
  201. }
  202. lm.logger.Error("Failed to get entry at prevLogIndex %d: %v", prevLogIndex, err)
  203. return false, prevLogIndex, 0
  204. }
  205. if entry.Term != prevLogTerm {
  206. // Term mismatch - find the first entry of the conflicting term
  207. conflictTerm := entry.Term
  208. conflictIndex := prevLogIndex
  209. // Search backwards for the first entry of this term
  210. for idx := prevLogIndex - 1; idx >= lm.firstIndex; idx-- {
  211. e, err := lm.storage.GetEntry(idx)
  212. if err != nil || e.Term != conflictTerm {
  213. break
  214. }
  215. conflictIndex = idx
  216. }
  217. return false, conflictIndex, conflictTerm
  218. }
  219. }
  220. // If no entries to append, just return success (heartbeat)
  221. if len(entries) == 0 {
  222. return true, 0, 0
  223. }
  224. // Find where the new entries start
  225. newEntriesStart := 0
  226. for i, entry := range entries {
  227. if entry.Index > lm.lastIndex {
  228. newEntriesStart = i
  229. break
  230. }
  231. // Skip entries before our firstIndex (compacted)
  232. if entry.Index < lm.firstIndex {
  233. newEntriesStart = i + 1
  234. continue
  235. }
  236. existingEntry, err := lm.storage.GetEntry(entry.Index)
  237. if err != nil {
  238. if err == ErrCompacted {
  239. newEntriesStart = i + 1
  240. continue
  241. }
  242. newEntriesStart = i
  243. break
  244. }
  245. if existingEntry.Term != entry.Term {
  246. // Conflict - truncate and append
  247. if err := lm.storage.TruncateAfter(entry.Index - 1); err != nil {
  248. lm.logger.Error("Failed to truncate log: %v", err)
  249. return false, entry.Index, existingEntry.Term
  250. }
  251. lm.lastIndex = entry.Index - 1
  252. if lm.lastIndex > 0 && lm.lastIndex >= lm.firstIndex {
  253. if e, err := lm.storage.GetEntry(lm.lastIndex); err == nil {
  254. lm.lastTerm = e.Term
  255. }
  256. } else {
  257. lm.lastTerm = 0
  258. }
  259. newEntriesStart = i
  260. break
  261. }
  262. newEntriesStart = i + 1
  263. }
  264. // Append new entries
  265. if newEntriesStart < len(entries) {
  266. newEntries := entries[newEntriesStart:]
  267. if err := lm.storage.AppendEntries(newEntries); err != nil {
  268. lm.logger.Error("Failed to append entries: %v", err)
  269. return false, 0, 0
  270. }
  271. // Update cached values only if entries were actually appended
  272. // Check storage to get actual lastIndex
  273. actualLastIndex := lm.storage.GetLastIndex()
  274. if actualLastIndex > lm.lastIndex {
  275. lm.lastIndex = actualLastIndex
  276. if e, err := lm.storage.GetEntry(lm.lastIndex); err == nil {
  277. lm.lastTerm = e.Term
  278. }
  279. }
  280. if lm.firstIndex == 0 && len(newEntries) > 0 {
  281. lm.firstIndex = newEntries[0].Index
  282. }
  283. }
  284. return true, 0, 0
  285. }
  286. // IsUpToDate checks if the given log is at least as up-to-date as this log
  287. // Used for leader election
  288. func (lm *LogManager) IsUpToDate(lastLogIndex, lastLogTerm uint64) bool {
  289. lm.mu.RLock()
  290. defer lm.mu.RUnlock()
  291. if lastLogTerm != lm.lastTerm {
  292. return lastLogTerm > lm.lastTerm
  293. }
  294. return lastLogIndex >= lm.lastIndex
  295. }
  296. // GetEntriesForFollower returns entries to send to a follower
  297. // starting from nextIndex, limited by maxEntries
  298. func (lm *LogManager) GetEntriesForFollower(nextIndex uint64, maxEntries int) ([]LogEntry, uint64, uint64, error) {
  299. lm.mu.RLock()
  300. firstIndex := lm.firstIndex
  301. lastIndex := lm.lastIndex
  302. lm.mu.RUnlock()
  303. // Check if requested entries have been compacted
  304. if nextIndex < firstIndex {
  305. return nil, 0, 0, ErrCompacted
  306. }
  307. // If nextIndex is beyond our log, return empty entries
  308. if nextIndex > lastIndex+1 {
  309. return nil, lastIndex, 0, nil
  310. }
  311. prevLogIndex := nextIndex - 1
  312. var prevLogTerm uint64
  313. if prevLogIndex > 0 {
  314. // If prevLogIndex is before firstIndex, we need snapshot
  315. if prevLogIndex < firstIndex {
  316. return nil, 0, 0, ErrCompacted
  317. }
  318. entry, err := lm.storage.GetEntry(prevLogIndex)
  319. if err != nil {
  320. // If compacted, signal that snapshot is needed
  321. if err == ErrCompacted {
  322. return nil, 0, 0, ErrCompacted
  323. }
  324. return nil, 0, 0, err
  325. }
  326. prevLogTerm = entry.Term
  327. }
  328. // Calculate end index
  329. endIndex := lastIndex + 1
  330. if nextIndex+uint64(maxEntries) < endIndex {
  331. endIndex = nextIndex + uint64(maxEntries)
  332. }
  333. // Get entries
  334. if nextIndex >= endIndex {
  335. return nil, prevLogIndex, prevLogTerm, nil
  336. }
  337. entries, err := lm.storage.GetEntries(nextIndex, endIndex)
  338. if err != nil {
  339. return nil, 0, 0, err
  340. }
  341. return entries, prevLogIndex, prevLogTerm, nil
  342. }
  343. // Compact removes entries before the given index
  344. func (lm *LogManager) Compact(index uint64) error {
  345. lm.mu.Lock()
  346. defer lm.mu.Unlock()
  347. if err := lm.storage.TruncateBefore(index); err != nil {
  348. return err
  349. }
  350. if index > lm.firstIndex {
  351. lm.firstIndex = index
  352. }
  353. return nil
  354. }
  355. // Sync forces a sync to disk
  356. func (lm *LogManager) Sync() error {
  357. lm.mu.Lock()
  358. defer lm.mu.Unlock()
  359. return lm.storage.Sync()
  360. }
  361. // Flush forces buffered data to OS cache
  362. func (lm *LogManager) Flush() error {
  363. lm.mu.Lock()
  364. defer lm.mu.Unlock()
  365. return lm.storage.Flush()
  366. }