Add SegmentedFileJournal
diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournal.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournal.java
new file mode 100644 (file)
index 0000000..a0bffa2
--- /dev/null
@@ -0,0 +1,158 @@
+/*
+ * Copyright (c) 2019 Pantheon Technologies, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.akka.segjournal;
+
+import static akka.actor.ActorRef.noSender;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verify;
+
+import akka.actor.ActorRef;
+import akka.dispatch.Futures;
+import akka.persistence.AtomicWrite;
+import akka.persistence.PersistentRepr;
+import akka.persistence.journal.japi.AsyncWriteJournal;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigMemorySize;
+import io.atomix.storage.StorageLevel;
+import io.atomix.storage.journal.SegmentedJournal;
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.AsyncMessage;
+import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.WriteMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+/**
+ * An Akka persistence journal implementation on top of {@link SegmentedJournal}. This actor aggregates
+ * multiple journals and acts as a receptionist between Akka and the individual per-persistenceId actors. See
+ * {@link SegmentedJournalActor} for details on how the persistence works.
+ *
+ * @author Robert Varga
+ */
+public class SegmentedFileJournal extends AsyncWriteJournal {
+    public static final String STORAGE_ROOT_DIRECTORY = "root-directory";
+    public static final String STORAGE_MAX_ENTRY_SIZE = "max-entry-size";
+    public static final int STORAGE_MAX_ENTRY_SIZE_DEFAULT = 16 * 1024 * 1024;
+    public static final String STORAGE_MAX_SEGMENT_SIZE = "max-segment-size";
+    public static final int STORAGE_MAX_SEGMENT_SIZE_DEFAULT = STORAGE_MAX_ENTRY_SIZE_DEFAULT * 8;
+    public static final String STORAGE_MEMORY_MAPPED = "memory-mapped";
+
+    private static final Logger LOG = LoggerFactory.getLogger(SegmentedFileJournal.class);
+
+    private final Map<String, ActorRef> handlers = new HashMap<>();
+    private final File rootDir;
+    private final StorageLevel storage;
+    private final int maxEntrySize;
+    private final int maxSegmentSize;
+
+    public SegmentedFileJournal(final Config config) {
+        rootDir = new File(config.getString(STORAGE_ROOT_DIRECTORY));
+        if (!rootDir.exists()) {
+            LOG.debug("Creating directory {}", rootDir);
+            checkState(rootDir.mkdirs(), "Failed to create root directory %s", rootDir);
+        }
+        checkArgument(rootDir.isDirectory(), "%s is not a directory", rootDir);
+
+        maxEntrySize = getBytes(config, STORAGE_MAX_ENTRY_SIZE, STORAGE_MAX_ENTRY_SIZE_DEFAULT);
+        maxSegmentSize = getBytes(config, STORAGE_MAX_SEGMENT_SIZE, STORAGE_MAX_SEGMENT_SIZE_DEFAULT);
+
+        if (config.hasPath(STORAGE_MEMORY_MAPPED)) {
+            storage = config.getBoolean(STORAGE_MEMORY_MAPPED) ? StorageLevel.MAPPED : StorageLevel.DISK;
+        } else {
+            storage = StorageLevel.DISK;
+        }
+
+        LOG.info("Initialized with root directory {} and storage {}", rootDir, storage);
+    }
+
+    @Override
+    public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(final Iterable<AtomicWrite> messages) {
+        final Map<ActorRef, WriteMessages> map = new HashMap<>();
+        final List<Future<Optional<Exception>>> result = new ArrayList<>();
+
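+        // Group writes by their per-persistenceId handler actor, creating handlers on first use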
+        for (AtomicWrite message : messages) {
+            final String persistenceId = message.persistenceId();
+            final ActorRef handler = handlers.computeIfAbsent(persistenceId, this::createHandler);
+            result.add(map.computeIfAbsent(handler, key -> new WriteMessages()).add(message));
+        }
+
+        // Send requests to actors and zip the futures back
+        map.forEach((handler, message) -> {
+            LOG.trace("Sending {} to {}", message, handler);
+            handler.tell(message, noSender());
+        });
+        return Futures.sequence(result, context().dispatcher());
+    }
+
+    @Override
+    public Future<Void> doAsyncDeleteMessagesTo(final String persistenceId, final long toSequenceNr) {
+        return delegateMessage(persistenceId, SegmentedJournalActor.deleteMessagesTo(toSequenceNr));
+    }
+
+    @Override
+    public Future<Void> doAsyncReplayMessages(final String persistenceId, final long fromSequenceNr,
+            final long toSequenceNr, final long max, final Consumer<PersistentRepr> replayCallback) {
+        return delegateMessage(persistenceId,
+            SegmentedJournalActor.replayMessages(fromSequenceNr, toSequenceNr, max, replayCallback));
+    }
+
+    @Override
+    public Future<Long> doAsyncReadHighestSequenceNr(final String persistenceId, final long fromSequenceNr) {
+        return delegateMessage(handlers.computeIfAbsent(persistenceId, this::createHandler),
+            SegmentedJournalActor.readHighestSequenceNr(fromSequenceNr));
+    }
+
+    private ActorRef createHandler(final String persistenceId) {
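+        // URL-safe Base64 keeps the encoded persistenceId filesystem-safe for use as a directory name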
+        final String directoryName = Base64.getUrlEncoder().encodeToString(persistenceId.getBytes(
+            StandardCharsets.UTF_8));
+        final File directory = new File(rootDir, directoryName);
+        LOG.debug("Creating handler for {} in directory {}", persistenceId, directory);
+
+        final ActorRef handler = context().actorOf(SegmentedJournalActor.props(persistenceId, directory, storage,
+            maxEntrySize, maxSegmentSize));
+        LOG.debug("Directory {} handled by {}", directory, handler);
+
+        final ActorRef prev = handlers.putIfAbsent(persistenceId, handler);
+        verify(prev == null, "Duplicate handler for %s, already handled by %s", persistenceId, prev);
+        return handler;
+    }
+
+    private <T> Future<T> delegateMessage(final String persistenceId, final AsyncMessage<T> message) {
+        final ActorRef handler = handlers.get(persistenceId);
+        if (handler == null) {
+            return Futures.failed(new IllegalStateException("Cannot find handler for " + persistenceId));
+        }
+
+        return delegateMessage(handler, message);
+    }
+
+    private static <T> Future<T> delegateMessage(final ActorRef handler, final AsyncMessage<T> message) {
+        LOG.trace("Delegating {} to {}", message, handler);
+        handler.tell(message, noSender());
+        return message.promise.future();
+    }
+
+    private static int getBytes(final Config config, final String path, final int defaultValue) {
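+        // Typesafe Config memory sizes (e.g. "16MiB") are longs; bound the result to an int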
+        if (!config.hasPath(path)) {
+            return defaultValue;
+        }
+        final ConfigMemorySize value = config.getMemorySize(path);
+        final long result = value.toBytes();
+        checkArgument(result <= Integer.MAX_VALUE, "Size %s exceeds maximum allowed %s", result,
+            Integer.MAX_VALUE);
+        return (int) result;
+    }
+}
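
For reference, a minimal application.conf sketch showing how this class could be wired in as the Akka persistence journal plugin. The plugin id "segmented-file-journal" and the root-directory value are hypothetical; the remaining keys are the ones SegmentedFileJournal reads, with sizes restating the defaults defined by the class. Akka hands the plugin's config section to the Config-taking constructor above.

    akka.persistence.journal.plugin = "segmented-file-journal"

    segmented-file-journal {
      # fully-qualified class name of the journal plugin
      class = "org.opendaylight.controller.akka.segjournal.SegmentedFileJournal"
      # hypothetical location; must exist as, or be creatable as, a directory
      root-directory = "segmented-journal"
      max-entry-size = 16MiB      # STORAGE_MAX_ENTRY_SIZE_DEFAULT
      max-segment-size = 128MiB   # 8 x max-entry-size
      memory-mapped = false       # StorageLevel.DISK
    }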