Add direct in-memory journal threshold
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / ReplicatedLogImpl.java
index 6b4427d5ce39078c772e0ea5f33953cc421712c8..6167aac6d2ad71da7225c388ce4bb68bbb139070 100644 (file)
@@ -7,82 +7,75 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
-import akka.japi.Procedure;
-import com.google.common.base.Preconditions;
+import static java.util.Objects.requireNonNull;
+
 import java.util.Collections;
 import java.util.List;
-import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
-import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import java.util.function.Consumer;
+import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 
 /**
  * Implementation of ReplicatedLog used by the RaftActor.
  */
-class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
+final class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
     private static final int DATA_SIZE_DIVIDER = 5;
 
-    private long dataSizeSinceLastSnapshot = 0L;
     private final RaftActorContext context;
-    private final RaftActorBehavior currentBehavior;
-
-    private final Procedure<DeleteEntries> deleteProcedure = new Procedure<DeleteEntries>() {
-        @Override
-        public void apply(final DeleteEntries notUsed) {
-        }
-    };
+    private long dataSizeSinceLastSnapshot = 0L;
 
-    static ReplicatedLog newInstance(final Snapshot snapshot, final RaftActorContext context,
-            final RaftActorBehavior currentBehavior) {
-        return new ReplicatedLogImpl(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
-                snapshot.getUnAppliedEntries(), context, currentBehavior);
+    private ReplicatedLogImpl(final long snapshotIndex, final long snapshotTerm,
+            final List<ReplicatedLogEntry> unAppliedEntries,
+            final RaftActorContext context) {
+        super(snapshotIndex, snapshotTerm, unAppliedEntries, context.getId());
+        this.context = requireNonNull(context);
     }
 
-    static ReplicatedLog newInstance(final RaftActorContext context, final RaftActorBehavior currentBehavior) {
-        return new ReplicatedLogImpl(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), context,
-                currentBehavior);
+    static ReplicatedLog newInstance(final Snapshot snapshot, final RaftActorContext context) {
+        return new ReplicatedLogImpl(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
+                snapshot.getUnAppliedEntries(), context);
     }
 
-    private ReplicatedLogImpl(final long snapshotIndex, final long snapshotTerm, final List<ReplicatedLogEntry> unAppliedEntries,
-            final RaftActorContext context, final RaftActorBehavior currentBehavior) {
-        super(snapshotIndex, snapshotTerm, unAppliedEntries);
-        this.context = Preconditions.checkNotNull(context);
-        this.currentBehavior = Preconditions.checkNotNull(currentBehavior);
+    static ReplicatedLog newInstance(final RaftActorContext context) {
+        return new ReplicatedLogImpl(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), context);
     }
 
     @Override
-    public void removeFromAndPersist(final long logEntryIndex) {
-        // FIXME: Maybe this should be done after the command is saved
+    public boolean removeFromAndPersist(final long logEntryIndex) {
         long adjustedIndex = removeFrom(logEntryIndex);
-        if(adjustedIndex >= 0) {
-            context.getPersistenceProvider().persist(new DeleteEntries(adjustedIndex), deleteProcedure);
+        if (adjustedIndex >= 0) {
+            context.getPersistenceProvider().persist(new DeleteEntries(logEntryIndex), NoopProcedure.instance());
+            return true;
         }
+
+        return false;
     }
 
     @Override
-    public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry) {
-        appendAndPersist(replicatedLogEntry, null);
+    public boolean shouldCaptureSnapshot(final long logIndex) {
+        final ConfigParams config = context.getConfigParams();
+        if ((logIndex + 1) % config.getSnapshotBatchCount() == 0) {
+            return true;
+        }
+
+        final long absoluteThreshold = config.getSnapshotDataThreshold();
+        final long dataThreshold = absoluteThreshold != 0 ? absoluteThreshold * ConfigParams.MEGABYTE
+                : context.getTotalMemory() * config.getSnapshotDataThresholdPercentage() / 100;
+        return getDataSizeForSnapshotCheck() > dataThreshold;
     }
 
     @Override
     public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) {
-        long journalSize = replicatedLogEntry.getIndex() + 1;
-        long dataThreshold = context.getTotalMemory() *
-                context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
-
-        if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0
-                || getDataSizeForSnapshotCheck() > dataThreshold)) {
-
+        if (shouldCaptureSnapshot(replicatedLogEntry.getIndex())) {
             boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
-                    currentBehavior.getReplicatedToAllIndex());
-            if (started) {
-                if (!context.hasFollowers()) {
-                    dataSizeSinceLastSnapshot = 0;
-                }
+                    context.getCurrentBehavior().getReplicatedToAllIndex());
+            if (started && !context.hasFollowers()) {
+                dataSizeSinceLastSnapshot = 0;
             }
         }
     }
 
     private long getDataSizeForSnapshotCheck() {
-        long dataSizeForCheck = dataSize();
         if (!context.hasFollowers()) {
             // When we do not have followers we do not maintain an in-memory log
             // due to this the journalSize will never become anything close to the
@@ -95,41 +88,45 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
             // need of doing a snapshot just for the sake of freeing up memory we adjust
             // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often
             // as if we were maintaining a real snapshot
-            dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
+            return dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
+        } else {
+            return dataSize();
         }
-        return dataSizeForCheck;
     }
 
     @Override
-    public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
-            final Procedure<ReplicatedLogEntry> callback)  {
+    public boolean appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
+            final Consumer<ReplicatedLogEntry> callback, final boolean doAsync)  {
 
-        if (context.getLogger().isDebugEnabled()) {
-            context.getLogger().debug("{}: Append log entry and persist {} ", context.getId(), replicatedLogEntry);
+        context.getLogger().debug("{}: Append log entry and persist {} ", context.getId(), replicatedLogEntry);
+
+        if (!append(replicatedLogEntry)) {
+            return false;
         }
 
-        // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
-        append(replicatedLogEntry);
-
-        // When persisting events with persist it is guaranteed that the
-        // persistent actor will not receive further commands between the
-        // persist call and the execution(s) of the associated event
-        // handler. This also holds for multiple persist calls in context
-        // of a single command.
-        context.getPersistenceProvider().persist(replicatedLogEntry,
-            new Procedure<ReplicatedLogEntry>() {
-                @Override
-                public void apply(final ReplicatedLogEntry param) throws Exception {
-                    context.getLogger().debug("{}: persist complete {}", context.getId(), param);
-
-                    int logEntrySize = param.size();
-                    dataSizeSinceLastSnapshot += logEntrySize;
-
-                    if (callback != null) {
-                        callback.apply(param);
-                    }
-                }
-            }
-        );
+        if (doAsync) {
+            context.getPersistenceProvider().persistAsync(replicatedLogEntry,
+                entry -> persistCallback(entry, callback));
+        } else {
+            context.getPersistenceProvider().persist(replicatedLogEntry, entry -> syncPersistCallback(entry, callback));
+        }
+
+        return true;
+    }
+
+    private void persistCallback(final ReplicatedLogEntry persistedLogEntry,
+            final Consumer<ReplicatedLogEntry> callback) {
+        context.getExecutor().execute(() -> syncPersistCallback(persistedLogEntry, callback));
+    }
+
+    private void syncPersistCallback(final ReplicatedLogEntry persistedLogEntry,
+            final Consumer<ReplicatedLogEntry> callback) {
+        context.getLogger().debug("{}: persist complete {}", context.getId(), persistedLogEntry);
+
+        dataSizeSinceLastSnapshot += persistedLogEntry.size();
+
+        if (callback != null) {
+            callback.accept(persistedLogEntry);
+        }
     }
-}
\ No newline at end of file
+}