dataJournal.writer().commit(lastDelete);
LOG.debug("{}: compaction started", persistenceId);
- dataJournal.compact(lastDelete);
+ dataJournal.compact(lastDelete + 1);
deleteJournal.compact(entry.index());
LOG.debug("{}: compaction finished", persistenceId);
} else {
deleteJournal = SegmentedJournal.<Long>builder().withDirectory(directory).withName("delete")
.withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build();
final Indexed<Long> lastEntry = deleteJournal.writer().getLastEntry();
- lastDelete = lastEntry == null ? 0 : lastEntry.index();
+ lastDelete = lastEntry == null ? 0 : lastEntry.entry();
dataJournal = SegmentedJournal.<DataJournalEntry>builder()
.withStorageLevel(storage).withDirectory(directory).withName("data")
assertFileCount(1, 1);
}
+ @Test
+ public void testComplexDeletesAndPartialReplays() throws Exception {
+ for (int i = 0; i <= 4; i++) {
+ writeBigPaylod();
+ }
+
+ assertFileCount(10, 1);
+
+ // delete including index 3, so get rid of the first segment
+ deleteEntries(3);
+ assertFileCount(9, 1);
+
+ // get rid of segments 2(index 4-6) and 3(index 7-9)
+ deleteEntries(9);
+ assertFileCount(7, 1);
+
+ // get rid of all segments except the last one
+ deleteEntries(27);
+ assertFileCount(1, 1);
+
+ restartActor();
+
+ // Check if state is retained
+ assertHighestSequenceNr(30);
+ // 28,29,30 replayed
+ assertReplayCount(3);
+
+
+ deleteEntries(28);
+ restartActor();
+
+ assertHighestSequenceNr(30);
+ // 29,30 replayed
+ assertReplayCount(2);
+
+ deleteEntries(29);
+ restartActor();
+
+ // 30 replayed
+ assertReplayCount(1);
+
+ deleteEntries(30);
+ restartActor();
+
+ // nothing replayed
+ assertReplayCount(0);
+ }
+
+ private void restartActor() {
+ actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ actor = actor();
+ }
+
+ private void writeBigPaylod() {
+ final LargePayload payload = new LargePayload();
+
+ final WriteMessages write = new WriteMessages();
+ final List<Future<Optional<Exception>>> requests = new ArrayList<>();
+
+ // Each payload is half of segment size, plus some overhead, should result in two segments being present
+ for (int i = 1; i <= SEGMENT_SIZE * 3 / MESSAGE_SIZE; ++i) {
+ requests.add(write.add(AtomicWrite.apply(PersistentRepr.apply(payload, i, "foo", null, false, kit.getRef(),
+ "uuid"))));
+ }
+
+ actor.tell(write, ActorRef.noSender());
+ requests.forEach(future -> assertFalse(getFuture(future).isPresent()));
+ }
+
private ActorRef actor() {
return kit.childActorOf(SegmentedJournalActor.props("foo", DIRECTORY, StorageLevel.DISK, MESSAGE_SIZE,
SEGMENT_SIZE).withDispatcher(CallingThreadDispatcher.Id()));