diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java
index e5c5b7807b..0b78083911 100644
--- a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java
+++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java
@@ -7,11 +7,14 @@
  */
 package org.opendaylight.controller.akka.segjournal;
 
+import static com.google.common.base.Verify.verify;
 import static com.google.common.base.Verify.verifyNotNull;
 import static java.util.Objects.requireNonNull;
 
 import akka.actor.AbstractActor;
+import akka.actor.ActorRef;
 import akka.actor.Props;
+import akka.japi.pf.ReceiveBuilder;
 import akka.persistence.AtomicWrite;
 import akka.persistence.PersistentRepr;
 import com.codahale.metrics.Histogram;
@@ -19,12 +22,14 @@ import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
 import com.google.common.base.MoreObjects;
-import io.atomix.storage.StorageLevel;
+import com.google.common.base.Stopwatch;
 import io.atomix.storage.journal.Indexed;
+import io.atomix.storage.journal.JournalSerdes;
+import io.atomix.storage.journal.SegmentedByteBufJournal;
 import io.atomix.storage.journal.SegmentedJournal;
-import io.atomix.storage.journal.SegmentedJournalWriter;
-import io.atomix.utils.serializer.Namespace;
+import io.atomix.storage.journal.StorageLevel;
 import java.io.File;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -32,6 +37,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
+import org.opendaylight.controller.raft.journal.FromByteBufMapper;
+import org.opendaylight.controller.raft.journal.ToByteBufMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
@@ -56,11 +63,9 @@ import scala.concurrent.Promise;
  *
  * <p>
  * Split-file approach allows us to treat sequence numbers and indices as equivalent, without maintaining any explicit
  * mapping information. The only additional information we need to maintain is the last deleted sequence number.
- *
- * @author Robert Varga
  */
-final class SegmentedJournalActor extends AbstractActor {
-    abstract static class AsyncMessage<T> {
+abstract sealed class SegmentedJournalActor extends AbstractActor {
+    abstract static sealed class AsyncMessage<T> {
         final Promise<T> promise = Promise.apply();
     }
@@ -103,7 +108,7 @@ final class SegmentedJournalActor extends AbstractActor {
         private final List<Promise<Optional<Exception>>> results = new ArrayList<>();
 
         Future<Optional<Exception>> add(final AtomicWrite write) {
-            final Promise<Optional<Exception>> promise = Promise.apply();
+            final var promise = Promise.<Optional<Exception>>apply();
             requests.add(write);
             results.add(promise);
             return promise.future();
@@ -119,7 +124,6 @@
 
         void setFailure(final int index, final Exception cause) {
             results.get(index).success(Optional.of(cause));
-
         }
 
         void setSuccess(final int index) {
@@ -145,9 +149,154 @@ final class SegmentedJournalActor extends AbstractActor {
         }
     }
 
+    // responses == null on success, Exception on failure
+    record WrittenMessages(WriteMessages message, List<Object> responses, long writtenBytes) {
+        WrittenMessages {
+            verify(responses.size() == message.size(), "Mismatched %s and %s", message, responses);
+            verify(writtenBytes >= 0, "Unexpected length %s", writtenBytes);
+        }
+
+        private void complete() {
+            for (int i = 0, size = responses.size(); i < size; ++i) {
+                if (responses.get(i) instanceof Exception ex) {
+                    message.setFailure(i, ex);
+                } else {
+                    message.setSuccess(i);
+                }
+            }
+        }
+    }
+
+    /**
+     * A {@link SegmentedJournalActor} which delays issuing a flush operation until a watermark is reached or when the
+     * queue is empty.
+     *
+     * <p>
+     * The problem we are addressing is that there is a queue sitting in front of the actor, which we have no direct
+     * access to. Since a flush involves committing data to durable storage, that operation can easily end up
+     * dominating workloads.
+     *
+     * <p>
+     * We solve this by having an additional queue in which we track which messages were written and trigger a flush
+     * only when the number of bytes we have written exceeds the specified limit. The other part is that each time
+     * this queue becomes non-empty, we send a dedicated message to self. This acts as an actor queue probe -- when we
+     * receive it, we know we have processed all messages that were in the queue when we first delayed the write.
+     *
+     * <p>
+     * The combination of these mechanisms ensures we use a minimal delay while also ensuring we take advantage of
+     * batching opportunities.
+     */
+    private static final class Delayed extends SegmentedJournalActor {
+        private static final class Flush extends AsyncMessage<Void> {
+            final long batch;
+
+            Flush(final long batch) {
+                this.batch = batch;
+            }
+        }
+
+        private record UnflushedWrite(WrittenMessages message, Stopwatch start, long count) {
+            UnflushedWrite {
+                requireNonNull(message);
+                requireNonNull(start);
+            }
+        }
+
+        private final ArrayDeque<UnflushedWrite> unflushedWrites = new ArrayDeque<>();
+        private final Stopwatch unflushedDuration = Stopwatch.createUnstarted();
+        private final long maxUnflushedBytes;
+
+        private long batch = 0;
+        private long unflushedBytes = 0;
+
+        Delayed(final String persistenceId, final File directory, final StorageLevel storage,
+                final int maxEntrySize, final int maxSegmentSize, final int maxUnflushedBytes) {
+            super(persistenceId, directory, storage, maxEntrySize, maxSegmentSize);
+            this.maxUnflushedBytes = maxUnflushedBytes;
+        }
+
+        @Override
+        ReceiveBuilder addMessages(final ReceiveBuilder builder) {
+            return super.addMessages(builder).match(Flush.class, this::handleFlush);
+        }
+
+        private void handleFlush(final Flush message) {
+            if (message.batch == batch) {
+                flushWrites();
+            } else {
+                LOG.debug("{}: batch {} not flushed by {}", persistenceId(), batch, message.batch);
+            }
+        }
+
+        @Override
+        void onWrittenMessages(final WrittenMessages message, final Stopwatch started, final long count) {
+            boolean first = unflushedWrites.isEmpty();
+            if (first) {
+                unflushedDuration.start();
+            }
+            unflushedWrites.addLast(new UnflushedWrite(message, started, count));
+            unflushedBytes = unflushedBytes + message.writtenBytes;
+            if (unflushedBytes >= maxUnflushedBytes) {
+                LOG.debug("{}: reached {} unflushed journal bytes", persistenceId(), unflushedBytes);
+                flushWrites();
+            } else if (first) {
+                LOG.debug("{}: deferring journal flush", persistenceId());
+                self().tell(new Flush(++batch), ActorRef.noSender());
+            }
+        }
+
+        @Override
+        void flushWrites() {
+            final var unsyncedSize = unflushedWrites.size();
+            if (unsyncedSize == 0) {
+                // Nothing to flush
+                return;
+            }
+
+            LOG.debug("{}: flushing {} journal writes after {}", persistenceId(), unsyncedSize,
+                unflushedDuration.stop());
+            flushJournal(unflushedBytes, unsyncedSize);
+
+            final var sw = Stopwatch.createStarted();
+            unflushedWrites.forEach(write -> completeWriteMessages(write.message, write.start, write.count));
+            unflushedWrites.clear();
+            unflushedBytes = 0;
+            unflushedDuration.reset();
+            LOG.debug("{}: completed {} flushed journal writes in {}", persistenceId(), unsyncedSize, sw);
+        }
+    }
+
+    private static final class Immediate extends SegmentedJournalActor {
+        Immediate(final String persistenceId, final File directory, final StorageLevel storage,
+                final int maxEntrySize, final int maxSegmentSize) {
+            super(persistenceId, directory, storage, maxEntrySize, maxSegmentSize);
+        }
+
+        @Override
+        void onWrittenMessages(final WrittenMessages message, final Stopwatch started, final long count) {
+            flushJournal(message.writtenBytes, 1);
+            completeWriteMessages(message, started, count);
+        }
+
+        @Override
+        void flushWrites() {
+            // No-op
+        }
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournalActor.class);
-    private static final Namespace DELETE_NAMESPACE = Namespace.builder().register(Long.class).build();
     private static final int DELETE_SEGMENT_SIZE = 64 * 1024;
+    private static final FromByteBufMapper<Long> READ_MAPPER;
+    private static final ToByteBufMapper<Long> WRITE_MAPPER;
+
+    static {
+        final var namespace = JournalSerdes.builder()
+            .register(LongEntrySerdes.LONG_ENTRY_SERDES, Long.class)
+            .build();
+
+        READ_MAPPER = namespace.toReadMapper();
+        WRITE_MAPPER = namespace.toWriteMapper();
+    }
 
     private final String persistenceId;
     private final StorageLevel storage;
@@ -161,12 +310,18 @@ final class SegmentedJournalActor extends AbstractActor {
     private Meter messageWriteCount;
     // Tracks the size distribution of messages
     private Histogram messageSize;
+    // Tracks the number of messages completed for each flush
+    private Histogram flushMessages;
+    // Tracks the number of bytes completed for each flush
+    private Histogram flushBytes;
+    // Tracks the duration of flush operations
+    private Timer flushTime;
 
     private DataJournal dataJournal;
     private SegmentedJournal<Long> deleteJournal;
     private long lastDelete;
 
-    SegmentedJournalActor(final String persistenceId, final File directory, final StorageLevel storage,
+    private SegmentedJournalActor(final String persistenceId, final File directory, final StorageLevel storage,
             final int maxEntrySize, final int maxSegmentSize) {
         this.persistenceId = requireNonNull(persistenceId);
         this.directory = requireNonNull(directory);
@@ -176,20 +331,39 @@
     }
 
     static Props props(final String persistenceId, final File directory, final StorageLevel storage,
-            final int maxEntrySize, final int maxSegmentSize) {
-        return Props.create(SegmentedJournalActor.class, requireNonNull(persistenceId), directory, storage,
-            maxEntrySize, maxSegmentSize);
+            final int maxEntrySize, final int maxSegmentSize, final int maxUnflushedBytes) {
+        final var pid = requireNonNull(persistenceId);
+        return maxUnflushedBytes > 0
+            ? Props.create(Delayed.class, pid, directory, storage, maxEntrySize, maxSegmentSize, maxUnflushedBytes)
+            : Props.create(Immediate.class, pid, directory, storage, maxEntrySize, maxSegmentSize);
+    }
+
+    final String persistenceId() {
+        return persistenceId;
+    }
+
+    final void flushJournal(final long bytes, final int messages) {
+        final var sw = Stopwatch.createStarted();
+        dataJournal.flush();
+        LOG.debug("{}: journal flush completed in {}", persistenceId, sw.stop());
+        flushBytes.update(bytes);
+        flushMessages.update(messages);
+        flushTime.update(sw.elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
     }
 
     @Override
     public Receive createReceive() {
-        return receiveBuilder()
-            .match(DeleteMessagesTo.class, this::handleDeleteMessagesTo)
-            .match(ReadHighestSequenceNr.class, this::handleReadHighestSequenceNr)
-            .match(ReplayMessages.class, this::handleReplayMessages)
-            .match(WriteMessages.class, this::handleWriteMessages)
-            .matchAny(this::handleUnknown)
-            .build();
+        return addMessages(receiveBuilder())
+            .matchAny(this::handleUnknown)
+            .build();
+    }
+
+    ReceiveBuilder addMessages(final ReceiveBuilder builder) {
+        return builder
+            .match(DeleteMessagesTo.class, this::handleDeleteMessagesTo)
+            .match(ReadHighestSequenceNr.class, this::handleReadHighestSequenceNr)
+            .match(ReplayMessages.class, this::handleReplayMessages)
+            .match(WriteMessages.class, this::handleWriteMessages);
     }
 
     @Override
@@ -197,12 +371,15 @@
         LOG.debug("{}: actor starting", persistenceId);
         super.preStart();
 
-        final MetricRegistry registry = MetricsReporter.getInstance(MeteringBehavior.DOMAIN).getMetricsRegistry();
-        final String actorName = self().path().parent().toStringWithoutAddress() + '/' + directory.getName();
+        final var registry = MetricsReporter.getInstance(MeteringBehavior.DOMAIN).getMetricsRegistry();
+        final var actorName = self().path().parent().toStringWithoutAddress() + '/' + directory.getName();
 
         batchWriteTime = registry.timer(MetricRegistry.name(actorName, "batchWriteTime"));
         messageWriteCount = registry.meter(MetricRegistry.name(actorName, "messageWriteCount"));
         messageSize = registry.histogram(MetricRegistry.name(actorName, "messageSize"));
+        flushBytes = registry.histogram(MetricRegistry.name(actorName, "flushBytes"));
+        flushMessages = registry.histogram(MetricRegistry.name(actorName, "flushMessages"));
+        flushTime = registry.timer(MetricRegistry.name(actorName, "flushTime"));
     }
 
     @Override
@@ -239,6 +416,8 @@
         ensureOpen();
 
         LOG.debug("{}: delete messages {}", persistenceId, message);
+        flushWrites();
+
         final long to = Long.min(dataJournal.lastWrittenSequenceNr(), message.toSequenceNr);
         LOG.debug("{}: adjusted delete to {}", persistenceId, to);
 
@@ -246,8 +425,8 @@
             LOG.debug("{}: deleting entries up to {}", persistenceId, to);
 
             lastDelete = to;
-            final SegmentedJournalWriter<Long> deleteWriter = deleteJournal.writer();
-            final Indexed<Long> entry = deleteWriter.append(lastDelete);
+            final var deleteWriter = deleteJournal.writer();
+            final var entry = deleteWriter.append(lastDelete);
             deleteWriter.commit(entry.index());
 
             dataJournal.deleteTo(lastDelete);
@@ -267,6 +446,7 @@
         final Long sequence;
         if (directory.isDirectory()) {
             ensureOpen();
+            flushWrites();
             sequence = dataJournal.lastWrittenSequenceNr();
         } else {
             sequence = 0L;
@@ -279,6 +459,7 @@ final class SegmentedJournalActor extends AbstractActor {
     private void handleReplayMessages(final ReplayMessages message) {
         LOG.debug("{}: replaying messages {}", persistenceId, message);
         ensureOpen();
+        flushWrites();
 
         final long from = Long.max(lastDelete + 1, message.fromSequenceNr);
         LOG.debug("{}: adjusted fromSequenceNr to {}", persistenceId, from);
@@ -289,15 +470,30 @@
     private void handleWriteMessages(final WriteMessages message) {
         ensureOpen();
 
-        final long startTicks = System.nanoTime();
+        final var started = Stopwatch.createStarted();
         final long start = dataJournal.lastWrittenSequenceNr();
+        final var writtenMessages = dataJournal.handleWriteMessages(message);
 
-        dataJournal.handleWriteMessages(message);
+        onWrittenMessages(writtenMessages, started, dataJournal.lastWrittenSequenceNr() - start);
+    }
 
-        batchWriteTime.update(System.nanoTime() - startTicks, TimeUnit.NANOSECONDS);
-        messageWriteCount.mark(dataJournal.lastWrittenSequenceNr() - start);
+    final void completeWriteMessages(final WrittenMessages message, final Stopwatch started, final long count) {
+        batchWriteTime.update(started.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+        messageWriteCount.mark(count);
+        // log message after statistics are updated
+        LOG.debug("{}: write of {} bytes completed in {}", persistenceId, message.writtenBytes, started);
+        message.complete();
     }
 
+    /**
+     * Handle a batch of written messages.
+     *
+     * @param message Messages which were written
+     * @param started Stopwatch started when the write started
+     * @param count number of writes
+     */
+    abstract void onWrittenMessages(WrittenMessages message, Stopwatch started, long count);
+
     private void handleUnknown(final Object message) {
         LOG.error("{}: Received unknown message {}", persistenceId, message);
     }
@@ -308,15 +504,23 @@
             return;
         }
 
-        deleteJournal = SegmentedJournal.<Long>builder().withDirectory(directory).withName("delete")
-            .withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build();
-        final Indexed<Long> lastEntry = deleteJournal.writer().getLastEntry();
-        lastDelete = lastEntry == null ? 0 : lastEntry.entry();
+        final var sw = Stopwatch.createStarted();
+        deleteJournal = new SegmentedJournal<>(SegmentedByteBufJournal.builder()
+            .withDirectory(directory)
+            .withName("delete")
+            .withMaxSegmentSize(DELETE_SEGMENT_SIZE)
+            .build(), READ_MAPPER, WRITE_MAPPER);
+        final var lastDeleteRecovered = deleteJournal.openReader(deleteJournal.lastIndex())
+            .tryNext((index, value, length) -> value);
+        lastDelete = lastDeleteRecovered == null ? 0 : lastDeleteRecovered;
 
         dataJournal = new DataJournalV0(persistenceId, messageSize, context().system(), storage, directory,
             maxEntrySize, maxSegmentSize);
         dataJournal.deleteTo(lastDelete);
-        LOG.debug("{}: journal open with last index {}, deleted to {}", persistenceId,
+        LOG.debug("{}: journal open in {} with last index {}, deleted to {}", persistenceId, sw,
            dataJournal.lastWrittenSequenceNr(), lastDelete);
     }
+
+    abstract void flushWrites();
+
 }
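
The split-file approach described in the class comment reduces sequence-number management to two clamping rules, visible in handleReplayMessages() and handleDeleteMessagesTo(). Below is a minimal sketch of that bookkeeping; the class and method names are illustrative, not part of the patch:

    // Illustrative sketch: with one journal entry per persisted message, sequence
    // numbers and journal indices coincide, so the only extra state to maintain is
    // the last deleted sequence number.
    final class SequenceBookkeepingSketch {
        private long lastDelete;

        // Mirrors handleReplayMessages(): never replay at or below the deletion mark.
        long adjustedReplayFrom(final long fromSequenceNr) {
            return Long.max(lastDelete + 1, fromSequenceNr);
        }

        // Mirrors handleDeleteMessagesTo(): clamp the request to what was actually
        // written, then advance the deletion mark.
        long adjustedDeleteTo(final long toSequenceNr, final long lastWrittenSequenceNr) {
            final long to = Long.min(lastWrittenSequenceNr, toSequenceNr);
            lastDelete = Long.max(lastDelete, to);
            return to;
        }
    }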
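
The Delayed Javadoc above describes a watermark-plus-probe batching scheme. The following standalone sketch restates that logic outside Akka; all names are illustrative, and scheduleProbe() stands in for the self().tell(new Flush(++batch), ActorRef.noSender()) send, whose delivery through the mailbox is what signals that the write queue has drained:

    import java.util.ArrayDeque;

    final class DelayedFlushSketch {
        record PendingWrite(long bytes) { }

        private final ArrayDeque<PendingWrite> unflushed = new ArrayDeque<>();
        private final long maxUnflushedBytes;
        private long unflushedBytes;
        private long batch;

        DelayedFlushSketch(final long maxUnflushedBytes) {
            this.maxUnflushedBytes = maxUnflushedBytes;
        }

        // Mirrors onWrittenMessages(): flush as soon as the byte watermark is crossed,
        // otherwise arm a probe for the current batch when the queue becomes non-empty.
        void onWrite(final PendingWrite write) {
            final boolean first = unflushed.isEmpty();
            unflushed.addLast(write);
            unflushedBytes += write.bytes();
            if (unflushedBytes >= maxUnflushedBytes) {
                flush();
            } else if (first) {
                // In the actor the probe travels through the mailbox behind all
                // already-queued writes, so its arrival means every write enqueued
                // before it has been added to the batch.
                scheduleProbe(++batch);
            }
        }

        // Mirrors handleFlush(): only a probe matching the current batch may flush;
        // anything else is stale and ignored.
        void onProbe(final long probeBatch) {
            if (probeBatch == batch) {
                flush();
            }
        }

        private void flush() {
            if (!unflushed.isEmpty()) {
                System.out.printf("flushing %d writes, %d bytes%n", unflushed.size(), unflushedBytes);
                unflushed.clear();
                unflushedBytes = 0;
            }
        }

        private void scheduleProbe(final long probeBatch) {
            System.out.printf("probe armed for batch %d%n", probeBatch);
        }

        public static void main(final String[] args) {
            final var sketch = new DelayedFlushSketch(1024);
            sketch.onWrite(new PendingWrite(100));   // first write: probe armed for batch 1
            sketch.onWrite(new PendingWrite(2000));  // watermark crossed: immediate flush
            sketch.onProbe(1);                       // probe arrives after the flush: no-op
        }
    }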
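
Finally, WriteMessages and WrittenMessages together implement a per-write completion protocol: one result slot per AtomicWrite, where a null response means success and an Exception carries the failure cause. A sketch of the same protocol, using java.util.concurrent.CompletableFuture in place of scala.concurrent.Promise (names are illustrative):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Optional;
    import java.util.concurrent.CompletableFuture;

    final class WriteBatchSketch {
        private final List<CompletableFuture<Optional<Exception>>> results = new ArrayList<>();

        // Mirrors WriteMessages.add(): one future per AtomicWrite in the batch.
        CompletableFuture<Optional<Exception>> add() {
            final var future = new CompletableFuture<Optional<Exception>>();
            results.add(future);
            return future;
        }

        // Mirrors WrittenMessages.complete(): settle each slot, null meaning success.
        void complete(final List<Object> responses) {
            if (responses.size() != results.size()) {
                throw new IllegalStateException("Mismatched responses and results");
            }
            for (int i = 0; i < responses.size(); ++i) {
                if (responses.get(i) instanceof Exception ex) {
                    results.get(i).complete(Optional.of(ex));   // failed write: report the cause
                } else {
                    results.get(i).complete(Optional.empty());  // successful write: no error
                }
            }
        }

        public static void main(final String[] args) {
            final var batch = new WriteBatchSketch();
            final var ok = batch.add();
            final var failed = batch.add();
            final List<Object> responses = new ArrayList<>();
            responses.add(null);                          // first write succeeded
            responses.add(new IOException("disk full"));  // second write failed
            batch.complete(responses);
            System.out.println("first:  " + ok.join());
            System.out.println("second: " + failed.join());
        }
    }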