Fix segmented journal replay 22/80822/1
authorTomas Cere <tomas.cere@pantheon.tech>
Tue, 12 Mar 2019 09:54:47 +0000 (10:54 +0100)
committerTomas Cere <tomas.cere@pantheon.tech>
Tue, 12 Mar 2019 09:54:47 +0000 (10:54 +0100)
lastDelete was getting set with the incorrect value from the delete journal.
The actual written value from the delete journal needs to be used
instead of the index.
dataJournal compact also needs to be compacted upto lastDelete + 1 since
deleteUpTo is inclusive and compact keeps this value around.

Change-Id: I4a678be67fd1ad09c57273ef0fc3b7398a7d714f
Signed-off-by: Tomas Cere <tomas.cere@pantheon.tech>
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java
opendaylight/md-sal/sal-akka-segmented-journal/src/test/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournalTest.java

index 6cc5d9e..c28a58c 100644 (file)
@@ -236,7 +236,7 @@ final class SegmentedJournalActor extends AbstractActor {
             dataJournal.writer().commit(lastDelete);
 
             LOG.debug("{}: compaction started", persistenceId);
-            dataJournal.compact(lastDelete);
+            dataJournal.compact(lastDelete + 1);
             deleteJournal.compact(entry.index());
             LOG.debug("{}: compaction finished", persistenceId);
         } else {
@@ -358,7 +358,7 @@ final class SegmentedJournalActor extends AbstractActor {
         deleteJournal = SegmentedJournal.<Long>builder().withDirectory(directory).withName("delete")
                 .withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build();
         final Indexed<Long> lastEntry = deleteJournal.writer().getLastEntry();
-        lastDelete = lastEntry == null ? 0 : lastEntry.index();
+        lastDelete = lastEntry == null ? 0 : lastEntry.entry();
 
         dataJournal = SegmentedJournal.<DataJournalEntry>builder()
                 .withStorageLevel(storage).withDirectory(directory).withName("data")
index 6939dbc..7db0d4b 100644 (file)
@@ -132,6 +132,75 @@ public class SegmentedFileJournalTest {
         assertFileCount(1, 1);
     }
 
+    @Test
+    public void testComplexDeletesAndPartialReplays() throws Exception {
+        for (int i = 0; i <= 4; i++) {
+            writeBigPaylod();
+        }
+
+        assertFileCount(10, 1);
+
+        // delete including index 3, so get rid of the first segment
+        deleteEntries(3);
+        assertFileCount(9, 1);
+
+        // get rid of segments 2(index 4-6) and 3(index 7-9)
+        deleteEntries(9);
+        assertFileCount(7, 1);
+
+        // get rid of all segments except the last one
+        deleteEntries(27);
+        assertFileCount(1, 1);
+
+        restartActor();
+
+        // Check if state is retained
+        assertHighestSequenceNr(30);
+        // 28,29,30 replayed
+        assertReplayCount(3);
+
+
+        deleteEntries(28);
+        restartActor();
+
+        assertHighestSequenceNr(30);
+        // 29,30 replayed
+        assertReplayCount(2);
+
+        deleteEntries(29);
+        restartActor();
+
+        // 30 replayed
+        assertReplayCount(1);
+
+        deleteEntries(30);
+        restartActor();
+
+        // nothing replayed
+        assertReplayCount(0);
+    }
+
+    private void restartActor() {
+        actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        actor = actor();
+    }
+
+    private void writeBigPaylod() {
+        final LargePayload payload = new LargePayload();
+
+        final WriteMessages write = new WriteMessages();
+        final List<Future<Optional<Exception>>> requests = new ArrayList<>();
+
+        // Each payload is half of segment size, plus some overhead, should result in two segments being present
+        for (int i = 1; i <= SEGMENT_SIZE * 3 / MESSAGE_SIZE; ++i) {
+            requests.add(write.add(AtomicWrite.apply(PersistentRepr.apply(payload, i, "foo", null, false, kit.getRef(),
+                    "uuid"))));
+        }
+
+        actor.tell(write, ActorRef.noSender());
+        requests.forEach(future -> assertFalse(getFuture(future).isPresent()));
+    }
+
     private ActorRef actor() {
         return kit.childActorOf(SegmentedJournalActor.props("foo", DIRECTORY, StorageLevel.DISK, MESSAGE_SIZE,
             SEGMENT_SIZE).withDispatcher(CallingThreadDispatcher.Id()));

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.