--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.akka.segjournal;
+
+import static java.util.Objects.requireNonNull;
+
+import com.codahale.metrics.Histogram;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.ReplayMessages;
+import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.WriteMessages;
+
+/**
+ * Abstraction of a data journal. This provides a unified interface towards {@link SegmentedJournalActor}, allowing
+ * specialization for various formats.
+ */
+abstract class DataJournal {
+ // Mirrors fields from associated actor
+ final @NonNull String persistenceId;
+ private final Histogram messageSize;
+
+    // Tracks the largest message size we have observed, either during recovery or during write
+ private int largestObservedSize;
+
+ DataJournal(final String persistenceId, final Histogram messageSize) {
+ this.persistenceId = requireNonNull(persistenceId);
+ this.messageSize = requireNonNull(messageSize);
+ }
+
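+    /**
+     * Record the size of a message, updating both the size histogram and the largest observed size.
+     *
+     * @param size message size
+     */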
+ final void recordMessageSize(final int size) {
+ messageSize.update(size);
+ updateLargestSize(size);
+ }
+
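+    /**
+     * Update the largest observed message size, if the specified size exceeds it.
+     *
+     * @param size message size
+     */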
+ final void updateLargestSize(final int size) {
+ if (size > largestObservedSize) {
+ largestObservedSize = size;
+ }
+ }
+
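+    /**
+     * Return the index of the last entry written to this journal.
+     *
+     * @return index of the last written entry
+     */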
+ abstract long lastWrittenIndex();
+
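+    /**
+     * Commit entries up to the specified index.
+     *
+     * @param index the index to commit to
+     */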
+ abstract void commitTo(long index);
+
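+    /**
+     * Compact the journal up to the specified index.
+     *
+     * @param index the index to compact to
+     */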
+ abstract void compactTo(long index);
+
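+    /**
+     * Close this journal, releasing the resources it holds.
+     */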
+ abstract void close();
+
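+    /**
+     * Handle a {@link ReplayMessages} request, replaying entries starting at the specified index and completing
+     * the request's promise when done.
+     *
+     * @param message the replay request
+     * @param from the index to start replay from
+     */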
+ abstract void handleReplayMessages(ReplayMessages message, long from);
+
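+    /**
+     * Handle a {@link WriteMessages} request, persisting its contained requests and recording their outcome.
+     *
+     * @param message the write request
+     */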
+ abstract void handleWriteMessages(WriteMessages message);
+}
--- /dev/null
+/*
+ * Copyright (c) 2019, 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.akka.segjournal;
+
+import static com.google.common.base.Verify.verify;
+
+import akka.actor.ActorSystem;
+import akka.persistence.AtomicWrite;
+import akka.persistence.PersistentRepr;
+import com.codahale.metrics.Histogram;
+import io.atomix.storage.StorageLevel;
+import io.atomix.storage.journal.Indexed;
+import io.atomix.storage.journal.SegmentedJournal;
+import io.atomix.storage.journal.SegmentedJournalReader;
+import io.atomix.storage.journal.SegmentedJournalWriter;
+import io.atomix.utils.serializer.Namespace;
+import java.io.File;
+import java.io.Serializable;
+import org.opendaylight.controller.akka.segjournal.DataJournalEntry.FromPersistence;
+import org.opendaylight.controller.akka.segjournal.DataJournalEntry.ToPersistence;
+import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.ReplayMessages;
+import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.WriteMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.jdk.javaapi.CollectionConverters;
+
+/**
+ * Version 0 data journal, where every journal entry maps to exactly one segmented file entry.
+ *
+ * @author Robert Varga
+ */
+final class DataJournalV0 extends DataJournal {
+ private static final Logger LOG = LoggerFactory.getLogger(DataJournalV0.class);
+
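+    // Journal holding the actual data, one segmented journal entry per persisted message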
+ private final SegmentedJournal<DataJournalEntry> entries;
+
+ DataJournalV0(final String persistenceId, final Histogram messageSize, final ActorSystem system,
+ final StorageLevel storage, final File directory, final int maxEntrySize, final int maxSegmentSize) {
+ super(persistenceId, messageSize);
+ entries = SegmentedJournal.<DataJournalEntry>builder()
+ .withStorageLevel(storage).withDirectory(directory).withName("data")
+ .withNamespace(Namespace.builder()
+ .register(new DataJournalEntrySerializer(system), FromPersistence.class, ToPersistence.class)
+ .build())
+ .withMaxEntrySize(maxEntrySize).withMaxSegmentSize(maxSegmentSize)
+ .build();
+ }
+
+ @Override
+ long lastWrittenIndex() {
+ return entries.writer().getLastIndex();
+ }
+
+ @Override
+ void commitTo(final long index) {
+ entries.writer().commit(index);
+ }
+
+ @Override
+ void compactTo(final long index) {
+ entries.compact(index);
+ }
+
+ @Override
+ void close() {
+ entries.close();
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:illegalCatch")
+ void handleReplayMessages(final ReplayMessages message, final long from) {
+ try (SegmentedJournalReader<DataJournalEntry> reader = entries.openReader(from)) {
+ int count = 0;
+ while (reader.hasNext() && count < message.max) {
+ final Indexed<DataJournalEntry> next = reader.next();
+ if (next.index() > message.toSequenceNr) {
+ break;
+ }
+
+ LOG.trace("{}: replay {}", persistenceId, next);
+ updateLargestSize(next.size());
+ final DataJournalEntry entry = next.entry();
+ verify(entry instanceof FromPersistence, "Unexpected entry %s", entry);
+
+ final PersistentRepr repr = ((FromPersistence) entry).toRepr(persistenceId, next.index());
+ LOG.debug("{}: replaying {}", persistenceId, repr);
+ message.replayCallback.accept(repr);
+ count++;
+ }
+ LOG.debug("{}: successfully replayed {} entries", persistenceId, count);
+            message.promise.success(null);
+        } catch (Exception e) {
+            LOG.warn("{}: failed to replay messages for {}", persistenceId, message, e);
+            message.promise.failure(e);
+        }
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:illegalCatch")
+ void handleWriteMessages(final WriteMessages message) {
+ final int count = message.size();
+ final SegmentedJournalWriter<DataJournalEntry> writer = entries.writer();
+
+ for (int i = 0; i < count; ++i) {
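+            // Remember the last written index, so that a failed request can be rolled back via truncate()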
+ final long mark = writer.getLastIndex();
+ final AtomicWrite request = message.getRequest(i);
+ try {
+ for (PersistentRepr repr : CollectionConverters.asJava(request.payload())) {
+ final Object payload = repr.payload();
+ if (!(payload instanceof Serializable)) {
+ throw new UnsupportedOperationException("Non-serializable payload encountered "
+ + payload.getClass());
+ }
+
+                    recordMessageSize(writer.append(new ToPersistence(repr)).size());
+ }
+ } catch (Exception e) {
+ LOG.warn("{}: failed to write out request", persistenceId, e);
+ message.setFailure(i, e);
+ writer.truncate(mark);
+ continue;
+ }
+
+ message.setSuccess(i);
+ }
+ writer.flush();
+ }
+}
*/
package org.opendaylight.controller.akka.segjournal;
-import static com.google.common.base.Verify.verify;
import static com.google.common.base.Verify.verifyNotNull;
import static java.util.Objects.requireNonNull;
import io.atomix.storage.StorageLevel;
import io.atomix.storage.journal.Indexed;
import io.atomix.storage.journal.SegmentedJournal;
-import io.atomix.storage.journal.SegmentedJournalReader;
import io.atomix.storage.journal.SegmentedJournalWriter;
import io.atomix.utils.serializer.Namespace;
import java.io.File;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
-import org.opendaylight.controller.akka.segjournal.DataJournalEntry.FromPersistence;
-import org.opendaylight.controller.akka.segjournal.DataJournalEntry.ToPersistence;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.reporting.MetricsReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
import scala.concurrent.Promise;
-import scala.jdk.javaapi.CollectionConverters;
/**
* This actor handles a single PersistentActor's journal. The journal is split into two {@link SegmentedJournal}s:
}
}
- private static final class ReplayMessages extends AsyncMessage<Void> {
+ static final class ReplayMessages extends AsyncMessage<Void> {
private final long fromSequenceNr;
- private final long toSequenceNr;
- private final long max;
- private final Consumer<PersistentRepr> replayCallback;
+        final long toSequenceNr;
+        final long max;
+        final Consumer<PersistentRepr> replayCallback;
ReplayMessages(final long fromSequenceNr,
final long toSequenceNr, final long max, final Consumer<PersistentRepr> replayCallback) {
            return promise.future();
        }
+        int size() {
+            return requests.size();
+        }
+
+        AtomicWrite getRequest(final int index) {
+            return requests.get(index);
+        }
+
+        void setFailure(final int index, final Exception cause) {
+            results.get(index).success(Optional.of(cause));
+        }
+
+        void setSuccess(final int index) {
+            results.get(index).success(Optional.empty());
+        }
+
        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this).add("requests", requests).toString();
// Tracks the size distribution of messages
private Histogram messageSize;
- private SegmentedJournal<DataJournalEntry> dataJournal;
+ private DataJournal dataJournal;
private SegmentedJournal<Long> deleteJournal;
private long lastDelete;
- // Tracks largest message size we have observed either during recovery or during write
- private int largestObservedSize;
-
SegmentedJournalActor(final String persistenceId, final File directory, final StorageLevel storage,
final int maxEntrySize, final int maxSegmentSize) {
this.persistenceId = requireNonNull(persistenceId);
ensureOpen();
LOG.debug("{}: delete messages {}", persistenceId, message);
- final long to = Long.min(dataJournal.writer().getLastIndex(), message.toSequenceNr);
+ final long to = Long.min(dataJournal.lastWrittenIndex(), message.toSequenceNr);
LOG.debug("{}: adjusted delete to {}", persistenceId, to);
if (lastDelete < to) {
final SegmentedJournalWriter<Long> deleteWriter = deleteJournal.writer();
final Indexed<Long> entry = deleteWriter.append(lastDelete);
deleteWriter.commit(entry.index());
- dataJournal.writer().commit(lastDelete);
+ dataJournal.commitTo(lastDelete);
LOG.debug("{}: compaction started", persistenceId);
- dataJournal.compact(lastDelete + 1);
+ dataJournal.compactTo(lastDelete + 1);
deleteJournal.compact(entry.index());
LOG.debug("{}: compaction finished", persistenceId);
} else {
final Long sequence;
if (directory.isDirectory()) {
ensureOpen();
- sequence = dataJournal.writer().getLastIndex();
+ sequence = dataJournal.lastWrittenIndex();
} else {
sequence = 0L;
}
final long from = Long.max(lastDelete + 1, message.fromSequenceNr);
LOG.debug("{}: adjusted fromSequenceNr to {}", persistenceId, from);
- try (SegmentedJournalReader<DataJournalEntry> reader = dataJournal.openReader(from)) {
- int count = 0;
- while (reader.hasNext() && count < message.max) {
- final Indexed<DataJournalEntry> next = reader.next();
- if (next.index() > message.toSequenceNr) {
- break;
- }
-
- LOG.trace("{}: replay {}", persistenceId, next);
- updateLargestSize(next.size());
- final DataJournalEntry entry = next.entry();
- verify(entry instanceof FromPersistence, "Unexpected entry %s", entry);
-
- final PersistentRepr repr = ((FromPersistence) entry).toRepr(persistenceId, next.index());
- LOG.debug("{}: replaying {}", persistenceId, repr);
- message.replayCallback.accept(repr);
- count++;
- }
- LOG.debug("{}: successfully replayed {} entries", persistenceId, count);
- } catch (Exception e) {
- LOG.warn("{}: failed to replay messages for {}", persistenceId, message, e);
- message.promise.failure(e);
- } finally {
- message.promise.success(null);
- }
+        dataJournal.handleReplayMessages(message, from);
}
- @SuppressWarnings("checkstyle:illegalCatch")
private void handleWriteMessages(final WriteMessages message) {
ensureOpen();
- final SegmentedJournalWriter<DataJournalEntry> writer = dataJournal.writer();
final long startTicks = System.nanoTime();
- final int count = message.requests.size();
- final long start = writer.getLastIndex();
-
- for (int i = 0; i < count; ++i) {
- final long mark = writer.getLastIndex();
- try {
- writeRequest(writer, message.requests.get(i));
- } catch (Exception e) {
- LOG.warn("{}: failed to write out request", persistenceId, e);
- message.results.get(i).success(Optional.of(e));
- writer.truncate(mark);
- continue;
- }
-
- message.results.get(i).success(Optional.empty());
- }
- writer.flush();
- batchWriteTime.update(System.nanoTime() - startTicks, TimeUnit.NANOSECONDS);
- messageWriteCount.mark(writer.getLastIndex() - start);
- }
+        final long start = dataJournal.lastWrittenIndex();
- private void writeRequest(final SegmentedJournalWriter<DataJournalEntry> writer, final AtomicWrite request) {
- for (PersistentRepr repr : CollectionConverters.asJava(request.payload())) {
- final Object payload = repr.payload();
- if (!(payload instanceof Serializable)) {
- throw new UnsupportedOperationException("Non-serializable payload encountered " + payload.getClass());
- }
+ dataJournal.handleWriteMessages(message);
- final int size = writer.append(new ToPersistence(repr)).size();
- messageSize.update(size);
- updateLargestSize(size);
- }
+ batchWriteTime.update(System.nanoTime() - startTicks, TimeUnit.NANOSECONDS);
+ messageWriteCount.mark(dataJournal.lastWrittenIndex() - start);
}
private void handleUnknown(final Object message) {
LOG.error("{}: Received unknown message {}", persistenceId, message);
}
- private void updateLargestSize(final int size) {
- if (size > largestObservedSize) {
- largestObservedSize = size;
- }
- }
-
private void ensureOpen() {
if (dataJournal != null) {
verifyNotNull(deleteJournal);
final Indexed<Long> lastEntry = deleteJournal.writer().getLastEntry();
lastDelete = lastEntry == null ? 0 : lastEntry.entry();
- dataJournal = SegmentedJournal.<DataJournalEntry>builder()
- .withStorageLevel(storage).withDirectory(directory).withName("data")
- .withNamespace(Namespace.builder()
- .register(new DataJournalEntrySerializer(context().system()),
- FromPersistence.class, ToPersistence.class)
- .build())
- .withMaxEntrySize(maxEntrySize).withMaxSegmentSize(maxSegmentSize)
- .build();
- final SegmentedJournalWriter<DataJournalEntry> writer = dataJournal.writer();
- writer.commit(lastDelete);
- LOG.debug("{}: journal open with last index {}, deleted to {}", persistenceId, writer.getLastIndex(),
+ dataJournal = new DataJournalV0(persistenceId, messageSize, context().system(), storage, directory,
+ maxEntrySize, maxSegmentSize);
+ dataJournal.commitTo(lastDelete);
+ LOG.debug("{}: journal open with last index {}, deleted to {}", persistenceId, dataJournal.lastWrittenIndex(),
lastDelete);
}
}