+ // responses == null on success, Exception on failure
+ record WrittenMessages(WriteMessages message, List<Object> responses, long writtenBytes) {
+ WrittenMessages {
+ verify(responses.size() == message.size(), "Mismatched %s and %s", message, responses);
+ verify(writtenBytes >= 0, "Unexpected length %s", writtenBytes);
+ }
+
+ private void complete() {
+ for (int i = 0, size = responses.size(); i < size; ++i) {
+ if (responses.get(i) instanceof Exception ex) {
+ message.setFailure(i, ex);
+ } else {
+ message.setSuccess(i);
+ }
+ }
+ }
+ }
+
+ /**
+ * A {@link SegmentedJournalActor} which delays issuing a flush operation until a watermark is reached or when the
+ * queue is empty.
+ *
+ * <p>
+ * The problem we are addressing is that there is a queue sitting in from of the actor, which we have no direct
+ * access to. Since a flush involves committing data to durable storage, that operation can easily end up dominating
+ * workloads.
+ *
+ * <p>
+ * We solve this by having an additional queue in which we track which messages were written and trigger a flush
+ * only when the number of bytes we have written exceeds specified limit. The other part is that each time this
+ * queue becomes non-empty, we send a dedicated message to self. This acts as a actor queue probe -- when we receive
+ * it, we know we have processed all messages that were in the queue when we first delayed the write.
+ *
+ * <p>
+ * The combination of these mechanisms ensure we use a minimal delay while also ensuring we take advantage of
+ * batching opportunities.
+ */
+ private static final class Delayed extends SegmentedJournalActor {
+ private static final class Flush extends AsyncMessage<Void> {
+ final long batch;
+
+ Flush(final long batch) {
+ this.batch = batch;
+ }
+ }
+
+ private record UnflushedWrite(WrittenMessages message, Stopwatch start, long count) {
+ UnflushedWrite {
+ requireNonNull(message);
+ requireNonNull(start);
+ }
+ }
+
+ private final ArrayDeque<UnflushedWrite> unflushedWrites = new ArrayDeque<>();
+ private final Stopwatch unflushedDuration = Stopwatch.createUnstarted();
+ private final long maxUnflushedBytes;
+
+ private long batch = 0;
+ private long unflushedBytes = 0;
+
+ Delayed(final String persistenceId, final File directory, final StorageLevel storage,
+ final int maxEntrySize, final int maxSegmentSize, final int maxUnflushedBytes) {
+ super(persistenceId, directory, storage, maxEntrySize, maxSegmentSize);
+ this.maxUnflushedBytes = maxUnflushedBytes;
+ }
+
+ @Override
+ ReceiveBuilder addMessages(final ReceiveBuilder builder) {
+ return super.addMessages(builder).match(Flush.class, this::handleFlush);
+ }
+
+ private void handleFlush(final Flush message) {
+ if (message.batch == batch) {
+ flushWrites();
+ } else {
+ LOG.debug("{}: batch {} not flushed by {}", persistenceId(), batch, message.batch);
+ }
+ }
+
+ @Override
+ void onWrittenMessages(final WrittenMessages message, final Stopwatch started, final long count) {
+ boolean first = unflushedWrites.isEmpty();
+ if (first) {
+ unflushedDuration.start();
+ }
+ unflushedWrites.addLast(new UnflushedWrite(message, started, count));
+ unflushedBytes = unflushedBytes + message.writtenBytes;
+ if (unflushedBytes >= maxUnflushedBytes) {
+ LOG.debug("{}: reached {} unflushed journal bytes", persistenceId(), unflushedBytes);
+ flushWrites();
+ } else if (first) {
+ LOG.debug("{}: deferring journal flush", persistenceId());
+ self().tell(new Flush(++batch), ActorRef.noSender());
+ }
+ }
+
+ @Override
+ void flushWrites() {
+ final var unsyncedSize = unflushedWrites.size();
+ if (unsyncedSize == 0) {
+ // Nothing to flush
+ return;
+ }
+
+ LOG.debug("{}: flushing {} journal writes after {}", persistenceId(), unsyncedSize,
+ unflushedDuration.stop());
+ flushJournal(unflushedBytes, unsyncedSize);
+
+ final var sw = Stopwatch.createStarted();
+ unflushedWrites.forEach(write -> completeWriteMessages(write.message, write.start, write.count));
+ unflushedWrites.clear();
+ unflushedBytes = 0;
+ unflushedDuration.reset();
+ LOG.debug("{}: completed {} flushed journal writes in {}", persistenceId(), unsyncedSize, sw);
+ }
+ }
+
+ private static final class Immediate extends SegmentedJournalActor {
+ Immediate(final String persistenceId, final File directory, final StorageLevel storage,
+ final int maxEntrySize, final int maxSegmentSize) {
+ super(persistenceId, directory, storage, maxEntrySize, maxSegmentSize);
+ }
+
+ @Override
+ void onWrittenMessages(final WrittenMessages message, final Stopwatch started, final long count) {
+ flushJournal(message.writtenBytes, 1);
+ completeWriteMessages(message, started, count);
+ }
+
+ @Override
+ void flushWrites() {
+ // No-op
+ }
+ }
+