Allow segmented journal to flush periodically
[controller.git] / opendaylight / md-sal / sal-akka-segmented-journal / src / main / java / org / opendaylight / controller / akka / segjournal / SegmentedFileJournal.java
index 617353ec9fc4757c6afaf38c87729e4be867bcbb..41a6add0ed7d6c29fbe3c2658836c7dc2c3aad01 100644 (file)
@@ -17,7 +17,6 @@ import akka.persistence.AtomicWrite;
 import akka.persistence.PersistentRepr;
 import akka.persistence.journal.japi.AsyncWriteJournal;
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigMemorySize;
 import io.atomix.storage.journal.SegmentedJournal;
 import io.atomix.storage.journal.StorageLevel;
 import java.io.File;
@@ -25,7 +24,6 @@ import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Consumer;
@@ -39,8 +37,6 @@ import scala.concurrent.Future;
  * An Akka persistence journal implementation on top of {@link SegmentedJournal}. This actor represents aggregation
  * of multiple journals and performs a receptionist job between Akka and invidual per-persistenceId actors. See
  * {@link SegmentedJournalActor} for details on how the persistence works.
- *
- * @author Robert Varga
  */
 public class SegmentedFileJournal extends AsyncWriteJournal {
     public static final String STORAGE_ROOT_DIRECTORY = "root-directory";
@@ -48,6 +44,8 @@ public class SegmentedFileJournal extends AsyncWriteJournal {
     public static final int STORAGE_MAX_ENTRY_SIZE_DEFAULT = 16 * 1024 * 1024;
     public static final String STORAGE_MAX_SEGMENT_SIZE = "max-segment-size";
     public static final int STORAGE_MAX_SEGMENT_SIZE_DEFAULT = STORAGE_MAX_ENTRY_SIZE_DEFAULT * 8;
+    public static final String STORAGE_MAX_UNFLUSHED_BYTES = "max-unflushed-bytes";
+    public static final int STORAGE_MAX_UNFLUSHED_BYTES_DEFAULT = 0;
     public static final String STORAGE_MEMORY_MAPPED = "memory-mapped";
 
     private static final Logger LOG = LoggerFactory.getLogger(SegmentedFileJournal.class);
@@ -57,6 +55,7 @@ public class SegmentedFileJournal extends AsyncWriteJournal {
     private final StorageLevel storage;
     private final int maxEntrySize;
     private final int maxSegmentSize;
+    private final int maxUnflushedBytes;
 
     public SegmentedFileJournal(final Config config) {
         rootDir = new File(config.getString(STORAGE_ROOT_DIRECTORY));
@@ -68,6 +67,7 @@ public class SegmentedFileJournal extends AsyncWriteJournal {
 
         maxEntrySize = getBytes(config, STORAGE_MAX_ENTRY_SIZE, STORAGE_MAX_ENTRY_SIZE_DEFAULT);
         maxSegmentSize = getBytes(config, STORAGE_MAX_SEGMENT_SIZE, STORAGE_MAX_SEGMENT_SIZE_DEFAULT);
+        maxUnflushedBytes = getBytes(config, STORAGE_MAX_UNFLUSHED_BYTES, STORAGE_MAX_UNFLUSHED_BYTES_DEFAULT);
 
         if (config.hasPath(STORAGE_MEMORY_MAPPED)) {
             storage = config.getBoolean(STORAGE_MEMORY_MAPPED) ? StorageLevel.MAPPED : StorageLevel.DISK;
@@ -80,12 +80,12 @@ public class SegmentedFileJournal extends AsyncWriteJournal {
 
     @Override
     public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(final Iterable<AtomicWrite> messages) {
-        final Map<ActorRef, WriteMessages> map = new HashMap<>();
-        final List<Future<Optional<Exception>>> result = new ArrayList<>();
+        final var map = new HashMap<ActorRef, WriteMessages>();
+        final var result = new ArrayList<Future<Optional<Exception>>>();
 
-        for (AtomicWrite message : messages) {
-            final String persistenceId = message.persistenceId();
-            final ActorRef handler = handlers.computeIfAbsent(persistenceId, this::createHandler);
+        for (var message : messages) {
+            final var persistenceId = message.persistenceId();
+            final var handler = handlers.computeIfAbsent(persistenceId, this::createHandler);
             result.add(map.computeIfAbsent(handler, key -> new WriteMessages()).add(message));
         }
 
@@ -116,18 +116,18 @@ public class SegmentedFileJournal extends AsyncWriteJournal {
     }
 
     private ActorRef createHandler(final String persistenceId) {
-        final String directoryName = URLEncoder.encode(persistenceId, StandardCharsets.UTF_8);
-        final File directory = new File(rootDir, directoryName);
+        final var directoryName = URLEncoder.encode(persistenceId, StandardCharsets.UTF_8);
+        final var directory = new File(rootDir, directoryName);
         LOG.debug("Creating handler for {} in directory {}", persistenceId, directory);
 
-        final ActorRef handler = context().actorOf(SegmentedJournalActor.props(persistenceId, directory, storage,
-            maxEntrySize, maxSegmentSize));
+        final var handler = context().actorOf(SegmentedJournalActor.props(persistenceId, directory, storage,
+            maxEntrySize, maxSegmentSize, maxUnflushedBytes));
         LOG.debug("Directory {} handled by {}", directory, handler);
         return handler;
     }
 
     private <T> Future<T> delegateMessage(final String persistenceId, final AsyncMessage<T> message) {
-        final ActorRef handler = handlers.get(persistenceId);
+        final var handler = handlers.get(persistenceId);
         if (handler == null) {
             return Futures.failed(new IllegalStateException("Cannot find handler for " + persistenceId));
         }
@@ -145,9 +145,8 @@ public class SegmentedFileJournal extends AsyncWriteJournal {
         if (!config.hasPath(path)) {
             return defaultValue;
         }
-        final ConfigMemorySize value = config.getMemorySize(path);
-        final long result = value.toBytes();
-        checkArgument(result <= Integer.MAX_VALUE, "Size %s exceeds maximum allowed %s", Integer.MAX_VALUE);
-        return (int) result;
+        final long value = config.getBytes(path);
+        checkArgument(value <= Integer.MAX_VALUE, "Size %s exceeds maximum allowed %s", Integer.MAX_VALUE);
+        return (int) value;
     }
 }