Bump versions 9.0.4-SNAPSHOT
[controller.git] / opendaylight / md-sal / sal-akka-segmented-journal / src / main / java / org / opendaylight / controller / akka / segjournal / SegmentedJournalActor.java
index 74fdf2387ef37aef8a91f697584573825e36062f..0b7808391162327188cc871c69caf75422b88c4c 100644 (file)
@@ -7,11 +7,14 @@
  */
 package org.opendaylight.controller.akka.segjournal;
 
+import static com.google.common.base.Verify.verify;
 import static com.google.common.base.Verify.verifyNotNull;
 import static java.util.Objects.requireNonNull;
 
 import akka.actor.AbstractActor;
+import akka.actor.ActorRef;
 import akka.actor.Props;
+import akka.japi.pf.ReceiveBuilder;
 import akka.persistence.AtomicWrite;
 import akka.persistence.PersistentRepr;
 import com.codahale.metrics.Histogram;
@@ -21,11 +24,12 @@ import com.codahale.metrics.Timer;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Stopwatch;
 import io.atomix.storage.journal.Indexed;
+import io.atomix.storage.journal.JournalSerdes;
+import io.atomix.storage.journal.SegmentedByteBufJournal;
 import io.atomix.storage.journal.SegmentedJournal;
-import io.atomix.storage.journal.SegmentedJournalWriter;
 import io.atomix.storage.journal.StorageLevel;
-import io.atomix.utils.serializer.Namespace;
 import java.io.File;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -33,6 +37,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
+import org.opendaylight.controller.raft.journal.FromByteBufMapper;
+import org.opendaylight.controller.raft.journal.ToByteBufMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
@@ -57,11 +63,9 @@ import scala.concurrent.Promise;
  * <p>
  * Split-file approach allows us to treat sequence numbers and indices as equivalent, without maintaining any explicit
  * mapping information. The only additional information we need to maintain is the last deleted sequence number.
- *
- * @author Robert Varga
  */
-final class SegmentedJournalActor extends AbstractActor {
-    abstract static class AsyncMessage<T> {
+abstract sealed class SegmentedJournalActor extends AbstractActor {
+    abstract static sealed class AsyncMessage<T> {
         final Promise<T> promise = Promise.apply();
     }
 
@@ -104,7 +108,7 @@ final class SegmentedJournalActor extends AbstractActor {
         private final List<Promise<Optional<Exception>>> results = new ArrayList<>();
 
         Future<Optional<Exception>> add(final AtomicWrite write) {
-            final Promise<Optional<Exception>> promise = Promise.apply();
+            final var promise = Promise.<Optional<Exception>>apply();
             requests.add(write);
             results.add(promise);
             return promise.future();
@@ -120,7 +124,6 @@ final class SegmentedJournalActor extends AbstractActor {
 
         void setFailure(final int index, final Exception cause) {
             results.get(index).success(Optional.of(cause));
-
         }
 
         void setSuccess(final int index) {
@@ -146,9 +149,154 @@ final class SegmentedJournalActor extends AbstractActor {
         }
     }
 
+    // Each element of responses is null on success, or the Exception describing the failure of that write
+    record WrittenMessages(WriteMessages message, List<Object> responses, long writtenBytes) {
+        WrittenMessages { // one response per request is required, and the byte count cannot be negative
+            verify(responses.size() == message.size(), "Mismatched %s and %s", message, responses);
+            verify(writtenBytes >= 0, "Unexpected length %s", writtenBytes);
+        }
+
+        private void complete() { // reports the outcome of every write back to the originating WriteMessages
+            for (int i = 0, size = responses.size(); i < size; ++i) {
+                if (responses.get(i) instanceof Exception ex) {
+                    message.setFailure(i, ex);
+                } else { // anything that is not an Exception counts as success
+                    message.setSuccess(i);
+                }
+            }
+        }
+    }
+
+    /**
+     * A {@link SegmentedJournalActor} which delays issuing a flush operation until a watermark is reached or when the
+     * queue is empty.
+     *
+     * <p>
+     * The problem we are addressing is that there is a queue sitting in front of the actor, which we have no direct
+     * access to. Since a flush involves committing data to durable storage, that operation can easily end up dominating
+     * workloads.
+     *
+     * <p>
+     * We solve this by having an additional queue in which we track which messages were written and trigger a flush
+     * only when the number of bytes we have written reaches the specified limit. The other part is that each time this
+     * queue becomes non-empty, we send a dedicated message to self. This acts as an actor queue probe -- when we receive
+     * it, we know we have processed all messages that were in the queue when we first delayed the write.
+     *
+     * <p>
+     * The combination of these mechanisms ensures we use a minimal delay while also ensuring we take advantage of
+     * batching opportunities.
+     */
+    private static final class Delayed extends SegmentedJournalActor {
+        private static final class Flush extends AsyncMessage<Void> { // probe message sent to self, see handleFlush()
+            final long batch; // value of Delayed.batch at the time this probe was sent
+
+            Flush(final long batch) {
+                this.batch = batch;
+            }
+        }
+
+        private record UnflushedWrite(WrittenMessages message, Stopwatch start, long count) {
+            UnflushedWrite {
+                requireNonNull(message);
+                requireNonNull(start);
+            }
+        }
+
+        private final ArrayDeque<UnflushedWrite> unflushedWrites = new ArrayDeque<>(); // written, awaiting flush
+        private final Stopwatch unflushedDuration = Stopwatch.createUnstarted(); // runs while writes are pending
+        private final long maxUnflushedBytes;
+
+        private long batch = 0; // identifier of the most recently sent Flush probe
+        private long unflushedBytes = 0; // sum of writtenBytes over unflushedWrites
+
+        Delayed(final String persistenceId, final File directory, final StorageLevel storage,
+                final int maxEntrySize, final int maxSegmentSize, final int maxUnflushedBytes) {
+            super(persistenceId, directory, storage, maxEntrySize, maxSegmentSize);
+            this.maxUnflushedBytes = maxUnflushedBytes;
+        }
+
+        @Override
+        ReceiveBuilder addMessages(final ReceiveBuilder builder) {
+            return super.addMessages(builder).match(Flush.class, this::handleFlush);
+        }
+
+        private void handleFlush(final Flush message) {
+            if (message.batch == batch) { // no newer probe has been sent since this one
+                flushWrites();
+            } else {
+                LOG.debug("{}: batch {} not flushed by {}", persistenceId(), batch, message.batch);
+            }
+        }
+
+        @Override
+        void onWrittenMessages(final WrittenMessages message, final Stopwatch started, final long count) {
+            boolean first = unflushedWrites.isEmpty();
+            if (first) {
+                unflushedDuration.start();
+            }
+            unflushedWrites.addLast(new UnflushedWrite(message, started, count));
+            unflushedBytes = unflushedBytes + message.writtenBytes;
+            if (unflushedBytes >= maxUnflushedBytes) {
+                LOG.debug("{}: reached {} unflushed journal bytes", persistenceId(), unflushedBytes);
+                flushWrites();
+            } else if (first) { // queue just became non-empty: send a probe with a new batch number
+                LOG.debug("{}: deferring journal flush", persistenceId());
+                self().tell(new Flush(++batch), ActorRef.noSender());
+            }
+        }
+
+        @Override
+        void flushWrites() {
+            final var unsyncedSize = unflushedWrites.size();
+            if (unsyncedSize == 0) {
+                // Nothing to flush
+                return;
+            }
+
+            LOG.debug("{}: flushing {} journal writes after {}", persistenceId(), unsyncedSize,
+                unflushedDuration.stop());
+            flushJournal(unflushedBytes, unsyncedSize);
+
+            final var sw = Stopwatch.createStarted(); // times only the completion of the queued writes
+            unflushedWrites.forEach(write -> completeWriteMessages(write.message, write.start, write.count));
+            unflushedWrites.clear();
+            unflushedBytes = 0;
+            unflushedDuration.reset(); // back to the stopped state so the next first write can start() it
+            LOG.debug("{}: completed {} flushed journal writes in {}", persistenceId(), unsyncedSize, sw);
+        }
+    }
+
+    private static final class Immediate extends SegmentedJournalActor { // flushes the journal after every write
+        Immediate(final String persistenceId, final File directory, final StorageLevel storage,
+                final int maxEntrySize, final int maxSegmentSize) {
+            super(persistenceId, directory, storage, maxEntrySize, maxSegmentSize);
+        }
+
+        @Override
+        void onWrittenMessages(final WrittenMessages message, final Stopwatch started, final long count) {
+            flushJournal(message.writtenBytes, 1); // a single write per flush
+            completeWriteMessages(message, started, count);
+        }
+
+        @Override
+        void flushWrites() {
+            // No-op: onWrittenMessages() flushes each write as it arrives, so nothing is ever left pending
+        }
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournalActor.class);
-    private static final Namespace DELETE_NAMESPACE = Namespace.builder().register(Long.class).build();
     private static final int DELETE_SEGMENT_SIZE = 64 * 1024;
+    private static final FromByteBufMapper<Long> READ_MAPPER;
+    private static final ToByteBufMapper<Long> WRITE_MAPPER;
+
+    static {
+        final var namespace = JournalSerdes.builder()
+            .register(LongEntrySerdes.LONG_ENTRY_SERDES, Long.class)
+            .build();
+
+        READ_MAPPER = namespace.toReadMapper();
+        WRITE_MAPPER = namespace.toWriteMapper();
+    }
 
     private final String persistenceId;
     private final StorageLevel storage;
@@ -162,12 +310,18 @@ final class SegmentedJournalActor extends AbstractActor {
     private Meter messageWriteCount;
     // Tracks the size distribution of messages
     private Histogram messageSize;
+    // Tracks the number of messages completed for each flush
+    private Histogram flushMessages;
+    // Tracks the number of bytes completed for each flush
+    private Histogram flushBytes;
+    // Tracks the duration of flush operations
+    private Timer flushTime;
 
     private DataJournal dataJournal;
     private SegmentedJournal<Long> deleteJournal;
     private long lastDelete;
 
-    SegmentedJournalActor(final String persistenceId, final File directory, final StorageLevel storage,
+    private SegmentedJournalActor(final String persistenceId, final File directory, final StorageLevel storage,
             final int maxEntrySize, final int maxSegmentSize) {
         this.persistenceId = requireNonNull(persistenceId);
         this.directory = requireNonNull(directory);
@@ -177,20 +331,39 @@ final class SegmentedJournalActor extends AbstractActor {
     }
 
     static Props props(final String persistenceId, final File directory, final StorageLevel storage,
-            final int maxEntrySize, final int maxSegmentSize) {
-        return Props.create(SegmentedJournalActor.class, requireNonNull(persistenceId), directory, storage,
-            maxEntrySize, maxSegmentSize);
+            final int maxEntrySize, final int maxSegmentSize, final int maxUnflushedBytes) {
+        final var pid = requireNonNull(persistenceId);
+        return maxUnflushedBytes > 0
+            ? Props.create(Delayed.class, pid, directory, storage, maxEntrySize, maxSegmentSize, maxUnflushedBytes)
+            : Props.create(Immediate.class, pid, directory, storage, maxEntrySize, maxSegmentSize);
+    }
+
+    final String persistenceId() {
+        return persistenceId;
+    }
+
+    final void flushJournal(final long bytes, final int messages) { // flushes dataJournal and records flush metrics
+        final var sw = Stopwatch.createStarted();
+        dataJournal.flush();
+        LOG.debug("{}: journal flush completed in {}", persistenceId, sw.stop());
+        flushBytes.update(bytes);
+        flushMessages.update(messages);
+        flushTime.update(sw.elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); // sw was stopped above
     }
 
     @Override
     public Receive createReceive() {
-        return receiveBuilder()
-                .match(DeleteMessagesTo.class, this::handleDeleteMessagesTo)
-                .match(ReadHighestSequenceNr.class, this::handleReadHighestSequenceNr)
-                .match(ReplayMessages.class, this::handleReplayMessages)
-                .match(WriteMessages.class, this::handleWriteMessages)
-                .matchAny(this::handleUnknown)
-                .build();
+        return addMessages(receiveBuilder())
+            .matchAny(this::handleUnknown)
+            .build();
+    }
+
+    ReceiveBuilder addMessages(final ReceiveBuilder builder) {
+        return builder
+            .match(DeleteMessagesTo.class, this::handleDeleteMessagesTo)
+            .match(ReadHighestSequenceNr.class, this::handleReadHighestSequenceNr)
+            .match(ReplayMessages.class, this::handleReplayMessages)
+            .match(WriteMessages.class, this::handleWriteMessages);
     }
 
     @Override
@@ -198,12 +371,15 @@ final class SegmentedJournalActor extends AbstractActor {
         LOG.debug("{}: actor starting", persistenceId);
         super.preStart();
 
-        final MetricRegistry registry = MetricsReporter.getInstance(MeteringBehavior.DOMAIN).getMetricsRegistry();
-        final String actorName = self().path().parent().toStringWithoutAddress() + '/' + directory.getName();
+        final var registry = MetricsReporter.getInstance(MeteringBehavior.DOMAIN).getMetricsRegistry();
+        final var actorName = self().path().parent().toStringWithoutAddress() + '/' + directory.getName();
 
         batchWriteTime = registry.timer(MetricRegistry.name(actorName, "batchWriteTime"));
         messageWriteCount = registry.meter(MetricRegistry.name(actorName, "messageWriteCount"));
         messageSize = registry.histogram(MetricRegistry.name(actorName, "messageSize"));
+        flushBytes = registry.histogram(MetricRegistry.name(actorName, "flushBytes"));
+        flushMessages = registry.histogram(MetricRegistry.name(actorName, "flushMessages"));
+        flushTime = registry.timer(MetricRegistry.name(actorName, "flushTime"));
     }
 
     @Override
@@ -240,6 +416,8 @@ final class SegmentedJournalActor extends AbstractActor {
         ensureOpen();
 
         LOG.debug("{}: delete messages {}", persistenceId, message);
+        flushWrites();
+
         final long to = Long.min(dataJournal.lastWrittenSequenceNr(), message.toSequenceNr);
         LOG.debug("{}: adjusted delete to {}", persistenceId, to);
 
@@ -247,8 +425,8 @@ final class SegmentedJournalActor extends AbstractActor {
             LOG.debug("{}: deleting entries up to {}", persistenceId, to);
 
             lastDelete = to;
-            final SegmentedJournalWriter<Long> deleteWriter = deleteJournal.writer();
-            final Indexed<Long> entry = deleteWriter.append(lastDelete);
+            final var deleteWriter = deleteJournal.writer();
+            final var entry = deleteWriter.append(lastDelete);
             deleteWriter.commit(entry.index());
             dataJournal.deleteTo(lastDelete);
 
@@ -268,6 +446,7 @@ final class SegmentedJournalActor extends AbstractActor {
         final Long sequence;
         if (directory.isDirectory()) {
             ensureOpen();
+            flushWrites();
             sequence = dataJournal.lastWrittenSequenceNr();
         } else {
             sequence = 0L;
@@ -280,6 +459,7 @@ final class SegmentedJournalActor extends AbstractActor {
     private void handleReplayMessages(final ReplayMessages message) {
         LOG.debug("{}: replaying messages {}", persistenceId, message);
         ensureOpen();
+        flushWrites();
 
         final long from = Long.max(lastDelete + 1, message.fromSequenceNr);
         LOG.debug("{}: adjusted fromSequenceNr to {}", persistenceId, from);
@@ -290,18 +470,30 @@ final class SegmentedJournalActor extends AbstractActor {
     private void handleWriteMessages(final WriteMessages message) {
         ensureOpen();
 
-        final Stopwatch sw = Stopwatch.createStarted();
+        final var started = Stopwatch.createStarted();
         final long start = dataJournal.lastWrittenSequenceNr();
-        final long bytes = dataJournal.handleWriteMessages(message);
-        sw.stop();
+        final var writtenMessages = dataJournal.handleWriteMessages(message);
 
-        batchWriteTime.update(sw.elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
-        messageWriteCount.mark(dataJournal.lastWrittenSequenceNr() - start);
+        onWrittenMessages(writtenMessages, started, dataJournal.lastWrittenSequenceNr() - start);
+    }
 
+    final void completeWriteMessages(final WrittenMessages message, final Stopwatch started, final long count) {
+        batchWriteTime.update(started.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+        messageWriteCount.mark(count);
         // log message after statistics are updated
-        LOG.debug("{}: write of {} bytes completed in {}", persistenceId, bytes, sw);
+        LOG.debug("{}: write of {} bytes completed in {}", persistenceId, message.writtenBytes, started);
+        message.complete();
     }
 
+    /**
+     * Handle a batch of messages which have just been written to the journal.
+     *
+     * @param message Messages which were written
+     * @param started Stopwatch started when the write started
+     * @param count number of writes
+     */
+    abstract void onWrittenMessages(WrittenMessages message, Stopwatch started, long count);
+
     private void handleUnknown(final Object message) {
         LOG.error("{}: Received unknown message {}", persistenceId, message);
     }
@@ -312,11 +504,15 @@ final class SegmentedJournalActor extends AbstractActor {
             return;
         }
 
-        final Stopwatch sw = Stopwatch.createStarted();
-        deleteJournal = SegmentedJournal.<Long>builder().withDirectory(directory).withName("delete")
-                .withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build();
-        final Indexed<Long> lastEntry = deleteJournal.writer().getLastEntry();
-        lastDelete = lastEntry == null ? 0 : lastEntry.entry();
+        final var sw = Stopwatch.createStarted();
+        deleteJournal = new SegmentedJournal<>(SegmentedByteBufJournal.builder()
+            .withDirectory(directory)
+            .withName("delete")
+            .withMaxSegmentSize(DELETE_SEGMENT_SIZE)
+            .build(), READ_MAPPER, WRITE_MAPPER);
+        final var lastDeleteRecovered = deleteJournal.openReader(deleteJournal.lastIndex())
+            .tryNext((index, value, length) -> value);
+        lastDelete = lastDeleteRecovered == null ? 0 : lastDeleteRecovered;
 
         dataJournal = new DataJournalV0(persistenceId, messageSize, context().system(), storage, directory,
             maxEntrySize, maxSegmentSize);
@@ -324,4 +520,7 @@ final class SegmentedJournalActor extends AbstractActor {
         LOG.debug("{}: journal open in {} with last index {}, deleted to {}", persistenceId, sw,
             dataJournal.lastWrittenSequenceNr(), lastDelete);
     }
+
+    abstract void flushWrites();
+
 }