/*
 * Copyright (c) 2019 Pantheon Technologies, s.r.o. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.akka.segjournal;

import static com.google.common.base.Verify.verify;
import static com.google.common.base.Verify.verifyNotNull;
import static java.util.Objects.requireNonNull;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.AtomicWrite;
import akka.persistence.PersistentRepr;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.base.MoreObjects;
import com.google.common.base.Stopwatch;
import io.atomix.storage.journal.Indexed;
import io.atomix.storage.journal.JournalSerdes;
import io.atomix.storage.journal.SegmentedByteBufJournal;
import io.atomix.storage.journal.SegmentedJournal;
import io.atomix.storage.journal.StorageLevel;
import java.io.File;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.reporting.MetricsReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
import scala.concurrent.Promise;

/**
 * This actor handles a single PersistentActor's journal. The journal is split into two {@link SegmentedJournal}s:
 * <ul>
 *   <li>A memory-mapped data journal, containing actual data entries</li>
 *   <li>A simple file journal, containing the sequence number of the last deleted entry</li>
 * </ul>
 *
 * <p>This is a conscious design decision to minimize the amount of data that is being stored in the data journal while
 * speeding up normal operations. Since the SegmentedJournal is an append-only linear log and Akka requires the ability
 * to delete persistence entries, we need the ability to mark a subset of a SegmentedJournal as deleted. While we could
 * treat such delete requests as normal events, this leads to a mismatch between SegmentedJournal indices (as exposed
 * by {@link Indexed}) and Akka sequence numbers -- requiring us to potentially perform costly deserialization to find
 * the index corresponding to a particular sequence number, or maintain moderately-complex logic and data structures to
 * perform that mapping in sub-linear time complexity.
 *
 * <p>The split-file approach allows us to treat sequence numbers and indices as equivalent, without maintaining any
 * explicit mapping information. The only additional information we need to maintain is the last deleted sequence
 * number.
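 *
 * <p>As a rough usage sketch (the persistence identifier, directory and sizes below are made-up values; only
 * {@code props()} and {@code readHighestSequenceNr()}, defined in this class, are real):
 * <pre>{@code
 * ActorRef journal = system.actorOf(SegmentedJournalActor.props(
 *     "sample-persistence-id", new File("journal-dir"), StorageLevel.DISK,
 *     512 * 1024,        // maxEntrySize
 *     16 * 1024 * 1024,  // maxSegmentSize
 *     1024 * 1024));     // maxUnflushedBytes: a positive value selects the delayed-flush flavor
 *
 * // Ask for the highest stored sequence number; the result is delivered through the message's Promise
 * var highest = SegmentedJournalActor.readHighestSequenceNr(0);
 * journal.tell(highest, ActorRef.noSender());
 * }</pre>
 */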
abstract sealed class SegmentedJournalActor extends AbstractActor {
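    /**
     * Base class for requests which are served asynchronously: the caller hands the actor a message and receives
     * the result through the attached Scala {@link Promise}.
     */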
    abstract static sealed class AsyncMessage<T> {
        final Promise<T> promise = Promise.apply();
    }

    private static final class ReadHighestSequenceNr extends AsyncMessage<Long> {
        private final long fromSequenceNr;

        ReadHighestSequenceNr(final long fromSequenceNr) {
            this.fromSequenceNr = fromSequenceNr;
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this).add("fromSequenceNr", fromSequenceNr).toString();
        }
    }

    static final class ReplayMessages extends AsyncMessage<Void> {
        private final long fromSequenceNr;
        final long toSequenceNr;
        final long max;
        final Consumer<PersistentRepr> replayCallback;

        ReplayMessages(final long fromSequenceNr,
                final long toSequenceNr, final long max, final Consumer<PersistentRepr> replayCallback) {
            this.fromSequenceNr = fromSequenceNr;
            this.toSequenceNr = toSequenceNr;
            this.max = max;
            this.replayCallback = requireNonNull(replayCallback);
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this).add("fromSequenceNr", fromSequenceNr)
                .add("toSequenceNr", toSequenceNr).add("max", max).toString();
        }
    }
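
    /**
     * A batch of {@link AtomicWrite} requests. Unlike the {@code AsyncMessage}-based requests, each write in the
     * batch carries its own {@code Promise}, completed with {@code Optional.empty()} on success or with the failure
     * cause otherwise.
     */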
    static final class WriteMessages {
        private final List<AtomicWrite> requests = new ArrayList<>();
        private final List<Promise<Optional<Exception>>> results = new ArrayList<>();

        Future<Optional<Exception>> add(final AtomicWrite write) {
            final var promise = Promise.<Optional<Exception>>apply();
            requests.add(write);
            results.add(promise);
            return promise.future();
        }

        int size() {
            return requests.size();
        }

        AtomicWrite getRequest(final int index) {
            return requests.get(index);
        }

        void setFailure(final int index, final Exception cause) {
            results.get(index).success(Optional.of(cause));
        }

        void setSuccess(final int index) {
            results.get(index).success(Optional.empty());
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this).add("requests", requests).toString();
        }
    }

    private static final class DeleteMessagesTo extends AsyncMessage<Void> {
        final long toSequenceNr;

        DeleteMessagesTo(final long toSequenceNr) {
            this.toSequenceNr = toSequenceNr;
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this).add("toSequenceNr", toSequenceNr).toString();
        }
    }

    // Each responses element is null on success, or the Exception which caused that write to fail
    record WrittenMessages(WriteMessages message, List<Object> responses, long writtenBytes) {
        WrittenMessages {
            verify(responses.size() == message.size(), "Mismatched %s and %s", message, responses);
            verify(writtenBytes >= 0, "Unexpected length %s", writtenBytes);
        }

        private void complete() {
            for (int i = 0, size = responses.size(); i < size; ++i) {
                if (responses.get(i) instanceof Exception ex) {
                    message.setFailure(i, ex);
                } else {
                    message.setSuccess(i);
                }
            }
        }
    }

    /**
     * A {@link SegmentedJournalActor} which delays issuing a flush operation until a watermark is reached or the
     * actor's queue has been drained, whichever comes first.
     *
     * <p>The problem we are addressing is that there is a queue sitting in front of the actor, which we have no
     * direct access to. Since a flush involves committing data to durable storage, that operation can easily end up
     * dominating the workload.
     *
     * <p>We solve this by having an additional queue in which we track which messages were written and trigger a
     * flush only when the number of bytes we have written exceeds a specified limit. The other part is that each time
     * this queue becomes non-empty, we send a dedicated message to self. This acts as an actor queue probe -- when we
     * receive it, we know we have processed all messages that were in the queue when we first delayed the write.
     *
     * <p>The combination of these mechanisms ensures we use a minimal delay while also taking advantage of batching
     * opportunities.
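     *
     * <p>As a concrete (hypothetical) trace with a 1 KiB watermark: three 100-byte writes arrive back-to-back. The
     * first write starts the batch and enqueues a {@code Flush} probe behind whatever already sits in the mailbox;
     * the second and third writes are merely recorded, since the watermark has not been reached. Once the probe is
     * received we know the mailbox has been drained, so all three writes are flushed together and their futures
     * completed.
     */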
    private static final class Delayed extends SegmentedJournalActor {
        private static final class Flush extends AsyncMessage<Void> {
            final long batch;

            Flush(final long batch) {
                this.batch = batch;
            }
        }

        private record UnflushedWrite(WrittenMessages message, Stopwatch start, long count) {
            UnflushedWrite {
                requireNonNull(message);
                requireNonNull(start);
            }
        }

        private final ArrayDeque<UnflushedWrite> unflushedWrites = new ArrayDeque<>();
        private final Stopwatch unflushedDuration = Stopwatch.createUnstarted();
        private final long maxUnflushedBytes;

        private long batch = 0;
        private long unflushedBytes = 0;

        Delayed(final String persistenceId, final File directory, final StorageLevel storage,
                final int maxEntrySize, final int maxSegmentSize, final int maxUnflushedBytes) {
            super(persistenceId, directory, storage, maxEntrySize, maxSegmentSize);
            this.maxUnflushedBytes = maxUnflushedBytes;
        }

        @Override
        ReceiveBuilder addMessages(final ReceiveBuilder builder) {
            return super.addMessages(builder).match(Flush.class, this::handleFlush);
        }

        private void handleFlush(final Flush message) {
            if (message.batch == batch) {
                flushWrites();
            } else {
                LOG.debug("{}: batch {} not flushed by {}", persistenceId(), batch, message.batch);
            }
        }

        @Override
        void onWrittenMessages(final WrittenMessages message, final Stopwatch started, final long count) {
            final boolean first = unflushedWrites.isEmpty();
            if (first) {
                unflushedDuration.start();
            }
            unflushedWrites.addLast(new UnflushedWrite(message, started, count));
            unflushedBytes = unflushedBytes + message.writtenBytes;
            if (unflushedBytes >= maxUnflushedBytes) {
                LOG.debug("{}: reached {} unflushed journal bytes", persistenceId(), unflushedBytes);
                flushWrites();
            } else if (first) {
                LOG.debug("{}: deferring journal flush", persistenceId());
                // Mailbox probe: by the time we receive this Flush, everything queued ahead of it has been processed
                self().tell(new Flush(++batch), ActorRef.noSender());
            }
        }

        @Override
        void flushWrites() {
            final var unsyncedSize = unflushedWrites.size();
            if (unsyncedSize == 0) {
                // Nothing to flush
                return;
            }

            LOG.debug("{}: flushing {} journal writes after {}", persistenceId(), unsyncedSize,
                unflushedDuration.stop());
            flushJournal(unflushedBytes, unsyncedSize);

            final var sw = Stopwatch.createStarted();
            unflushedWrites.forEach(write -> completeWriteMessages(write.message, write.start, write.count));
            unflushedWrites.clear();
            unflushedBytes = 0;
            unflushedDuration.reset();
            LOG.debug("{}: completed {} flushed journal writes in {}", persistenceId(), unsyncedSize, sw);
        }
    }

    private static final class Immediate extends SegmentedJournalActor {
        Immediate(final String persistenceId, final File directory, final StorageLevel storage,
                final int maxEntrySize, final int maxSegmentSize) {
            super(persistenceId, directory, storage, maxEntrySize, maxSegmentSize);
        }

        @Override
        void onWrittenMessages(final WrittenMessages message, final Stopwatch started, final long count) {
            flushJournal(message.writtenBytes, 1);
            completeWriteMessages(message, started, count);
        }

        @Override
        void flushWrites() {
            // No-op: every write has already been flushed in onWrittenMessages()
        }
    }

    private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournalActor.class);
    private static final JournalSerdes DELETE_NAMESPACE = JournalSerdes.builder()
        .register(LongEntrySerdes.LONG_ENTRY_SERDES, Long.class)
        .build();
    private static final int DELETE_SEGMENT_SIZE = 64 * 1024;

    private final String persistenceId;
    private final StorageLevel storage;
    private final int maxSegmentSize;
    private final int maxEntrySize;
    private final File directory;

    // Tracks the time it took us to write a batch of messages
    private Timer batchWriteTime;
    // Tracks the number of individual messages written
    private Meter messageWriteCount;
    // Tracks the size distribution of messages
    private Histogram messageSize;
    // Tracks the number of messages completed for each flush
    private Histogram flushMessages;
    // Tracks the number of bytes completed for each flush
    private Histogram flushBytes;
    // Tracks the duration of flush operations
    private Timer flushTime;

    private DataJournal dataJournal;
    private SegmentedJournal<Long> deleteJournal;
    private long lastDelete;

    private SegmentedJournalActor(final String persistenceId, final File directory, final StorageLevel storage,
            final int maxEntrySize, final int maxSegmentSize) {
        this.persistenceId = requireNonNull(persistenceId);
        this.directory = requireNonNull(directory);
        this.storage = requireNonNull(storage);
        this.maxEntrySize = maxEntrySize;
        this.maxSegmentSize = maxSegmentSize;
    }

    static Props props(final String persistenceId, final File directory, final StorageLevel storage,
            final int maxEntrySize, final int maxSegmentSize, final int maxUnflushedBytes) {
        final var pid = requireNonNull(persistenceId);
        // A positive watermark selects the delayed-flush flavor, otherwise we flush on every write
        return maxUnflushedBytes > 0
            ? Props.create(Delayed.class, pid, directory, storage, maxEntrySize, maxSegmentSize, maxUnflushedBytes)
            : Props.create(Immediate.class, pid, directory, storage, maxEntrySize, maxSegmentSize);
    }

    final String persistenceId() {
        return persistenceId;
    }

    final void flushJournal(final long bytes, final int messages) {
        final var sw = Stopwatch.createStarted();
        dataJournal.flush();
        LOG.debug("{}: journal flush completed in {}", persistenceId, sw.stop());
        flushBytes.update(bytes);
        flushMessages.update(messages);
        flushTime.update(sw.elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
    }

    @Override
    public Receive createReceive() {
        return addMessages(receiveBuilder())
            .matchAny(this::handleUnknown)
            .build();
    }

    ReceiveBuilder addMessages(final ReceiveBuilder builder) {
        return builder
            .match(DeleteMessagesTo.class, this::handleDeleteMessagesTo)
            .match(ReadHighestSequenceNr.class, this::handleReadHighestSequenceNr)
            .match(ReplayMessages.class, this::handleReplayMessages)
            .match(WriteMessages.class, this::handleWriteMessages);
    }

    @Override
    public void preStart() throws Exception {
        LOG.debug("{}: actor starting", persistenceId);
        super.preStart();

        final var registry = MetricsReporter.getInstance(MeteringBehavior.DOMAIN).getMetricsRegistry();
        final var actorName = self().path().parent().toStringWithoutAddress() + '/' + directory.getName();

        batchWriteTime = registry.timer(MetricRegistry.name(actorName, "batchWriteTime"));
        messageWriteCount = registry.meter(MetricRegistry.name(actorName, "messageWriteCount"));
        messageSize = registry.histogram(MetricRegistry.name(actorName, "messageSize"));
        flushBytes = registry.histogram(MetricRegistry.name(actorName, "flushBytes"));
        flushMessages = registry.histogram(MetricRegistry.name(actorName, "flushMessages"));
        flushTime = registry.timer(MetricRegistry.name(actorName, "flushTime"));
    }

    @Override
    public void postStop() throws Exception {
        LOG.debug("{}: actor stopping", persistenceId);
        if (dataJournal != null) {
            dataJournal.close();
            LOG.debug("{}: data journal closed", persistenceId);
            dataJournal = null;
        }
        if (deleteJournal != null) {
            deleteJournal.close();
            LOG.debug("{}: delete journal closed", persistenceId);
            deleteJournal = null;
        }
        LOG.debug("{}: actor stopped", persistenceId);
        super.postStop();
    }

    static AsyncMessage<Void> deleteMessagesTo(final long toSequenceNr) {
        return new DeleteMessagesTo(toSequenceNr);
    }

    static AsyncMessage<Long> readHighestSequenceNr(final long fromSequenceNr) {
        return new ReadHighestSequenceNr(fromSequenceNr);
    }

    static AsyncMessage<Void> replayMessages(final long fromSequenceNr, final long toSequenceNr, final long max,
            final Consumer<PersistentRepr> replayCallback) {
        return new ReplayMessages(fromSequenceNr, toSequenceNr, max, replayCallback);
    }

    private void handleDeleteMessagesTo(final DeleteMessagesTo message) {
        ensureOpen();

        LOG.debug("{}: delete messages {}", persistenceId, message);
        // Cap the requested sequence number at what we have actually written
        final long to = Long.min(dataJournal.lastWrittenSequenceNr(), message.toSequenceNr);
        LOG.debug("{}: adjusted delete to {}", persistenceId, to);

        if (lastDelete < to) {
            LOG.debug("{}: deleting entries up to {}", persistenceId, to);

            lastDelete = to;
            // Persist the new delete marker before touching the data journal
            final var deleteWriter = deleteJournal.writer();
            final var entry = deleteWriter.append(lastDelete);
            deleteWriter.commit(entry.index());
            dataJournal.deleteTo(lastDelete);

            LOG.debug("{}: compaction started", persistenceId);
            dataJournal.compactTo(lastDelete);
            deleteJournal.compact(entry.index());
            LOG.debug("{}: compaction finished", persistenceId);
        } else {
            LOG.debug("{}: entries up to {} already deleted", persistenceId, lastDelete);
        }

        message.promise.success(null);
    }

    private void handleReadHighestSequenceNr(final ReadHighestSequenceNr message) {
        LOG.debug("{}: looking for highest sequence on {}", persistenceId, message);
        final Long sequence;
        if (directory.isDirectory()) {
            // We may have persisted messages, consult the data journal
            ensureOpen();
            sequence = dataJournal.lastWrittenSequenceNr();
        } else {
            // No journal directory: nothing has been persisted yet
            sequence = 0L;
        }

        LOG.debug("{}: highest sequence is {}", message, sequence);
        message.promise.success(sequence);
    }

    private void handleReplayMessages(final ReplayMessages message) {
        LOG.debug("{}: replaying messages {}", persistenceId, message);
        ensureOpen();

        // Cap the replay start point to the first entry after the last delete
        final long from = Long.max(lastDelete + 1, message.fromSequenceNr);
        LOG.debug("{}: adjusted fromSequenceNr to {}", persistenceId, from);

        dataJournal.handleReplayMessages(message, from);
    }

    private void handleWriteMessages(final WriteMessages message) {
        ensureOpen();

        final var started = Stopwatch.createStarted();
        final long start = dataJournal.lastWrittenSequenceNr();
        final var writtenMessages = dataJournal.handleWriteMessages(message);

        // Delegate the flush/completion strategy to the subclass
        onWrittenMessages(writtenMessages, started, dataJournal.lastWrittenSequenceNr() - start);
    }

    final void completeWriteMessages(final WrittenMessages message, final Stopwatch started, final long count) {
        batchWriteTime.update(started.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
        messageWriteCount.mark(count);
        // log message after statistics are updated
        LOG.debug("{}: write of {} bytes completed in {}", persistenceId, message.writtenBytes, started);
        message.complete();
    }

    /**
     * Handle messages which have been written to the data journal, deciding when to flush them and complete their
     * futures.
     *
     * @param message Messages which were written
     * @param started Stopwatch started when the write started
     * @param count number of writes
     */
    abstract void onWrittenMessages(WrittenMessages message, Stopwatch started, long count);

    private void handleUnknown(final Object message) {
        LOG.error("{}: Received unknown message {}", persistenceId, message);
    }

    private void ensureOpen() {
        if (dataJournal != null) {
            verifyNotNull(deleteJournal);
            return;
        }

        final var sw = Stopwatch.createStarted();
        deleteJournal = new SegmentedJournal<>(SegmentedByteBufJournal.builder()
            .withDirectory(directory)
            .withName("delete")
            .withMaxSegmentSize(DELETE_SEGMENT_SIZE)
            .build(), DELETE_NAMESPACE.toMapper());
        // Recover the last deleted sequence number, if any
        final var lastDeleteRecovered = deleteJournal.openReader(deleteJournal.lastIndex())
            .tryNext((index, value, length) -> value);
        lastDelete = lastDeleteRecovered == null ? 0 : lastDeleteRecovered;

        dataJournal = new DataJournalV0(persistenceId, messageSize, context().system(), storage, directory,
            maxEntrySize, maxSegmentSize);
        dataJournal.deleteTo(lastDelete);
        LOG.debug("{}: journal open in {} with last index {}, deleted to {}", persistenceId, sw,
            dataJournal.lastWrittenSequenceNr(), lastDelete);
    }

    abstract void flushWrites();
}