Derive MockRaftActorContext from RaftActorContextImpl
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / ReplicatedLogImpl.java
index fdb630538130aa2ac1bfcdac43b6f5ad3808a1ba..ab1e23bb94d8a7a9790d3319a8ba977bef1baa36 100644 (file)
@@ -10,8 +10,7 @@ package org.opendaylight.controller.cluster.raft;
 import akka.japi.Procedure;
 import java.util.Collections;
 import java.util.List;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 
 /**
@@ -22,51 +21,39 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
 
     private long dataSizeSinceLastSnapshot = 0L;
     private final RaftActorContext context;
-    private final DataPersistenceProvider persistence;
     private final RaftActorBehavior currentBehavior;
 
     private final Procedure<DeleteEntries> deleteProcedure = new Procedure<DeleteEntries>() {
         @Override
-        public void apply(DeleteEntries param) {
-            dataSize = 0;
-            for (ReplicatedLogEntry entry : journal) {
-                dataSize += entry.size();
-            }
+        public void apply(DeleteEntries notUsed) {
         }
     };
 
     static ReplicatedLog newInstance(Snapshot snapshot, RaftActorContext context,
-            DataPersistenceProvider persistence, RaftActorBehavior currentBehavior) {
+            RaftActorBehavior currentBehavior) {
         return new ReplicatedLogImpl(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
-                snapshot.getUnAppliedEntries(), context, persistence, currentBehavior);
+                snapshot.getUnAppliedEntries(), context, currentBehavior);
     }
 
-    static ReplicatedLog newInstance(RaftActorContext context,
-            DataPersistenceProvider persistence, RaftActorBehavior currentBehavior) {
+    static ReplicatedLog newInstance(RaftActorContext context, RaftActorBehavior currentBehavior) {
         return new ReplicatedLogImpl(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), context,
-                persistence, currentBehavior);
+                currentBehavior);
     }
 
     private ReplicatedLogImpl(long snapshotIndex, long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries,
-            RaftActorContext context, DataPersistenceProvider persistence, RaftActorBehavior currentBehavior) {
+            RaftActorContext context, RaftActorBehavior currentBehavior) {
         super(snapshotIndex, snapshotTerm, unAppliedEntries);
         this.context = context;
-        this.persistence = persistence;
         this.currentBehavior = currentBehavior;
     }
 
     @Override
     public void removeFromAndPersist(long logEntryIndex) {
-        int adjustedIndex = adjustedIndex(logEntryIndex);
-
-        if (adjustedIndex < 0) {
-            return;
-        }
-
         // FIXME: Maybe this should be done after the command is saved
-        journal.subList(adjustedIndex , journal.size()).clear();
-
-        persistence.persist(new DeleteEntries(adjustedIndex), deleteProcedure);
+        long adjustedIndex = removeFrom(logEntryIndex);
+        if(adjustedIndex >= 0) {
+            context.getPersistenceProvider().persist(new DeleteEntries(adjustedIndex), deleteProcedure);
+        }
     }
 
     @Override
@@ -74,6 +61,44 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
         appendAndPersist(replicatedLogEntry, null);
     }
 
+    @Override
+    public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) {
+        long journalSize = replicatedLogEntry.getIndex() + 1;
+        long dataThreshold = context.getTotalMemory() *
+                context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+
+        if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0
+                || getDataSizeForSnapshotCheck() > dataThreshold)) {
+
+            boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
+                    currentBehavior.getReplicatedToAllIndex());
+            if (started) {
+                if (!context.hasFollowers()) {
+                    dataSizeSinceLastSnapshot = 0;
+                }
+            }
+        }
+    }
+
+    private long getDataSizeForSnapshotCheck() {
+        long dataSizeForCheck = dataSize();
+        if (!context.hasFollowers()) {
+            // When we do not have followers we do not maintain an in-memory log
+            // due to this the journalSize will never become anything close to the
+            // snapshot batch count. In fact will mostly be 1.
+            // Similarly since the journal's dataSize depends on the entries in the
+            // journal the journal's dataSize will never reach a value close to the
+            // memory threshold.
+            // By maintaining the dataSize outside the journal we are tracking essentially
+            // what we have written to the disk however since we no longer are in
+            // need of doing a snapshot just for the sake of freeing up memory we adjust
+            // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often
+            // as if we were maintaining a real snapshot
+            dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
+        }
+        return dataSizeForCheck;
+    }
+
     @Override
     public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
             final Procedure<ReplicatedLogEntry> callback)  {
@@ -83,53 +108,22 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
         }
 
         // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
-        journal.add(replicatedLogEntry);
+        append(replicatedLogEntry);
 
         // When persisting events with persist it is guaranteed that the
         // persistent actor will not receive further commands between the
         // persist call and the execution(s) of the associated event
         // handler. This also holds for multiple persist calls in context
         // of a single command.
-        persistence.persist(replicatedLogEntry,
+        context.getPersistenceProvider().persist(replicatedLogEntry,
             new Procedure<ReplicatedLogEntry>() {
                 @Override
                 public void apply(ReplicatedLogEntry evt) throws Exception {
-                    int logEntrySize = replicatedLogEntry.size();
-
-                    dataSize += logEntrySize;
-                    long dataSizeForCheck = dataSize;
+                    context.getLogger().debug("{}: persist complete {}", context.getId(), replicatedLogEntry);
 
+                    int logEntrySize = replicatedLogEntry.size();
                     dataSizeSinceLastSnapshot += logEntrySize;
 
-                    if (!context.hasFollowers()) {
-                        // When we do not have followers we do not maintain an in-memory log
-                        // due to this the journalSize will never become anything close to the
-                        // snapshot batch count. In fact will mostly be 1.
-                        // Similarly since the journal's dataSize depends on the entries in the
-                        // journal the journal's dataSize will never reach a value close to the
-                        // memory threshold.
-                        // By maintaining the dataSize outside the journal we are tracking essentially
-                        // what we have written to the disk however since we no longer are in
-                        // need of doing a snapshot just for the sake of freeing up memory we adjust
-                        // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often
-                        // as if we were maintaining a real snapshot
-                        dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
-                    }
-                    long journalSize = replicatedLogEntry.getIndex() + 1;
-                    long dataThreshold = context.getTotalMemory() *
-                            context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
-
-                    if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0
-                            || dataSizeForCheck > dataThreshold)) {
-
-                        boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
-                                currentBehavior.getReplicatedToAllIndex());
-
-                        if(started){
-                            dataSizeSinceLastSnapshot = 0;
-                        }
-                    }
-
                     if (callback != null){
                         callback.apply(replicatedLogEntry);
                     }