Add direct in-memory journal threshold
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / ReplicatedLogImpl.java
index fe873340aa3b414d4f89f60354dabcede970b52c..6167aac6d2ad71da7225c388ce4bb68bbb139070 100644 (file)
@@ -7,18 +7,18 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
-import akka.japi.Procedure;
-import com.google.common.base.Preconditions;
+import static java.util.Objects.requireNonNull;
+
 import java.util.Collections;
 import java.util.List;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
+import java.util.function.Consumer;
 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 
 /**
  * Implementation of ReplicatedLog used by the RaftActor.
  */
-class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
+final class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
     private static final int DATA_SIZE_DIVIDER = 5;
 
     private final RaftActorContext context;
@@ -28,7 +28,7 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
             final List<ReplicatedLogEntry> unAppliedEntries,
             final RaftActorContext context) {
         super(snapshotIndex, snapshotTerm, unAppliedEntries, context.getId());
-        this.context = Preconditions.checkNotNull(context);
+        this.context = requireNonNull(context);
     }
 
     static ReplicatedLog newInstance(final Snapshot snapshot, final RaftActorContext context) {
@@ -42,10 +42,9 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
 
     @Override
     public boolean removeFromAndPersist(final long logEntryIndex) {
-        // FIXME: Maybe this should be done after the command is saved
         long adjustedIndex = removeFrom(logEntryIndex);
         if (adjustedIndex >= 0) {
-            context.getPersistenceProvider().persist(new DeleteEntries(adjustedIndex), NoopProcedure.instance());
+            context.getPersistenceProvider().persist(new DeleteEntries(logEntryIndex), NoopProcedure.instance());
             return true;
         }
 
@@ -53,12 +52,16 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
     }
 
     @Override
-    public boolean shouldCaptureSnapshot(long logIndex) {
+    public boolean shouldCaptureSnapshot(final long logIndex) {
         final ConfigParams config = context.getConfigParams();
-        final long journalSize = logIndex + 1;
-        final long dataThreshold = context.getTotalMemory() * config.getSnapshotDataThresholdPercentage() / 100;
+        if ((logIndex + 1) % config.getSnapshotBatchCount() == 0) {
+            return true;
+        }
 
-        return journalSize % config.getSnapshotBatchCount() == 0 || getDataSizeForSnapshotCheck() > dataThreshold;
+        final long absoluteThreshold = config.getSnapshotDataThreshold();
+        final long dataThreshold = absoluteThreshold != 0 ? absoluteThreshold * ConfigParams.MEGABYTE
+                : context.getTotalMemory() * config.getSnapshotDataThresholdPercentage() / 100;
+        return getDataSizeForSnapshotCheck() > dataThreshold;
     }
 
     @Override
@@ -92,8 +95,8 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
     }
 
     @Override
-    public boolean appendAndPersist(@Nonnull final ReplicatedLogEntry replicatedLogEntry,
-            @Nullable final Procedure<ReplicatedLogEntry> callback, boolean doAsync)  {
+    public boolean appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
+            final Consumer<ReplicatedLogEntry> callback, final boolean doAsync)  {
 
         context.getLogger().debug("{}: Append log entry and persist {} ", context.getId(), replicatedLogEntry);
 
@@ -101,22 +104,29 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
             return false;
         }
 
-        Procedure<ReplicatedLogEntry> persistCallback = persistedLogEntry -> {
-            context.getLogger().debug("{}: persist complete {}", context.getId(), persistedLogEntry);
-
-            dataSizeSinceLastSnapshot += persistedLogEntry.size();
-
-            if (callback != null) {
-                callback.apply(persistedLogEntry);
-            }
-        };
-
         if (doAsync) {
-            context.getPersistenceProvider().persistAsync(replicatedLogEntry, persistCallback);
+            context.getPersistenceProvider().persistAsync(replicatedLogEntry,
+                entry -> persistCallback(entry, callback));
         } else {
-            context.getPersistenceProvider().persist(replicatedLogEntry, persistCallback);
+            context.getPersistenceProvider().persist(replicatedLogEntry, entry -> syncPersistCallback(entry, callback));
         }
 
         return true;
     }
+
+    private void persistCallback(final ReplicatedLogEntry persistedLogEntry,
+            final Consumer<ReplicatedLogEntry> callback) {
+        context.getExecutor().execute(() -> syncPersistCallback(persistedLogEntry, callback));
+    }
+
+    private void syncPersistCallback(final ReplicatedLogEntry persistedLogEntry,
+            final Consumer<ReplicatedLogEntry> callback) {
+        context.getLogger().debug("{}: persist complete {}", context.getId(), persistedLogEntry);
+
+        dataSizeSinceLastSnapshot += persistedLogEntry.size();
+
+        if (callback != null) {
+            callback.accept(persistedLogEntry);
+        }
+    }
 }