Move JournalWriter.getLastIndex() 47/111647/4
authorRobert Varga <robert.varga@pantheon.tech>
Mon, 6 May 2024 19:51:31 +0000 (21:51 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Wed, 8 May 2024 01:15:28 +0000 (03:15 +0200)
Last written index is a property of a particular Journal, not of a
writer -- and now that we maintain this in the index, we can make
shortcuts.

This also removes a source of confusion, as we have two methods taking
a 'long index' and performing some writer adjustments:
- reset(long) is equivalent of setNextIndex()
- truncate(long) is equivalent of setLastIndex()

Change-Id: I1bc4b5d1b3052c2b35808b8ec4ea2d88dcfca593
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
13 files changed:
atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufJournal.java
atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/Journal.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java
atomix-storage/src/test/java/io/atomix/storage/journal/AbstractJournalTest.java
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java

index baaa6b0ba9632013300c12e2d40af81db61f9371..dc3e75bcde0888fce6e8675696550c0c3ccd7a91 100644 (file)
@@ -23,6 +23,13 @@ import org.eclipse.jdt.annotation.NonNullByDefault;
  */
 @NonNullByDefault
 public interface ByteBufJournal extends AutoCloseable {
+    /**
+     * Return the index of the last entry in the journal.
+     *
+     * @return the last index, or zero if there are no entries.
+     */
+    long lastIndex();
+
     /**
      * Returns the journal writer.
      *
index 2f85cc6163522b18ac9e8465ec1a99b45a629c67..58f9d35291da5d5e9b44869897545e627c9ea896 100644 (file)
@@ -23,13 +23,6 @@ import org.eclipse.jdt.annotation.NonNullByDefault;
  */
 @NonNullByDefault
 public interface ByteBufWriter {
-    /**
-     * Returns the last written index.
-     *
-     * @return The last written index
-     */
-    long lastIndex();
-
     /**
      * Returns the next index to be written.
      *
index 93ae0a565bcaa7e1d941c703490f069273329daa..39be7d4d5f7338019f4d41b84ec8e9dd962fae0d 100644 (file)
@@ -23,6 +23,13 @@ import io.atomix.storage.journal.JournalReader.Mode;
  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
  */
 public interface Journal<E> extends AutoCloseable {
+    /**
+     * Return the index of the last entry in the journal.
+     *
+     * @return the last index, or zero if there are no entries.
+     */
+    long lastIndex();
+
     /**
      * Returns the journal writer.
      *
index 2128b87e20241021235337050be23f3276abb1a0..844a1c9214d98ca641ea98f3d760305f9973d421 100644 (file)
@@ -39,12 +39,12 @@ import org.slf4j.LoggerFactory;
 final class JournalSegment {
   private static final Logger LOG = LoggerFactory.getLogger(JournalSegment.class);
 
+  private final Set<JournalSegmentReader> readers = ConcurrentHashMap.newKeySet();
+  private final AtomicInteger references = new AtomicInteger();
   private final JournalSegmentFile file;
   private final StorageLevel storageLevel;
   private final int maxEntrySize;
   private final JournalIndex journalIndex;
-  private final Set<JournalSegmentReader> readers = ConcurrentHashMap.newKeySet();
-  private final AtomicInteger references = new AtomicInteger();
 
   private JournalSegmentWriter writer;
   private boolean open = true;
@@ -83,7 +83,8 @@ final class JournalSegment {
    * @return The last index in the segment.
    */
   long lastIndex() {
-    return writer.getLastIndex();
+    final var lastPosition = journalIndex.last();
+    return lastPosition != null ? lastPosition.index() : firstIndex() - 1;
   }
 
   /**
index b18371f04424c015ce2a031dafccf7838bbd36c7..70cc790389484850e9e2e3bf0ec7379e583f5dc0 100644 (file)
@@ -18,6 +18,7 @@ package io.atomix.storage.journal;
 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
 import static java.util.Objects.requireNonNull;
 
+import io.atomix.storage.journal.StorageException.TooLarge;
 import io.atomix.storage.journal.index.JournalIndex;
 import io.atomix.storage.journal.index.Position;
 import io.netty.buffer.ByteBuf;
@@ -32,17 +33,17 @@ final class JournalSegmentWriter {
 
     private final FileWriter fileWriter;
     final @NonNull JournalSegment segment;
-    private final @NonNull JournalIndex index;
+    private final @NonNull JournalIndex journalIndex;
     final int maxSegmentSize;
     final int maxEntrySize;
 
     private int currentPosition;
 
     JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize,
-            final JournalIndex index) {
+            final JournalIndex journalIndex) {
         this.fileWriter = requireNonNull(fileWriter);
         this.segment = requireNonNull(segment);
-        this.index = requireNonNull(index);
+        this.journalIndex = requireNonNull(journalIndex);
         maxSegmentSize = segment.file().maxSize();
         this.maxEntrySize = maxEntrySize;
         // adjust lastEntry value
@@ -51,31 +52,21 @@ final class JournalSegmentWriter {
 
     JournalSegmentWriter(final JournalSegmentWriter previous, final FileWriter fileWriter) {
         segment = previous.segment;
-        index = previous.index;
+        journalIndex = previous.journalIndex;
         maxSegmentSize = previous.maxSegmentSize;
         maxEntrySize = previous.maxEntrySize;
         currentPosition = previous.currentPosition;
         this.fileWriter = requireNonNull(fileWriter);
     }
 
-    /**
-     * Returns the last written index.
-     *
-     * @return The last written index.
-     */
-    long getLastIndex() {
-        final var last = index.last();
-        return last != null ? last.index() : segment.firstIndex() - 1;
-    }
-
     /**
      * Returns the next index to be written.
      *
      * @return The next index to be written.
      */
-    long getNextIndex() {
-        final var last = index.last();
-        return last != null ? last.index() + 1 : segment.firstIndex();
+    long nextIndex() {
+        final var lastPosition = journalIndex.last();
+        return lastPosition != null ? lastPosition.index() + 1 : segment.firstIndex();
     }
 
     /**
@@ -87,12 +78,11 @@ final class JournalSegmentWriter {
     Position append(final ByteBuf buf) {
         final var length = buf.readableBytes();
         if (length > maxEntrySize) {
-            throw new StorageException.TooLarge("Serialized entry size exceeds maximum allowed bytes ("
-                + maxEntrySize + ")");
+            throw new TooLarge("Serialized entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
         }
 
         // Store the entry index.
-        final long index = getNextIndex();
+        final long index = nextIndex();
         final int position = currentPosition;
 
         // check space available
@@ -114,7 +104,7 @@ final class JournalSegmentWriter {
 
         // Update the last entry with the correct index/term/length.
         currentPosition = nextPosition;
-        return this.index.index(index, position);
+        return journalIndex.index(index, position);
     }
 
     /**
@@ -147,7 +137,7 @@ final class JournalSegmentWriter {
                 break;
             }
 
-            this.index.index(nextIndex++, currentPosition);
+            journalIndex.index(nextIndex++, currentPosition);
 
             // Update the current position for indexing.
             currentPosition += HEADER_BYTES + buf.readableBytes();
@@ -161,12 +151,12 @@ final class JournalSegmentWriter {
      */
     void truncate(final long index) {
         // If the index is greater than or equal to the last index, skip the truncate.
-        if (index >= getLastIndex()) {
+        if (index >= segment.lastIndex()) {
             return;
         }
 
         // Truncate the index.
-        this.index.truncate(index);
+        journalIndex.truncate(index);
 
         if (index < segment.firstIndex()) {
             // Reset the writer to the first entry.
index ae6577818594229f9c394fbf06018269fea318be..0561c99fe04579a713b12af38cdec7b2b94971e1 100644 (file)
@@ -23,13 +23,6 @@ import org.eclipse.jdt.annotation.NonNull;
  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
  */
 public interface JournalWriter<E> {
-    /**
-     * Returns the last written index.
-     *
-     * @return The last written index.
-     */
-    long getLastIndex();
-
     /**
      * Returns the next index to be written.
      *
index 074a5dd18297aab2d97ba22d22b470b913678493..4698a5fd60bfb548f5f25f9335ca8eab87141e70 100644 (file)
@@ -92,6 +92,11 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
             .sum();
     }
 
+    @Override
+    public long lastIndex() {
+        return lastSegment().lastIndex();
+    }
+
     @Override
     public ByteBufWriter writer() {
         return writer;
index e16e6446ebb86783eb552ec6484d461cca157e2e..c51e8a2ffae870306ce0dcb952fb019349c7f7a0 100644 (file)
@@ -35,14 +35,9 @@ final class SegmentedByteBufWriter implements ByteBufWriter {
         currentWriter = currentSegment.acquireWriter();
     }
 
-    @Override
-    public long lastIndex() {
-        return currentWriter.getLastIndex();
-    }
-
     @Override
     public long nextIndex() {
-        return currentWriter.getNextIndex();
+        return currentWriter.nextIndex();
     }
 
     @Override
index 8f0464ebe3fabc0fd8ef69cca57c1fd346bd135a..f6c976742c52573b58e07d0a26e1167ebd50eeac 100644 (file)
@@ -34,6 +34,11 @@ public final class SegmentedJournal<E> implements Journal<E> {
         writer = new SegmentedJournalWriter<>(journal.writer(), mapper);
     }
 
+    @Override
+    public long lastIndex() {
+        return journal.lastIndex();
+    }
+
     @Override
     public JournalWriter<E> writer() {
         return writer;
index 80c352ead0b33286aeca467e44496344f7ec1638..11aa6c2431f8a5b926bf8dda9815e1a551eee5fa 100644 (file)
@@ -30,11 +30,6 @@ final class SegmentedJournalWriter<E> implements JournalWriter<E> {
         this.mapper = requireNonNull(mapper);
     }
 
-    @Override
-    public long getLastIndex() {
-        return writer.lastIndex();
-    }
-
     @Override
     public long getNextIndex() {
         return writer.nextIndex();
index d4bc43d9b28298dcd620333261bfde885d372eae..026e58df7b13b407775b277a89eec4b3d7dedfca 100644 (file)
@@ -176,28 +176,34 @@ public abstract class AbstractJournalTest {
             JournalWriter<TestEntry> writer = journal.writer();
             JournalReader<TestEntry> reader = journal.openReader(1);
 
-            assertEquals(0, writer.getLastIndex());
+            assertEquals(0, journal.lastIndex());
+            assertEquals(1, writer.getNextIndex());
             writer.append(ENTRY);
             writer.append(ENTRY);
             writer.reset(1);
-            assertEquals(0, writer.getLastIndex());
+            assertEquals(0, journal.lastIndex());
+            assertEquals(1, writer.getNextIndex());
             writer.append(ENTRY);
 
             var indexed = assertNext(reader);
             assertEquals(1, indexed.index());
             writer.reset(1);
-            assertEquals(0, writer.getLastIndex());
+            assertEquals(0, journal.lastIndex());
+            assertEquals(1, writer.getNextIndex());
             indexed = writer.append(ENTRY);
-            assertEquals(1, writer.getLastIndex());
+            assertEquals(1, journal.lastIndex());
+            assertEquals(2, writer.getNextIndex());
             assertEquals(1, indexed.index());
 
             indexed = assertNext(reader);
             assertEquals(1, indexed.index());
 
             writer.truncate(0);
-            assertEquals(0, writer.getLastIndex());
+            assertEquals(0, journal.lastIndex());
+            assertEquals(1, writer.getNextIndex());
             indexed = writer.append(ENTRY);
-            assertEquals(1, writer.getLastIndex());
+            assertEquals(1, journal.lastIndex());
+            assertEquals(2, writer.getNextIndex());
             assertEquals(1, indexed.index());
 
             indexed = assertNext(reader);
index ad4c110bc893468e56159f52028a2bfa0cdd8df2..bf1700f7f04eb64dda6751e436068a662da4e097 100644 (file)
@@ -51,7 +51,7 @@ final class DataJournalV0 extends DataJournal {
 
     @Override
     long lastWrittenSequenceNr() {
-        return entries.writer().getLastIndex();
+        return entries.lastIndex();
     }
 
     @Override
@@ -120,17 +120,18 @@ final class DataJournalV0 extends DataJournal {
         long writtenBytes = 0;
 
         for (int i = 0; i < count; ++i) {
-            final long mark = writer.getLastIndex();
+            final long prevNextIndex = writer.getNextIndex();
             final var request = message.getRequest(i);
 
             final var reprs = CollectionConverters.asJava(request.payload());
-            LOG.trace("{}: append {}/{}: {} items at mark {}", persistenceId, i, count, reprs.size(), mark);
+            LOG.trace("{}: append {}/{}: {} items at mark {}", persistenceId, i, count, reprs.size(), prevNextIndex);
             try {
                 writtenBytes += writePayload(writer, reprs);
             } catch (Exception e) {
-                LOG.warn("{}: failed to write out request {}/{} reverting to {}", persistenceId, i, count, mark, e);
+                LOG.warn("{}: failed to write out request {}/{} reverting to {}", persistenceId, i, count,
+                    prevNextIndex, e);
                 responses.add(e);
-                writer.truncate(mark);
+                writer.reset(prevNextIndex);
                 continue;
             }
             responses.add(null);
index 73ffab6a053639f89b45470efab8685408efb9a3..7e285f7d0d41d3c31b23c64b84304606aee2d226 100644 (file)
@@ -494,9 +494,13 @@ abstract sealed class SegmentedJournalActor extends AbstractActor {
         }
 
         final var sw = Stopwatch.createStarted();
-        deleteJournal = SegmentedJournal.<Long>builder().withDirectory(directory).withName("delete")
-                .withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build();
-        final var lastDeleteRecovered = deleteJournal.openReader(deleteJournal.writer().getLastIndex())
+        deleteJournal = SegmentedJournal.<Long>builder()
+            .withDirectory(directory)
+            .withName("delete")
+            .withNamespace(DELETE_NAMESPACE)
+            .withMaxSegmentSize(DELETE_SEGMENT_SIZE)
+            .build();
+        final var lastDeleteRecovered = deleteJournal.openReader(deleteJournal.lastIndex())
             .tryNext((index, value, length) -> value);
         lastDelete = lastDeleteRecovered == null ? 0 : lastDeleteRecovered.longValue();