package raft

import (
	"sync"
)

// LogManager provides a high-level interface for managing Raft logs
type LogManager struct {
	mu      sync.RWMutex
	storage Storage
	logger  Logger

	// Cached values for fast access
	firstIndex uint64
	lastIndex  uint64
	lastTerm   uint64
}

// NewLogManager creates a new log manager
func NewLogManager(storage Storage, logger Logger) *LogManager {
	if logger == nil {
		logger = &NoopLogger{}
	}

	lm := &LogManager{
		storage: storage,
		logger:  logger,
	}

	// Initialize cached values
	lm.firstIndex = storage.GetFirstIndex()
	lm.lastIndex = storage.GetLastIndex()
	if lm.lastIndex > 0 {
		if entry, err := storage.GetEntry(lm.lastIndex); err == nil {
			lm.lastTerm = entry.Term
		}
	}

	return lm
}

// FirstIndex returns the first index in the log
func (lm *LogManager) FirstIndex() uint64 {
	lm.mu.RLock()
	defer lm.mu.RUnlock()
	return lm.firstIndex
}

// LastIndex returns the last index in the log
func (lm *LogManager) LastIndex() uint64 {
	lm.mu.RLock()
	defer lm.mu.RUnlock()
	return lm.lastIndex
}

// LastTerm returns the term of the last entry
func (lm *LogManager) LastTerm() uint64 {
	lm.mu.RLock()
	defer lm.mu.RUnlock()
	return lm.lastTerm
}

// LastIndexAndTerm returns both last index and term atomically
func (lm *LogManager) LastIndexAndTerm() (uint64, uint64) {
	lm.mu.RLock()
	defer lm.mu.RUnlock()
	return lm.lastIndex, lm.lastTerm
}

// GetEntry retrieves a single entry
func (lm *LogManager) GetEntry(index uint64) (*LogEntry, error) {
	return lm.storage.GetEntry(index)
}

// GetEntries retrieves a range of entries [start, end)
func (lm *LogManager) GetEntries(start, end uint64) ([]LogEntry, error) {
	return lm.storage.GetEntries(start, end)
}

// GetTerm returns the term of the entry at the given index
func (lm *LogManager) GetTerm(index uint64) (uint64, error) {
	if index == 0 {
		return 0, nil
	}

	lm.mu.RLock()

	// Fast path for the last index
	if index == lm.lastIndex {
		term := lm.lastTerm
		lm.mu.RUnlock()
		return term, nil
	}

	// Check if index is in the valid range
	if index < lm.firstIndex {
		lm.mu.RUnlock()
		return 0, ErrCompacted
	}
	if index > lm.lastIndex {
		lm.mu.RUnlock()
		return 0, ErrOutOfRange
	}
	lm.mu.RUnlock()

	entry, err := lm.storage.GetEntry(index)
	if err != nil {
		return 0, err
	}
	return entry.Term, nil
}

// Append adds new entries to the log
func (lm *LogManager) Append(entries ...LogEntry) error {
	if len(entries) == 0 {
		return nil
	}

	lm.mu.Lock()
	defer lm.mu.Unlock()

	// Assign indices if not set
	for i := range entries {
		if entries[i].Index == 0 {
			entries[i].Index = lm.lastIndex + uint64(i) + 1
		}
	}

	if err := lm.storage.AppendEntries(entries); err != nil {
		return err
	}

	// Update cached values
	lastEntry := entries[len(entries)-1]
	lm.lastIndex = lastEntry.Index
	lm.lastTerm = lastEntry.Term
	if lm.firstIndex == 0 {
		lm.firstIndex = entries[0].Index
	}

	return nil
}

// AppendCommand creates and appends a new log entry with the given command.
// The index is assigned and the entry persisted under a single critical
// section so that concurrent callers cannot be handed the same index.
func (lm *LogManager) AppendCommand(term uint64, command []byte) (uint64, error) {
	lm.mu.Lock()
	defer lm.mu.Unlock()

	index := lm.lastIndex + 1
	entry := LogEntry{
		Index:   index,
		Term:    term,
		Command: command,
	}

	if err := lm.storage.AppendEntries([]LogEntry{entry}); err != nil {
		return 0, err
	}

	// Update cached values
	lm.lastIndex = index
	lm.lastTerm = term
	if lm.firstIndex == 0 {
		lm.firstIndex = index
	}

	return index, nil
}

// TruncateAfter removes all entries after the given index
func (lm *LogManager) TruncateAfter(index uint64) error {
	lm.mu.Lock()
	defer lm.mu.Unlock()

	if err := lm.storage.TruncateAfter(index); err != nil {
		return err
	}

	lm.lastIndex = index
	if index > 0 {
		if entry, err := lm.storage.GetEntry(index); err == nil {
			lm.lastTerm = entry.Term
		}
	} else {
		lm.lastTerm = 0
	}

	return nil
}
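// NOTE: illustrative sketch, not part of the original manager. It shows the
// intended call pattern for AppendCommand and GetTerm on a leader; the term
// and command values are assumed to be supplied by the caller.
func exampleAppendCommand(lm *LogManager, term uint64, command []byte) (uint64, error) {
	// AppendCommand assigns the next index and persists the entry in one step.
	index, err := lm.AppendCommand(term, command)
	if err != nil {
		return 0, err
	}

	// Reading the term back hits the fast path while index is still the last index.
	if entryTerm, err := lm.GetTerm(index); err != nil || entryTerm != term {
		lm.logger.Debug("entry %d read back with term %d (err=%v)", index, entryTerm, err)
	}

	return index, nil
}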
// MatchTerm checks if the entry at the given index has the given term
func (lm *LogManager) MatchTerm(index, term uint64) bool {
	if index == 0 {
		return term == 0
	}
	entryTerm, err := lm.GetTerm(index)
	if err != nil {
		return false
	}
	return entryTerm == term
}

// FindConflict finds the first entry that conflicts with the given entries.
// It returns the index and term of the first conflicting entry, or 0, 0 if there is no conflict
func (lm *LogManager) FindConflict(entries []LogEntry) (uint64, uint64) {
	for _, entry := range entries {
		if !lm.MatchTerm(entry.Index, entry.Term) {
			if entry.Index <= lm.LastIndex() {
				existingEntry, err := lm.GetEntry(entry.Index)
				if err == nil {
					return entry.Index, existingEntry.Term
				}
			}
			return entry.Index, 0
		}
	}
	return 0, 0
}

// AppendEntriesFromLeader handles entries received from the leader.
// This implements the log matching and conflict resolution logic
func (lm *LogManager) AppendEntriesFromLeader(prevLogIndex, prevLogTerm uint64, entries []LogEntry) (bool, uint64, uint64) {
	lm.mu.Lock()
	defer lm.mu.Unlock()

	// Check if we have the entry at prevLogIndex with prevLogTerm
	if prevLogIndex > 0 {
		// If prevLogIndex falls before our first index, that part of the log has
		// been compacted away; reject and return firstIndex as the conflict hint
		// so the leader can fall back to sending a snapshot.
		if prevLogIndex < lm.firstIndex {
			lm.logger.Debug("prevLogIndex %d is before firstIndex %d, need snapshot", prevLogIndex, lm.firstIndex)
			return false, lm.firstIndex, 0
		}

		if prevLogIndex > lm.lastIndex {
			// We don't have the entry at prevLogIndex
			return false, lm.lastIndex + 1, 0
		}

		entry, err := lm.storage.GetEntry(prevLogIndex)
		if err != nil {
			if err == ErrCompacted {
				return false, lm.firstIndex, 0
			}
			lm.logger.Error("Failed to get entry at prevLogIndex %d: %v", prevLogIndex, err)
			return false, prevLogIndex, 0
		}

		if entry.Term != prevLogTerm {
			// Term mismatch - find the first entry of the conflicting term
			conflictTerm := entry.Term
			conflictIndex := prevLogIndex

			// Search backwards for the first entry of this term
			for idx := prevLogIndex - 1; idx >= lm.firstIndex; idx-- {
				e, err := lm.storage.GetEntry(idx)
				if err != nil || e.Term != conflictTerm {
					break
				}
				conflictIndex = idx
			}

			return false, conflictIndex, conflictTerm
		}
	}

	// If there are no entries to append, just return success (heartbeat)
	if len(entries) == 0 {
		return true, 0, 0
	}

	// Find where the new entries start
	newEntriesStart := 0
	for i, entry := range entries {
		if entry.Index > lm.lastIndex {
			newEntriesStart = i
			break
		}

		// Skip entries before our firstIndex (compacted)
		if entry.Index < lm.firstIndex {
			newEntriesStart = i + 1
			continue
		}

		existingEntry, err := lm.storage.GetEntry(entry.Index)
		if err != nil {
			if err == ErrCompacted {
				newEntriesStart = i + 1
				continue
			}
			newEntriesStart = i
			break
		}

		if existingEntry.Term != entry.Term {
			// Conflict - truncate the existing suffix, then append from here
			if err := lm.storage.TruncateAfter(entry.Index - 1); err != nil {
				lm.logger.Error("Failed to truncate log: %v", err)
				return false, entry.Index, existingEntry.Term
			}
			lm.lastIndex = entry.Index - 1
			if lm.lastIndex > 0 && lm.lastIndex >= lm.firstIndex {
				if e, err := lm.storage.GetEntry(lm.lastIndex); err == nil {
					lm.lastTerm = e.Term
				}
			} else {
				lm.lastTerm = 0
			}
			newEntriesStart = i
			break
		}

		newEntriesStart = i + 1
	}

	// Append new entries
	if newEntriesStart < len(entries) {
		newEntries := entries[newEntriesStart:]
		if err := lm.storage.AppendEntries(newEntries); err != nil {
			lm.logger.Error("Failed to append entries: %v", err)
			return false, 0, 0
		}
		// Update cached values only if entries were actually appended;
		// check storage to get the actual lastIndex
		actualLastIndex := lm.storage.GetLastIndex()
		if actualLastIndex > lm.lastIndex {
			lm.lastIndex = actualLastIndex
			if e, err := lm.storage.GetEntry(lm.lastIndex); err == nil {
				lm.lastTerm = e.Term
			}
		}
		if lm.firstIndex == 0 && len(newEntries) > 0 {
			lm.firstIndex = newEntries[0].Index
		}
	}

	return true, 0, 0
}

// IsUpToDate checks whether a candidate's log, described by lastLogIndex and
// lastLogTerm, is at least as up-to-date as this log. Used for leader election
func (lm *LogManager) IsUpToDate(lastLogIndex, lastLogTerm uint64) bool {
	lm.mu.RLock()
	defer lm.mu.RUnlock()

	if lastLogTerm != lm.lastTerm {
		return lastLogTerm > lm.lastTerm
	}
	return lastLogIndex >= lm.lastIndex
}

// GetEntriesForFollower returns entries to send to a follower,
// starting from nextIndex and limited by maxEntries
func (lm *LogManager) GetEntriesForFollower(nextIndex uint64, maxEntries int) ([]LogEntry, uint64, uint64, error) {
	lm.mu.RLock()
	firstIndex := lm.firstIndex
	lastIndex := lm.lastIndex
	lastTerm := lm.lastTerm
	lm.mu.RUnlock()

	// Check if the requested entries have been compacted
	if nextIndex < firstIndex {
		return nil, 0, 0, ErrCompacted
	}

	// If nextIndex is beyond our log, there is nothing to send; report our own
	// last index and term so the caller can still issue a consistent heartbeat
	if nextIndex > lastIndex+1 {
		return nil, lastIndex, lastTerm, nil
	}

	prevLogIndex := nextIndex - 1
	var prevLogTerm uint64
	if prevLogIndex > 0 {
		// If prevLogIndex is before firstIndex, a snapshot is needed
		if prevLogIndex < firstIndex {
			return nil, 0, 0, ErrCompacted
		}
		entry, err := lm.storage.GetEntry(prevLogIndex)
		if err != nil {
			// If compacted, signal that a snapshot is needed
			if err == ErrCompacted {
				return nil, 0, 0, ErrCompacted
			}
			return nil, 0, 0, err
		}
		prevLogTerm = entry.Term
	}

	// Calculate the end index
	endIndex := lastIndex + 1
	if nextIndex+uint64(maxEntries) < endIndex {
		endIndex = nextIndex + uint64(maxEntries)
	}

	// Get entries
	if nextIndex >= endIndex {
		return nil, prevLogIndex, prevLogTerm, nil
	}

	entries, err := lm.storage.GetEntries(nextIndex, endIndex)
	if err != nil {
		return nil, 0, 0, err
	}

	return entries, prevLogIndex, prevLogTerm, nil
}

// Compact removes entries before the given index
func (lm *LogManager) Compact(index uint64) error {
	lm.mu.Lock()
	defer lm.mu.Unlock()

	if err := lm.storage.TruncateBefore(index); err != nil {
		return err
	}

	if index > lm.firstIndex {
		lm.firstIndex = index
	}
	return nil
}

// Sync forces a sync to disk
func (lm *LogManager) Sync() error {
	lm.mu.Lock()
	defer lm.mu.Unlock()
	return lm.storage.Sync()
}

// Flush flushes buffered data to the OS cache
func (lm *LogManager) Flush() error {
	lm.mu.Lock()
	defer lm.mu.Unlock()
	return lm.storage.Flush()
}
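// NOTE: illustrative sketch, not part of the original manager. It wires
// GetEntriesForFollower and AppendEntriesFromLeader together for one
// replication round; both managers and the nextIndex/maxEntries bookkeeping
// are assumed to be owned by the caller (normally the leader's peer state).
func exampleReplicateOnce(leader, follower *LogManager, nextIndex uint64, maxEntries int) (uint64, bool) {
	// Leader side: build the batch that starts at the follower's nextIndex.
	entries, prevLogIndex, prevLogTerm, err := leader.GetEntriesForFollower(nextIndex, maxEntries)
	if err == ErrCompacted {
		// The leader has compacted past nextIndex; a snapshot must be sent instead.
		return nextIndex, false
	}
	if err != nil {
		return nextIndex, false
	}

	// Follower side: log-matching check, conflict resolution, and append.
	ok, conflictIndex, _ := follower.AppendEntriesFromLeader(prevLogIndex, prevLogTerm, entries)
	if !ok {
		// On rejection, back nextIndex up to the follower's conflict hint (if any).
		if conflictIndex > 0 {
			return conflictIndex, false
		}
		return nextIndex, false
	}

	// On success, nextIndex advances past everything that was sent.
	return prevLogIndex + uint64(len(entries)) + 1, true
}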