| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811 |
- package raft
import (
	"bufio"
	"encoding/binary"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"math"
	"os"
	"path/filepath"
	"sync"
)
// Sentinel errors returned by Storage implementations.
// Callers should compare using errors.Is.
var (
	// ErrNotFound indicates the requested entry has no recorded file offset.
	ErrNotFound = errors.New("not found")
	// ErrCorrupted indicates on-disk data failed a sanity check (implausible
	// length prefix, or a snapshot file shorter than its fixed header).
	ErrCorrupted = errors.New("corrupted data")
	// ErrOutOfRange indicates the requested index is beyond the last stored entry.
	ErrOutOfRange = errors.New("index out of range")
	// ErrCompacted indicates the requested index precedes the first retained
	// entry (it was removed by log compaction).
	ErrCompacted = errors.New("log has been compacted")
)
// Storage defines the persistence contract used by the raft node: hard
// state, the replicated log, snapshots, and cluster membership.
type Storage interface {
	// GetState returns the persisted hard state.
	GetState() (*PersistentState, error)
	// SaveState durably records the hard state.
	SaveState(state *PersistentState) error
	// GetFirstIndex returns the first retained log index (post-compaction).
	GetFirstIndex() uint64
	// GetLastIndex returns the highest log index stored.
	GetLastIndex() uint64
	// GetEntry returns the entry at the given index.
	GetEntry(index uint64) (*LogEntry, error)
	// GetEntries returns entries in the half-open range [startIndex, endIndex).
	GetEntries(startIndex, endIndex uint64) ([]LogEntry, error)
	// AppendEntries appends new entries to the log.
	AppendEntries(entries []LogEntry) error
	// TruncateAfter discards entries with index > index (conflict resolution).
	TruncateAfter(index uint64) error
	// TruncateBefore discards entries with index < index (compaction).
	TruncateBefore(index uint64) error
	// GetSnapshot returns (data, lastIndex, lastTerm, error).
	GetSnapshot() ([]byte, uint64, uint64, error)
	// SaveSnapshot stores snapshot data covering the log up to lastIndex/lastTerm.
	SaveSnapshot(data []byte, lastIndex, lastTerm uint64) error
	// GetClusterConfig returns the persisted cluster configuration, if any.
	GetClusterConfig() (*ClusterConfig, error)
	// SaveClusterConfig durably records the cluster configuration.
	SaveClusterConfig(config *ClusterConfig) error
	// Close releases underlying file resources.
	Close() error
	Sync() error  // fsync (slow, safe)
	Flush() error // write to OS cache (fast)
}
// HybridStorage implements Storage with a hybrid design: a bounded window of
// recent entries is cached in memory for fast reads, while the full log lives
// in an append-only, length-prefixed file. State, snapshot, and cluster
// config are separate small files replaced atomically via temp-file+rename.
// Mutable fields are guarded by mu.
type HybridStorage struct {
	mu      sync.RWMutex
	dataDir string
	logger  Logger
	// In-memory cache for fast reads.
	memoryLog      []LogEntry // recent entries, contiguous by index
	memoryStart    uint64     // log index of memoryLog[0]
	memoryCapacity int        // max number of entries kept in memoryLog
	// File-based persistent storage.
	logFile      *os.File
	logWriter    *bufio.Writer // buffers appends to logFile
	stateFile    string
	snapshotFile string
	clusterFile  string // cluster configuration file
	// Index tracking.
	firstIndex uint64 // first index in storage (after compaction)
	lastIndex  uint64 // last index in storage
	// Entry offset index for fast file seeks.
	entryOffsets map[uint64]int64 // log index -> byte offset in logFile
	// Cached copy of the last loaded/saved persistent state.
	stateCache *PersistentState
	// Cached copy of the last loaded/saved cluster configuration.
	clusterCache *ClusterConfig
}
- // NewHybridStorage creates a new hybrid storage instance
- func NewHybridStorage(dataDir string, memoryCapacity int, logger Logger) (*HybridStorage, error) {
- if logger == nil {
- logger = &NoopLogger{}
- }
- if memoryCapacity <= 0 {
- memoryCapacity = 1000 // Safe default
- }
- if err := os.MkdirAll(dataDir, 0755); err != nil {
- return nil, fmt.Errorf("failed to create data directory: %w", err)
- }
- s := &HybridStorage{
- dataDir: dataDir,
- logger: logger,
- memoryLog: make([]LogEntry, 0, memoryCapacity),
- memoryCapacity: memoryCapacity,
- stateFile: filepath.Join(dataDir, "state.json"),
- snapshotFile: filepath.Join(dataDir, "snapshot.bin"),
- clusterFile: filepath.Join(dataDir, "cluster.json"),
- firstIndex: 0,
- lastIndex: 0,
- entryOffsets: make(map[uint64]int64),
- }
- if err := s.recover(); err != nil {
- return nil, fmt.Errorf("failed to recover storage: %w", err)
- }
- return s, nil
- }
// recover restores in-memory state from disk on startup: it loads the
// persisted hard state, opens (creating if absent) the log file, applies the
// snapshot's last index/term as the compaction point, rebuilds the offset
// index, and installs the buffered writer for subsequent appends.
func (s *HybridStorage) recover() error {
	// Load persisted state; a missing file is not an error.
	if _, err := s.loadState(); err != nil && !os.IsNotExist(err) {
		return fmt.Errorf("failed to load state: %w", err)
	}
	// Open or create the append-only log file.
	logPath := filepath.Join(s.dataDir, "log.bin")
	var err error
	s.logFile, err = os.OpenFile(logPath, os.O_RDWR|os.O_CREATE, 0644)
	if err != nil {
		return fmt.Errorf("failed to open log file: %w", err)
	}
	// A snapshot, if present, establishes the compaction point: rebuildIndex
	// ignores log entries at or below its last index. Load errors (e.g. no
	// snapshot file yet) are deliberately treated as "no snapshot".
	if snapData, lastIndex, lastTerm, err := s.loadSnapshot(); err == nil && len(snapData) > 0 {
		s.firstIndex = lastIndex
		s.lastIndex = lastIndex
		s.logger.Info("Loaded snapshot at index %d, term %d", lastIndex, lastTerm)
	}
	// Scan the log to rebuild the offset index and the in-memory window.
	if err := s.rebuildIndex(); err != nil {
		return fmt.Errorf("failed to rebuild index: %w", err)
	}
	s.logWriter = bufio.NewWriterSize(s.logFile, 1024*1024) // 1MB append buffer
	return nil
}
- // rebuildIndex scans the log file and rebuilds the offset index
- func (s *HybridStorage) rebuildIndex() error {
- s.logFile.Seek(0, io.SeekStart)
- reader := bufio.NewReader(s.logFile)
- var offset int64 = 0
- var entries []LogEntry
- for {
- entry, bytesRead, err := s.readEntryAt(reader)
- if err == io.EOF {
- break
- }
- if err != nil {
- s.logger.Warn("Error reading log at offset %d: %v", offset, err)
- break
- }
- if entry.Index > s.firstIndex {
- s.entryOffsets[entry.Index] = offset
- entries = append(entries, *entry)
- if s.firstIndex == 0 || entry.Index < s.firstIndex {
- s.firstIndex = entry.Index
- }
- if entry.Index > s.lastIndex {
- s.lastIndex = entry.Index
- }
- }
- offset += int64(bytesRead)
- }
- // Load recent entries into memory
- if len(entries) > 0 {
- startIdx := 0
- if len(entries) > s.memoryCapacity {
- startIdx = len(entries) - s.memoryCapacity
- }
- s.memoryLog = entries[startIdx:]
- s.memoryStart = s.memoryLog[0].Index
- s.logger.Info("Loaded %d entries into memory, starting at index %d", len(s.memoryLog), s.memoryStart)
- }
- // Seek to end for appending
- s.logFile.Seek(0, io.SeekEnd)
- return nil
- }
- // readEntryAt reads a single entry from the reader
- func (s *HybridStorage) readEntryAt(reader *bufio.Reader) (*LogEntry, int, error) {
- // Format: [4 bytes length][json data]
- lenBuf := make([]byte, 4)
- if _, err := io.ReadFull(reader, lenBuf); err != nil {
- return nil, 0, err
- }
- length := binary.BigEndian.Uint32(lenBuf)
- if length > 10*1024*1024 { // 10MB limit
- return nil, 0, ErrCorrupted
- }
- data := make([]byte, length)
- if _, err := io.ReadFull(reader, data); err != nil {
- return nil, 0, err
- }
- var entry LogEntry
- if err := json.Unmarshal(data, &entry); err != nil {
- return nil, 0, err
- }
- return &entry, 4 + int(length), nil
- }
- // GetState returns the current persistent state
- func (s *HybridStorage) GetState() (*PersistentState, error) {
- s.mu.RLock()
- defer s.mu.RUnlock()
- if s.stateCache != nil {
- return s.stateCache, nil
- }
- return s.loadState()
- }
// loadState reads state.json from disk and caches the parsed result.
// A missing file yields an empty PersistentState and no error.
// NOTE(review): this assigns s.stateCache, so callers must hold the write
// lock (or run single-threaded, as during recovery) — confirm call sites.
func (s *HybridStorage) loadState() (*PersistentState, error) {
	data, err := os.ReadFile(s.stateFile)
	if err != nil {
		if os.IsNotExist(err) {
			// Nothing persisted yet: start from the zero value.
			return &PersistentState{}, nil
		}
		return nil, err
	}
	var state PersistentState
	if err := json.Unmarshal(data, &state); err != nil {
		return nil, err
	}
	s.stateCache = &state
	return &state, nil
}
- // SaveState persists the state to disk
- func (s *HybridStorage) SaveState(state *PersistentState) error {
- s.mu.Lock()
- defer s.mu.Unlock()
- // Ensure data directory exists
- if err := os.MkdirAll(s.dataDir, 0755); err != nil {
- return fmt.Errorf("failed to create data directory: %w", err)
- }
- data, err := json.Marshal(state)
- if err != nil {
- return err
- }
- // Write to temp file first for atomicity
- tmpFile := s.stateFile + ".tmp"
- if err := os.WriteFile(tmpFile, data, 0644); err != nil {
- return err
- }
- if err := os.Rename(tmpFile, s.stateFile); err != nil {
- return err
- }
- s.stateCache = state
- return nil
- }
- // GetFirstIndex returns the first available log index
- func (s *HybridStorage) GetFirstIndex() uint64 {
- s.mu.RLock()
- defer s.mu.RUnlock()
- return s.firstIndex
- }
- // GetLastIndex returns the last log index
- func (s *HybridStorage) GetLastIndex() uint64 {
- s.mu.RLock()
- defer s.mu.RUnlock()
- return s.lastIndex
- }
// GetEntry retrieves a single log entry by index.
// Returns ErrCompacted for indexes below firstIndex and ErrOutOfRange for
// indexes beyond lastIndex. Serves from the in-memory window when the index
// falls inside it, otherwise reads the entry from the log file.
func (s *HybridStorage) GetEntry(index uint64) (*LogEntry, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if index < s.firstIndex {
		return nil, ErrCompacted
	}
	if index > s.lastIndex {
		return nil, ErrOutOfRange
	}
	// Fast path: the entry is inside the in-memory window.
	if index >= s.memoryStart && len(s.memoryLog) > 0 {
		memIdx := int(index - s.memoryStart)
		if memIdx >= 0 && memIdx < len(s.memoryLog) {
			// Return a copy so callers cannot mutate the cached element.
			entry := s.memoryLog[memIdx]
			return &entry, nil
		}
	}
	// Slow path: locate the entry in the log file via the offset index.
	return s.readEntryFromFile(index)
}
- // readEntryFromFile reads an entry from the log file
- func (s *HybridStorage) readEntryFromFile(index uint64) (*LogEntry, error) {
- offset, ok := s.entryOffsets[index]
- if !ok {
- return nil, ErrNotFound
- }
- if _, err := s.logFile.Seek(offset, io.SeekStart); err != nil {
- return nil, err
- }
- reader := bufio.NewReader(s.logFile)
- entry, _, err := s.readEntryAt(reader)
- return entry, err
- }
// GetEntries retrieves log entries in the half-open range
// [startIndex, endIndex). endIndex is clamped to lastIndex+1, and an empty
// range returns (nil, nil). Returns ErrCompacted when startIndex precedes
// the first retained entry.
func (s *HybridStorage) GetEntries(startIndex, endIndex uint64) ([]LogEntry, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if startIndex < s.firstIndex {
		return nil, ErrCompacted
	}
	// Clamp the upper bound to what is actually stored.
	if endIndex > s.lastIndex+1 {
		endIndex = s.lastIndex + 1
	}
	if startIndex >= endIndex {
		return nil, nil
	}
	entries := make([]LogEntry, 0, endIndex-startIndex)
	// Fast path: the entire range lies inside the in-memory window.
	if startIndex >= s.memoryStart && len(s.memoryLog) > 0 {
		memStartIdx := int(startIndex - s.memoryStart)
		memEndIdx := int(endIndex - s.memoryStart)
		if memStartIdx >= 0 && memEndIdx <= len(s.memoryLog) {
			return append(entries, s.memoryLog[memStartIdx:memEndIdx]...), nil
		}
	}
	// Mixed path: take each entry from memory when possible, else from file.
	for idx := startIndex; idx < endIndex; idx++ {
		if idx >= s.memoryStart && len(s.memoryLog) > 0 {
			memIdx := int(idx - s.memoryStart)
			if memIdx >= 0 && memIdx < len(s.memoryLog) {
				entries = append(entries, s.memoryLog[memIdx])
				continue
			}
		}
		entry, err := s.readEntryFromFile(idx)
		if err != nil {
			return nil, err
		}
		entries = append(entries, *entry)
	}
	return entries, nil
}
// AppendEntries appends new entries to the log. Only the contiguous run
// starting at lastIndex+1 is accepted: entries already stored are skipped,
// and the batch is cut at the first gap. Accepted entries are written to
// the buffered log file, recorded in the offset index, mirrored into the
// in-memory window, and flushed to the OS cache before returning (fsync is
// left to Sync).
func (s *HybridStorage) AppendEntries(entries []LogEntry) error {
	if len(entries) == 0 {
		return nil
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	// Keep only the contiguous run that extends the current log.
	var newEntries []LogEntry
	nextExpected := s.lastIndex + 1
	// After loading a snapshot with no log entries yet, the next entry
	// follows the snapshot's last index.
	// NOTE(review): assumes firstIndex equals the snapshot's last index in
	// that situation — confirm against recover().
	if s.lastIndex == 0 && s.firstIndex > 0 {
		nextExpected = s.firstIndex + 1
	}
	for _, entry := range entries {
		if entry.Index == nextExpected {
			// Next expected entry: accept it.
			newEntries = append(newEntries, entry)
			nextExpected++
		} else if entry.Index > nextExpected {
			// Gap detected — normal during follower catch-up; the leader
			// will send a snapshot or earlier entries.
			s.logger.Debug("Gap in entries: got %d, expected %d (will wait for leader)", entry.Index, nextExpected)
			break
		}
		// entry.Index < nextExpected: already stored, skip silently.
	}
	if len(newEntries) == 0 {
		return nil // all duplicates, or the batch starts past a gap
	}
	// Record the append position for the offset index.
	currentOffset, err := s.logFile.Seek(0, io.SeekEnd)
	if err != nil {
		return err
	}
	for i, entry := range newEntries {
		// Encode as [4-byte big-endian length][JSON payload].
		data, err := json.Marshal(entry)
		if err != nil {
			return err
		}
		lenBuf := make([]byte, 4)
		binary.BigEndian.PutUint32(lenBuf, uint32(len(data)))
		if _, err := s.logWriter.Write(lenBuf); err != nil {
			return err
		}
		if _, err := s.logWriter.Write(data); err != nil {
			return err
		}
		// Record where this entry starts in the file.
		s.entryOffsets[entry.Index] = currentOffset
		currentOffset += int64(4 + len(data))
		// Mirror into the in-memory window; establish its start index on
		// the very first cached entry.
		if len(s.memoryLog) == 0 {
			s.memoryStart = entry.Index
		}
		s.memoryLog = append(s.memoryLog, entry)
		// Evict the oldest cached entries once over capacity.
		if len(s.memoryLog) > s.memoryCapacity {
			excess := len(s.memoryLog) - s.memoryCapacity
			s.memoryLog = s.memoryLog[excess:]
			s.memoryStart = s.memoryLog[0].Index
		}
		s.lastIndex = entry.Index
		// Establish firstIndex on the first append into an empty log.
		if s.firstIndex == 0 || (i == 0 && newEntries[0].Index < s.firstIndex) {
			s.firstIndex = newEntries[0].Index
		}
	}
	// Push buffered bytes to the OS page cache; durability (fsync) is the
	// responsibility of Sync() callers.
	return s.logWriter.Flush()
}
- // TruncateAfter removes all entries after the given index
- func (s *HybridStorage) TruncateAfter(index uint64) error {
- s.mu.Lock()
- defer s.mu.Unlock()
- if index >= s.lastIndex {
- return nil
- }
- // Truncate file
- if offset, ok := s.entryOffsets[index+1]; ok {
- if err := s.logFile.Truncate(offset); err != nil {
- return err
- }
- s.logFile.Seek(0, io.SeekEnd)
- }
- // Remove from index
- for idx := index + 1; idx <= s.lastIndex; idx++ {
- delete(s.entryOffsets, idx)
- }
- // Truncate memory
- if index < s.memoryStart {
- s.memoryLog = s.memoryLog[:0]
- s.memoryStart = 0
- } else if index >= s.memoryStart && len(s.memoryLog) > 0 {
- memIdx := int(index - s.memoryStart + 1)
- if memIdx < len(s.memoryLog) {
- s.memoryLog = s.memoryLog[:memIdx]
- }
- }
- s.lastIndex = index
- return nil
- }
// TruncateBefore removes all entries with index < index (log compaction,
// typically after a snapshot). It drops the compacted offsets, trims the
// in-memory window, advances firstIndex, then physically rewrites the log
// file so the compacted prefix stops consuming disk space.
func (s *HybridStorage) TruncateBefore(index uint64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if index <= s.firstIndex {
		return nil
	}
	// Drop offsets of the compacted prefix.
	for idx := s.firstIndex; idx < index; idx++ {
		delete(s.entryOffsets, idx)
	}
	// Trim the in-memory window.
	if index > s.memoryStart && len(s.memoryLog) > 0 {
		memIdx := int(index - s.memoryStart)
		if memIdx >= len(s.memoryLog) {
			// The whole window was compacted away.
			s.memoryLog = s.memoryLog[:0]
			s.memoryStart = 0
		} else if memIdx > 0 {
			s.memoryLog = s.memoryLog[memIdx:]
			s.memoryStart = s.memoryLog[0].Index
		}
	}
	s.firstIndex = index
	// Flush any pending writes before compaction reads the file back.
	if s.logWriter != nil {
		s.logWriter.Flush()
	}
	return s.compactLogFile(index)
}
// compactLogFile physically rewrites the log file so it contains only the
// entries in [newFirstIndex, lastIndex]. Surviving entries are copied into
// a temp file, which is renamed over log.bin, and the offset index is
// rebuilt for the new file. Must be called with mu held and the log writer
// flushed.
func (s *HybridStorage) compactLogFile(newFirstIndex uint64) error {
	s.logger.Info("Starting physical log compaction. Keeping entries from %d to %d", newFirstIndex, s.lastIndex)
	// 1. Create the temp file the surviving entries are copied into.
	tmpPath := filepath.Join(s.dataDir, "log.bin.tmp")
	tmpFile, err := os.OpenFile(tmpPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
	if err != nil {
		return err
	}
	// Clean up the temp file on any error path; tmpFile is set to nil once
	// it has been closed ahead of the rename.
	defer func() {
		if tmpFile != nil {
			tmpFile.Close()
			// Only remove if the rename did not already consume the file.
			if _, err := os.Stat(tmpPath); err == nil {
				os.Remove(tmpPath)
			}
		}
	}()
	tmpWriter := bufio.NewWriterSize(tmpFile, 1024*1024)
	newOffsets := make(map[uint64]int64)
	var currentOffset int64 = 0
	// 2. Copy the surviving entries, re-encoding each one and recording its
	// offset in the new file.
	for idx := newFirstIndex; idx <= s.lastIndex; idx++ {
		// Read from the old file via the existing offset index.
		entry, err := s.readEntryFromFile(idx)
		if err != nil {
			return fmt.Errorf("failed to read entry %d during compaction: %v", idx, err)
		}
		// Write to the new file in the same [length][JSON] format.
		data, err := json.Marshal(entry)
		if err != nil {
			return err
		}
		lenBuf := make([]byte, 4)
		binary.BigEndian.PutUint32(lenBuf, uint32(len(data)))
		if _, err := tmpWriter.Write(lenBuf); err != nil {
			return err
		}
		if _, err := tmpWriter.Write(data); err != nil {
			return err
		}
		newOffsets[idx] = currentOffset
		currentOffset += int64(4 + len(data))
	}
	if err := tmpWriter.Flush(); err != nil {
		return err
	}
	// 3. Swap files: close both handles, rename temp over log.bin, reopen.
	if err := tmpFile.Close(); err != nil {
		tmpFile = nil // prevent double close in defer
		return err
	}
	tmpFile = nil // prevent double close in defer
	if err := s.logFile.Close(); err != nil {
		return err
	}
	s.logFile = nil // prevent use of the closed handle
	logPath := filepath.Join(s.dataDir, "log.bin")
	if err := os.Rename(tmpPath, logPath); err != nil {
		// Rename failed: the old log.bin is still intact, so try to reopen
		// it and restore a usable writer before reporting the error.
		f, openErr := os.OpenFile(logPath, os.O_RDWR|os.O_CREATE, 0644)
		if openErr == nil {
			s.logFile = f
			s.logWriter = bufio.NewWriterSize(s.logFile, 1024*1024)
			s.logFile.Seek(0, io.SeekEnd)
		}
		return err
	}
	// Re-open the compacted log for appends.
	s.logFile, err = os.OpenFile(logPath, os.O_RDWR|os.O_CREATE, 0644)
	if err != nil {
		return err
	}
	s.logWriter = bufio.NewWriterSize(s.logFile, 1024*1024)
	// Position at the end for subsequent appends.
	if _, err := s.logFile.Seek(0, io.SeekEnd); err != nil {
		return err
	}
	// Swap in the offset index rebuilt for the new file.
	s.entryOffsets = newOffsets
	s.logger.Info("Compacted log file successfully. New size: %d bytes", currentOffset)
	return nil
}
- // loadSnapshot reads the snapshot from disk
- func (s *HybridStorage) loadSnapshot() ([]byte, uint64, uint64, error) {
- data, err := os.ReadFile(s.snapshotFile)
- if err != nil {
- return nil, 0, 0, err
- }
- if len(data) < 16 {
- return nil, 0, 0, ErrCorrupted
- }
- lastIndex := binary.BigEndian.Uint64(data[:8])
- lastTerm := binary.BigEndian.Uint64(data[8:16])
- snapData := data[16:]
- return snapData, lastIndex, lastTerm, nil
- }
- // GetSnapshot returns the current snapshot
- func (s *HybridStorage) GetSnapshot() ([]byte, uint64, uint64, error) {
- s.mu.RLock()
- defer s.mu.RUnlock()
- return s.loadSnapshot()
- }
- // SaveSnapshot saves a new snapshot
- func (s *HybridStorage) SaveSnapshot(data []byte, lastIndex, lastTerm uint64) error {
- s.mu.Lock()
- defer s.mu.Unlock()
- // Ensure data directory exists
- if err := os.MkdirAll(s.dataDir, 0755); err != nil {
- return fmt.Errorf("failed to create data directory: %w", err)
- }
- // Format: [8 bytes lastIndex][8 bytes lastTerm][snapshot data]
- buf := make([]byte, 16+len(data))
- binary.BigEndian.PutUint64(buf[:8], lastIndex)
- binary.BigEndian.PutUint64(buf[8:16], lastTerm)
- copy(buf[16:], data)
- // Write to temp file first
- tmpFile := s.snapshotFile + ".tmp"
- if err := os.WriteFile(tmpFile, buf, 0644); err != nil {
- return err
- }
- return os.Rename(tmpFile, s.snapshotFile)
- }
- // GetClusterConfig returns the current cluster configuration
- func (s *HybridStorage) GetClusterConfig() (*ClusterConfig, error) {
- s.mu.RLock()
- defer s.mu.RUnlock()
- if s.clusterCache != nil {
- return s.clusterCache, nil
- }
- return s.loadClusterConfig()
- }
// loadClusterConfig reads cluster.json from disk and caches the parsed
// result. A missing file returns (nil, nil): no configuration saved yet.
// NOTE(review): this assigns s.clusterCache, so callers should hold the
// write lock — confirm call sites.
func (s *HybridStorage) loadClusterConfig() (*ClusterConfig, error) {
	data, err := os.ReadFile(s.clusterFile)
	if err != nil {
		if os.IsNotExist(err) {
			return nil, nil // No config saved yet
		}
		return nil, err
	}
	var config ClusterConfig
	if err := json.Unmarshal(data, &config); err != nil {
		return nil, err
	}
	s.clusterCache = &config
	return &config, nil
}
- // SaveClusterConfig persists the cluster configuration to disk
- func (s *HybridStorage) SaveClusterConfig(config *ClusterConfig) error {
- s.mu.Lock()
- defer s.mu.Unlock()
- // Ensure data directory exists
- if err := os.MkdirAll(s.dataDir, 0755); err != nil {
- return fmt.Errorf("failed to create data directory: %w", err)
- }
- data, err := json.Marshal(config)
- if err != nil {
- return err
- }
- // Write to temp file first for atomicity
- tmpFile := s.clusterFile + ".tmp"
- if err := os.WriteFile(tmpFile, data, 0644); err != nil {
- return err
- }
- if err := os.Rename(tmpFile, s.clusterFile); err != nil {
- return err
- }
- s.clusterCache = config
- return nil
- }
- // Close closes the storage
- func (s *HybridStorage) Close() error {
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.logWriter != nil {
- s.logWriter.Flush()
- }
- if s.logFile != nil {
- return s.logFile.Close()
- }
- return nil
- }
- // Sync forces a sync to disk
- func (s *HybridStorage) Sync() error {
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.logWriter != nil {
- if err := s.logWriter.Flush(); err != nil {
- return err
- }
- }
- if s.logFile != nil {
- return s.logFile.Sync()
- }
- return nil
- }
- // Flush writes buffered data to the operating system
- func (s *HybridStorage) Flush() error {
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.logWriter != nil {
- return s.logWriter.Flush()
- }
- return nil
- }
|