Bug 2187: AddServer unit test and bug fixes 63/28663/3
authorTom Pantelis <tpanteli@brocade.com>
Wed, 21 Oct 2015 09:02:17 +0000 (05:02 -0400)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 23 Oct 2015 17:26:51 +0000 (17:26 +0000)
Follow-up patch to https://git.opendaylight.org/gerrit/#/c/28018/.

Got the unit tests working and added more unit tests to cover more code.

Also fixed several bugs in the code that were failing the tests. One bug
was caused by replicating data quickly after install snapshot was
complete. On the final install snapshot chunk the follower sends an
ApplySnaphot message to persist and apply the snapshot. On the reply,
the leader assumes the follower is up-to-date and sets its next index.
However, applying the snapshot, ie updating the log and commit index, is
actually done after the async callback from the snapshot persist. In between
that time, if the leader sends the server config AppendEntries, the follower's
log is still empty and it deems itself out-of-sync and reports back failure.
This will cause the leader to eventually send a new install snaphot
which isn't which is not desirable. Also it may delay consensus for the
server config entry.

To fix this, I delayed the final InstallSnapshotReply until after the
ApplySnapshot is complete. I did this by adding a Callback to the
ApplySnapshot message which the SnapshotManager invokes.

Also the new server config was constructed without the leader's ID - it
needs to contain all members.

Also the ServerConfigurationPayload wasn't being applied in the
followers.

Another issue was that, if the leader had no peers initially, the
heartbeat wasn't scheduled so, when the new server was added, heartbeats
weren't occurring. So I change addFollower to schedule the heartbeat.

I added a test for adding a non-voting server which caused an endless
loop in AbstractLeader#handleAppendEntriesReply where it updates the
commitIndex based on the replicated count. To fix this, I added a break
if the replicatedLogEntry is null.

Change-Id: I5dff351140c611d58357cd58900bed401606038c
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
13 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayload.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotState.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java

index 1996a81..c5d81c1 100644 (file)
@@ -192,14 +192,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     @Override
     public void handleCommand(final Object message) {
-        if (message instanceof ApplyState){
+        if(serverConfigurationSupport.handleMessage(message, this, getSender())) {
+            return;
+        } else if (message instanceof ApplyState){
             ApplyState applyState = (ApplyState) message;
 
-            boolean result = serverConfigurationSupport.handleMessage(message, this, getSender());
-            if(result){
-                return;
-            }
-
             long elapsedTime = (System.nanoTime() - applyState.getStartTime());
             if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){
                 LOG.warn("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
@@ -244,8 +241,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             captureSnapshot();
         } else if(message instanceof SwitchBehavior){
             switchBehavior(((SwitchBehavior) message));
-        } else if(!snapshotSupport.handleSnapshotMessage(message) &&
-                !serverConfigurationSupport.handleMessage(message, this, getSender())) {
+        } else if(!snapshotSupport.handleSnapshotMessage(message)) {
             switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
         }
     }
index 0c34158..ae23140 100644 (file)
@@ -7,26 +7,27 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Cancellable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Queue;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.Cancellable;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
-import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -59,40 +60,71 @@ class RaftActorServerConfigurationSupport {
             // snapshot installation is successful
             onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply)message, raftActor,sender);
             return true;
-        } else if(message instanceof ApplyState){
-            ApplyState applyState = (ApplyState) message;
-            Payload data = applyState.getReplicatedLogEntry().getData();
-            if( data instanceof ServerConfigurationPayload){
-                 LOG.info("Server configuration : {} has been replicated to a majority of cluster servers succesfully",
-                                                                                    (ServerConfigurationPayload)data);
-                 // respond ok to follower
-                 respondToClient(raftActor, ServerChangeStatus.OK);
-                 return true;
-            }
-            return false;
+        } else if(message instanceof ApplyState) {
+            return onApplyState((ApplyState) message, raftActor);
         } else {
             return false;
         }
     }
 
+    private boolean onApplyState(ApplyState applyState, RaftActor raftActor) {
+        Payload data = applyState.getReplicatedLogEntry().getData();
+        if(data instanceof ServerConfigurationPayload) {
+            CatchupFollowerInfo followerInfo = followerInfoQueue.peek();
+            if(followerInfo != null && followerInfo.getContextId().equals(applyState.getIdentifier())) {
+                LOG.info("{} has been successfully replicated to a majority of followers", data);
+
+                // respond ok to follower
+                respondToClient(raftActor, ServerChangeStatus.OK);
+            }
+
+            return true;
+        }
+
+        return false;
+    }
+
     private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) {
-        LOG.debug("onAddServer: {}", addServer);
+        LOG.debug("{}: onAddServer: {}", context.getId(), addServer);
         if(noLeaderOrForwardedToLeader(addServer, raftActor, sender)) {
             return;
         }
 
         CatchupFollowerInfo followerInfo = new CatchupFollowerInfo(addServer,sender);
-        boolean process = !followerInfoQueue.isEmpty();
+        boolean process = followerInfoQueue.isEmpty();
         followerInfoQueue.add(followerInfo);
         if(process) {
             processAddServer(raftActor);
         }
     }
 
+    /**
+     * The algorithm for AddServer is as follows:
+     * <ul>
+     * <li>Add the new server as a peer.</li>
+     * <li>Add the new follower to the leader.</li>
+     * <li>If new server should be voting member</li>
+     * <ul>
+     *     <li>Initialize FollowerState to VOTING_NOT_INITIALIZED.</li>
+     *     <li>Initiate install snapshot to the new follower.</li>
+     *     <li>When install snapshot complete, mark the follower as VOTING and re-calculate majority vote count.</li>
+     * </ul>
+     * <li>Persist and replicate ServerConfigurationPayload with the new server list.</li>
+     * <li>On replication consensus, respond to caller with OK.</li>
+     * </ul>
+     * If the install snapshot times out after a period of 2 * election time out
+     * <ul>
+     *     <li>Remove the new server as a peer.</li>
+     *     <li>Remove the new follower from the leader.</li>
+     *     <li>Respond to caller with TIMEOUT.</li>
+     * </ul>
+     */
     private void processAddServer(RaftActor raftActor){
-        LOG.debug("In processAddServer");
+        LOG.debug("{}: In processAddServer", context.getId());
+
         AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
-        AddServer addSrv = followerInfoQueue.peek().getAddServer();
+        CatchupFollowerInfo followerInfo = followerInfoQueue.peek();
+        AddServer addSrv = followerInfo.getAddServer();
         context.addToPeers(addSrv.getNewServerId(), addSrv.getNewServerAddress());
 
         // if voting member - initialize to VOTING_NOT_INITIALIZED
@@ -100,24 +132,6 @@ class RaftActorServerConfigurationSupport {
             FollowerState.NON_VOTING;
         leader.addFollower(addSrv.getNewServerId(), initialState);
 
-        // TODO
-        // if initialState == FollowerState.VOTING_NOT_INITIALIZED
-        //     Initiate snapshot via leader.initiateCaptureSnapshot(addServer.getNewServerId())
-        //     Start a timer to abort the operation after a period of time (maybe 2 times election timeout)
-        //     Set local instance state and wait for message from the AbstractLeader when install snapshot
-        //     is done and return now
-        //     When install snapshot message is received, go to step 1
-        // else
-        //     go to step 2
-        //
-        // 1) tell AbstractLeader mark the follower as VOTING and recalculate minReplicationCount and
-        //        minIsolatedLeaderPeerCount
-        // 2) persist and replicate ServerConfigurationPayload via
-        //           raftActor.persistData(sender, uuid, newServerConfigurationPayload)
-        // 3) Wait for commit complete via ApplyState message in RaftActor or time it out. In RaftActor,
-        //       on ApplyState, check if ReplicatedLogEntry payload is ServerConfigurationPayload and call
-        //       this class.
-        //
         if(initialState == FollowerState.VOTING_NOT_INITIALIZED){
             LOG.debug("Leader sending initiate capture snapshot to follower : {}", addSrv.getNewServerId());
             leader.initiateCaptureSnapshot(addSrv.getNewServerId());
@@ -129,8 +143,7 @@ class RaftActorServerConfigurationSupport {
                 context.getActorSystem().dispatcher(), context.getActor());
         } else {
             LOG.debug("Directly persisting  the new server configuration : {}", addSrv.getNewServerId());
-            persistNewServerConfiguration(raftActor, followerInfoQueue.peek().getClientRequestor(),
-                                                                                 addSrv.getNewServerId());
+            persistNewServerConfiguration(raftActor, followerInfo);
         }
     }
 
@@ -153,6 +166,11 @@ class RaftActorServerConfigurationSupport {
 
     private void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply,
                                                  RaftActor raftActor, ActorRef sender){
+        CatchupFollowerInfo followerInfo = followerInfoQueue.peek();
+        // Sanity check - it's possible we get a reply after it timed out.
+        if(followerInfo == null) {
+            return;
+        }
 
         String followerId = reply.getFollowerId();
         AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
@@ -160,27 +178,19 @@ class RaftActorServerConfigurationSupport {
         stopFollowerTimer();
         followerLogInformation.setFollowerState(FollowerState.VOTING);
         leader.updateMinReplicaCountAndMinIsolatedLeaderPeerCount();
-        persistNewServerConfiguration(raftActor, sender, followerId);
+
+        persistNewServerConfiguration(raftActor, followerInfo);
     }
 
-    private void persistNewServerConfiguration(RaftActor raftActor, ActorRef sender, String followerId){
-        /* get old server configuration list */
-        Map<String, String> tempMap =  context.getPeerAddresses();
-        List<String> cOld = new ArrayList<String>();
-        for (Map.Entry<String, String> entry : tempMap.entrySet()) {
-            if(!entry.getKey().equals(followerId)){
-                cOld.add(entry.getKey());
-            }
-        }
-        LOG.debug("Cold server configuration : {}",  cOld.toString());
-        /* get new server configuration list */
-        List <String> cNew = new ArrayList<String>(cOld);
-        cNew.add(followerId);
-        LOG.debug("Cnew server configuration : {}",  cNew.toString());
-        // construct the peer list
-        ServerConfigurationPayload servPayload = new ServerConfigurationPayload(cOld, cNew);
-        /* TODO - persist new configuration - CHECK WHETHER USING getId below is correct */
-        raftActor.persistData(sender, context.getId(), servPayload);
+    private void persistNewServerConfiguration(RaftActor raftActor, CatchupFollowerInfo followerInfo){
+        List <String> cNew = new ArrayList<String>(context.getPeerAddresses().keySet());
+        cNew.add(context.getId());
+
+        LOG.debug("New server configuration : {}",  cNew.toString());
+
+        ServerConfigurationPayload servPayload = new ServerConfigurationPayload(cNew, Collections.<String>emptyList());
+
+        raftActor.persistData(followerInfo.getClientRequestor(), followerInfo.getContextId(), servPayload);
    }
 
    private void stopFollowerTimer() {
@@ -190,22 +200,19 @@ class RaftActorServerConfigurationSupport {
    }
 
    private void onFollowerCatchupTimeout(RaftActor raftActor, ActorRef sender, String serverId){
-
         LOG.debug("onFollowerCatchupTimeout: {}",  serverId);
         AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
         // cleanup
         context.removePeer(serverId);
         leader.removeFollower(serverId);
-        LOG.warn("onFollowerCatchupTimeout - Timeout occured for server - {} while installing snapshot", serverId);
+        LOG.warn("Timeout occured for new server {} while installing snapshot", serverId);
         respondToClient(raftActor,ServerChangeStatus.TIMEOUT);
    }
 
    private void respondToClient(RaftActor raftActor, ServerChangeStatus result){
-
-        int size = followerInfoQueue.size();
-
         // remove the entry from the queue
         CatchupFollowerInfo fInfo = followerInfoQueue.remove();
+
         // get the sender
         ActorRef toClient = fInfo.getClientRequestor();
 
@@ -216,19 +223,27 @@ class RaftActorServerConfigurationSupport {
         }
    }
 
-    // mantain sender actorRef
-    private class CatchupFollowerInfo {
+    // maintain sender actorRef
+    private static class CatchupFollowerInfo {
         private final AddServer addServer;
         private final ActorRef clientRequestor;
+        private final String contextId;
 
         CatchupFollowerInfo(AddServer addSrv, ActorRef cliReq){
             addServer = addSrv;
             clientRequestor = cliReq;
+            contextId = UUID.randomUUID().toString();
         }
-        public AddServer getAddServer(){
+
+        String getContextId() {
+            return contextId;
+        }
+
+        AddServer getAddServer(){
             return addServer;
         }
-        public ActorRef getClientRequestor(){
+
+        ActorRef getClientRequestor(){
             return clientRequestor;
         }
     }
index 2db595d..bf0fc10 100644 (file)
@@ -55,7 +55,7 @@ class RaftActorSnapshotMessageSupport {
 
     boolean handleSnapshotMessage(Object message) {
         if(message instanceof ApplySnapshot ) {
-            onApplySnapshot(((ApplySnapshot) message).getSnapshot());
+            onApplySnapshot((ApplySnapshot) message);
             return true;
         } else if (message instanceof SaveSnapshotSuccess) {
             onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
@@ -95,10 +95,10 @@ class RaftActorSnapshotMessageSupport {
         context.getSnapshotManager().commit(sequenceNumber, currentBehavior);
     }
 
-    private void onApplySnapshot(Snapshot snapshot) {
+    private void onApplySnapshot(ApplySnapshot message) {
         log.info("{}: Applying snapshot on follower with snapshotIndex: {}, snapshotTerm: {}", context.getId(),
-                snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm());
+                message.getSnapshot().getLastAppliedIndex(), message.getSnapshot().getLastAppliedTerm());
 
-        context.getSnapshotManager().apply(snapshot);
+        context.getSnapshotManager().apply(message);
     }
 }
index 32cb458..db1f193 100644 (file)
@@ -79,4 +79,10 @@ public class ServerConfigurationPayload extends Payload implements Serializable
     public Payload decode(AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payload) {
         return null;
     }
+
+    @Override
+    public String toString() {
+        return "ServerConfigurationPayload [newServerConfig=" + newServerConfig + ", oldServerConfig="
+                + oldServerConfig + "]";
+    }
 }
index c553a39..8e0d2f8 100644 (file)
@@ -12,6 +12,7 @@ import akka.japi.Procedure;
 import akka.persistence.SnapshotSelectionCriteria;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.List;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
@@ -37,7 +38,7 @@ public class SnapshotManager implements SnapshotState {
 
     private Procedure<Void> createSnapshotProcedure;
 
-    private Snapshot applySnapshot;
+    private ApplySnapshot applySnapshot;
     private Procedure<byte[]> applySnapshotProcedure;
 
     public SnapshotManager(RaftActorContext context, Logger logger) {
@@ -45,6 +46,10 @@ public class SnapshotManager implements SnapshotState {
         this.LOG = logger;
     }
 
+    public boolean isApplying() {
+        return applySnapshot != null;
+    }
+
     @Override
     public boolean isCapturing() {
         return currentState.isCapturing();
@@ -61,7 +66,7 @@ public class SnapshotManager implements SnapshotState {
     }
 
     @Override
-    public void apply(Snapshot snapshot) {
+    public void apply(ApplySnapshot snapshot) {
         currentState.apply(snapshot);
     }
 
@@ -130,7 +135,7 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public void apply(Snapshot snapshot) {
+        public void apply(ApplySnapshot snapshot) {
             LOG.debug("apply should not be called in state {}", this);
         }
 
@@ -260,14 +265,14 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public void apply(Snapshot snapshot) {
-            applySnapshot = snapshot;
+        public void apply(ApplySnapshot applySnapshot) {
+            SnapshotManager.this.applySnapshot = applySnapshot;
 
             lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber();
 
             LOG.debug("lastSequenceNumber prior to persisting applied snapshot: {}", lastSequenceNumber);
 
-            context.getPersistenceProvider().saveSnapshot(snapshot);
+            context.getPersistenceProvider().saveSnapshot(applySnapshot.getSnapshot());
 
             SnapshotManager.this.currentState = PERSISTING;
         }
@@ -374,16 +379,19 @@ public class SnapshotManager implements SnapshotState {
 
         @Override
         public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) {
-            LOG.debug("Snapshot success sequence number:", sequenceNumber);
+            LOG.debug("Snapshot success sequence number: {}", sequenceNumber);
 
             if(applySnapshot != null) {
                 try {
-                    applySnapshotProcedure.apply(applySnapshot.getState());
+                    Snapshot snapshot = applySnapshot.getSnapshot();
+                    applySnapshotProcedure.apply(snapshot.getState());
 
                     //clears the followers log, sets the snapshot index to ensure adjusted-index works
-                    context.setReplicatedLog(ReplicatedLogImpl.newInstance(applySnapshot, context, currentBehavior));
-                    context.setLastApplied(applySnapshot.getLastAppliedIndex());
-                    context.setCommitIndex(applySnapshot.getLastAppliedIndex());
+                    context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior));
+                    context.setLastApplied(snapshot.getLastAppliedIndex());
+                    context.setCommitIndex(snapshot.getLastAppliedIndex());
+
+                    applySnapshot.getCallback().onSuccess();
                 } catch (Exception e) {
                     LOG.error("Error applying snapshot", e);
                 }
@@ -412,6 +420,8 @@ public class SnapshotManager implements SnapshotState {
                         context.getReplicatedLog().getSnapshotIndex(),
                         context.getReplicatedLog().getSnapshotTerm(),
                         context.getReplicatedLog().size());
+            } else {
+                applySnapshot.getCallback().onFailure();
             }
 
             lastSequenceNumber = -1;
index 46e0c87..f5a175f 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.raft;
 
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 
 public interface SnapshotState {
@@ -42,7 +43,7 @@ public interface SnapshotState {
      *
      * @param snapshot the Snapshot to apply.
      */
-    void apply(Snapshot snapshot);
+    void apply(ApplySnapshot snapshot);
 
     /**
      * Persist the snapshot
index 54ee02a..3c43d80 100644 (file)
@@ -8,22 +8,49 @@
 
 package org.opendaylight.controller.cluster.raft.base.messages;
 
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 
-import java.io.Serializable;
-
 /**
  * Internal message, issued by follower to its actor
  */
-public class ApplySnapshot implements Serializable {
-    private static final long serialVersionUID = 1L;
+public class ApplySnapshot {
     private final Snapshot snapshot;
+    private final Callback callback;
 
     public ApplySnapshot(Snapshot snapshot) {
-        this.snapshot = snapshot;
+        this(snapshot, NOOP_CALLBACK);
+    }
+
+    public ApplySnapshot(@Nonnull Snapshot snapshot, @Nonnull Callback callback) {
+        this.snapshot = Preconditions.checkNotNull(snapshot);
+        this.callback = Preconditions.checkNotNull(callback);
     }
 
+    @Nonnull
     public Snapshot getSnapshot() {
         return snapshot;
     }
+
+    @Nonnull
+    public Callback getCallback() {
+        return callback;
+    }
+
+    public interface Callback {
+        void onSuccess();
+
+        void onFailure();
+    }
+
+    public static Callback NOOP_CALLBACK = new Callback() {
+        @Override
+        public void onSuccess() {
+        }
+
+        @Override
+        public void onFailure() {
+        }
+    };
 }
index 605a5c2..3c5ad04 100644 (file)
@@ -135,6 +135,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(followerId, -1, context);
         followerLogInformation.setFollowerState(followerState);
         followerToLog.put(followerId, followerLogInformation);
+
+        if(heartbeatSchedule == null) {
+            scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
+        }
     }
 
     public void removeFollower(String followerId) {
@@ -254,9 +258,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
             if (replicatedCount >= minReplicationCount) {
                 ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N);
-                if (replicatedLogEntry != null &&
-                    replicatedLogEntry.getTerm() == currentTerm()) {
+                if (replicatedLogEntry != null && replicatedLogEntry.getTerm() == currentTerm()) {
                     context.setCommitIndex(N);
+                } else {
+                    break;
                 }
             } else {
                 break;
index 7e6175d..787bd74 100644 (file)
@@ -17,6 +17,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
@@ -490,4 +491,15 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         return context.getId();
     }
 
+    public void applyServerConfiguration(ServerConfigurationPayload serverConfig) {
+        for(String peerId: context.getPeerAddresses().keySet()) {
+            context.removePeer(peerId);
+        }
+
+        for(String peerId: serverConfig.getNewServerConfig()) {
+            if(!getId().equals(peerId)) {
+                context.addToPeers(peerId, null);
+            }
+        }
+    }
 }
index d516f9c..8f41147 100644 (file)
@@ -14,6 +14,7 @@ import java.util.ArrayList;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
@@ -103,7 +104,7 @@ public class Follower extends AbstractRaftActorBehavior {
         // to make it easier to read. Before refactoring ensure tests
         // cover the code properly
 
-        if (snapshotTracker != null) {
+        if (snapshotTracker != null || context.getSnapshotManager().isApplying()) {
             // if snapshot install is in progress, follower should just acknowledge append entries with a reply.
             AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
                     lastIndex(), lastTerm(), context.getPayloadVersion());
@@ -189,6 +190,10 @@ public class Follower extends AbstractRaftActorBehavior {
                 LOG.debug("{}: Append entry to log {}", logName(), entry.getData());
 
                 context.getReplicatedLog().appendAndPersist(entry);
+
+                if(entry.getData() instanceof ServerConfigurationPayload) {
+                    applyServerConfiguration((ServerConfigurationPayload)entry.getData());
+                }
             }
 
             LOG.debug("{}: Log size is now {}", logName(), context.getReplicatedLog().size());
@@ -335,7 +340,7 @@ public class Follower extends AbstractRaftActorBehavior {
         return super.handleMessage(sender, message);
     }
 
-    private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
+    private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) {
 
         LOG.debug("{}: InstallSnapshot received from leader {}, datasize: {} , Chunk: {}/{}",
                     logName(), installSnapshot.getLeaderId(), installSnapshot.getData().size(),
@@ -348,6 +353,9 @@ public class Follower extends AbstractRaftActorBehavior {
         updateInitialSyncStatus(installSnapshot.getLastIncludedIndex(), installSnapshot.getLeaderId());
 
         try {
+            final InstallSnapshotReply reply = new InstallSnapshotReply(
+                    currentTerm(), context.getId(), installSnapshot.getChunkIndex(), true);
+
             if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
                     installSnapshot.getLastChunkHashCode())){
                 Snapshot snapshot = Snapshot.create(snapshotTracker.getSnapshot(),
@@ -359,19 +367,28 @@ public class Follower extends AbstractRaftActorBehavior {
                         context.getTermInformation().getCurrentTerm(),
                         context.getTermInformation().getVotedFor());
 
-                actor().tell(new ApplySnapshot(snapshot), actor());
+                ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() {
+                    @Override
+                    public void onSuccess() {
+                        LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply);
 
-                snapshotTracker = null;
-
-            }
+                        sender.tell(reply, actor());
+                    }
 
-            InstallSnapshotReply reply = new InstallSnapshotReply(
-                    currentTerm(), context.getId(), installSnapshot.getChunkIndex(), true);
+                    @Override
+                    public void onFailure() {
+                        sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(), -1, false), actor());
+                    }
+                };
 
-            LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply);
+                actor().tell(new ApplySnapshot(snapshot, applySnapshotCallback), actor());
 
-            sender.tell(reply, actor());
+                snapshotTracker = null;
+            } else {
+                LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply);
 
+                sender.tell(reply, actor());
+            }
         } catch (SnapshotTracker.InvalidChunkException e) {
             LOG.debug("{}: Exception in InstallSnapshot of follower", logName(), e);
 
index 538681e..5c9ba13 100644 (file)
@@ -7,7 +7,7 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
-//import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertEquals;
 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
 import akka.actor.ActorRef;
@@ -19,7 +19,9 @@ import akka.testkit.TestActorRef;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 //import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -28,20 +30,26 @@ import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
-//import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
-//import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
-//import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+
 /**
  * Unit tests for RaftActorServerConfigurationSupport.
  *
@@ -60,23 +68,29 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
             Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
             actorFactory.generateActorId(FOLLOWER_ID));
 
-    private final TestActorRef<ForwardMessageToBehaviorActor> newServerActor = actorFactory.createTestActor(
-            Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
-            actorFactory.generateActorId(NEW_SERVER_ID));
+    private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
+    private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
 
-    private RaftActorContext newServerActorContext;
+    private RaftActorContext newFollowerActorContext;
     private final JavaTestKit testKit = new JavaTestKit(getSystem());
 
     @Before
     public void setup() {
+        InMemoryJournal.clear();
+        InMemorySnapshotStore.clear();
+
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
         configParams.setElectionTimeoutFactor(100000);
         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
-        newServerActorContext = new RaftActorContextImpl(newServerActor, newServerActor.underlyingActor().getContext(),
-                NEW_SERVER_ID, new ElectionTermImpl(NO_PERSISTENCE, NEW_SERVER_ID, LOG), -1, -1,
-                Maps.<String, String>newHashMap(), configParams, NO_PERSISTENCE, LOG);
-        newServerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+        newFollowerCollectorActor = actorFactory.createTestActor(
+                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(NEW_SERVER_ID + "Collector"));
+        newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
+                configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(NEW_SERVER_ID));
+        newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
     }
 
     @After
@@ -85,19 +99,16 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
     }
 
     @Test
-    public void testAddServerWithFollower() throws Exception {
+    public void testAddServerWithExistingFollower() throws Exception {
         RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
                 0, 3, 1).build());
-        followerActorContext.setCommitIndex(3);
-        followerActorContext.setLastApplied(3);
+        followerActorContext.setCommitIndex(2);
+        followerActorContext.setLastApplied(2);
 
         Follower follower = new Follower(followerActorContext);
         followerActor.underlyingActor().setBehavior(follower);
 
-        Follower newServer = new Follower(newServerActorContext);
-        newServerActor.underlyingActor().setBehavior(newServer);
-
         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
                 MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
                         followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
@@ -109,33 +120,173 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
 
-        leaderActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef());
+        leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
+
+        // Leader should install snapshot - capture and verify ApplySnapshot contents
+
+        ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
+        List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
+        assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
+
+        AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
+        assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
+
+        // Verify ServerConfigurationPayload entry in leader's log
+
+        RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+        assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
+        assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
+        verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), LEADER_ID, FOLLOWER_ID, NEW_SERVER_ID);
+
+        // Verify ServerConfigurationPayload entry in both followers
+
+        assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
+        verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), LEADER_ID, FOLLOWER_ID, NEW_SERVER_ID);
+
+        assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
+        verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, FOLLOWER_ID, NEW_SERVER_ID);
+
+        // Verify new server config was applied in both followers
+
+        assertEquals("Follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID),
+                followerActorContext.getPeerAddresses().keySet());
+
+        assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID),
+                newFollowerActorContext.getPeerAddresses().keySet());
+
+        clearMessages(followerActor);
+        clearMessages(newFollowerCollectorActor);
+
+        expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
+        expectFirstMatching(followerActor, ApplyState.class);
+
+        assertEquals("Follower commit index", 3, followerActorContext.getCommitIndex());
+        assertEquals("Follower last applied index", 3, followerActorContext.getLastApplied());
+        assertEquals("New follower commit index", 3, newFollowerActorContext.getCommitIndex());
+        assertEquals("New follower last applied index", 3, newFollowerActorContext.getLastApplied());
+    }
+
+    @Test
+    public void testAddServerWithNoExistingFollower() throws Exception {
+        RaftActorContext initialActorContext = new MockRaftActorContext();
+        initialActorContext.setCommitIndex(1);
+        initialActorContext.setLastApplied(1);
+        initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
+                0, 2, 1).build());
+
+        TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+                MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
+                        initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(LEADER_ID));
+
+        MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
+        RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+
+        leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
+
+        // Leader should install snapshot - capture and verify ApplySnapshot contents
+
+        ApplySnapshot applySnapshot = expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
+        List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
+        assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
+
+        AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
+        assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
+
+        // Verify ServerConfigurationPayload entry in leader's log
+
+        assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
+        assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
+        verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID);
+
+        // Verify ServerConfigurationPayload entry in the new follower
+
+        clearMessages(newFollowerCollectorActor);
+
+        expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
+        assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
+        verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID);
+
+        // Verify new server config was applied in the new follower
+
+        assertEquals("New follower peers", Sets.newHashSet(LEADER_ID),
+                newFollowerActorContext.getPeerAddresses().keySet());
+    }
+
+    @Test
+    public void testAddServerAsNonVoting() throws Exception {
+        RaftActorContext initialActorContext = new MockRaftActorContext();
+        initialActorContext.setCommitIndex(-1);
+        initialActorContext.setLastApplied(-1);
+        initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+        TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+                MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
+                        initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(LEADER_ID));
+
+        MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
+        RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+
+        leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), false), testKit.getRef());
+
+        AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
+        assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
+
+        // Verify ServerConfigurationPayload entry in leader's log
+
+        assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
+        assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
+        verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID);
+
+        // Verify ServerConfigurationPayload entry in the new follower
+
+        expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
+        assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
+        verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID);
+
+        // Verify new server config was applied in the new follower
+
+        assertEquals("New follower peers", Sets.newHashSet(LEADER_ID),
+                newFollowerActorContext.getPeerAddresses().keySet());
+
+        MessageCollectorActor.assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.SERIALIZABLE_CLASS, 500);
+    }
+
+    @Test
+    public void testAddServerWithInstallSnapshotTimeout() throws Exception {
+        newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
 
-        // leader should install snapshot - capture and verify ApplySnapshot contents
-        //ApplySnapshot applySnapshot = expectFirstMatching(followerActor, ApplySnapshot.class);
-        //List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
-        //assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
+        RaftActorContext initialActorContext = new MockRaftActorContext();
+        initialActorContext.setCommitIndex(-1);
+        initialActorContext.setLastApplied(-1);
+        initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
 
-        // leader should replicate new server config to both followers
-        //expectFirstMatching(followerActor, AppendEntries.class);
-        //expectFirstMatching(newServerActor, AppendEntries.class);
+        TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+                MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
+                        initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(LEADER_ID));
 
-        // verify ServerConfigurationPayload entry in leader's log
+        MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
-        //assertEquals("Leader journal log size", 4, leaderActorContext.getReplicatedLog().size());
-        //assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
-        ReplicatedLogEntry logEntry = leaderActorContext.getReplicatedLog().get(
-                leaderActorContext.getReplicatedLog().lastIndex());
-        // verify logEntry contents
 
-        // Also verify ServerConfigurationPayload entry in both followers
+        leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
 
-        //AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
-        //assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
-        //assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
+        AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
+
+        assertEquals("Leader peers size", 0, leaderActorContext.getPeerAddresses().keySet().size());
+        assertEquals("Leader followers size", 0,
+                ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
     }
 
-    //@Test
+    @Test
     public void testAddServerWithNoLeader() {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
@@ -146,12 +297,12 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
                 actorFactory.generateActorId(LEADER_ID));
         noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
 
-        noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef());
-        //AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
-        //assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
+        noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
+        AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
     }
 
-    //@Test
+    @Test
     public void testAddServerForwardedToLeader() {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
@@ -169,16 +320,25 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
                 -1, -1, (short)0), leaderActor);
 
-        followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef());
-        //expectFirstMatching(leaderActor, AddServer.class);
+        followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
+        expectFirstMatching(leaderActor, AddServer.class);
+    }
+
+    private void verifyServerConfigurationPayloadEntry(ReplicatedLog log, String... cNew) {
+        ReplicatedLogEntry logEntry = log.get(log.lastIndex());
+        assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
+        ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
+        assertEquals("getNewServerConfig", Sets.newHashSet(cNew), Sets.newHashSet(payload.getNewServerConfig()));
     }
 
     private RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
         configParams.setElectionTimeoutFactor(100000);
+        ElectionTermImpl termInfo = new ElectionTermImpl(NO_PERSISTENCE, id, LOG);
+        termInfo.update(1, LEADER_ID);
         RaftActorContext followerActorContext = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
-                id, new ElectionTermImpl(NO_PERSISTENCE, id, LOG), -1, -1,
+                id, termInfo, -1, -1,
                 ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG);
 
         return followerActorContext;
@@ -198,6 +358,8 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
             context.setCommitIndex(fromContext.getCommitIndex());
             context.setLastApplied(fromContext.getLastApplied());
+            context.getTermInformation().update(fromContext.getTermInformation().getCurrentTerm(),
+                    fromContext.getTermInformation().getVotedFor());
         }
 
         @Override
@@ -217,9 +379,37 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
-            configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
-            configParams.setElectionTimeoutFactor(100000);
+            configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+            configParams.setElectionTimeoutFactor(1);
             return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
         }
     }
+
+    public static class MockNewFollowerRaftActor extends MockRaftActor {
+        private final TestActorRef<MessageCollectorActor> collectorActor;
+        private volatile Class<?> dropMessageOfType;
+
+        public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
+            super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), null);
+            this.collectorActor = collectorActor;
+        }
+
+        void setDropMessageOfType(Class<?> dropMessageOfType) {
+            this.dropMessageOfType = dropMessageOfType;
+        }
+
+        @Override
+        public void handleCommand(Object message) {
+            if(dropMessageOfType != null && dropMessageOfType.equals(message.getClass())) {
+                return;
+            }
+
+            super.handleCommand(message);
+            collectorActor.tell(message, getSender());
+        }
+
+        static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
+            return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);
+        }
+    }
 }
index 7d41508..d79a483 100644 (file)
@@ -97,9 +97,10 @@ public class RaftActorSnapshotMessageSupportTest {
         Snapshot snapshot = Snapshot.create(snapshotBytes, Collections.<ReplicatedLogEntry>emptyList(),
                 lastIndexDuringSnapshotCapture, 1, lastAppliedDuringSnapshotCapture, 1);
 
-        sendMessageToSupport(new ApplySnapshot(snapshot));
+        ApplySnapshot applySnapshot = new ApplySnapshot(snapshot);
+        sendMessageToSupport(applySnapshot);
 
-        verify(mockSnapshotManager).apply(snapshot);
+        verify(mockSnapshotManager).apply(applySnapshot);
     }
 
     @Test
index a06d086..51818d1 100644 (file)
@@ -785,6 +785,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
         assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
         assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
+        applySnapshot.getCallback().onSuccess();
 
         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
                 leaderActor, InstallSnapshotReply.class);

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.