*/
package org.opendaylight.controller.akka.segjournal;
+import static com.google.common.base.Verify.verify;
import static com.google.common.base.Verify.verifyNotNull;
import static java.util.Objects.requireNonNull;
import akka.actor.AbstractActor;
+import akka.actor.ActorRef;
import akka.actor.Props;
+import akka.japi.pf.ReceiveBuilder;
import akka.persistence.AtomicWrite;
import akka.persistence.PersistentRepr;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.base.MoreObjects;
-import io.atomix.storage.StorageLevel;
+import com.google.common.base.Stopwatch;
import io.atomix.storage.journal.Indexed;
+import io.atomix.storage.journal.JournalSerdes;
+import io.atomix.storage.journal.SegmentedByteBufJournal;
import io.atomix.storage.journal.SegmentedJournal;
-import io.atomix.storage.journal.SegmentedJournalWriter;
-import io.atomix.utils.serializer.Namespace;
+import io.atomix.storage.journal.StorageLevel;
import java.io.File;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.reporting.MetricsReporter;
+import org.opendaylight.controller.raft.journal.FromByteBufMapper;
+import org.opendaylight.controller.raft.journal.ToByteBufMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
* <p>
* Split-file approach allows us to treat sequence numbers and indices as equivalent, without maintaining any explicit
* mapping information. The only additional information we need to maintain is the last deleted sequence number.
- *
- * @author Robert Varga
*/
-final class SegmentedJournalActor extends AbstractActor {
- abstract static class AsyncMessage<T> {
+abstract sealed class SegmentedJournalActor extends AbstractActor {
+ abstract static sealed class AsyncMessage<T> {
final Promise<T> promise = Promise.apply();
}
private final List<Promise<Optional<Exception>>> results = new ArrayList<>();
Future<Optional<Exception>> add(final AtomicWrite write) {
- final Promise<Optional<Exception>> promise = Promise.apply();
+ final var promise = Promise.<Optional<Exception>>apply();
requests.add(write);
results.add(promise);
return promise.future();
void setFailure(final int index, final Exception cause) {
results.get(index).success(Optional.of(cause));
-
}
void setSuccess(final int index) {
}
}
+ // responses == null on success, Exception on failure
+ record WrittenMessages(WriteMessages message, List<Object> responses, long writtenBytes) {
+ WrittenMessages {
+ verify(responses.size() == message.size(), "Mismatched %s and %s", message, responses);
+ verify(writtenBytes >= 0, "Unexpected length %s", writtenBytes);
+ }
+
+ private void complete() {
+ for (int i = 0, size = responses.size(); i < size; ++i) {
+ if (responses.get(i) instanceof Exception ex) {
+ message.setFailure(i, ex);
+ } else {
+ message.setSuccess(i);
+ }
+ }
+ }
+ }
+
+ /**
+ * A {@link SegmentedJournalActor} which delays issuing a flush operation until a watermark is reached or when the
+ * queue is empty.
+ *
+ * <p>
+ * The problem we are addressing is that there is a queue sitting in from of the actor, which we have no direct
+ * access to. Since a flush involves committing data to durable storage, that operation can easily end up dominating
+ * workloads.
+ *
+ * <p>
+ * We solve this by having an additional queue in which we track which messages were written and trigger a flush
+ * only when the number of bytes we have written exceeds specified limit. The other part is that each time this
+ * queue becomes non-empty, we send a dedicated message to self. This acts as a actor queue probe -- when we receive
+ * it, we know we have processed all messages that were in the queue when we first delayed the write.
+ *
+ * <p>
+ * The combination of these mechanisms ensure we use a minimal delay while also ensuring we take advantage of
+ * batching opportunities.
+ */
+ private static final class Delayed extends SegmentedJournalActor {
+ private static final class Flush extends AsyncMessage<Void> {
+ final long batch;
+
+ Flush(final long batch) {
+ this.batch = batch;
+ }
+ }
+
+ private record UnflushedWrite(WrittenMessages message, Stopwatch start, long count) {
+ UnflushedWrite {
+ requireNonNull(message);
+ requireNonNull(start);
+ }
+ }
+
+ private final ArrayDeque<UnflushedWrite> unflushedWrites = new ArrayDeque<>();
+ private final Stopwatch unflushedDuration = Stopwatch.createUnstarted();
+ private final long maxUnflushedBytes;
+
+ private long batch = 0;
+ private long unflushedBytes = 0;
+
+ Delayed(final String persistenceId, final File directory, final StorageLevel storage,
+ final int maxEntrySize, final int maxSegmentSize, final int maxUnflushedBytes) {
+ super(persistenceId, directory, storage, maxEntrySize, maxSegmentSize);
+ this.maxUnflushedBytes = maxUnflushedBytes;
+ }
+
+ @Override
+ ReceiveBuilder addMessages(final ReceiveBuilder builder) {
+ return super.addMessages(builder).match(Flush.class, this::handleFlush);
+ }
+
+ private void handleFlush(final Flush message) {
+ if (message.batch == batch) {
+ flushWrites();
+ } else {
+ LOG.debug("{}: batch {} not flushed by {}", persistenceId(), batch, message.batch);
+ }
+ }
+
+ @Override
+ void onWrittenMessages(final WrittenMessages message, final Stopwatch started, final long count) {
+ boolean first = unflushedWrites.isEmpty();
+ if (first) {
+ unflushedDuration.start();
+ }
+ unflushedWrites.addLast(new UnflushedWrite(message, started, count));
+ unflushedBytes = unflushedBytes + message.writtenBytes;
+ if (unflushedBytes >= maxUnflushedBytes) {
+ LOG.debug("{}: reached {} unflushed journal bytes", persistenceId(), unflushedBytes);
+ flushWrites();
+ } else if (first) {
+ LOG.debug("{}: deferring journal flush", persistenceId());
+ self().tell(new Flush(++batch), ActorRef.noSender());
+ }
+ }
+
+ @Override
+ void flushWrites() {
+ final var unsyncedSize = unflushedWrites.size();
+ if (unsyncedSize == 0) {
+ // Nothing to flush
+ return;
+ }
+
+ LOG.debug("{}: flushing {} journal writes after {}", persistenceId(), unsyncedSize,
+ unflushedDuration.stop());
+ flushJournal(unflushedBytes, unsyncedSize);
+
+ final var sw = Stopwatch.createStarted();
+ unflushedWrites.forEach(write -> completeWriteMessages(write.message, write.start, write.count));
+ unflushedWrites.clear();
+ unflushedBytes = 0;
+ unflushedDuration.reset();
+ LOG.debug("{}: completed {} flushed journal writes in {}", persistenceId(), unsyncedSize, sw);
+ }
+ }
+
+ private static final class Immediate extends SegmentedJournalActor {
+ Immediate(final String persistenceId, final File directory, final StorageLevel storage,
+ final int maxEntrySize, final int maxSegmentSize) {
+ super(persistenceId, directory, storage, maxEntrySize, maxSegmentSize);
+ }
+
+ @Override
+ void onWrittenMessages(final WrittenMessages message, final Stopwatch started, final long count) {
+ flushJournal(message.writtenBytes, 1);
+ completeWriteMessages(message, started, count);
+ }
+
+ @Override
+ void flushWrites() {
+ // No-op
+ }
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournalActor.class);
- private static final Namespace DELETE_NAMESPACE = Namespace.builder().register(Long.class).build();
private static final int DELETE_SEGMENT_SIZE = 64 * 1024;
+ private static final FromByteBufMapper<Long> READ_MAPPER;
+ private static final ToByteBufMapper<Long> WRITE_MAPPER;
+
+ static {
+ final var namespace = JournalSerdes.builder()
+ .register(LongEntrySerdes.LONG_ENTRY_SERDES, Long.class)
+ .build();
+
+ READ_MAPPER = namespace.toReadMapper();
+ WRITE_MAPPER = namespace.toWriteMapper();
+ }
private final String persistenceId;
private final StorageLevel storage;
private Meter messageWriteCount;
// Tracks the size distribution of messages
private Histogram messageSize;
+ // Tracks the number of messages completed for each flush
+ private Histogram flushMessages;
+ // Tracks the number of bytes completed for each flush
+ private Histogram flushBytes;
+ // Tracks the duration of flush operations
+ private Timer flushTime;
private DataJournal dataJournal;
private SegmentedJournal<Long> deleteJournal;
private long lastDelete;
- SegmentedJournalActor(final String persistenceId, final File directory, final StorageLevel storage,
+ private SegmentedJournalActor(final String persistenceId, final File directory, final StorageLevel storage,
final int maxEntrySize, final int maxSegmentSize) {
this.persistenceId = requireNonNull(persistenceId);
this.directory = requireNonNull(directory);
}
static Props props(final String persistenceId, final File directory, final StorageLevel storage,
- final int maxEntrySize, final int maxSegmentSize) {
- return Props.create(SegmentedJournalActor.class, requireNonNull(persistenceId), directory, storage,
- maxEntrySize, maxSegmentSize);
+ final int maxEntrySize, final int maxSegmentSize, final int maxUnflushedBytes) {
+ final var pid = requireNonNull(persistenceId);
+ return maxUnflushedBytes > 0
+ ? Props.create(Delayed.class, pid, directory, storage, maxEntrySize, maxSegmentSize, maxUnflushedBytes)
+ : Props.create(Immediate.class, pid, directory, storage, maxEntrySize, maxSegmentSize);
+ }
+
+ final String persistenceId() {
+ return persistenceId;
+ }
+
+ final void flushJournal(final long bytes, final int messages) {
+ final var sw = Stopwatch.createStarted();
+ dataJournal.flush();
+ LOG.debug("{}: journal flush completed in {}", persistenceId, sw.stop());
+ flushBytes.update(bytes);
+ flushMessages.update(messages);
+ flushTime.update(sw.elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
}
@Override
public Receive createReceive() {
- return receiveBuilder()
- .match(DeleteMessagesTo.class, this::handleDeleteMessagesTo)
- .match(ReadHighestSequenceNr.class, this::handleReadHighestSequenceNr)
- .match(ReplayMessages.class, this::handleReplayMessages)
- .match(WriteMessages.class, this::handleWriteMessages)
- .matchAny(this::handleUnknown)
- .build();
+ return addMessages(receiveBuilder())
+ .matchAny(this::handleUnknown)
+ .build();
+ }
+
+ ReceiveBuilder addMessages(final ReceiveBuilder builder) {
+ return builder
+ .match(DeleteMessagesTo.class, this::handleDeleteMessagesTo)
+ .match(ReadHighestSequenceNr.class, this::handleReadHighestSequenceNr)
+ .match(ReplayMessages.class, this::handleReplayMessages)
+ .match(WriteMessages.class, this::handleWriteMessages);
}
@Override
LOG.debug("{}: actor starting", persistenceId);
super.preStart();
- final MetricRegistry registry = MetricsReporter.getInstance(MeteringBehavior.DOMAIN).getMetricsRegistry();
- final String actorName = self().path().parent().toStringWithoutAddress() + '/' + directory.getName();
+ final var registry = MetricsReporter.getInstance(MeteringBehavior.DOMAIN).getMetricsRegistry();
+ final var actorName = self().path().parent().toStringWithoutAddress() + '/' + directory.getName();
batchWriteTime = registry.timer(MetricRegistry.name(actorName, "batchWriteTime"));
messageWriteCount = registry.meter(MetricRegistry.name(actorName, "messageWriteCount"));
messageSize = registry.histogram(MetricRegistry.name(actorName, "messageSize"));
+ flushBytes = registry.histogram(MetricRegistry.name(actorName, "flushBytes"));
+ flushMessages = registry.histogram(MetricRegistry.name(actorName, "flushMessages"));
+ flushTime = registry.timer(MetricRegistry.name(actorName, "flushTime"));
}
@Override
ensureOpen();
LOG.debug("{}: delete messages {}", persistenceId, message);
+ flushWrites();
+
final long to = Long.min(dataJournal.lastWrittenSequenceNr(), message.toSequenceNr);
LOG.debug("{}: adjusted delete to {}", persistenceId, to);
LOG.debug("{}: deleting entries up to {}", persistenceId, to);
lastDelete = to;
- final SegmentedJournalWriter<Long> deleteWriter = deleteJournal.writer();
- final Indexed<Long> entry = deleteWriter.append(lastDelete);
+ final var deleteWriter = deleteJournal.writer();
+ final var entry = deleteWriter.append(lastDelete);
deleteWriter.commit(entry.index());
dataJournal.deleteTo(lastDelete);
final Long sequence;
if (directory.isDirectory()) {
ensureOpen();
+ flushWrites();
sequence = dataJournal.lastWrittenSequenceNr();
} else {
sequence = 0L;
private void handleReplayMessages(final ReplayMessages message) {
LOG.debug("{}: replaying messages {}", persistenceId, message);
ensureOpen();
+ flushWrites();
final long from = Long.max(lastDelete + 1, message.fromSequenceNr);
LOG.debug("{}: adjusted fromSequenceNr to {}", persistenceId, from);
private void handleWriteMessages(final WriteMessages message) {
ensureOpen();
- final long startTicks = System.nanoTime();
+ final var started = Stopwatch.createStarted();
final long start = dataJournal.lastWrittenSequenceNr();
+ final var writtenMessages = dataJournal.handleWriteMessages(message);
- dataJournal.handleWriteMessages(message);
+ onWrittenMessages(writtenMessages, started, dataJournal.lastWrittenSequenceNr() - start);
+ }
- batchWriteTime.update(System.nanoTime() - startTicks, TimeUnit.NANOSECONDS);
- messageWriteCount.mark(dataJournal.lastWrittenSequenceNr() - start);
+ final void completeWriteMessages(final WrittenMessages message, final Stopwatch started, final long count) {
+ batchWriteTime.update(started.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+ messageWriteCount.mark(count);
+ // log message after statistics are updated
+ LOG.debug("{}: write of {} bytes completed in {}", persistenceId, message.writtenBytes, started);
+ message.complete();
}
+ /**
+ * Handle a check of written messages.
+ *
+ * @param message Messages which were written
+ * @param started Stopwatch started when the write started
+ * @param count number of writes
+ */
+ abstract void onWrittenMessages(WrittenMessages message, Stopwatch started, long count);
+
private void handleUnknown(final Object message) {
LOG.error("{}: Received unknown message {}", persistenceId, message);
}
return;
}
- deleteJournal = SegmentedJournal.<Long>builder().withDirectory(directory).withName("delete")
- .withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build();
- final Indexed<Long> lastEntry = deleteJournal.writer().getLastEntry();
- lastDelete = lastEntry == null ? 0 : lastEntry.entry();
+ final var sw = Stopwatch.createStarted();
+ deleteJournal = new SegmentedJournal<>(SegmentedByteBufJournal.builder()
+ .withDirectory(directory)
+ .withName("delete")
+ .withMaxSegmentSize(DELETE_SEGMENT_SIZE)
+ .build(), READ_MAPPER, WRITE_MAPPER);
+ final var lastDeleteRecovered = deleteJournal.openReader(deleteJournal.lastIndex())
+ .tryNext((index, value, length) -> value);
+ lastDelete = lastDeleteRecovered == null ? 0 : lastDeleteRecovered;
dataJournal = new DataJournalV0(persistenceId, messageSize, context().system(), storage, directory,
maxEntrySize, maxSegmentSize);
dataJournal.deleteTo(lastDelete);
- LOG.debug("{}: journal open with last index {}, deleted to {}", persistenceId,
+ LOG.debug("{}: journal open in {} with last index {}, deleted to {}", persistenceId, sw,
dataJournal.lastWrittenSequenceNr(), lastDelete);
}
+
+ abstract void flushWrites();
+
}