BUG-5626: use lambdas, Runnable and Consumer instead of Procedure
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / SnapshotManager.java
index 970fb8fc717c2b6d918c2e815fb9ef2b323a39fb..b195686d2092ff4f54353a6d18295c62ebfe9bec 100644 (file)
@@ -8,12 +8,14 @@
 
 package org.opendaylight.controller.cluster.raft;
 
-import akka.japi.Procedure;
 import akka.persistence.SnapshotSelectionCriteria;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.List;
+import java.util.function.Consumer;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.slf4j.Logger;
 
@@ -35,16 +37,20 @@ public class SnapshotManager implements SnapshotState {
     private CaptureSnapshot captureSnapshot;
     private long lastSequenceNumber = -1;
 
-    private Procedure<Void> createSnapshotProcedure;
+    private Runnable createSnapshotProcedure;
 
-    private Snapshot applySnapshot;
-    private Procedure<byte[]> applySnapshotProcedure;
+    private ApplySnapshot applySnapshot;
+    private Consumer<byte[]> applySnapshotProcedure;
 
     public SnapshotManager(RaftActorContext context, Logger logger) {
         this.context = context;
         this.LOG = logger;
     }
 
+    public boolean isApplying() {
+        return applySnapshot != null;
+    }
+
     @Override
     public boolean isCapturing() {
         return currentState.isCapturing();
@@ -61,18 +67,18 @@ public class SnapshotManager implements SnapshotState {
     }
 
     @Override
-    public void apply(Snapshot snapshot) {
+    public void apply(ApplySnapshot snapshot) {
         currentState.apply(snapshot);
     }
 
     @Override
-    public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
-        currentState.persist(snapshotBytes, currentBehavior, totalMemory);
+    public void persist(final byte[] snapshotBytes, final long totalMemory) {
+        currentState.persist(snapshotBytes, totalMemory);
     }
 
     @Override
-    public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) {
-        currentState.commit(sequenceNumber, currentBehavior);
+    public void commit(final long sequenceNumber) {
+        currentState.commit(sequenceNumber);
     }
 
     @Override
@@ -81,15 +87,15 @@ public class SnapshotManager implements SnapshotState {
     }
 
     @Override
-    public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
-        return currentState.trimLog(desiredTrimIndex, currentBehavior);
+    public long trimLog(final long desiredTrimIndex) {
+        return currentState.trimLog(desiredTrimIndex);
     }
 
-    public void setCreateSnapshotCallable(Procedure<Void> createSnapshotProcedure) {
+    public void setCreateSnapshotRunnable(Runnable createSnapshotProcedure) {
         this.createSnapshotProcedure = createSnapshotProcedure;
     }
 
-    public void setApplySnapshotProcedure(Procedure<byte[]> applySnapshotProcedure) {
+    public void setApplySnapshotConsumer(Consumer<byte[]> applySnapshotProcedure) {
         this.applySnapshotProcedure = applySnapshotProcedure;
     }
 
@@ -103,13 +109,44 @@ public class SnapshotManager implements SnapshotState {
     }
 
     private boolean hasFollowers(){
-        return context.getPeerAddresses().keySet().size() > 0;
+        return context.hasFollowers();
     }
 
     private String persistenceId(){
         return context.getId();
     }
 
+    public CaptureSnapshot newCaptureSnapshot(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex,
+            boolean installSnapshotInitiated) {
+        TermInformationReader lastAppliedTermInfoReader =
+                lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(),
+                        lastLogEntry, hasFollowers());
+
+        long lastAppliedIndex = lastAppliedTermInfoReader.getIndex();
+        long lastAppliedTerm = lastAppliedTermInfoReader.getTerm();
+
+        TermInformationReader replicatedToAllTermInfoReader =
+                replicatedToAllTermInformationReader.init(context.getReplicatedLog(), replicatedToAllIndex);
+
+        long newReplicatedToAllIndex = replicatedToAllTermInfoReader.getIndex();
+        long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm();
+
+        List<ReplicatedLogEntry> unAppliedEntries = context.getReplicatedLog().getFrom(lastAppliedIndex + 1);
+
+        long lastLogEntryIndex = lastAppliedIndex;
+        long lastLogEntryTerm = lastAppliedTerm;
+        if(lastLogEntry != null) {
+            lastLogEntryIndex = lastLogEntry.getIndex();
+            lastLogEntryTerm = lastLogEntry.getTerm();
+        } else {
+            LOG.debug("{}: Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and lastAppliedTerm {} instead.",
+                    persistenceId(), lastAppliedIndex, lastAppliedTerm);
+        }
+
+        return new CaptureSnapshot(lastLogEntryIndex, lastLogEntryTerm, lastAppliedIndex, lastAppliedTerm,
+                newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, installSnapshotInitiated);
+    }
+
     private class AbstractSnapshotState implements SnapshotState {
 
         @Override
@@ -130,17 +167,17 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public void apply(Snapshot snapshot) {
+        public void apply(ApplySnapshot snapshot) {
             LOG.debug("apply should not be called in state {}", this);
         }
 
         @Override
-        public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
+        public void persist(final byte[] snapshotBytes, final long totalMemory) {
             LOG.debug("persist should not be called in state {}", this);
         }
 
         @Override
-        public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) {
+        public void commit(final long sequenceNumber) {
             LOG.debug("commit should not be called in state {}", this);
         }
 
@@ -150,12 +187,12 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
+        public long trimLog(final long desiredTrimIndex) {
             LOG.debug("trimLog should not be called in state {}", this);
             return -1;
         }
 
-        protected long doTrimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior){
+        protected long doTrimLog(final long desiredTrimIndex) {
             //  we would want to keep the lastApplied as its used while capturing snapshots
             long lastApplied = context.getLastApplied();
             long tempMin = Math.min(desiredTrimIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
@@ -174,7 +211,10 @@ public class SnapshotManager implements SnapshotState {
                 context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
                 context.getReplicatedLog().snapshotCommit();
                 return tempMin;
-            } else if(tempMin > currentBehavior.getReplicatedToAllIndex()) {
+            }
+
+            final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
+            if(tempMin > currentBehavior.getReplicatedToAllIndex()) {
                 // It's possible a follower was lagging and an install snapshot advanced its match index past
                 // the current replicatedToAllIndex. Since the follower is now caught up we should advance the
                 // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely
@@ -194,26 +234,7 @@ public class SnapshotManager implements SnapshotState {
         }
 
         private boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
-            TermInformationReader lastAppliedTermInfoReader =
-                    lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(),
-                            lastLogEntry, hasFollowers());
-
-            long lastAppliedIndex = lastAppliedTermInfoReader.getIndex();
-            long lastAppliedTerm = lastAppliedTermInfoReader.getTerm();
-
-            TermInformationReader replicatedToAllTermInfoReader =
-                    replicatedToAllTermInformationReader.init(context.getReplicatedLog(), replicatedToAllIndex);
-
-            long newReplicatedToAllIndex = replicatedToAllTermInfoReader.getIndex();
-            long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm();
-
-            // send a CaptureSnapshot to self to make the expensive operation async.
-
-            List<ReplicatedLogEntry> unAppliedEntries = context.getReplicatedLog().getFrom(lastAppliedIndex + 1);
-
-            captureSnapshot = new CaptureSnapshot(lastLogEntry.getIndex(),
-                    lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm,
-                    newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, targetFollower != null);
+            captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex, targetFollower != null);
 
             if(captureSnapshot.isInstallSnapshotInitiated()) {
                 LOG.info("{}: Initiating snapshot capture {} to install on {}",
@@ -224,12 +245,12 @@ public class SnapshotManager implements SnapshotState {
 
             lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber();
 
-            LOG.debug("lastSequenceNumber prior to capture: {}", lastSequenceNumber);
+            LOG.debug("{}: lastSequenceNumber prior to capture: {}", persistenceId(), lastSequenceNumber);
 
             SnapshotManager.this.currentState = CREATING;
 
             try {
-                createSnapshotProcedure.apply(null);
+                createSnapshotProcedure.run();
             } catch (Exception e) {
                 SnapshotManager.this.currentState = IDLE;
                 LOG.error("Error creating snapshot", e);
@@ -250,14 +271,14 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public void apply(Snapshot snapshot) {
-            applySnapshot = snapshot;
+        public void apply(ApplySnapshot applySnapshot) {
+            SnapshotManager.this.applySnapshot = applySnapshot;
 
             lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber();
 
             LOG.debug("lastSequenceNumber prior to persisting applied snapshot: {}", lastSequenceNumber);
 
-            context.getPersistenceProvider().saveSnapshot(snapshot);
+            context.getPersistenceProvider().saveSnapshot(applySnapshot.getSnapshot());
 
             SnapshotManager.this.currentState = PERSISTING;
         }
@@ -268,26 +289,28 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
-            return doTrimLog(desiredTrimIndex, currentBehavior);
+        public long trimLog(final long desiredTrimIndex) {
+            return doTrimLog(desiredTrimIndex);
         }
     }
 
     private class Creating extends AbstractSnapshotState {
 
         @Override
-        public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
+        public void persist(final byte[] snapshotBytes, final long totalMemory) {
             // create a snapshot object from the state provided and save it
             // when snapshot is saved async, SaveSnapshotSuccess is raised.
 
             Snapshot snapshot = Snapshot.create(snapshotBytes,
                     captureSnapshot.getUnAppliedEntries(),
                     captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
-                    captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
+                    captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm(),
+                    context.getTermInformation().getCurrentTerm(),
+                    context.getTermInformation().getVotedFor(), context.getPeerServerInfo(true));
 
             context.getPersistenceProvider().saveSnapshot(snapshot);
 
-            LOG.info("{}: Persisting of snapshot done: {}", persistenceId(), snapshot.getLogMessage());
+            LOG.info("{}: Persisting of snapshot done: {}", persistenceId(), snapshot);
 
             long dataThreshold = totalMemory *
                     context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
@@ -295,7 +318,8 @@ public class SnapshotManager implements SnapshotState {
 
             boolean logSizeExceededSnapshotBatchCount =
                     context.getReplicatedLog().size() >= context.getConfigParams().getSnapshotBatchCount();
-LOG.debug("Log size: {}, getSnapshotBatchCount: {}",context.getReplicatedLog().size(),context.getConfigParams().getSnapshotBatchCount());
+
+            final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
             if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount) {
                 if(LOG.isDebugEnabled()) {
                     if(dataSizeThresholdExceeded) {
@@ -361,32 +385,37 @@ LOG.debug("Log size: {}, getSnapshotBatchCount: {}",context.getReplicatedLog().s
     private class Persisting extends AbstractSnapshotState {
 
         @Override
-        public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) {
-            LOG.debug("Snapshot success sequence number:", sequenceNumber);
+        public void commit(final long sequenceNumber) {
+            LOG.debug("{}: Snapshot success -  sequence number: {}", persistenceId(), sequenceNumber);
 
             if(applySnapshot != null) {
                 try {
-                    applySnapshotProcedure.apply(applySnapshot.getState());
+                    Snapshot snapshot = applySnapshot.getSnapshot();
 
                     //clears the followers log, sets the snapshot index to ensure adjusted-index works
-                    context.setReplicatedLog(ReplicatedLogImpl.newInstance(applySnapshot, context, currentBehavior));
-                    context.setLastApplied(applySnapshot.getLastAppliedIndex());
-                    context.setCommitIndex(applySnapshot.getLastAppliedIndex());
+                    context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context));
+                    context.setLastApplied(snapshot.getLastAppliedIndex());
+                    context.setCommitIndex(snapshot.getLastAppliedIndex());
+                    context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
+
+                    if(snapshot.getState().length > 0 ) {
+                        applySnapshotProcedure.accept(snapshot.getState());
+                    }
+
+                    applySnapshot.getCallback().onSuccess();
                 } catch (Exception e) {
-                    LOG.error("Error applying snapshot", e);
+                    LOG.error("{}: Error applying snapshot", context.getId(), e);
                 }
             } else {
                 context.getReplicatedLog().snapshotCommit();
             }
 
             context.getPersistenceProvider().deleteSnapshots(new SnapshotSelectionCriteria(
-                    sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
+                    sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), Long.MAX_VALUE, 0L, 0L));
 
             context.getPersistenceProvider().deleteMessages(lastSequenceNumber);
 
-            lastSequenceNumber = -1;
-            applySnapshot = null;
-            SnapshotManager.this.currentState = IDLE;
+            snapshotComplete();
         }
 
         @Override
@@ -400,11 +429,19 @@ LOG.debug("Log size: {}, getSnapshotBatchCount: {}",context.getReplicatedLog().s
                         context.getReplicatedLog().getSnapshotIndex(),
                         context.getReplicatedLog().getSnapshotTerm(),
                         context.getReplicatedLog().size());
+            } else {
+                applySnapshot.getCallback().onFailure();
             }
 
+            snapshotComplete();
+        }
+
+        private void snapshotComplete() {
             lastSequenceNumber = -1;
             applySnapshot = null;
             SnapshotManager.this.currentState = IDLE;
+
+            context.getActor().tell(SnapshotComplete.INSTANCE, context.getActor());
         }
 
         @Override