BUG-5626: use lambdas, Runnable and Consumer instead of Procedure
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / SnapshotManager.java
index 8e0d2f820b722a1b91742e67b5a438467af35ec3..b195686d2092ff4f54353a6d18295c62ebfe9bec 100644 (file)
@@ -8,13 +8,14 @@
 
 package org.opendaylight.controller.cluster.raft;
 
-import akka.japi.Procedure;
 import akka.persistence.SnapshotSelectionCriteria;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.List;
+import java.util.function.Consumer;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.slf4j.Logger;
 
@@ -36,10 +37,10 @@ public class SnapshotManager implements SnapshotState {
     private CaptureSnapshot captureSnapshot;
     private long lastSequenceNumber = -1;
 
-    private Procedure<Void> createSnapshotProcedure;
+    private Runnable createSnapshotProcedure;
 
     private ApplySnapshot applySnapshot;
-    private Procedure<byte[]> applySnapshotProcedure;
+    private Consumer<byte[]> applySnapshotProcedure;
 
     public SnapshotManager(RaftActorContext context, Logger logger) {
         this.context = context;
@@ -71,13 +72,13 @@ public class SnapshotManager implements SnapshotState {
     }
 
     @Override
-    public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
-        currentState.persist(snapshotBytes, currentBehavior, totalMemory);
+    public void persist(final byte[] snapshotBytes, final long totalMemory) {
+        currentState.persist(snapshotBytes, totalMemory);
     }
 
     @Override
-    public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) {
-        currentState.commit(sequenceNumber, currentBehavior);
+    public void commit(final long sequenceNumber) {
+        currentState.commit(sequenceNumber);
     }
 
     @Override
@@ -86,15 +87,15 @@ public class SnapshotManager implements SnapshotState {
     }
 
     @Override
-    public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
-        return currentState.trimLog(desiredTrimIndex, currentBehavior);
+    public long trimLog(final long desiredTrimIndex) {
+        return currentState.trimLog(desiredTrimIndex);
     }
 
-    public void setCreateSnapshotCallable(Procedure<Void> createSnapshotProcedure) {
+    public void setCreateSnapshotRunnable(Runnable createSnapshotProcedure) {
         this.createSnapshotProcedure = createSnapshotProcedure;
     }
 
-    public void setApplySnapshotProcedure(Procedure<byte[]> applySnapshotProcedure) {
+    public void setApplySnapshotConsumer(Consumer<byte[]> applySnapshotProcedure) {
         this.applySnapshotProcedure = applySnapshotProcedure;
     }
 
@@ -108,13 +109,44 @@ public class SnapshotManager implements SnapshotState {
     }
 
     private boolean hasFollowers(){
-        return context.getPeerAddresses().keySet().size() > 0;
+        return context.hasFollowers();
     }
 
     private String persistenceId(){
         return context.getId();
     }
 
+    public CaptureSnapshot newCaptureSnapshot(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex,
+            boolean installSnapshotInitiated) {
+        TermInformationReader lastAppliedTermInfoReader =
+                lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(),
+                        lastLogEntry, hasFollowers());
+
+        long lastAppliedIndex = lastAppliedTermInfoReader.getIndex();
+        long lastAppliedTerm = lastAppliedTermInfoReader.getTerm();
+
+        TermInformationReader replicatedToAllTermInfoReader =
+                replicatedToAllTermInformationReader.init(context.getReplicatedLog(), replicatedToAllIndex);
+
+        long newReplicatedToAllIndex = replicatedToAllTermInfoReader.getIndex();
+        long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm();
+
+        List<ReplicatedLogEntry> unAppliedEntries = context.getReplicatedLog().getFrom(lastAppliedIndex + 1);
+
+        long lastLogEntryIndex = lastAppliedIndex;
+        long lastLogEntryTerm = lastAppliedTerm;
+        if(lastLogEntry != null) {
+            lastLogEntryIndex = lastLogEntry.getIndex();
+            lastLogEntryTerm = lastLogEntry.getTerm();
+        } else {
+            LOG.debug("{}: Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and lastAppliedTerm {} instead.",
+                    persistenceId(), lastAppliedIndex, lastAppliedTerm);
+        }
+
+        return new CaptureSnapshot(lastLogEntryIndex, lastLogEntryTerm, lastAppliedIndex, lastAppliedTerm,
+                newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, installSnapshotInitiated);
+    }
+
     private class AbstractSnapshotState implements SnapshotState {
 
         @Override
@@ -140,12 +172,12 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
+        public void persist(final byte[] snapshotBytes, final long totalMemory) {
             LOG.debug("persist should not be called in state {}", this);
         }
 
         @Override
-        public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) {
+        public void commit(final long sequenceNumber) {
             LOG.debug("commit should not be called in state {}", this);
         }
 
@@ -155,12 +187,12 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
+        public long trimLog(final long desiredTrimIndex) {
             LOG.debug("trimLog should not be called in state {}", this);
             return -1;
         }
 
-        protected long doTrimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior){
+        protected long doTrimLog(final long desiredTrimIndex) {
             //  we would want to keep the lastApplied as its used while capturing snapshots
             long lastApplied = context.getLastApplied();
             long tempMin = Math.min(desiredTrimIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
@@ -179,7 +211,10 @@ public class SnapshotManager implements SnapshotState {
                 context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
                 context.getReplicatedLog().snapshotCommit();
                 return tempMin;
-            } else if(tempMin > currentBehavior.getReplicatedToAllIndex()) {
+            }
+
+            final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
+            if(tempMin > currentBehavior.getReplicatedToAllIndex()) {
                 // It's possible a follower was lagging and an install snapshot advanced its match index past
                 // the current replicatedToAllIndex. Since the follower is now caught up we should advance the
                 // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely
@@ -199,36 +234,7 @@ public class SnapshotManager implements SnapshotState {
         }
 
         private boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
-            TermInformationReader lastAppliedTermInfoReader =
-                    lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(),
-                            lastLogEntry, hasFollowers());
-
-            long lastAppliedIndex = lastAppliedTermInfoReader.getIndex();
-            long lastAppliedTerm = lastAppliedTermInfoReader.getTerm();
-
-            TermInformationReader replicatedToAllTermInfoReader =
-                    replicatedToAllTermInformationReader.init(context.getReplicatedLog(), replicatedToAllIndex);
-
-            long newReplicatedToAllIndex = replicatedToAllTermInfoReader.getIndex();
-            long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm();
-
-            // send a CaptureSnapshot to self to make the expensive operation async.
-
-            List<ReplicatedLogEntry> unAppliedEntries = context.getReplicatedLog().getFrom(lastAppliedIndex + 1);
-
-            long lastLogEntryIndex = lastAppliedIndex;
-            long lastLogEntryTerm = lastAppliedTerm;
-            if(lastLogEntry != null) {
-                lastLogEntryIndex = lastLogEntry.getIndex();
-                lastLogEntryTerm = lastLogEntry.getTerm();
-            } else {
-                LOG.warn("Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and lastAppliedTerm {} instead.",
-                    lastAppliedIndex, lastAppliedTerm);
-            }
-
-            captureSnapshot = new CaptureSnapshot(lastLogEntryIndex,
-                lastLogEntryTerm, lastAppliedIndex, lastAppliedTerm,
-                    newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, targetFollower != null);
+            captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex, targetFollower != null);
 
             if(captureSnapshot.isInstallSnapshotInitiated()) {
                 LOG.info("{}: Initiating snapshot capture {} to install on {}",
@@ -239,12 +245,12 @@ public class SnapshotManager implements SnapshotState {
 
             lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber();
 
-            LOG.debug("lastSequenceNumber prior to capture: {}", lastSequenceNumber);
+            LOG.debug("{}: lastSequenceNumber prior to capture: {}", persistenceId(), lastSequenceNumber);
 
             SnapshotManager.this.currentState = CREATING;
 
             try {
-                createSnapshotProcedure.apply(null);
+                createSnapshotProcedure.run();
             } catch (Exception e) {
                 SnapshotManager.this.currentState = IDLE;
                 LOG.error("Error creating snapshot", e);
@@ -283,15 +289,15 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
-            return doTrimLog(desiredTrimIndex, currentBehavior);
+        public long trimLog(final long desiredTrimIndex) {
+            return doTrimLog(desiredTrimIndex);
         }
     }
 
     private class Creating extends AbstractSnapshotState {
 
         @Override
-        public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
+        public void persist(final byte[] snapshotBytes, final long totalMemory) {
             // create a snapshot object from the state provided and save it
             // when snapshot is saved async, SaveSnapshotSuccess is raised.
 
@@ -300,7 +306,7 @@ public class SnapshotManager implements SnapshotState {
                     captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
                     captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm(),
                     context.getTermInformation().getCurrentTerm(),
-                    context.getTermInformation().getVotedFor());
+                    context.getTermInformation().getVotedFor(), context.getPeerServerInfo(true));
 
             context.getPersistenceProvider().saveSnapshot(snapshot);
 
@@ -313,6 +319,7 @@ public class SnapshotManager implements SnapshotState {
             boolean logSizeExceededSnapshotBatchCount =
                     context.getReplicatedLog().size() >= context.getConfigParams().getSnapshotBatchCount();
 
+            final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
             if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount) {
                 if(LOG.isDebugEnabled()) {
                     if(dataSizeThresholdExceeded) {
@@ -378,35 +385,37 @@ public class SnapshotManager implements SnapshotState {
     private class Persisting extends AbstractSnapshotState {
 
         @Override
-        public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) {
-            LOG.debug("Snapshot success sequence number: {}", sequenceNumber);
+        public void commit(final long sequenceNumber) {
+            LOG.debug("{}: Snapshot success -  sequence number: {}", persistenceId(), sequenceNumber);
 
             if(applySnapshot != null) {
                 try {
                     Snapshot snapshot = applySnapshot.getSnapshot();
-                    applySnapshotProcedure.apply(snapshot.getState());
 
                     //clears the followers log, sets the snapshot index to ensure adjusted-index works
-                    context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior));
+                    context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context));
                     context.setLastApplied(snapshot.getLastAppliedIndex());
                     context.setCommitIndex(snapshot.getLastAppliedIndex());
+                    context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
+
+                    if(snapshot.getState().length > 0 ) {
+                        applySnapshotProcedure.accept(snapshot.getState());
+                    }
 
                     applySnapshot.getCallback().onSuccess();
                 } catch (Exception e) {
-                    LOG.error("Error applying snapshot", e);
+                    LOG.error("{}: Error applying snapshot", context.getId(), e);
                 }
             } else {
                 context.getReplicatedLog().snapshotCommit();
             }
 
             context.getPersistenceProvider().deleteSnapshots(new SnapshotSelectionCriteria(
-                    sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
+                    sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), Long.MAX_VALUE, 0L, 0L));
 
             context.getPersistenceProvider().deleteMessages(lastSequenceNumber);
 
-            lastSequenceNumber = -1;
-            applySnapshot = null;
-            SnapshotManager.this.currentState = IDLE;
+            snapshotComplete();
         }
 
         @Override
@@ -424,9 +433,15 @@ public class SnapshotManager implements SnapshotState {
                 applySnapshot.getCallback().onFailure();
             }
 
+            snapshotComplete();
+        }
+
+        private void snapshotComplete() {
             lastSequenceNumber = -1;
             applySnapshot = null;
             SnapshotManager.this.currentState = IDLE;
+
+            context.getActor().tell(SnapshotComplete.INSTANCE, context.getActor());
         }
 
         @Override