BUG-5626: use lambdas, Runnable and Consumer instead of Procedure
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / SnapshotManager.java
index 9571173175ff220aa3fb5b6b9ba757eee150ffe4..b195686d2092ff4f54353a6d18295c62ebfe9bec 100644 (file)
@@ -8,10 +8,10 @@
 
 package org.opendaylight.controller.cluster.raft;
 
-import akka.japi.Procedure;
 import akka.persistence.SnapshotSelectionCriteria;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.List;
+import java.util.function.Consumer;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -37,10 +37,10 @@ public class SnapshotManager implements SnapshotState {
     private CaptureSnapshot captureSnapshot;
     private long lastSequenceNumber = -1;
 
-    private Procedure<Void> createSnapshotProcedure;
+    private Runnable createSnapshotProcedure;
 
     private ApplySnapshot applySnapshot;
-    private Procedure<byte[]> applySnapshotProcedure;
+    private Consumer<byte[]> applySnapshotProcedure;
 
     public SnapshotManager(RaftActorContext context, Logger logger) {
         this.context = context;
@@ -72,13 +72,13 @@ public class SnapshotManager implements SnapshotState {
     }
 
     @Override
-    public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
-        currentState.persist(snapshotBytes, currentBehavior, totalMemory);
+    public void persist(final byte[] snapshotBytes, final long totalMemory) {
+        currentState.persist(snapshotBytes, totalMemory);
     }
 
     @Override
-    public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) {
-        currentState.commit(sequenceNumber, currentBehavior);
+    public void commit(final long sequenceNumber) {
+        currentState.commit(sequenceNumber);
     }
 
     @Override
@@ -87,15 +87,15 @@ public class SnapshotManager implements SnapshotState {
     }
 
     @Override
-    public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
-        return currentState.trimLog(desiredTrimIndex, currentBehavior);
+    public long trimLog(final long desiredTrimIndex) {
+        return currentState.trimLog(desiredTrimIndex);
     }
 
-    public void setCreateSnapshotCallable(Procedure<Void> createSnapshotProcedure) {
+    public void setCreateSnapshotRunnable(Runnable createSnapshotProcedure) {
         this.createSnapshotProcedure = createSnapshotProcedure;
     }
 
-    public void setApplySnapshotProcedure(Procedure<byte[]> applySnapshotProcedure) {
+    public void setApplySnapshotConsumer(Consumer<byte[]> applySnapshotProcedure) {
         this.applySnapshotProcedure = applySnapshotProcedure;
     }
 
@@ -139,8 +139,8 @@ public class SnapshotManager implements SnapshotState {
             lastLogEntryIndex = lastLogEntry.getIndex();
             lastLogEntryTerm = lastLogEntry.getTerm();
         } else {
-            LOG.warn("Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and lastAppliedTerm {} instead.",
-                lastAppliedIndex, lastAppliedTerm);
+            LOG.debug("{}: Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and lastAppliedTerm {} instead.",
+                    persistenceId(), lastAppliedIndex, lastAppliedTerm);
         }
 
         return new CaptureSnapshot(lastLogEntryIndex, lastLogEntryTerm, lastAppliedIndex, lastAppliedTerm,
@@ -172,12 +172,12 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
+        public void persist(final byte[] snapshotBytes, final long totalMemory) {
             LOG.debug("persist should not be called in state {}", this);
         }
 
         @Override
-        public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) {
+        public void commit(final long sequenceNumber) {
             LOG.debug("commit should not be called in state {}", this);
         }
 
@@ -187,12 +187,12 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
+        public long trimLog(final long desiredTrimIndex) {
             LOG.debug("trimLog should not be called in state {}", this);
             return -1;
         }
 
-        protected long doTrimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior){
+        protected long doTrimLog(final long desiredTrimIndex) {
             //  we would want to keep the lastApplied as its used while capturing snapshots
             long lastApplied = context.getLastApplied();
             long tempMin = Math.min(desiredTrimIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
@@ -211,7 +211,10 @@ public class SnapshotManager implements SnapshotState {
                 context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
                 context.getReplicatedLog().snapshotCommit();
                 return tempMin;
-            } else if(tempMin > currentBehavior.getReplicatedToAllIndex()) {
+            }
+
+            final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
+            if(tempMin > currentBehavior.getReplicatedToAllIndex()) {
                 // It's possible a follower was lagging and an install snapshot advanced its match index past
                 // the current replicatedToAllIndex. Since the follower is now caught up we should advance the
                 // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely
@@ -242,12 +245,12 @@ public class SnapshotManager implements SnapshotState {
 
             lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber();
 
-            LOG.debug("lastSequenceNumber prior to capture: {}", lastSequenceNumber);
+            LOG.debug("{}: lastSequenceNumber prior to capture: {}", persistenceId(), lastSequenceNumber);
 
             SnapshotManager.this.currentState = CREATING;
 
             try {
-                createSnapshotProcedure.apply(null);
+                createSnapshotProcedure.run();
             } catch (Exception e) {
                 SnapshotManager.this.currentState = IDLE;
                 LOG.error("Error creating snapshot", e);
@@ -286,15 +289,15 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
-            return doTrimLog(desiredTrimIndex, currentBehavior);
+        public long trimLog(final long desiredTrimIndex) {
+            return doTrimLog(desiredTrimIndex);
         }
     }
 
     private class Creating extends AbstractSnapshotState {
 
         @Override
-        public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
+        public void persist(final byte[] snapshotBytes, final long totalMemory) {
             // create a snapshot object from the state provided and save it
             // when snapshot is saved async, SaveSnapshotSuccess is raised.
 
@@ -303,7 +306,7 @@ public class SnapshotManager implements SnapshotState {
                     captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
                     captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm(),
                     context.getTermInformation().getCurrentTerm(),
-                    context.getTermInformation().getVotedFor());
+                    context.getTermInformation().getVotedFor(), context.getPeerServerInfo(true));
 
             context.getPersistenceProvider().saveSnapshot(snapshot);
 
@@ -316,6 +319,7 @@ public class SnapshotManager implements SnapshotState {
             boolean logSizeExceededSnapshotBatchCount =
                     context.getReplicatedLog().size() >= context.getConfigParams().getSnapshotBatchCount();
 
+            final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
             if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount) {
                 if(LOG.isDebugEnabled()) {
                     if(dataSizeThresholdExceeded) {
@@ -381,29 +385,33 @@ public class SnapshotManager implements SnapshotState {
     private class Persisting extends AbstractSnapshotState {
 
         @Override
-        public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) {
-            LOG.debug("Snapshot success sequence number: {}", sequenceNumber);
+        public void commit(final long sequenceNumber) {
+            LOG.debug("{}: Snapshot success -  sequence number: {}", persistenceId(), sequenceNumber);
 
             if(applySnapshot != null) {
                 try {
                     Snapshot snapshot = applySnapshot.getSnapshot();
-                    applySnapshotProcedure.apply(snapshot.getState());
 
                     //clears the followers log, sets the snapshot index to ensure adjusted-index works
-                    context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior));
+                    context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context));
                     context.setLastApplied(snapshot.getLastAppliedIndex());
                     context.setCommitIndex(snapshot.getLastAppliedIndex());
+                    context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
+
+                    if(snapshot.getState().length > 0 ) {
+                        applySnapshotProcedure.accept(snapshot.getState());
+                    }
 
                     applySnapshot.getCallback().onSuccess();
                 } catch (Exception e) {
-                    LOG.error("Error applying snapshot", e);
+                    LOG.error("{}: Error applying snapshot", context.getId(), e);
                 }
             } else {
                 context.getReplicatedLog().snapshotCommit();
             }
 
             context.getPersistenceProvider().deleteSnapshots(new SnapshotSelectionCriteria(
-                    sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
+                    sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), Long.MAX_VALUE, 0L, 0L));
 
             context.getPersistenceProvider().deleteMessages(lastSequenceNumber);