}
}
- abstract long lastWrittenIndex();
+ /**
+ * Return the last sequence number completely written to the journal.
+ *
+ * @return Last written sequence number, {@code -1} if there are no in the journal.
+ */
+ abstract long lastWrittenSequenceNr();
- abstract void commitTo(long index);
+ /**
+ * Delete all messages up to specified sequence number.
+ *
+ * @param sequenceNr Sequence number to delete to.
+ */
+ abstract void deleteTo(long sequenceNr);
- abstract void compactTo(long index);
+ /**
+ * Delete all messages up to specified sequence number.
+ *
+ * @param sequenceNr Sequence number to compact to.
+ */
+ abstract void compactTo(long sequenceNr);
+ /**
+ * Close this journal, freeing up resources associated with it.
+ */
abstract void close();
- abstract void handleReplayMessages(ReplayMessages message, long from);
+ /**
+ * Handle a request to replay messages.
+ *
+ * @param message Request message
+ * @param fromSequenceNr Sequence number to replay from, adjusted for deletions
+ */
+ abstract void handleReplayMessages(@NonNull ReplayMessages message, long fromSequenceNr);
- abstract void handleWriteMessages(WriteMessages message);
+ /**
+ * Handle a request to store some messages.
+ *
+ * @param message Request message
+ */
+ abstract void handleWriteMessages(@NonNull WriteMessages message);
}
}
@Override
- long lastWrittenIndex() {
+ long lastWrittenSequenceNr() {
return entries.writer().getLastIndex();
}
@Override
- void commitTo(final long index) {
- entries.writer().commit(index);
+ void deleteTo(final long sequenceNr) {
+ entries.writer().commit(sequenceNr);
}
@Override
- void compactTo(final long index) {
- entries.compact(index);
+ void compactTo(final long sequenceNr) {
+ entries.compact(sequenceNr + 1);
}
@Override
@Override
@SuppressWarnings("checkstyle:illegalCatch")
- void handleReplayMessages(final ReplayMessages message, final long from) {
- try (SegmentedJournalReader<DataJournalEntry> reader = entries.openReader(from)) {
+ void handleReplayMessages(final ReplayMessages message, final long fromSequenceNr) {
+ try (SegmentedJournalReader<DataJournalEntry> reader = entries.openReader(fromSequenceNr)) {
int count = 0;
while (reader.hasNext() && count < message.max) {
final Indexed<DataJournalEntry> next = reader.next();
ensureOpen();
LOG.debug("{}: delete messages {}", persistenceId, message);
- final long to = Long.min(dataJournal.lastWrittenIndex(), message.toSequenceNr);
+ final long to = Long.min(dataJournal.lastWrittenSequenceNr(), message.toSequenceNr);
LOG.debug("{}: adjusted delete to {}", persistenceId, to);
if (lastDelete < to) {
final SegmentedJournalWriter<Long> deleteWriter = deleteJournal.writer();
final Indexed<Long> entry = deleteWriter.append(lastDelete);
deleteWriter.commit(entry.index());
- dataJournal.commitTo(lastDelete);
+ dataJournal.deleteTo(lastDelete);
LOG.debug("{}: compaction started", persistenceId);
- dataJournal.compactTo(lastDelete + 1);
+ dataJournal.compactTo(lastDelete);
deleteJournal.compact(entry.index());
LOG.debug("{}: compaction finished", persistenceId);
} else {
final Long sequence;
if (directory.isDirectory()) {
ensureOpen();
- sequence = dataJournal.lastWrittenIndex();
+ sequence = dataJournal.lastWrittenSequenceNr();
} else {
sequence = 0L;
}
message.promise.success(sequence);
}
- @SuppressWarnings("checkstyle:illegalCatch")
private void handleReplayMessages(final ReplayMessages message) {
LOG.debug("{}: replaying messages {}", persistenceId, message);
ensureOpen();
ensureOpen();
final long startTicks = System.nanoTime();
- final long start = dataJournal.lastWrittenIndex();
+ final long start = dataJournal.lastWrittenSequenceNr();
dataJournal.handleWriteMessages(message);
batchWriteTime.update(System.nanoTime() - startTicks, TimeUnit.NANOSECONDS);
- messageWriteCount.mark(dataJournal.lastWrittenIndex() - start);
+ messageWriteCount.mark(dataJournal.lastWrittenSequenceNr() - start);
}
private void handleUnknown(final Object message) {
dataJournal = new DataJournalV0(persistenceId, messageSize, context().system(), storage, directory,
maxEntrySize, maxSegmentSize);
- dataJournal.commitTo(lastDelete);
- LOG.debug("{}: journal open with last index {}, deleted to {}", persistenceId, dataJournal.lastWrittenIndex(),
- lastDelete);
+ dataJournal.deleteTo(lastDelete);
+ LOG.debug("{}: journal open with last index {}, deleted to {}", persistenceId,
+ dataJournal.lastWrittenSequenceNr(), lastDelete);
}
}