Centralize JournalSegmentFile creation
[controller.git] / opendaylight / md-sal / sal-akka-segmented-journal / src / main / java / org / opendaylight / controller / akka / segjournal / SegmentedFileJournal.java
index a0bffa2c1247979ffb47f7966a3ace77a1157b31..b9320998c95f28b7fcbd8eb170bf1842d3b83a47 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.akka.segjournal;
 import static akka.actor.ActorRef.noSender;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.base.Verify.verify;
 
 import akka.actor.ActorRef;
 import akka.dispatch.Futures;
@@ -18,15 +17,13 @@ import akka.persistence.AtomicWrite;
 import akka.persistence.PersistentRepr;
 import akka.persistence.journal.japi.AsyncWriteJournal;
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigMemorySize;
-import io.atomix.storage.StorageLevel;
 import io.atomix.storage.journal.SegmentedJournal;
+import io.atomix.storage.journal.StorageLevel;
 import java.io.File;
+import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Base64;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Consumer;
@@ -40,8 +37,6 @@ import scala.concurrent.Future;
  * An Akka persistence journal implementation on top of {@link SegmentedJournal}. This actor represents aggregation
  * of multiple journals and performs a receptionist job between Akka and invidual per-persistenceId actors. See
  * {@link SegmentedJournalActor} for details on how the persistence works.
- *
- * @author Robert Varga
  */
 public class SegmentedFileJournal extends AsyncWriteJournal {
     public static final String STORAGE_ROOT_DIRECTORY = "root-directory";
@@ -49,6 +44,7 @@ public class SegmentedFileJournal extends AsyncWriteJournal {
     public static final int STORAGE_MAX_ENTRY_SIZE_DEFAULT = 16 * 1024 * 1024;
     public static final String STORAGE_MAX_SEGMENT_SIZE = "max-segment-size";
     public static final int STORAGE_MAX_SEGMENT_SIZE_DEFAULT = STORAGE_MAX_ENTRY_SIZE_DEFAULT * 8;
+    public static final String STORAGE_MAX_UNFLUSHED_BYTES = "max-unflushed-bytes";
     public static final String STORAGE_MEMORY_MAPPED = "memory-mapped";
 
     private static final Logger LOG = LoggerFactory.getLogger(SegmentedFileJournal.class);
@@ -58,6 +54,7 @@ public class SegmentedFileJournal extends AsyncWriteJournal {
     private final StorageLevel storage;
     private final int maxEntrySize;
     private final int maxSegmentSize;
+    private final int maxUnflushedBytes;
 
     public SegmentedFileJournal(final Config config) {
         rootDir = new File(config.getString(STORAGE_ROOT_DIRECTORY));
@@ -69,6 +66,7 @@ public class SegmentedFileJournal extends AsyncWriteJournal {
 
         maxEntrySize = getBytes(config, STORAGE_MAX_ENTRY_SIZE, STORAGE_MAX_ENTRY_SIZE_DEFAULT);
         maxSegmentSize = getBytes(config, STORAGE_MAX_SEGMENT_SIZE, STORAGE_MAX_SEGMENT_SIZE_DEFAULT);
+        maxUnflushedBytes = getBytes(config, STORAGE_MAX_UNFLUSHED_BYTES, maxEntrySize);
 
         if (config.hasPath(STORAGE_MEMORY_MAPPED)) {
             storage = config.getBoolean(STORAGE_MEMORY_MAPPED) ? StorageLevel.MAPPED : StorageLevel.DISK;
@@ -81,12 +79,12 @@ public class SegmentedFileJournal extends AsyncWriteJournal {
 
     @Override
     public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(final Iterable<AtomicWrite> messages) {
-        final Map<ActorRef, WriteMessages> map = new HashMap<>();
-        final List<Future<Optional<Exception>>> result = new ArrayList<>();
+        final var map = new HashMap<ActorRef, WriteMessages>();
+        final var result = new ArrayList<Future<Optional<Exception>>>();
 
-        for (AtomicWrite message : messages) {
-            final String persistenceId = message.persistenceId();
-            final ActorRef handler = handlers.computeIfAbsent(persistenceId, this::createHandler);
+        for (var message : messages) {
+            final var persistenceId = message.persistenceId();
+            final var handler = handlers.computeIfAbsent(persistenceId, this::createHandler);
             result.add(map.computeIfAbsent(handler, key -> new WriteMessages()).add(message));
         }
 
@@ -117,22 +115,18 @@ public class SegmentedFileJournal extends AsyncWriteJournal {
     }
 
     private ActorRef createHandler(final String persistenceId) {
-        final String directoryName = Base64.getUrlEncoder().encodeToString(persistenceId.getBytes(
-            StandardCharsets.UTF_8));
-        final File directory = new File(rootDir, directoryName);
+        final var directoryName = URLEncoder.encode(persistenceId, StandardCharsets.UTF_8);
+        final var directory = new File(rootDir, directoryName);
         LOG.debug("Creating handler for {} in directory {}", persistenceId, directory);
 
-        final ActorRef handler = context().actorOf(SegmentedJournalActor.props(persistenceId, directory, storage,
-            maxEntrySize, maxSegmentSize));
+        final var handler = context().actorOf(SegmentedJournalActor.props(persistenceId, directory, storage,
+            maxEntrySize, maxSegmentSize, maxUnflushedBytes));
         LOG.debug("Directory {} handled by {}", directory, handler);
-
-        final ActorRef prev = handlers.putIfAbsent(persistenceId, handler);
-        verify(prev == null, "Duplicate handler for %s, already handled by %s", persistenceId, prev);
         return handler;
     }
 
     private <T> Future<T> delegateMessage(final String persistenceId, final AsyncMessage<T> message) {
-        final ActorRef handler = handlers.get(persistenceId);
+        final var handler = handlers.get(persistenceId);
         if (handler == null) {
             return Futures.failed(new IllegalStateException("Cannot find handler for " + persistenceId));
         }
@@ -150,9 +144,8 @@ public class SegmentedFileJournal extends AsyncWriteJournal {
         if (!config.hasPath(path)) {
             return defaultValue;
         }
-        final ConfigMemorySize value = config.getMemorySize(path);
-        final long result = value.toBytes();
-        checkArgument(result <= Integer.MAX_VALUE, "Size %s exceeds maximum allowed %s", Integer.MAX_VALUE);
-        return (int) result;
+        final long value = config.getBytes(path);
+        checkArgument(value <= Integer.MAX_VALUE, "Size %s exceeds maximum allowed %s", Integer.MAX_VALUE);
+        return (int) value;
     }
 }