Bug 2187: AddServer unit test and bug fixes 63/28663/3
authorTom Pantelis <tpanteli@brocade.com>
Wed, 21 Oct 2015 09:02:17 +0000 (05:02 -0400)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 23 Oct 2015 17:26:51 +0000 (17:26 +0000)
Follow-up patch to https://git.opendaylight.org/gerrit/#/c/28018/.

Got the unit tests working and added more unit tests to cover more code.

Also fixed several bugs in the code that were failing the tests. One bug
was caused by replicating data quickly after install snapshot was
complete. On the final install snapshot chunk the follower sends an
ApplySnaphot message to persist and apply the snapshot. On the reply,
the leader assumes the follower is up-to-date and sets its next index.
However, applying the snapshot, ie updating the log and commit index, is
actually done after the async callback from the snapshot persist. In between
that time, if the leader sends the server config AppendEntries, the follower's
log is still empty and it deems itself out-of-sync and reports back failure.
This will cause the leader to eventually send a new install snaphot
which isn't which is not desirable. Also it may delay consensus for the
server config entry.

To fix this, I delayed the final InstallSnapshotReply until after the
ApplySnapshot is complete. I did this by adding a Callback to the
ApplySnapshot message which the SnapshotManager invokes.

Also the new server config was constructed without the leader's ID - it
needs to contain all members.

Also the ServerConfigurationPayload wasn't being applied in the
followers.

Another issue was that, if the leader had no peers initially, the
heartbeat wasn't scheduled so, when the new server was added, heartbeats
weren't occurring. So I change addFollower to schedule the heartbeat.

I added a test for adding a non-voting server which caused an endless
loop in AbstractLeader#handleAppendEntriesReply where it updates the
commitIndex based on the replicated count. To fix this, I added a break
if the replicatedLogEntry is null.

Change-Id: I5dff351140c611d58357cd58900bed401606038c
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
13 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayload.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java

index 1996a814dbfc2a6c1af1a224fcf4ed23083877fc..c5d81c18ccc15e719b337c35102e4e0f23f5cf2d 100644 (file)
@@ -192,14 +192,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     @Override
     public void handleCommand(final Object message) {
 
     @Override
     public void handleCommand(final Object message) {
-        if (message instanceof ApplyState){
+        if(serverConfigurationSupport.handleMessage(message, this, getSender())) {
+            return;
+        } else if (message instanceof ApplyState){
             ApplyState applyState = (ApplyState) message;
 
             ApplyState applyState = (ApplyState) message;
 
-            boolean result = serverConfigurationSupport.handleMessage(message, this, getSender());
-            if(result){
-                return;
-            }
-
             long elapsedTime = (System.nanoTime() - applyState.getStartTime());
             if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){
                 LOG.warn("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
             long elapsedTime = (System.nanoTime() - applyState.getStartTime());
             if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){
                 LOG.warn("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
@@ -244,8 +241,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             captureSnapshot();
         } else if(message instanceof SwitchBehavior){
             switchBehavior(((SwitchBehavior) message));
             captureSnapshot();
         } else if(message instanceof SwitchBehavior){
             switchBehavior(((SwitchBehavior) message));
-        } else if(!snapshotSupport.handleSnapshotMessage(message) &&
-                !serverConfigurationSupport.handleMessage(message, this, getSender())) {
+        } else if(!snapshotSupport.handleSnapshotMessage(message)) {
             switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
         }
     }
             switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
         }
     }
index 0c34158ca33c05dd751a66cb8eda8b4f252596cb..ae23140114ed7d2ca8e73584ecd997fefd437723 100644 (file)
@@ -7,26 +7,27 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
  */
 package org.opendaylight.controller.cluster.raft;
 
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Cancellable;
 import java.util.ArrayList;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Queue;
 import java.util.Queue;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.Cancellable;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
-import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -59,40 +60,71 @@ class RaftActorServerConfigurationSupport {
             // snapshot installation is successful
             onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply)message, raftActor,sender);
             return true;
             // snapshot installation is successful
             onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply)message, raftActor,sender);
             return true;
-        } else if(message instanceof ApplyState){
-            ApplyState applyState = (ApplyState) message;
-            Payload data = applyState.getReplicatedLogEntry().getData();
-            if( data instanceof ServerConfigurationPayload){
-                 LOG.info("Server configuration : {} has been replicated to a majority of cluster servers succesfully",
-                                                                                    (ServerConfigurationPayload)data);
-                 // respond ok to follower
-                 respondToClient(raftActor, ServerChangeStatus.OK);
-                 return true;
-            }
-            return false;
+        } else if(message instanceof ApplyState) {
+            return onApplyState((ApplyState) message, raftActor);
         } else {
             return false;
         }
     }
 
         } else {
             return false;
         }
     }
 
+    private boolean onApplyState(ApplyState applyState, RaftActor raftActor) {
+        Payload data = applyState.getReplicatedLogEntry().getData();
+        if(data instanceof ServerConfigurationPayload) {
+            CatchupFollowerInfo followerInfo = followerInfoQueue.peek();
+            if(followerInfo != null && followerInfo.getContextId().equals(applyState.getIdentifier())) {
+                LOG.info("{} has been successfully replicated to a majority of followers", data);
+
+                // respond ok to follower
+                respondToClient(raftActor, ServerChangeStatus.OK);
+            }
+
+            return true;
+        }
+
+        return false;
+    }
+
     private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) {
     private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) {
-        LOG.debug("onAddServer: {}", addServer);
+        LOG.debug("{}: onAddServer: {}", context.getId(), addServer);
         if(noLeaderOrForwardedToLeader(addServer, raftActor, sender)) {
             return;
         }
 
         CatchupFollowerInfo followerInfo = new CatchupFollowerInfo(addServer,sender);
         if(noLeaderOrForwardedToLeader(addServer, raftActor, sender)) {
             return;
         }
 
         CatchupFollowerInfo followerInfo = new CatchupFollowerInfo(addServer,sender);
-        boolean process = !followerInfoQueue.isEmpty();
+        boolean process = followerInfoQueue.isEmpty();
         followerInfoQueue.add(followerInfo);
         if(process) {
             processAddServer(raftActor);
         }
     }
 
         followerInfoQueue.add(followerInfo);
         if(process) {
             processAddServer(raftActor);
         }
     }
 
+    /**
+     * The algorithm for AddServer is as follows:
+     * <ul>
+     * <li>Add the new server as a peer.</li>
+     * <li>Add the new follower to the leader.</li>
+     * <li>If new server should be voting member</li>
+     * <ul>
+     *     <li>Initialize FollowerState to VOTING_NOT_INITIALIZED.</li>
+     *     <li>Initiate install snapshot to the new follower.</li>
+     *     <li>When install snapshot complete, mark the follower as VOTING and re-calculate majority vote count.</li>
+     * </ul>
+     * <li>Persist and replicate ServerConfigurationPayload with the new server list.</li>
+     * <li>On replication consensus, respond to caller with OK.</li>
+     * </ul>
+     * If the install snapshot times out after a period of 2 * election time out
+     * <ul>
+     *     <li>Remove the new server as a peer.</li>
+     *     <li>Remove the new follower from the leader.</li>
+     *     <li>Respond to caller with TIMEOUT.</li>
+     * </ul>
+     */
     private void processAddServer(RaftActor raftActor){
     private void processAddServer(RaftActor raftActor){
-        LOG.debug("In processAddServer");
+        LOG.debug("{}: In processAddServer", context.getId());
+
         AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
         AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
-        AddServer addSrv = followerInfoQueue.peek().getAddServer();
+        CatchupFollowerInfo followerInfo = followerInfoQueue.peek();
+        AddServer addSrv = followerInfo.getAddServer();
         context.addToPeers(addSrv.getNewServerId(), addSrv.getNewServerAddress());
 
         // if voting member - initialize to VOTING_NOT_INITIALIZED
         context.addToPeers(addSrv.getNewServerId(), addSrv.getNewServerAddress());
 
         // if voting member - initialize to VOTING_NOT_INITIALIZED
@@ -100,24 +132,6 @@ class RaftActorServerConfigurationSupport {
             FollowerState.NON_VOTING;
         leader.addFollower(addSrv.getNewServerId(), initialState);
 
             FollowerState.NON_VOTING;
         leader.addFollower(addSrv.getNewServerId(), initialState);
 
-        // TODO
-        // if initialState == FollowerState.VOTING_NOT_INITIALIZED
-        //     Initiate snapshot via leader.initiateCaptureSnapshot(addServer.getNewServerId())
-        //     Start a timer to abort the operation after a period of time (maybe 2 times election timeout)
-        //     Set local instance state and wait for message from the AbstractLeader when install snapshot
-        //     is done and return now
-        //     When install snapshot message is received, go to step 1
-        // else
-        //     go to step 2
-        //
-        // 1) tell AbstractLeader mark the follower as VOTING and recalculate minReplicationCount and
-        //        minIsolatedLeaderPeerCount
-        // 2) persist and replicate ServerConfigurationPayload via
-        //           raftActor.persistData(sender, uuid, newServerConfigurationPayload)
-        // 3) Wait for commit complete via ApplyState message in RaftActor or time it out. In RaftActor,
-        //       on ApplyState, check if ReplicatedLogEntry payload is ServerConfigurationPayload and call
-        //       this class.
-        //
         if(initialState == FollowerState.VOTING_NOT_INITIALIZED){
             LOG.debug("Leader sending initiate capture snapshot to follower : {}", addSrv.getNewServerId());
             leader.initiateCaptureSnapshot(addSrv.getNewServerId());
         if(initialState == FollowerState.VOTING_NOT_INITIALIZED){
             LOG.debug("Leader sending initiate capture snapshot to follower : {}", addSrv.getNewServerId());
             leader.initiateCaptureSnapshot(addSrv.getNewServerId());
@@ -129,8 +143,7 @@ class RaftActorServerConfigurationSupport {
                 context.getActorSystem().dispatcher(), context.getActor());
         } else {
             LOG.debug("Directly persisting  the new server configuration : {}", addSrv.getNewServerId());
                 context.getActorSystem().dispatcher(), context.getActor());
         } else {
             LOG.debug("Directly persisting  the new server configuration : {}", addSrv.getNewServerId());
-            persistNewServerConfiguration(raftActor, followerInfoQueue.peek().getClientRequestor(),
-                                                                                 addSrv.getNewServerId());
+            persistNewServerConfiguration(raftActor, followerInfo);
         }
     }
 
         }
     }
 
@@ -153,6 +166,11 @@ class RaftActorServerConfigurationSupport {
 
     private void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply,
                                                  RaftActor raftActor, ActorRef sender){
 
     private void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply,
                                                  RaftActor raftActor, ActorRef sender){
+        CatchupFollowerInfo followerInfo = followerInfoQueue.peek();
+        // Sanity check - it's possible we get a reply after it timed out.
+        if(followerInfo == null) {
+            return;
+        }
 
         String followerId = reply.getFollowerId();
         AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
 
         String followerId = reply.getFollowerId();
         AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
@@ -160,27 +178,19 @@ class RaftActorServerConfigurationSupport {
         stopFollowerTimer();
         followerLogInformation.setFollowerState(FollowerState.VOTING);
         leader.updateMinReplicaCountAndMinIsolatedLeaderPeerCount();
         stopFollowerTimer();
         followerLogInformation.setFollowerState(FollowerState.VOTING);
         leader.updateMinReplicaCountAndMinIsolatedLeaderPeerCount();
-        persistNewServerConfiguration(raftActor, sender, followerId);
+
+        persistNewServerConfiguration(raftActor, followerInfo);
     }
 
     }
 
-    private void persistNewServerConfiguration(RaftActor raftActor, ActorRef sender, String followerId){
-        /* get old server configuration list */
-        Map<String, String> tempMap =  context.getPeerAddresses();
-        List<String> cOld = new ArrayList<String>();
-        for (Map.Entry<String, String> entry : tempMap.entrySet()) {
-            if(!entry.getKey().equals(followerId)){
-                cOld.add(entry.getKey());
-            }
-        }
-        LOG.debug("Cold server configuration : {}",  cOld.toString());
-        /* get new server configuration list */
-        List <String> cNew = new ArrayList<String>(cOld);
-        cNew.add(followerId);
-        LOG.debug("Cnew server configuration : {}",  cNew.toString());
-        // construct the peer list
-        ServerConfigurationPayload servPayload = new ServerConfigurationPayload(cOld, cNew);
-        /* TODO - persist new configuration - CHECK WHETHER USING getId below is correct */
-        raftActor.persistData(sender, context.getId(), servPayload);
+    private void persistNewServerConfiguration(RaftActor raftActor, CatchupFollowerInfo followerInfo){
+        List <String> cNew = new ArrayList<String>(context.getPeerAddresses().keySet());
+        cNew.add(context.getId());
+
+        LOG.debug("New server configuration : {}",  cNew.toString());
+
+        ServerConfigurationPayload servPayload = new ServerConfigurationPayload(cNew, Collections.<String>emptyList());
+
+        raftActor.persistData(followerInfo.getClientRequestor(), followerInfo.getContextId(), servPayload);
    }
 
    private void stopFollowerTimer() {
    }
 
    private void stopFollowerTimer() {
@@ -190,22 +200,19 @@ class RaftActorServerConfigurationSupport {
    }
 
    private void onFollowerCatchupTimeout(RaftActor raftActor, ActorRef sender, String serverId){
    }
 
    private void onFollowerCatchupTimeout(RaftActor raftActor, ActorRef sender, String serverId){
-
         LOG.debug("onFollowerCatchupTimeout: {}",  serverId);
         AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
         // cleanup
         context.removePeer(serverId);
         leader.removeFollower(serverId);
         LOG.debug("onFollowerCatchupTimeout: {}",  serverId);
         AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
         // cleanup
         context.removePeer(serverId);
         leader.removeFollower(serverId);
-        LOG.warn("onFollowerCatchupTimeout - Timeout occured for server - {} while installing snapshot", serverId);
+        LOG.warn("Timeout occured for new server {} while installing snapshot", serverId);
         respondToClient(raftActor,ServerChangeStatus.TIMEOUT);
    }
 
    private void respondToClient(RaftActor raftActor, ServerChangeStatus result){
         respondToClient(raftActor,ServerChangeStatus.TIMEOUT);
    }
 
    private void respondToClient(RaftActor raftActor, ServerChangeStatus result){
-
-        int size = followerInfoQueue.size();
-
         // remove the entry from the queue
         CatchupFollowerInfo fInfo = followerInfoQueue.remove();
         // remove the entry from the queue
         CatchupFollowerInfo fInfo = followerInfoQueue.remove();
+
         // get the sender
         ActorRef toClient = fInfo.getClientRequestor();
 
         // get the sender
         ActorRef toClient = fInfo.getClientRequestor();
 
@@ -216,19 +223,27 @@ class RaftActorServerConfigurationSupport {
         }
    }
 
         }
    }
 
-    // mantain sender actorRef
-    private class CatchupFollowerInfo {
+    // maintain sender actorRef
+    private static class CatchupFollowerInfo {
         private final AddServer addServer;
         private final ActorRef clientRequestor;
         private final AddServer addServer;
         private final ActorRef clientRequestor;
+        private final String contextId;
 
         CatchupFollowerInfo(AddServer addSrv, ActorRef cliReq){
             addServer = addSrv;
             clientRequestor = cliReq;
 
         CatchupFollowerInfo(AddServer addSrv, ActorRef cliReq){
             addServer = addSrv;
             clientRequestor = cliReq;
+            contextId = UUID.randomUUID().toString();
         }
         }
-        public AddServer getAddServer(){
+
+        String getContextId() {
+            return contextId;
+        }
+
+        AddServer getAddServer(){
             return addServer;
         }
             return addServer;
         }
-        public ActorRef getClientRequestor(){
+
+        ActorRef getClientRequestor(){
             return clientRequestor;
         }
     }
             return clientRequestor;
         }
     }
index 2db595d743b727fdde09a66d57f8791c0545ac2f..bf0fc10aad944ae8539c8b3b2a4da70020500944 100644 (file)
@@ -55,7 +55,7 @@ class RaftActorSnapshotMessageSupport {
 
     boolean handleSnapshotMessage(Object message) {
         if(message instanceof ApplySnapshot ) {
 
     boolean handleSnapshotMessage(Object message) {
         if(message instanceof ApplySnapshot ) {
-            onApplySnapshot(((ApplySnapshot) message).getSnapshot());
+            onApplySnapshot((ApplySnapshot) message);
             return true;
         } else if (message instanceof SaveSnapshotSuccess) {
             onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
             return true;
         } else if (message instanceof SaveSnapshotSuccess) {
             onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
@@ -95,10 +95,10 @@ class RaftActorSnapshotMessageSupport {
         context.getSnapshotManager().commit(sequenceNumber, currentBehavior);
     }
 
         context.getSnapshotManager().commit(sequenceNumber, currentBehavior);
     }
 
-    private void onApplySnapshot(Snapshot snapshot) {
+    private void onApplySnapshot(ApplySnapshot message) {
         log.info("{}: Applying snapshot on follower with snapshotIndex: {}, snapshotTerm: {}", context.getId(),
         log.info("{}: Applying snapshot on follower with snapshotIndex: {}, snapshotTerm: {}", context.getId(),
-                snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm());
+                message.getSnapshot().getLastAppliedIndex(), message.getSnapshot().getLastAppliedTerm());
 
 
-        context.getSnapshotManager().apply(snapshot);
+        context.getSnapshotManager().apply(message);
     }
 }
     }
 }
index 32cb458c2076a3a56cd6bf86df12759f1133f766..db1f193cba360fd389a683d435a9fa8a6ff61698 100644 (file)
@@ -79,4 +79,10 @@ public class ServerConfigurationPayload extends Payload implements Serializable
     public Payload decode(AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payload) {
         return null;
     }
     public Payload decode(AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payload) {
         return null;
     }
+
+    @Override
+    public String toString() {
+        return "ServerConfigurationPayload [newServerConfig=" + newServerConfig + ", oldServerConfig="
+                + oldServerConfig + "]";
+    }
 }
 }
index c553a397f6ba8b6531d4e85a8e8332f87697ef44..8e0d2f820b722a1b91742e67b5a438467af35ec3 100644 (file)
@@ -12,6 +12,7 @@ import akka.japi.Procedure;
 import akka.persistence.SnapshotSelectionCriteria;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.List;
 import akka.persistence.SnapshotSelectionCriteria;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.List;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
@@ -37,7 +38,7 @@ public class SnapshotManager implements SnapshotState {
 
     private Procedure<Void> createSnapshotProcedure;
 
 
     private Procedure<Void> createSnapshotProcedure;
 
-    private Snapshot applySnapshot;
+    private ApplySnapshot applySnapshot;
     private Procedure<byte[]> applySnapshotProcedure;
 
     public SnapshotManager(RaftActorContext context, Logger logger) {
     private Procedure<byte[]> applySnapshotProcedure;
 
     public SnapshotManager(RaftActorContext context, Logger logger) {
@@ -45,6 +46,10 @@ public class SnapshotManager implements SnapshotState {
         this.LOG = logger;
     }
 
         this.LOG = logger;
     }
 
+    public boolean isApplying() {
+        return applySnapshot != null;
+    }
+
     @Override
     public boolean isCapturing() {
         return currentState.isCapturing();
     @Override
     public boolean isCapturing() {
         return currentState.isCapturing();
@@ -61,7 +66,7 @@ public class SnapshotManager implements SnapshotState {
     }
 
     @Override
     }
 
     @Override
-    public void apply(Snapshot snapshot) {
+    public void apply(ApplySnapshot snapshot) {
         currentState.apply(snapshot);
     }
 
         currentState.apply(snapshot);
     }
 
@@ -130,7 +135,7 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
         }
 
         @Override
-        public void apply(Snapshot snapshot) {
+        public void apply(ApplySnapshot snapshot) {
             LOG.debug("apply should not be called in state {}", this);
         }
 
             LOG.debug("apply should not be called in state {}", this);
         }
 
@@ -260,14 +265,14 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
         }
 
         @Override
-        public void apply(Snapshot snapshot) {
-            applySnapshot = snapshot;
+        public void apply(ApplySnapshot applySnapshot) {
+            SnapshotManager.this.applySnapshot = applySnapshot;
 
             lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber();
 
             LOG.debug("lastSequenceNumber prior to persisting applied snapshot: {}", lastSequenceNumber);
 
 
             lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber();
 
             LOG.debug("lastSequenceNumber prior to persisting applied snapshot: {}", lastSequenceNumber);
 
-            context.getPersistenceProvider().saveSnapshot(snapshot);
+            context.getPersistenceProvider().saveSnapshot(applySnapshot.getSnapshot());
 
             SnapshotManager.this.currentState = PERSISTING;
         }
 
             SnapshotManager.this.currentState = PERSISTING;
         }
@@ -374,16 +379,19 @@ public class SnapshotManager implements SnapshotState {
 
         @Override
         public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) {
 
         @Override
         public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) {
-            LOG.debug("Snapshot success sequence number:", sequenceNumber);
+            LOG.debug("Snapshot success sequence number: {}", sequenceNumber);
 
             if(applySnapshot != null) {
                 try {
 
             if(applySnapshot != null) {
                 try {
-                    applySnapshotProcedure.apply(applySnapshot.getState());
+                    Snapshot snapshot = applySnapshot.getSnapshot();
+                    applySnapshotProcedure.apply(snapshot.getState());
 
                     //clears the followers log, sets the snapshot index to ensure adjusted-index works
 
                     //clears the followers log, sets the snapshot index to ensure adjusted-index works
-                    context.setReplicatedLog(ReplicatedLogImpl.newInstance(applySnapshot, context, currentBehavior));
-                    context.setLastApplied(applySnapshot.getLastAppliedIndex());
-                    context.setCommitIndex(applySnapshot.getLastAppliedIndex());
+                    context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior));
+                    context.setLastApplied(snapshot.getLastAppliedIndex());
+                    context.setCommitIndex(snapshot.getLastAppliedIndex());
+
+                    applySnapshot.getCallback().onSuccess();
                 } catch (Exception e) {
                     LOG.error("Error applying snapshot", e);
                 }
                 } catch (Exception e) {
                     LOG.error("Error applying snapshot", e);
                 }
@@ -412,6 +420,8 @@ public class SnapshotManager implements SnapshotState {
                         context.getReplicatedLog().getSnapshotIndex(),
                         context.getReplicatedLog().getSnapshotTerm(),
                         context.getReplicatedLog().size());
                         context.getReplicatedLog().getSnapshotIndex(),
                         context.getReplicatedLog().getSnapshotTerm(),
                         context.getReplicatedLog().size());
+            } else {
+                applySnapshot.getCallback().onFailure();
             }
 
             lastSequenceNumber = -1;
             }
 
             lastSequenceNumber = -1;
index 46e0c87fc21d6ca6bb9a5a49e4ec9aa8538cc969..f5a175f3891de08defa6a0a7cefccf816fbf8213 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.raft;
 
 
 package org.opendaylight.controller.cluster.raft;
 
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 
 public interface SnapshotState {
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 
 public interface SnapshotState {
@@ -42,7 +43,7 @@ public interface SnapshotState {
      *
      * @param snapshot the Snapshot to apply.
      */
      *
      * @param snapshot the Snapshot to apply.
      */
-    void apply(Snapshot snapshot);
+    void apply(ApplySnapshot snapshot);
 
     /**
      * Persist the snapshot
 
     /**
      * Persist the snapshot
index 54ee02a057ec1805d70e1a15cc264ab3eb0ea956..3c43d809b9c55ed5456850e0b875a1bb2df433a1 100644 (file)
@@ -8,22 +8,49 @@
 
 package org.opendaylight.controller.cluster.raft.base.messages;
 
 
 package org.opendaylight.controller.cluster.raft.base.messages;
 
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 
 import org.opendaylight.controller.cluster.raft.Snapshot;
 
-import java.io.Serializable;
-
 /**
  * Internal message, issued by follower to its actor
  */
 /**
  * Internal message, issued by follower to its actor
  */
-public class ApplySnapshot implements Serializable {
-    private static final long serialVersionUID = 1L;
+public class ApplySnapshot {
     private final Snapshot snapshot;
     private final Snapshot snapshot;
+    private final Callback callback;
 
     public ApplySnapshot(Snapshot snapshot) {
 
     public ApplySnapshot(Snapshot snapshot) {
-        this.snapshot = snapshot;
+        this(snapshot, NOOP_CALLBACK);
+    }
+
+    public ApplySnapshot(@Nonnull Snapshot snapshot, @Nonnull Callback callback) {
+        this.snapshot = Preconditions.checkNotNull(snapshot);
+        this.callback = Preconditions.checkNotNull(callback);
     }
 
     }
 
+    @Nonnull
     public Snapshot getSnapshot() {
         return snapshot;
     }
     public Snapshot getSnapshot() {
         return snapshot;
     }
+
+    @Nonnull
+    public Callback getCallback() {
+        return callback;
+    }
+
+    public interface Callback {
+        void onSuccess();
+
+        void onFailure();
+    }
+
+    public static Callback NOOP_CALLBACK = new Callback() {
+        @Override
+        public void onSuccess() {
+        }
+
+        @Override
+        public void onFailure() {
+        }
+    };
 }
 }
index 605a5c21a4a7ecfab4a006d43cacebed088ce00b..3c5ad0428fd90ba59e3aabfea701a46f8a7af6cb 100644 (file)
@@ -135,6 +135,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(followerId, -1, context);
         followerLogInformation.setFollowerState(followerState);
         followerToLog.put(followerId, followerLogInformation);
         FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(followerId, -1, context);
         followerLogInformation.setFollowerState(followerState);
         followerToLog.put(followerId, followerLogInformation);
+
+        if(heartbeatSchedule == null) {
+            scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
+        }
     }
 
     public void removeFollower(String followerId) {
     }
 
     public void removeFollower(String followerId) {
@@ -254,9 +258,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
             if (replicatedCount >= minReplicationCount) {
                 ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N);
 
             if (replicatedCount >= minReplicationCount) {
                 ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N);
-                if (replicatedLogEntry != null &&
-                    replicatedLogEntry.getTerm() == currentTerm()) {
+                if (replicatedLogEntry != null && replicatedLogEntry.getTerm() == currentTerm()) {
                     context.setCommitIndex(N);
                     context.setCommitIndex(N);
+                } else {
+                    break;
                 }
             } else {
                 break;
                 }
             } else {
                 break;
index 7e6175d9332277c2e828c84812d8d5de754f2f08..787bd74629b0c643400e49deb32a40468c52e50a 100644 (file)
@@ -17,6 +17,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
@@ -490,4 +491,15 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         return context.getId();
     }
 
         return context.getId();
     }
 
+    public void applyServerConfiguration(ServerConfigurationPayload serverConfig) {
+        for(String peerId: context.getPeerAddresses().keySet()) {
+            context.removePeer(peerId);
+        }
+
+        for(String peerId: serverConfig.getNewServerConfig()) {
+            if(!getId().equals(peerId)) {
+                context.addToPeers(peerId, null);
+            }
+        }
+    }
 }
 }
index d516f9ccb3431442244f0b19762274554ca27c60..8f4114753714a3c70648a794d32c0f9031ac387e 100644 (file)
@@ -14,6 +14,7 @@ import java.util.ArrayList;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
@@ -103,7 +104,7 @@ public class Follower extends AbstractRaftActorBehavior {
         // to make it easier to read. Before refactoring ensure tests
         // cover the code properly
 
         // to make it easier to read. Before refactoring ensure tests
         // cover the code properly
 
-        if (snapshotTracker != null) {
+        if (snapshotTracker != null || context.getSnapshotManager().isApplying()) {
             // if snapshot install is in progress, follower should just acknowledge append entries with a reply.
             AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
                     lastIndex(), lastTerm(), context.getPayloadVersion());
             // if snapshot install is in progress, follower should just acknowledge append entries with a reply.
             AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
                     lastIndex(), lastTerm(), context.getPayloadVersion());
@@ -189,6 +190,10 @@ public class Follower extends AbstractRaftActorBehavior {
                 LOG.debug("{}: Append entry to log {}", logName(), entry.getData());
 
                 context.getReplicatedLog().appendAndPersist(entry);
                 LOG.debug("{}: Append entry to log {}", logName(), entry.getData());
 
                 context.getReplicatedLog().appendAndPersist(entry);
+
+                if(entry.getData() instanceof ServerConfigurationPayload) {
+                    applyServerConfiguration((ServerConfigurationPayload)entry.getData());
+                }
             }
 
             LOG.debug("{}: Log size is now {}", logName(), context.getReplicatedLog().size());
             }
 
             LOG.debug("{}: Log size is now {}", logName(), context.getReplicatedLog().size());
@@ -335,7 +340,7 @@ public class Follower extends AbstractRaftActorBehavior {
         return super.handleMessage(sender, message);
     }
 
         return super.handleMessage(sender, message);
     }
 
-    private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
+    private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) {
 
         LOG.debug("{}: InstallSnapshot received from leader {}, datasize: {} , Chunk: {}/{}",
                     logName(), installSnapshot.getLeaderId(), installSnapshot.getData().size(),
 
         LOG.debug("{}: InstallSnapshot received from leader {}, datasize: {} , Chunk: {}/{}",
                     logName(), installSnapshot.getLeaderId(), installSnapshot.getData().size(),
@@ -348,6 +353,9 @@ public class Follower extends AbstractRaftActorBehavior {
         updateInitialSyncStatus(installSnapshot.getLastIncludedIndex(), installSnapshot.getLeaderId());
 
         try {
         updateInitialSyncStatus(installSnapshot.getLastIncludedIndex(), installSnapshot.getLeaderId());
 
         try {
+            final InstallSnapshotReply reply = new InstallSnapshotReply(
+                    currentTerm(), context.getId(), installSnapshot.getChunkIndex(), true);
+
             if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
                     installSnapshot.getLastChunkHashCode())){
                 Snapshot snapshot = Snapshot.create(snapshotTracker.getSnapshot(),
             if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
                     installSnapshot.getLastChunkHashCode())){
                 Snapshot snapshot = Snapshot.create(snapshotTracker.getSnapshot(),
@@ -359,19 +367,28 @@ public class Follower extends AbstractRaftActorBehavior {
                         context.getTermInformation().getCurrentTerm(),
                         context.getTermInformation().getVotedFor());
 
                         context.getTermInformation().getCurrentTerm(),
                         context.getTermInformation().getVotedFor());
 
-                actor().tell(new ApplySnapshot(snapshot), actor());
+                ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() {
+                    @Override
+                    public void onSuccess() {
+                        LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply);
 
 
-                snapshotTracker = null;
-
-            }
+                        sender.tell(reply, actor());
+                    }
 
 
-            InstallSnapshotReply reply = new InstallSnapshotReply(
-                    currentTerm(), context.getId(), installSnapshot.getChunkIndex(), true);
+                    @Override
+                    public void onFailure() {
+                        sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(), -1, false), actor());
+                    }
+                };
 
 
-            LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply);
+                actor().tell(new ApplySnapshot(snapshot, applySnapshotCallback), actor());
 
 
-            sender.tell(reply, actor());
+                snapshotTracker = null;
+            } else {
+                LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply);
 
 
+                sender.tell(reply, actor());
+            }
         } catch (SnapshotTracker.InvalidChunkException e) {
             LOG.debug("{}: Exception in InstallSnapshot of follower", logName(), e);
 
         } catch (SnapshotTracker.InvalidChunkException e) {
             LOG.debug("{}: Exception in InstallSnapshot of follower", logName(), e);
 
index 538681e8df1eff6ba0c061fbd31f1bcae0390b61..5c9ba13b8beda3e439df5dce505f2d96c747e1be 100644 (file)
@@ -7,7 +7,7 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
  */
 package org.opendaylight.controller.cluster.raft;
 
-//import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertEquals;
 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
 import akka.actor.ActorRef;
 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
 import akka.actor.ActorRef;
@@ -19,7 +19,9 @@ import akka.testkit.TestActorRef;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import java.util.Collections;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 //import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.Map;
 //import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -28,20 +30,26 @@ import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
-//import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
-//import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
-//import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+
 /**
  * Unit tests for RaftActorServerConfigurationSupport.
  *
 /**
  * Unit tests for RaftActorServerConfigurationSupport.
  *
@@ -60,23 +68,29 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
             Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
             actorFactory.generateActorId(FOLLOWER_ID));
 
             Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
             actorFactory.generateActorId(FOLLOWER_ID));
 
-    private final TestActorRef<ForwardMessageToBehaviorActor> newServerActor = actorFactory.createTestActor(
-            Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
-            actorFactory.generateActorId(NEW_SERVER_ID));
+    private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
+    private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
 
 
-    private RaftActorContext newServerActorContext;
+    private RaftActorContext newFollowerActorContext;
     private final JavaTestKit testKit = new JavaTestKit(getSystem());
 
     @Before
     public void setup() {
     private final JavaTestKit testKit = new JavaTestKit(getSystem());
 
     @Before
     public void setup() {
+        InMemoryJournal.clear();
+        InMemorySnapshotStore.clear();
+
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
         configParams.setElectionTimeoutFactor(100000);
         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
         configParams.setElectionTimeoutFactor(100000);
         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
-        newServerActorContext = new RaftActorContextImpl(newServerActor, newServerActor.underlyingActor().getContext(),
-                NEW_SERVER_ID, new ElectionTermImpl(NO_PERSISTENCE, NEW_SERVER_ID, LOG), -1, -1,
-                Maps.<String, String>newHashMap(), configParams, NO_PERSISTENCE, LOG);
-        newServerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+        newFollowerCollectorActor = actorFactory.createTestActor(
+                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
+        newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
+                configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(NEW_SERVER_ID));
+        newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
     }
 
     @After
     }
 
     @After
@@ -85,19 +99,16 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
     }
 
     @Test
     }
 
     @Test
-    public void testAddServerWithFollower() throws Exception {
+    public void testAddServerWithExistingFollower() throws Exception {
         RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
                 0, 3, 1).build());
         RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
                 0, 3, 1).build());
-        followerActorContext.setCommitIndex(3);
-        followerActorContext.setLastApplied(3);
+        followerActorContext.setCommitIndex(2);
+        followerActorContext.setLastApplied(2);
 
         Follower follower = new Follower(followerActorContext);
         followerActor.underlyingActor().setBehavior(follower);
 
 
         Follower follower = new Follower(followerActorContext);
         followerActor.underlyingActor().setBehavior(follower);
 
-        Follower newServer = new Follower(newServerActorContext);
-        newServerActor.underlyingActor().setBehavior(newServer);
-
         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
                         followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
                         followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
@@ -109,33 +120,173 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
 
 
         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
 
-        leaderActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef());
+        leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
+
+        // Leader should install snapshot - capture and verify ApplySnapshot contents
+
+        ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
+        List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
+        assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
+
+        AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
+        assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
+
+        // Verify ServerConfigurationPayload entry in leader's log
+
+        RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+        assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
+        assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
+        verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), LEADER_ID, FOLLOWER_ID, NEW_SERVER_ID);
+
+        // Verify ServerConfigurationPayload entry in both followers
+
+        assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
+        verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), LEADER_ID, FOLLOWER_ID, NEW_SERVER_ID);
+
+        assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
+        verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, FOLLOWER_ID, NEW_SERVER_ID);
+
+        // Verify new server config was applied in both followers
+
+        assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID),
+                followerActorContext.getPeerAddresses().keySet());
+
+        assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID),
+                newFollowerActorContext.getPeerAddresses().keySet());
+
+        clearMessages(followerActor);
+        clearMessages(newFollowerCollectorActor);
+
+        expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
+        expectFirstMatching(followerActor, ApplyState.class);
+
+        assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
+        assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
+        assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
+        assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
+    }
+
+    @Test
+    public void testAddServerWithNoExistingFollower() throws Exception {
+        RaftActorContext initialActorContext = new MockRaftActorContext();
+        initialActorContext.setCommitIndex(1);
+        initialActorContext.setLastApplied(1);
+        initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
+                0, 2, 1).build());
+
+        TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+                MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
+                        initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(LEADER_ID));
+
+        MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
+        RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+
+        leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
+
+        // Leader should install snapshot - capture and verify ApplySnapshot contents
+
+        ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
+        List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
+        assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
+
+        AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
+        assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
+
+        // Verify ServerConfigurationPayload entry in leader's log
+
+        assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
+        assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
+        verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID);
+
+        // Verify ServerConfigurationPayload entry in the new follower
+
+        clearMessages(newFollowerCollectorActor);
+
+        expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
+        assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
+        verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID);
+
+        // Verify new server config was applied in the new follower
+
+        assertEquals("New follower peers", Sets.newHashSet(LEADER_ID),
+                newFollowerActorContext.getPeerAddresses().keySet());
+    }
+
+    @Test
+    public void testAddServerAsNonVoting() throws Exception {
+        RaftActorContext initialActorContext = new MockRaftActorContext();
+        initialActorContext.setCommitIndex(-1);
+        initialActorContext.setLastApplied(-1);
+        initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+        TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+                MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
+                        initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(LEADER_ID));
+
+        MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
+        RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+
+        leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
+
+        AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
+        assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
+
+        // Verify ServerConfigurationPayload entry in leader's log
+
+        assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
+        assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
+        verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID);
+
+        // Verify ServerConfigurationPayload entry in the new follower
+
+        expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
+        assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
+        verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID);
+
+        // Verify new server config was applied in the new follower
+
+        assertEquals("New follower peers", Sets.newHashSet(LEADER_ID),
+                newFollowerActorContext.getPeerAddresses().keySet());
+
+        MessageCollectorActor.assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.SERIALIZABLE_CLASS, 500);
+    }
+
+    @Test
+    public void testAddServerWithInstallSnapshotTimeout() throws Exception {
+        newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
 
 
-        // leader should install snapshot - capture and verify ApplySnapshot contents
-        //ApplySnapshot applySnapshot = expectFirstMatching(followerActor, ApplySnapshot.class);
-        //List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
-        //assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
+        RaftActorContext initialActorContext = new MockRaftActorContext();
+        initialActorContext.setCommitIndex(-1);
+        initialActorContext.setLastApplied(-1);
+        initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
 
 
-        // leader should replicate new server config to both followers
-        //expectFirstMatching(followerActor, AppendEntries.class);
-        //expectFirstMatching(newServerActor, AppendEntries.class);
+        TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+                MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
+                        initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(LEADER_ID));
 
 
-        // verify ServerConfigurationPayload entry in leader's log
+        MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
-        //assertEquals("Leader journal log size", 4, leaderActorContext.getReplicatedLog().size());
-        //assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
-        ReplicatedLogEntry logEntry = leaderActorContext.getReplicatedLog().get(
-                leaderActorContext.getReplicatedLog().lastIndex());
-        // verify logEntry contents
 
 
-        // Also verify ServerConfigurationPayload entry in both followers
+        leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
 
 
-        //AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
-        //assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
-        //assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
+        AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
+
+        assertEquals("Leader peers size", 0, leaderActorContext.getPeerAddresses().keySet().size());
+        assertEquals("Leader followers size", 0,
+                ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
     }
 
     }
 
-    //@Test
+    @Test
     public void testAddServerWithNoLeader() {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
     public void testAddServerWithNoLeader() {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
@@ -146,12 +297,12 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
                 actorFactory.generateActorId(LEADER_ID));
         noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
 
                 actorFactory.generateActorId(LEADER_ID));
         noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
 
-        noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef());
-        //AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
-        //assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
+        noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
+        AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
     }
 
     }
 
-    //@Test
+    @Test
     public void testAddServerForwardedToLeader() {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
     public void testAddServerForwardedToLeader() {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
@@ -169,16 +320,25 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
                 -1, -1, (short)0), leaderActor);
 
         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
                 -1, -1, (short)0), leaderActor);
 
-        followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef());
-        //expectFirstMatching(leaderActor, AddServer.class);
+        followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
+        expectFirstMatching(leaderActor, AddServer.class);
+    }
+
+    private void verifyServerConfigurationPayloadEntry(ReplicatedLog log, String... cNew) {
+        ReplicatedLogEntry logEntry = log.get(log.lastIndex());
+        assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
+        ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
+        assertEquals("getNewServerConfig", Sets.newHashSet(cNew), Sets.newHashSet(payload.getNewServerConfig()));
     }
 
     private RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
         configParams.setElectionTimeoutFactor(100000);
     }
 
     private RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
         configParams.setElectionTimeoutFactor(100000);
+        ElectionTermImpl termInfo = new ElectionTermImpl(NO_PERSISTENCE, id, LOG);
+        termInfo.update(1, LEADER_ID);
         RaftActorContext followerActorContext = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
         RaftActorContext followerActorContext = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
-                id, new ElectionTermImpl(NO_PERSISTENCE, id, LOG), -1, -1,
+                id, termInfo, -1, -1,
                 ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG);
 
         return followerActorContext;
                 ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG);
 
         return followerActorContext;
@@ -198,6 +358,8 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
             context.setCommitIndex(fromContext.getCommitIndex());
             context.setLastApplied(fromContext.getLastApplied());
 
             context.setCommitIndex(fromContext.getCommitIndex());
             context.setLastApplied(fromContext.getLastApplied());
+            context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
+                    fromContext.getTermInformation().getVotedFor());
         }
 
         @Override
         }
 
         @Override
@@ -217,9 +379,37 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
 
         static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
-            configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
-            configParams.setElectionTimeoutFactor(100000);
+            configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+            configParams.setElectionTimeoutFactor(1);
             return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
         }
     }
             return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
         }
     }
+
+    public static class MockNewFollowerRaftActor extends MockRaftActor {
+        private final TestActorRef<MessageCollectorActor> collectorActor;
+        private volatile Class<?> dropMessageOfType;
+
+        public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
+            super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), null);
+            this.collectorActor = collectorActor;
+        }
+
+        void setDropMessageOfType(Class<?> dropMessageOfType) {
+            this.dropMessageOfType = dropMessageOfType;
+        }
+
+        @Override
+        public void handleCommand(Object message) {
+            if(dropMessageOfType != null && dropMessageOfType.equals(message.getClass())) {
+                return;
+            }
+
+            super.handleCommand(message);
+            collectorActor.tell(message, getSender());
+        }
+
+        static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
+            return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);
+        }
+    }
 }
 }
index 7d41508c3f298fed6df2e0b141505cd391522d3f..d79a48357ab96e77c2393be49ad9ea3888fafaeb 100644 (file)
@@ -97,9 +97,10 @@ public class RaftActorSnapshotMessageSupportTest {
         Snapshot snapshot = Snapshot.create(snapshotBytes, Collections.<ReplicatedLogEntry>emptyList(),
                 lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1);
 
         Snapshot snapshot = Snapshot.create(snapshotBytes, Collections.<ReplicatedLogEntry>emptyList(),
                 lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1);
 
-        sendMessageToSupport(new ApplySnapshot(snapshot));
+        ApplySnapshot applySnapshot = new ApplySnapshot(snapshot);
+        sendMessageToSupport(applySnapshot);
 
 
-        verify(mockSnapshotManager).apply(snapshot);
+        verify(mockSnapshotManager).apply(applySnapshot);
     }
 
     @Test
     }
 
     @Test
index a06d086e99d4267fe62c3e32033701d69eeca460..51818d15b6475b02b4b58a82e570f9252069baa9 100644 (file)
@@ -785,6 +785,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
         assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
         assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
         assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
         assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
+        applySnapshot.getCallback().onSuccess();
 
         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
                 leaderActor, InstallSnapshotReply.class);
 
         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
                 leaderActor, InstallSnapshotReply.class);