X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-segmented-journal%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fakka%2Fsegjournal%2FSegmentedJournalActor.java;h=9f63892d26fdfd949b46d0bb9213bd84df2deb80;hb=HEAD;hp=3b85a230a1b9b26877bead05c8da2ccd22d0d76d;hpb=47d43dcb0148e356c0c5a4eb36bcfc2049f36da0;p=controller.git

diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java
index 3b85a230a1..0b78083911 100644
--- a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java
+++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java
@@ -12,7 +12,9 @@ import static com.google.common.base.Verify.verifyNotNull;
 import static java.util.Objects.requireNonNull;
 
 import akka.actor.AbstractActor;
+import akka.actor.ActorRef;
 import akka.actor.Props;
+import akka.japi.pf.ReceiveBuilder;
 import akka.persistence.AtomicWrite;
 import akka.persistence.PersistentRepr;
 import com.codahale.metrics.Histogram;
@@ -20,27 +22,25 @@ import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
 import com.google.common.base.MoreObjects;
-import io.atomix.storage.StorageLevel;
+import com.google.common.base.Stopwatch;
 import io.atomix.storage.journal.Indexed;
+import io.atomix.storage.journal.JournalSerdes;
+import io.atomix.storage.journal.SegmentedByteBufJournal;
 import io.atomix.storage.journal.SegmentedJournal;
-import io.atomix.storage.journal.SegmentedJournalReader;
-import io.atomix.storage.journal.SegmentedJournalWriter;
-import io.atomix.utils.serializer.Namespace;
+import io.atomix.storage.journal.StorageLevel;
 import java.io.File;
-import java.io.Serializable;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import org.opendaylight.controller.akka.segjournal.DataJournalEntry.FromPersistence;
-import org.opendaylight.controller.akka.segjournal.DataJournalEntry.ToPersistence;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
+import org.opendaylight.controller.raft.journal.FromByteBufMapper;
+import org.opendaylight.controller.raft.journal.ToByteBufMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.collection.Iterator;
-import scala.collection.SeqLike;
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
 
@@ -63,11 +63,9 @@ import scala.concurrent.Promise;
  *
  * <p>
  * Split-file approach allows us to treat sequence numbers and indices as equivalent, without maintaining any explicit
  * mapping information. The only additional information we need to maintain is the last deleted sequence number.
- *
- * @author Robert Varga
  */
-final class SegmentedJournalActor extends AbstractActor {
-    abstract static class AsyncMessage<T> {
+abstract sealed class SegmentedJournalActor extends AbstractActor {
+    abstract static sealed class AsyncMessage<T> {
         final Promise<T> promise = Promise.apply();
     }
@@ -84,11 +82,11 @@ final class SegmentedJournalActor extends AbstractActor {
         }
     }
 
-    private static final class ReplayMessages extends AsyncMessage<Void> {
+    static final class ReplayMessages extends AsyncMessage<Void> {
         private final long fromSequenceNr;
-        private final long toSequenceNr;
-        private final long max;
-        private final Consumer<PersistentRepr> replayCallback;
+        final long toSequenceNr;
+        final long max;
+        final Consumer<PersistentRepr> replayCallback;
 
         ReplayMessages(final long fromSequenceNr, final long toSequenceNr, final long max,
                 final Consumer<PersistentRepr> replayCallback) {
@@ -110,12 +108,28 @@ final class SegmentedJournalActor extends AbstractActor {
         private final List<Promise<Optional<Exception>>> results = new ArrayList<>();
 
         Future<Optional<Exception>> add(final AtomicWrite write) {
-            final Promise<Optional<Exception>> promise = Promise.apply();
+            final var promise = Promise.<Optional<Exception>>apply();
             requests.add(write);
             results.add(promise);
             return promise.future();
         }
 
+        int size() {
+            return requests.size();
+        }
+
+        AtomicWrite getRequest(final int index) {
+            return requests.get(index);
+        }
+
+        void setFailure(final int index, final Exception cause) {
+            results.get(index).success(Optional.of(cause));
+        }
+
+        void setSuccess(final int index) {
+            results.get(index).success(Optional.empty());
+        }
+
         @Override
         public String toString() {
             return MoreObjects.toStringHelper(this).add("requests", requests).toString();
         }
     }
@@ -135,9 +149,154 @@ final class SegmentedJournalActor extends AbstractActor {
         }
     }
 
+    // responses == null on success, Exception on failure
+    record WrittenMessages(WriteMessages message, List<Object> responses, long writtenBytes) {
+        WrittenMessages {
+            verify(responses.size() == message.size(), "Mismatched %s and %s", message, responses);
+            verify(writtenBytes >= 0, "Unexpected length %s", writtenBytes);
+        }
+
+        private void complete() {
+            for (int i = 0, size = responses.size(); i < size; ++i) {
+                if (responses.get(i) instanceof Exception ex) {
+                    message.setFailure(i, ex);
+                } else {
+                    message.setSuccess(i);
+                }
+            }
+        }
+    }
+
+    /**
+     * A {@link SegmentedJournalActor} which delays issuing a flush operation until a watermark is reached or when the
+     * queue is empty.
+     *
+     * <p>
+     * The problem we are addressing is that there is a queue sitting in front of the actor, which we have no direct
+     * access to. Since a flush involves committing data to durable storage, that operation can easily end up
+     * dominating workloads.
+     *
+     * <p>
+     * We solve this by having an additional queue in which we track which messages were written and trigger a flush
+     * only when the number of bytes we have written exceeds a specified limit. The other part is that each time this
+     * queue becomes non-empty, we send a dedicated message to self. This acts as an actor queue probe -- when we
+     * receive it, we know we have processed all messages that were in the queue when we first delayed the write.
+     *
+     * <p>
+     * The combination of these mechanisms ensures we use a minimal delay while also ensuring we take advantage of
+     * batching opportunities.
+     */
+    private static final class Delayed extends SegmentedJournalActor {
+        private static final class Flush extends AsyncMessage<Void> {
+            final long batch;
+
+            Flush(final long batch) {
+                this.batch = batch;
+            }
+        }
+
+        private record UnflushedWrite(WrittenMessages message, Stopwatch start, long count) {
+            UnflushedWrite {
+                requireNonNull(message);
+                requireNonNull(start);
+            }
+        }
+
+        private final ArrayDeque<UnflushedWrite> unflushedWrites = new ArrayDeque<>();
+        private final Stopwatch unflushedDuration = Stopwatch.createUnstarted();
+        private final long maxUnflushedBytes;
+
+        private long batch = 0;
+        private long unflushedBytes = 0;
+
+        Delayed(final String persistenceId, final File directory, final StorageLevel storage,
+                final int maxEntrySize, final int maxSegmentSize, final int maxUnflushedBytes) {
+            super(persistenceId, directory, storage, maxEntrySize, maxSegmentSize);
+            this.maxUnflushedBytes = maxUnflushedBytes;
+        }
+
+        @Override
+        ReceiveBuilder addMessages(final ReceiveBuilder builder) {
+            return super.addMessages(builder).match(Flush.class, this::handleFlush);
+        }
+
+        private void handleFlush(final Flush message) {
+            if (message.batch == batch) {
+                flushWrites();
+            } else {
+                LOG.debug("{}: batch {} not flushed by {}", persistenceId(), batch, message.batch);
+            }
+        }
+
+        @Override
+        void onWrittenMessages(final WrittenMessages message, final Stopwatch started, final long count) {
+            boolean first = unflushedWrites.isEmpty();
+            if (first) {
+                unflushedDuration.start();
+            }
+            unflushedWrites.addLast(new UnflushedWrite(message, started, count));
+            unflushedBytes = unflushedBytes + message.writtenBytes;
+            if (unflushedBytes >= maxUnflushedBytes) {
+                LOG.debug("{}: reached {} unflushed journal bytes", persistenceId(), unflushedBytes);
+                flushWrites();
+            } else if (first) {
+                LOG.debug("{}: deferring journal flush", persistenceId());
+                self().tell(new Flush(++batch), ActorRef.noSender());
+            }
+        }
+
+        @Override
+        void flushWrites() {
+            final var unsyncedSize = unflushedWrites.size();
+            if (unsyncedSize == 0) {
+                // Nothing to flush
+                return;
+            }
+
+            LOG.debug("{}: flushing {} journal writes after {}", persistenceId(), unsyncedSize,
+                unflushedDuration.stop());
+            flushJournal(unflushedBytes, unsyncedSize);
+
+            final var sw = Stopwatch.createStarted();
+            unflushedWrites.forEach(write -> completeWriteMessages(write.message, write.start, write.count));
+            unflushedWrites.clear();
+            unflushedBytes = 0;
+            unflushedDuration.reset();
+            LOG.debug("{}: completed {} flushed journal writes in {}", persistenceId(), unsyncedSize, sw);
+        }
+    }
+
+    private static final class Immediate extends SegmentedJournalActor {
+        Immediate(final String persistenceId, final File directory, final StorageLevel storage,
+                final int maxEntrySize, final int maxSegmentSize) {
+            super(persistenceId, directory, storage, maxEntrySize, maxSegmentSize);
+        }
+
+        @Override
+        void onWrittenMessages(final WrittenMessages message, final Stopwatch started, final long count) {
+            flushJournal(message.writtenBytes, 1);
+            completeWriteMessages(message, started, count);
+        }
+
+        @Override
+        void flushWrites() {
+            // No-op
+        }
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournalActor.class);
-    private static final Namespace DELETE_NAMESPACE = Namespace.builder().register(Long.class).build();
     private static final int DELETE_SEGMENT_SIZE = 64 * 1024;
+    private static final FromByteBufMapper<Long> READ_MAPPER;
+    private static final ToByteBufMapper<Long> WRITE_MAPPER;
+
+    static {
+        final var namespace = JournalSerdes.builder()
+            .register(LongEntrySerdes.LONG_ENTRY_SERDES, Long.class)
+            .build();
+
+        READ_MAPPER = namespace.toReadMapper();
+        WRITE_MAPPER = namespace.toWriteMapper();
+    }
 
     private final String persistenceId;
     private final StorageLevel storage;
@@ -151,15 +310,18 @@ final class SegmentedJournalActor extends AbstractActor {
     private Meter messageWriteCount;
     // Tracks the size distribution of messages
     private Histogram messageSize;
-
-    private SegmentedJournal<DataJournalEntry> dataJournal;
+    // Tracks the number of messages completed for each flush
+    private Histogram flushMessages;
+    // Tracks the number of bytes completed for each flush
+    private Histogram flushBytes;
+    // Tracks the duration of flush operations
+    private Timer flushTime;
+
+    private DataJournal dataJournal;
     private SegmentedJournal<Long> deleteJournal;
     private long lastDelete;
 
-    // Tracks largest message size we have observed either during recovery or during write
-    private int largestObservedSize;
-
-    SegmentedJournalActor(final String persistenceId, final File directory, final StorageLevel storage,
+    private SegmentedJournalActor(final String persistenceId, final File directory, final StorageLevel storage,
             final int maxEntrySize, final int maxSegmentSize) {
         this.persistenceId = requireNonNull(persistenceId);
         this.directory = requireNonNull(directory);
@@ -169,20 +331,39 @@ final class SegmentedJournalActor extends AbstractActor {
     }
 
     static Props props(final String persistenceId, final File directory, final StorageLevel storage,
-            final int maxEntrySize, final int maxSegmentSize) {
-        return Props.create(SegmentedJournalActor.class, requireNonNull(persistenceId), directory, storage,
-            maxEntrySize, maxSegmentSize);
+            final int maxEntrySize, final int maxSegmentSize, final int maxUnflushedBytes) {
+        final var pid = requireNonNull(persistenceId);
+        return maxUnflushedBytes > 0
+            ? Props.create(Delayed.class, pid, directory, storage, maxEntrySize, maxSegmentSize, maxUnflushedBytes)
+            : Props.create(Immediate.class, pid, directory, storage, maxEntrySize, maxSegmentSize);
+    }
+
+    final String persistenceId() {
+        return persistenceId;
+    }
+
+    final void flushJournal(final long bytes, final int messages) {
+        final var sw = Stopwatch.createStarted();
+        dataJournal.flush();
+        LOG.debug("{}: journal flush completed in {}", persistenceId, sw.stop());
+        flushBytes.update(bytes);
+        flushMessages.update(messages);
+        flushTime.update(sw.elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
     }
 
     @Override
     public Receive createReceive() {
-        return receiveBuilder()
-            .match(DeleteMessagesTo.class, this::handleDeleteMessagesTo)
-            .match(ReadHighestSequenceNr.class, this::handleReadHighestSequenceNr)
-            .match(ReplayMessages.class, this::handleReplayMessages)
-            .match(WriteMessages.class, this::handleWriteMessages)
-            .matchAny(this::handleUnknown)
-            .build();
+        return addMessages(receiveBuilder())
+            .matchAny(this::handleUnknown)
+            .build();
+    }
+
+    ReceiveBuilder addMessages(final ReceiveBuilder builder) {
+        return builder
+            .match(DeleteMessagesTo.class, this::handleDeleteMessagesTo)
+            .match(ReadHighestSequenceNr.class, this::handleReadHighestSequenceNr)
+            .match(ReplayMessages.class, this::handleReplayMessages)
+            .match(WriteMessages.class, this::handleWriteMessages);
     }
 
     @Override
@@ -190,12 +371,15 @@ final class SegmentedJournalActor extends AbstractActor {
         LOG.debug("{}: actor starting", persistenceId);
         super.preStart();
 
-        final MetricRegistry registry = MetricsReporter.getInstance(MeteringBehavior.DOMAIN).getMetricsRegistry();
-        final String actorName = self().path().parent().toStringWithoutAddress() + '/' + directory.getName();
+        final var registry = MetricsReporter.getInstance(MeteringBehavior.DOMAIN).getMetricsRegistry();
+        final var actorName = self().path().parent().toStringWithoutAddress() + '/' + directory.getName();
 
         batchWriteTime = registry.timer(MetricRegistry.name(actorName, "batchWriteTime"));
         messageWriteCount = registry.meter(MetricRegistry.name(actorName, "messageWriteCount"));
         messageSize = registry.histogram(MetricRegistry.name(actorName, "messageSize"));
+        flushBytes = registry.histogram(MetricRegistry.name(actorName, "flushBytes"));
+        flushMessages = registry.histogram(MetricRegistry.name(actorName, "flushMessages"));
+        flushTime = registry.timer(MetricRegistry.name(actorName, "flushTime"));
     }
 
     @Override
@@ -232,20 +416,22 @@ final class SegmentedJournalActor extends AbstractActor {
         ensureOpen();
 
         LOG.debug("{}: delete messages {}", persistenceId, message);
-        final long to = Long.min(dataJournal.writer().getLastIndex(), message.toSequenceNr);
+        flushWrites();
+
+        final long to = Long.min(dataJournal.lastWrittenSequenceNr(), message.toSequenceNr);
         LOG.debug("{}: adjusted delete to {}", persistenceId, to);
 
         if (lastDelete < to) {
             LOG.debug("{}: deleting entries up to {}", persistenceId, to);
 
             lastDelete = to;
-            final SegmentedJournalWriter<Long> deleteWriter = deleteJournal.writer();
-            final Indexed<Long> entry = deleteWriter.append(lastDelete);
+            final var deleteWriter = deleteJournal.writer();
+            final var entry = deleteWriter.append(lastDelete);
             deleteWriter.commit(entry.index());
-            dataJournal.writer().commit(lastDelete);
+            dataJournal.deleteTo(lastDelete);
 
             LOG.debug("{}: compaction started", persistenceId);
-            dataJournal.compact(lastDelete + 1);
+            dataJournal.compactTo(lastDelete);
             deleteJournal.compact(entry.index());
             LOG.debug("{}: compaction finished", persistenceId);
         } else {
@@ -260,7 +446,8 @@ final class SegmentedJournalActor extends AbstractActor {
         final Long sequence;
         if (directory.isDirectory()) {
             ensureOpen();
-            sequence = dataJournal.writer().getLastIndex();
+            flushWrites();
+            sequence = dataJournal.lastWrittenSequenceNr();
         } else {
             sequence = 0L;
         }
@@ -269,116 +456,71 @@ final class SegmentedJournalActor extends AbstractActor {
         message.promise.success(sequence);
     }
 
-    @SuppressWarnings("checkstyle:illegalCatch")
     private void handleReplayMessages(final ReplayMessages message) {
         LOG.debug("{}: replaying messages {}", persistenceId, message);
         ensureOpen();
+        flushWrites();
 
         final long from = Long.max(lastDelete + 1, message.fromSequenceNr);
         LOG.debug("{}: adjusted fromSequenceNr to {}", persistenceId, from);
 
-        try (SegmentedJournalReader<DataJournalEntry> reader = dataJournal.openReader(from)) {
-            int count = 0;
-            while (reader.hasNext() && count < message.max) {
-                final Indexed<DataJournalEntry> next = reader.next();
-                if (next.index() > message.toSequenceNr) {
-                    break;
-                }
-
-                LOG.trace("{}: replay {}", persistenceId, next);
-                updateLargestSize(next.size());
-                final DataJournalEntry entry = next.entry();
-                verify(entry instanceof FromPersistence, "Unexpected entry %s", entry);
-
-                final PersistentRepr repr = ((FromPersistence) entry).toRepr(persistenceId, next.index());
-                LOG.debug("{}: replaying {}", persistenceId, repr);
-                message.replayCallback.accept(repr);
-                count++;
-            }
-            LOG.debug("{}: successfully replayed {} entries", persistenceId, count);
-        } catch (Exception e) {
-            LOG.warn("{}: failed to replay messages for {}", persistenceId, message, e);
-            message.promise.failure(e);
-        } finally {
-            message.promise.success(null);
-        }
+        dataJournal.handleReplayMessages(message, from);
     }
 
-    @SuppressWarnings("checkstyle:illegalCatch")
     private void handleWriteMessages(final WriteMessages message) {
         ensureOpen();
 
-        final SegmentedJournalWriter<DataJournalEntry> writer = dataJournal.writer();
-        final long startTicks = System.nanoTime();
-        final int count = message.requests.size();
-        final long start = writer.getLastIndex();
-
-        for (int i = 0; i < count; ++i) {
-            final long mark = writer.getLastIndex();
-            try {
-                writeRequest(writer, message.requests.get(i));
-            } catch (Exception e) {
-                LOG.warn("{}: failed to write out request", persistenceId, e);
-                message.results.get(i).success(Optional.of(e));
-                writer.truncate(mark);
-                continue;
-            }
+        final var started = Stopwatch.createStarted();
+        final long start = dataJournal.lastWrittenSequenceNr();
+        final var writtenMessages = dataJournal.handleWriteMessages(message);
 
-            message.results.get(i).success(Optional.empty());
-        }
-        writer.flush();
-        batchWriteTime.update(System.nanoTime() - startTicks, TimeUnit.NANOSECONDS);
-        messageWriteCount.mark(writer.getLastIndex() - start);
+        onWrittenMessages(writtenMessages, started, dataJournal.lastWrittenSequenceNr() - start);
     }
 
-    private void writeRequest(final SegmentedJournalWriter<DataJournalEntry> writer, final AtomicWrite request) {
-        // Cast is needed for Eclipse because of https://bugs.eclipse.org/bugs/show_bug.cgi?id=468276
-        final Iterator<PersistentRepr> it = ((SeqLike<PersistentRepr, ?>) request.payload()).iterator();
-        while (it.hasNext()) {
-            final PersistentRepr repr = it.next();
-            final Object payload = repr.payload();
-            if (!(payload instanceof Serializable)) {
-                throw new UnsupportedOperationException("Non-serializable payload encountered " + payload.getClass());
-            }
-
-            final int size = writer.append(new ToPersistence(repr)).size();
-            messageSize.update(size);
-            updateLargestSize(size);
-        }
+    final void completeWriteMessages(final WrittenMessages message, final Stopwatch started, final long count) {
+        batchWriteTime.update(started.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+        messageWriteCount.mark(count);
+        // log message after statistics are updated
+        LOG.debug("{}: write of {} bytes completed in {}", persistenceId, message.writtenBytes, started);
+        message.complete();
    }
 
+    /**
+     * Handle a batch of written messages.
+     *
+     * @param message Messages which were written
+     * @param started Stopwatch started when the write started
+     * @param count number of writes
+     */
+    abstract void onWrittenMessages(WrittenMessages message, Stopwatch started, long count);
+
     private void handleUnknown(final Object message) {
         LOG.error("{}: Received unknown message {}", persistenceId, message);
     }
 
-    private void updateLargestSize(final int size) {
-        if (size > largestObservedSize) {
-            largestObservedSize = size;
-        }
-    }
-
     private void ensureOpen() {
         if (dataJournal != null) {
             verifyNotNull(deleteJournal);
             return;
         }
 
-        deleteJournal = SegmentedJournal.<Long>builder().withDirectory(directory).withName("delete")
-            .withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build();
-        final Indexed<Long> lastEntry = deleteJournal.writer().getLastEntry();
-        lastDelete = lastEntry == null ? 0 : lastEntry.entry();
-
-        dataJournal = SegmentedJournal.<DataJournalEntry>builder()
-            .withStorageLevel(storage).withDirectory(directory).withName("data")
-            .withNamespace(Namespace.builder()
-                .register(new DataJournalEntrySerializer(context().system()),
-                    FromPersistence.class, ToPersistence.class)
-                .build())
-            .withMaxEntrySize(maxEntrySize).withMaxSegmentSize(maxSegmentSize)
-            .build();
-        final SegmentedJournalWriter<DataJournalEntry> writer = dataJournal.writer();
-        writer.commit(lastDelete);
-        LOG.debug("{}: journal open with last index {}, deleted to {}", persistenceId, writer.getLastIndex(),
-            lastDelete);
+        final var sw = Stopwatch.createStarted();
+        deleteJournal = new SegmentedJournal<>(SegmentedByteBufJournal.builder()
+            .withDirectory(directory)
+            .withName("delete")
+            .withMaxSegmentSize(DELETE_SEGMENT_SIZE)
+            .build(), READ_MAPPER, WRITE_MAPPER);
+        final var lastDeleteRecovered = deleteJournal.openReader(deleteJournal.lastIndex())
+            .tryNext((index, value, length) -> value);
+        lastDelete = lastDeleteRecovered == null ? 0 : lastDeleteRecovered;
+
+        dataJournal = new DataJournalV0(persistenceId, messageSize, context().system(), storage, directory,
+            maxEntrySize, maxSegmentSize);
+        dataJournal.deleteTo(lastDelete);
+        LOG.debug("{}: journal open in {} with last index {}, deleted to {}", persistenceId, sw,
+            dataJournal.lastWrittenSequenceNr(), lastDelete);
     }
+
+    abstract void flushWrites();
+
 }
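
The Delayed subclass above is the heart of this change: rather than flushing after every AtomicWrite, it buffers completed writes and flushes either when a byte watermark is crossed, or when a Flush message it sent to itself comes back, which proves that the mailbox backlog present when the first write was deferred has been drained. The following is a minimal, self-contained sketch of that pattern; the names (BatchingWriter, Write, FlushProbe) are hypothetical and only akka-actor is assumed, so treat it as an illustration of the technique rather than the module's implementation:

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import java.util.ArrayDeque;

// Hypothetical sketch of the deferred-flush pattern used by Delayed above.
final class BatchingWriter extends AbstractActor {
    record Write(byte[] data) { }

    // Probe message sent to self(); its arrival means every Write that was
    // queued ahead of it has already been processed.
    private record FlushProbe(long batch) { }

    private final ArrayDeque<Write> unflushed = new ArrayDeque<>();
    private final long maxUnflushedBytes;

    private long unflushedBytes = 0;
    private long batch = 0;

    BatchingWriter(final long maxUnflushedBytes) {
        this.maxUnflushedBytes = maxUnflushedBytes;
    }

    static Props props(final long maxUnflushedBytes) {
        return Props.create(BatchingWriter.class, () -> new BatchingWriter(maxUnflushedBytes));
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(Write.class, this::onWrite)
            .match(FlushProbe.class, probe -> {
                // Stale probes carry an older batch counter: those writes were
                // already flushed via the watermark path, so do nothing.
                if (probe.batch() == batch) {
                    flush();
                }
            })
            .build();
    }

    private void onWrite(final Write write) {
        final boolean first = unflushed.isEmpty();
        unflushed.addLast(write);
        unflushedBytes += write.data().length;
        if (unflushedBytes >= maxUnflushedBytes) {
            // Watermark reached: flush immediately instead of waiting for the probe
            flush();
        } else if (first) {
            self().tell(new FlushProbe(++batch), ActorRef.noSender());
        }
    }

    private void flush() {
        // A real implementation would fsync the journal here before completing
        // the buffered writes back to their senders.
        unflushed.clear();
        unflushedBytes = 0;
    }
}

The batch counter is the subtle part: it ensures that a probe which arrives after a watermark-triggered flush does not cause a second, empty flush for writes that are already durable.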
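For context on how the two strategies get selected: props() instantiates Delayed only for a positive maxUnflushedBytes, otherwise it falls back to Immediate, which flushes after every batch just as the pre-change code did. A hypothetical wiring sketch follows; it would have to live in the org.opendaylight.controller.akka.segjournal package (the actor class is package-private), and the sizing values are illustrative assumptions, not defaults taken from this change:

package org.opendaylight.controller.akka.segjournal;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import io.atomix.storage.journal.StorageLevel;
import java.io.File;

// Hypothetical wiring of SegmentedJournalActor; all numeric values are examples.
public final class JournalWiring {
    public static void main(final String[] args) {
        final ActorSystem system = ActorSystem.create("persistence");
        // maxUnflushedBytes > 0 selects the Delayed variant; 0 (or less) selects
        // Immediate, which flushes every write batch.
        final ActorRef journal = system.actorOf(
            SegmentedJournalActor.props(
                "member-1-shard-default-config", // persistenceId (example)
                new File("segmented-journal"),   // directory
                StorageLevel.DISK,               // storage
                16 * 1024 * 1024,                // maxEntrySize: 16 MiB (example)
                128 * 1024 * 1024,               // maxSegmentSize: 128 MiB (example)
                1024 * 1024),                    // maxUnflushedBytes: 1 MiB watermark
            "segmented-journal");
        system.log().info("journal actor started at {}", journal.path());
    }
}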