Use SnapshotManager
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / SnapshotManager.java
index cb38e82ac3c7f5bb42cee275e3ed1f24a423ee0b..432d678491e96a5509502265014ea755bc7e8eee 100644 (file)
@@ -47,13 +47,13 @@ public class SnapshotManager implements SnapshotState {
     }
 
     @Override
-    public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
-        currentState.captureToInstall(lastLogEntry, replicatedToAllIndex);
+    public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+        return currentState.captureToInstall(lastLogEntry, replicatedToAllIndex, targetFollower);
     }
 
     @Override
-    public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
-        currentState.capture(lastLogEntry, replicatedToAllIndex);
+    public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+        return currentState.capture(lastLogEntry, replicatedToAllIndex);
     }
 
     @Override
@@ -62,8 +62,9 @@ public class SnapshotManager implements SnapshotState {
     }
 
     @Override
-    public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior) {
-        currentState.persist(persistenceProvider, snapshotBytes, currentBehavior);
+    public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
+                        RaftActorBehavior currentBehavior, long totalMemory) {
+        currentState.persist(persistenceProvider, snapshotBytes, currentBehavior, totalMemory);
     }
 
     @Override
@@ -77,8 +78,8 @@ public class SnapshotManager implements SnapshotState {
     }
 
     @Override
-    public long trimLog(long desiredTrimIndex) {
-        return currentState.trimLog(desiredTrimIndex);
+    public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
+        return currentState.trimLog(desiredTrimIndex, currentBehavior);
     }
 
     private boolean hasFollowers(){
@@ -97,13 +98,15 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+        public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
             LOG.debug("capture should not be called in state {}", this);
+            return false;
         }
 
         @Override
-        public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+        public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
             LOG.debug("captureToInstall should not be called in state {}", this);
+            return false;
         }
 
         @Override
@@ -112,7 +115,8 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior) {
+        public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
+                            RaftActorBehavior currentBehavior, long totalMemory) {
             LOG.debug("persist should not be called in state {}", this);
         }
 
@@ -127,17 +131,22 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public long trimLog(long desiredTrimIndex) {
+        public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
             LOG.debug("trimLog should not be called in state {}", this);
             return -1;
         }
 
-        protected long doTrimLog(long desiredTrimIndex){
+        protected long doTrimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior){
             //  we would want to keep the lastApplied as its used while capturing snapshots
             long lastApplied = context.getLastApplied();
             long tempMin = Math.min(desiredTrimIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
 
-            if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin))  {
+            if(LOG.isTraceEnabled()) {
+                LOG.trace("{}: performSnapshotWithoutCapture: desiredTrimIndex: {}, lastApplied: {}, tempMin: {}",
+                        persistenceId(), desiredTrimIndex, lastApplied, tempMin);
+            }
+
+            if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) {
                 LOG.debug("{}: fakeSnapshot purging log to {} for term {}", persistenceId(), tempMin,
                         context.getTermInformation().getCurrentTerm());
 
@@ -146,15 +155,21 @@ public class SnapshotManager implements SnapshotState {
                 context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
                 context.getReplicatedLog().snapshotCommit();
                 return tempMin;
+            } else if(tempMin > currentBehavior.getReplicatedToAllIndex()) {
+                // It's possible a follower was lagging and an install snapshot advanced its match index past
+                // the current replicatedToAllIndex. Since the follower is now caught up we should advance the
+                // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely
+                // due to a previous snapshot triggered by the memory threshold being exceeded; in that case we
+                // trim the log to the last applied index even if previous entries weren't replicated to all followers.
+                currentBehavior.setReplicatedToAllIndex(tempMin);
             }
-
             return -1;
         }
     }
 
     private class Idle extends AbstractSnapshotState {
 
-        private void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, boolean toInstall) {
+        private boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
             TermInformationReader lastAppliedTermInfoReader =
                     lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(),
                             lastLogEntry, hasFollowers());
@@ -171,24 +186,30 @@ public class SnapshotManager implements SnapshotState {
             // send a CaptureSnapshot to self to make the expensive operation async.
             captureSnapshot = new CaptureSnapshot(lastLogEntry.getIndex(),
                     lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm,
-                    newReplicatedToAllIndex, newReplicatedToAllTerm, toInstall);
+                    newReplicatedToAllIndex, newReplicatedToAllTerm, targetFollower!=null);
 
             SnapshotManager.this.currentState = CAPTURING;
 
-            LOG.info("{}: Initiating snapshot capture {}: {}", persistenceId(), toInstall ? "to install" : "",
-                    captureSnapshot);
+            if(targetFollower != null){ // non-null target => this capture is for install on that follower
+                LOG.info("{}: Initiating snapshot capture {} to install on {}",
+                        persistenceId(), captureSnapshot, targetFollower);
+            } else {
+                LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
+            }
 
             context.getActor().tell(captureSnapshot, context.getActor());
+
+            return true;
         }
 
         @Override
-        public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
-            capture(lastLogEntry, replicatedToAllIndex, false);
+        public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+            return capture(lastLogEntry, replicatedToAllIndex, null);
         }
 
         @Override
-        public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
-            capture(lastLogEntry, replicatedToAllIndex, true);
+        public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+            return capture(lastLogEntry, replicatedToAllIndex, targetFollower);
         }
 
         @Override
@@ -197,8 +218,8 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public long trimLog(long desiredTrimIndex) {
-            return doTrimLog(desiredTrimIndex);
+        public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
+            return doTrimLog(desiredTrimIndex, currentBehavior);
         }
     }
 
@@ -235,7 +256,7 @@ public class SnapshotManager implements SnapshotState {
 
         @Override
         public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
-                            RaftActorBehavior currentBehavior) {
+                            RaftActorBehavior currentBehavior, long totalMemory) {
             // create a snapshot object from the state provided and save it
             // when snapshot is saved async, SaveSnapshotSuccess is raised.
 
@@ -248,16 +269,28 @@ public class SnapshotManager implements SnapshotState {
 
             LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
 
-            long dataThreshold = Runtime.getRuntime().totalMemory() *
+            long dataThreshold = totalMemory *
                     context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
             if (context.getReplicatedLog().dataSize() > dataThreshold) {
+
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("{}: dataSize {} exceeds dataThreshold {} - doing snapshotPreCommit with index {}",
+                            persistenceId(), context.getReplicatedLog().dataSize(), dataThreshold,
+                            captureSnapshot.getLastAppliedIndex());
+                }
+
                 // if memory is less, clear the log based on lastApplied.
                 // this could/should only happen if one of the followers is down
                 // as normally we keep removing from the log when its replicated to all.
                 context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
                         captureSnapshot.getLastAppliedTerm());
 
-                currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+                // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an
+                // install snapshot to a follower.
+                if(captureSnapshot.getReplicatedToAllIndex() >= 0) {
+                    currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+                }
+
             } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
                 // clear the log based on replicatedToAllIndex
                 context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
@@ -350,7 +383,7 @@ public class SnapshotManager implements SnapshotState {
             } else if (entry != null) {
                 index = entry.getIndex();
                 term = entry.getTerm();
-            } else if(originalIndex == log.getSnapshotIndex()){
+            } else if(log.getSnapshotIndex() > -1){
                 index = log.getSnapshotIndex();
                 term = log.getSnapshotTerm();
             }