From: Tomas Cere Date: Tue, 12 Mar 2019 09:54:47 +0000 (+0100) Subject: Fix segmented journal replay X-Git-Tag: release/sodium~134 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=a47589fdd131b1f2363b331a3dd9266a08c40a57 Fix segmented journal replay lastDelete was getting set with the incorrect value from the delete journal. The actual written value from the delete journal needs to be used instead of the index. dataJournal compact also needs to be compacted upto lastDelete + 1 since deleteUpTo is inclusive and compact keeps this value around. Change-Id: I4a678be67fd1ad09c57273ef0fc3b7398a7d714f Signed-off-by: Tomas Cere --- diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java index 6cc5d9e672..c28a58c8bc 100644 --- a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java +++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java @@ -236,7 +236,7 @@ final class SegmentedJournalActor extends AbstractActor { dataJournal.writer().commit(lastDelete); LOG.debug("{}: compaction started", persistenceId); - dataJournal.compact(lastDelete); + dataJournal.compact(lastDelete + 1); deleteJournal.compact(entry.index()); LOG.debug("{}: compaction finished", persistenceId); } else { @@ -358,7 +358,7 @@ final class SegmentedJournalActor extends AbstractActor { deleteJournal = SegmentedJournal.builder().withDirectory(directory).withName("delete") .withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build(); final Indexed lastEntry = deleteJournal.writer().getLastEntry(); - lastDelete = lastEntry == null ? 0 : lastEntry.index(); + lastDelete = lastEntry == null ? 0 : lastEntry.entry(); dataJournal = SegmentedJournal.builder() .withStorageLevel(storage).withDirectory(directory).withName("data") diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/test/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournalTest.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/test/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournalTest.java index 6939dbcba7..7db0d4b87e 100644 --- a/opendaylight/md-sal/sal-akka-segmented-journal/src/test/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournalTest.java +++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/test/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournalTest.java @@ -132,6 +132,75 @@ public class SegmentedFileJournalTest { assertFileCount(1, 1); } + @Test + public void testComplexDeletesAndPartialReplays() throws Exception { + for (int i = 0; i <= 4; i++) { + writeBigPaylod(); + } + + assertFileCount(10, 1); + + // delete including index 3, so get rid of the first segment + deleteEntries(3); + assertFileCount(9, 1); + + // get rid of segments 2(index 4-6) and 3(index 7-9) + deleteEntries(9); + assertFileCount(7, 1); + + // get rid of all segments except the last one + deleteEntries(27); + assertFileCount(1, 1); + + restartActor(); + + // Check if state is retained + assertHighestSequenceNr(30); + // 28,29,30 replayed + assertReplayCount(3); + + + deleteEntries(28); + restartActor(); + + assertHighestSequenceNr(30); + // 29,30 replayed + assertReplayCount(2); + + deleteEntries(29); + restartActor(); + + // 30 replayed + assertReplayCount(1); + + deleteEntries(30); + restartActor(); + + // nothing replayed + assertReplayCount(0); + } + + private void restartActor() { + actor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + actor = actor(); + } + + private void writeBigPaylod() { + final LargePayload payload = new LargePayload(); + + final WriteMessages write = new WriteMessages(); + final List>> requests = new ArrayList<>(); + + // Each payload is half of segment size, plus some overhead, should result in two segments being present + for (int i = 1; i <= SEGMENT_SIZE * 3 / MESSAGE_SIZE; ++i) { + requests.add(write.add(AtomicWrite.apply(PersistentRepr.apply(payload, i, "foo", null, false, kit.getRef(), + "uuid")))); + } + + actor.tell(write, ActorRef.noSender()); + requests.forEach(future -> assertFalse(getFuture(future).isPresent())); + } + private ActorRef actor() { return kit.childActorOf(SegmentedJournalActor.props("foo", DIRECTORY, StorageLevel.DISK, MESSAGE_SIZE, SEGMENT_SIZE).withDispatcher(CallingThreadDispatcher.Id()));