void setFailure(final int index, final Exception cause) {
results.get(index).success(Optional.of(cause));
-
}
void setSuccess(final int index) {
}
}
- private final ArrayDeque<WrittenMessages> unflushedWrites = new ArrayDeque<>();
+ private record UnflushedWrite(WrittenMessages message, Stopwatch start, long count) {
+ UnflushedWrite {
+ requireNonNull(message);
+ requireNonNull(start);
+ }
+ }
+
+ private final ArrayDeque<UnflushedWrite> unflushedWrites = new ArrayDeque<>();
private final Stopwatch unflushedDuration = Stopwatch.createUnstarted();
private final long maxUnflushedBytes;
}
@Override
- void onWrittenMessages(final WrittenMessages message) {
+ void onWrittenMessages(final WrittenMessages message, final Stopwatch started, final long count) {
boolean first = unflushedWrites.isEmpty();
if (first) {
unflushedDuration.start();
}
- unflushedWrites.addLast(message);
+ unflushedWrites.addLast(new UnflushedWrite(message, started, count));
unflushedBytes = unflushedBytes + message.writtenBytes;
if (unflushedBytes >= maxUnflushedBytes) {
LOG.debug("{}: reached {} unflushed journal bytes", persistenceId(), unflushedBytes);
flushJournal(unflushedBytes, unsyncedSize);
final var sw = Stopwatch.createStarted();
- unflushedWrites.forEach(WrittenMessages::complete);
+ unflushedWrites.forEach(write -> completeWriteMessages(write.message, write.start, write.count));
unflushedWrites.clear();
unflushedBytes = 0;
unflushedDuration.reset();
}
@Override
- void onWrittenMessages(final WrittenMessages message) {
+ void onWrittenMessages(final WrittenMessages message, final Stopwatch started, final long count) {
flushJournal(message.writtenBytes, 1);
- message.complete();
+ completeWriteMessages(message, started, count);
}
@Override
private void handleWriteMessages(final WriteMessages message) {
ensureOpen();
- final var sw = Stopwatch.createStarted();
+ final var started = Stopwatch.createStarted();
final long start = dataJournal.lastWrittenSequenceNr();
final var writtenMessages = dataJournal.handleWriteMessages(message);
- sw.stop();
- batchWriteTime.update(sw.elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
- messageWriteCount.mark(dataJournal.lastWrittenSequenceNr() - start);
+ onWrittenMessages(writtenMessages, started, dataJournal.lastWrittenSequenceNr() - start);
+ }
+ final void completeWriteMessages(final WrittenMessages message, final Stopwatch started, final long count) {
+ batchWriteTime.update(started.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+ messageWriteCount.mark(count);
// log message after statistics are updated
- LOG.debug("{}: write of {} bytes completed in {}", persistenceId, writtenMessages.writtenBytes, sw);
- onWrittenMessages(writtenMessages);
+ LOG.debug("{}: write of {} bytes completed in {}", persistenceId, message.writtenBytes, started);
+ message.complete();
}
/**
* Handle a check of written messages.
*
* @param message Messages which were written
+ * @param started Stopwatch started when the write started
+ * @param count number of writes
*/
- abstract void onWrittenMessages(WrittenMessages message);
+ abstract void onWrittenMessages(WrittenMessages message, Stopwatch started, long count);
private void handleUnknown(final Object message) {
LOG.error("{}: Received unknown message {}", persistenceId, message);