- try (SegmentedJournalReader<DataJournalEntry> reader = entries.openReader(fromSequenceNr)) {
- int count = 0;
- while (reader.hasNext() && count < message.max) {
- final Indexed<DataJournalEntry> next = reader.next();
- if (next.index() > message.toSequenceNr) {
- break;
- }
-
- LOG.trace("{}: replay {}", persistenceId, next);
- updateLargestSize(next.size());
- final DataJournalEntry entry = next.entry();
- verify(entry instanceof FromPersistence, "Unexpected entry %s", entry);
-
- final PersistentRepr repr = ((FromPersistence) entry).toRepr(persistenceId, next.index());
- LOG.debug("{}: replaying {}", persistenceId, repr);
- message.replayCallback.accept(repr);
- count++;
- }
- LOG.debug("{}: successfully replayed {} entries", persistenceId, count);
+ try (var reader = entries.openReader(fromSequenceNr)) {
+ handleReplayMessages(reader, message);