| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437 |
- package raft
- import (
- "sync"
- )
- // LogManager provides a high-level interface for managing Raft logs
- type LogManager struct {
- mu sync.RWMutex
- storage Storage
- logger Logger
- // Cached values for fast access
- firstIndex uint64
- lastIndex uint64
- lastTerm uint64
- }
- // NewLogManager creates a new log manager
- func NewLogManager(storage Storage, logger Logger) *LogManager {
- if logger == nil {
- logger = &NoopLogger{}
- }
- lm := &LogManager{
- storage: storage,
- logger: logger,
- }
- // Initialize cached values
- lm.firstIndex = storage.GetFirstIndex()
- lm.lastIndex = storage.GetLastIndex()
- if lm.lastIndex > 0 {
- if entry, err := storage.GetEntry(lm.lastIndex); err == nil {
- lm.lastTerm = entry.Term
- }
- }
- return lm
- }
- // FirstIndex returns the first index in the log
- func (lm *LogManager) FirstIndex() uint64 {
- lm.mu.RLock()
- defer lm.mu.RUnlock()
- return lm.firstIndex
- }
- // LastIndex returns the last index in the log
- func (lm *LogManager) LastIndex() uint64 {
- lm.mu.RLock()
- defer lm.mu.RUnlock()
- return lm.lastIndex
- }
- // LastTerm returns the term of the last entry
- func (lm *LogManager) LastTerm() uint64 {
- lm.mu.RLock()
- defer lm.mu.RUnlock()
- return lm.lastTerm
- }
- // LastIndexAndTerm returns both last index and term atomically
- func (lm *LogManager) LastIndexAndTerm() (uint64, uint64) {
- lm.mu.RLock()
- defer lm.mu.RUnlock()
- return lm.lastIndex, lm.lastTerm
- }
- // GetEntry retrieves a single entry
- func (lm *LogManager) GetEntry(index uint64) (*LogEntry, error) {
- return lm.storage.GetEntry(index)
- }
- // GetEntries retrieves a range of entries [start, end)
- func (lm *LogManager) GetEntries(start, end uint64) ([]LogEntry, error) {
- return lm.storage.GetEntries(start, end)
- }
- // GetTerm returns the term of the entry at the given index
- func (lm *LogManager) GetTerm(index uint64) (uint64, error) {
- if index == 0 {
- return 0, nil
- }
- lm.mu.RLock()
- // Fast path for last index
- if index == lm.lastIndex {
- term := lm.lastTerm
- lm.mu.RUnlock()
- return term, nil
- }
- // Check if index is in valid range
- if index < lm.firstIndex {
- lm.mu.RUnlock()
- return 0, ErrCompacted
- }
- if index > lm.lastIndex {
- lm.mu.RUnlock()
- return 0, ErrOutOfRange
- }
- lm.mu.RUnlock()
- entry, err := lm.storage.GetEntry(index)
- if err != nil {
- return 0, err
- }
- return entry.Term, nil
- }
- // Append adds new entries to the log
- func (lm *LogManager) Append(entries ...LogEntry) error {
- if len(entries) == 0 {
- return nil
- }
- lm.mu.Lock()
- defer lm.mu.Unlock()
- // Assign indices if not set
- for i := range entries {
- if entries[i].Index == 0 {
- entries[i].Index = lm.lastIndex + uint64(i) + 1
- }
- }
- if err := lm.storage.AppendEntries(entries); err != nil {
- return err
- }
- // Update cached values
- lastEntry := entries[len(entries)-1]
- lm.lastIndex = lastEntry.Index
- lm.lastTerm = lastEntry.Term
- if lm.firstIndex == 0 {
- lm.firstIndex = entries[0].Index
- }
- return nil
- }
- // AppendCommand creates and appends a new log entry with the given command
- func (lm *LogManager) AppendCommand(term uint64, command []byte) (uint64, error) {
- lm.mu.Lock()
- index := lm.lastIndex + 1
- lm.mu.Unlock()
- entry := LogEntry{
- Index: index,
- Term: term,
- Command: command,
- }
- if err := lm.Append(entry); err != nil {
- return 0, err
- }
- return index, nil
- }
- // TruncateAfter removes all entries after the given index
- func (lm *LogManager) TruncateAfter(index uint64) error {
- lm.mu.Lock()
- defer lm.mu.Unlock()
- if err := lm.storage.TruncateAfter(index); err != nil {
- return err
- }
- lm.lastIndex = index
- if index > 0 {
- if entry, err := lm.storage.GetEntry(index); err == nil {
- lm.lastTerm = entry.Term
- }
- } else {
- lm.lastTerm = 0
- }
- return nil
- }
- // MatchTerm checks if the entry at the given index has the given term
- func (lm *LogManager) MatchTerm(index, term uint64) bool {
- if index == 0 {
- return term == 0
- }
- entryTerm, err := lm.GetTerm(index)
- if err != nil {
- return false
- }
- return entryTerm == term
- }
- // FindConflict finds the first entry that conflicts with the given entries
- // Returns the index and term of the first conflicting entry, or 0, 0 if no conflict
- func (lm *LogManager) FindConflict(entries []LogEntry) (uint64, uint64) {
- for _, entry := range entries {
- if !lm.MatchTerm(entry.Index, entry.Term) {
- if entry.Index <= lm.LastIndex() {
- existingEntry, err := lm.GetEntry(entry.Index)
- if err == nil {
- return entry.Index, existingEntry.Term
- }
- }
- return entry.Index, 0
- }
- }
- return 0, 0
- }
- // AppendEntriesFromLeader handles entries received from the leader
- // This implements the log matching and conflict resolution logic
- func (lm *LogManager) AppendEntriesFromLeader(prevLogIndex, prevLogTerm uint64, entries []LogEntry) (bool, uint64, uint64) {
- lm.mu.Lock()
- defer lm.mu.Unlock()
- // Check if we have the entry at prevLogIndex with prevLogTerm
- if prevLogIndex > 0 {
- // If prevLogIndex is before our first index (compacted), we need snapshot
- if prevLogIndex < lm.firstIndex {
- // We've compacted past this point, tell leader we need snapshot
- // Return success for the heartbeat but don't process entries
- // The leader will detect via matchIndex that we need snapshot
- lm.logger.Debug("prevLogIndex %d is before firstIndex %d, need snapshot", prevLogIndex, lm.firstIndex)
- return false, lm.firstIndex, 0
- }
- if prevLogIndex > lm.lastIndex {
- // We don't have the entry at prevLogIndex
- return false, lm.lastIndex + 1, 0
- }
- entry, err := lm.storage.GetEntry(prevLogIndex)
- if err != nil {
- if err == ErrCompacted {
- return false, lm.firstIndex, 0
- }
- lm.logger.Error("Failed to get entry at prevLogIndex %d: %v", prevLogIndex, err)
- return false, prevLogIndex, 0
- }
- if entry.Term != prevLogTerm {
- // Term mismatch - find the first entry of the conflicting term
- conflictTerm := entry.Term
- conflictIndex := prevLogIndex
- // Search backwards for the first entry of this term
- for idx := prevLogIndex - 1; idx >= lm.firstIndex; idx-- {
- e, err := lm.storage.GetEntry(idx)
- if err != nil || e.Term != conflictTerm {
- break
- }
- conflictIndex = idx
- }
- return false, conflictIndex, conflictTerm
- }
- }
- // If no entries to append, just return success (heartbeat)
- if len(entries) == 0 {
- return true, 0, 0
- }
- // Find where the new entries start
- newEntriesStart := 0
- for i, entry := range entries {
- if entry.Index > lm.lastIndex {
- newEntriesStart = i
- break
- }
- // Skip entries before our firstIndex (compacted)
- if entry.Index < lm.firstIndex {
- newEntriesStart = i + 1
- continue
- }
- existingEntry, err := lm.storage.GetEntry(entry.Index)
- if err != nil {
- if err == ErrCompacted {
- newEntriesStart = i + 1
- continue
- }
- newEntriesStart = i
- break
- }
- if existingEntry.Term != entry.Term {
- // Conflict - truncate and append
- if err := lm.storage.TruncateAfter(entry.Index - 1); err != nil {
- lm.logger.Error("Failed to truncate log: %v", err)
- return false, entry.Index, existingEntry.Term
- }
- lm.lastIndex = entry.Index - 1
- if lm.lastIndex > 0 && lm.lastIndex >= lm.firstIndex {
- if e, err := lm.storage.GetEntry(lm.lastIndex); err == nil {
- lm.lastTerm = e.Term
- }
- } else {
- lm.lastTerm = 0
- }
- newEntriesStart = i
- break
- }
- newEntriesStart = i + 1
- }
- // Append new entries
- if newEntriesStart < len(entries) {
- newEntries := entries[newEntriesStart:]
- if err := lm.storage.AppendEntries(newEntries); err != nil {
- lm.logger.Error("Failed to append entries: %v", err)
- return false, 0, 0
- }
- // Update cached values only if entries were actually appended
- // Check storage to get actual lastIndex
- actualLastIndex := lm.storage.GetLastIndex()
- if actualLastIndex > lm.lastIndex {
- lm.lastIndex = actualLastIndex
- if e, err := lm.storage.GetEntry(lm.lastIndex); err == nil {
- lm.lastTerm = e.Term
- }
- }
- if lm.firstIndex == 0 && len(newEntries) > 0 {
- lm.firstIndex = newEntries[0].Index
- }
- }
- return true, 0, 0
- }
- // IsUpToDate checks if the given log is at least as up-to-date as this log
- // Used for leader election
- func (lm *LogManager) IsUpToDate(lastLogIndex, lastLogTerm uint64) bool {
- lm.mu.RLock()
- defer lm.mu.RUnlock()
- if lastLogTerm != lm.lastTerm {
- return lastLogTerm > lm.lastTerm
- }
- return lastLogIndex >= lm.lastIndex
- }
- // GetEntriesForFollower returns entries to send to a follower
- // starting from nextIndex, limited by maxEntries
- func (lm *LogManager) GetEntriesForFollower(nextIndex uint64, maxEntries int) ([]LogEntry, uint64, uint64, error) {
- lm.mu.RLock()
- firstIndex := lm.firstIndex
- lastIndex := lm.lastIndex
- lm.mu.RUnlock()
- // Check if requested entries have been compacted
- if nextIndex < firstIndex {
- return nil, 0, 0, ErrCompacted
- }
- // If nextIndex is beyond our log, return empty entries
- if nextIndex > lastIndex+1 {
- return nil, lastIndex, 0, nil
- }
- prevLogIndex := nextIndex - 1
- var prevLogTerm uint64
- if prevLogIndex > 0 {
- // If prevLogIndex is before firstIndex, we need snapshot
- if prevLogIndex < firstIndex {
- return nil, 0, 0, ErrCompacted
- }
- entry, err := lm.storage.GetEntry(prevLogIndex)
- if err != nil {
- // If compacted, signal that snapshot is needed
- if err == ErrCompacted {
- return nil, 0, 0, ErrCompacted
- }
- return nil, 0, 0, err
- }
- prevLogTerm = entry.Term
- }
- // Calculate end index
- endIndex := lastIndex + 1
- if nextIndex+uint64(maxEntries) < endIndex {
- endIndex = nextIndex + uint64(maxEntries)
- }
- // Get entries
- if nextIndex >= endIndex {
- return nil, prevLogIndex, prevLogTerm, nil
- }
- entries, err := lm.storage.GetEntries(nextIndex, endIndex)
- if err != nil {
- return nil, 0, 0, err
- }
- return entries, prevLogIndex, prevLogTerm, nil
- }
- // Compact removes entries before the given index
- func (lm *LogManager) Compact(index uint64) error {
- lm.mu.Lock()
- defer lm.mu.Unlock()
- if err := lm.storage.TruncateBefore(index); err != nil {
- return err
- }
- if index > lm.firstIndex {
- lm.firstIndex = index
- }
- return nil
- }
- // Sync forces a sync to disk
- func (lm *LogManager) Sync() error {
- lm.mu.Lock()
- defer lm.mu.Unlock()
- return lm.storage.Sync()
- }
- // Flush forces buffered data to OS cache
- func (lm *LogManager) Flush() error {
- lm.mu.Lock()
- defer lm.mu.Unlock()
- return lm.storage.Flush()
- }
|