Separate out FileAccess
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegment.java
index 3f7783ba44cda217ee718450b2c3036ddefbb30d..bab668ffd6982a6a3f2ec07bfef3cc5c9b904f35 100644 (file)
@@ -28,6 +28,8 @@ import java.nio.file.Files;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,16 +40,52 @@ import org.slf4j.LoggerFactory;
  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
  */
 final class JournalSegment {
+    /**
+     * Encapsulation of a {@link JournalSegment}'s state;
+     */
+    sealed interface State {
+        // Marker interface
+    }
+
+    /**
+     * Journal segment is active, i.e. there is a associated with it.
+     */
+    @NonNullByDefault
+    record Active(FileAccess access, JournalSegmentWriter writer) implements State {
+        Active {
+            requireNonNull(access);
+            requireNonNull(writer);
+        }
+
+        Inactive deactivate() {
+            final var inactive = new Inactive(writer.currentPosition());
+            access.close();
+            return inactive;
+        }
+    }
+
+    /**
+     * Journal segment is inactive, i.e. there is no writer associated with it.
+     */
+    @NonNullByDefault
+    record Inactive(int position) implements State {
+        Active activate(final JournalSegment segment) throws IOException {
+            final var access = segment.file.newAccess(segment.storageLevel, segment.maxEntrySize);
+            return new Active(access, new JournalSegmentWriter(access.newFileWriter(), segment, segment.journalIndex,
+                this));
+        }
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(JournalSegment.class);
 
     private final Set<JournalSegmentReader> readers = ConcurrentHashMap.newKeySet();
     private final AtomicInteger references = new AtomicInteger();
-    private final JournalSegmentFile file;
-    private final StorageLevel storageLevel;
+    private final @NonNull JournalSegmentFile file;
+    private final @NonNull StorageLevel storageLevel;
+    private final @NonNull JournalIndex journalIndex;
     private final int maxEntrySize;
-    private final JournalIndex journalIndex;
 
-    private JournalSegmentWriter writer;
+    private State state;
     private boolean open = true;
 
     JournalSegment(
@@ -58,18 +96,15 @@ final class JournalSegment {
         this.file = requireNonNull(file);
         this.storageLevel = requireNonNull(storageLevel);
         this.maxEntrySize = maxEntrySize;
-        journalIndex = new SparseJournalIndex(indexDensity);
 
-        final var fileWriter = switch (storageLevel) {
-            case DISK -> new DiskFileWriter(file, maxEntrySize);
-            case MAPPED -> new MappedFileWriter(file, maxEntrySize);
-        };
+        journalIndex = new SparseJournalIndex(indexDensity);
 
-        // Traverse all entries and push them to index -- thus reconstructing both last index and current position
-        writer = new JournalSegmentWriter(fileWriter, this, maxEntrySize, journalIndex,
-            indexEntries(fileWriter, this, maxEntrySize, journalIndex, Long.MAX_VALUE, null))
-            // relinquish mapped memory
-            .toFileChannel();
+        try (var tmpAccess = file.newAccess(storageLevel, maxEntrySize)) {
+            final var fileReader = tmpAccess.newFileReader();
+            state = new Inactive(indexEntries(fileReader, this, maxEntrySize, journalIndex, Long.MAX_VALUE, null));
+        } catch (IOException e) {
+            throw new StorageException(e);
+        }
     }
 
     /**
@@ -113,10 +148,19 @@ final class JournalSegment {
     /**
      * Acquires a reference to the log segment.
      */
-    private void acquire() {
-        if (references.getAndIncrement() == 0 && storageLevel == StorageLevel.MAPPED) {
-            writer = writer.toMapped();
+    private Active acquire() {
+        return references.getAndIncrement() == 0 ? activate() : (Active) state;
+    }
+
+    private Active activate() {
+        final Active ret;
+        try {
+            ret = ((Inactive) state).activate(this);
+        } catch (IOException e) {
+            throw new StorageException(e);
         }
+        state = ret;
+        return ret;
     }
 
     /**
@@ -124,9 +168,7 @@ final class JournalSegment {
      */
     private void release() {
         if (references.decrementAndGet() == 0) {
-            if (storageLevel == StorageLevel.MAPPED) {
-                writer = writer.toFileChannel();
-            }
+            state = ((Active) state).deactivate();
             if (!open) {
                 finishClose();
             }
@@ -140,9 +182,7 @@ final class JournalSegment {
      */
     JournalSegmentWriter acquireWriter() {
         checkOpen();
-        acquire();
-
-        return writer;
+        return acquire().writer();
     }
 
     /**
@@ -159,12 +199,8 @@ final class JournalSegment {
      */
     JournalSegmentReader createReader() {
         checkOpen();
-        acquire();
 
-        final var buffer = writer.buffer();
-        final var fileReader = buffer != null ? new MappedFileReader(file, buffer)
-            : new DiskFileReader(file, maxEntrySize);
-        final var reader = new JournalSegmentReader(this, fileReader, maxEntrySize);
+        final var reader = new JournalSegmentReader(this, acquire().access().newFileReader(), maxEntrySize);
         reader.setPosition(JournalSegmentDescriptor.BYTES);
         readers.add(reader);
         return reader;
@@ -216,7 +252,6 @@ final class JournalSegment {
     }
 
     private void finishClose() {
-        writer.close();
         try {
             file.close();
         } catch (IOException e) {
@@ -246,12 +281,12 @@ final class JournalSegment {
             .toString();
     }
 
-    static int indexEntries(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize,
-            final JournalIndex journalIndex, final long maxNextIndex, final @Nullable Position start) {
+    static int indexEntries(final FileWriter fileWriter, final JournalSegment segment, final JournalIndex journalIndex,
+            final long maxNextIndex, final @Nullable Position start) {
         // acquire ownership of cache and make sure reader does not see anything we've done once we're done
         final var fileReader = fileWriter.reader();
         try {
-            return indexEntries(fileReader, segment, maxEntrySize, journalIndex, maxNextIndex, start);
+            return indexEntries(fileReader, segment, fileWriter.maxEntrySize(), journalIndex, maxNextIndex, start);
         } finally {
             // Make sure reader does not see anything we've done
             fileReader.invalidateCache();