package raft

import (
	"bufio"
	"encoding/binary"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sync"
)

var (
	ErrNotFound   = errors.New("not found")
	ErrCorrupted  = errors.New("corrupted data")
	ErrOutOfRange = errors.New("index out of range")
	ErrCompacted  = errors.New("log has been compacted")
)

// Storage interface defines the persistent storage operations
type Storage interface {
	// State operations
	GetState() (*PersistentState, error)
	SaveState(state *PersistentState) error

	// Log operations
	GetFirstIndex() uint64
	GetLastIndex() uint64
	GetEntry(index uint64) (*LogEntry, error)
	GetEntries(startIndex, endIndex uint64) ([]LogEntry, error)
	AppendEntries(entries []LogEntry) error
	TruncateAfter(index uint64) error
	TruncateBefore(index uint64) error

	// Snapshot operations
	GetSnapshot() ([]byte, uint64, uint64, error) // data, lastIndex, lastTerm, error
	SaveSnapshot(data []byte, lastIndex, lastTerm uint64) error

	// Cluster configuration operations
	GetClusterConfig() (*ClusterConfig, error)
	SaveClusterConfig(config *ClusterConfig) error

	// Lifecycle
	Close() error
	Sync() error  // fsync (slow, safe)
	Flush() error // write to OS cache (fast)
}

// HybridStorage implements a high-performance hybrid memory + file storage
type HybridStorage struct {
	mu      sync.RWMutex
	dataDir string
	logger  Logger

	// In-memory cache for fast reads
	memoryLog      []LogEntry // Recent entries in memory
	memoryStart    uint64     // Start index of entries in memory
	memoryCapacity int

	// File-based persistent storage
	logFile      *os.File
	logWriter    *bufio.Writer
	stateFile    string
	snapshotFile string
	clusterFile  string // Cluster configuration file

	// Index tracking
	firstIndex uint64 // First index in storage (after compaction)
	lastIndex  uint64 // Last index in storage

	// Entry offset index for fast file seeks
	entryOffsets map[uint64]int64 // index -> file offset

	// State cache
	stateCache *PersistentState

	// Cluster config cache
	clusterCache *ClusterConfig
}

// NewHybridStorage creates a new hybrid storage instance
func NewHybridStorage(dataDir string, memoryCapacity int, logger Logger) (*HybridStorage, error) {
	if logger == nil {
		logger = &NoopLogger{}
	}
	if memoryCapacity <= 0 {
		memoryCapacity = 1000 // Safe default
	}

	if err := os.MkdirAll(dataDir, 0755); err != nil {
		return nil, fmt.Errorf("failed to create data directory: %w", err)
	}

	s := &HybridStorage{
		dataDir:        dataDir,
		logger:         logger,
		memoryLog:      make([]LogEntry, 0, memoryCapacity),
		memoryCapacity: memoryCapacity,
		stateFile:      filepath.Join(dataDir, "state.json"),
		snapshotFile:   filepath.Join(dataDir, "snapshot.bin"),
		clusterFile:    filepath.Join(dataDir, "cluster.json"),
		firstIndex:     0,
		lastIndex:      0,
		entryOffsets:   make(map[uint64]int64),
	}

	if err := s.recover(); err != nil {
		return nil, fmt.Errorf("failed to recover storage: %w", err)
	}

	return s, nil
}
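// Usage sketch (illustrative only; the directory path and capacity are
// arbitrary, and a nil logger falls back to NoopLogger):
//
//	store, err := NewHybridStorage("./raft-data", 1000, nil)
//	if err != nil {
//		// handle error
//	}
//	defer store.Close()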
// recover loads existing data from disk
func (s *HybridStorage) recover() error {
	// Load state
	if _, err := s.loadState(); err != nil && !os.IsNotExist(err) {
		return fmt.Errorf("failed to load state: %w", err)
	}

	// Open or create log file
	logPath := filepath.Join(s.dataDir, "log.bin")
	var err error
	s.logFile, err = os.OpenFile(logPath, os.O_RDWR|os.O_CREATE, 0644)
	if err != nil {
		return fmt.Errorf("failed to open log file: %w", err)
	}

	// Load snapshot to get compaction point
	if snapData, lastIndex, lastTerm, err := s.loadSnapshot(); err == nil && len(snapData) > 0 {
		s.firstIndex = lastIndex
		s.lastIndex = lastIndex
		s.logger.Info("Loaded snapshot at index %d, term %d", lastIndex, lastTerm)
	}

	// Build index and load recent entries
	if err := s.rebuildIndex(); err != nil {
		return fmt.Errorf("failed to rebuild index: %w", err)
	}

	s.logWriter = bufio.NewWriterSize(s.logFile, 1024*1024) // 1MB buffer

	return nil
}

// rebuildIndex scans the log file and rebuilds the offset index
func (s *HybridStorage) rebuildIndex() error {
	s.logFile.Seek(0, io.SeekStart)
	reader := bufio.NewReader(s.logFile)
	var offset int64 = 0
	var entries []LogEntry

	for {
		entry, bytesRead, err := s.readEntryAt(reader)
		if err == io.EOF {
			break
		}
		if err != nil {
			s.logger.Warn("Error reading log at offset %d: %v", offset, err)
			break
		}

		if entry.Index > s.firstIndex {
			s.entryOffsets[entry.Index] = offset
			entries = append(entries, *entry)

			if s.firstIndex == 0 || entry.Index < s.firstIndex {
				s.firstIndex = entry.Index
			}
			if entry.Index > s.lastIndex {
				s.lastIndex = entry.Index
			}
		}

		offset += int64(bytesRead)
	}

	// Load recent entries into memory
	if len(entries) > 0 {
		startIdx := 0
		if len(entries) > s.memoryCapacity {
			startIdx = len(entries) - s.memoryCapacity
		}
		s.memoryLog = entries[startIdx:]
		s.memoryStart = s.memoryLog[0].Index
		s.logger.Info("Loaded %d entries into memory, starting at index %d", len(s.memoryLog), s.memoryStart)
	}

	// Seek to end for appending
	s.logFile.Seek(0, io.SeekEnd)
	return nil
}

// readEntryAt reads a single entry from the reader
func (s *HybridStorage) readEntryAt(reader *bufio.Reader) (*LogEntry, int, error) {
	// Format: [4 bytes length][json data]
	lenBuf := make([]byte, 4)
	if _, err := io.ReadFull(reader, lenBuf); err != nil {
		return nil, 0, err
	}

	length := binary.BigEndian.Uint32(lenBuf)
	if length > 10*1024*1024 { // 10MB limit
		return nil, 0, ErrCorrupted
	}

	data := make([]byte, length)
	if _, err := io.ReadFull(reader, data); err != nil {
		return nil, 0, err
	}

	var entry LogEntry
	if err := json.Unmarshal(data, &entry); err != nil {
		return nil, 0, err
	}

	return &entry, 4 + int(length), nil
}

// GetState returns the current persistent state
func (s *HybridStorage) GetState() (*PersistentState, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	if s.stateCache != nil {
		return s.stateCache, nil
	}
	return s.loadState()
}

func (s *HybridStorage) loadState() (*PersistentState, error) {
	data, err := os.ReadFile(s.stateFile)
	if err != nil {
		if os.IsNotExist(err) {
			return &PersistentState{}, nil
		}
		return nil, err
	}

	var state PersistentState
	if err := json.Unmarshal(data, &state); err != nil {
		return nil, err
	}

	s.stateCache = &state
	return &state, nil
}

// SaveState persists the state to disk
func (s *HybridStorage) SaveState(state *PersistentState) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Ensure data directory exists
	if err := os.MkdirAll(s.dataDir, 0755); err != nil {
		return fmt.Errorf("failed to create data directory: %w", err)
	}

	data, err := json.Marshal(state)
	if err != nil {
		return err
	}

	// Write to temp file first for atomicity
	tmpFile := s.stateFile + ".tmp"
	if err := os.WriteFile(tmpFile, data, 0644); err != nil {
		return err
	}
	if err := os.Rename(tmpFile, s.stateFile); err != nil {
		return err
	}

	s.stateCache = state
	return nil
}

// GetFirstIndex returns the first available log index
func (s *HybridStorage) GetFirstIndex() uint64 {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.firstIndex
}

// GetLastIndex returns the last log index
func (s *HybridStorage) GetLastIndex() uint64 {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.lastIndex
}
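// Read-path note (illustrative): GetEntry and GetEntries serve recent indexes
// from the in-memory window and fall back to the offset index plus a file read
// for older, uncompacted entries. GetEntries takes a half-open range, so
// fetching indexes 5 through 9 inclusive looks like:
//
//	entries, err := store.GetEntries(5, 10) // [5, 10)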
// GetEntry retrieves a single log entry by index
func (s *HybridStorage) GetEntry(index uint64) (*LogEntry, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	if index < s.firstIndex {
		return nil, ErrCompacted
	}
	if index > s.lastIndex {
		return nil, ErrOutOfRange
	}

	// Try memory first (fast path)
	if index >= s.memoryStart && len(s.memoryLog) > 0 {
		memIdx := int(index - s.memoryStart)
		if memIdx >= 0 && memIdx < len(s.memoryLog) {
			entry := s.memoryLog[memIdx]
			return &entry, nil
		}
	}

	// Fall back to file
	return s.readEntryFromFile(index)
}

// readEntryFromFile reads an entry from the log file
func (s *HybridStorage) readEntryFromFile(index uint64) (*LogEntry, error) {
	offset, ok := s.entryOffsets[index]
	if !ok {
		return nil, ErrNotFound
	}

	// Read via a SectionReader (ReadAt) so concurrent readers holding the read
	// lock do not race on the shared file offset, and the append position of
	// logFile is left untouched.
	reader := bufio.NewReader(io.NewSectionReader(s.logFile, offset, 1<<62))
	entry, _, err := s.readEntryAt(reader)
	return entry, err
}

// GetEntries retrieves a range of log entries [startIndex, endIndex)
func (s *HybridStorage) GetEntries(startIndex, endIndex uint64) ([]LogEntry, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	if startIndex < s.firstIndex {
		return nil, ErrCompacted
	}
	if endIndex > s.lastIndex+1 {
		endIndex = s.lastIndex + 1
	}
	if startIndex >= endIndex {
		return nil, nil
	}

	entries := make([]LogEntry, 0, endIndex-startIndex)

	// Check if all requested entries are in memory
	if startIndex >= s.memoryStart && len(s.memoryLog) > 0 {
		memStartIdx := int(startIndex - s.memoryStart)
		memEndIdx := int(endIndex - s.memoryStart)
		if memStartIdx >= 0 && memEndIdx <= len(s.memoryLog) {
			return append(entries, s.memoryLog[memStartIdx:memEndIdx]...), nil
		}
	}

	// Need to read from file
	for idx := startIndex; idx < endIndex; idx++ {
		// Try memory first
		if idx >= s.memoryStart && len(s.memoryLog) > 0 {
			memIdx := int(idx - s.memoryStart)
			if memIdx >= 0 && memIdx < len(s.memoryLog) {
				entries = append(entries, s.memoryLog[memIdx])
				continue
			}
		}

		// Read from file
		entry, err := s.readEntryFromFile(idx)
		if err != nil {
			return nil, err
		}
		entries = append(entries, *entry)
	}

	return entries, nil
}

// AppendEntries appends new entries to the log.
// It skips entries that already exist and only appends sequential new entries.
func (s *HybridStorage) AppendEntries(entries []LogEntry) error {
	if len(entries) == 0 {
		return nil
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	// Filter entries: only append sequential entries starting from lastIndex + 1.
	// This handles overlapping entries and gaps gracefully.
	var newEntries []LogEntry
	nextExpected := s.lastIndex + 1

	// If log was compacted and this is a fresh start, adjust nextExpected
	if s.lastIndex == 0 && s.firstIndex > 0 {
		nextExpected = s.firstIndex + 1
	}

	for _, entry := range entries {
		if entry.Index == nextExpected {
			// This is the next expected entry, add it
			newEntries = append(newEntries, entry)
			nextExpected++
		} else if entry.Index > nextExpected {
			// Gap detected - this is normal during follower catch-up;
			// the leader will send a snapshot or earlier entries.
			s.logger.Debug("Gap in entries: got %d, expected %d (will wait for leader)", entry.Index, nextExpected)
			break
		}
		// entry.Index < nextExpected: already exists, skip
	}

	if len(newEntries) == 0 {
		return nil // All entries already exist or there's a gap
	}

	// Get current file offset for indexing
	currentOffset, err := s.logFile.Seek(0, io.SeekEnd)
	if err != nil {
		return err
	}

	for i, entry := range newEntries {
		// Write to file
		data, err := json.Marshal(entry)
		if err != nil {
			return err
		}

		lenBuf := make([]byte, 4)
		binary.BigEndian.PutUint32(lenBuf, uint32(len(data)))

		if _, err := s.logWriter.Write(lenBuf); err != nil {
			return err
		}
		if _, err := s.logWriter.Write(data); err != nil {
			return err
		}

		// Update index
		s.entryOffsets[entry.Index] = currentOffset
		currentOffset += int64(4 + len(data))

		// Update memory cache.
		// Initialize memoryStart when the first entry is added.
		if len(s.memoryLog) == 0 {
			s.memoryStart = entry.Index
		}
		s.memoryLog = append(s.memoryLog, entry)

		// Trim memory if needed
		if len(s.memoryLog) > s.memoryCapacity {
			excess := len(s.memoryLog) - s.memoryCapacity
			s.memoryLog = s.memoryLog[excess:]
			s.memoryStart = s.memoryLog[0].Index
		}

		s.lastIndex = entry.Index
		if s.firstIndex == 0 || (i == 0 && newEntries[0].Index < s.firstIndex) {
			s.firstIndex = newEntries[0].Index
		}
	}

	// Flush the buffered writes to the OS cache; callers that need durability
	// must still call Sync to fsync the log file.
	return s.logWriter.Flush()
}
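// Durability sketch (illustrative): AppendEntries only pushes the bufio buffer
// to the operating system. A caller that must not acknowledge entries before
// they reach stable storage would pair it with Sync:
//
//	if err := store.AppendEntries(batch); err != nil {
//		// handle error
//	}
//	if err := store.Sync(); err != nil { // fsync: slow but safe
//		// handle error
//	}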
// TruncateAfter removes all entries after the given index
func (s *HybridStorage) TruncateAfter(index uint64) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if index >= s.lastIndex {
		return nil
	}

	// Truncate file
	if offset, ok := s.entryOffsets[index+1]; ok {
		if err := s.logFile.Truncate(offset); err != nil {
			return err
		}
		s.logFile.Seek(0, io.SeekEnd)
	}

	// Remove from index
	for idx := index + 1; idx <= s.lastIndex; idx++ {
		delete(s.entryOffsets, idx)
	}

	// Truncate memory
	if index < s.memoryStart {
		s.memoryLog = s.memoryLog[:0]
		s.memoryStart = 0
	} else if index >= s.memoryStart && len(s.memoryLog) > 0 {
		memIdx := int(index - s.memoryStart + 1)
		if memIdx < len(s.memoryLog) {
			s.memoryLog = s.memoryLog[:memIdx]
		}
	}

	s.lastIndex = index
	return nil
}

// TruncateBefore removes all entries before the given index (for compaction)
func (s *HybridStorage) TruncateBefore(index uint64) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if index <= s.firstIndex {
		return nil
	}

	// Remove from index
	for idx := s.firstIndex; idx < index; idx++ {
		delete(s.entryOffsets, idx)
	}

	// Truncate memory
	if index > s.memoryStart && len(s.memoryLog) > 0 {
		memIdx := int(index - s.memoryStart)
		if memIdx >= len(s.memoryLog) {
			s.memoryLog = s.memoryLog[:0]
			s.memoryStart = 0
		} else if memIdx > 0 {
			s.memoryLog = s.memoryLog[memIdx:]
			s.memoryStart = s.memoryLog[0].Index
		}
	}

	s.firstIndex = index

	// Flush any pending writes before reading the old file, then physically
	// rewrite the log to drop the compacted prefix.
	if s.logWriter != nil {
		s.logWriter.Flush()
	}

	return s.compactLogFile(index)
}
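// Compaction flow: TruncateBefore trims the in-memory window and the offset
// index for the compacted prefix, then compactLogFile rewrites log.bin through
// a temporary file (log.bin.tmp) and renames it into place, so a crash during
// compaction leaves the previous log intact.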
// compactLogFile physically rewrites the log file to remove compacted entries
func (s *HybridStorage) compactLogFile(newFirstIndex uint64) error {
	s.logger.Info("Starting physical log compaction. Keeping entries from %d to %d", newFirstIndex, s.lastIndex)

	// 1. Create temp file
	tmpPath := filepath.Join(s.dataDir, "log.bin.tmp")
	tmpFile, err := os.OpenFile(tmpPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
	if err != nil {
		return err
	}

	// Clean up the temp file on error
	defer func() {
		if tmpFile != nil {
			tmpFile.Close()
			// Only remove if we didn't successfully rename
			if _, err := os.Stat(tmpPath); err == nil {
				os.Remove(tmpPath)
			}
		}
	}()

	tmpWriter := bufio.NewWriterSize(tmpFile, 1024*1024)
	newOffsets := make(map[uint64]int64)
	var currentOffset int64 = 0

	// 2. Copy the surviving entries
	for idx := newFirstIndex; idx <= s.lastIndex; idx++ {
		// Read from the old file via entryOffsets
		entry, err := s.readEntryFromFile(idx)
		if err != nil {
			return fmt.Errorf("failed to read entry %d during compaction: %w", idx, err)
		}

		// Write to the new file
		data, err := json.Marshal(entry)
		if err != nil {
			return err
		}

		lenBuf := make([]byte, 4)
		binary.BigEndian.PutUint32(lenBuf, uint32(len(data)))

		if _, err := tmpWriter.Write(lenBuf); err != nil {
			return err
		}
		if _, err := tmpWriter.Write(data); err != nil {
			return err
		}

		newOffsets[idx] = currentOffset
		currentOffset += int64(4 + len(data))
	}

	if err := tmpWriter.Flush(); err != nil {
		return err
	}

	// 3. Swap files
	// Close the new file before renaming it into place.
	if err := tmpFile.Close(); err != nil {
		tmpFile = nil // Prevent double close in defer
		return err
	}
	tmpFile = nil // Prevent double close in defer

	// Close old file
	if err := s.logFile.Close(); err != nil {
		return err
	}
	s.logFile = nil // Prevent usage

	// Rename
	logPath := filepath.Join(s.dataDir, "log.bin")
	if err := os.Rename(tmpPath, logPath); err != nil {
		// The rename failed, so the old log.bin is still in place; re-open it
		// so the storage remains usable.
		f, openErr := os.OpenFile(logPath, os.O_RDWR|os.O_CREATE, 0644)
		if openErr == nil {
			s.logFile = f
			s.logWriter = bufio.NewWriterSize(s.logFile, 1024*1024)
			s.logFile.Seek(0, io.SeekEnd)
		}
		return err
	}

	// Re-open log file
	s.logFile, err = os.OpenFile(logPath, os.O_RDWR|os.O_CREATE, 0644)
	if err != nil {
		return err
	}
	s.logWriter = bufio.NewWriterSize(s.logFile, 1024*1024)

	// Seek to end
	if _, err := s.logFile.Seek(0, io.SeekEnd); err != nil {
		return err
	}

	// Update offsets
	s.entryOffsets = newOffsets

	s.logger.Info("Compacted log file successfully. New size: %d bytes", currentOffset)

	return nil
}
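// Snapshot file layout, as written by SaveSnapshot and parsed by loadSnapshot:
//
//	+---------------------+--------------------+----------------------------+
//	| lastIndex (8 bytes, | lastTerm (8 bytes, | application snapshot bytes |
//	| big endian)         | big endian)        |                            |
//	+---------------------+--------------------+----------------------------+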
// loadSnapshot reads the snapshot from disk
func (s *HybridStorage) loadSnapshot() ([]byte, uint64, uint64, error) {
	data, err := os.ReadFile(s.snapshotFile)
	if err != nil {
		return nil, 0, 0, err
	}

	if len(data) < 16 {
		return nil, 0, 0, ErrCorrupted
	}

	lastIndex := binary.BigEndian.Uint64(data[:8])
	lastTerm := binary.BigEndian.Uint64(data[8:16])
	snapData := data[16:]

	return snapData, lastIndex, lastTerm, nil
}

// GetSnapshot returns the current snapshot
func (s *HybridStorage) GetSnapshot() ([]byte, uint64, uint64, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.loadSnapshot()
}

// SaveSnapshot saves a new snapshot
func (s *HybridStorage) SaveSnapshot(data []byte, lastIndex, lastTerm uint64) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Ensure data directory exists
	if err := os.MkdirAll(s.dataDir, 0755); err != nil {
		return fmt.Errorf("failed to create data directory: %w", err)
	}

	// Format: [8 bytes lastIndex][8 bytes lastTerm][snapshot data]
	buf := make([]byte, 16+len(data))
	binary.BigEndian.PutUint64(buf[:8], lastIndex)
	binary.BigEndian.PutUint64(buf[8:16], lastTerm)
	copy(buf[16:], data)

	// Write to temp file first
	tmpFile := s.snapshotFile + ".tmp"
	if err := os.WriteFile(tmpFile, buf, 0644); err != nil {
		return err
	}

	return os.Rename(tmpFile, s.snapshotFile)
}

// GetClusterConfig returns the current cluster configuration
func (s *HybridStorage) GetClusterConfig() (*ClusterConfig, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	if s.clusterCache != nil {
		return s.clusterCache, nil
	}
	return s.loadClusterConfig()
}

// loadClusterConfig reads the cluster configuration from disk
func (s *HybridStorage) loadClusterConfig() (*ClusterConfig, error) {
	data, err := os.ReadFile(s.clusterFile)
	if err != nil {
		if os.IsNotExist(err) {
			return nil, nil // No config saved yet
		}
		return nil, err
	}

	var config ClusterConfig
	if err := json.Unmarshal(data, &config); err != nil {
		return nil, err
	}

	s.clusterCache = &config
	return &config, nil
}

// SaveClusterConfig persists the cluster configuration to disk
func (s *HybridStorage) SaveClusterConfig(config *ClusterConfig) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Ensure data directory exists
	if err := os.MkdirAll(s.dataDir, 0755); err != nil {
		return fmt.Errorf("failed to create data directory: %w", err)
	}

	data, err := json.Marshal(config)
	if err != nil {
		return err
	}

	// Write to temp file first for atomicity
	tmpFile := s.clusterFile + ".tmp"
	if err := os.WriteFile(tmpFile, data, 0644); err != nil {
		return err
	}
	if err := os.Rename(tmpFile, s.clusterFile); err != nil {
		return err
	}

	s.clusterCache = config
	return nil
}

// Close closes the storage
func (s *HybridStorage) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.logWriter != nil {
		s.logWriter.Flush()
	}
	if s.logFile != nil {
		return s.logFile.Close()
	}
	return nil
}

// Sync forces a sync to disk
func (s *HybridStorage) Sync() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.logWriter != nil {
		if err := s.logWriter.Flush(); err != nil {
			return err
		}
	}
	if s.logFile != nil {
		return s.logFile.Sync()
	}
	return nil
}

// Flush writes buffered data to the operating system
func (s *HybridStorage) Flush() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.logWriter != nil {
		return s.logWriter.Flush()
	}
	return nil
}
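// End-to-end compaction sketch (illustrative; fsm, appliedIndex and
// appliedTerm are hypothetical values supplied by the embedding Raft node,
// not part of this package):
//
//	snap := fsm.Snapshot() // hypothetical state-machine hook returning []byte
//	if err := store.SaveSnapshot(snap, appliedIndex, appliedTerm); err != nil {
//		// handle error
//	}
//	if err := store.TruncateBefore(appliedIndex); err != nil { // drops entries below appliedIndex and rewrites log.bin
//		// handle error
//	}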