Fix segmented journal replay
[controller.git] / opendaylight / md-sal / sal-akka-segmented-journal / src / test / java / org / opendaylight / controller / akka / segjournal / SegmentedFileJournalTest.java
index 6939dbcba78e2cfba07b7db2017cf3be37180d01..7db0d4b87e0da205f4a227385d01ec46e4cae6a6 100644 (file)
@@ -132,6 +132,75 @@ public class SegmentedFileJournalTest {
         assertFileCount(1, 1);
     }
 
+    @Test
+    public void testComplexDeletesAndPartialReplays() throws Exception {
+        for (int i = 0; i <= 4; i++) {
+            writeBigPaylod();
+        }
+
+        assertFileCount(10, 1);
+
+        // delete including index 3, so get rid of the first segment
+        deleteEntries(3);
+        assertFileCount(9, 1);
+
+        // get rid of segments 2(index 4-6) and 3(index 7-9)
+        deleteEntries(9);
+        assertFileCount(7, 1);
+
+        // get rid of all segments except the last one
+        deleteEntries(27);
+        assertFileCount(1, 1);
+
+        restartActor();
+
+        // Check if state is retained
+        assertHighestSequenceNr(30);
+        // 28,29,30 replayed
+        assertReplayCount(3);
+
+
+        deleteEntries(28);
+        restartActor();
+
+        assertHighestSequenceNr(30);
+        // 29,30 replayed
+        assertReplayCount(2);
+
+        deleteEntries(29);
+        restartActor();
+
+        // 30 replayed
+        assertReplayCount(1);
+
+        deleteEntries(30);
+        restartActor();
+
+        // nothing replayed
+        assertReplayCount(0);
+    }
+
+    private void restartActor() {
+        actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        actor = actor();
+    }
+
+    private void writeBigPaylod() {
+        final LargePayload payload = new LargePayload();
+
+        final WriteMessages write = new WriteMessages();
+        final List<Future<Optional<Exception>>> requests = new ArrayList<>();
+
+        // Each payload is half of segment size, plus some overhead, should result in two segments being present
+        for (int i = 1; i <= SEGMENT_SIZE * 3 / MESSAGE_SIZE; ++i) {
+            requests.add(write.add(AtomicWrite.apply(PersistentRepr.apply(payload, i, "foo", null, false, kit.getRef(),
+                    "uuid"))));
+        }
+
+        actor.tell(write, ActorRef.noSender());
+        requests.forEach(future -> assertFalse(getFuture(future).isPresent()));
+    }
+
     private ActorRef actor() {
         return kit.childActorOf(SegmentedJournalActor.props("foo", DIRECTORY, StorageLevel.DISK, MESSAGE_SIZE,
             SEGMENT_SIZE).withDispatcher(CallingThreadDispatcher.Id()));