From: Robert Varga Date: Mon, 4 Feb 2019 09:09:57 +0000 (+0100) Subject: Add SegmentedFileJournal X-Git-Tag: release/sodium~137 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=52fb7d830e43a9849c728c7b0d36470afe686fb5 Add SegmentedFileJournal This adds an alternative Akka persistence journal implementation based on Atomix's SegmentedJournal. SegmentedJournal is a linear append-only log of entries, each of which has a incrementing 63bit index. This logical structure is stored in rolling segments -- i.e. entries are appended to current segment until it is full, at which point a new segment is allocated and writeout continues there. Old segments can be freed as long as they are not needed. This layout makes it good match to how Akka's persistence works, with the one exception that the SegmentedJournal cannot explicitly delete entries from the head of the journal. SegmentedFileJournal allocates one SegmentedJournal for each persistenceId it encounters and uses a dedicated actor for it. This provides a major simplification in the implementation as well as allows for concurrent persistence of multiple PersistentActors. JIRA: CONTROLLER-1884 Change-Id: I78140b154bab44a3e17d5ffb76b040c62add3204 Signed-off-by: Robert Varga --- diff --git a/features/mdsal/odl-mdsal-clustering-commons/pom.xml b/features/mdsal/odl-mdsal-clustering-commons/pom.xml index 89d35c0635..b5cf579d2c 100644 --- a/features/mdsal/odl-mdsal-clustering-commons/pom.xml +++ b/features/mdsal/odl-mdsal-clustering-commons/pom.xml @@ -75,6 +75,33 @@ sal-akka-raft ${project.version} + + + + ${project.groupId} + sal-akka-segmented-journal + ${project.version} + + + com.esotericsoftware + kryo + 4.0.2 + + + com.esotericsoftware + minlog + 1.3.1 + + + com.esotericsoftware + reflectasm + 1.11.8 + + + org.ow2.asm + asm + 5.2 + diff --git a/opendaylight/md-sal/mdsal-artifacts/pom.xml b/opendaylight/md-sal/mdsal-artifacts/pom.xml index fd9b642b02..51aaca2e3a 100644 --- a/opendaylight/md-sal/mdsal-artifacts/pom.xml +++ b/opendaylight/md-sal/mdsal-artifacts/pom.xml @@ -206,6 +206,11 @@ sal-akka-raft-example ${project.version} + + org.opendaylight.controller + sal-akka-segmented-journal + ${project.version} + org.opendaylight.controller cds-access-api diff --git a/opendaylight/md-sal/pom.xml b/opendaylight/md-sal/pom.xml index 11607a30fc..7e07837c28 100644 --- a/opendaylight/md-sal/pom.xml +++ b/opendaylight/md-sal/pom.xml @@ -51,6 +51,7 @@ cds-access-api cds-access-client cds-dom-api + sal-akka-segmented-journal sal-clustering-config diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/pom.xml b/opendaylight/md-sal/sal-akka-segmented-journal/pom.xml new file mode 100644 index 0000000000..ba6b7edb6a --- /dev/null +++ b/opendaylight/md-sal/sal-akka-segmented-journal/pom.xml @@ -0,0 +1,105 @@ + + + + + 4.0.0 + + org.opendaylight.controller + mdsal-parent + 1.10.0-SNAPSHOT + ../parent + + + sal-akka-segmented-journal + 1.10.0-SNAPSHOT + bundle + + + + + + com.typesafe.akka + akka-actor_2.12 + + + com.typesafe.akka + akka-persistence_2.12 + + + com.typesafe.akka + akka-slf4j_2.12 + test + + + com.typesafe.akka + akka-testkit_2.12 + + + com.typesafe.akka + akka-persistence-tck_2.12 + + + + + io.dropwizard.metrics + metrics-core + + + + + org.scala-lang + scala-library + + + + + io.atomix + atomix-storage + 3.1.5 + provided + + + io.atomix + atomix-utils + 3.1.5 + provided + + + + commons-io + commons-io + test + + + + + + + org.apache.felix + maven-bundle-plugin + true + + + 
${project.groupId}.${project.artifactId} + + *;inline=true;groupId=io.atomix + + + + + + + + scm:git:http://git.opendaylight.org/gerrit/controller.git + scm:git:ssh://git.opendaylight.org:29418/controller.git + HEAD + https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL:Architecture:Clustering + + + diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalEntry.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalEntry.java new file mode 100644 index 0000000000..0713c0212a --- /dev/null +++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalEntry.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2019 Pantheon Technologies, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.akka.segjournal; + +import static java.util.Objects.requireNonNull; + +import akka.persistence.PersistentRepr; +import io.atomix.storage.journal.JournalSegment; + +/** + * A single entry in the data journal. We do not store {@code persistenceId} for each entry, as that is a + * journal-invariant, nor do we store {@code sequenceNr}, as that information is maintained by {@link JournalSegment}'s + * index. + * + * @author Robert Varga + */ +abstract class DataJournalEntry { + static final class ToPersistence extends DataJournalEntry { + private final PersistentRepr repr; + + ToPersistence(final PersistentRepr repr) { + this.repr = requireNonNull(repr); + } + + PersistentRepr repr() { + return repr; + } + } + + static final class FromPersistence extends DataJournalEntry { + private final String manifest; + private final String writerUuid; + private final Object payload; + + FromPersistence(final String manifest, final String writerUuid, final Object payload) { + this.manifest = manifest; + this.writerUuid = requireNonNull(writerUuid); + this.payload = requireNonNull(payload); + } + + PersistentRepr toRepr(final String persistenceId, final long sequenceNr) { + return PersistentRepr.apply(payload, sequenceNr, persistenceId, manifest, false, null, writerUuid); + } + } +} diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalEntrySerializer.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalEntrySerializer.java new file mode 100644 index 0000000000..e248262b15 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalEntrySerializer.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2019 Pantheon Technologies, s.r.o. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.akka.segjournal; + +import static com.google.common.base.Verify.verify; +import static java.util.Objects.requireNonNull; + +import akka.actor.ActorSystem; +import akka.actor.ExtendedActorSystem; +import akka.persistence.PersistentRepr; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.esotericsoftware.kryo.serializers.JavaSerializer; +import java.util.concurrent.Callable; +import org.opendaylight.controller.akka.segjournal.DataJournalEntry.FromPersistence; +import org.opendaylight.controller.akka.segjournal.DataJournalEntry.ToPersistence; + +/** + * Kryo serializer for {@link DataJournalEntry}. Each {@link SegmentedJournalActor} has its own instance, as well as + * a nested JavaSerializer to handle the payload. + * + *
 * <p>
+ * Since we are persisting only parts of {@link PersistentRepr}, this class asymmetric by design: + * {@link #write(Kryo, Output, DataJournalEntry)} only accepts {@link ToPersistence} subclass, which is a wrapper + * around a {@link PersistentRepr}, while {@link #read(Kryo, Input, Class)} produces an {@link FromPersistence}, which + * needs further processing to reconstruct a {@link PersistentRepr}. + * + * @author Robert Varga + */ +final class DataJournalEntrySerializer extends Serializer { + private final JavaSerializer serializer = new JavaSerializer(); + private final ExtendedActorSystem actorSystem; + + DataJournalEntrySerializer(final ActorSystem actorSystem) { + this.actorSystem = requireNonNull((ExtendedActorSystem) actorSystem); + } + + @Override + public void write(final Kryo kryo, final Output output, final DataJournalEntry object) { + verify(object instanceof ToPersistence); + final PersistentRepr repr = ((ToPersistence) object).repr(); + output.writeString(repr.manifest()); + output.writeString(repr.writerUuid()); + serializer.write(kryo, output, repr.payload()); + } + + @Override + public DataJournalEntry read(final Kryo kryo, final Input input, final Class type) { + final String manifest = input.readString(); + final String uuid = input.readString(); + final Object payload = akka.serialization.JavaSerializer.currentSystem().withValue(actorSystem, + (Callable)() -> serializer.read(kryo, input, type)); + return new FromPersistence(manifest, uuid, payload); + } +} diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournal.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournal.java new file mode 100644 index 0000000000..a0bffa2c12 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournal.java @@ -0,0 +1,158 @@ +/* + * Copyright (c) 2019 Pantheon Technologies, s.r.o. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.akka.segjournal; + +import static akka.actor.ActorRef.noSender; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; + +import akka.actor.ActorRef; +import akka.dispatch.Futures; +import akka.persistence.AtomicWrite; +import akka.persistence.PersistentRepr; +import akka.persistence.journal.japi.AsyncWriteJournal; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigMemorySize; +import io.atomix.storage.StorageLevel; +import io.atomix.storage.journal.SegmentedJournal; +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Base64; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Consumer; +import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.AsyncMessage; +import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.WriteMessages; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; + +/** + * An Akka persistence journal implementation on top of {@link SegmentedJournal}. This actor represents aggregation + * of multiple journals and performs a receptionist job between Akka and invidual per-persistenceId actors. See + * {@link SegmentedJournalActor} for details on how the persistence works. + * + * @author Robert Varga + */ +public class SegmentedFileJournal extends AsyncWriteJournal { + public static final String STORAGE_ROOT_DIRECTORY = "root-directory"; + public static final String STORAGE_MAX_ENTRY_SIZE = "max-entry-size"; + public static final int STORAGE_MAX_ENTRY_SIZE_DEFAULT = 16 * 1024 * 1024; + public static final String STORAGE_MAX_SEGMENT_SIZE = "max-segment-size"; + public static final int STORAGE_MAX_SEGMENT_SIZE_DEFAULT = STORAGE_MAX_ENTRY_SIZE_DEFAULT * 8; + public static final String STORAGE_MEMORY_MAPPED = "memory-mapped"; + + private static final Logger LOG = LoggerFactory.getLogger(SegmentedFileJournal.class); + + private final Map handlers = new HashMap<>(); + private final File rootDir; + private final StorageLevel storage; + private final int maxEntrySize; + private final int maxSegmentSize; + + public SegmentedFileJournal(final Config config) { + rootDir = new File(config.getString(STORAGE_ROOT_DIRECTORY)); + if (!rootDir.exists()) { + LOG.debug("Creating directory {}", rootDir); + checkState(rootDir.mkdirs(), "Failed to create root directory %s", rootDir); + } + checkArgument(rootDir.isDirectory(), "%s is not a directory", rootDir); + + maxEntrySize = getBytes(config, STORAGE_MAX_ENTRY_SIZE, STORAGE_MAX_ENTRY_SIZE_DEFAULT); + maxSegmentSize = getBytes(config, STORAGE_MAX_SEGMENT_SIZE, STORAGE_MAX_SEGMENT_SIZE_DEFAULT); + + if (config.hasPath(STORAGE_MEMORY_MAPPED)) { + storage = config.getBoolean(STORAGE_MEMORY_MAPPED) ? 
StorageLevel.MAPPED : StorageLevel.DISK; + } else { + storage = StorageLevel.DISK; + } + + LOG.info("Initialized with root directory {} with storage {}", rootDir, storage); + } + + @Override + public Future>> doAsyncWriteMessages(final Iterable messages) { + final Map map = new HashMap<>(); + final List>> result = new ArrayList<>(); + + for (AtomicWrite message : messages) { + final String persistenceId = message.persistenceId(); + final ActorRef handler = handlers.computeIfAbsent(persistenceId, this::createHandler); + result.add(map.computeIfAbsent(handler, key -> new WriteMessages()).add(message)); + } + + // Send requests to actors and zip the futures back + map.forEach((handler, message) -> { + LOG.trace("Sending {} to {}", message, handler); + handler.tell(message, noSender()); + }); + return Futures.sequence(result, context().dispatcher()); + } + + @Override + public Future doAsyncDeleteMessagesTo(final String persistenceId, final long toSequenceNr) { + return delegateMessage(persistenceId, SegmentedJournalActor.deleteMessagesTo(toSequenceNr)); + } + + @Override + public Future doAsyncReplayMessages(final String persistenceId, final long fromSequenceNr, + final long toSequenceNr, final long max, final Consumer replayCallback) { + return delegateMessage(persistenceId, + SegmentedJournalActor.replayMessages(fromSequenceNr, toSequenceNr, max, replayCallback)); + } + + @Override + public Future doAsyncReadHighestSequenceNr(final String persistenceId, final long fromSequenceNr) { + return delegateMessage(handlers.computeIfAbsent(persistenceId, this::createHandler), + SegmentedJournalActor.readHighestSequenceNr(fromSequenceNr)); + } + + private ActorRef createHandler(final String persistenceId) { + final String directoryName = Base64.getUrlEncoder().encodeToString(persistenceId.getBytes( + StandardCharsets.UTF_8)); + final File directory = new File(rootDir, directoryName); + LOG.debug("Creating handler for {} in directory {}", persistenceId, directory); + + final ActorRef handler = context().actorOf(SegmentedJournalActor.props(persistenceId, directory, storage, + maxEntrySize, maxSegmentSize)); + LOG.debug("Directory {} handled by {}", directory, handler); + + final ActorRef prev = handlers.putIfAbsent(persistenceId, handler); + verify(prev == null, "Duplicate handler for %s, already handled by %s", persistenceId, prev); + return handler; + } + + private Future delegateMessage(final String persistenceId, final AsyncMessage message) { + final ActorRef handler = handlers.get(persistenceId); + if (handler == null) { + return Futures.failed(new IllegalStateException("Cannot find handler for " + persistenceId)); + } + + return delegateMessage(handler, message); + } + + private static Future delegateMessage(final ActorRef handler, final AsyncMessage message) { + LOG.trace("Delegating {} to {}", message, handler); + handler.tell(message, noSender()); + return message.promise.future(); + } + + private static int getBytes(final Config config, final String path, final int defaultValue) { + if (!config.hasPath(path)) { + return defaultValue; + } + final ConfigMemorySize value = config.getMemorySize(path); + final long result = value.toBytes(); + checkArgument(result <= Integer.MAX_VALUE, "Size %s exceeds maximum allowed %s", Integer.MAX_VALUE); + return (int) result; + } +} diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java 
b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java new file mode 100644 index 0000000000..6cc5d9e672 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java @@ -0,0 +1,376 @@ +/* + * Copyright (c) 2019 Pantheon Technologies, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.akka.segjournal; + +import static com.google.common.base.Verify.verify; +import static com.google.common.base.Verify.verifyNotNull; +import static java.util.Objects.requireNonNull; + +import akka.actor.AbstractActor; +import akka.actor.Props; +import akka.persistence.AtomicWrite; +import akka.persistence.PersistentRepr; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import com.codahale.metrics.SlidingTimeWindowReservoir; +import com.codahale.metrics.Timer; +import com.google.common.base.MoreObjects; +import io.atomix.storage.StorageLevel; +import io.atomix.storage.journal.Indexed; +import io.atomix.storage.journal.SegmentedJournal; +import io.atomix.storage.journal.SegmentedJournalReader; +import io.atomix.storage.journal.SegmentedJournalWriter; +import io.atomix.utils.serializer.Namespace; +import java.io.File; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import org.opendaylight.controller.akka.segjournal.DataJournalEntry.FromPersistence; +import org.opendaylight.controller.akka.segjournal.DataJournalEntry.ToPersistence; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.Iterator; +import scala.collection.SeqLike; +import scala.concurrent.Future; +import scala.concurrent.Promise; + +/** + * This actor handles a single PersistentActor's journal. The journal is split into two {@link SegmentedJournal}s: + *
 * <ul> + *   <li>A memory-mapped data journal, containing actual data entries</li> + *   <li>A simple file journal, containing sequence numbers of last deleted entry</li> + * </ul> + * + * <p>
+ * This is a conscious design decision to minimize the amount of data that is being stored in the data journal while + * speeding up normal operations. Since the SegmentedJournal is an append-only linear log and Akka requires the ability + * to delete persistence entries, we need the ability to mark a subset of a SegmentedJournal as deleted. While we could + * treat such delete requests as normal events, this leads to a mismatch between SegmentedJournal indices (as exposed by + * {@link Indexed}) and Akka sequence numbers -- requiring us to potentially perform costly deserialization to find the + * index corresponding to a particular sequence number, or maintain moderately-complex logic and data structures to + * perform that mapping in sub-linear time complexity. + * + * <p>
+ * Split-file approach allows us to treat sequence numbers and indices as equivalent, without maintaining any explicit + * mapping information. The only additional information we need to maintain is the last deleted sequence number. + * + * @author Robert Varga + */ +final class SegmentedJournalActor extends AbstractActor { + abstract static class AsyncMessage { + final Promise promise = Promise.apply(); + } + + private static final class ReadHighestSequenceNr extends AsyncMessage { + private final long fromSequenceNr; + + ReadHighestSequenceNr(final long fromSequenceNr) { + this.fromSequenceNr = fromSequenceNr; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("fromSequenceNr", fromSequenceNr).toString(); + } + } + + private static final class ReplayMessages extends AsyncMessage { + private final long fromSequenceNr; + private final long toSequenceNr; + private final long max; + private final Consumer replayCallback; + + ReplayMessages(final long fromSequenceNr, + final long toSequenceNr, final long max, final Consumer replayCallback) { + this.fromSequenceNr = fromSequenceNr; + this.toSequenceNr = toSequenceNr; + this.max = max; + this.replayCallback = requireNonNull(replayCallback); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("fromSequenceNr", fromSequenceNr) + .add("toSequenceNr", toSequenceNr).add("max", max).toString(); + } + } + + static final class WriteMessages { + private final List requests = new ArrayList<>(); + private final List>> results = new ArrayList<>(); + + Future> add(final AtomicWrite write) { + final Promise> promise = Promise.apply(); + requests.add(write); + results.add(promise); + return promise.future(); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("requests", requests).toString(); + } + } + + private static final class DeleteMessagesTo extends AsyncMessage { + final long toSequenceNr; + + DeleteMessagesTo(final long toSequenceNr) { + this.toSequenceNr = toSequenceNr; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("toSequenceNr", toSequenceNr).toString(); + } + } + + private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournalActor.class); + private static final Namespace DELETE_NAMESPACE = Namespace.builder().register(Long.class).build(); + private static final int DELETE_SEGMENT_SIZE = 64 * 1024; + + // Tracks the time it took us to write a batch of messages + private final Timer batchWriteTime = new Timer(); + // Tracks the number of individual messages written + private final Meter messageWriteCount = new Meter(); + // Tracks the size distribution of messages for last 5 minutes + private final Histogram messageSize = new Histogram(new SlidingTimeWindowReservoir(5, TimeUnit.MINUTES)); + + private final String persistenceId; + private final StorageLevel storage; + private final int maxSegmentSize; + private final int maxEntrySize; + private final File directory; + + private SegmentedJournal dataJournal; + private SegmentedJournal deleteJournal; + private long lastDelete; + + // Tracks largest message size we have observed either during recovery or during write + private int largestObservedSize; + + SegmentedJournalActor(final String persistenceId, final File directory, final StorageLevel storage, + final int maxEntrySize, final int maxSegmentSize) { + this.persistenceId = requireNonNull(persistenceId); + this.directory = requireNonNull(directory); + this.storage = 
requireNonNull(storage); + this.maxEntrySize = maxEntrySize; + this.maxSegmentSize = maxSegmentSize; + } + + static Props props(final String persistenceId, final File directory, final StorageLevel storage, + final int maxEntrySize, final int maxSegmentSize) { + return Props.create(SegmentedJournalActor.class, requireNonNull(persistenceId), directory, storage, + maxEntrySize, maxSegmentSize); + } + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(DeleteMessagesTo.class, this::handleDeleteMessagesTo) + .match(ReadHighestSequenceNr.class, this::handleReadHighestSequenceNr) + .match(ReplayMessages.class, this::handleReplayMessages) + .match(WriteMessages.class, this::handleWriteMessages) + .matchAny(this::handleUnknown) + .build(); + } + + @Override + public void preStart() throws Exception { + LOG.debug("{}: actor starting", persistenceId); + super.preStart(); + } + + @Override + public void postStop() throws Exception { + LOG.debug("{}: actor stopping", persistenceId); + if (dataJournal != null) { + dataJournal.close(); + LOG.debug("{}: data journal closed", persistenceId); + dataJournal = null; + } + if (deleteJournal != null) { + deleteJournal.close(); + LOG.debug("{}: delete journal closed", persistenceId); + deleteJournal = null; + } + LOG.debug("{}: actor stopped", persistenceId); + super.postStop(); + } + + static AsyncMessage deleteMessagesTo(final long toSequenceNr) { + return new DeleteMessagesTo(toSequenceNr); + } + + static AsyncMessage readHighestSequenceNr(final long fromSequenceNr) { + return new ReadHighestSequenceNr(fromSequenceNr); + } + + static AsyncMessage replayMessages(final long fromSequenceNr, final long toSequenceNr, final long max, + final Consumer replayCallback) { + return new ReplayMessages(fromSequenceNr, toSequenceNr, max, replayCallback); + } + + private void handleDeleteMessagesTo(final DeleteMessagesTo message) { + ensureOpen(); + + LOG.debug("{}: delete messages {}", persistenceId, message); + final long to = Long.min(dataJournal.writer().getLastIndex(), message.toSequenceNr); + LOG.debug("{}: adjusted delete to {}", persistenceId, to); + + if (lastDelete < to) { + LOG.debug("{}: deleting entries up to {}", persistenceId, to); + + lastDelete = to; + final SegmentedJournalWriter deleteWriter = deleteJournal.writer(); + final Indexed entry = deleteWriter.append(lastDelete); + deleteWriter.commit(entry.index()); + dataJournal.writer().commit(lastDelete); + + LOG.debug("{}: compaction started", persistenceId); + dataJournal.compact(lastDelete); + deleteJournal.compact(entry.index()); + LOG.debug("{}: compaction finished", persistenceId); + } else { + LOG.debug("{}: entries up to {} already deleted", persistenceId, lastDelete); + } + + message.promise.success(null); + } + + @SuppressWarnings("checkstyle:illegalCatch") + private void handleReadHighestSequenceNr(final ReadHighestSequenceNr message) { + LOG.debug("{}: looking for highest sequence on {}", persistenceId, message); + final Long sequence; + if (directory.isDirectory()) { + ensureOpen(); + sequence = dataJournal.writer().getLastIndex(); + } else { + sequence = 0L; + } + + LOG.debug("{}: highest sequence is {}", message, sequence); + message.promise.success(sequence); + } + + @SuppressWarnings("checkstyle:illegalCatch") + private void handleReplayMessages(final ReplayMessages message) { + LOG.debug("{}: replaying messages {}", persistenceId, message); + ensureOpen(); + + final long from = Long.max(lastDelete + 1, message.fromSequenceNr); + LOG.debug("{}: adjusted 
fromSequenceNr to {}", persistenceId, from); + + try (SegmentedJournalReader reader = dataJournal.openReader(from)) { + int count = 0; + while (reader.hasNext() && count < message.max) { + final Indexed next = reader.next(); + if (next.index() > message.toSequenceNr) { + break; + } + + LOG.trace("{}: replay {}", persistenceId, next); + updateLargestSize(next.size()); + final DataJournalEntry entry = next.entry(); + verify(entry instanceof FromPersistence, "Unexpected entry %s", entry); + + final PersistentRepr repr = ((FromPersistence) entry).toRepr(persistenceId, next.index()); + LOG.debug("{}: replaying {}", persistenceId, repr); + message.replayCallback.accept(repr); + count++; + } + LOG.debug("{}: successfully replayed {} entries", persistenceId, count); + } catch (Exception e) { + LOG.warn("{}: failed to replay messages for {}", persistenceId, message, e); + message.promise.failure(e); + } finally { + message.promise.success(null); + } + } + + @SuppressWarnings("checkstyle:illegalCatch") + private void handleWriteMessages(final WriteMessages message) { + ensureOpen(); + + final SegmentedJournalWriter writer = dataJournal.writer(); + final long startTicks = System.nanoTime(); + final int count = message.requests.size(); + final long start = writer.getLastIndex(); + + for (int i = 0; i < count; ++i) { + final long mark = writer.getLastIndex(); + try { + writeRequest(writer, message.requests.get(i)); + } catch (Exception e) { + LOG.warn("{}: failed to write out request", persistenceId, e); + message.results.get(i).success(Optional.of(e)); + writer.truncate(mark); + continue; + } + + message.results.get(i).success(Optional.empty()); + } + writer.flush(); + batchWriteTime.update(System.nanoTime() - startTicks, TimeUnit.NANOSECONDS); + messageWriteCount.mark(writer.getLastIndex() - start); + } + + private void writeRequest(final SegmentedJournalWriter writer, final AtomicWrite request) { + // Cast is needed for Eclipse because of https://bugs.eclipse.org/bugs/show_bug.cgi?id=468276 + final Iterator it = ((SeqLike) request.payload()).iterator(); + while (it.hasNext()) { + final PersistentRepr repr = it.next(); + final Object payload = repr.payload(); + if (!(payload instanceof Serializable)) { + throw new UnsupportedOperationException("Non-serializable payload encountered " + payload.getClass()); + } + + final int size = writer.append(new ToPersistence(repr)).size(); + messageSize.update(size); + updateLargestSize(size); + } + } + + private void handleUnknown(final Object message) { + LOG.error("{}: Received unknown message {}", persistenceId, message); + } + + private void updateLargestSize(final int size) { + if (size > largestObservedSize) { + largestObservedSize = size; + } + } + + private void ensureOpen() { + if (dataJournal != null) { + verifyNotNull(deleteJournal); + return; + } + + deleteJournal = SegmentedJournal.builder().withDirectory(directory).withName("delete") + .withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build(); + final Indexed lastEntry = deleteJournal.writer().getLastEntry(); + lastDelete = lastEntry == null ? 
0 : lastEntry.index(); + + dataJournal = SegmentedJournal.builder() + .withStorageLevel(storage).withDirectory(directory).withName("data") + .withNamespace(Namespace.builder() + .register(new DataJournalEntrySerializer(context().system()), + FromPersistence.class, ToPersistence.class) + .build()) + .withMaxEntrySize(maxEntrySize).withMaxSegmentSize(maxSegmentSize) + .build(); + final SegmentedJournalWriter writer = dataJournal.writer(); + writer.commit(lastDelete); + LOG.debug("{}: journal open with last index {}, deleted to {}", persistenceId, writer.getLastIndex(), + lastDelete); + } +} diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/test/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournalSpecTest.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/test/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournalSpecTest.java new file mode 100644 index 0000000000..da1ba454e1 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/test/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournalSpecTest.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2019 Pantheon Technologies, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.akka.segjournal; + +import akka.persistence.japi.journal.JavaJournalSpec; +import com.typesafe.config.ConfigFactory; +import java.io.File; +import org.apache.commons.io.FileUtils; +import org.junit.runner.RunWith; +import org.scalatest.junit.JUnitRunner; + +@RunWith(JUnitRunner.class) +public class SegmentedFileJournalSpecTest extends JavaJournalSpec { + private static final long serialVersionUID = 1L; + + private static final File JOURNAL_DIR = new File("target/segmented-journal"); + + public SegmentedFileJournalSpecTest() { + super(ConfigFactory.load("SegmentedFileJournalTest.conf")); + } + + @Override + public void beforeAll() { + FileUtils.deleteQuietly(JOURNAL_DIR); + super.beforeAll(); + } +} diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/test/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournalTest.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/test/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournalTest.java new file mode 100644 index 0000000000..6939dbcba7 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/test/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournalTest.java @@ -0,0 +1,183 @@ +/* + * Copyright (c) 2019 Pantheon Technologies, s.r.o. and others. All rights reserved. 
+ * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.akka.segjournal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import akka.persistence.AtomicWrite; +import akka.persistence.PersistentRepr; +import akka.testkit.CallingThreadDispatcher; +import akka.testkit.javadsl.TestKit; +import io.atomix.storage.StorageLevel; +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.AsyncMessage; +import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.WriteMessages; +import scala.concurrent.Future; + +public class SegmentedFileJournalTest { + private static final File DIRECTORY = new File("target/sfj-test"); + private static final int SEGMENT_SIZE = 1024 * 1024; + private static final int MESSAGE_SIZE = 512 * 1024; + + private static ActorSystem SYSTEM; + + private TestKit kit; + private ActorRef actor; + + @BeforeClass + public static void beforeClass() { + SYSTEM = ActorSystem.create("test"); + } + + @AfterClass + public static void afterClass() { + TestKit.shutdownActorSystem(SYSTEM); + SYSTEM = null; + } + + @Before + public void before() { + kit = new TestKit(SYSTEM); + FileUtils.deleteQuietly(DIRECTORY); + actor = actor(); + } + + @After + public void after() { + actor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + + @Test + public void testDeleteAfterStop() { + // Preliminary setup + final WriteMessages write = new WriteMessages(); + final Future> first = write.add(AtomicWrite.apply(PersistentRepr.apply("first", 1, "foo", + null, false, kit.getRef(), "uuid"))); + final Future> second = write.add(AtomicWrite.apply(PersistentRepr.apply("second", 2, "foo", + null, false, kit.getRef(), "uuid"))); + actor.tell(write, ActorRef.noSender()); + assertFalse(getFuture(first).isPresent()); + assertFalse(getFuture(second).isPresent()); + + assertHighestSequenceNr(2); + assertReplayCount(2); + + deleteEntries(1); + + assertHighestSequenceNr(2); + assertReplayCount(1); + + // Restart actor + actor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + actor = actor(); + + // Check if state is retained + assertHighestSequenceNr(2); + assertReplayCount(1); + } + + @Test + public void testSegmentation() throws IOException { + // We want to have roughly three segments + final LargePayload payload = new LargePayload(); + + final WriteMessages write = new WriteMessages(); + final List>> requests = new ArrayList<>(); + + // Each payload is half of 
segment size, plus some overhead, should result in two segments being present + for (int i = 1; i <= SEGMENT_SIZE * 3 / MESSAGE_SIZE; ++i) { + requests.add(write.add(AtomicWrite.apply(PersistentRepr.apply(payload, i, "foo", null, false, kit.getRef(), + "uuid")))); + } + + actor.tell(write, ActorRef.noSender()); + requests.forEach(future -> assertFalse(getFuture(future).isPresent())); + + assertFileCount(2, 1); + + // Delete all but the last entry + deleteEntries(requests.size()); + + assertFileCount(1, 1); + } + + private ActorRef actor() { + return kit.childActorOf(SegmentedJournalActor.props("foo", DIRECTORY, StorageLevel.DISK, MESSAGE_SIZE, + SEGMENT_SIZE).withDispatcher(CallingThreadDispatcher.Id())); + } + + private void deleteEntries(final long deleteTo) { + final AsyncMessage delete = SegmentedJournalActor.deleteMessagesTo(deleteTo); + actor.tell(delete, ActorRef.noSender()); + assertNull(get(delete)); + } + + private void assertHighestSequenceNr(final long expected) { + AsyncMessage highest = SegmentedJournalActor.readHighestSequenceNr(0); + actor.tell(highest, ActorRef.noSender()); + assertEquals(expected, (long) get(highest)); + } + + private void assertReplayCount(final int expected) { + Consumer firstCallback = mock(Consumer.class); + doNothing().when(firstCallback).accept(any(PersistentRepr.class)); + AsyncMessage replay = SegmentedJournalActor.replayMessages(0, Long.MAX_VALUE, Long.MAX_VALUE, + firstCallback); + actor.tell(replay, ActorRef.noSender()); + assertNull(get(replay)); + verify(firstCallback, times(expected)).accept(any(PersistentRepr.class)); + } + + private static void assertFileCount(final long dataFiles, final long deleteFiles) throws IOException { + List contents = Files.list(DIRECTORY.toPath()).map(Path::toFile).collect(Collectors.toList()); + assertEquals(dataFiles, contents.stream().filter(file -> file.getName().startsWith("data-")).count()); + assertEquals(deleteFiles, contents.stream().filter(file -> file.getName().startsWith("delete-")).count()); + } + + private static T get(final AsyncMessage message) { + return getFuture(message.promise.future()); + } + + private static T getFuture(final Future future) { + assertTrue(future.isCompleted()); + return future.value().get().get(); + } + + private static final class LargePayload implements Serializable { + private static final long serialVersionUID = 1L; + + final byte[] bytes = new byte[MESSAGE_SIZE / 2]; + + } +} diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/test/resources/SegmentedFileJournalTest.conf b/opendaylight/md-sal/sal-akka-segmented-journal/src/test/resources/SegmentedFileJournalTest.conf new file mode 100644 index 0000000000..8fce93b097 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/test/resources/SegmentedFileJournalTest.conf @@ -0,0 +1,15 @@ +akka { + persistence { + journal { + plugin = "akka.persistence.journal.segmented-file" + + segmented-file { + class = "org.opendaylight.controller.akka.segjournal.SegmentedFileJournal" + root-directory = "target/segmented-journal" + max-entry-size = 8M + max-segment-size = 32M + memory-mapped = false + } + } + } +} diff --git a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/factory-akka.conf b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/factory-akka.conf index 7b86f1c572..34b309b0b1 100644 --- a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/factory-akka.conf +++ 
b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/factory-akka.conf @@ -120,7 +120,27 @@ odl-cluster-data { } persistence { - journal.plugin = akka.persistence.journal.leveldb + journal { + plugin = akka.persistence.journal.leveldb + + # Uncommenting the plugin line below activates the alternative segmented file journal. Each persistent actor + # is stored in a separate directory, with multiple segment files. Segments are removed + # when they are no longer required. + # + # plugin = akka.persistence.journal.segmented-file + segmented-file { + class = "org.opendaylight.controller.akka.segjournal.SegmentedFileJournal" + # Root directory for segmented journal storage + root-directory = "segmented-journal" + # Maximum size of a single entry in the segmented journal + max-entry-size = 16M + # Maximum size of a segment + max-segment-size = 128M + # Map each segment into memory. Note that while this can improve performance, + # it will also place additional burden on system resources. + memory-mapped = false + } + } snapshot-store.local.class = "org.opendaylight.controller.cluster.persistence.LocalSnapshotStore" snapshot-store.plugin = akka.persistence.snapshot-store.local
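
As the commented defaults above indicate, the segmented-file journal ships disabled; an installation opts in by pointing Akka's journal plugin setting at the segmented-file block. A minimal sketch of such an override follows, assuming only the key names and defaults visible in factory-akka.conf above; the directory name and sizes shown are illustrative, not required values:

akka {
  persistence {
    journal {
      # select SegmentedFileJournal instead of the default LevelDB journal
      plugin = akka.persistence.journal.segmented-file

      segmented-file {
        class = "org.opendaylight.controller.akka.segjournal.SegmentedFileJournal"
        # each persistenceId gets its own sub-directory under this root
        root-directory = "segmented-journal"
        # largest serialized entry the data journal will accept
        max-entry-size = 16M
        # segments roll over once they reach this size
        max-segment-size = 128M
        # false selects StorageLevel.DISK, true selects StorageLevel.MAPPED
        memory-mapped = false
      }
    }
  }
}

When max-entry-size, max-segment-size or memory-mapped are omitted, SegmentedFileJournal falls back to its built-in defaults (16 MiB entries, 128 MiB segments, disk-backed storage); root-directory has no default and must be set. The test resource SegmentedFileJournalTest.conf exercises the same keys with smaller sizes.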