Split out JournalSegmentDescriptor.readFrom() 22/111622/2
authorRobert Varga <robert.varga@pantheon.tech>
Sun, 5 May 2024 12:45:54 +0000 (14:45 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Sun, 5 May 2024 13:52:19 +0000 (15:52 +0200)
We have a user which reads a one-time representation from a file. Split
out that use case into a dedicated factory method.

JIRA: CONTROLLER-2107
Change-Id: I11c335ce03aa59c7796d9c2c8e78801c5b2754c0
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentDescriptor.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java

index 757ca3a078d2235866f4a921dd0812af1858dff5..587271a0889c9ff110f52f75d36c45ec8b0d15d3 100644 (file)
  */
 package io.atomix.storage.journal;
 
-import com.google.common.annotations.VisibleForTesting;
+import static java.util.Objects.requireNonNull;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import java.io.IOException;
 import java.nio.ByteBuffer;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-import static java.util.Objects.requireNonNull;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import org.eclipse.jdt.annotation.NonNull;
 
 /**
  * Stores information about a {@link JournalSegment} of the log.
@@ -119,6 +123,24 @@ public final class JournalSegmentDescriptor {
     this.locked = buffer.get() == 1;
   }
 
+  /**
+   * Read a JournalSegmentDescriptor from a {@link Path}.
+   *
+   * @param path path to read from
+   * @return A {@link JournalSegmentDescriptor}
+   * @throws IOException if an I/O error occurs or there is not enough data
+   */
+  public static @NonNull JournalSegmentDescriptor readFrom(final Path path) throws IOException {
+      try (var channel = FileChannel.open(path, StandardOpenOption.READ)) {
+          final var buffer = ByteBuffer.allocate(BYTES);
+          final var readBytes = channel.read(buffer);
+          if (readBytes != BYTES) {
+              throw new IOException("Need " + BYTES + " bytes, only " + readBytes + " available");
+          }
+          return new JournalSegmentDescriptor(buffer.flip());
+      }
+  }
+
   /**
    * Returns the segment version.
    * <p>
@@ -211,7 +233,7 @@ public final class JournalSegmentDescriptor {
 
   @Override
   public String toString() {
-    return toStringHelper(this)
+    return MoreObjects.toStringHelper(this)
         .add("version", version)
         .add("id", id)
         .add("index", index)
index 4b2f0334aa82eefd1457b331ebbb5e74ff673be5..5f8519865b816541a5e70cd1d3bda1c0bc9c960d 100644 (file)
@@ -26,7 +26,6 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.StandardOpenOption;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
@@ -468,32 +467,6 @@ public final class SegmentedJournal<E> implements Journal<E> {
     return new JournalSegment(segmentFile, descriptor, storageLevel, maxEntrySize, indexDensity);
   }
 
-  /**
-   * Loads a segment.
-   */
-  private JournalSegment loadSegment(long segmentId) {
-    File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, segmentId);
-    ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
-    try (FileChannel channel = openChannel(segmentFile)) {
-      channel.read(buffer);
-      buffer.flip();
-      JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
-      JournalSegment segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
-      LOG.debug("Loaded disk segment: {} ({})", descriptor.id(), segmentFile.getName());
-      return segment;
-    } catch (IOException e) {
-      throw new StorageException(e);
-    }
-  }
-
-  private FileChannel openChannel(File file) {
-    try {
-      return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
-    } catch (IOException e) {
-      throw new StorageException(e);
-    }
-  }
-
   /**
    * Loads all segments from disk.
    *
@@ -503,29 +476,27 @@ public final class SegmentedJournal<E> implements Journal<E> {
     // Ensure log directories are created.
     directory.mkdirs();
 
-    TreeMap<Long, JournalSegment> segments = new TreeMap<>();
+    final var segments = new TreeMap<Long, JournalSegment>();
 
     // Iterate through all files in the log directory.
-    for (File file : directory.listFiles(File::isFile)) {
+    for (var file : directory.listFiles(File::isFile)) {
 
       // If the file looks like a segment file, attempt to load the segment.
       if (JournalSegmentFile.isSegmentFile(name, file)) {
-        JournalSegmentFile segmentFile = new JournalSegmentFile(file);
-        ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
-        try (FileChannel channel = openChannel(file)) {
-          channel.read(buffer);
-          buffer.flip();
+        // read the descriptor
+        final JournalSegmentDescriptor descriptor;
+        try {
+          descriptor = JournalSegmentDescriptor.readFrom(file.toPath());
         } catch (IOException e) {
           throw new StorageException(e);
         }
 
-        JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
-
         // Load the segment.
-        JournalSegment segment = loadSegment(descriptor.id());
+        final var segmentFile = new JournalSegmentFile(file);
+        final var segment = newSegment(segmentFile, descriptor);
+        LOG.debug("Loaded disk segment: {} ({})", descriptor.id(), file.getName());
 
         // Add the segment to the segments list.
-        LOG.debug("Found segment: {} ({})", segment.descriptor().id(), segmentFile.file().getName());
         segments.put(segment.firstIndex(), segment);
       }
     }
@@ -533,11 +504,12 @@ public final class SegmentedJournal<E> implements Journal<E> {
     // Verify that all the segments in the log align with one another.
     JournalSegment previousSegment = null;
     boolean corrupted = false;
-    Iterator<Map.Entry<Long, JournalSegment>> iterator = segments.entrySet().iterator();
+    final var iterator = segments.entrySet().iterator();
     while (iterator.hasNext()) {
-      JournalSegment segment = iterator.next().getValue();
+      final var segment = iterator.next().getValue();
       if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) {
-        LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().file(), previousSegment.file().file());
+        LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().file(),
+            previousSegment.file().file());
         corrupted = true;
       }
       if (corrupted) {