Use eliminate magic JournalSegmentWriter constructor 63/111663/2
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 8 May 2024 16:11:14 +0000 (18:11 +0200)
committerRobert Varga <nite@hq.sk>
Wed, 8 May 2024 17:09:14 +0000 (17:09 +0000)
Rather than performing internal initialization, move index entries to
acquire current position. This way the writer is always properly
initialized.

JIRA: CONTROLLER-2100
Change-Id: I4c6459d31438379d58dc1c8e769dca368565eaf5
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java

index 844a1c9214d98ca641ea98f3d760305f9973d421..3f7783ba44cda217ee718450b2c3036ddefbb30d 100644 (file)
@@ -16,6 +16,7 @@
  */
 package io.atomix.storage.journal;
 
+import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
 import static java.util.Objects.requireNonNull;
 
 import com.google.common.base.MoreObjects;
@@ -37,207 +38,254 @@ import org.slf4j.LoggerFactory;
  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
  */
 final class JournalSegment {
-  private static final Logger LOG = LoggerFactory.getLogger(JournalSegment.class);
-
-  private final Set<JournalSegmentReader> readers = ConcurrentHashMap.newKeySet();
-  private final AtomicInteger references = new AtomicInteger();
-  private final JournalSegmentFile file;
-  private final StorageLevel storageLevel;
-  private final int maxEntrySize;
-  private final JournalIndex journalIndex;
-
-  private JournalSegmentWriter writer;
-  private boolean open = true;
-
-  JournalSegment(
-      final JournalSegmentFile file,
-      final StorageLevel storageLevel,
-      final int maxEntrySize,
-      final double indexDensity) {
-    this.file = requireNonNull(file);
-    this.storageLevel = requireNonNull(storageLevel);
-    this.maxEntrySize = maxEntrySize;
-    journalIndex = new SparseJournalIndex(indexDensity);
-
-    final var fileWriter = switch (storageLevel) {
-        case DISK -> new DiskFileWriter(file, maxEntrySize);
-        case MAPPED -> new MappedFileWriter(file, maxEntrySize);
-    };
-    writer = new JournalSegmentWriter(fileWriter, this, maxEntrySize, journalIndex)
-        // relinquish mapped memory
-        .toFileChannel();
-  }
-
-  /**
-   * Returns the segment's starting index.
-   *
-   * @return The segment's starting index.
-   */
-  long firstIndex() {
-    return file.firstIndex();
-  }
-
-  /**
-   * Returns the last index in the segment.
-   *
-   * @return The last index in the segment.
-   */
-  long lastIndex() {
-    final var lastPosition = journalIndex.last();
-    return lastPosition != null ? lastPosition.index() : firstIndex() - 1;
-  }
-
-  /**
-   * Returns the segment file.
-   *
-   * @return The segment file.
-   */
-  JournalSegmentFile file() {
-    return file;
-  }
-
-  /**
-   * Looks up the position of the given index.
-   *
-   * @param index the index to lookup
-   * @return the position of the given index or a lesser index, or {@code null}
-   */
-  @Nullable Position lookup(final long index) {
-    return journalIndex.lookup(index);
-  }
-
-  /**
-   * Acquires a reference to the log segment.
-   */
-  private void acquire() {
-    if (references.getAndIncrement() == 0 && storageLevel == StorageLevel.MAPPED) {
-      writer = writer.toMapped();
-    }
-  }
-
-  /**
-   * Releases a reference to the log segment.
-   */
-  private void release() {
-    if (references.decrementAndGet() == 0) {
-      if (storageLevel == StorageLevel.MAPPED) {
-        writer = writer.toFileChannel();
-      }
-      if (!open) {
-        finishClose();
-      }
-    }
-  }
-
-  /**
-   * Acquires a reference to the segment writer.
-   *
-   * @return The segment writer.
-   */
-  JournalSegmentWriter acquireWriter() {
-    checkOpen();
-    acquire();
-
-    return writer;
-  }
-
-  /**
-   * Releases the reference to the segment writer.
-   */
-  void releaseWriter() {
-      release();
-  }
-
-  /**
-   * Creates a new segment reader.
-   *
-   * @return A new segment reader.
-   */
-  JournalSegmentReader createReader() {
-    checkOpen();
-    acquire();
-
-    final var buffer = writer.buffer();
-    final var fileReader = buffer != null ? new MappedFileReader(file, buffer) : new DiskFileReader(file, maxEntrySize);
-    final var reader = new JournalSegmentReader(this, fileReader, maxEntrySize);
-    reader.setPosition(JournalSegmentDescriptor.BYTES);
-    readers.add(reader);
-    return reader;
-  }
-
-  /**
-   * Closes a segment reader.
-   *
-   * @param reader the closed segment reader
-   */
-  void closeReader(final JournalSegmentReader reader) {
-    if (readers.remove(reader)) {
-      release();
-    }
-  }
-
-  /**
-   * Checks whether the segment is open.
-   */
-  private void checkOpen() {
-    if (!open) {
-      throw new IllegalStateException("Segment not open");
-    }
-  }
-
-  /**
-   * Returns a boolean indicating whether the segment is open.
-   *
-   * @return indicates whether the segment is open
-   */
-  boolean isOpen() {
-    return open;
-  }
-
-  /**
-   * Closes the segment.
-   */
-  void close() {
-    if (!open) {
-      return;
-    }
-
-    LOG.debug("Closing segment: {}", this);
-    open = false;
-    readers.forEach(JournalSegmentReader::close);
-    if (references.get() == 0) {
-      finishClose();
-    }
-  }
-
-  private void finishClose() {
-    writer.close();
-    try {
-      file.close();
-    } catch (IOException e) {
-      throw new StorageException(e);
-    }
-  }
-
-  /**
-   * Deletes the segment.
-   */
-  void delete() {
-    close();
-    LOG.debug("Deleting segment: {}", this);
-    try {
-      Files.deleteIfExists(file.path());
-    } catch (IOException e) {
-      throw new StorageException(e);
-    }
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(this)
-      .add("id", file.segmentId())
-      .add("version", file.version())
-      .add("index", file.firstIndex())
-      .toString();
-  }
+    private static final Logger LOG = LoggerFactory.getLogger(JournalSegment.class);
+
+    private final Set<JournalSegmentReader> readers = ConcurrentHashMap.newKeySet();
+    private final AtomicInteger references = new AtomicInteger();
+    private final JournalSegmentFile file;
+    private final StorageLevel storageLevel;
+    private final int maxEntrySize;
+    private final JournalIndex journalIndex;
+
+    private JournalSegmentWriter writer;
+    private boolean open = true;
+
+    JournalSegment(
+        final JournalSegmentFile file,
+        final StorageLevel storageLevel,
+        final int maxEntrySize,
+        final double indexDensity) {
+        this.file = requireNonNull(file);
+        this.storageLevel = requireNonNull(storageLevel);
+        this.maxEntrySize = maxEntrySize;
+        journalIndex = new SparseJournalIndex(indexDensity);
+
+        final var fileWriter = switch (storageLevel) {
+            case DISK -> new DiskFileWriter(file, maxEntrySize);
+            case MAPPED -> new MappedFileWriter(file, maxEntrySize);
+        };
+
+        // Traverse all entries and push them to index -- thus reconstructing both last index and current position
+        writer = new JournalSegmentWriter(fileWriter, this, maxEntrySize, journalIndex,
+            indexEntries(fileWriter, this, maxEntrySize, journalIndex, Long.MAX_VALUE, null))
+            // relinquish mapped memory
+            .toFileChannel();
+    }
+
+    /**
+     * Returns the segment's starting index.
+     *
+     * @return The segment's starting index.
+     */
+    long firstIndex() {
+        return file.firstIndex();
+    }
+
+    /**
+     * Returns the last index in the segment.
+     *
+     * @return The last index in the segment.
+     */
+    long lastIndex() {
+        final var lastPosition = journalIndex.last();
+        return lastPosition != null ? lastPosition.index() : firstIndex() - 1;
+    }
+
+    /**
+     * Returns the segment file.
+     *
+     * @return The segment file.
+     */
+    JournalSegmentFile file() {
+        return file;
+    }
+
+    /**
+     * Looks up the position of the given index.
+     *
+     * @param index the index to lookup
+     * @return the position of the given index or a lesser index, or {@code null}
+     */
+    @Nullable Position lookup(final long index) {
+        return journalIndex.lookup(index);
+    }
+
+    /**
+     * Acquires a reference to the log segment.
+     */
+    private void acquire() {
+        if (references.getAndIncrement() == 0 && storageLevel == StorageLevel.MAPPED) {
+            writer = writer.toMapped();
+        }
+    }
+
+    /**
+     * Releases a reference to the log segment.
+     */
+    private void release() {
+        if (references.decrementAndGet() == 0) {
+            if (storageLevel == StorageLevel.MAPPED) {
+                writer = writer.toFileChannel();
+            }
+            if (!open) {
+                finishClose();
+            }
+        }
+    }
+
+    /**
+     * Acquires a reference to the segment writer.
+     *
+     * @return The segment writer.
+     */
+    JournalSegmentWriter acquireWriter() {
+        checkOpen();
+        acquire();
+
+        return writer;
+    }
+
+    /**
+     * Releases the reference to the segment writer.
+     */
+    void releaseWriter() {
+        release();
+    }
+
+    /**
+     * Creates a new segment reader.
+     *
+     * @return A new segment reader.
+     */
+    JournalSegmentReader createReader() {
+        checkOpen();
+        acquire();
+
+        final var buffer = writer.buffer();
+        final var fileReader = buffer != null ? new MappedFileReader(file, buffer)
+            : new DiskFileReader(file, maxEntrySize);
+        final var reader = new JournalSegmentReader(this, fileReader, maxEntrySize);
+        reader.setPosition(JournalSegmentDescriptor.BYTES);
+        readers.add(reader);
+        return reader;
+    }
+
+    /**
+     * Closes a segment reader.
+     *
+     * @param reader the closed segment reader
+     */
+    void closeReader(final JournalSegmentReader reader) {
+        if (readers.remove(reader)) {
+            release();
+        }
+    }
+
+    /**
+     * Checks whether the segment is open.
+     */
+    private void checkOpen() {
+        if (!open) {
+            throw new IllegalStateException("Segment not open");
+        }
+    }
+
+    /**
+     * Returns a boolean indicating whether the segment is open.
+     *
+     * @return indicates whether the segment is open
+     */
+    boolean isOpen() {
+        return open;
+    }
+
+    /**
+     * Closes the segment.
+     */
+    void close() {
+        if (!open) {
+            return;
+        }
+
+        LOG.debug("Closing segment: {}", this);
+        open = false;
+        readers.forEach(JournalSegmentReader::close);
+        if (references.get() == 0) {
+            finishClose();
+        }
+    }
+
+    private void finishClose() {
+        writer.close();
+        try {
+            file.close();
+        } catch (IOException e) {
+            throw new StorageException(e);
+        }
+    }
+
+    /**
+     * Deletes the segment.
+     */
+    void delete() {
+        close();
+        LOG.debug("Deleting segment: {}", this);
+        try {
+            Files.deleteIfExists(file.path());
+        } catch (IOException e) {
+            throw new StorageException(e);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("id", file.segmentId())
+            .add("version", file.version())
+            .add("index", file.firstIndex())
+            .toString();
+    }
+
+    static int indexEntries(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize,
+            final JournalIndex journalIndex, final long maxNextIndex, final @Nullable Position start) {
+        // acquire ownership of cache and make sure reader does not see anything we've done once we're done
+        final var fileReader = fileWriter.reader();
+        try {
+            return indexEntries(fileReader, segment, maxEntrySize, journalIndex, maxNextIndex, start);
+        } finally {
+            // Make sure reader does not see anything we've done
+            fileReader.invalidateCache();
+        }
+    }
+
+    private static int indexEntries(final FileReader fileReader, final JournalSegment segment, final int maxEntrySize,
+            final JournalIndex journalIndex, final long maxNextIndex, final @Nullable Position start) {
+        int position;
+        long nextIndex;
+        if (start != null) {
+            // look from nearest recovered index
+            nextIndex = start.index();
+            position = start.position();
+        } else {
+            // look from very beginning of the segment
+            nextIndex = segment.firstIndex();
+            position = JournalSegmentDescriptor.BYTES;
+        }
+
+        final var reader = new JournalSegmentReader(segment, fileReader, maxEntrySize);
+        reader.setPosition(position);
+
+        while (nextIndex <= maxNextIndex) {
+            final var buf = reader.readBytes();
+            if (buf == null) {
+                break;
+            }
+
+            journalIndex.index(nextIndex++, position);
+            // Update the current position for indexing.
+            position += HEADER_BYTES + buf.readableBytes();
+        }
+
+        return position;
+    }
 }
index f9e13274059933aae38f2dc192e38fda93911ac1..0ffe8bea6c2909cf887d4a2e7f84ddef1e81bfd7 100644 (file)
@@ -40,15 +40,13 @@ final class JournalSegmentWriter {
     private int currentPosition;
 
     JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize,
-            final JournalIndex journalIndex) {
+            final JournalIndex journalIndex, final int currentPosition) {
         this.fileWriter = requireNonNull(fileWriter);
         this.segment = requireNonNull(segment);
         this.journalIndex = requireNonNull(journalIndex);
-        maxSegmentSize = segment.file().maxSize();
         this.maxEntrySize = maxEntrySize;
-
-        // recover currentPosition and lastIndex
-        reset(Long.MAX_VALUE, null);
+        this.currentPosition = currentPosition;
+        maxSegmentSize = segment.file().maxSize();
     }
 
     JournalSegmentWriter(final JournalSegmentWriter previous, final FileWriter fileWriter) {
@@ -122,55 +120,14 @@ final class JournalSegmentWriter {
         // Truncate the index, find nearest indexed entry
         final var nearest = journalIndex.truncate(index);
 
-        // recover position and last written
-        if (index >= segment.firstIndex()) {
-            reset(index, nearest);
-        } else {
-            currentPosition = JournalSegmentDescriptor.BYTES;
-        }
+        currentPosition = index < segment.firstIndex() ? JournalSegmentDescriptor.BYTES
+            // recover position and last written
+            : JournalSegment.indexEntries(fileWriter, segment, maxEntrySize, journalIndex, index, nearest);
 
         // Zero the entry header at current channel position.
         fileWriter.writeEmptyHeader(currentPosition);
     }
 
-    private void reset(final long maxNextIndex, final @Nullable Position position) {
-        // acquire ownership of cache and make sure reader does not see anything we've done once we're done
-        final var fileReader = fileWriter.reader();
-        try {
-            reset(fileReader, maxNextIndex, position);
-        } finally {
-            // Make sure reader does not see anything we've done
-            fileReader.invalidateCache();
-        }
-    }
-
-    private void reset(final FileReader fileReader, final long maxNextIndex, final @Nullable Position position) {
-        long nextIndex;
-        if (position != null) {
-            // look from nearest recovered index
-            nextIndex = position.index();
-            currentPosition = position.position();
-        } else {
-            // look from very beginning of the segment
-            nextIndex = segment.firstIndex();
-            currentPosition = JournalSegmentDescriptor.BYTES;
-        }
-
-        final var reader = new JournalSegmentReader(segment, fileReader, maxEntrySize);
-        reader.setPosition(currentPosition);
-
-        while (nextIndex <= maxNextIndex) {
-            final var buf = reader.readBytes();
-            if (buf == null) {
-                break;
-            }
-
-            journalIndex.index(nextIndex++, currentPosition);
-            // Update the current position for indexing.
-            currentPosition += HEADER_BYTES + buf.readableBytes();
-        }
-    }
-
     /**
      * Flushes written entries to disk.
      */