+ private void handleReplayMessages(final SegmentedJournalReader<DataJournalEntry> reader,
+ final ReplayMessages message) {
+ int count = 0;
+ while (reader.hasNext() && count < message.max) {
+ final var next = reader.next();
+ if (next.index() > message.toSequenceNr) {
+ break;
+ }
+
+ LOG.trace("{}: replay {}", persistenceId, next);
+ updateLargestSize(next.size());
+ final var entry = next.entry();
+ if (entry instanceof FromPersistence fromPersistence) {
+ final var repr = fromPersistence.toRepr(persistenceId, next.index());
+ LOG.debug("{}: replaying {}", persistenceId, repr);
+ message.replayCallback.accept(repr);
+ count++;
+ } else {
+ throw new VerifyException("Unexpected entry " + entry);
+ }
+ }
+ LOG.debug("{}: successfully replayed {} entries", persistenceId, count);
+ }
+