Bug 5740: Remove Serializable where not necessary
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / ReplicatedLogImpl.java
index 9123c14d5d615553a2abf58d19e9a1d89ad63f39..7196fc5f12d82013c58f80248d82342d0f88aecb 100644 (file)
@@ -11,7 +11,10 @@ import akka.japi.Procedure;
 import com.google.common.base.Preconditions;
 import java.util.Collections;
 import java.util.List;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 
 /**
  * Implementation of ReplicatedLog used by the RaftActor.
@@ -19,8 +22,6 @@ import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
 class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
     private static final int DATA_SIZE_DIVIDER = 5;
 
-    private final Procedure<DeleteEntries> deleteProcedure = NoopProcedure.instance();
-
     private final RaftActorContext context;
     private long dataSizeSinceLastSnapshot = 0L;
 
@@ -45,7 +46,7 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
         // FIXME: Maybe this should be done after the command is saved
         long adjustedIndex = removeFrom(logEntryIndex);
         if (adjustedIndex >= 0) {
-            context.getPersistenceProvider().persist(new DeleteEntries(adjustedIndex), deleteProcedure);
+            context.getPersistenceProvider().persist(new DeleteEntries(adjustedIndex), NoopProcedure.instance());
             return true;
         }
 
@@ -53,12 +54,17 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
     }
 
     @Override
-    public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) {
+    public boolean shouldCaptureSnapshot(long logIndex) {
         final ConfigParams config = context.getConfigParams();
-        final long journalSize = replicatedLogEntry.getIndex() + 1;
+        final long journalSize = logIndex + 1;
         final long dataThreshold = context.getTotalMemory() * config.getSnapshotDataThresholdPercentage() / 100;
 
-        if (journalSize % config.getSnapshotBatchCount() == 0 || getDataSizeForSnapshotCheck() > dataThreshold) {
+        return journalSize % config.getSnapshotBatchCount() == 0 || getDataSizeForSnapshotCheck() > dataThreshold;
+    }
+
+    @Override
+    public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) {
+        if (shouldCaptureSnapshot(replicatedLogEntry.getIndex())) {
             boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
                     context.getCurrentBehavior().getReplicatedToAllIndex());
             if (started && !context.hasFollowers()) {
@@ -87,38 +93,31 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
     }
 
     @Override
-    public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry) {
-        appendAndPersist(replicatedLogEntry, null);
-    }
-
-    @Override
-    public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
-            final Procedure<ReplicatedLogEntry> callback)  {
+    public boolean appendAndPersist(@Nonnull final ReplicatedLogEntry replicatedLogEntry,
+            @Nullable final Procedure<ReplicatedLogEntry> callback, boolean doAsync)  {
 
         context.getLogger().debug("{}: Append log entry and persist {} ", context.getId(), replicatedLogEntry);
 
-        // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability
-        // of the logs
         if (!append(replicatedLogEntry)) {
-            return;
+            return false;
         }
 
-        // When persisting events with persist it is guaranteed that the
-        // persistent actor will not receive further commands between the
-        // persist call and the execution(s) of the associated event
-        // handler. This also holds for multiple persist calls in context
-        // of a single command.
-        context.getPersistenceProvider().persist(replicatedLogEntry,
-            param -> {
-                context.getLogger().debug("{}: persist complete {}", context.getId(), param);
-
-                int logEntrySize = param.size();
-                dataSizeSinceLastSnapshot += logEntrySize;
-
-                if (callback != null) {
-                    callback.apply(param);
-                }
+        Procedure<ReplicatedLogEntry> persistCallback = persistedLogEntry -> {
+            context.getLogger().debug("{}: persist complete {}", context.getId(), persistedLogEntry);
+
+            dataSizeSinceLastSnapshot += persistedLogEntry.size();
+
+            if (callback != null) {
+                callback.apply(persistedLogEntry);
             }
-        );
+        };
+
+        if (doAsync) {
+            context.getPersistenceProvider().persistAsync(replicatedLogEntry, persistCallback);
+        } else {
+            context.getPersistenceProvider().persist(replicatedLogEntry, persistCallback);
+        }
+
+        return true;
     }
 }