From 3bd9ac1bec0ffae26e36aa31d82f5160dd44faee Mon Sep 17 00:00:00 2001
From: Robert Varga <robert.varga@pantheon.tech>
Date: Wed, 23 Sep 2020 12:24:52 +0200
Subject: [PATCH 1/1] Add a segmented DataJournal abstraction

We are going to change the way we organize journal entries within a
segmented file. Add a DataJournal abstraction, which exposes a unified
interface towards the SegmentedJournalActor. The current implementation
is split out from SegmentedJournalActor as DataJournalV0.

JIRA: CONTROLLER-1954
Change-Id: I0d1de42b22e75610d0434548483091e3359123e4
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
---
 .../akka/segjournal/DataJournal.java          |  56 ++++++++
 .../akka/segjournal/DataJournalV0.java        | 134 ++++++++++++++++++
 .../segjournal/SegmentedJournalActor.java     | 127 +++++------
 3 files changed, 225 insertions(+), 92 deletions(-)
 create mode 100644 opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournal.java
 create mode 100644 opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java

diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournal.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournal.java
new file mode 100644
index 0000000000..3f746900c9
--- /dev/null
+++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournal.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.akka.segjournal;
+
+import static java.util.Objects.requireNonNull;
+
+import com.codahale.metrics.Histogram;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.ReplayMessages;
+import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.WriteMessages;
+
+/**
+ * Abstraction of a data journal. This provides a unified interface towards {@link SegmentedJournalActor}, allowing
+ * specialization for various formats.
+ */
+abstract class DataJournal {
+    // Mirrors fields from associated actor
+    final @NonNull String persistenceId;
+    private final Histogram messageSize;
+
+    // Tracks largest message size we have observed either during recovery or during write
+    private int largestObservedSize;
+
+    DataJournal(final String persistenceId, final Histogram messageSize) {
+        this.persistenceId = requireNonNull(persistenceId);
+        this.messageSize = requireNonNull(messageSize);
+    }
+
+    final void recordMessageSize(final int size) {
+        messageSize.update(size);
+        updateLargestSize(size);
+    }
+
+    final void updateLargestSize(final int size) {
+        if (size > largestObservedSize) {
+            largestObservedSize = size;
+        }
+    }
+
+    abstract long lastWrittenIndex();
+
+    abstract void commitTo(long index);
+
+    abstract void compactTo(long index);
+
+    abstract void close();
+
+    abstract void handleReplayMessages(ReplayMessages message, long from);
+
+    abstract void handleWriteMessages(WriteMessages message);
+}
diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java
new file mode 100644
index 0000000000..766f2fac05
--- /dev/null
+++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright (c) 2019, 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.akka.segjournal;
+
+import static com.google.common.base.Verify.verify;
+
+import akka.actor.ActorSystem;
+import akka.persistence.AtomicWrite;
+import akka.persistence.PersistentRepr;
+import com.codahale.metrics.Histogram;
+import io.atomix.storage.StorageLevel;
+import io.atomix.storage.journal.Indexed;
+import io.atomix.storage.journal.SegmentedJournal;
+import io.atomix.storage.journal.SegmentedJournalReader;
+import io.atomix.storage.journal.SegmentedJournalWriter;
+import io.atomix.utils.serializer.Namespace;
+import java.io.File;
+import java.io.Serializable;
+import org.opendaylight.controller.akka.segjournal.DataJournalEntry.FromPersistence;
+import org.opendaylight.controller.akka.segjournal.DataJournalEntry.ToPersistence;
+import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.ReplayMessages;
+import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.WriteMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.jdk.javaapi.CollectionConverters;
+
+/**
+ * Version 0 data journal, where every journal entry maps to exactly one segmented file entry.
+ *
+ * @author Robert Varga
+ */
+final class DataJournalV0 extends DataJournal {
+    private static final Logger LOG = LoggerFactory.getLogger(DataJournalV0.class);
+
+    private final SegmentedJournal<DataJournalEntry> entries;
+
+    DataJournalV0(final String persistenceId, final Histogram messageSize, final ActorSystem system,
+            final StorageLevel storage, final File directory, final int maxEntrySize, final int maxSegmentSize) {
+        super(persistenceId, messageSize);
+        entries = SegmentedJournal.<DataJournalEntry>builder()
+                .withStorageLevel(storage).withDirectory(directory).withName("data")
+                .withNamespace(Namespace.builder()
+                    .register(new DataJournalEntrySerializer(system), FromPersistence.class, ToPersistence.class)
+                    .build())
+                .withMaxEntrySize(maxEntrySize).withMaxSegmentSize(maxSegmentSize)
+                .build();
+    }
+
+    @Override
+    long lastWrittenIndex() {
+        return entries.writer().getLastIndex();
+    }
+
+    @Override
+    void commitTo(final long index) {
+        entries.writer().commit(index);
+    }
+
+    @Override
+    void compactTo(final long index) {
+        entries.compact(index);
+    }
+
+    @Override
+    void close() {
+        entries.close();
+    }
+
+    @Override
+    @SuppressWarnings("checkstyle:illegalCatch")
+    void handleReplayMessages(final ReplayMessages message, final long from) {
+        try (SegmentedJournalReader<DataJournalEntry> reader = entries.openReader(from)) {
+            int count = 0;
+            while (reader.hasNext() && count < message.max) {
+                final Indexed<DataJournalEntry> next = reader.next();
+                if (next.index() > message.toSequenceNr) {
+                    break;
+                }
+
+                LOG.trace("{}: replay {}", persistenceId, next);
+                updateLargestSize(next.size());
+                final DataJournalEntry entry = next.entry();
+                verify(entry instanceof FromPersistence, "Unexpected entry %s", entry);
+
+                final PersistentRepr repr = ((FromPersistence) entry).toRepr(persistenceId, next.index());
+                LOG.debug("{}: replaying {}", persistenceId, repr);
+                message.replayCallback.accept(repr);
+                count++;
+            }
+            LOG.debug("{}: successfully replayed {} entries", persistenceId, count);
+        } catch (Exception e) {
+            LOG.warn("{}: failed to replay messages for {}", persistenceId, message, e);
+            message.promise.failure(e);
+        } finally {
+            message.promise.success(null);
+        }
+    }
+
+    @Override
+    @SuppressWarnings("checkstyle:illegalCatch")
+    void handleWriteMessages(final WriteMessages message) {
+        final int count = message.size();
+        final SegmentedJournalWriter<DataJournalEntry> writer = entries.writer();
+
+        for (int i = 0; i < count; ++i) {
+            final long mark = writer.getLastIndex();
+            final AtomicWrite request = message.getRequest(i);
+            try {
+                for (PersistentRepr repr : CollectionConverters.asJava(request.payload())) {
+                    final Object payload = repr.payload();
+                    if (!(payload instanceof Serializable)) {
+                        throw new UnsupportedOperationException("Non-serializable payload encountered "
+                                + payload.getClass());
+                    }
+
+                    recordMessageSize(writer.append(new ToPersistence(repr)).size());
+                }
+            } catch (Exception e) {
+                LOG.warn("{}: failed to write out request", persistenceId, e);
+                message.setFailure(i, e);
+                writer.truncate(mark);
+                continue;
+            }
+
+            message.setSuccess(i);
+        }
+        writer.flush();
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java
index 4e0a54c4b2..41117c8e32 100644
--- a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java
+++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.akka.segjournal;
 
-import static com.google.common.base.Verify.verify;
 import static com.google.common.base.Verify.verifyNotNull;
 import static java.util.Objects.requireNonNull;
 
@@ -23,25 +22,20 @@ import com.google.common.base.MoreObjects;
 import io.atomix.storage.StorageLevel;
 import io.atomix.storage.journal.Indexed;
 import io.atomix.storage.journal.SegmentedJournal;
-import io.atomix.storage.journal.SegmentedJournalReader;
 import io.atomix.storage.journal.SegmentedJournalWriter;
 import io.atomix.utils.serializer.Namespace;
 import java.io.File;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import org.opendaylight.controller.akka.segjournal.DataJournalEntry.FromPersistence;
-import org.opendaylight.controller.akka.segjournal.DataJournalEntry.ToPersistence;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
-import scala.jdk.javaapi.CollectionConverters;
 
 /**
  * This actor handles a single PersistentActor's journal. The journal is split into two {@link SegmentedJournal}s:
@@ -83,11 +77,11 @@ final class SegmentedJournalActor extends AbstractActor {
         }
     }
 
-    private static final class ReplayMessages extends AsyncMessage<Void> {
+    static final class ReplayMessages extends AsyncMessage<Void> {
         private final long fromSequenceNr;
-        private final long toSequenceNr;
-        private final long max;
-        private final Consumer<PersistentRepr> replayCallback;
+        final long toSequenceNr;
+        final long max;
+        final Consumer<PersistentRepr> replayCallback;
 
         ReplayMessages(final long fromSequenceNr, final long toSequenceNr, final long max,
                 final Consumer<PersistentRepr> replayCallback) {
@@ -115,6 +109,23 @@ final class SegmentedJournalActor extends AbstractActor {
             return promise.future();
         }
 
+        int size() {
+            return requests.size();
+        }
+
+        AtomicWrite getRequest(final int index) {
+            return requests.get(index);
+        }
+
+        void setFailure(final int index, final Exception cause) {
+            results.get(index).success(Optional.of(cause));
+
+        }
+
+        void setSuccess(final int index) {
+            results.get(index).success(Optional.empty());
+        }
+
         @Override
         public String toString() {
             return MoreObjects.toStringHelper(this).add("requests", requests).toString();
@@ -151,13 +162,10 @@ final class SegmentedJournalActor extends AbstractActor {
     // Tracks the size distribution of messages
     private Histogram messageSize;
 
-    private SegmentedJournal<DataJournalEntry> dataJournal;
+    private DataJournal dataJournal;
     private SegmentedJournal<Long> deleteJournal;
     private long lastDelete;
 
-    // Tracks largest message size we have observed either during recovery or during write
-    private int largestObservedSize;
-
     SegmentedJournalActor(final String persistenceId, final File directory, final StorageLevel storage,
             final int maxEntrySize, final int maxSegmentSize) {
         this.persistenceId = requireNonNull(persistenceId);
@@ -231,7 +239,7 @@ final class SegmentedJournalActor extends AbstractActor {
         ensureOpen();
 
         LOG.debug("{}: delete messages {}", persistenceId, message);
-        final long to = Long.min(dataJournal.writer().getLastIndex(), message.toSequenceNr);
+        final long to = Long.min(dataJournal.lastWrittenIndex(), message.toSequenceNr);
         LOG.debug("{}: adjusted delete to {}", persistenceId, to);
 
         if (lastDelete < to) {
@@ -241,10 +249,10 @@ final class SegmentedJournalActor extends AbstractActor {
             final SegmentedJournalWriter<Long> deleteWriter = deleteJournal.writer();
             final Indexed<Long> entry = deleteWriter.append(lastDelete);
             deleteWriter.commit(entry.index());
-            dataJournal.writer().commit(lastDelete);
+            dataJournal.commitTo(lastDelete);
 
             LOG.debug("{}: compaction started", persistenceId);
-            dataJournal.compact(lastDelete + 1);
+            dataJournal.compactTo(lastDelete + 1);
             deleteJournal.compact(entry.index());
             LOG.debug("{}: compaction finished", persistenceId);
         } else {
@@ -259,7 +267,7 @@ final class SegmentedJournalActor extends AbstractActor {
         final Long sequence;
         if (directory.isDirectory()) {
             ensureOpen();
-            sequence = dataJournal.writer().getLastIndex();
+            sequence = dataJournal.lastWrittenIndex();
         } else {
             sequence = 0L;
         }
@@ -276,83 +284,25 @@ final class SegmentedJournalActor extends AbstractActor {
         final long from = Long.max(lastDelete + 1, message.fromSequenceNr);
         LOG.debug("{}: adjusted fromSequenceNr to {}", persistenceId, from);
 
-        try (SegmentedJournalReader<DataJournalEntry> reader = dataJournal.openReader(from)) {
-            int count = 0;
-            while (reader.hasNext() && count < message.max) {
-                final Indexed<DataJournalEntry> next = reader.next();
-                if (next.index() > message.toSequenceNr) {
-                    break;
-                }
-
-                LOG.trace("{}: replay {}", persistenceId, next);
-                updateLargestSize(next.size());
-                final DataJournalEntry entry = next.entry();
-                verify(entry instanceof FromPersistence, "Unexpected entry %s", entry);
-
-                final PersistentRepr repr = ((FromPersistence) entry).toRepr(persistenceId, next.index());
-                LOG.debug("{}: replaying {}", persistenceId, repr);
-                message.replayCallback.accept(repr);
-                count++;
-            }
-            LOG.debug("{}: successfully replayed {} entries", persistenceId, count);
-        } catch (Exception e) {
-            LOG.warn("{}: failed to replay messages for {}", persistenceId, message, e);
-            message.promise.failure(e);
-        } finally {
-            message.promise.success(null);
-        }
+        dataJournal.handleReplayMessages(message, from);
     }
 
-    @SuppressWarnings("checkstyle:illegalCatch")
     private void handleWriteMessages(final WriteMessages message) {
         ensureOpen();
-        final SegmentedJournalWriter<DataJournalEntry> writer = dataJournal.writer();
         final long startTicks = System.nanoTime();
-        final int count = message.requests.size();
-        final long start = writer.getLastIndex();
-
-        for (int i = 0; i < count; ++i) {
-            final long mark = writer.getLastIndex();
-            try {
-                writeRequest(writer, message.requests.get(i));
-            } catch (Exception e) {
-                LOG.warn("{}: failed to write out request", persistenceId, e);
-                message.results.get(i).success(Optional.of(e));
-                writer.truncate(mark);
-                continue;
-            }
-
-            message.results.get(i).success(Optional.empty());
-        }
-        writer.flush();
-        batchWriteTime.update(System.nanoTime() - startTicks, TimeUnit.NANOSECONDS);
-        messageWriteCount.mark(writer.getLastIndex() - start);
-    }
+        final long start = dataJournal.lastWrittenIndex();
 
-    private void writeRequest(final SegmentedJournalWriter<DataJournalEntry> writer, final AtomicWrite request) {
-        for (PersistentRepr repr : CollectionConverters.asJava(request.payload())) {
-            final Object payload = repr.payload();
-            if (!(payload instanceof Serializable)) {
-                throw new UnsupportedOperationException("Non-serializable payload encountered " + payload.getClass());
-            }
+        dataJournal.handleWriteMessages(message);
 
-            final int size = writer.append(new ToPersistence(repr)).size();
-            messageSize.update(size);
-            updateLargestSize(size);
-        }
+        batchWriteTime.update(System.nanoTime() - startTicks, TimeUnit.NANOSECONDS);
+        messageWriteCount.mark(dataJournal.lastWrittenIndex() - start);
     }
 
     private void handleUnknown(final Object message) {
         LOG.error("{}: Received unknown message {}", persistenceId, message);
     }
 
-    private void updateLargestSize(final int size) {
-        if (size > largestObservedSize) {
-            largestObservedSize = size;
-        }
-    }
-
     private void ensureOpen() {
         if (dataJournal != null) {
             verifyNotNull(deleteJournal);
@@ -364,17 +314,10 @@
         final Indexed<Long> lastEntry = deleteJournal.writer().getLastEntry();
         lastDelete = lastEntry == null ? 0 : lastEntry.entry();
 
-        dataJournal = SegmentedJournal.<DataJournalEntry>builder()
-                .withStorageLevel(storage).withDirectory(directory).withName("data")
-                .withNamespace(Namespace.builder()
-                    .register(new DataJournalEntrySerializer(context().system()),
-                        FromPersistence.class, ToPersistence.class)
-                    .build())
-                .withMaxEntrySize(maxEntrySize).withMaxSegmentSize(maxSegmentSize)
-                .build();
-        final SegmentedJournalWriter<DataJournalEntry> writer = dataJournal.writer();
-        writer.commit(lastDelete);
-        LOG.debug("{}: journal open with last index {}, deleted to {}", persistenceId, writer.getLastIndex(),
+        dataJournal = new DataJournalV0(persistenceId, messageSize, context().system(), storage, directory,
+            maxEntrySize, maxSegmentSize);
+        dataJournal.commitTo(lastDelete);
+        LOG.debug("{}: journal open with last index {}, deleted to {}", persistenceId, dataJournal.lastWrittenIndex(),
             lastDelete);
     }
 }
-- 
2.36.6
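
A note for readers evaluating follow-up formats: the contract introduced by
this patch is deliberately small. An implementation supplies six operations
(lastWrittenIndex(), commitTo(), compactTo(), close() and the two message
handlers) and funnels its metrics through recordMessageSize() and
updateLargestSize(). The skeleton below sketches what such a subclass would
have to provide; the class name DataJournalV1 and all comments in it are
illustrative assumptions, not part of this patch or the CONTROLLER-1954 plan:

package org.opendaylight.controller.akka.segjournal;

import com.codahale.metrics.Histogram;
import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.ReplayMessages;
import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.WriteMessages;

final class DataJournalV1 extends DataJournal {
    DataJournalV1(final String persistenceId, final Histogram messageSize) {
        super(persistenceId, messageSize);
    }

    @Override
    long lastWrittenIndex() {
        // Report the index of the last entry made durable in this format.
        throw new UnsupportedOperationException("illustrative sketch only");
    }

    @Override
    void commitTo(final long index) {
        // Mark entries up to and including index as committed.
        throw new UnsupportedOperationException("illustrative sketch only");
    }

    @Override
    void compactTo(final long index) {
        // Release storage occupied by entries preceding index.
        throw new UnsupportedOperationException("illustrative sketch only");
    }

    @Override
    void close() {
        // Flush outstanding writes and release underlying resources.
        throw new UnsupportedOperationException("illustrative sketch only");
    }

    @Override
    void handleReplayMessages(final ReplayMessages message, final long from) {
        // Feed entries between from and message.toSequenceNr (at most message.max)
        // into message.replayCallback, calling updateLargestSize() per entry, and
        // complete message.promise when done.
        throw new UnsupportedOperationException("illustrative sketch only");
    }

    @Override
    void handleWriteMessages(final WriteMessages message) {
        // Append each of message.getRequest(0 .. size() - 1), calling
        // recordMessageSize() for every serialized entry and reporting per-request
        // outcomes via message.setSuccess()/message.setFailure().
        throw new UnsupportedOperationException("illustrative sketch only");
    }
}

SegmentedJournalActor only ever talks to the DataJournal surface, so a class
like this could be swapped in at the single construction site in ensureOpen()
without touching the replay, write or delete handling above.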