Update DataJournal interface 97/93097/4
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 16 Oct 2020 13:17:50 +0000 (15:17 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Fri, 16 Oct 2020 14:38:22 +0000 (16:38 +0200)
The interface is not documented and we need to differentiate between
indices as viewed from segmented journal and the persistence journal
views.

JIRA: CONTROLLER-1954
Change-Id: Ibbc384b88f8d5567e9af18d83b9a84f7c9b9634c
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournal.java
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalEntry.java
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java

index 3f746900c917ca0a2a7199bf7cf40ced1dcfaa75..678749b1c16cf9a4ddd9d0a052d763b70541ab19 100644 (file)
@@ -42,15 +42,44 @@ abstract class DataJournal {
         }
     }
 
         }
     }
 
-    abstract long lastWrittenIndex();
+    /**
+     * Return the last sequence number completely written to the journal.
+     *
+     * @return Last written sequence number, {@code -1} if there are no in the journal.
+     */
+    abstract long lastWrittenSequenceNr();
 
 
-    abstract void commitTo(long index);
+    /**
+     * Delete all messages up to specified sequence number.
+     *
+     * @param sequenceNr Sequence number to delete to.
+     */
+    abstract void deleteTo(long sequenceNr);
 
 
-    abstract void compactTo(long index);
+    /**
+     * Delete all messages up to specified sequence number.
+     *
+     * @param sequenceNr Sequence number to compact to.
+     */
+    abstract void compactTo(long sequenceNr);
 
 
+    /**
+     * Close this journal, freeing up resources associated with it.
+     */
     abstract void close();
 
     abstract void close();
 
-    abstract void handleReplayMessages(ReplayMessages message, long from);
+    /**
+     * Handle a request to replay messages.
+     *
+     * @param message Request message
+     * @param fromSequenceNr Sequence number to replay from, adjusted for deletions
+     */
+    abstract void handleReplayMessages(@NonNull ReplayMessages message, long fromSequenceNr);
 
 
-    abstract void handleWriteMessages(WriteMessages message);
+    /**
+     * Handle a request to store some messages.
+     *
+     * @param message Request message
+     */
+    abstract void handleWriteMessages(@NonNull WriteMessages message);
 }
 }
index 0713c0212a627eae426b9e551e08b5f8b0696e1c..6899c6e1d652d518dd49ec15e1692ac409b51661 100644 (file)
@@ -20,6 +20,9 @@ import io.atomix.storage.journal.JournalSegment;
  * @author Robert Varga
  */
 abstract class DataJournalEntry {
  * @author Robert Varga
  */
 abstract class DataJournalEntry {
+    /**
+     * A single data journal entry on its way to the backing file.
+     */
     static final class ToPersistence extends DataJournalEntry {
         private final PersistentRepr repr;
 
     static final class ToPersistence extends DataJournalEntry {
         private final PersistentRepr repr;
 
@@ -32,6 +35,9 @@ abstract class DataJournalEntry {
         }
     }
 
         }
     }
 
+    /**
+     * A single data journal entry on its way from the backing file.
+     */
     static final class FromPersistence extends DataJournalEntry {
         private final String manifest;
         private final String writerUuid;
     static final class FromPersistence extends DataJournalEntry {
         private final String manifest;
         private final String writerUuid;
index 766f2fac05e8e46dd08f76ecf80043aa51b5f6e0..bc5eead800c938ab5bde4eb55d5dc732da234776 100644 (file)
@@ -52,18 +52,18 @@ final class DataJournalV0 extends DataJournal {
     }
 
     @Override
     }
 
     @Override
-    long lastWrittenIndex() {
+    long lastWrittenSequenceNr() {
         return entries.writer().getLastIndex();
     }
 
     @Override
         return entries.writer().getLastIndex();
     }
 
     @Override
-    void commitTo(final long index) {
-        entries.writer().commit(index);
+    void deleteTo(final long sequenceNr) {
+        entries.writer().commit(sequenceNr);
     }
 
     @Override
     }
 
     @Override
-    void compactTo(final long index) {
-        entries.compact(index);
+    void compactTo(final long sequenceNr) {
+        entries.compact(sequenceNr + 1);
     }
 
     @Override
     }
 
     @Override
@@ -73,8 +73,8 @@ final class DataJournalV0 extends DataJournal {
 
     @Override
     @SuppressWarnings("checkstyle:illegalCatch")
 
     @Override
     @SuppressWarnings("checkstyle:illegalCatch")
-    void handleReplayMessages(final ReplayMessages message, final long from) {
-        try (SegmentedJournalReader<DataJournalEntry> reader = entries.openReader(from)) {
+    void handleReplayMessages(final ReplayMessages message, final long fromSequenceNr) {
+        try (SegmentedJournalReader<DataJournalEntry> reader = entries.openReader(fromSequenceNr)) {
             int count = 0;
             while (reader.hasNext() && count < message.max) {
                 final Indexed<DataJournalEntry> next = reader.next();
             int count = 0;
             while (reader.hasNext() && count < message.max) {
                 final Indexed<DataJournalEntry> next = reader.next();
index 41117c8e32754f47a7b7a258cd803d31d83eedab..e5c5b7807b0b9ea016cdc4aa0835f49daf5ff8dd 100644 (file)
@@ -239,7 +239,7 @@ final class SegmentedJournalActor extends AbstractActor {
         ensureOpen();
 
         LOG.debug("{}: delete messages {}", persistenceId, message);
         ensureOpen();
 
         LOG.debug("{}: delete messages {}", persistenceId, message);
-        final long to = Long.min(dataJournal.lastWrittenIndex(), message.toSequenceNr);
+        final long to = Long.min(dataJournal.lastWrittenSequenceNr(), message.toSequenceNr);
         LOG.debug("{}: adjusted delete to {}", persistenceId, to);
 
         if (lastDelete < to) {
         LOG.debug("{}: adjusted delete to {}", persistenceId, to);
 
         if (lastDelete < to) {
@@ -249,10 +249,10 @@ final class SegmentedJournalActor extends AbstractActor {
             final SegmentedJournalWriter<Long> deleteWriter = deleteJournal.writer();
             final Indexed<Long> entry = deleteWriter.append(lastDelete);
             deleteWriter.commit(entry.index());
             final SegmentedJournalWriter<Long> deleteWriter = deleteJournal.writer();
             final Indexed<Long> entry = deleteWriter.append(lastDelete);
             deleteWriter.commit(entry.index());
-            dataJournal.commitTo(lastDelete);
+            dataJournal.deleteTo(lastDelete);
 
             LOG.debug("{}: compaction started", persistenceId);
 
             LOG.debug("{}: compaction started", persistenceId);
-            dataJournal.compactTo(lastDelete + 1);
+            dataJournal.compactTo(lastDelete);
             deleteJournal.compact(entry.index());
             LOG.debug("{}: compaction finished", persistenceId);
         } else {
             deleteJournal.compact(entry.index());
             LOG.debug("{}: compaction finished", persistenceId);
         } else {
@@ -267,7 +267,7 @@ final class SegmentedJournalActor extends AbstractActor {
         final Long sequence;
         if (directory.isDirectory()) {
             ensureOpen();
         final Long sequence;
         if (directory.isDirectory()) {
             ensureOpen();
-            sequence = dataJournal.lastWrittenIndex();
+            sequence = dataJournal.lastWrittenSequenceNr();
         } else {
             sequence = 0L;
         }
         } else {
             sequence = 0L;
         }
@@ -276,7 +276,6 @@ final class SegmentedJournalActor extends AbstractActor {
         message.promise.success(sequence);
     }
 
         message.promise.success(sequence);
     }
 
-    @SuppressWarnings("checkstyle:illegalCatch")
     private void handleReplayMessages(final ReplayMessages message) {
         LOG.debug("{}: replaying messages {}", persistenceId, message);
         ensureOpen();
     private void handleReplayMessages(final ReplayMessages message) {
         LOG.debug("{}: replaying messages {}", persistenceId, message);
         ensureOpen();
@@ -291,12 +290,12 @@ final class SegmentedJournalActor extends AbstractActor {
         ensureOpen();
 
         final long startTicks = System.nanoTime();
         ensureOpen();
 
         final long startTicks = System.nanoTime();
-        final long start = dataJournal.lastWrittenIndex();
+        final long start = dataJournal.lastWrittenSequenceNr();
 
         dataJournal.handleWriteMessages(message);
 
         batchWriteTime.update(System.nanoTime() - startTicks, TimeUnit.NANOSECONDS);
 
         dataJournal.handleWriteMessages(message);
 
         batchWriteTime.update(System.nanoTime() - startTicks, TimeUnit.NANOSECONDS);
-        messageWriteCount.mark(dataJournal.lastWrittenIndex() - start);
+        messageWriteCount.mark(dataJournal.lastWrittenSequenceNr() - start);
     }
 
     private void handleUnknown(final Object message) {
     }
 
     private void handleUnknown(final Object message) {
@@ -316,8 +315,8 @@ final class SegmentedJournalActor extends AbstractActor {
 
         dataJournal = new DataJournalV0(persistenceId, messageSize, context().system(), storage, directory,
             maxEntrySize, maxSegmentSize);
 
         dataJournal = new DataJournalV0(persistenceId, messageSize, context().system(), storage, directory,
             maxEntrySize, maxSegmentSize);
-        dataJournal.commitTo(lastDelete);
-        LOG.debug("{}: journal open with last index {}, deleted to {}", persistenceId, dataJournal.lastWrittenIndex(),
-            lastDelete);
+        dataJournal.deleteTo(lastDelete);
+        LOG.debug("{}: journal open with last index {}, deleted to {}", persistenceId,
+            dataJournal.lastWrittenSequenceNr(), lastDelete);
     }
 }
     }
 }