/*
 * Copyright (c) 2019 Pantheon Technologies, s.r.o. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.akka.segjournal;
10 import static akka.actor.ActorRef.noSender;
11 import static com.google.common.base.Preconditions.checkArgument;
12 import static com.google.common.base.Preconditions.checkState;
14 import akka.actor.ActorRef;
15 import akka.dispatch.Futures;
16 import akka.persistence.AtomicWrite;
17 import akka.persistence.PersistentRepr;
18 import akka.persistence.journal.japi.AsyncWriteJournal;
19 import com.typesafe.config.Config;
20 import io.atomix.storage.journal.SegmentedJournal;
21 import io.atomix.storage.journal.StorageLevel;
23 import java.net.URLEncoder;
24 import java.nio.charset.StandardCharsets;
25 import java.util.ArrayList;
26 import java.util.HashMap;
28 import java.util.Optional;
29 import java.util.function.Consumer;
30 import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.AsyncMessage;
31 import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.WriteMessages;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34 import scala.concurrent.Future;
/**
 * An Akka persistence journal implementation on top of {@link SegmentedJournal}. This actor represents an aggregation
 * of multiple journals and performs a receptionist job between Akka and individual per-persistenceId actors. See
 * {@link SegmentedJournalActor} for details on how the persistence works.
 */
41 public class SegmentedFileJournal extends AsyncWriteJournal {
42 public static final String STORAGE_ROOT_DIRECTORY = "root-directory";
43 public static final String STORAGE_MAX_ENTRY_SIZE = "max-entry-size";
44 public static final int STORAGE_MAX_ENTRY_SIZE_DEFAULT = 16 * 1024 * 1024;
45 public static final String STORAGE_MAX_SEGMENT_SIZE = "max-segment-size";
46 public static final int STORAGE_MAX_SEGMENT_SIZE_DEFAULT = STORAGE_MAX_ENTRY_SIZE_DEFAULT * 8;
47 public static final String STORAGE_MAX_UNFLUSHED_BYTES = "max-unflushed-bytes";
48 public static final String STORAGE_MEMORY_MAPPED = "memory-mapped";
50 private static final Logger LOG = LoggerFactory.getLogger(SegmentedFileJournal.class);
52 private final Map<String, ActorRef> handlers = new HashMap<>();
53 private final File rootDir;
54 private final StorageLevel storage;
55 private final int maxEntrySize;
56 private final int maxSegmentSize;
57 private final int maxUnflushedBytes;
/**
 * Creates the journal from its configuration, creating the root directory when it does not exist yet.
 *
 * @param config configuration providing {@code root-directory} and, optionally, {@code max-entry-size},
 *               {@code max-segment-size}, {@code max-unflushed-bytes} and {@code memory-mapped}
 * @throws IllegalStateException if the root directory does not exist and cannot be created
 * @throws IllegalArgumentException if the root path is not a directory
 */
public SegmentedFileJournal(final Config config) {
    rootDir = new File(config.getString(STORAGE_ROOT_DIRECTORY));
    if (!rootDir.exists()) {
        LOG.debug("Creating directory {}", rootDir);
        checkState(rootDir.mkdirs(), "Failed to create root directory %s", rootDir);
    }
    checkArgument(rootDir.isDirectory(), "%s is not a directory", rootDir);

    maxEntrySize = getBytes(config, STORAGE_MAX_ENTRY_SIZE, STORAGE_MAX_ENTRY_SIZE_DEFAULT);
    maxSegmentSize = getBytes(config, STORAGE_MAX_SEGMENT_SIZE, STORAGE_MAX_SEGMENT_SIZE_DEFAULT);
    // The unflushed-bytes limit falls back to the effective entry size, not to a fixed constant
    maxUnflushedBytes = getBytes(config, STORAGE_MAX_UNFLUSHED_BYTES, maxEntrySize);

    // Memory mapping is opt-in: a missing or false setting selects plain disk storage. A single assignment
    // keeps the final field definitely assigned exactly once.
    storage = config.hasPath(STORAGE_MEMORY_MAPPED) && config.getBoolean(STORAGE_MEMORY_MAPPED)
        ? StorageLevel.MAPPED : StorageLevel.DISK;

    LOG.info("Initialized with root directory {} with storage {}", rootDir, storage);
}
81 public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(final Iterable<AtomicWrite> messages) {
82 final var map = new HashMap<ActorRef, WriteMessages>();
83 final var result = new ArrayList<Future<Optional<Exception>>>();
85 for (var message : messages) {
86 final var persistenceId = message.persistenceId();
87 final var handler = handlers.computeIfAbsent(persistenceId, this::createHandler);
88 result.add(map.computeIfAbsent(handler, key -> new WriteMessages()).add(message));
91 // Send requests to actors and zip the futures back
92 map.forEach((handler, message) -> {
93 LOG.trace("Sending {} to {}", message, handler);
94 handler.tell(message, noSender());
96 return Futures.sequence(result, context().dispatcher());
100 public Future<Void> doAsyncDeleteMessagesTo(final String persistenceId, final long toSequenceNr) {
101 return delegateMessage(persistenceId, SegmentedJournalActor.deleteMessagesTo(toSequenceNr));
105 public Future<Void> doAsyncReplayMessages(final String persistenceId, final long fromSequenceNr,
106 final long toSequenceNr, final long max, final Consumer<PersistentRepr> replayCallback) {
107 return delegateMessage(persistenceId,
108 SegmentedJournalActor.replayMessages(fromSequenceNr, toSequenceNr, max, replayCallback));
112 public Future<Long> doAsyncReadHighestSequenceNr(final String persistenceId, final long fromSequenceNr) {
113 return delegateMessage(handlers.computeIfAbsent(persistenceId, this::createHandler),
114 SegmentedJournalActor.readHighestSequenceNr(fromSequenceNr));
117 private ActorRef createHandler(final String persistenceId) {
118 final var directoryName = URLEncoder.encode(persistenceId, StandardCharsets.UTF_8);
119 final var directory = new File(rootDir, directoryName);
120 LOG.debug("Creating handler for {} in directory {}", persistenceId, directory);
122 final var handler = context().actorOf(SegmentedJournalActor.props(persistenceId, directory, storage,
123 maxEntrySize, maxSegmentSize, maxUnflushedBytes));
124 LOG.debug("Directory {} handled by {}", directory, handler);
128 private <T> Future<T> delegateMessage(final String persistenceId, final AsyncMessage<T> message) {
129 final var handler = handlers.get(persistenceId);
130 if (handler == null) {
131 return Futures.failed(new IllegalStateException("Cannot find handler for " + persistenceId));
134 return delegateMessage(handler, message);
137 private static <T> Future<T> delegateMessage(final ActorRef handler, final AsyncMessage<T> message) {
138 LOG.trace("Delegating {} to {}", message, handler);
139 handler.tell(message, noSender());
140 return message.promise.future();
143 private static int getBytes(final Config config, final String path, final int defaultValue) {
144 if (!config.hasPath(path)) {
147 final long value = config.getBytes(path);
148 checkArgument(value <= Integer.MAX_VALUE, "Size %s exceeds maximum allowed %s", Integer.MAX_VALUE);