BUG-5626: use lambdas, Runnable and Consumer instead of Procedure
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / SnapshotManager.java
index 197fa867156265eb0b7ddda8bbab7586a40dea3b..b195686d2092ff4f54353a6d18295c62ebfe9bec 100644 (file)
@@ -8,10 +8,10 @@
 
 package org.opendaylight.controller.cluster.raft;
 
-import akka.japi.Procedure;
 import akka.persistence.SnapshotSelectionCriteria;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.List;
+import java.util.function.Consumer;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -37,10 +37,10 @@ public class SnapshotManager implements SnapshotState {
     private CaptureSnapshot captureSnapshot;
     private long lastSequenceNumber = -1;
 
-    private Procedure<Void> createSnapshotProcedure;
+    private Runnable createSnapshotProcedure;
 
     private ApplySnapshot applySnapshot;
-    private Procedure<byte[]> applySnapshotProcedure;
+    private Consumer<byte[]> applySnapshotProcedure;
 
     public SnapshotManager(RaftActorContext context, Logger logger) {
         this.context = context;
@@ -72,13 +72,13 @@ public class SnapshotManager implements SnapshotState {
     }
 
     @Override
-    public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
-        currentState.persist(snapshotBytes, currentBehavior, totalMemory);
+    public void persist(final byte[] snapshotBytes, final long totalMemory) {
+        currentState.persist(snapshotBytes, totalMemory);
     }
 
     @Override
-    public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) {
-        currentState.commit(sequenceNumber, currentBehavior);
+    public void commit(final long sequenceNumber) {
+        currentState.commit(sequenceNumber);
     }
 
     @Override
@@ -87,15 +87,15 @@ public class SnapshotManager implements SnapshotState {
     }
 
     @Override
-    public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
-        return currentState.trimLog(desiredTrimIndex, currentBehavior);
+    public long trimLog(final long desiredTrimIndex) {
+        return currentState.trimLog(desiredTrimIndex);
     }
 
-    public void setCreateSnapshotCallable(Procedure<Void> createSnapshotProcedure) {
+    public void setCreateSnapshotRunnable(Runnable createSnapshotProcedure) {
         this.createSnapshotProcedure = createSnapshotProcedure;
     }
 
-    public void setApplySnapshotProcedure(Procedure<byte[]> applySnapshotProcedure) {
+    public void setApplySnapshotConsumer(Consumer<byte[]> applySnapshotProcedure) {
         this.applySnapshotProcedure = applySnapshotProcedure;
     }
 
@@ -172,12 +172,12 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
+        public void persist(final byte[] snapshotBytes, final long totalMemory) {
             LOG.debug("persist should not be called in state {}", this);
         }
 
         @Override
-        public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) {
+        public void commit(final long sequenceNumber) {
             LOG.debug("commit should not be called in state {}", this);
         }
 
@@ -187,12 +187,12 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
+        public long trimLog(final long desiredTrimIndex) {
             LOG.debug("trimLog should not be called in state {}", this);
             return -1;
         }
 
-        protected long doTrimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior){
+        protected long doTrimLog(final long desiredTrimIndex) {
             //  we would want to keep the lastApplied as its used while capturing snapshots
             long lastApplied = context.getLastApplied();
             long tempMin = Math.min(desiredTrimIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
@@ -211,7 +211,10 @@ public class SnapshotManager implements SnapshotState {
                 context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
                 context.getReplicatedLog().snapshotCommit();
                 return tempMin;
-            } else if(tempMin > currentBehavior.getReplicatedToAllIndex()) {
+            }
+
+            final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
+            if(tempMin > currentBehavior.getReplicatedToAllIndex()) {
                 // It's possible a follower was lagging and an install snapshot advanced its match index past
                 // the current replicatedToAllIndex. Since the follower is now caught up we should advance the
                 // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely
@@ -247,7 +250,7 @@ public class SnapshotManager implements SnapshotState {
             SnapshotManager.this.currentState = CREATING;
 
             try {
-                createSnapshotProcedure.apply(null);
+                createSnapshotProcedure.run();
             } catch (Exception e) {
                 SnapshotManager.this.currentState = IDLE;
                 LOG.error("Error creating snapshot", e);
@@ -286,15 +289,15 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
-            return doTrimLog(desiredTrimIndex, currentBehavior);
+        public long trimLog(final long desiredTrimIndex) {
+            return doTrimLog(desiredTrimIndex);
         }
     }
 
     private class Creating extends AbstractSnapshotState {
 
         @Override
-        public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
+        public void persist(final byte[] snapshotBytes, final long totalMemory) {
             // create a snapshot object from the state provided and save it
             // when snapshot is saved async, SaveSnapshotSuccess is raised.
 
@@ -316,6 +319,7 @@ public class SnapshotManager implements SnapshotState {
             boolean logSizeExceededSnapshotBatchCount =
                     context.getReplicatedLog().size() >= context.getConfigParams().getSnapshotBatchCount();
 
+            final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
             if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount) {
                 if(LOG.isDebugEnabled()) {
                     if(dataSizeThresholdExceeded) {
@@ -381,7 +385,7 @@ public class SnapshotManager implements SnapshotState {
     private class Persisting extends AbstractSnapshotState {
 
         @Override
-        public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) {
+        public void commit(final long sequenceNumber) {
             LOG.debug("{}: Snapshot success -  sequence number: {}", persistenceId(), sequenceNumber);
 
             if(applySnapshot != null) {
@@ -389,13 +393,13 @@ public class SnapshotManager implements SnapshotState {
                     Snapshot snapshot = applySnapshot.getSnapshot();
 
                     //clears the followers log, sets the snapshot index to ensure adjusted-index works
-                    context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior));
+                    context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context));
                     context.setLastApplied(snapshot.getLastAppliedIndex());
                     context.setCommitIndex(snapshot.getLastAppliedIndex());
                     context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
 
                     if(snapshot.getState().length > 0 ) {
-                        applySnapshotProcedure.apply(snapshot.getState());
+                        applySnapshotProcedure.accept(snapshot.getState());
                     }
 
                     applySnapshot.getCallback().onSuccess();