Use SnapshotManager 11/15611/8
authorMoiz Raja <moraja@cisco.com>
Tue, 3 Mar 2015 18:11:41 +0000 (10:11 -0800)
committerMoiz Raja <moraja@cisco.com>
Wed, 25 Mar 2015 18:35:04 +0000 (11:35 -0700)
Change-Id: I0d5d5aee12afb2b1f89fb7e4113c4fa5cc334dd3
Signed-off-by: Moiz Raja <moraja@cisco.com>
13 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index aa72485..1980411 100644 (file)
@@ -22,7 +22,6 @@ import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.List;
@@ -40,7 +39,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
-import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
@@ -104,6 +102,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 public void apply(ApplyJournalEntries param) throws Exception {
                 }
             };
+    private static final String COMMIT_SNAPSHOT = "commit_snapshot";
 
     protected final Logger LOG = LoggerFactory.getLogger(getClass());
 
@@ -119,13 +118,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      */
     private final RaftActorContextImpl context;
 
+    private final Procedure<Void> createSnapshotProcedure = new CreateSnapshotProcedure();
+
     /**
      * The in-memory journal
      */
     private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
 
-    private CaptureSnapshot captureSnapshot = null;
-
     private Stopwatch recoveryTimer;
 
     private int currentRecoveryBatchCount;
@@ -379,26 +378,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:",
                     persistenceId(), saveSnapshotFailure.cause());
 
-            context.getReplicatedLog().snapshotRollback();
-
-            LOG.info("{}: Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
-                "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
-                context.getReplicatedLog().getSnapshotIndex(),
-                context.getReplicatedLog().getSnapshotTerm(),
-                context.getReplicatedLog().size());
+            context.getSnapshotManager().rollback();
 
         } else if (message instanceof CaptureSnapshot) {
             LOG.debug("{}: CaptureSnapshot received by actor: {}", persistenceId(), message);
 
-            if(captureSnapshot == null) {
-                captureSnapshot = (CaptureSnapshot)message;
-                createSnapshot();
-            }
+            context.getSnapshotManager().create(createSnapshotProcedure);
 
-        } else if (message instanceof CaptureSnapshotReply){
+        } else if (message instanceof CaptureSnapshotReply) {
             handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
         } else if(message instanceof GetOnDemandRaftState) {
             onGetOnDemandRaftStats();
+        } else if (message.equals(COMMIT_SNAPSHOT)) {
+            commitSnapshot(-1);
         } else {
             reusableBehaviorStateHolder.init(currentBehavior);
 
@@ -416,7 +408,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 .currentTerm(context.getTermInformation().getCurrentTerm())
                 .inMemoryJournalDataSize(replicatedLog.dataSize())
                 .inMemoryJournalLogSize(replicatedLog.size())
-                .isSnapshotCaptureInitiated(context.isSnapshotCaptureInitiated())
+                .isSnapshotCaptureInitiated(context.getSnapshotManager().isCapturing())
                 .lastApplied(context.getLastApplied())
                 .lastIndex(replicatedLog.lastIndex())
                 .lastTerm(replicatedLog.lastTerm())
@@ -515,15 +507,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                             // the state to durable storage
                             self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
 
-                            // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
-                            if(!context.isSnapshotCaptureInitiated()){
-                                raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(),
-                                        raftContext.getTermInformation().getCurrentTerm());
-                                raftContext.getReplicatedLog().snapshotCommit();
-                            } else {
-                                LOG.debug("{}: Skipping fake snapshotting for {} because real snapshotting is in progress",
-                                        persistenceId(), getId());
-                            }
+                            context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
+
                         } else if (clientActor != null) {
                             // Send message for replication
                             currentBehavior.handleMessage(getSelf(),
@@ -621,10 +606,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     protected void commitSnapshot(long sequenceNumber) {
-        context.getReplicatedLog().snapshotCommit();
-
-        // TODO: Not sure if we want to be this aggressive with trimming stuff
-        trimPersistentData(sequenceNumber);
+        context.getSnapshotManager().commit(persistence(), sequenceNumber);
     }
 
     /**
@@ -747,67 +729,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
         LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
 
-        // create a snapshot object from the state provided and save it
-        // when snapshot is saved async, SaveSnapshotSuccess is raised.
-
-        Snapshot sn = Snapshot.create(snapshotBytes,
-            context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
-            captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
-            captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
-
-        persistence().saveSnapshot(sn);
-
-        LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
-
-        long dataThreshold = getTotalMemory() *
-                getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
-        if (context.getReplicatedLog().dataSize() > dataThreshold) {
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: dataSize {} exceeds dataThreshold {} - doing snapshotPreCommit with index {}",
-                        persistenceId(), context.getReplicatedLog().dataSize(), dataThreshold,
-                        captureSnapshot.getLastAppliedIndex());
-            }
-
-            // if memory is less, clear the log based on lastApplied.
-            // this could/should only happen if one of the followers is down
-            // as normally we keep removing from the log when its replicated to all.
-            context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
-                    captureSnapshot.getLastAppliedTerm());
-
-            // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an
-            // install snapshot to a follower.
-            if(captureSnapshot.getReplicatedToAllIndex() >= 0) {
-                getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
-            }
-        } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
-            // clear the log based on replicatedToAllIndex
-            context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
-                    captureSnapshot.getReplicatedToAllTerm());
-
-            getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
-        } else {
-            // The replicatedToAllIndex was not found in the log
-            // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
-            // In this scenario we may need to save the snapshot to the akka persistence
-            // snapshot for recovery but we do not need to do the replicated log trimming.
-            context.getReplicatedLog().snapshotPreCommit(replicatedLog.getSnapshotIndex(),
-                    replicatedLog.getSnapshotTerm());
-        }
-
-
-        LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex: {} " +
-            "and term: {}", persistenceId(), replicatedLog.getSnapshotIndex(),
-            replicatedLog.getSnapshotTerm());
-
-        if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
-            // this would be call straight to the leader and won't initiate in serialization
-            currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(
-                    ByteString.copyFrom(snapshotBytes)));
-        }
-
-        captureSnapshot = null;
-        context.setSnapshotCaptureInitiated(false);
+        context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, getTotalMemory());
     }
 
     protected long getTotalMemory() {
@@ -819,9 +741,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
-
         private static final int DATA_SIZE_DIVIDER = 5;
-        private long dataSizeSinceLastSnapshot = 0;
+        private long dataSizeSinceLastSnapshot = 0L;
+
 
         public ReplicatedLogImpl(Snapshot snapshot) {
             super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
@@ -887,9 +809,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                         long dataSizeForCheck = dataSize;
 
                         dataSizeSinceLastSnapshot += logEntrySize;
-                        long journalSize = lastIndex() + 1;
 
-                        if(!hasFollowers()) {
+                        if (!hasFollowers()) {
                             // When we do not have followers we do not maintain an in-memory log
                             // due to this the journalSize will never become anything close to the
                             // snapshot batch count. In fact will mostly be 1.
@@ -903,51 +824,22 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                             // as if we were maintaining a real snapshot
                             dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
                         }
-
+                        long journalSize = replicatedLogEntry.getIndex() + 1;
                         long dataThreshold = getTotalMemory() *
-                                getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
-
-                        // when a snaphsot is being taken, captureSnapshot != null
-                        if (!context.isSnapshotCaptureInitiated() &&
-                                ( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 ||
-                                        dataSizeForCheck > dataThreshold)) {
-
-                            dataSizeSinceLastSnapshot = 0;
+                                context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
 
-                            LOG.info("{}: Initiating Snapshot Capture, journalSize = {}, dataSizeForCheck = {}," +
-                                " dataThreshold = {}", persistenceId(), journalSize, dataSizeForCheck, dataThreshold);
+                        if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0
+                                || dataSizeForCheck > dataThreshold)) {
 
-                            long lastAppliedIndex = -1;
-                            long lastAppliedTerm = -1;
+                            boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
+                                    currentBehavior.getReplicatedToAllIndex());
 
-                            ReplicatedLogEntry lastAppliedEntry = get(context.getLastApplied());
-                            if (!hasFollowers()) {
-                                lastAppliedIndex = replicatedLogEntry.getIndex();
-                                lastAppliedTerm = replicatedLogEntry.getTerm();
-                            } else if (lastAppliedEntry != null) {
-                                lastAppliedIndex = lastAppliedEntry.getIndex();
-                                lastAppliedTerm = lastAppliedEntry.getTerm();
+                            if(started){
+                                dataSizeSinceLastSnapshot = 0;
                             }
 
-                            if(LOG.isDebugEnabled()) {
-                                LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size());
-                                LOG.debug("{}: Snapshot Capture lastApplied:{} ",
-                                        persistenceId(), context.getLastApplied());
-                                LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(),
-                                        lastAppliedIndex);
-                                LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(),
-                                        lastAppliedTerm);
-                            }
-
-                            // send a CaptureSnapshot to self to make the expensive operation async.
-                            long replicatedToAllIndex = getCurrentBehavior().getReplicatedToAllIndex();
-                            ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
-                            getSelf().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
-                                (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
-                                (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1)),
-                                null);
-                            context.setSnapshotCaptureInitiated(true);
                         }
+
                         if (callback != null){
                             callback.apply(replicatedLogEntry);
                         }
@@ -1051,7 +943,18 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         @Override
         public void saveSnapshot(Object o) {
             // Make saving Snapshot successful
-            commitSnapshot(-1L);
+            // Committing the snapshot here would end up calling commit in the creating state which would
+            // be a state violation. That's why now we send a message to commit the snapshot.
+            self().tell(COMMIT_SNAPSHOT, self());
+        }
+    }
+
+
+    private class CreateSnapshotProcedure implements Procedure<Void> {
+
+        @Override
+        public void apply(Void aVoid) throws Exception {
+            createSnapshot();
         }
     }
 
index 9d391a1..2e7eb5e 100644 (file)
@@ -166,8 +166,6 @@ public interface RaftActorContext {
      */
     ConfigParams getConfigParams();
 
-    void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated);
-
-    boolean isSnapshotCaptureInitiated();
+    SnapshotManager getSnapshotManager();
 
 }
index 6fc5e43..eb059d6 100644 (file)
@@ -41,6 +41,10 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private boolean snapshotCaptureInitiated;
 
+    // Snapshot manager will need to be created on demand as it needs raft actor context which cannot
+    // be passed to it in the constructor
+    private SnapshotManager snapshotManager;
+
     public RaftActorContextImpl(ActorRef actor, UntypedActorContext context,
         String id,
         ElectionTerm termInformation, long commitIndex,
@@ -134,16 +138,6 @@ public class RaftActorContextImpl implements RaftActorContext {
         return configParams;
     }
 
-    @Override
-    public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) {
-        this.snapshotCaptureInitiated = snapshotCaptureInitiated;
-    }
-
-    @Override
-    public boolean isSnapshotCaptureInitiated() {
-        return snapshotCaptureInitiated;
-    }
-
     @Override public void addToPeers(String name, String address) {
         peerAddresses.put(name, address);
     }
@@ -166,4 +160,11 @@ public class RaftActorContextImpl implements RaftActorContext {
 
         peerAddresses.put(peerId, peerAddress);
     }
+
+    @Override public SnapshotManager getSnapshotManager() {
+        if(snapshotManager == null){ // built lazily on first request, see field comment
+            snapshotManager = new SnapshotManager(this, LOG);
+        }
+        return snapshotManager;
+    }
 }
index cb38e82..432d678 100644 (file)
@@ -47,13 +47,13 @@ public class SnapshotManager implements SnapshotState {
     }
 
     @Override
-    public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
-        currentState.captureToInstall(lastLogEntry, replicatedToAllIndex);
+    public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+        return currentState.captureToInstall(lastLogEntry, replicatedToAllIndex, targetFollower);
     }
 
     @Override
-    public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
-        currentState.capture(lastLogEntry, replicatedToAllIndex);
+    public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+        return currentState.capture(lastLogEntry, replicatedToAllIndex);
     }
 
     @Override
@@ -62,8 +62,9 @@ public class SnapshotManager implements SnapshotState {
     }
 
     @Override
-    public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior) {
-        currentState.persist(persistenceProvider, snapshotBytes, currentBehavior);
+    public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
+                        RaftActorBehavior currentBehavior, long totalMemory) {
+        currentState.persist(persistenceProvider, snapshotBytes, currentBehavior, totalMemory);
     }
 
     @Override
@@ -77,8 +78,8 @@ public class SnapshotManager implements SnapshotState {
     }
 
     @Override
-    public long trimLog(long desiredTrimIndex) {
-        return currentState.trimLog(desiredTrimIndex);
+    public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
+        return currentState.trimLog(desiredTrimIndex, currentBehavior);
     }
 
     private boolean hasFollowers(){
@@ -97,13 +98,15 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+        public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
             LOG.debug("capture should not be called in state {}", this);
+            return false;
         }
 
         @Override
-        public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+        public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
             LOG.debug("captureToInstall should not be called in state {}", this);
+            return false;
         }
 
         @Override
@@ -112,7 +115,8 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior) {
+        public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
+                            RaftActorBehavior currentBehavior, long totalMemory) {
             LOG.debug("persist should not be called in state {}", this);
         }
 
@@ -127,17 +131,22 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public long trimLog(long desiredTrimIndex) {
+        public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
             LOG.debug("trimLog should not be called in state {}", this);
             return -1;
         }
 
-        protected long doTrimLog(long desiredTrimIndex){
+        protected long doTrimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior){
             //  we would want to keep the lastApplied as its used while capturing snapshots
             long lastApplied = context.getLastApplied();
             long tempMin = Math.min(desiredTrimIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
 
-            if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin))  {
+            if(LOG.isTraceEnabled()) {
+                LOG.trace("{}: performSnapshotWithoutCapture: desiredTrimIndex: {}, lastApplied: {}, tempMin: {}",
+                        persistenceId(), desiredTrimIndex, lastApplied, tempMin);
+            }
+
+            if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) {
                 LOG.debug("{}: fakeSnapshot purging log to {} for term {}", persistenceId(), tempMin,
                         context.getTermInformation().getCurrentTerm());
 
@@ -146,15 +155,21 @@ public class SnapshotManager implements SnapshotState {
                 context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
                 context.getReplicatedLog().snapshotCommit();
                 return tempMin;
+            } else if(tempMin > currentBehavior.getReplicatedToAllIndex()) {
+                // It's possible a follower was lagging and an install snapshot advanced its match index past
+                // the current replicatedToAllIndex. Since the follower is now caught up we should advance the
+                // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely
+                // due to a previous snapshot triggered by the memory threshold exceeded, in that case we
+                // trim the log to the last applied index even if previous entries weren't replicated to all followers.
+                currentBehavior.setReplicatedToAllIndex(tempMin);
             }
-
             return -1;
         }
     }
 
     private class Idle extends AbstractSnapshotState {
 
-        private void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, boolean toInstall) {
+        private boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
             TermInformationReader lastAppliedTermInfoReader =
                     lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(),
                             lastLogEntry, hasFollowers());
@@ -171,24 +186,30 @@ public class SnapshotManager implements SnapshotState {
             // send a CaptureSnapshot to self to make the expensive operation async.
             captureSnapshot = new CaptureSnapshot(lastLogEntry.getIndex(),
                     lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm,
-                    newReplicatedToAllIndex, newReplicatedToAllTerm, toInstall);
+                    newReplicatedToAllIndex, newReplicatedToAllTerm, targetFollower!=null);
 
             SnapshotManager.this.currentState = CAPTURING;
 
-            LOG.info("{}: Initiating snapshot capture {}: {}", persistenceId(), toInstall ? "to install" : "",
-                    captureSnapshot);
+            if(targetFollower == null){ // plain capture: there is no follower to name in the log line
+                LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
+            } else {
+                LOG.info("{}: Initiating snapshot capture {} to install on {}",
+                        persistenceId(), captureSnapshot, targetFollower);
+            }
 
             context.getActor().tell(captureSnapshot, context.getActor());
+
+            return true;
+        }
 
         @Override
-        public void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
-            capture(lastLogEntry, replicatedToAllIndex, false);
+        public boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
+            return capture(lastLogEntry, replicatedToAllIndex, null);
         }
 
         @Override
-        public void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
-            capture(lastLogEntry, replicatedToAllIndex, true);
+        public boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
+            return capture(lastLogEntry, replicatedToAllIndex, targetFollower);
         }
 
         @Override
@@ -197,8 +218,8 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public long trimLog(long desiredTrimIndex) {
-            return doTrimLog(desiredTrimIndex);
+        public long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior) {
+            return doTrimLog(desiredTrimIndex, currentBehavior);
         }
     }
 
@@ -235,7 +256,7 @@ public class SnapshotManager implements SnapshotState {
 
         @Override
         public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
-                            RaftActorBehavior currentBehavior) {
+                            RaftActorBehavior currentBehavior, long totalMemory) {
             // create a snapshot object from the state provided and save it
             // when snapshot is saved async, SaveSnapshotSuccess is raised.
 
@@ -248,16 +269,28 @@ public class SnapshotManager implements SnapshotState {
 
             LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
 
-            long dataThreshold = Runtime.getRuntime().totalMemory() *
+            long dataThreshold = totalMemory *
                     context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
             if (context.getReplicatedLog().dataSize() > dataThreshold) {
+
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("{}: dataSize {} exceeds dataThreshold {} - doing snapshotPreCommit with index {}",
+                            persistenceId(), context.getReplicatedLog().dataSize(), dataThreshold,
+                            captureSnapshot.getLastAppliedIndex());
+                }
+
                 // if memory is less, clear the log based on lastApplied.
                 // this could/should only happen if one of the followers is down
                 // as normally we keep removing from the log when its replicated to all.
                 context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
                         captureSnapshot.getLastAppliedTerm());
 
-                currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+                // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an
+                // install snapshot to a follower.
+                if(captureSnapshot.getReplicatedToAllIndex() >= 0) {
+                    currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+                }
+
             } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
                 // clear the log based on replicatedToAllIndex
                 context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
@@ -350,7 +383,7 @@ public class SnapshotManager implements SnapshotState {
             } else if (entry != null) {
                 index = entry.getIndex();
                 term = entry.getTerm();
-            } else if(originalIndex == log.getSnapshotIndex()){
+            } else if(log.getSnapshotIndex() > -1){
                 index = log.getSnapshotIndex();
                 term = log.getSnapshotTerm();
             }
index 2ff30ec..9a9bf1c 100644 (file)
@@ -24,16 +24,21 @@ public interface SnapshotState {
      *
      * @param lastLogEntry the last entry in the replicated log
      * @param replicatedToAllIndex the current replicatedToAllIndex
+     *
+     * @return true if capture was started
      */
-    void capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex);
+    boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex);
 
     /**
      * Initiate capture snapshot for the purposing of installing that snapshot
      *
      * @param lastLogEntry
      * @param replicatedToAllIndex
+     * @param targetFollower the follower the captured snapshot is intended to be installed on
+     *
+     * @return true if capture was started
      */
-    void captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex);
+    boolean captureToInstall(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower);
 
     /**
      * Create the snapshot
@@ -48,8 +53,10 @@ public interface SnapshotState {
      * @param persistenceProvider
      * @param snapshotBytes
      * @param currentBehavior
+     * @param totalMemory
      */
-    void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior);
+    void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes, RaftActorBehavior currentBehavior
+            ,long totalMemory);
 
     /**
      * Commit the snapshot by trimming the log
@@ -70,5 +77,5 @@ public interface SnapshotState {
      * @param desiredTrimIndex
      * @return the actual trim index
      */
-    long trimLog(long desiredTrimIndex);
+    long trimLog(long desiredTrimIndex, RaftActorBehavior currentBehavior);
 }
index a63c62f..2c433f9 100644 (file)
@@ -33,7 +33,6 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -235,7 +234,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             applyLogToStateMachine(context.getCommitIndex());
         }
 
-        if (!context.isSnapshotCaptureInitiated()) {
+        if (!context.getSnapshotManager().isCapturing()) {
             purgeInMemoryLog();
         }
 
@@ -388,7 +387,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 followerToSnapshot.markSendStatus(false);
             }
 
-            if (wasLastChunk && !context.isSnapshotCaptureInitiated()) {
+            if (wasLastChunk && !context.getSnapshotManager().isCapturing()) {
                 // Since the follower is now caught up try to purge the log.
                 purgeInMemoryLog();
             } else if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) {
@@ -491,7 +490,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         sendAppendEntries = true;
                     }
                 } else if (isFollowerActive && followerNextIndex >= 0 &&
-                    leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) {
+                    leaderLastIndex > followerNextIndex && !context.getSnapshotManager().isCapturing()) {
                     // if the followers next index is not present in the leaders log, and
                     // if the follower is just not starting and if leader's index is more than followers index
                     // then snapshot should be sent
@@ -562,37 +561,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 final ActorSelection followerActor = context.getPeerActorSelection(followerId);
                 sendSnapshotChunk(followerActor, followerId);
 
-            } else if (!context.isSnapshotCaptureInitiated()) {
 
-                ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
-                long lastAppliedIndex = -1;
-                long lastAppliedTerm = -1;
-
-                if (lastAppliedEntry != null) {
-                    lastAppliedIndex = lastAppliedEntry.getIndex();
-                    lastAppliedTerm = lastAppliedEntry.getTerm();
-                } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
-                    lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
-                    lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
-                }
-
-                boolean isInstallSnapshotInitiated = true;
-                long replicatedToAllIndex = super.getReplicatedToAllIndex();
-                ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
-
-                CaptureSnapshot captureSnapshot = new CaptureSnapshot(
-                        lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
-                        (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
-                        (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1),
-                        isInstallSnapshotInitiated);
-
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug("{}: Initiating install snapshot to follower {}: {}", logName(), followerId,
-                            captureSnapshot);
-                }
-
-                actor().tell(captureSnapshot, actor());
-                context.setSnapshotCaptureInitiated(true);
+            } else {
+                context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+                        this.getReplicatedToAllIndex(), followerId);
             }
         }
     }
index a1bcf85..2a3653e 100644 (file)
@@ -460,31 +460,10 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      * @param snapshotCapturedIndex
      */
     protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) {
-        //  we would want to keep the lastApplied as its used while capturing snapshots
-        long lastApplied = context.getLastApplied();
-        long tempMin = Math.min(snapshotCapturedIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
+        long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex, this);
 
-        if(LOG.isTraceEnabled()) {
-            LOG.trace("{}: performSnapshotWithoutCapture: snapshotCapturedIndex: {}, lastApplied: {}, tempMin: {}",
-                    logName, snapshotCapturedIndex, lastApplied, tempMin);
-        }
-
-        if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin))  {
-            LOG.debug("{}: fakeSnapshot purging log to {} for term {}", logName(), tempMin,
-                    context.getTermInformation().getCurrentTerm());
-
-            //use the term of the temp-min, since we check for isPresent, entry will not be null
-            ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin);
-            context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
-            context.getReplicatedLog().snapshotCommit();
-            setReplicatedToAllIndex(tempMin);
-        } else if(tempMin > getReplicatedToAllIndex()) {
-            // It's possible a follower was lagging and an install snapshot advanced its match index past
-            // the current replicatedToAllIndex. Since the follower is now caught up we should advance the
-            // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely
-            // due to a previous snapshot triggered by the memory threshold exceeded, in that case we
-            // trim the log to the last applied index even if previous entries weren't replicated to all followers.
-            setReplicatedToAllIndex(tempMin);
+        if(actualIndex != -1){
+            setReplicatedToAllIndex(actualIndex);
         }
     }
 
index bdd459e..6a29a34 100644 (file)
@@ -255,7 +255,7 @@ public class Follower extends AbstractRaftActorBehavior {
 
         sender.tell(reply, actor());
 
-        if (!context.isSnapshotCaptureInitiated()) {
+        if (!context.getSnapshotManager().isCapturing()) {
             super.performSnapshotWithoutCapture(appendEntries.getReplicatedToAllIndex());
         }
 
index 1cc7b5f..53cca23 100644 (file)
@@ -35,6 +35,7 @@ public class MockRaftActorContext implements RaftActorContext {
     private Map<String, String> peerAddresses = new HashMap<>();
     private ConfigParams configParams;
     private boolean snapshotCaptureInitiated;
+    private SnapshotManager snapshotManager;
 
     public MockRaftActorContext(){
         electionTerm = new ElectionTerm() {
@@ -191,13 +192,11 @@ public class MockRaftActorContext implements RaftActorContext {
     }
 
     @Override
-    public void setSnapshotCaptureInitiated(boolean snapshotCaptureInitiated) {
-        this.snapshotCaptureInitiated = snapshotCaptureInitiated;
-    }
-
-    @Override
-    public boolean isSnapshotCaptureInitiated() {
-        return snapshotCaptureInitiated;
+    public SnapshotManager getSnapshotManager() {
+        if(this.snapshotManager == null){
+            this.snapshotManager = new SnapshotManager(this, getLogger());
+        }
+        return this.snapshotManager;
     }
 
     public void setConfigParams(ConfigParams configParams) {
index 34932c7..b93b739 100644 (file)
@@ -60,7 +60,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntrie
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
@@ -738,10 +737,12 @@ public class RaftActorTest extends AbstractActorTest {
                         new MockRaftActorContext.MockPayload("C"),
                         new MockRaftActorContext.MockPayload("D")));
 
-                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1));
-
                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
 
+                raftActorContext.getSnapshotManager().capture(
+                        new MockRaftActorContext.MockReplicatedLogEntry(1, -1,
+                                new MockRaftActorContext.MockPayload("D")), -1);
+
                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
 
                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
@@ -770,12 +771,13 @@ public class RaftActorTest extends AbstractActorTest {
                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
                 mockRaftActor.waitForInitializeBehaviorComplete();
+                MockRaftActorContext.MockReplicatedLogEntry lastEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class));
 
                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 3, mock(Payload.class)));
-                mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 4, mock(Payload.class)));
+                mockRaftActor.getReplicatedLog().append(lastEntry);
 
                 ByteString snapshotBytes = fromObject(Arrays.asList(
                         new MockRaftActorContext.MockPayload("A"),
@@ -787,7 +789,8 @@ public class RaftActorTest extends AbstractActorTest {
                 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
 
                 long replicatedToAllIndex = 1;
-                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1));
+
+                mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
 
                 verify(mockRaftActor.delegate).createSnapshot();
 
@@ -929,7 +932,9 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
 
-                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1));
+                raftActorContext.getSnapshotManager().capture(
+                        new MockRaftActorContext.MockReplicatedLogEntry(1, 1,
+                                new MockRaftActorContext.MockPayload("D")), 1);
 
                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
 
@@ -1059,9 +1064,10 @@ public class RaftActorTest extends AbstractActorTest {
 
                 assertEquals(8, leaderActor.getReplicatedLog().size());
 
-                leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1));
+                leaderActor.getRaftActorContext().getSnapshotManager()
+                        .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
+                                new MockRaftActorContext.MockPayload("x")), 4);
 
-                leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
                 verify(leaderActor.delegate).createSnapshot();
 
                 assertEquals(8, leaderActor.getReplicatedLog().size());
@@ -1087,8 +1093,14 @@ public class RaftActorTest extends AbstractActorTest {
                         new MockRaftActorContext.MockPayload("foo-2"),
                         new MockRaftActorContext.MockPayload("foo-3"),
                         new MockRaftActorContext.MockPayload("foo-4")));
-                leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
-                assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+
+                leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentProvider()
+                        , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory());
+
+                assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
+
+                // The commit is needed to complete the snapshot creation process
+                leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1);
 
                 // capture snapshot reply should remove the snapshotted entries only
                 assertEquals(3, leaderActor.getReplicatedLog().size());
@@ -1151,9 +1163,10 @@ public class RaftActorTest extends AbstractActorTest {
                 assertEquals(6, followerActor.getReplicatedLog().size());
 
                 //snapshot on 4
-                followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1));
+                followerActor.getRaftActorContext().getSnapshotManager().capture(
+                        new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
+                                new MockRaftActorContext.MockPayload("D")), 4);
 
-                followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
                 verify(followerActor.delegate).createSnapshot();
 
                 assertEquals(6, followerActor.getReplicatedLog().size());
@@ -1188,7 +1201,10 @@ public class RaftActorTest extends AbstractActorTest {
                         new MockRaftActorContext.MockPayload("foo-3"),
                         new MockRaftActorContext.MockPayload("foo-4")));
                 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
-                assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
+                assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
+
+                // The commit is needed to complete the snapshot creation process
+                followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentProvider(), -1);
 
                 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
@@ -1286,7 +1302,7 @@ public class RaftActorTest extends AbstractActorTest {
                         new MockRaftActorContext.MockPayload("foo-3"),
                         new MockRaftActorContext.MockPayload("foo-4")));
                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
-                assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+                assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
 
                 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
 
@@ -1370,7 +1386,7 @@ public class RaftActorTest extends AbstractActorTest {
 
             // Trimming log in this scenario is a no-op
             assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
-            assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+            assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
             assertEquals(-1, leader.getReplicatedToAllIndex());
 
         }};
@@ -1413,7 +1429,7 @@ public class RaftActorTest extends AbstractActorTest {
 
             // Trimming log in this scenario is a no-op
             assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
-            assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
+            assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
             assertEquals(3, leader.getReplicatedToAllIndex());
 
         }};
index 90272fe..3d75edb 100644 (file)
@@ -1,5 +1,6 @@
 package org.opendaylight.controller.cluster.raft;
 
+import static junit.framework.TestCase.assertFalse;
 import static junit.framework.TestCase.assertTrue;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
@@ -7,6 +8,7 @@ import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import akka.actor.ActorRef;
@@ -21,7 +23,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
-import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
@@ -90,7 +91,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         // Force capturing toInstall = true
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
-                new MockRaftActorContext.MockPayload()), 0);
+                new MockRaftActorContext.MockPayload()), 0, "follower-1");
 
         assertEquals(true, snapshotManager.isCapturing());
 
@@ -112,9 +113,11 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
     @Test
     public void testCapture(){
-        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+        boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
                 new MockRaftActorContext.MockPayload()), 9);
 
+        assertTrue(capture);
+
         assertEquals(true, snapshotManager.isCapturing());
 
         CaptureSnapshot captureSnapshot = MessageCollectorActor.expectFirstMatching(actorRef, CaptureSnapshot.class);
@@ -136,17 +139,21 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
     @Test
     public void testIllegalCapture() throws Exception {
-        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+        boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
                 new MockRaftActorContext.MockPayload()), 9);
 
+        assertTrue(capture);
+
         List<CaptureSnapshot> allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
 
         assertEquals(1, allMatching.size());
 
         // This will not cause snapshot capture to start again
-        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+        capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
                 new MockRaftActorContext.MockPayload()), 9);
 
+        assertFalse(capture);
+
         allMatching = MessageCollectorActor.getAllMatching(actorRef, CaptureSnapshot.class);
 
         assertEquals(1, allMatching.size());
@@ -165,7 +172,8 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.create(mockProcedure);
 
         byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
-        snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior);
+        snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
+                , Runtime.getRuntime().totalMemory());
 
         ArgumentCaptor<Snapshot> snapshotArgumentCaptor = ArgumentCaptor.forClass(Snapshot.class);
         verify(mockDataPersistenceProvider).saveSnapshot(snapshotArgumentCaptor.capture());
@@ -225,9 +233,10 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         verify(mockProcedure, times(1)).apply(null);
 
-        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+                , Runtime.getRuntime().totalMemory());
 
-        Mockito.reset(mockProcedure);
+        reset(mockProcedure);
 
         snapshotManager.create(mockProcedure);
 
@@ -249,7 +258,8 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         snapshotManager.create(mockProcedure);
 
-        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+                , Runtime.getRuntime().totalMemory());
 
         verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
 
@@ -269,13 +279,12 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         snapshotManager.create(mockProcedure);
 
-        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+                , Runtime.getRuntime().totalMemory());
 
         verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
 
         verify(mockReplicatedLog).snapshotPreCommit(9L, 6L);
-
-        verify(mockRaftActorBehavior).setReplicatedToAllIndex(-1);
     }
 
     @Test
@@ -283,14 +292,17 @@ public class SnapshotManagerTest extends AbstractActorTest {
         doReturn(Integer.MAX_VALUE).when(mockReplicatedLog).dataSize();
 
         // when replicatedToAllIndex = -1
-        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
-                new MockRaftActorContext.MockPayload()), -1);
+        boolean capture = snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
+                new MockRaftActorContext.MockPayload()), -1, "follower-1");
+
+        assertTrue(capture);
 
         snapshotManager.create(mockProcedure);
 
         byte[] bytes = new byte[] {1,2,3,4,5,6,7,8,9,10};
 
-        snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior);
+        snapshotManager.persist(mockDataPersistenceProvider, bytes, mockRaftActorBehavior
+                , Runtime.getRuntime().totalMemory());
 
         verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
 
@@ -308,7 +320,8 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
     @Test
     public void testCallingPersistWithoutCaptureWillDoNothing(){
-        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+                , Runtime.getRuntime().totalMemory());
 
         verify(mockDataPersistenceProvider, never()).saveSnapshot(any(Snapshot.class));
 
@@ -322,13 +335,15 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         // when replicatedToAllIndex = -1
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
-                new MockRaftActorContext.MockPayload()), -1);
+                new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
         snapshotManager.create(mockProcedure);
 
-        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+                , Runtime.getRuntime().totalMemory());
 
-        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+                , Runtime.getRuntime().totalMemory());
 
         verify(mockDataPersistenceProvider).saveSnapshot(any(Snapshot.class));
 
@@ -341,11 +356,12 @@ public class SnapshotManagerTest extends AbstractActorTest {
     public void testCommit(){
         // when replicatedToAllIndex = -1
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
-                new MockRaftActorContext.MockPayload()), -1);
+                new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
         snapshotManager.create(mockProcedure);
 
-        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+                , Runtime.getRuntime().totalMemory());
 
         snapshotManager.commit(mockDataPersistenceProvider, 100L);
 
@@ -366,7 +382,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
     public void testCommitBeforePersist(){
         // when replicatedToAllIndex = -1
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
-                new MockRaftActorContext.MockPayload()), -1);
+                new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
         snapshotManager.commit(mockDataPersistenceProvider, 100L);
 
@@ -394,11 +410,12 @@ public class SnapshotManagerTest extends AbstractActorTest {
     public void testCallingCommitMultipleTimesCausesNoHarm(){
         // when replicatedToAllIndex = -1
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
-                new MockRaftActorContext.MockPayload()), -1);
+                new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
         snapshotManager.create(mockProcedure);
 
-        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+                , Runtime.getRuntime().totalMemory());
 
         snapshotManager.commit(mockDataPersistenceProvider, 100L);
 
@@ -415,11 +432,12 @@ public class SnapshotManagerTest extends AbstractActorTest {
     public void testRollback(){
         // when replicatedToAllIndex = -1
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
-                new MockRaftActorContext.MockPayload()), -1);
+                new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
         snapshotManager.create(mockProcedure);
 
-        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+                , Runtime.getRuntime().totalMemory());
 
         snapshotManager.rollback();
 
@@ -431,7 +449,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
     public void testRollbackBeforePersist(){
         // when replicatedToAllIndex = -1
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
-                new MockRaftActorContext.MockPayload()), -1);
+                new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
         snapshotManager.rollback();
 
@@ -449,11 +467,12 @@ public class SnapshotManagerTest extends AbstractActorTest {
     public void testCallingRollbackMultipleTimesCausesNoHarm(){
         // when replicatedToAllIndex = -1
         snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(6, 9,
-                new MockRaftActorContext.MockPayload()), -1);
+                new MockRaftActorContext.MockPayload()), -1, "follower-1");
 
         snapshotManager.create(mockProcedure);
 
-        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior);
+        snapshotManager.persist(mockDataPersistenceProvider, new byte[]{}, mockRaftActorBehavior
+                , Runtime.getRuntime().totalMemory());
 
         snapshotManager.rollback();
 
@@ -473,7 +492,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
         doReturn(5L).when(replicatedLogEntry).getTerm();
 
-        snapshotManager.trimLog(10);
+        snapshotManager.trimLog(10, mockRaftActorBehavior);
 
         verify(mockReplicatedLog).snapshotPreCommit(10, 5);
         verify(mockReplicatedLog).snapshotCommit();
@@ -481,9 +500,11 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
     @Test
     public void testTrimLogAfterCapture(){
-        snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+        boolean capture = snapshotManager.capture(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
                 new MockRaftActorContext.MockPayload()), 9);
 
+        assertTrue(capture);
+
         assertEquals(true, snapshotManager.isCapturing());
 
         ElectionTerm mockElectionTerm = mock(ElectionTerm.class);
@@ -495,7 +516,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
         doReturn(5L).when(replicatedLogEntry).getTerm();
 
-        snapshotManager.trimLog(10);
+        snapshotManager.trimLog(10, mockRaftActorBehavior);
 
         verify(mockReplicatedLog, never()).snapshotPreCommit(anyLong(), anyLong());
         verify(mockReplicatedLog, never()).snapshotCommit();
@@ -504,8 +525,10 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
     @Test
     public void testTrimLogAfterCaptureToInstall(){
-        snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
-                new MockRaftActorContext.MockPayload()), 9);
+        boolean capture = snapshotManager.captureToInstall(new MockRaftActorContext.MockReplicatedLogEntry(1,9,
+                new MockRaftActorContext.MockPayload()), 9, "follower-1");
+
+        assertTrue(capture);
 
         assertEquals(true, snapshotManager.isCapturing());
 
@@ -518,7 +541,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
         doReturn(replicatedLogEntry).when((mockReplicatedLog)).get(10);
         doReturn(5L).when(replicatedLogEntry).getTerm();
 
-        snapshotManager.trimLog(10);
+        snapshotManager.trimLog(10, mockRaftActorBehavior);
 
         verify(mockReplicatedLog, never()).snapshotPreCommit(10, 5);
         verify(mockReplicatedLog, never()).snapshotCommit();
index 383ebef..1dda279 100644 (file)
@@ -524,6 +524,8 @@ public class LeaderTest extends AbstractLeaderTest {
                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
                         new MockRaftActorContext.MockPayload("D"));
 
+        actorContext.getReplicatedLog().append(entry);
+
         //update follower timestamp
         leader.markFollowerActive(FOLLOWER_ID);
 
index adc7f47..aef2582 100644 (file)
@@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Test;
 import org.mockito.InOrder;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
@@ -69,13 +70,13 @@ import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListene
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
@@ -99,6 +100,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -1421,31 +1423,44 @@ public class ShardTest extends AbstractShardTest {
 
         dataStoreContextBuilder.persistent(persistent);
 
+
+
         new ShardTestKit(getSystem()) {{
             final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
-            Creator<Shard> creator = new Creator<Shard>() {
-                @Override
-                public Shard create() throws Exception {
-                    return new Shard(shardID, Collections.<String,String>emptyMap(),
-                            newDatastoreContext(), SCHEMA_CONTEXT) {
 
-                        DelegatingPersistentDataProvider delegating;
+            class TestShard extends Shard {
 
-                        @Override
-                        protected DataPersistenceProvider persistence() {
-                            if(delegating == null) {
-                                delegating = new DelegatingPersistentDataProvider(super.persistence());
-                            }
+                protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
+                                    DatastoreContext datastoreContext, SchemaContext schemaContext) {
+                    super(name, peerAddresses, datastoreContext, schemaContext);
+                }
 
-                            return delegating;
-                        }
+                DelegatingPersistentDataProvider delegating;
 
-                        @Override
-                        protected void commitSnapshot(final long sequenceNumber) {
-                            super.commitSnapshot(sequenceNumber);
-                            latch.get().countDown();
-                        }
-                    };
+                protected DataPersistenceProvider persistence() {
+                    if(delegating == null) {
+                        delegating = new DelegatingPersistentDataProvider(super.persistence());
+                    }
+                    return delegating;
+                }
+
+                @Override
+                protected void commitSnapshot(final long sequenceNumber) {
+                    super.commitSnapshot(sequenceNumber);
+                    latch.get().countDown();
+                }
+
+                @Override
+                public RaftActorContext getRaftActorContext() {
+                    return super.getRaftActorContext();
+                }
+            }
+
+            Creator<Shard> creator = new Creator<Shard>() {
+                @Override
+                public Shard create() throws Exception {
+                    return new TestShard(shardID, Collections.<String,String>emptyMap(),
+                            newDatastoreContext(), SCHEMA_CONTEXT);
                 }
             };
 
@@ -1458,8 +1473,9 @@ public class ShardTest extends AbstractShardTest {
 
             NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
 
-            CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1);
-            shard.tell(capture, getRef());
+            // Trigger creation of a snapshot by invoking capture() directly on the SnapshotManager
+            RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
+            raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
 
             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
 
@@ -1471,7 +1487,7 @@ public class ShardTest extends AbstractShardTest {
             latch.set(new CountDownLatch(1));
             savedSnapshot.set(null);
 
-            shard.tell(capture, getRef());
+            raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
 
             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));