Merge "Add missing copyright text"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / SnapshotManager.java
index cb38e82ac3c7f5bb42cee275e3ed1f24a423ee0b..9a916625c9331413685d6263bfe053930b6795bf 100644 (file)
@@ -10,8 +10,9 @@ package org.opendaylight.controller.cluster.raft;
 
 import akka.japi.Procedure;
 import akka.persistence.SnapshotSelectionCriteria;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import java.util.List;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
@@ -19,9 +20,7 @@ import org.slf4j.Logger;
 
 public class SnapshotManager implements SnapshotState {
 
-
     private final SnapshotState IDLE = new Idle();
-    private final SnapshotState CAPTURING = new Capturing();
     private final SnapshotState PERSISTING = new Persisting();
     private final SnapshotState CREATING = new Creating();
 
@@ -35,6 +34,9 @@ public class SnapshotManager implements SnapshotState {
 
     private SnapshotState currentState = IDLE;
     private CaptureSnapshot captureSnapshot;
+    private long lastSequenceNumber = -1;
+
+    private Procedure<Void> createSnapshotProcedure;
 
     public SnapshotManager(RaftActorContext context, Logger logger) {
         this.context = context;
@@ -47,28 +49,23 @@ public class SnapshotManager implements SnapshotState {
     }
 
     @Override
-    public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
-        currentState.captureToInstall(lastLogEntry, replicatedToAllIndex);
+    public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+        return currentState.captureToInstall(lastLogEntry, replicatedToAllIndex, targetFollower);
     }
 
     @Override
-    public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
-        currentState.capture(lastLogEntry, replicatedToAllIndex);
+    public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+        return currentState.capture(lastLogEntry, replicatedToAllIndex);
     }
 
     @Override
-    public void create(Procedure<Void> callback) {
-        currentState.create(callback);
+    public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
+        currentState.persist(snapshotBytes, currentBehavior, totalMemory);
     }
 
     @Override
-    public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior) {
-        currentState.persist(persistenceProvider, snapshotBytes, currentBehavior);
-    }
-
-    @Override
-    public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
-        currentState.commit(persistenceProvider, sequenceNumber);
+    public void commit(long sequenceNumber) {
+        currentState.commit(sequenceNumber);
     }
 
     @Override
@@ -77,8 +74,17 @@ public class SnapshotManager implements SnapshotState {
     }
 
     @Override
-    public long trimLog(long desiredTrimIndex) {
-        return currentState.trimLog(desiredTrimIndex);
+    public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
+        return currentState.trimLog(desiredTrimIndex, currentBehavior);
+    }
+
+    public void setCreateSnapshotCallable(Procedure<Void> createSnapshotProcedure) {
+        this.createSnapshotProcedure = createSnapshotProcedure;
+    }
+
+    @VisibleForTesting
+    public CaptureSnapshot getCaptureSnapshot() {
+        return captureSnapshot;
     }
 
     private boolean hasFollowers(){
@@ -97,27 +103,24 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+        public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
             LOG.debug("capture should not be called in state {}", this);
+            return false;
         }
 
         @Override
-        public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+        public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
             LOG.debug("captureToInstall should not be called in state {}", this);
+            return false;
         }
 
         @Override
-        public void create(Procedure<Void> callback) {
-            LOG.debug("create should not be called in state {}", this);
-        }
-
-        @Override
-        public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior) {
+        public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
             LOG.debug("persist should not be called in state {}", this);
         }
 
         @Override
-        public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+        public void commit(long sequenceNumber) {
             LOG.debug("commit should not be called in state {}", this);
         }
 
@@ -127,17 +130,22 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public long trimLog(long desiredTrimIndex) {
+        public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
             LOG.debug("trimLog should not be called in state {}", this);
             return -1;
         }
 
-        protected long doTrimLog(long desiredTrimIndex){
+        protected long doTrimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior){
             //  we would want to keep the lastApplied as its used while capturing snapshots
             long lastApplied = context.getLastApplied();
             long tempMin = Math.min(desiredTrimIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
 
-            if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin))  {
+            if(LOG.isTraceEnabled()) {
+                LOG.trace("{}: performSnapshotWithoutCapture: desiredTrimIndex: {}, lastApplied: {}, tempMin: {}",
+                        persistenceId(), desiredTrimIndex, lastApplied, tempMin);
+            }
+
+            if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) {
                 LOG.debug("{}: fakeSnapshot purging log to {} for term {}", persistenceId(), tempMin,
                         context.getTermInformation().getCurrentTerm());
 
@@ -146,15 +154,21 @@ public class SnapshotManager implements SnapshotState {
                 context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
                 context.getReplicatedLog().snapshotCommit();
                 return tempMin;
+            } else if(tempMin > currentBehavior.getReplicatedToAllIndex()) {
+                // It's possible a follower was lagging and an install snapshot advanced its match index past
+                // the current replicatedToAllIndex. Since the follower is now caught up, we should advance the
+                // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely
+                // due to a previous snapshot triggered by exceeding the memory threshold; in that case we
+                // trim the log to the last applied index even if previous entries weren't replicated to all followers.
+                currentBehavior.setReplicatedToAllIndex(tempMin);
             }
-
             return -1;
         }
     }
 
     private class Idle extends AbstractSnapshotState {
 
-        private void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, boolean toInstall) {
+        private boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
             TermInformationReader lastAppliedTermInfoReader =
                     lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(),
                             lastLogEntry, hasFollowers());
@@ -169,61 +183,56 @@ public class SnapshotManager implements SnapshotState {
             long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm();
 
             // send a CaptureSnapshot to self to make the expensive operation async.
+
+            List<ReplicatedLogEntry> unAppliedEntries = context.getReplicatedLog().getFrom(lastAppliedIndex + 1);
+
             captureSnapshot = new CaptureSnapshot(lastLogEntry.getIndex(),
                     lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm,
-                    newReplicatedToAllIndex, newReplicatedToAllTerm, toInstall);
+                    newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, targetFollower != null);
 
-            SnapshotManager.this.currentState = CAPTURING;
+            if(captureSnapshot.isInstallSnapshotInitiated()) {
+                LOG.info("{}: Initiating snapshot capture {} to install on {}",
+                        persistenceId(), captureSnapshot, targetFollower);
+            } else {
+                LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
+            }
 
-            LOG.info("{}: Initiating snapshot capture {}: {}", persistenceId(), toInstall ? "to install" : "",
-                    captureSnapshot);
+            lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber();
 
-            context.getActor().tell(captureSnapshot, context.getActor());
-        }
+            LOG.debug("lastSequenceNumber prior to capture: {}", lastSequenceNumber);
 
-        @Override
-        public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
-            capture(lastLogEntry, replicatedToAllIndex, false);
-        }
+            SnapshotManager.this.currentState = CREATING;
 
-        @Override
-        public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
-            capture(lastLogEntry, replicatedToAllIndex, true);
-        }
+            try {
+                createSnapshotProcedure.apply(null);
+            } catch (Exception e) {
+                SnapshotManager.this.currentState = IDLE;
+                LOG.error("Error creating snapshot", e);
+                return false;
+            }
 
-        @Override
-        public String toString() {
-            return "Idle";
+            return true;
         }
 
         @Override
-        public long trimLog(long desiredTrimIndex) {
-            return doTrimLog(desiredTrimIndex);
+        public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+            return capture(lastLogEntry, replicatedToAllIndex, null);
         }
-    }
-
-    private class Capturing extends AbstractSnapshotState {
 
         @Override
-        public boolean isCapturing() {
-            return true;
+        public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+            return capture(lastLogEntry, replicatedToAllIndex, targetFollower);
         }
 
         @Override
-        public void create(Procedure<Void> callback) {
-            try {
-                callback.apply(null);
-                SnapshotManager.this.currentState = CREATING;
-            } catch (Exception e) {
-                LOG.error("Unexpected error occurred", e);
-            }
+        public String toString() {
+            return "Idle";
         }
 
         @Override
-        public String toString() {
-            return "Capturing";
+        public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
+            return doTrimLog(desiredTrimIndex, currentBehavior);
         }
-
     }
 
     private class Creating extends AbstractSnapshotState {
@@ -234,30 +243,41 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
-                            RaftActorBehavior currentBehavior) {
+        public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
             // create a snapshot object from the state provided and save it
             // when snapshot is saved async, SaveSnapshotSuccess is raised.
 
             Snapshot sn = Snapshot.create(snapshotBytes,
-                    context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
+                    captureSnapshot.getUnAppliedEntries(),
                     captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
                     captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
 
-            persistenceProvider.saveSnapshot(sn);
+            context.getPersistenceProvider().saveSnapshot(sn);
 
             LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
 
-            long dataThreshold = Runtime.getRuntime().totalMemory() *
+            long dataThreshold = totalMemory *
                     context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
             if (context.getReplicatedLog().dataSize() > dataThreshold) {
+
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("{}: dataSize {} exceeds dataThreshold {} - doing snapshotPreCommit with index {}",
+                            persistenceId(), context.getReplicatedLog().dataSize(), dataThreshold,
+                            captureSnapshot.getLastAppliedIndex());
+                }
+
                 // if memory is less, clear the log based on lastApplied.
                 // this could/should only happen if one of the followers is down
                 // as normally we keep removing from the log when its replicated to all.
                 context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
                         captureSnapshot.getLastAppliedTerm());
 
-                currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+                // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an
+                // install snapshot to a follower.
+                if(captureSnapshot.getReplicatedToAllIndex() >= 0) {
+                    currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+                }
+
             } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
                 // clear the log based on replicatedToAllIndex
                 context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
@@ -298,13 +318,14 @@ public class SnapshotManager implements SnapshotState {
     private class Persisting extends AbstractSnapshotState {
 
         @Override
-        public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+        public void commit(long sequenceNumber) {
             context.getReplicatedLog().snapshotCommit();
-            persistenceProvider.deleteSnapshots(new SnapshotSelectionCriteria(
+            context.getPersistenceProvider().deleteSnapshots(new SnapshotSelectionCriteria(
                     sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
 
-            persistenceProvider.deleteMessages(sequenceNumber);
+            context.getPersistenceProvider().deleteMessages(lastSequenceNumber);
 
+            lastSequenceNumber = -1;
             SnapshotManager.this.currentState = IDLE;
         }
 
@@ -333,7 +354,7 @@ public class SnapshotManager implements SnapshotState {
         long getTerm();
     }
 
-    private static class LastAppliedTermInformationReader implements TermInformationReader{
+    static class LastAppliedTermInformationReader implements TermInformationReader{
         private long index;
         private long term;
 
@@ -350,7 +371,7 @@ public class SnapshotManager implements SnapshotState {
             } else if (entry != null) {
                 index = entry.getIndex();
                 term = entry.getTerm();
-            } else if(originalIndex == log.getSnapshotIndex()){
+            } else if(log.getSnapshotIndex() > -1){
                 index = log.getSnapshotIndex();
                 term = log.getSnapshotTerm();
             }