import static java.util.Objects.requireNonNull;
import akka.actor.AbstractActor;
+import akka.actor.ActorRef;
import akka.actor.Props;
+import akka.japi.pf.ReceiveBuilder;
import akka.persistence.AtomicWrite;
import akka.persistence.PersistentRepr;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
-import com.codahale.metrics.SlidingTimeWindowReservoir;
+import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.base.MoreObjects;
-import io.atomix.storage.StorageLevel;
+import com.google.common.base.Stopwatch;
import io.atomix.storage.journal.Indexed;
+import io.atomix.storage.journal.JournalSerdes;
+import io.atomix.storage.journal.SegmentedByteBufJournal;
import io.atomix.storage.journal.SegmentedJournal;
-import io.atomix.storage.journal.SegmentedJournalReader;
-import io.atomix.storage.journal.SegmentedJournalWriter;
-import io.atomix.utils.serializer.Namespace;
+import io.atomix.storage.journal.StorageLevel;
import java.io.File;
-import java.io.Serializable;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
-import org.opendaylight.controller.akka.segjournal.DataJournalEntry.FromPersistence;
-import org.opendaylight.controller.akka.segjournal.DataJournalEntry.ToPersistence;
+import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
+import org.opendaylight.controller.cluster.reporting.MetricsReporter;
+import org.opendaylight.controller.raft.journal.FromByteBufMapper;
+import org.opendaylight.controller.raft.journal.ToByteBufMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.collection.Iterator;
-import scala.collection.SeqLike;
import scala.concurrent.Future;
import scala.concurrent.Promise;
* <p>
* Split-file approach allows us to treat sequence numbers and indices as equivalent, without maintaining any explicit
* mapping information. The only additional information we need to maintain is the last deleted sequence number.
- *
- * @author Robert Varga
*/
-final class SegmentedJournalActor extends AbstractActor {
- abstract static class AsyncMessage<T> {
+abstract sealed class SegmentedJournalActor extends AbstractActor {
+ abstract static sealed class AsyncMessage<T> {
final Promise<T> promise = Promise.apply();
}
}
}
- private static final class ReplayMessages extends AsyncMessage<Void> {
+ static final class ReplayMessages extends AsyncMessage<Void> {
private final long fromSequenceNr;
- private final long toSequenceNr;
- private final long max;
- private final Consumer<PersistentRepr> replayCallback;
+ final long toSequenceNr;
+ final long max;
+ final Consumer<PersistentRepr> replayCallback;
ReplayMessages(final long fromSequenceNr,
final long toSequenceNr, final long max, final Consumer<PersistentRepr> replayCallback) {
private final List<Promise<Optional<Exception>>> results = new ArrayList<>();
Future<Optional<Exception>> add(final AtomicWrite write) {
- final Promise<Optional<Exception>> promise = Promise.apply();
+ final var promise = Promise.<Optional<Exception>>apply();
requests.add(write);
results.add(promise);
return promise.future();
}
+ int size() { // number of AtomicWrite requests added to this message
+ return requests.size();
+ }
+
+ AtomicWrite getRequest(final int index) { // request at the given position, in the order add() was called
+ return requests.get(index);
+ }
+
+ void setFailure(final int index, final Exception cause) {
+ results.get(index).success(Optional.of(cause)); // the promise itself succeeds; the failure travels as its value
+ }
+
+ void setSuccess(final int index) {
+ results.get(index).success(Optional.empty()); // an empty Optional signals that the request was written
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("requests", requests).toString();
}
}
+ // responses holds one element per request in message: an Exception marks a failure, anything else (null) a success
+ record WrittenMessages(WriteMessages message, List<Object> responses, long writtenBytes) {
+ WrittenMessages {
+ verify(responses.size() == message.size(), "Mismatched %s and %s", message, responses); // one response per request
+ verify(writtenBytes >= 0, "Unexpected length %s", writtenBytes);
+ }
+
+ private void complete() { // resolves every request's promise from its recorded response
+ for (int i = 0, size = responses.size(); i < size; ++i) {
+ if (responses.get(i) instanceof Exception ex) {
+ message.setFailure(i, ex);
+ } else {
+ message.setSuccess(i);
+ }
+ }
+ }
+ }
+
+ /**
+ * A {@link SegmentedJournalActor} which delays issuing a flush operation until a watermark is reached or when the
+ * queue is empty.
+ *
+ * <p>
+ * The problem we are addressing is that there is a queue sitting in front of the actor, which we have no direct
+ * access to. Since a flush involves committing data to durable storage, that operation can easily end up dominating
+ * workloads.
+ *
+ * <p>
+ * We solve this by having an additional queue in which we track which messages were written and trigger a flush
+ * only when the number of bytes we have written exceeds the specified limit. The other part is that each time this
+ * queue becomes non-empty, we send a dedicated message to self. This acts as an actor queue probe -- when we receive
+ * it, we know we have processed all messages that were in the queue when we first delayed the write.
+ *
+ * <p>
+ * The combination of these mechanisms ensures we use a minimal delay while also ensuring we take advantage of
+ * batching opportunities.
+ */
+ private static final class Delayed extends SegmentedJournalActor {
+ private static final class Flush extends AsyncMessage<Void> { // probe message this actor sends to itself
+ final long batch; // value of Delayed.batch when the probe was sent
+
+ Flush(final long batch) {
+ this.batch = batch;
+ }
+ }
+
+ private record UnflushedWrite(WrittenMessages message, Stopwatch start, long count) { // a write awaiting flush
+ UnflushedWrite {
+ requireNonNull(message);
+ requireNonNull(start);
+ }
+ }
+
+ private final ArrayDeque<UnflushedWrite> unflushedWrites = new ArrayDeque<>(); // written, not yet completed
+ private final Stopwatch unflushedDuration = Stopwatch.createUnstarted(); // runs while unflushedWrites is non-empty
+ private final long maxUnflushedBytes; // reaching this many unflushed bytes forces a flush
+
+ private long batch = 0; // incremented each time a Flush probe is sent
+ private long unflushedBytes = 0; // sum of writtenBytes over unflushedWrites
+
+ Delayed(final String persistenceId, final File directory, final StorageLevel storage,
+ final int maxEntrySize, final int maxSegmentSize, final int maxUnflushedBytes) {
+ super(persistenceId, directory, storage, maxEntrySize, maxSegmentSize);
+ this.maxUnflushedBytes = maxUnflushedBytes;
+ }
+
+ @Override
+ ReceiveBuilder addMessages(final ReceiveBuilder builder) {
+ return super.addMessages(builder).match(Flush.class, this::handleFlush); // additionally accept our own probe
+ }
+
+ private void handleFlush(final Flush message) {
+ if (message.batch == batch) { // no newer probe was sent since this one
+ flushWrites();
+ } else {
+ LOG.debug("{}: batch {} not flushed by {}", persistenceId(), batch, message.batch);
+ }
+ }
+
+ @Override
+ void onWrittenMessages(final WrittenMessages message, final Stopwatch started, final long count) {
+ boolean first = unflushedWrites.isEmpty(); // true when this write opens a new unflushed batch
+ if (first) {
+ unflushedDuration.start();
+ }
+ unflushedWrites.addLast(new UnflushedWrite(message, started, count));
+ unflushedBytes = unflushedBytes + message.writtenBytes;
+ if (unflushedBytes >= maxUnflushedBytes) { // watermark reached: flush right away
+ LOG.debug("{}: reached {} unflushed journal bytes", persistenceId(), unflushedBytes);
+ flushWrites();
+ } else if (first) { // only the write that opens a batch sends a probe
+ LOG.debug("{}: deferring journal flush", persistenceId());
+ self().tell(new Flush(++batch), ActorRef.noSender());
+ }
+ }
+
+ @Override
+ void flushWrites() {
+ final var unsyncedSize = unflushedWrites.size();
+ if (unsyncedSize == 0) {
+ // Nothing to flush
+ return;
+ }
+
+ LOG.debug("{}: flushing {} journal writes after {}", persistenceId(), unsyncedSize,
+ unflushedDuration.stop());
+ flushJournal(unflushedBytes, unsyncedSize); // a single flush covers every queued write
+
+ final var sw = Stopwatch.createStarted();
+ unflushedWrites.forEach(write -> completeWriteMessages(write.message, write.start, write.count)); // complete only after the flush
+ unflushedWrites.clear();
+ unflushedBytes = 0;
+ unflushedDuration.reset();
+ LOG.debug("{}: completed {} flushed journal writes in {}", persistenceId(), unsyncedSize, sw);
+ }
+ }
+
+ private static final class Immediate extends SegmentedJournalActor { // flushes after every write, defers nothing
+ Immediate(final String persistenceId, final File directory, final StorageLevel storage,
+ final int maxEntrySize, final int maxSegmentSize) {
+ super(persistenceId, directory, storage, maxEntrySize, maxSegmentSize);
+ }
+
+ @Override
+ void onWrittenMessages(final WrittenMessages message, final Stopwatch started, final long count) {
+ flushJournal(message.writtenBytes, 1); // each flush covers exactly this one message
+ completeWriteMessages(message, started, count);
+ }
+
+ @Override
+ void flushWrites() {
+ // No-op: onWrittenMessages() flushes and completes every write as it arrives
+ }
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournalActor.class);
- private static final Namespace DELETE_NAMESPACE = Namespace.builder().register(Long.class).build();
private static final int DELETE_SEGMENT_SIZE = 64 * 1024;
+ private static final FromByteBufMapper<Long> READ_MAPPER;
+ private static final ToByteBufMapper<Long> WRITE_MAPPER;
- // Tracks the time it took us to write a batch of messages
- private final Timer batchWriteTime = new Timer();
- // Tracks the number of individual messages written
- private final Meter messageWriteCount = new Meter();
- // Tracks the size distribution of messages for last 5 minutes
- private final Histogram messageSize = new Histogram(new SlidingTimeWindowReservoir(5, TimeUnit.MINUTES));
+ static { // builds the Long mappers handed to the "delete" journal in ensureOpen()
+ final var namespace = JournalSerdes.builder()
+ .register(LongEntrySerdes.LONG_ENTRY_SERDES, Long.class)
+ .build();
+
+ READ_MAPPER = namespace.toReadMapper();
+ WRITE_MAPPER = namespace.toWriteMapper();
+ }
private final String persistenceId;
private final StorageLevel storage;
private final int maxEntrySize;
private final File directory;
- private SegmentedJournal<DataJournalEntry> dataJournal;
+ // Tracks the time it took us to write a batch of messages
+ private Timer batchWriteTime;
+ // Tracks the number of individual messages written
+ private Meter messageWriteCount;
+ // Tracks the size distribution of messages
+ private Histogram messageSize;
+ // Tracks the number of messages completed for each flush
+ private Histogram flushMessages;
+ // Tracks the number of bytes completed for each flush
+ private Histogram flushBytes;
+ // Tracks the duration of flush operations
+ private Timer flushTime;
+
+ private DataJournal dataJournal;
private SegmentedJournal<Long> deleteJournal;
private long lastDelete;
- // Tracks largest message size we have observed either during recovery or during write
- private int largestObservedSize;
-
- SegmentedJournalActor(final String persistenceId, final File directory, final StorageLevel storage,
+ private SegmentedJournalActor(final String persistenceId, final File directory, final StorageLevel storage,
final int maxEntrySize, final int maxSegmentSize) {
this.persistenceId = requireNonNull(persistenceId);
this.directory = requireNonNull(directory);
}
static Props props(final String persistenceId, final File directory, final StorageLevel storage,
- final int maxEntrySize, final int maxSegmentSize) {
- return Props.create(SegmentedJournalActor.class, requireNonNull(persistenceId), directory, storage,
- maxEntrySize, maxSegmentSize);
+ final int maxEntrySize, final int maxSegmentSize, final int maxUnflushedBytes) {
+ final var pid = requireNonNull(persistenceId);
+ return maxUnflushedBytes > 0 // a positive limit selects delayed flushing, anything else flushes on every write
+ ? Props.create(Delayed.class, pid, directory, storage, maxEntrySize, maxSegmentSize, maxUnflushedBytes)
+ : Props.create(Immediate.class, pid, directory, storage, maxEntrySize, maxSegmentSize);
+ }
+
+ final String persistenceId() {
+ return persistenceId;
+ }
+
+ final void flushJournal(final long bytes, final int messages) { // flushes the data journal and records flush metrics
+ final var sw = Stopwatch.createStarted();
+ dataJournal.flush();
+ LOG.debug("{}: journal flush completed in {}", persistenceId, sw.stop());
+ flushBytes.update(bytes); // bytes covered by this flush, as reported by the caller
+ flushMessages.update(messages); // number of WrittenMessages covered by this flush
+ flushTime.update(sw.elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
}
@Override
public Receive createReceive() {
- return receiveBuilder()
- .match(DeleteMessagesTo.class, this::handleDeleteMessagesTo)
- .match(ReadHighestSequenceNr.class, this::handleReadHighestSequenceNr)
- .match(ReplayMessages.class, this::handleReplayMessages)
- .match(WriteMessages.class, this::handleWriteMessages)
- .matchAny(this::handleUnknown)
- .build();
+ return addMessages(receiveBuilder())
+ .matchAny(this::handleUnknown)
+ .build();
+ }
+
+ ReceiveBuilder addMessages(final ReceiveBuilder builder) { // hook for subclasses; createReceive() appends matchAny afterwards
+ return builder
+ .match(DeleteMessagesTo.class, this::handleDeleteMessagesTo)
+ .match(ReadHighestSequenceNr.class, this::handleReadHighestSequenceNr)
+ .match(ReplayMessages.class, this::handleReplayMessages)
+ .match(WriteMessages.class, this::handleWriteMessages);
}
@Override
public void preStart() throws Exception {
LOG.debug("{}: actor starting", persistenceId);
super.preStart();
+
+ final var registry = MetricsReporter.getInstance(MeteringBehavior.DOMAIN).getMetricsRegistry();
+ final var actorName = self().path().parent().toStringWithoutAddress() + '/' + directory.getName(); // metric name prefix: parent actor path plus journal directory name
+
+ batchWriteTime = registry.timer(MetricRegistry.name(actorName, "batchWriteTime"));
+ messageWriteCount = registry.meter(MetricRegistry.name(actorName, "messageWriteCount"));
+ messageSize = registry.histogram(MetricRegistry.name(actorName, "messageSize"));
+ flushBytes = registry.histogram(MetricRegistry.name(actorName, "flushBytes"));
+ flushMessages = registry.histogram(MetricRegistry.name(actorName, "flushMessages"));
+ flushTime = registry.timer(MetricRegistry.name(actorName, "flushTime"));
}
@Override
ensureOpen();
LOG.debug("{}: delete messages {}", persistenceId, message);
- final long to = Long.min(dataJournal.writer().getLastIndex(), message.toSequenceNr);
+ flushWrites();
+
+ final long to = Long.min(dataJournal.lastWrittenSequenceNr(), message.toSequenceNr);
LOG.debug("{}: adjusted delete to {}", persistenceId, to);
if (lastDelete < to) {
LOG.debug("{}: deleting entries up to {}", persistenceId, to);
lastDelete = to;
- final SegmentedJournalWriter<Long> deleteWriter = deleteJournal.writer();
- final Indexed<Long> entry = deleteWriter.append(lastDelete);
+ final var deleteWriter = deleteJournal.writer();
+ final var entry = deleteWriter.append(lastDelete);
deleteWriter.commit(entry.index());
- dataJournal.writer().commit(lastDelete);
+ dataJournal.deleteTo(lastDelete);
LOG.debug("{}: compaction started", persistenceId);
- dataJournal.compact(lastDelete);
+ dataJournal.compactTo(lastDelete);
deleteJournal.compact(entry.index());
LOG.debug("{}: compaction finished", persistenceId);
} else {
message.promise.success(null);
}
- @SuppressWarnings("checkstyle:illegalCatch")
private void handleReadHighestSequenceNr(final ReadHighestSequenceNr message) {
LOG.debug("{}: looking for highest sequence on {}", persistenceId, message);
final Long sequence;
if (directory.isDirectory()) {
ensureOpen();
- sequence = dataJournal.writer().getLastIndex();
+ flushWrites();
+ sequence = dataJournal.lastWrittenSequenceNr();
} else {
sequence = 0L;
}
message.promise.success(sequence);
}
- @SuppressWarnings("checkstyle:illegalCatch")
private void handleReplayMessages(final ReplayMessages message) {
LOG.debug("{}: replaying messages {}", persistenceId, message);
ensureOpen();
+ flushWrites();
final long from = Long.max(lastDelete + 1, message.fromSequenceNr);
LOG.debug("{}: adjusted fromSequenceNr to {}", persistenceId, from);
- try (SegmentedJournalReader<DataJournalEntry> reader = dataJournal.openReader(from)) {
- int count = 0;
- while (reader.hasNext() && count < message.max) {
- final Indexed<DataJournalEntry> next = reader.next();
- if (next.index() > message.toSequenceNr) {
- break;
- }
-
- LOG.trace("{}: replay {}", persistenceId, next);
- updateLargestSize(next.size());
- final DataJournalEntry entry = next.entry();
- verify(entry instanceof FromPersistence, "Unexpected entry %s", entry);
-
- final PersistentRepr repr = ((FromPersistence) entry).toRepr(persistenceId, next.index());
- LOG.debug("{}: replaying {}", persistenceId, repr);
- message.replayCallback.accept(repr);
- count++;
- }
- LOG.debug("{}: successfully replayed {} entries", persistenceId, count);
- } catch (Exception e) {
- LOG.warn("{}: failed to replay messages for {}", persistenceId, message, e);
- message.promise.failure(e);
- } finally {
- message.promise.success(null);
- }
+ dataJournal.handleReplayMessages(message, from);
}
- @SuppressWarnings("checkstyle:illegalCatch")
private void handleWriteMessages(final WriteMessages message) {
ensureOpen();
- final SegmentedJournalWriter<DataJournalEntry> writer = dataJournal.writer();
- final long startTicks = System.nanoTime();
- final int count = message.requests.size();
- final long start = writer.getLastIndex();
-
- for (int i = 0; i < count; ++i) {
- final long mark = writer.getLastIndex();
- try {
- writeRequest(writer, message.requests.get(i));
- } catch (Exception e) {
- LOG.warn("{}: failed to write out request", persistenceId, e);
- message.results.get(i).success(Optional.of(e));
- writer.truncate(mark);
- continue;
- }
+ final var started = Stopwatch.createStarted();
+ final long start = dataJournal.lastWrittenSequenceNr();
+ final var writtenMessages = dataJournal.handleWriteMessages(message);
- message.results.get(i).success(Optional.empty());
- }
- writer.flush();
- batchWriteTime.update(System.nanoTime() - startTicks, TimeUnit.NANOSECONDS);
- messageWriteCount.mark(writer.getLastIndex() - start);
+ onWrittenMessages(writtenMessages, started, dataJournal.lastWrittenSequenceNr() - start);
}
- private void writeRequest(final SegmentedJournalWriter<DataJournalEntry> writer, final AtomicWrite request) {
- // Cast is needed for Eclipse because of https://bugs.eclipse.org/bugs/show_bug.cgi?id=468276
- final Iterator<PersistentRepr> it = ((SeqLike<PersistentRepr, ?>) request.payload()).iterator();
- while (it.hasNext()) {
- final PersistentRepr repr = it.next();
- final Object payload = repr.payload();
- if (!(payload instanceof Serializable)) {
- throw new UnsupportedOperationException("Non-serializable payload encountered " + payload.getClass());
- }
-
- final int size = writer.append(new ToPersistence(repr)).size();
- messageSize.update(size);
- updateLargestSize(size);
- }
+ final void completeWriteMessages(final WrittenMessages message, final Stopwatch started, final long count) {
+ batchWriteTime.update(started.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); // stops the caller's stopwatch
+ messageWriteCount.mark(count);
+ // log message after statistics are updated
+ LOG.debug("{}: write of {} bytes completed in {}", persistenceId, message.writtenBytes, started);
+ message.complete(); // resolves the per-request promises
}
+ /**
+ * Handle a batch of messages which have just been written to the data journal. Implementations decide when the
+ * journal gets flushed and then hand the batch to
+ * {@link #completeWriteMessages(WrittenMessages, Stopwatch, long)}.
+ *
+ * @param message Messages which were written
+ * @param started Stopwatch started when the write started
+ * @param count number of entries written, i.e. how far the last written sequence number advanced
+ */
+ abstract void onWrittenMessages(WrittenMessages message, Stopwatch started, long count);
+
private void handleUnknown(final Object message) {
LOG.error("{}: Received unknown message {}", persistenceId, message);
}
- private void updateLargestSize(final int size) {
- if (size > largestObservedSize) {
- largestObservedSize = size;
- }
- }
-
private void ensureOpen() {
if (dataJournal != null) {
verifyNotNull(deleteJournal);
return;
}
- deleteJournal = SegmentedJournal.<Long>builder().withDirectory(directory).withName("delete")
- .withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build();
- final Indexed<Long> lastEntry = deleteJournal.writer().getLastEntry();
- lastDelete = lastEntry == null ? 0 : lastEntry.index();
-
- dataJournal = SegmentedJournal.<DataJournalEntry>builder()
- .withStorageLevel(storage).withDirectory(directory).withName("data")
- .withNamespace(Namespace.builder()
- .register(new DataJournalEntrySerializer(context().system()),
- FromPersistence.class, ToPersistence.class)
- .build())
- .withMaxEntrySize(maxEntrySize).withMaxSegmentSize(maxSegmentSize)
- .build();
- final SegmentedJournalWriter<DataJournalEntry> writer = dataJournal.writer();
- writer.commit(lastDelete);
- LOG.debug("{}: journal open with last index {}, deleted to {}", persistenceId, writer.getLastIndex(),
- lastDelete);
+ final var sw = Stopwatch.createStarted();
+ deleteJournal = new SegmentedJournal<>(SegmentedByteBufJournal.builder()
+ .withDirectory(directory)
+ .withName("delete")
+ .withMaxSegmentSize(DELETE_SEGMENT_SIZE)
+ .build(), READ_MAPPER, WRITE_MAPPER);
+ final var lastDeleteRecovered = deleteJournal.openReader(deleteJournal.lastIndex())
+ .tryNext((index, value, length) -> value);
+ lastDelete = lastDeleteRecovered == null ? 0 : lastDeleteRecovered;
+
+ dataJournal = new DataJournalV0(persistenceId, messageSize, context().system(), storage, directory,
+ maxEntrySize, maxSegmentSize);
+ dataJournal.deleteTo(lastDelete);
+ LOG.debug("{}: journal open in {} with last index {}, deleted to {}", persistenceId, sw,
+ dataJournal.lastWrittenSequenceNr(), lastDelete);
}
+
+ abstract void flushWrites(); // flush and complete any deferred writes; the delete, replay and highest-sequence handlers call this first
+
}