<artifactId>sal-akka-raft</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <!-- Segmented Journal for Akka, including Kryo and asm-5.2 -->
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>sal-akka-segmented-journal</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.esotericsoftware</groupId>
+ <artifactId>kryo</artifactId>
+ <version>4.0.2</version>
+ </dependency>
+ <dependency>
+ <groupId>com.esotericsoftware</groupId>
+ <artifactId>minlog</artifactId>
+ <version>1.3.1</version>
+ </dependency>
+ <dependency>
+ <groupId>com.esotericsoftware</groupId>
+ <artifactId>reflectasm</artifactId>
+ <version>1.11.8</version>
+ </dependency>
+ <dependency>
+ <groupId>org.ow2.asm</groupId>
+ <artifactId>asm</artifactId>
+ <version>5.2</version>
+ </dependency>
</dependencies>
</project>
<artifactId>sal-akka-raft-example</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-akka-segmented-journal</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>cds-access-api</artifactId>
<module>cds-access-api</module>
<module>cds-access-client</module>
<module>cds-dom-api</module>
+ <module>sal-akka-segmented-journal</module>
<!-- sal clustering configuration -->
<module>sal-clustering-config</module>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- vi: set et smarttab sw=4 tabstop=4: -->
+<!--
+Copyright (c) 2019 Pantheon Technologies, s.r.o. and others. All rights reserved.
+
+This program and the accompanying materials are made available under the
+terms of the Eclipse Public License v1.0 which accompanies this distribution,
+and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>mdsal-parent</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ <relativePath>../parent</relativePath>
+ </parent>
+
+ <artifactId>sal-akka-segmented-journal</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ <packaging>bundle</packaging>
+
+ <dependencies>
+
+ <!-- Akka -->
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-actor_2.12</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-persistence_2.12</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-slf4j_2.12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-testkit_2.12</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-persistence-tck_2.12</artifactId>
+ </dependency>
+
+ <!-- Codahale -->
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ </dependency>
+
+ <!-- Scala -->
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </dependency>
+
+ <!-- Atomix -->
+ <dependency>
+ <groupId>io.atomix</groupId>
+ <artifactId>atomix-storage</artifactId>
+ <version>3.1.5</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.atomix</groupId>
+ <artifactId>atomix-utils</artifactId>
+ <version>3.1.5</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <extensions>true</extensions>
+ <configuration>
+ <instructions>
+ <Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
+ <!-- atomix.io is using an older Guava, so let's embed it to prevent duplicates -->
+ <Embed-Dependency>*;inline=true;groupId=io.atomix</Embed-Dependency>
+ </instructions>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <scm>
+ <connection>scm:git:http://git.opendaylight.org/gerrit/controller.git</connection>
+ <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+ <tag>HEAD</tag>
+ <url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL:Architecture:Clustering</url>
+ </scm>
+
+</project>
--- /dev/null
+/*
+ * Copyright (c) 2019 Pantheon Technologies, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.akka.segjournal;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.persistence.PersistentRepr;
+import io.atomix.storage.journal.JournalSegment;
+
+/**
+ * A single entry in the data journal. We do not store {@code persistenceId} for each entry, as that is a
+ * journal-invariant, nor do we store {@code sequenceNr}, as that information is maintained by {@link JournalSegment}'s
+ * index.
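+ *
+ * <p>
+ * A minimal sketch of the round-trip, with {@code repr}, {@code writer}, {@code indexed} and {@code persistenceId}
+ * standing in for values available at the respective call sites:
+ * <pre>{@code
+ *     // Write path: wrap the incoming representation before appending it to the data journal
+ *     writer.append(new ToPersistence(repr));
+ *
+ *     // Read path: the journal yields a FromPersistence, which is completed with the journal-invariant
+ *     // persistenceId and the entry's index serving as the sequence number
+ *     PersistentRepr restored = ((FromPersistence) indexed.entry()).toRepr(persistenceId, indexed.index());
+ * }</pre>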
+ *
+ * @author Robert Varga
+ */
+abstract class DataJournalEntry {
+ static final class ToPersistence extends DataJournalEntry {
+ private final PersistentRepr repr;
+
+ ToPersistence(final PersistentRepr repr) {
+ this.repr = requireNonNull(repr);
+ }
+
+ PersistentRepr repr() {
+ return repr;
+ }
+ }
+
+ static final class FromPersistence extends DataJournalEntry {
+ private final String manifest;
+ private final String writerUuid;
+ private final Object payload;
+
+ FromPersistence(final String manifest, final String writerUuid, final Object payload) {
+ this.manifest = manifest;
+ this.writerUuid = requireNonNull(writerUuid);
+ this.payload = requireNonNull(payload);
+ }
+
+ PersistentRepr toRepr(final String persistenceId, final long sequenceNr) {
+ return PersistentRepr.apply(payload, sequenceNr, persistenceId, manifest, false, null, writerUuid);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2019 Pantheon Technologies, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.akka.segjournal;
+
+import static com.google.common.base.Verify.verify;
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.ActorSystem;
+import akka.actor.ExtendedActorSystem;
+import akka.persistence.PersistentRepr;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import java.util.concurrent.Callable;
+import org.opendaylight.controller.akka.segjournal.DataJournalEntry.FromPersistence;
+import org.opendaylight.controller.akka.segjournal.DataJournalEntry.ToPersistence;
+
+/**
+ * Kryo serializer for {@link DataJournalEntry}. Each {@link SegmentedJournalActor} has its own instance, as well as
+ * a nested JavaSerializer to handle the payload.
+ *
+ * <p>
+ * Since we are persisting only parts of {@link PersistentRepr}, this class is asymmetric by design:
+ * {@link #write(Kryo, Output, DataJournalEntry)} only accepts the {@link ToPersistence} subclass, which is a wrapper
+ * around a {@link PersistentRepr}, while {@link #read(Kryo, Input, Class)} produces a {@link FromPersistence}, which
+ * needs further processing to reconstruct a {@link PersistentRepr}.
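+ *
+ * <p>
+ * A sketch of how this serializer is registered, mirroring what {@code SegmentedJournalActor} does when it opens its
+ * data journal (with {@code actorSystem} supplied by the caller):
+ * <pre>{@code
+ *     Namespace namespace = Namespace.builder()
+ *         .register(new DataJournalEntrySerializer(actorSystem), FromPersistence.class, ToPersistence.class)
+ *         .build();
+ * }</pre>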
+ *
+ * @author Robert Varga
+ */
+final class DataJournalEntrySerializer extends Serializer<DataJournalEntry> {
+ private final JavaSerializer serializer = new JavaSerializer();
+ private final ExtendedActorSystem actorSystem;
+
+ DataJournalEntrySerializer(final ActorSystem actorSystem) {
+ this.actorSystem = requireNonNull((ExtendedActorSystem) actorSystem);
+ }
+
+ @Override
+ public void write(final Kryo kryo, final Output output, final DataJournalEntry object) {
+ verify(object instanceof ToPersistence);
+ final PersistentRepr repr = ((ToPersistence) object).repr();
+ output.writeString(repr.manifest());
+ output.writeString(repr.writerUuid());
+ serializer.write(kryo, output, repr.payload());
+ }
+
+ @Override
+ public DataJournalEntry read(final Kryo kryo, final Input input, final Class<DataJournalEntry> type) {
+ final String manifest = input.readString();
+ final String uuid = input.readString();
+ final Object payload = akka.serialization.JavaSerializer.currentSystem().withValue(actorSystem,
+ (Callable<Object>)() -> serializer.read(kryo, input, type));
+ return new FromPersistence(manifest, uuid, payload);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2019 Pantheon Technologies, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.akka.segjournal;
+
+import static akka.actor.ActorRef.noSender;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import akka.actor.ActorRef;
+import akka.dispatch.Futures;
+import akka.persistence.AtomicWrite;
+import akka.persistence.PersistentRepr;
+import akka.persistence.journal.japi.AsyncWriteJournal;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigMemorySize;
+import io.atomix.storage.StorageLevel;
+import io.atomix.storage.journal.SegmentedJournal;
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.AsyncMessage;
+import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.WriteMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+/**
+ * An Akka persistence journal implementation on top of {@link SegmentedJournal}. This actor represents an aggregation
+ * of multiple journals and acts as a receptionist between Akka and the individual per-persistenceId actors. See
+ * {@link SegmentedJournalActor} for details on how the persistence works.
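+ *
+ * <p>
+ * The journal is enabled through standard Akka persistence configuration; a minimal sketch, using the same keys this
+ * class reads and mirroring the example configuration shipped with this change:
+ * <pre>
+ *     akka.persistence.journal {
+ *         plugin = akka.persistence.journal.segmented-file
+ *
+ *         segmented-file {
+ *             class = "org.opendaylight.controller.akka.segjournal.SegmentedFileJournal"
+ *             root-directory = "segmented-journal"
+ *             max-entry-size = 16M
+ *             max-segment-size = 128M
+ *             memory-mapped = false
+ *         }
+ *     }
+ * </pre>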
+ *
+ * @author Robert Varga
+ */
+public class SegmentedFileJournal extends AsyncWriteJournal {
+ public static final String STORAGE_ROOT_DIRECTORY = "root-directory";
+ public static final String STORAGE_MAX_ENTRY_SIZE = "max-entry-size";
+ public static final int STORAGE_MAX_ENTRY_SIZE_DEFAULT = 16 * 1024 * 1024;
+ public static final String STORAGE_MAX_SEGMENT_SIZE = "max-segment-size";
+ public static final int STORAGE_MAX_SEGMENT_SIZE_DEFAULT = STORAGE_MAX_ENTRY_SIZE_DEFAULT * 8;
+ public static final String STORAGE_MEMORY_MAPPED = "memory-mapped";
+
+ private static final Logger LOG = LoggerFactory.getLogger(SegmentedFileJournal.class);
+
+ private final Map<String, ActorRef> handlers = new HashMap<>();
+ private final File rootDir;
+ private final StorageLevel storage;
+ private final int maxEntrySize;
+ private final int maxSegmentSize;
+
+ public SegmentedFileJournal(final Config config) {
+ rootDir = new File(config.getString(STORAGE_ROOT_DIRECTORY));
+ if (!rootDir.exists()) {
+ LOG.debug("Creating directory {}", rootDir);
+ checkState(rootDir.mkdirs(), "Failed to create root directory %s", rootDir);
+ }
+ checkArgument(rootDir.isDirectory(), "%s is not a directory", rootDir);
+
+ maxEntrySize = getBytes(config, STORAGE_MAX_ENTRY_SIZE, STORAGE_MAX_ENTRY_SIZE_DEFAULT);
+ maxSegmentSize = getBytes(config, STORAGE_MAX_SEGMENT_SIZE, STORAGE_MAX_SEGMENT_SIZE_DEFAULT);
+
+ if (config.hasPath(STORAGE_MEMORY_MAPPED)) {
+ storage = config.getBoolean(STORAGE_MEMORY_MAPPED) ? StorageLevel.MAPPED : StorageLevel.DISK;
+ } else {
+ storage = StorageLevel.DISK;
+ }
+
+ LOG.info("Initialized with root directory {} with storage {}", rootDir, storage);
+ }
+
+ @Override
+ public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(final Iterable<AtomicWrite> messages) {
+ final Map<ActorRef, WriteMessages> map = new HashMap<>();
+ final List<Future<Optional<Exception>>> result = new ArrayList<>();
+
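+ // Route each AtomicWrite to its persistenceId's handler, batching all writes destined for the same
+ // handler into a single WriteMessages envelope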
+ for (AtomicWrite message : messages) {
+ final String persistenceId = message.persistenceId();
+ final ActorRef handler = handlers.computeIfAbsent(persistenceId, this::createHandler);
+ result.add(map.computeIfAbsent(handler, key -> new WriteMessages()).add(message));
+ }
+
+ // Send requests to actors and zip the futures back
+ map.forEach((handler, message) -> {
+ LOG.trace("Sending {} to {}", message, handler);
+ handler.tell(message, noSender());
+ });
+ return Futures.sequence(result, context().dispatcher());
+ }
+
+ @Override
+ public Future<Void> doAsyncDeleteMessagesTo(final String persistenceId, final long toSequenceNr) {
+ return delegateMessage(persistenceId, SegmentedJournalActor.deleteMessagesTo(toSequenceNr));
+ }
+
+ @Override
+ public Future<Void> doAsyncReplayMessages(final String persistenceId, final long fromSequenceNr,
+ final long toSequenceNr, final long max, final Consumer<PersistentRepr> replayCallback) {
+ return delegateMessage(persistenceId,
+ SegmentedJournalActor.replayMessages(fromSequenceNr, toSequenceNr, max, replayCallback));
+ }
+
+ @Override
+ public Future<Long> doAsyncReadHighestSequenceNr(final String persistenceId, final long fromSequenceNr) {
+ return delegateMessage(handlers.computeIfAbsent(persistenceId, this::createHandler),
+ SegmentedJournalActor.readHighestSequenceNr(fromSequenceNr));
+ }
+
+ private ActorRef createHandler(final String persistenceId) {
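+ // Encode the persistenceId as URL-safe Base64, so it forms a valid directory name even if it contains
+ // path separators or other characters unsafe for the file system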
+ final String directoryName = Base64.getUrlEncoder().encodeToString(persistenceId.getBytes(
+ StandardCharsets.UTF_8));
+ final File directory = new File(rootDir, directoryName);
+ LOG.debug("Creating handler for {} in directory {}", persistenceId, directory);
+
+ final ActorRef handler = context().actorOf(SegmentedJournalActor.props(persistenceId, directory, storage,
+ maxEntrySize, maxSegmentSize));
+ LOG.debug("Directory {} handled by {}", directory, handler);
+
+ // Note: the computeIfAbsent() in our callers installs the handler into the handlers map; doing so here
+ // as well would structurally modify the map while it is being computed
+ return handler;
+ }
+
+ private <T> Future<T> delegateMessage(final String persistenceId, final AsyncMessage<T> message) {
+ final ActorRef handler = handlers.get(persistenceId);
+ if (handler == null) {
+ return Futures.failed(new IllegalStateException("Cannot find handler for " + persistenceId));
+ }
+
+ return delegateMessage(handler, message);
+ }
+
+ private static <T> Future<T> delegateMessage(final ActorRef handler, final AsyncMessage<T> message) {
+ LOG.trace("Delegating {} to {}", message, handler);
+ handler.tell(message, noSender());
+ return message.promise.future();
+ }
+
+ private static int getBytes(final Config config, final String path, final int defaultValue) {
+ if (!config.hasPath(path)) {
+ return defaultValue;
+ }
+ final ConfigMemorySize value = config.getMemorySize(path);
+ final long result = value.toBytes();
+ checkArgument(result <= Integer.MAX_VALUE, "Size %s exceeds maximum allowed %s", result, Integer.MAX_VALUE);
+ return (int) result;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2019 Pantheon Technologies, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.akka.segjournal;
+
+import static com.google.common.base.Verify.verify;
+import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.AbstractActor;
+import akka.actor.Props;
+import akka.persistence.AtomicWrite;
+import akka.persistence.PersistentRepr;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.SlidingTimeWindowReservoir;
+import com.codahale.metrics.Timer;
+import com.google.common.base.MoreObjects;
+import io.atomix.storage.StorageLevel;
+import io.atomix.storage.journal.Indexed;
+import io.atomix.storage.journal.SegmentedJournal;
+import io.atomix.storage.journal.SegmentedJournalReader;
+import io.atomix.storage.journal.SegmentedJournalWriter;
+import io.atomix.utils.serializer.Namespace;
+import java.io.File;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import org.opendaylight.controller.akka.segjournal.DataJournalEntry.FromPersistence;
+import org.opendaylight.controller.akka.segjournal.DataJournalEntry.ToPersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.Iterator;
+import scala.collection.SeqLike;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+
+/**
+ * This actor handles a single PersistentActor's journal. The journal is split into two {@link SegmentedJournal}s:
+ * <ul>
+ * <li>A memory-mapped data journal, containing actual data entries</li>
+ * <li>A simple file journal, tracking the sequence number of the last deleted entry</li>
+ * </ul>
+ *
+ * <p>
+ * This is a conscious design decision to minimize the amount of data that is being stored in the data journal while
+ * speeding up normal operations. Since the SegmentedJournal is an append-only linear log and Akka requires the ability
+ * to delete persistence entries, we need the ability to mark a subset of a SegmentedJournal as deleted. While we could
+ * treat such delete requests as normal events, this leads to a mismatch between SegmentedJournal indices (as exposed by
+ * {@link Indexed}) and Akka sequence numbers -- requiring us to potentially perform costly deserialization to find the
+ * index corresponding to a particular sequence number, or maintain moderately-complex logic and data structures to
+ * perform that mapping in sub-linear time complexity.
+ *
+ * <p>
+ * The split-file approach allows us to treat sequence numbers and indices as equivalent, without maintaining any explicit
+ * mapping information. The only additional information we need to maintain is the last deleted sequence number.
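+ *
+ * <p>
+ * As a worked example: after entries with sequence numbers 1-10 have been written and a delete up to sequence
+ * number 7 has been processed, the delete journal's last entry records 7 and replay simply opens the data journal
+ * at index {@code lastDelete + 1}, i.e. 8, with indices and sequence numbers still in one-to-one correspondence.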
+ *
+ * @author Robert Varga
+ */
+final class SegmentedJournalActor extends AbstractActor {
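+ /**
+ * Base class for messages which are completed through a Scala {@link Promise} instead of an explicit reply
+ * message, allowing the journal plugin to hand the corresponding {@link Future} straight back to Akka.
+ */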
+ abstract static class AsyncMessage<T> {
+ final Promise<T> promise = Promise.apply();
+ }
+
+ private static final class ReadHighestSequenceNr extends AsyncMessage<Long> {
+ private final long fromSequenceNr;
+
+ ReadHighestSequenceNr(final long fromSequenceNr) {
+ this.fromSequenceNr = fromSequenceNr;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("fromSequenceNr", fromSequenceNr).toString();
+ }
+ }
+
+ private static final class ReplayMessages extends AsyncMessage<Void> {
+ private final long fromSequenceNr;
+ private final long toSequenceNr;
+ private final long max;
+ private final Consumer<PersistentRepr> replayCallback;
+
+ ReplayMessages(final long fromSequenceNr,
+ final long toSequenceNr, final long max, final Consumer<PersistentRepr> replayCallback) {
+ this.fromSequenceNr = fromSequenceNr;
+ this.toSequenceNr = toSequenceNr;
+ this.max = max;
+ this.replayCallback = requireNonNull(replayCallback);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("fromSequenceNr", fromSequenceNr)
+ .add("toSequenceNr", toSequenceNr).add("max", max).toString();
+ }
+ }
+
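+ /**
+ * A batch of {@link AtomicWrite} requests along with their individual result promises. Requests and results
+ * are kept in lock-step: the i-th promise reports the outcome of the i-th write.
+ */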
+ static final class WriteMessages {
+ private final List<AtomicWrite> requests = new ArrayList<>();
+ private final List<Promise<Optional<Exception>>> results = new ArrayList<>();
+
+ Future<Optional<Exception>> add(final AtomicWrite write) {
+ final Promise<Optional<Exception>> promise = Promise.apply();
+ requests.add(write);
+ results.add(promise);
+ return promise.future();
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("requests", requests).toString();
+ }
+ }
+
+ private static final class DeleteMessagesTo extends AsyncMessage<Void> {
+ final long toSequenceNr;
+
+ DeleteMessagesTo(final long toSequenceNr) {
+ this.toSequenceNr = toSequenceNr;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("toSequenceNr", toSequenceNr).toString();
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournalActor.class);
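+ // The delete journal only ever stores Long sequence numbers, hence small segments are sufficient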
+ private static final Namespace DELETE_NAMESPACE = Namespace.builder().register(Long.class).build();
+ private static final int DELETE_SEGMENT_SIZE = 64 * 1024;
+
+ // Tracks the time it took us to write a batch of messages
+ private final Timer batchWriteTime = new Timer();
+ // Tracks the number of individual messages written
+ private final Meter messageWriteCount = new Meter();
+ // Tracks the size distribution of messages for last 5 minutes
+ private final Histogram messageSize = new Histogram(new SlidingTimeWindowReservoir(5, TimeUnit.MINUTES));
+
+ private final String persistenceId;
+ private final StorageLevel storage;
+ private final int maxSegmentSize;
+ private final int maxEntrySize;
+ private final File directory;
+
+ private SegmentedJournal<DataJournalEntry> dataJournal;
+ private SegmentedJournal<Long> deleteJournal;
+ private long lastDelete;
+
+ // Tracks largest message size we have observed either during recovery or during write
+ private int largestObservedSize;
+
+ SegmentedJournalActor(final String persistenceId, final File directory, final StorageLevel storage,
+ final int maxEntrySize, final int maxSegmentSize) {
+ this.persistenceId = requireNonNull(persistenceId);
+ this.directory = requireNonNull(directory);
+ this.storage = requireNonNull(storage);
+ this.maxEntrySize = maxEntrySize;
+ this.maxSegmentSize = maxSegmentSize;
+ }
+
+ static Props props(final String persistenceId, final File directory, final StorageLevel storage,
+ final int maxEntrySize, final int maxSegmentSize) {
+ return Props.create(SegmentedJournalActor.class, requireNonNull(persistenceId), directory, storage,
+ maxEntrySize, maxSegmentSize);
+ }
+
+ @Override
+ public Receive createReceive() {
+ return receiveBuilder()
+ .match(DeleteMessagesTo.class, this::handleDeleteMessagesTo)
+ .match(ReadHighestSequenceNr.class, this::handleReadHighestSequenceNr)
+ .match(ReplayMessages.class, this::handleReplayMessages)
+ .match(WriteMessages.class, this::handleWriteMessages)
+ .matchAny(this::handleUnknown)
+ .build();
+ }
+
+ @Override
+ public void preStart() throws Exception {
+ LOG.debug("{}: actor starting", persistenceId);
+ super.preStart();
+ }
+
+ @Override
+ public void postStop() throws Exception {
+ LOG.debug("{}: actor stopping", persistenceId);
+ if (dataJournal != null) {
+ dataJournal.close();
+ LOG.debug("{}: data journal closed", persistenceId);
+ dataJournal = null;
+ }
+ if (deleteJournal != null) {
+ deleteJournal.close();
+ LOG.debug("{}: delete journal closed", persistenceId);
+ deleteJournal = null;
+ }
+ LOG.debug("{}: actor stopped", persistenceId);
+ super.postStop();
+ }
+
+ static AsyncMessage<Void> deleteMessagesTo(final long toSequenceNr) {
+ return new DeleteMessagesTo(toSequenceNr);
+ }
+
+ static AsyncMessage<Long> readHighestSequenceNr(final long fromSequenceNr) {
+ return new ReadHighestSequenceNr(fromSequenceNr);
+ }
+
+ static AsyncMessage<Void> replayMessages(final long fromSequenceNr, final long toSequenceNr, final long max,
+ final Consumer<PersistentRepr> replayCallback) {
+ return new ReplayMessages(fromSequenceNr, toSequenceNr, max, replayCallback);
+ }
+
+ private void handleDeleteMessagesTo(final DeleteMessagesTo message) {
+ ensureOpen();
+
+ LOG.debug("{}: delete messages {}", persistenceId, message);
+ final long to = Long.min(dataJournal.writer().getLastIndex(), message.toSequenceNr);
+ LOG.debug("{}: adjusted delete to {}", persistenceId, to);
+
+ if (lastDelete < to) {
+ LOG.debug("{}: deleting entries up to {}", persistenceId, to);
+
+ lastDelete = to;
+ final SegmentedJournalWriter<Long> deleteWriter = deleteJournal.writer();
+ final Indexed<Long> entry = deleteWriter.append(lastDelete);
+ deleteWriter.commit(entry.index());
+ dataJournal.writer().commit(lastDelete);
+
+ LOG.debug("{}: compaction started", persistenceId);
+ dataJournal.compact(lastDelete);
+ deleteJournal.compact(entry.index());
+ LOG.debug("{}: compaction finished", persistenceId);
+ } else {
+ LOG.debug("{}: entries up to {} already deleted", persistenceId, lastDelete);
+ }
+
+ message.promise.success(null);
+ }
+
+ private void handleReadHighestSequenceNr(final ReadHighestSequenceNr message) {
+ LOG.debug("{}: looking for highest sequence on {}", persistenceId, message);
+ final Long sequence;
+ if (directory.isDirectory()) {
+ ensureOpen();
+ sequence = dataJournal.writer().getLastIndex();
+ } else {
+ sequence = 0L;
+ }
+
+ LOG.debug("{}: highest sequence is {}", message, sequence);
+ message.promise.success(sequence);
+ }
+
+ @SuppressWarnings("checkstyle:illegalCatch")
+ private void handleReplayMessages(final ReplayMessages message) {
+ LOG.debug("{}: replaying messages {}", persistenceId, message);
+ ensureOpen();
+
+ final long from = Long.max(lastDelete + 1, message.fromSequenceNr);
+ LOG.debug("{}: adjusted fromSequenceNr to {}", persistenceId, from);
+
+ try (SegmentedJournalReader<DataJournalEntry> reader = dataJournal.openReader(from)) {
+ int count = 0;
+ while (reader.hasNext() && count < message.max) {
+ final Indexed<DataJournalEntry> next = reader.next();
+ if (next.index() > message.toSequenceNr) {
+ break;
+ }
+
+ LOG.trace("{}: replay {}", persistenceId, next);
+ updateLargestSize(next.size());
+ final DataJournalEntry entry = next.entry();
+ verify(entry instanceof FromPersistence, "Unexpected entry %s", entry);
+
+ final PersistentRepr repr = ((FromPersistence) entry).toRepr(persistenceId, next.index());
+ LOG.debug("{}: replaying {}", persistenceId, repr);
+ message.replayCallback.accept(repr);
+ count++;
+ }
+ LOG.debug("{}: successfully replayed {} entries", persistenceId, count);
+ } catch (Exception e) {
+ LOG.warn("{}: failed to replay messages for {}", persistenceId, message, e);
+ message.promise.failure(e);
+ } finally {
+ message.promise.success(null);
+ }
+ }
+
+ @SuppressWarnings("checkstyle:illegalCatch")
+ private void handleWriteMessages(final WriteMessages message) {
+ ensureOpen();
+
+ final SegmentedJournalWriter<DataJournalEntry> writer = dataJournal.writer();
+ final long startTicks = System.nanoTime();
+ final int count = message.requests.size();
+ final long start = writer.getLastIndex();
+
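+ // An AtomicWrite must be applied completely or not at all: if any of its messages fails to serialize,
+ // roll the writer back to where this write started and fail only this write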
+ for (int i = 0; i < count; ++i) {
+ final long mark = writer.getLastIndex();
+ try {
+ writeRequest(writer, message.requests.get(i));
+ } catch (Exception e) {
+ LOG.warn("{}: failed to write out request", persistenceId, e);
+ message.results.get(i).success(Optional.of(e));
+ writer.truncate(mark);
+ continue;
+ }
+
+ message.results.get(i).success(Optional.empty());
+ }
+ writer.flush();
+ batchWriteTime.update(System.nanoTime() - startTicks, TimeUnit.NANOSECONDS);
+ messageWriteCount.mark(writer.getLastIndex() - start);
+ }
+
+ private void writeRequest(final SegmentedJournalWriter<DataJournalEntry> writer, final AtomicWrite request) {
+ // Cast is needed for Eclipse because of https://bugs.eclipse.org/bugs/show_bug.cgi?id=468276
+ final Iterator<PersistentRepr> it = ((SeqLike<PersistentRepr, ?>) request.payload()).iterator();
+ while (it.hasNext()) {
+ final PersistentRepr repr = it.next();
+ final Object payload = repr.payload();
+ if (!(payload instanceof Serializable)) {
+ throw new UnsupportedOperationException("Non-serializable payload encountered " + payload.getClass());
+ }
+
+ final int size = writer.append(new ToPersistence(repr)).size();
+ messageSize.update(size);
+ updateLargestSize(size);
+ }
+ }
+
+ private void handleUnknown(final Object message) {
+ LOG.error("{}: Received unknown message {}", persistenceId, message);
+ }
+
+ private void updateLargestSize(final int size) {
+ if (size > largestObservedSize) {
+ largestObservedSize = size;
+ }
+ }
+
+ private void ensureOpen() {
+ if (dataJournal != null) {
+ verifyNotNull(deleteJournal);
+ return;
+ }
+
+ deleteJournal = SegmentedJournal.<Long>builder().withDirectory(directory).withName("delete")
+ .withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build();
+ final Indexed<Long> lastEntry = deleteJournal.writer().getLastEntry();
+ lastDelete = lastEntry == null ? 0 : lastEntry.index();
+
+ dataJournal = SegmentedJournal.<DataJournalEntry>builder()
+ .withStorageLevel(storage).withDirectory(directory).withName("data")
+ .withNamespace(Namespace.builder()
+ .register(new DataJournalEntrySerializer(context().system()),
+ FromPersistence.class, ToPersistence.class)
+ .build())
+ .withMaxEntrySize(maxEntrySize).withMaxSegmentSize(maxSegmentSize)
+ .build();
+ final SegmentedJournalWriter<DataJournalEntry> writer = dataJournal.writer();
+ writer.commit(lastDelete);
+ LOG.debug("{}: journal open with last index {}, deleted to {}", persistenceId, writer.getLastIndex(),
+ lastDelete);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2019 Pantheon Technologies, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.akka.segjournal;
+
+import akka.persistence.japi.journal.JavaJournalSpec;
+import com.typesafe.config.ConfigFactory;
+import java.io.File;
+import org.apache.commons.io.FileUtils;
+import org.junit.runner.RunWith;
+import org.scalatest.junit.JUnitRunner;
+
+@RunWith(JUnitRunner.class)
+public class SegmentedFileJournalSpecTest extends JavaJournalSpec {
+ private static final long serialVersionUID = 1L;
+
+ private static final File JOURNAL_DIR = new File("target/segmented-journal");
+
+ public SegmentedFileJournalSpecTest() {
+ super(ConfigFactory.load("SegmentedFileJournalTest.conf"));
+ }
+
+ @Override
+ public void beforeAll() {
+ FileUtils.deleteQuietly(JOURNAL_DIR);
+ super.beforeAll();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2019 Pantheon Technologies, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.akka.segjournal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.persistence.AtomicWrite;
+import akka.persistence.PersistentRepr;
+import akka.testkit.CallingThreadDispatcher;
+import akka.testkit.javadsl.TestKit;
+import io.atomix.storage.StorageLevel;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.AsyncMessage;
+import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.WriteMessages;
+import scala.concurrent.Future;
+
+public class SegmentedFileJournalTest {
+ private static final File DIRECTORY = new File("target/sfj-test");
+ private static final int SEGMENT_SIZE = 1024 * 1024;
+ private static final int MESSAGE_SIZE = 512 * 1024;
+
+ private static ActorSystem SYSTEM;
+
+ private TestKit kit;
+ private ActorRef actor;
+
+ @BeforeClass
+ public static void beforeClass() {
+ SYSTEM = ActorSystem.create("test");
+ }
+
+ @AfterClass
+ public static void afterClass() {
+ TestKit.shutdownActorSystem(SYSTEM);
+ SYSTEM = null;
+ }
+
+ @Before
+ public void before() {
+ kit = new TestKit(SYSTEM);
+ FileUtils.deleteQuietly(DIRECTORY);
+ actor = actor();
+ }
+
+ @After
+ public void after() {
+ actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+
+ @Test
+ public void testDeleteAfterStop() {
+ // Preliminary setup
+ final WriteMessages write = new WriteMessages();
+ final Future<Optional<Exception>> first = write.add(AtomicWrite.apply(PersistentRepr.apply("first", 1, "foo",
+ null, false, kit.getRef(), "uuid")));
+ final Future<Optional<Exception>> second = write.add(AtomicWrite.apply(PersistentRepr.apply("second", 2, "foo",
+ null, false, kit.getRef(), "uuid")));
+ actor.tell(write, ActorRef.noSender());
+ assertFalse(getFuture(first).isPresent());
+ assertFalse(getFuture(second).isPresent());
+
+ assertHighestSequenceNr(2);
+ assertReplayCount(2);
+
+ deleteEntries(1);
+
+ assertHighestSequenceNr(2);
+ assertReplayCount(1);
+
+ // Restart actor
+ actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ actor = actor();
+
+ // Check if state is retained
+ assertHighestSequenceNr(2);
+ assertReplayCount(1);
+ }
+
+ @Test
+ public void testSegmentation() throws IOException {
+ // Write enough data to fill the first segment and spill into a second one
+ final LargePayload payload = new LargePayload();
+
+ final WriteMessages write = new WriteMessages();
+ final List<Future<Optional<Exception>>> requests = new ArrayList<>();
+
+ // Each payload is a quarter of the segment size; six of them plus overhead spill into a second segment
+ for (int i = 1; i <= SEGMENT_SIZE * 3 / MESSAGE_SIZE; ++i) {
+ requests.add(write.add(AtomicWrite.apply(PersistentRepr.apply(payload, i, "foo", null, false, kit.getRef(),
+ "uuid"))));
+ }
+
+ actor.tell(write, ActorRef.noSender());
+ requests.forEach(future -> assertFalse(getFuture(future).isPresent()));
+
+ assertFileCount(2, 1);
+
+ // Delete all but the last entry
+ deleteEntries(requests.size());
+
+ assertFileCount(1, 1);
+ }
+
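+ // The journal actor is created with the CallingThreadDispatcher, so messages sent via tell() in these
+ // tests are processed synchronously and their promises are completed by the time the assertions run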
+ private ActorRef actor() {
+ return kit.childActorOf(SegmentedJournalActor.props("foo", DIRECTORY, StorageLevel.DISK, MESSAGE_SIZE,
+ SEGMENT_SIZE).withDispatcher(CallingThreadDispatcher.Id()));
+ }
+
+ private void deleteEntries(final long deleteTo) {
+ final AsyncMessage<Void> delete = SegmentedJournalActor.deleteMessagesTo(deleteTo);
+ actor.tell(delete, ActorRef.noSender());
+ assertNull(get(delete));
+ }
+
+ private void assertHighestSequenceNr(final long expected) {
+ AsyncMessage<Long> highest = SegmentedJournalActor.readHighestSequenceNr(0);
+ actor.tell(highest, ActorRef.noSender());
+ assertEquals(expected, (long) get(highest));
+ }
+
+ @SuppressWarnings("unchecked")
+ private void assertReplayCount(final int expected) {
+ final Consumer<PersistentRepr> firstCallback = mock(Consumer.class);
+ doNothing().when(firstCallback).accept(any(PersistentRepr.class));
+ AsyncMessage<Void> replay = SegmentedJournalActor.replayMessages(0, Long.MAX_VALUE, Long.MAX_VALUE,
+ firstCallback);
+ actor.tell(replay, ActorRef.noSender());
+ assertNull(get(replay));
+ verify(firstCallback, times(expected)).accept(any(PersistentRepr.class));
+ }
+
+ private static void assertFileCount(final long dataFiles, final long deleteFiles) throws IOException {
+ List<File> contents = Files.list(DIRECTORY.toPath()).map(Path::toFile).collect(Collectors.toList());
+ assertEquals(dataFiles, contents.stream().filter(file -> file.getName().startsWith("data-")).count());
+ assertEquals(deleteFiles, contents.stream().filter(file -> file.getName().startsWith("delete-")).count());
+ }
+
+ private static <T> T get(final AsyncMessage<T> message) {
+ return getFuture(message.promise.future());
+ }
+
+ private static <T> T getFuture(final Future<T> future) {
+ assertTrue(future.isCompleted());
+ return future.value().get().get();
+ }
+
+ private static final class LargePayload implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ final byte[] bytes = new byte[MESSAGE_SIZE / 2];
+
+ }
+}
--- /dev/null
+akka {
+ persistence {
+ journal {
+ plugin = "akka.persistence.journal.segmented-file"
+
+ segmented-file {
+ class = "org.opendaylight.controller.akka.segjournal.SegmentedFileJournal"
+ root-directory = "target/segmented-journal"
+ max-entry-size = 8M
+ max-segment-size = 32M
+ memory-mapped = false
+ }
+ }
+ }
+}
}
persistence {
- journal.plugin = akka.persistence.journal.leveldb
+ journal {
+ plugin = akka.persistence.journal.leveldb
+
+ # The following activates the alternative segmented file journal. Each persistent actor
+ # is stored in a separate directory, with multiple segment files. Segments are removed
+ # when they are no longer required.
+ #
+ # plugin = akka.persistence.journal.segmented-file
+ segmented-file {
+ class = "org.opendaylight.controller.akka.segjournal.SegmentedFileJournal"
+ # Root directory for segmented journal storage
+ root-directory = "segmented-journal"
+ # Maximum size of a single entry in the segmented journal
+ max-entry-size = 16M
+ # Maximum size of a segment
+ max-segment-size = 128M
+ # Map each segment into memory. Note that while this can improve performance,
+ # it will also place additional burden on system resources.
+ memory-mapped = false
+ }
+ }
snapshot-store.local.class = "org.opendaylight.controller.cluster.persistence.LocalSnapshotStore"
snapshot-store.plugin = akka.persistence.snapshot-store.local