Change segmented journal naming
[controller.git] / opendaylight / md-sal / sal-akka-segmented-journal / src / main / java / org / opendaylight / controller / akka / segjournal / SegmentedFileJournal.java
1 /*
2  * Copyright (c) 2019 Pantheon Technologies, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.akka.segjournal;
9
10 import static akka.actor.ActorRef.noSender;
11 import static com.google.common.base.Preconditions.checkArgument;
12 import static com.google.common.base.Preconditions.checkState;
13
14 import akka.actor.ActorRef;
15 import akka.dispatch.Futures;
16 import akka.persistence.AtomicWrite;
17 import akka.persistence.PersistentRepr;
18 import akka.persistence.journal.japi.AsyncWriteJournal;
19 import com.typesafe.config.Config;
20 import com.typesafe.config.ConfigMemorySize;
21 import io.atomix.storage.StorageLevel;
22 import io.atomix.storage.journal.SegmentedJournal;
23 import java.io.File;
24 import java.io.UnsupportedEncodingException;
25 import java.net.URLEncoder;
26 import java.nio.charset.StandardCharsets;
27 import java.util.ArrayList;
28 import java.util.HashMap;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Optional;
32 import java.util.function.Consumer;
33 import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.AsyncMessage;
34 import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.WriteMessages;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37 import scala.concurrent.Future;
38
39 /**
40  * An Akka persistence journal implementation on top of {@link SegmentedJournal}. This actor represents aggregation
41  * of multiple journals and performs a receptionist job between Akka and invidual per-persistenceId actors. See
42  * {@link SegmentedJournalActor} for details on how the persistence works.
43  *
44  * @author Robert Varga
45  */
46 public class SegmentedFileJournal extends AsyncWriteJournal {
47     public static final String STORAGE_ROOT_DIRECTORY = "root-directory";
48     public static final String STORAGE_MAX_ENTRY_SIZE = "max-entry-size";
49     public static final int STORAGE_MAX_ENTRY_SIZE_DEFAULT = 16 * 1024 * 1024;
50     public static final String STORAGE_MAX_SEGMENT_SIZE = "max-segment-size";
51     public static final int STORAGE_MAX_SEGMENT_SIZE_DEFAULT = STORAGE_MAX_ENTRY_SIZE_DEFAULT * 8;
52     public static final String STORAGE_MEMORY_MAPPED = "memory-mapped";
53
54     private static final Logger LOG = LoggerFactory.getLogger(SegmentedFileJournal.class);
55
56     private final Map<String, ActorRef> handlers = new HashMap<>();
57     private final File rootDir;
58     private final StorageLevel storage;
59     private final int maxEntrySize;
60     private final int maxSegmentSize;
61
62     public SegmentedFileJournal(final Config config) {
63         rootDir = new File(config.getString(STORAGE_ROOT_DIRECTORY));
64         if (!rootDir.exists()) {
65             LOG.debug("Creating directory {}", rootDir);
66             checkState(rootDir.mkdirs(), "Failed to create root directory %s", rootDir);
67         }
68         checkArgument(rootDir.isDirectory(), "%s is not a directory", rootDir);
69
70         maxEntrySize = getBytes(config, STORAGE_MAX_ENTRY_SIZE, STORAGE_MAX_ENTRY_SIZE_DEFAULT);
71         maxSegmentSize = getBytes(config, STORAGE_MAX_SEGMENT_SIZE, STORAGE_MAX_SEGMENT_SIZE_DEFAULT);
72
73         if (config.hasPath(STORAGE_MEMORY_MAPPED)) {
74             storage = config.getBoolean(STORAGE_MEMORY_MAPPED) ? StorageLevel.MAPPED : StorageLevel.DISK;
75         } else {
76             storage = StorageLevel.DISK;
77         }
78
79         LOG.info("Initialized with root directory {} with storage {}", rootDir, storage);
80     }
81
82     @Override
83     public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(final Iterable<AtomicWrite> messages) {
84         final Map<ActorRef, WriteMessages> map = new HashMap<>();
85         final List<Future<Optional<Exception>>> result = new ArrayList<>();
86
87         for (AtomicWrite message : messages) {
88             final String persistenceId = message.persistenceId();
89             final ActorRef handler = handlers.computeIfAbsent(persistenceId, this::createHandler);
90             result.add(map.computeIfAbsent(handler, key -> new WriteMessages()).add(message));
91         }
92
93         // Send requests to actors and zip the futures back
94         map.forEach((handler, message) -> {
95             LOG.trace("Sending {} to {}", message, handler);
96             handler.tell(message, noSender());
97         });
98         return Futures.sequence(result, context().dispatcher());
99     }
100
101     @Override
102     public Future<Void> doAsyncDeleteMessagesTo(final String persistenceId, final long toSequenceNr) {
103         return delegateMessage(persistenceId, SegmentedJournalActor.deleteMessagesTo(toSequenceNr));
104     }
105
106     @Override
107     public Future<Void> doAsyncReplayMessages(final String persistenceId, final long fromSequenceNr,
108             final long toSequenceNr, final long max, final Consumer<PersistentRepr> replayCallback) {
109         return delegateMessage(persistenceId,
110             SegmentedJournalActor.replayMessages(fromSequenceNr, toSequenceNr, max, replayCallback));
111     }
112
113     @Override
114     public Future<Long> doAsyncReadHighestSequenceNr(final String persistenceId, final long fromSequenceNr) {
115         return delegateMessage(handlers.computeIfAbsent(persistenceId, this::createHandler),
116             SegmentedJournalActor.readHighestSequenceNr(fromSequenceNr));
117     }
118
119     private ActorRef createHandler(final String persistenceId) {
120         final String directoryName = encode(persistenceId);
121         final File directory = new File(rootDir, directoryName);
122         LOG.debug("Creating handler for {} in directory {}", persistenceId, directory);
123
124         final ActorRef handler = context().actorOf(SegmentedJournalActor.props(persistenceId, directory, storage,
125             maxEntrySize, maxSegmentSize));
126         LOG.debug("Directory {} handled by {}", directory, handler);
127         return handler;
128     }
129
130     private <T> Future<T> delegateMessage(final String persistenceId, final AsyncMessage<T> message) {
131         final ActorRef handler = handlers.get(persistenceId);
132         if (handler == null) {
133             return Futures.failed(new IllegalStateException("Cannot find handler for " + persistenceId));
134         }
135
136         return delegateMessage(handler, message);
137     }
138
139     private static <T> Future<T> delegateMessage(final ActorRef handler, final AsyncMessage<T> message) {
140         LOG.trace("Delegating {} to {}", message, handler);
141         handler.tell(message, noSender());
142         return message.promise.future();
143     }
144
145     private static String encode(final String str) {
146         try {
147             return URLEncoder.encode(str, StandardCharsets.UTF_8.name());
148         } catch (UnsupportedEncodingException e) {
149             // Shouldn't happen
150             LOG.warn("Error encoding {}", str, e);
151             return str;
152         }
153     }
154
155     private static int getBytes(final Config config, final String path, final int defaultValue) {
156         if (!config.hasPath(path)) {
157             return defaultValue;
158         }
159         final ConfigMemorySize value = config.getMemorySize(path);
160         final long result = value.toBytes();
161         checkArgument(result <= Integer.MAX_VALUE, "Size %s exceeds maximum allowed %s", Integer.MAX_VALUE);
162         return (int) result;
163     }
164 }