Fix segmented journal replay 22/80822/1
authorTomas Cere <tomas.cere@pantheon.tech>
Tue, 12 Mar 2019 09:54:47 +0000 (10:54 +0100)
committerTomas Cere <tomas.cere@pantheon.tech>
Tue, 12 Mar 2019 09:54:47 +0000 (10:54 +0100)
lastDelete was getting set with the incorrect value from the delete journal.
The actual written value from the delete journal needs to be used
instead of the index.
dataJournal compact also needs to be compacted upto lastDelete + 1 since
deleteUpTo is inclusive and compact keeps this value around.

Change-Id: I4a678be67fd1ad09c57273ef0fc3b7398a7d714f
Signed-off-by: Tomas Cere <tomas.cere@pantheon.tech>
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java
opendaylight/md-sal/sal-akka-segmented-journal/src/test/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournalTest.java

index 6cc5d9e672b37c7372928b855ee91a19bf5c6da0..c28a58c8bc254642aeb5286eb09322f9a46d607a 100644 (file)
@@ -236,7 +236,7 @@ final class SegmentedJournalActor extends AbstractActor {
             dataJournal.writer().commit(lastDelete);
 
             LOG.debug("{}: compaction started", persistenceId);
-            dataJournal.compact(lastDelete);
+            dataJournal.compact(lastDelete + 1);
             deleteJournal.compact(entry.index());
             LOG.debug("{}: compaction finished", persistenceId);
         } else {
@@ -358,7 +358,7 @@ final class SegmentedJournalActor extends AbstractActor {
         deleteJournal = SegmentedJournal.<Long>builder().withDirectory(directory).withName("delete")
                 .withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build();
         final Indexed<Long> lastEntry = deleteJournal.writer().getLastEntry();
-        lastDelete = lastEntry == null ? 0 : lastEntry.index();
+        lastDelete = lastEntry == null ? 0 : lastEntry.entry();
 
         dataJournal = SegmentedJournal.<DataJournalEntry>builder()
                 .withStorageLevel(storage).withDirectory(directory).withName("data")
index 6939dbcba78e2cfba07b7db2017cf3be37180d01..7db0d4b87e0da205f4a227385d01ec46e4cae6a6 100644 (file)
@@ -132,6 +132,75 @@ public class SegmentedFileJournalTest {
         assertFileCount(1, 1);
     }
 
+    @Test
+    public void testComplexDeletesAndPartialReplays() throws Exception {
+        for (int i = 0; i <= 4; i++) {
+            writeBigPaylod();
+        }
+
+        assertFileCount(10, 1);
+
+        // delete including index 3, so get rid of the first segment
+        deleteEntries(3);
+        assertFileCount(9, 1);
+
+        // get rid of segments 2(index 4-6) and 3(index 7-9)
+        deleteEntries(9);
+        assertFileCount(7, 1);
+
+        // get rid of all segments except the last one
+        deleteEntries(27);
+        assertFileCount(1, 1);
+
+        restartActor();
+
+        // Check if state is retained
+        assertHighestSequenceNr(30);
+        // 28,29,30 replayed
+        assertReplayCount(3);
+
+
+        deleteEntries(28);
+        restartActor();
+
+        assertHighestSequenceNr(30);
+        // 29,30 replayed
+        assertReplayCount(2);
+
+        deleteEntries(29);
+        restartActor();
+
+        // 30 replayed
+        assertReplayCount(1);
+
+        deleteEntries(30);
+        restartActor();
+
+        // nothing replayed
+        assertReplayCount(0);
+    }
+
+    private void restartActor() {
+        actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        actor = actor();
+    }
+
+    private void writeBigPaylod() {
+        final LargePayload payload = new LargePayload();
+
+        final WriteMessages write = new WriteMessages();
+        final List<Future<Optional<Exception>>> requests = new ArrayList<>();
+
+        // Each payload is half of segment size, plus some overhead, should result in two segments being present
+        for (int i = 1; i <= SEGMENT_SIZE * 3 / MESSAGE_SIZE; ++i) {
+            requests.add(write.add(AtomicWrite.apply(PersistentRepr.apply(payload, i, "foo", null, false, kit.getRef(),
+                    "uuid"))));
+        }
+
+        actor.tell(write, ActorRef.noSender());
+        requests.forEach(future -> assertFalse(getFuture(future).isPresent()));
+    }
+
     private ActorRef actor() {
         return kit.childActorOf(SegmentedJournalActor.props("foo", DIRECTORY, StorageLevel.DISK, MESSAGE_SIZE,
             SEGMENT_SIZE).withDispatcher(CallingThreadDispatcher.Id()));