Merge "Bug-2692:Avoid fake snapshot during initiate snapshot"
authorTom Pantelis <tpanteli@brocade.com>
Tue, 10 Feb 2015 18:22:52 +0000 (18:22 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 10 Feb 2015 18:22:52 +0000 (18:22 +0000)
1  2 
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java

index afd68847ed55be67238bae830db9983aac53b78c,3336cbd9bec1bbc9feaac98610dd8bea85a85a6f..8f33d94700bc4a87231c44ab66758bd7c28bf049
@@@ -232,8 -232,6 +232,8 @@@ public abstract class AbstractLeader ex
              purgeInMemoryLog();
          }
  
 +        //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
 +        sendUpdatesToFollower(followerId, followerLogInformation, false);
          return this;
      }
  
          followerLogInformation.markFollowerActive();
  
          if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
 +            boolean wasLastChunk = false;
              if (reply.isSuccess()) {
                  if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
                      //this was the last chunk reply
                          // we can remove snapshot from the memory
                          setSnapshot(Optional.<ByteString>absent());
                      }
 +                    wasLastChunk = true;
  
                  } else {
                      followerToSnapshot.markSendStatus(true);
  
                  followerToSnapshot.markSendStatus(false);
              }
 +
 +            if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) {
 +                ActorSelection followerActor = context.getPeerActorSelection(followerId);
 +                if(followerActor != null) {
 +                    sendSnapshotChunk(followerActor, followerId);
 +                }
 +            }
 +
          } else {
              LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
                      context.getId(), reply.getChunkIndex(), followerId,
  
      private void sendAppendEntries() {
          // Send an AppendEntries to all followers
 -
 +        long heartbeatInterval = context.getConfigParams().getHeartBeatInterval().toMillis();
          for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
              final String followerId = e.getKey();
 -            ActorSelection followerActor = context.getPeerActorSelection(followerId);
 -
 -            if (followerActor != null) {
 -                FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
 -                long followerNextIndex = followerLogInformation.getNextIndex();
 -                boolean isFollowerActive = followerLogInformation.isFollowerActive();
 -
 -                FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
 -                if (followerToSnapshot != null) {
 -                    // if install snapshot is in process , then sent next chunk if possible
 -                    if (isFollowerActive && followerToSnapshot.canSendNextChunk()) {
 -                        sendSnapshotChunk(followerActor, followerId);
 -                    } else {
 -                        // we send a heartbeat even if we have not received a reply for the last chunk
 -                        sendAppendEntriesToFollower(followerActor, followerNextIndex,
 -                            Collections.<ReplicatedLogEntry>emptyList(), followerId);
 -                    }
 +            final FollowerLogInformation followerLogInformation = e.getValue();
 +            // This checks helps not to send a repeat message to the follower
 +            if(followerLogInformation.timeSinceLastActivity() >= heartbeatInterval) {
 +                sendUpdatesToFollower(followerId, followerLogInformation, true);
 +            }
 +        }
 +    }
  
 -                } else {
 -                    long leaderLastIndex = context.getReplicatedLog().lastIndex();
 -                    long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
 -                    final List<ReplicatedLogEntry> entries;
 -
 -                    LOG.debug("{}: Checking sendAppendEntries for {}, leaderLastIndex: {}, leaderSnapShotIndex: {}",
 -                            context.getId(), leaderLastIndex, leaderSnapShotIndex);
 -
 -                    if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) {
 -                        LOG.debug("{}: sendAppendEntries: {} is present for {}", context.getId(),
 -                                followerNextIndex, followerId);
 -
 -                        // FIXME : Sending one entry at a time
 -                        entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
 -
 -                    } else if (isFollowerActive && followerNextIndex >= 0 &&
 -                        leaderLastIndex >= followerNextIndex ) {
 -                        // if the followers next index is not present in the leaders log, and
 -                        // if the follower is just not starting and if leader's index is more than followers index
 -                        // then snapshot should be sent
 -
 -                        if(LOG.isDebugEnabled()) {
 -                            LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," +
 -                                    "follower-nextIndex: %s, leader-snapshot-index: %s,  " +
 -                                    "leader-last-index: %s", context.getId(), followerId,
 -                                followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
 -                        }
 -                        actor().tell(new InitiateInstallSnapshot(), actor());
 -
 -                        // we would want to sent AE as the capture snapshot might take time
 -                        entries =  Collections.<ReplicatedLogEntry>emptyList();
 -
 -                    } else {
 -                        //we send an AppendEntries, even if the follower is inactive
 -                        // in-order to update the followers timestamp, in case it becomes active again
 -                        entries =  Collections.<ReplicatedLogEntry>emptyList();
 -                    }
 +    /**
 +     *
 +     * This method checks if any update needs to be sent to the given follower. This includes append log entries,
 +     * sending next snapshot chunk, and initiating a snapshot.
 +     * @return true if any update is sent, false otherwise
 +     */
 +
 +    private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation,
 +                                          boolean sendHeartbeat) {
 +
 +        ActorSelection followerActor = context.getPeerActorSelection(followerId);
 +        if (followerActor != null) {
 +            long followerNextIndex = followerLogInformation.getNextIndex();
 +            boolean isFollowerActive = followerLogInformation.isFollowerActive();
 +
 +            if (mapFollowerToSnapshot.get(followerId) != null) {
 +                // if install snapshot is in process , then sent next chunk if possible
 +                if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
 +                    sendSnapshotChunk(followerActor, followerId);
 +                } else if(sendHeartbeat) {
 +                    // we send a heartbeat even if we have not received a reply for the last chunk
 +                    sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
 +                        Collections.<ReplicatedLogEntry>emptyList(), followerId);
 +                }
 +            } else {
 +                long leaderLastIndex = context.getReplicatedLog().lastIndex();
 +                long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
 +                if (isFollowerActive &&
 +                    context.getReplicatedLog().isPresent(followerNextIndex)) {
 +                    // FIXME : Sending one entry at a time
 +                    final List<ReplicatedLogEntry> entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
  
                      sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
 +
 +                } else if (isFollowerActive && followerNextIndex >= 0 &&
 +                    leaderLastIndex >= followerNextIndex) {
 +                    // if the followers next index is not present in the leaders log, and
 +                    // if the follower is just not starting and if leader's index is more than followers index
 +                    // then snapshot should be sent
 +
 +                    if (LOG.isDebugEnabled()) {
 +                        LOG.debug("InitiateInstallSnapshot to follower:{}," +
 +                                "follower-nextIndex:{}, leader-snapshot-index:{},  " +
 +                                "leader-last-index:{}", followerId,
 +                            followerNextIndex, leaderSnapShotIndex, leaderLastIndex
 +                        );
 +                    }
 +                    actor().tell(new InitiateInstallSnapshot(), actor());
 +
 +                    // Send heartbeat to follower whenever install snapshot is initiated.
 +                    sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
 +                            Collections.<ReplicatedLogEntry>emptyList(), followerId);
 +
 +                } else if(sendHeartbeat) {
 +                    //we send an AppendEntries, even if the follower is inactive
 +                    // in-order to update the followers timestamp, in case it becomes active again
 +                    sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
 +                        Collections.<ReplicatedLogEntry>emptyList(), followerId);
                  }
 +
              }
          }
      }
                          // no need to capture snapshot
                          sendSnapshotChunk(followerActor, e.getKey());
  
-                     } else {
+                     } else if (!context.isSnapshotCaptureInitiated()) {
                          initiateCaptureSnapshot();
                          //we just need 1 follower who would need snapshot to be installed.
                          // when we have the snapshot captured, we would again check (in SendInstallSnapshot)
          actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
                  lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
              actor());
+         context.setSnapshotCaptureInitiated(true);
      }
  
  
                          context.getReplicatedLog().getSnapshotIndex(),
                          context.getReplicatedLog().getSnapshotTerm(),
                          nextSnapshotChunk,
-                         followerToSnapshot.incrementChunkIndex(),
-                         followerToSnapshot.getTotalChunks(),
+                             followerToSnapshot.incrementChunkIndex(),
+                             followerToSnapshot.getTotalChunks(),
                          Optional.of(followerToSnapshot.getLastChunkHashCode())
                      ).toSerializable(),
                      actor()
index 95ec0a6f2fb5fa126be7ae7f1ca819911c97cad2,8b47e8e2f3c091eefea9c1c06c3c2d061da59a88..666cea69ec891754687b262db72f26d9012fbb0c
@@@ -1,8 -1,5 +1,5 @@@
  package org.opendaylight.controller.cluster.raft.behaviors;
  
- import static org.junit.Assert.assertEquals;
- import static org.junit.Assert.assertNotNull;
- import static org.junit.Assert.assertTrue;
  import akka.actor.ActorRef;
  import akka.actor.PoisonPill;
  import akka.actor.Props;
@@@ -46,6 -43,10 +43,10 @@@ import org.opendaylight.controller.clus
  import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
  import scala.concurrent.duration.FiniteDuration;
  
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertNotNull;
+ import static org.junit.Assert.assertTrue;
  public class LeaderTest extends AbstractRaftActorBehaviorTest {
  
      private final ActorRef leaderActor =
@@@ -86,9 -87,6 +87,9 @@@
                      actorContext.setPeerAddresses(peerAddresses);
  
                      Leader leader = new Leader(actorContext);
 +                    leader.markFollowerActive(followerActor.path().toString());
 +                    Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
 +                        TimeUnit.MILLISECONDS);
                      leader.handleMessage(senderActor, new SendHeartBeat());
  
                      final String out =
                      actorContext.setPeerAddresses(peerAddresses);
  
                      Leader leader = new Leader(actorContext);
 +                    leader.markFollowerActive(followerActor.path().toString());
 +                    Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
 +                        TimeUnit.MILLISECONDS);
                      RaftActorBehavior raftBehavior = leader
                          .handleMessage(senderActor, new Replicate(null, null,
                              new MockRaftActorContext.MockReplicatedLogEntry(1,
              leader.getFollowerToSnapshot().getNextChunk();
              leader.getFollowerToSnapshot().incrementChunkIndex();
  
 +            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
 +                TimeUnit.MILLISECONDS);
 +
              leader.handleMessage(leaderActor, new SendHeartBeat());
  
              AppendEntries aeproto = (AppendEntries)MessageCollectorActor.getFirstMatching(
              //update follower timestamp
              leader.markFollowerActive(followerActor.path().toString());
  
 +            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
 +                TimeUnit.MILLISECONDS);
 +
              // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
              RaftActorBehavior raftBehavior = leader.handleMessage(
                  senderActor, new Replicate(null, "state-id", entry));
              assertEquals(1, cs.getLastAppliedTerm());
              assertEquals(4, cs.getLastIndex());
              assertEquals(2, cs.getLastTerm());
+             // if an initiate is started again when first is in progress, it shouldnt initiate Capture
+             raftBehavior = leader.handleMessage(leaderActor, new InitiateInstallSnapshot());
+             List<Object> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
+             assertEquals("CaptureSnapshot should not get invoked when  initiate is in progress", 1, captureSnapshots.size());
          }};
      }
  
              assertEquals(snapshotIndex + 1, fli.getNextIndex());
          }};
      }
 +    @Test
 +    public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
 +        new JavaTestKit(getSystem()) {{
 +
 +            TestActorRef<MessageCollectorActor> followerActor =
 +                TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-reply");
 +
 +            Map<String, String> peerAddresses = new HashMap<>();
 +            peerAddresses.put("follower-reply",
 +                followerActor.path().toString());
 +
 +            final int followersLastIndex = 2;
 +            final int snapshotIndex = 3;
 +            final int snapshotTerm = 1;
 +            final int currentTerm = 2;
 +
 +            MockRaftActorContext actorContext =
 +                (MockRaftActorContext) createActorContext();
 +            DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
 +                @Override
 +                public int getSnapshotChunkSize() {
 +                    return 50;
 +                }
 +            };
 +            configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
 +            configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
 +
 +            actorContext.setConfigParams(configParams);
 +            actorContext.setPeerAddresses(peerAddresses);
 +            actorContext.setCommitIndex(followersLastIndex);
 +
 +            MockLeader leader = new MockLeader(actorContext);
 +
 +            Map<String, String> leadersSnapshot = new HashMap<>();
 +            leadersSnapshot.put("1", "A");
 +            leadersSnapshot.put("2", "B");
 +            leadersSnapshot.put("3", "C");
 +
 +            // set the snapshot variables in replicatedlog
 +            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
 +            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
 +            actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 +
 +            ByteString bs = toByteString(leadersSnapshot);
 +            leader.setSnapshot(Optional.of(bs));
 +
 +            leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
 +
 +            List<Object> objectList = MessageCollectorActor.getAllMatching(followerActor,
 +                InstallSnapshotMessages.InstallSnapshot.class);
 +
 +            assertEquals(1, objectList.size());
 +
 +            Object o = objectList.get(0);
 +            assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
 +
 +            InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
 +
 +            assertEquals(1, installSnapshot.getChunkIndex());
 +            assertEquals(3, installSnapshot.getTotalChunks());
 +
 +            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
 +                "follower-reply", installSnapshot.getChunkIndex(), true));
 +
 +            objectList = MessageCollectorActor.getAllMatching(followerActor,
 +                InstallSnapshotMessages.InstallSnapshot.class);
 +
 +            assertEquals(2, objectList.size());
 +
 +            installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(1);
 +
 +            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
 +                "follower-reply", installSnapshot.getChunkIndex(), true));
 +
 +            objectList = MessageCollectorActor.getAllMatching(followerActor,
 +                InstallSnapshotMessages.InstallSnapshot.class);
 +
 +            assertEquals(3, objectList.size());
 +
 +            installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(2);
 +
 +            // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
 +            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
 +                "follower-reply", installSnapshot.getChunkIndex(), true));
 +
 +            objectList = MessageCollectorActor.getAllMatching(followerActor,
 +                InstallSnapshotMessages.InstallSnapshot.class);
 +
 +            // Count should still stay at 3
 +            assertEquals(3, objectList.size());
 +        }};
 +    }
 +
  
      @Test
 -    public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
 +    public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
          new JavaTestKit(getSystem()) {{
  
              TestActorRef<MessageCollectorActor> followerActor =
              assertEquals(3, installSnapshot.getTotalChunks());
  
  
 -            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false));
 +            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
 +                followerActor.path().toString(), -1, false));
 +
 +            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
 +                TimeUnit.MILLISECONDS);
  
              leader.handleMessage(leaderActor, new SendHeartBeat());
  
 -            o = MessageCollectorActor.getAllMessages(followerActor).get(1);
 +            o = MessageCollectorActor.getAllMatching(followerActor,InstallSnapshotMessages.InstallSnapshot.class).get(1);
  
              assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
  
              {
  
                  TestActorRef<MessageCollectorActor> followerActor =
 -                        TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
 +                        TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
  
                  Map<String, String> peerAddresses = new HashMap<>();
                  peerAddresses.put(followerActor.path().toString(),
  
                  leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
  
 -                leader.handleMessage(leaderActor, new SendHeartBeat());
 -
                  Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
  
 +                leader.handleMessage(leaderActor, new SendHeartBeat());
 +
                  o = MessageCollectorActor.getAllMessages(followerActor).get(1);
  
                  assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
              Leader leader = new Leader(leaderActorContext);
              leader.markFollowerActive(followerActor.path().toString());
  
 +            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
 +                TimeUnit.MILLISECONDS);
 +
              leader.handleMessage(leaderActor, new SendHeartBeat());
  
              AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
              Leader leader = new Leader(leaderActorContext);
              leader.markFollowerActive(followerActor.path().toString());
  
 +            Thread.sleep(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis());
 +
              leader.handleMessage(leaderActor, new SendHeartBeat());
  
              AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
          }};
      }
  
 +
 +    @Test
 +    public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
 +        new JavaTestKit(getSystem()) {{
 +
 +            ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
 +
 +            MockRaftActorContext leaderActorContext =
 +                new MockRaftActorContext("leader", getSystem(), leaderActor);
 +
 +            DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
 +            configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
 +            configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
 +
 +            leaderActorContext.setConfigParams(configParams);
 +
 +            ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
 +
 +            MockRaftActorContext followerActorContext =
 +                new MockRaftActorContext("follower-reply", getSystem(), followerActor);
 +
 +            followerActorContext.setConfigParams(configParams);
 +
 +            Follower follower = new Follower(followerActorContext);
 +
 +            ForwardMessageToBehaviorActor.setBehavior(follower);
 +
 +            Map<String, String> peerAddresses = new HashMap<>();
 +            peerAddresses.put("follower-reply",
 +                followerActor.path().toString());
 +
 +            leaderActorContext.setPeerAddresses(peerAddresses);
 +
 +            leaderActorContext.getReplicatedLog().removeFrom(0);
 +
 +            //create 3 entries
 +            leaderActorContext.setReplicatedLog(
 +                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
 +
 +            leaderActorContext.setCommitIndex(1);
 +
 +            Leader leader = new Leader(leaderActorContext);
 +            leader.markFollowerActive("follower-reply");
 +
 +            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
 +                TimeUnit.MILLISECONDS);
 +
 +            leader.handleMessage(leaderActor, new SendHeartBeat());
 +
 +            AppendEntries appendEntries = (AppendEntries) ForwardMessageToBehaviorActor
 +                .getFirstMatching(followerActor, AppendEntries.class);
 +
 +            assertNotNull(appendEntries);
 +
 +            assertEquals(1, appendEntries.getLeaderCommit());
 +            assertEquals(1, appendEntries.getEntries().get(0).getIndex());
 +            assertEquals(0, appendEntries.getPrevLogIndex());
 +
 +            AppendEntriesReply appendEntriesReply =
 +                (AppendEntriesReply)ForwardMessageToBehaviorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
 +
 +            assertNotNull(appendEntriesReply);
 +
 +            leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
 +
 +            List<Object> entries = ForwardMessageToBehaviorActor
 +                .getAllMatching(followerActor, AppendEntries.class);
 +
 +            assertEquals("AppendEntries count should be 2 ", 2, entries.size());
 +
 +            AppendEntries appendEntriesSecond = (AppendEntries) entries.get(1);
 +
 +            assertEquals(1, appendEntriesSecond.getLeaderCommit());
 +            assertEquals(2, appendEntriesSecond.getEntries().get(0).getIndex());
 +            assertEquals(1, appendEntriesSecond.getPrevLogIndex());
 +
 +        }};
 +    }
 +
      class MockLeader extends Leader {
  
          FollowerToSnapshot fts;