Add SegmentedFileJournal 68/80368/10
authorRobert Varga <robert.varga@pantheon.tech>
Mon, 4 Feb 2019 09:09:57 +0000 (10:09 +0100)
committerRobert Varga <nite@hq.sk>
Wed, 27 Feb 2019 21:37:48 +0000 (21:37 +0000)
This adds an alternative Akka persistence journal implementation
based on Atomix's SegmentedJournal.

SegmentedJournal is a linear append-only log of entries, each of
which has a incrementing 63bit index. This logical structure is
stored in rolling segments -- i.e. entries are appended to current
segment until it is full, at which point a new segment is allocated
and writeout continues there. Old segments can be freed as long as
they are not needed.

This layout makes it good match to how Akka's persistence works,
with the one exception that the SegmentedJournal cannot explicitly
delete entries from the head of the journal.

SegmentedFileJournal allocates one SegmentedJournal for each
persistenceId it encounters and uses a dedicated actor for it. This
provides a major simplification in the implementation as well as
allows for concurrent persistence of multiple PersistentActors.

JIRA: CONTROLLER-1884
Change-Id: I78140b154bab44a3e17d5ffb76b040c62add3204
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
12 files changed:
features/mdsal/odl-mdsal-clustering-commons/pom.xml
opendaylight/md-sal/mdsal-artifacts/pom.xml
opendaylight/md-sal/pom.xml
opendaylight/md-sal/sal-akka-segmented-journal/pom.xml [new file with mode: 0644]
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalEntry.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalEntrySerializer.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournal.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-segmented-journal/src/test/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournalSpecTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-segmented-journal/src/test/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournalTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-segmented-journal/src/test/resources/SegmentedFileJournalTest.conf [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/factory-akka.conf

index 89d35c0..b5cf579 100644 (file)
             <artifactId>sal-akka-raft</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <!-- Segmented Journal for Akka, including Kryo and asm-5.2 -->
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>sal-akka-segmented-journal</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.esotericsoftware</groupId>
+            <artifactId>kryo</artifactId>
+            <version>4.0.2</version>
+        </dependency>
+        <dependency>
+            <groupId>com.esotericsoftware</groupId>
+            <artifactId>minlog</artifactId>
+            <version>1.3.1</version>
+        </dependency>
+        <dependency>
+            <groupId>com.esotericsoftware</groupId>
+            <artifactId>reflectasm</artifactId>
+            <version>1.11.8</version>
+        </dependency>
+        <dependency>
+            <groupId>org.ow2.asm</groupId>
+            <artifactId>asm</artifactId>
+            <version>5.2</version>
+        </dependency>
     </dependencies>
 
 </project>
index fd9b642..51aaca2 100644 (file)
                 <artifactId>sal-akka-raft-example</artifactId>
                 <version>${project.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.opendaylight.controller</groupId>
+                <artifactId>sal-akka-segmented-journal</artifactId>
+                <version>${project.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.opendaylight.controller</groupId>
                 <artifactId>cds-access-api</artifactId>
index 11607a3..7e07837 100644 (file)
@@ -51,6 +51,7 @@
     <module>cds-access-api</module>
     <module>cds-access-client</module>
     <module>cds-dom-api</module>
+    <module>sal-akka-segmented-journal</module>
 
     <!-- sal clustering configuration -->
     <module>sal-clustering-config</module>
diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/pom.xml b/opendaylight/md-sal/sal-akka-segmented-journal/pom.xml
new file mode 100644 (file)
index 0000000..ba6b7ed
--- /dev/null
@@ -0,0 +1,105 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- vi: set et smarttab sw=4 tabstop=4: -->
+<!--
+Copyright (c) 2019 Pantheon Technologies, s.r.o. and others.  All rights reserved.
+
+This program and the accompanying materials are made available under the
+terms of the Eclipse Public License v1.0 which accompanies this distribution,
+and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>mdsal-parent</artifactId>
+        <version>1.10.0-SNAPSHOT</version>
+        <relativePath>../parent</relativePath>
+    </parent>
+
+    <artifactId>sal-akka-segmented-journal</artifactId>
+    <version>1.10.0-SNAPSHOT</version>
+    <packaging>bundle</packaging>
+
+    <dependencies>
+
+        <!-- Akka -->
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-actor_2.12</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-persistence_2.12</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-slf4j_2.12</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-testkit_2.12</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-persistence-tck_2.12</artifactId>
+        </dependency>
+
+        <!-- Codahale -->
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+        </dependency>
+
+        <!-- Scala -->
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+        </dependency>
+
+        <!-- Atomix -->
+        <dependency>
+            <groupId>io.atomix</groupId>
+            <artifactId>atomix-storage</artifactId>
+            <version>3.1.5</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.atomix</groupId>
+            <artifactId>atomix-utils</artifactId>
+            <version>3.1.5</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <extensions>true</extensions>
+                <configuration>
+                    <instructions>
+                        <Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
+                        <!-- atomix.io is using an older Guava, so let's embed it to prevent duplicates -->
+                        <Embed-Dependency>*;inline=true;groupId=io.atomix</Embed-Dependency>
+                    </instructions>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <scm>
+        <connection>scm:git:http://git.opendaylight.org/gerrit/controller.git</connection>
+        <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+        <tag>HEAD</tag>
+        <url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL:Architecture:Clustering</url>
+    </scm>
+
+</project>
diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalEntry.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalEntry.java
new file mode 100644 (file)
index 0000000..0713c02
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2019 Pantheon Technologies, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.akka.segjournal;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.persistence.PersistentRepr;
+import io.atomix.storage.journal.JournalSegment;
+
+/**
+ * A single entry in the data journal. We do not store {@code persistenceId} for each entry, as that is a
+ * journal-invariant, nor do we store {@code sequenceNr}, as that information is maintained by {@link JournalSegment}'s
+ * index.
+ *
+ * @author Robert Varga
+ */
+abstract class DataJournalEntry {
+    static final class ToPersistence extends DataJournalEntry {
+        private final PersistentRepr repr;
+
+        ToPersistence(final PersistentRepr repr) {
+            this.repr = requireNonNull(repr);
+        }
+
+        PersistentRepr repr() {
+            return repr;
+        }
+    }
+
+    static final class FromPersistence extends DataJournalEntry {
+        private final String manifest;
+        private final String writerUuid;
+        private final Object payload;
+
+        FromPersistence(final String manifest, final String writerUuid, final Object payload) {
+            this.manifest = manifest;
+            this.writerUuid = requireNonNull(writerUuid);
+            this.payload = requireNonNull(payload);
+        }
+
+        PersistentRepr toRepr(final String persistenceId, final long sequenceNr) {
+            return PersistentRepr.apply(payload, sequenceNr, persistenceId, manifest, false, null, writerUuid);
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalEntrySerializer.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalEntrySerializer.java
new file mode 100644 (file)
index 0000000..e248262
--- /dev/null
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2019 Pantheon Technologies, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.akka.segjournal;
+
+import static com.google.common.base.Verify.verify;
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.ActorSystem;
+import akka.actor.ExtendedActorSystem;
+import akka.persistence.PersistentRepr;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import java.util.concurrent.Callable;
+import org.opendaylight.controller.akka.segjournal.DataJournalEntry.FromPersistence;
+import org.opendaylight.controller.akka.segjournal.DataJournalEntry.ToPersistence;
+
+/**
+ * Kryo serializer for {@link DataJournalEntry}. Each {@link SegmentedJournalActor} has its own instance, as well as
+ * a nested JavaSerializer to handle the payload.
+ *
+ * <p>
+ * Since we are persisting only parts of {@link PersistentRepr}, this class asymmetric by design:
+ * {@link #write(Kryo, Output, DataJournalEntry)} only accepts {@link ToPersistence} subclass, which is a wrapper
+ * around a {@link PersistentRepr}, while {@link #read(Kryo, Input, Class)} produces an {@link FromPersistence}, which
+ * needs further processing to reconstruct a {@link PersistentRepr}.
+ *
+ * @author Robert Varga
+ */
+final class DataJournalEntrySerializer extends Serializer<DataJournalEntry> {
+    private final JavaSerializer serializer = new JavaSerializer();
+    private final ExtendedActorSystem actorSystem;
+
+    DataJournalEntrySerializer(final ActorSystem actorSystem) {
+        this.actorSystem = requireNonNull((ExtendedActorSystem) actorSystem);
+    }
+
+    @Override
+    public void write(final Kryo kryo, final Output output, final DataJournalEntry object) {
+        verify(object instanceof ToPersistence);
+        final PersistentRepr repr = ((ToPersistence) object).repr();
+        output.writeString(repr.manifest());
+        output.writeString(repr.writerUuid());
+        serializer.write(kryo, output, repr.payload());
+    }
+
+    @Override
+    public DataJournalEntry read(final Kryo kryo, final Input input, final Class<DataJournalEntry> type) {
+        final String manifest = input.readString();
+        final String uuid = input.readString();
+        final Object payload = akka.serialization.JavaSerializer.currentSystem().withValue(actorSystem,
+            (Callable<Object>)() -> serializer.read(kryo, input, type));
+        return new FromPersistence(manifest, uuid, payload);
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournal.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournal.java
new file mode 100644 (file)
index 0000000..a0bffa2
--- /dev/null
@@ -0,0 +1,158 @@
+/*
+ * Copyright (c) 2019 Pantheon Technologies, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.akka.segjournal;
+
+import static akka.actor.ActorRef.noSender;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verify;
+
+import akka.actor.ActorRef;
+import akka.dispatch.Futures;
+import akka.persistence.AtomicWrite;
+import akka.persistence.PersistentRepr;
+import akka.persistence.journal.japi.AsyncWriteJournal;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigMemorySize;
+import io.atomix.storage.StorageLevel;
+import io.atomix.storage.journal.SegmentedJournal;
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.AsyncMessage;
+import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.WriteMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+/**
+ * An Akka persistence journal implementation on top of {@link SegmentedJournal}. This actor represents aggregation
+ * of multiple journals and performs a receptionist job between Akka and invidual per-persistenceId actors. See
+ * {@link SegmentedJournalActor} for details on how the persistence works.
+ *
+ * @author Robert Varga
+ */
+public class SegmentedFileJournal extends AsyncWriteJournal {
+    public static final String STORAGE_ROOT_DIRECTORY = "root-directory";
+    public static final String STORAGE_MAX_ENTRY_SIZE = "max-entry-size";
+    public static final int STORAGE_MAX_ENTRY_SIZE_DEFAULT = 16 * 1024 * 1024;
+    public static final String STORAGE_MAX_SEGMENT_SIZE = "max-segment-size";
+    public static final int STORAGE_MAX_SEGMENT_SIZE_DEFAULT = STORAGE_MAX_ENTRY_SIZE_DEFAULT * 8;
+    public static final String STORAGE_MEMORY_MAPPED = "memory-mapped";
+
+    private static final Logger LOG = LoggerFactory.getLogger(SegmentedFileJournal.class);
+
+    private final Map<String, ActorRef> handlers = new HashMap<>();
+    private final File rootDir;
+    private final StorageLevel storage;
+    private final int maxEntrySize;
+    private final int maxSegmentSize;
+
+    public SegmentedFileJournal(final Config config) {
+        rootDir = new File(config.getString(STORAGE_ROOT_DIRECTORY));
+        if (!rootDir.exists()) {
+            LOG.debug("Creating directory {}", rootDir);
+            checkState(rootDir.mkdirs(), "Failed to create root directory %s", rootDir);
+        }
+        checkArgument(rootDir.isDirectory(), "%s is not a directory", rootDir);
+
+        maxEntrySize = getBytes(config, STORAGE_MAX_ENTRY_SIZE, STORAGE_MAX_ENTRY_SIZE_DEFAULT);
+        maxSegmentSize = getBytes(config, STORAGE_MAX_SEGMENT_SIZE, STORAGE_MAX_SEGMENT_SIZE_DEFAULT);
+
+        if (config.hasPath(STORAGE_MEMORY_MAPPED)) {
+            storage = config.getBoolean(STORAGE_MEMORY_MAPPED) ? StorageLevel.MAPPED : StorageLevel.DISK;
+        } else {
+            storage = StorageLevel.DISK;
+        }
+
+        LOG.info("Initialized with root directory {} with storage {}", rootDir, storage);
+    }
+
+    @Override
+    public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(final Iterable<AtomicWrite> messages) {
+        final Map<ActorRef, WriteMessages> map = new HashMap<>();
+        final List<Future<Optional<Exception>>> result = new ArrayList<>();
+
+        for (AtomicWrite message : messages) {
+            final String persistenceId = message.persistenceId();
+            final ActorRef handler = handlers.computeIfAbsent(persistenceId, this::createHandler);
+            result.add(map.computeIfAbsent(handler, key -> new WriteMessages()).add(message));
+        }
+
+        // Send requests to actors and zip the futures back
+        map.forEach((handler, message) -> {
+            LOG.trace("Sending {} to {}", message, handler);
+            handler.tell(message, noSender());
+        });
+        return Futures.sequence(result, context().dispatcher());
+    }
+
+    @Override
+    public Future<Void> doAsyncDeleteMessagesTo(final String persistenceId, final long toSequenceNr) {
+        return delegateMessage(persistenceId, SegmentedJournalActor.deleteMessagesTo(toSequenceNr));
+    }
+
+    @Override
+    public Future<Void> doAsyncReplayMessages(final String persistenceId, final long fromSequenceNr,
+            final long toSequenceNr, final long max, final Consumer<PersistentRepr> replayCallback) {
+        return delegateMessage(persistenceId,
+            SegmentedJournalActor.replayMessages(fromSequenceNr, toSequenceNr, max, replayCallback));
+    }
+
+    @Override
+    public Future<Long> doAsyncReadHighestSequenceNr(final String persistenceId, final long fromSequenceNr) {
+        return delegateMessage(handlers.computeIfAbsent(persistenceId, this::createHandler),
+            SegmentedJournalActor.readHighestSequenceNr(fromSequenceNr));
+    }
+
+    private ActorRef createHandler(final String persistenceId) {
+        final String directoryName = Base64.getUrlEncoder().encodeToString(persistenceId.getBytes(
+            StandardCharsets.UTF_8));
+        final File directory = new File(rootDir, directoryName);
+        LOG.debug("Creating handler for {} in directory {}", persistenceId, directory);
+
+        final ActorRef handler = context().actorOf(SegmentedJournalActor.props(persistenceId, directory, storage,
+            maxEntrySize, maxSegmentSize));
+        LOG.debug("Directory {} handled by {}", directory, handler);
+
+        final ActorRef prev = handlers.putIfAbsent(persistenceId, handler);
+        verify(prev == null, "Duplicate handler for %s, already handled by %s", persistenceId, prev);
+        return handler;
+    }
+
+    private <T> Future<T> delegateMessage(final String persistenceId, final AsyncMessage<T> message) {
+        final ActorRef handler = handlers.get(persistenceId);
+        if (handler == null) {
+            return Futures.failed(new IllegalStateException("Cannot find handler for " + persistenceId));
+        }
+
+        return delegateMessage(handler, message);
+    }
+
+    private static <T> Future<T> delegateMessage(final ActorRef handler, final AsyncMessage<T> message) {
+        LOG.trace("Delegating {} to {}", message, handler);
+        handler.tell(message, noSender());
+        return message.promise.future();
+    }
+
+    private static int getBytes(final Config config, final String path, final int defaultValue) {
+        if (!config.hasPath(path)) {
+            return defaultValue;
+        }
+        final ConfigMemorySize value = config.getMemorySize(path);
+        final long result = value.toBytes();
+        checkArgument(result <= Integer.MAX_VALUE, "Size %s exceeds maximum allowed %s", Integer.MAX_VALUE);
+        return (int) result;
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java
new file mode 100644 (file)
index 0000000..6cc5d9e
--- /dev/null
@@ -0,0 +1,376 @@
+/*
+ * Copyright (c) 2019 Pantheon Technologies, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.akka.segjournal;
+
+import static com.google.common.base.Verify.verify;
+import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.AbstractActor;
+import akka.actor.Props;
+import akka.persistence.AtomicWrite;
+import akka.persistence.PersistentRepr;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.SlidingTimeWindowReservoir;
+import com.codahale.metrics.Timer;
+import com.google.common.base.MoreObjects;
+import io.atomix.storage.StorageLevel;
+import io.atomix.storage.journal.Indexed;
+import io.atomix.storage.journal.SegmentedJournal;
+import io.atomix.storage.journal.SegmentedJournalReader;
+import io.atomix.storage.journal.SegmentedJournalWriter;
+import io.atomix.utils.serializer.Namespace;
+import java.io.File;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import org.opendaylight.controller.akka.segjournal.DataJournalEntry.FromPersistence;
+import org.opendaylight.controller.akka.segjournal.DataJournalEntry.ToPersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.Iterator;
+import scala.collection.SeqLike;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+
+/**
+ * This actor handles a single PersistentActor's journal. The journal is split into two {@link SegmentedJournal}s:
+ * <ul>
+ *     <li>A memory-mapped data journal, containing actual data entries</li>
+ *     <li>A simple file journal, containing sequence numbers of last deleted entry</li>
+ * </ul>
+ *
+ * <p>
+ * This is a conscious design decision to minimize the amount of data that is being stored in the data journal while
+ * speeding up normal operations. Since the SegmentedJournal is an append-only linear log and Akka requires the ability
+ * to delete persistence entries, we need ability to mark a subset of a SegmentedJournal as deleted. While we could
+ * treat such delete requests as normal events, this leads to a mismatch between SegmentedJournal indices (as exposed by
+ * {@link Indexed}) and Akka sequence numbers -- requiring us to potentially perform costly deserialization to find the
+ * index corresponding to a particular sequence number, or maintain moderately-complex logic and data structures to
+ * perform that mapping in sub-linear time complexity.
+ *
+ * <p>
+ * Split-file approach allows us to treat sequence numbers and indices as equivalent, without maintaining any explicit
+ * mapping information. The only additional information we need to maintain is the last deleted sequence number.
+ *
+ * @author Robert Varga
+ */
+final class SegmentedJournalActor extends AbstractActor {
+    abstract static class AsyncMessage<T> {
+        final Promise<T> promise = Promise.apply();
+    }
+
+    private static final class ReadHighestSequenceNr extends AsyncMessage<Long> {
+        private final long fromSequenceNr;
+
+        ReadHighestSequenceNr(final long fromSequenceNr) {
+            this.fromSequenceNr = fromSequenceNr;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this).add("fromSequenceNr", fromSequenceNr).toString();
+        }
+    }
+
+    private static final class ReplayMessages extends AsyncMessage<Void> {
+        private final long fromSequenceNr;
+        private final long toSequenceNr;
+        private final long max;
+        private final Consumer<PersistentRepr> replayCallback;
+
+        ReplayMessages(final long fromSequenceNr,
+                final long toSequenceNr, final long max, final Consumer<PersistentRepr> replayCallback) {
+            this.fromSequenceNr = fromSequenceNr;
+            this.toSequenceNr = toSequenceNr;
+            this.max = max;
+            this.replayCallback = requireNonNull(replayCallback);
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this).add("fromSequenceNr", fromSequenceNr)
+                    .add("toSequenceNr", toSequenceNr).add("max", max).toString();
+        }
+    }
+
+    static final class WriteMessages {
+        private final List<AtomicWrite> requests = new ArrayList<>();
+        private final List<Promise<Optional<Exception>>> results = new ArrayList<>();
+
+        Future<Optional<Exception>> add(final AtomicWrite write) {
+            final Promise<Optional<Exception>> promise = Promise.apply();
+            requests.add(write);
+            results.add(promise);
+            return promise.future();
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this).add("requests", requests).toString();
+        }
+    }
+
+    private static final class DeleteMessagesTo extends AsyncMessage<Void> {
+        final long toSequenceNr;
+
+        DeleteMessagesTo(final long toSequenceNr) {
+            this.toSequenceNr = toSequenceNr;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this).add("toSequenceNr", toSequenceNr).toString();
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournalActor.class);
+    private static final Namespace DELETE_NAMESPACE = Namespace.builder().register(Long.class).build();
+    private static final int DELETE_SEGMENT_SIZE = 64 * 1024;
+
+    // Tracks the time it took us to write a batch of messages
+    private final Timer batchWriteTime = new Timer();
+    // Tracks the number of individual messages written
+    private final Meter messageWriteCount = new Meter();
+    // Tracks the size distribution of messages for last 5 minutes
+    private final Histogram messageSize = new Histogram(new SlidingTimeWindowReservoir(5, TimeUnit.MINUTES));
+
+    private final String persistenceId;
+    private final StorageLevel storage;
+    private final int maxSegmentSize;
+    private final int maxEntrySize;
+    private final File directory;
+
+    private SegmentedJournal<DataJournalEntry> dataJournal;
+    private SegmentedJournal<Long> deleteJournal;
+    private long lastDelete;
+
+    // Tracks largest message size we have observed either during recovery or during write
+    private int largestObservedSize;
+
+    SegmentedJournalActor(final String persistenceId, final File directory, final StorageLevel storage,
+            final int maxEntrySize, final int maxSegmentSize) {
+        this.persistenceId = requireNonNull(persistenceId);
+        this.directory = requireNonNull(directory);
+        this.storage = requireNonNull(storage);
+        this.maxEntrySize = maxEntrySize;
+        this.maxSegmentSize = maxSegmentSize;
+    }
+
+    static Props props(final String persistenceId, final File directory, final StorageLevel storage,
+            final int maxEntrySize, final int maxSegmentSize) {
+        return Props.create(SegmentedJournalActor.class, requireNonNull(persistenceId), directory, storage,
+            maxEntrySize, maxSegmentSize);
+    }
+
+    @Override
+    public Receive createReceive() {
+        return receiveBuilder()
+                .match(DeleteMessagesTo.class, this::handleDeleteMessagesTo)
+                .match(ReadHighestSequenceNr.class, this::handleReadHighestSequenceNr)
+                .match(ReplayMessages.class, this::handleReplayMessages)
+                .match(WriteMessages.class, this::handleWriteMessages)
+                .matchAny(this::handleUnknown)
+                .build();
+    }
+
+    @Override
+    public void preStart() throws Exception {
+        LOG.debug("{}: actor starting", persistenceId);
+        super.preStart();
+    }
+
+    @Override
+    public void postStop() throws Exception {
+        LOG.debug("{}: actor stopping", persistenceId);
+        if (dataJournal != null) {
+            dataJournal.close();
+            LOG.debug("{}: data journal closed", persistenceId);
+            dataJournal = null;
+        }
+        if (deleteJournal != null) {
+            deleteJournal.close();
+            LOG.debug("{}: delete journal closed", persistenceId);
+            deleteJournal = null;
+        }
+        LOG.debug("{}: actor stopped", persistenceId);
+        super.postStop();
+    }
+
+    static AsyncMessage<Void> deleteMessagesTo(final long toSequenceNr) {
+        return new DeleteMessagesTo(toSequenceNr);
+    }
+
+    static AsyncMessage<Long> readHighestSequenceNr(final long fromSequenceNr) {
+        return new ReadHighestSequenceNr(fromSequenceNr);
+    }
+
+    static AsyncMessage<Void> replayMessages(final long fromSequenceNr, final long toSequenceNr, final long max,
+            final Consumer<PersistentRepr> replayCallback) {
+        return new ReplayMessages(fromSequenceNr, toSequenceNr, max, replayCallback);
+    }
+
+    private void handleDeleteMessagesTo(final DeleteMessagesTo message) {
+        ensureOpen();
+
+        LOG.debug("{}: delete messages {}", persistenceId, message);
+        final long to = Long.min(dataJournal.writer().getLastIndex(), message.toSequenceNr);
+        LOG.debug("{}: adjusted delete to {}", persistenceId, to);
+
+        if (lastDelete < to) {
+            LOG.debug("{}: deleting entries up to {}", persistenceId, to);
+
+            lastDelete = to;
+            final SegmentedJournalWriter<Long> deleteWriter = deleteJournal.writer();
+            final Indexed<Long> entry = deleteWriter.append(lastDelete);
+            deleteWriter.commit(entry.index());
+            dataJournal.writer().commit(lastDelete);
+
+            LOG.debug("{}: compaction started", persistenceId);
+            dataJournal.compact(lastDelete);
+            deleteJournal.compact(entry.index());
+            LOG.debug("{}: compaction finished", persistenceId);
+        } else {
+            LOG.debug("{}: entries up to {} already deleted", persistenceId, lastDelete);
+        }
+
+        message.promise.success(null);
+    }
+
+    @SuppressWarnings("checkstyle:illegalCatch")
+    private void handleReadHighestSequenceNr(final ReadHighestSequenceNr message) {
+        LOG.debug("{}: looking for highest sequence on {}", persistenceId, message);
+        final Long sequence;
+        if (directory.isDirectory()) {
+            ensureOpen();
+            sequence = dataJournal.writer().getLastIndex();
+        } else {
+            sequence = 0L;
+        }
+
+        LOG.debug("{}: highest sequence is {}", message, sequence);
+        message.promise.success(sequence);
+    }
+
+    @SuppressWarnings("checkstyle:illegalCatch")
+    private void handleReplayMessages(final ReplayMessages message) {
+        LOG.debug("{}: replaying messages {}", persistenceId, message);
+        ensureOpen();
+
+        final long from = Long.max(lastDelete + 1, message.fromSequenceNr);
+        LOG.debug("{}: adjusted fromSequenceNr to {}", persistenceId, from);
+
+        try (SegmentedJournalReader<DataJournalEntry> reader = dataJournal.openReader(from)) {
+            int count = 0;
+            while (reader.hasNext() && count < message.max) {
+                final Indexed<DataJournalEntry> next = reader.next();
+                if (next.index() > message.toSequenceNr) {
+                    break;
+                }
+
+                LOG.trace("{}: replay {}", persistenceId, next);
+                updateLargestSize(next.size());
+                final DataJournalEntry entry = next.entry();
+                verify(entry instanceof FromPersistence, "Unexpected entry %s", entry);
+
+                final PersistentRepr repr = ((FromPersistence) entry).toRepr(persistenceId, next.index());
+                LOG.debug("{}: replaying {}", persistenceId, repr);
+                message.replayCallback.accept(repr);
+                count++;
+            }
+            LOG.debug("{}: successfully replayed {} entries", persistenceId, count);
+        } catch (Exception e) {
+            LOG.warn("{}: failed to replay messages for {}", persistenceId, message, e);
+            message.promise.failure(e);
+        } finally {
+            message.promise.success(null);
+        }
+    }
+
+    @SuppressWarnings("checkstyle:illegalCatch")
+    private void handleWriteMessages(final WriteMessages message) {
+        ensureOpen();
+
+        final SegmentedJournalWriter<DataJournalEntry> writer = dataJournal.writer();
+        final long startTicks = System.nanoTime();
+        final int count = message.requests.size();
+        final long start = writer.getLastIndex();
+
+        for (int i = 0; i < count; ++i) {
+            final long mark = writer.getLastIndex();
+            try {
+                writeRequest(writer, message.requests.get(i));
+            } catch (Exception e) {
+                LOG.warn("{}: failed to write out request", persistenceId, e);
+                message.results.get(i).success(Optional.of(e));
+                writer.truncate(mark);
+                continue;
+            }
+
+            message.results.get(i).success(Optional.empty());
+        }
+        writer.flush();
+        batchWriteTime.update(System.nanoTime() - startTicks, TimeUnit.NANOSECONDS);
+        messageWriteCount.mark(writer.getLastIndex() - start);
+    }
+
+    private void writeRequest(final SegmentedJournalWriter<DataJournalEntry> writer, final AtomicWrite request) {
+        // Cast is needed for Eclipse because of https://bugs.eclipse.org/bugs/show_bug.cgi?id=468276
+        final Iterator<PersistentRepr> it = ((SeqLike<PersistentRepr, ?>) request.payload()).iterator();
+        while (it.hasNext()) {
+            final PersistentRepr repr = it.next();
+            final Object payload = repr.payload();
+            if (!(payload instanceof Serializable)) {
+                throw new UnsupportedOperationException("Non-serializable payload encountered " + payload.getClass());
+            }
+
+            final int size = writer.append(new ToPersistence(repr)).size();
+            messageSize.update(size);
+            updateLargestSize(size);
+        }
+    }
+
+    private void handleUnknown(final Object message) {
+        LOG.error("{}: Received unknown message {}", persistenceId, message);
+    }
+
+    private void updateLargestSize(final int size) {
+        if (size > largestObservedSize) {
+            largestObservedSize = size;
+        }
+    }
+
+    private void ensureOpen() {
+        if (dataJournal != null) {
+            verifyNotNull(deleteJournal);
+            return;
+        }
+
+        deleteJournal = SegmentedJournal.<Long>builder().withDirectory(directory).withName("delete")
+                .withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build();
+        final Indexed<Long> lastEntry = deleteJournal.writer().getLastEntry();
+        lastDelete = lastEntry == null ? 0 : lastEntry.index();
+
+        dataJournal = SegmentedJournal.<DataJournalEntry>builder()
+                .withStorageLevel(storage).withDirectory(directory).withName("data")
+                .withNamespace(Namespace.builder()
+                    .register(new DataJournalEntrySerializer(context().system()),
+                        FromPersistence.class, ToPersistence.class)
+                    .build())
+                .withMaxEntrySize(maxEntrySize).withMaxSegmentSize(maxSegmentSize)
+                .build();
+        final SegmentedJournalWriter<DataJournalEntry> writer = dataJournal.writer();
+        writer.commit(lastDelete);
+        LOG.debug("{}: journal open with last index {}, deleted to {}", persistenceId, writer.getLastIndex(),
+            lastDelete);
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/test/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournalSpecTest.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/test/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournalSpecTest.java
new file mode 100644 (file)
index 0000000..da1ba45
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2019 Pantheon Technologies, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.akka.segjournal;
+
+import akka.persistence.japi.journal.JavaJournalSpec;
+import com.typesafe.config.ConfigFactory;
+import java.io.File;
+import org.apache.commons.io.FileUtils;
+import org.junit.runner.RunWith;
+import org.scalatest.junit.JUnitRunner;
+
+@RunWith(JUnitRunner.class)
+public class SegmentedFileJournalSpecTest extends JavaJournalSpec {
+    private static final long serialVersionUID = 1L;
+
+    private static final File JOURNAL_DIR = new File("target/segmented-journal");
+
+    public SegmentedFileJournalSpecTest() {
+        super(ConfigFactory.load("SegmentedFileJournalTest.conf"));
+    }
+
+    @Override
+    public void beforeAll() {
+        FileUtils.deleteQuietly(JOURNAL_DIR);
+        super.beforeAll();
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/test/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournalTest.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/test/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournalTest.java
new file mode 100644 (file)
index 0000000..6939dbc
--- /dev/null
@@ -0,0 +1,183 @@
+/*
+ * Copyright (c) 2019 Pantheon Technologies, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.akka.segjournal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.persistence.AtomicWrite;
+import akka.persistence.PersistentRepr;
+import akka.testkit.CallingThreadDispatcher;
+import akka.testkit.javadsl.TestKit;
+import io.atomix.storage.StorageLevel;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.AsyncMessage;
+import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.WriteMessages;
+import scala.concurrent.Future;
+
+public class SegmentedFileJournalTest {
+    private static final File DIRECTORY = new File("target/sfj-test");
+    private static final int SEGMENT_SIZE = 1024 * 1024;
+    private static final int MESSAGE_SIZE = 512 * 1024;
+
+    private static ActorSystem SYSTEM;
+
+    private TestKit kit;
+    private ActorRef actor;
+
+    @BeforeClass
+    public static void beforeClass() {
+        SYSTEM = ActorSystem.create("test");
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        TestKit.shutdownActorSystem(SYSTEM);
+        SYSTEM = null;
+    }
+
+    @Before
+    public void before() {
+        kit = new TestKit(SYSTEM);
+        FileUtils.deleteQuietly(DIRECTORY);
+        actor = actor();
+    }
+
+    @After
+    public void after() {
+        actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+    }
+
+    @Test
+    public void testDeleteAfterStop() {
+        // Preliminary setup
+        final WriteMessages write = new WriteMessages();
+        final Future<Optional<Exception>> first = write.add(AtomicWrite.apply(PersistentRepr.apply("first", 1, "foo",
+            null, false, kit.getRef(), "uuid")));
+        final Future<Optional<Exception>> second = write.add(AtomicWrite.apply(PersistentRepr.apply("second", 2, "foo",
+            null, false, kit.getRef(), "uuid")));
+        actor.tell(write, ActorRef.noSender());
+        assertFalse(getFuture(first).isPresent());
+        assertFalse(getFuture(second).isPresent());
+
+        assertHighestSequenceNr(2);
+        assertReplayCount(2);
+
+        deleteEntries(1);
+
+        assertHighestSequenceNr(2);
+        assertReplayCount(1);
+
+        // Restart actor
+        actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        actor = actor();
+
+        // Check if state is retained
+        assertHighestSequenceNr(2);
+        assertReplayCount(1);
+    }
+
+    @Test
+    public void testSegmentation() throws IOException {
+        // We want to have roughly three segments
+        final LargePayload payload = new LargePayload();
+
+        final WriteMessages write = new WriteMessages();
+        final List<Future<Optional<Exception>>> requests = new ArrayList<>();
+
+        // Each payload is half of segment size, plus some overhead, should result in two segments being present
+        for (int i = 1; i <= SEGMENT_SIZE * 3 / MESSAGE_SIZE; ++i) {
+            requests.add(write.add(AtomicWrite.apply(PersistentRepr.apply(payload, i, "foo", null, false, kit.getRef(),
+                "uuid"))));
+        }
+
+        actor.tell(write, ActorRef.noSender());
+        requests.forEach(future -> assertFalse(getFuture(future).isPresent()));
+
+        assertFileCount(2, 1);
+
+        // Delete all but the last entry
+        deleteEntries(requests.size());
+
+        assertFileCount(1, 1);
+    }
+
+    private ActorRef actor() {
+        return kit.childActorOf(SegmentedJournalActor.props("foo", DIRECTORY, StorageLevel.DISK, MESSAGE_SIZE,
+            SEGMENT_SIZE).withDispatcher(CallingThreadDispatcher.Id()));
+    }
+
+    private void deleteEntries(final long deleteTo) {
+        final AsyncMessage<Void> delete = SegmentedJournalActor.deleteMessagesTo(deleteTo);
+        actor.tell(delete, ActorRef.noSender());
+        assertNull(get(delete));
+    }
+
+    private void assertHighestSequenceNr(final long expected) {
+        AsyncMessage<Long> highest = SegmentedJournalActor.readHighestSequenceNr(0);
+        actor.tell(highest, ActorRef.noSender());
+        assertEquals(expected, (long) get(highest));
+    }
+
+    private void assertReplayCount(final int expected) {
+        Consumer<PersistentRepr> firstCallback = mock(Consumer.class);
+        doNothing().when(firstCallback).accept(any(PersistentRepr.class));
+        AsyncMessage<Void> replay = SegmentedJournalActor.replayMessages(0, Long.MAX_VALUE, Long.MAX_VALUE,
+            firstCallback);
+        actor.tell(replay, ActorRef.noSender());
+        assertNull(get(replay));
+        verify(firstCallback, times(expected)).accept(any(PersistentRepr.class));
+    }
+
+    private static void assertFileCount(final long dataFiles, final long deleteFiles) throws IOException {
+        List<File> contents = Files.list(DIRECTORY.toPath()).map(Path::toFile).collect(Collectors.toList());
+        assertEquals(dataFiles, contents.stream().filter(file -> file.getName().startsWith("data-")).count());
+        assertEquals(deleteFiles, contents.stream().filter(file -> file.getName().startsWith("delete-")).count());
+    }
+
+    private static <T> T get(final AsyncMessage<T> message) {
+        return getFuture(message.promise.future());
+    }
+
+    private static <T> T getFuture(final Future<T> future) {
+        assertTrue(future.isCompleted());
+        return future.value().get().get();
+    }
+
+    private static final class LargePayload implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        final byte[] bytes = new byte[MESSAGE_SIZE / 2];
+
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/test/resources/SegmentedFileJournalTest.conf b/opendaylight/md-sal/sal-akka-segmented-journal/src/test/resources/SegmentedFileJournalTest.conf
new file mode 100644 (file)
index 0000000..8fce93b
--- /dev/null
@@ -0,0 +1,15 @@
+akka {
+    persistence {
+        journal {
+            plugin = "akka.persistence.journal.segmented-file"
+
+            segmented-file {
+                class = "org.opendaylight.controller.akka.segjournal.SegmentedFileJournal"
+                root-directory = "target/segmented-journal"
+                max-entry-size = 8M
+                max-segment-size = 32M
+                memory-mapped = false
+            }
+        }
+    }
+}
index 7b86f1c..34b309b 100644 (file)
@@ -120,7 +120,27 @@ odl-cluster-data {
     }
 
     persistence {
-      journal.plugin = akka.persistence.journal.leveldb
+      journal {
+        plugin = akka.persistence.journal.leveldb
+
+        # The following activates the alternative segmented file journal. Each persistent actor
+        # is stored in a separate directory, with multiple segment files. Segments are removed
+        # when they are not longer required.
+        #
+        # plugin = akka.persistence.journal.segmented-file
+        segmented-file {
+          class = "org.opendaylight.controller.akka.segjournal.SegmentedFileJournal"
+          # Root directory for segmented journal storage
+          root-directory = "segmented-journal"
+          # Maximum size of a single entry in the segmented journal
+          max-entry-size = 16M
+          # Maximum size of a segment
+          max-segment-size = 128M
+          # Map each segment into memory. Note that while this can improve performance,
+          # it will also place additional burden on system resources.
+          memory-mapped = false
+        }
+      }
 
       snapshot-store.local.class = "org.opendaylight.controller.cluster.persistence.LocalSnapshotStore"
       snapshot-store.plugin = akka.persistence.snapshot-store.local

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.