Bug 2187: Code cleanup and refactoring 74/28674/5
authorTom Pantelis <tpanteli@brocade.com>
Wed, 21 Oct 2015 22:47:31 +0000 (18:47 -0400)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 23 Oct 2015 19:14:18 +0000 (19:14 +0000)
I addressed remaining comments from a prior patch.

I also refactored RaftActorServerConfigurationSupport to use an FSM
similar to the SnapshotManager with some generic classes. This will
make it easier to implement RemoveServer and reuse code.

Change-Id: Id3cdcede3f9c393c878abd3e9a9d3a5e12c5fb8a
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/FollowerCatchUpTimeout.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/UnInitializedFollowerSnapshotReply.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java

index ae23140114ed7d2ca8e73584ecd997fefd437723..a00c0241d96b61cbb3b2a5b0330f7c992d94b0b4 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.raft;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -37,28 +38,29 @@ import scala.concurrent.duration.FiniteDuration;
  */
 class RaftActorServerConfigurationSupport {
     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class);
-    private final RaftActorContext context;
-    // client follower queue
-    private final Queue<CatchupFollowerInfo> followerInfoQueue = new LinkedList<CatchupFollowerInfo>();
-    // timeout handle
-    private Cancellable followerTimeout = null;
+
+    private final OperationState IDLE = new Idle();
+
+    private final RaftActorContext raftContext;
+
+    private final Queue<ServerOperationContext<?>> pendingOperationsQueue = new LinkedList<>();
+
+    private OperationState currentOperationState = IDLE;
 
     RaftActorServerConfigurationSupport(RaftActorContext context) {
-        this.context = context;
+        this.raftContext = context;
     }
 
     boolean handleMessage(Object message, RaftActor raftActor, ActorRef sender) {
         if(message instanceof AddServer) {
             onAddServer((AddServer)message, raftActor, sender);
             return true;
-        } else if (message instanceof FollowerCatchUpTimeout){
-            FollowerCatchUpTimeout followerTimeout  = (FollowerCatchUpTimeout)message;
-            // abort follower catchup on timeout
-            onFollowerCatchupTimeout(raftActor, sender, followerTimeout.getNewServerId());
+        } else if (message instanceof FollowerCatchUpTimeout) {
+            currentOperationState.onFollowerCatchupTimeout(raftActor, (FollowerCatchUpTimeout)message);
             return true;
-        } else if (message instanceof UnInitializedFollowerSnapshotReply){
-            // snapshot installation is successful
-            onUnInitializedFollowerSnapshotReply((UnInitializedFollowerSnapshotReply)message, raftActor,sender);
+        } else if (message instanceof UnInitializedFollowerSnapshotReply) {
+            currentOperationState.onUnInitializedFollowerSnapshotReply(raftActor,
+                    (UnInitializedFollowerSnapshotReply)message);
             return true;
         } else if(message instanceof ApplyState) {
             return onApplyState((ApplyState) message, raftActor);
@@ -70,34 +72,13 @@ class RaftActorServerConfigurationSupport {
     private boolean onApplyState(ApplyState applyState, RaftActor raftActor) {
         Payload data = applyState.getReplicatedLogEntry().getData();
         if(data instanceof ServerConfigurationPayload) {
-            CatchupFollowerInfo followerInfo = followerInfoQueue.peek();
-            if(followerInfo != null && followerInfo.getContextId().equals(applyState.getIdentifier())) {
-                LOG.info("{} has been successfully replicated to a majority of followers", data);
-
-                // respond ok to follower
-                respondToClient(raftActor, ServerChangeStatus.OK);
-            }
-
+            currentOperationState.onApplyState(raftActor, applyState);
             return true;
         }
 
         return false;
     }
 
-    private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) {
-        LOG.debug("{}: onAddServer: {}", context.getId(), addServer);
-        if(noLeaderOrForwardedToLeader(addServer, raftActor, sender)) {
-            return;
-        }
-
-        CatchupFollowerInfo followerInfo = new CatchupFollowerInfo(addServer,sender);
-        boolean process = followerInfoQueue.isEmpty();
-        followerInfoQueue.add(followerInfo);
-        if(process) {
-            processAddServer(raftActor);
-        }
-    }
-
     /**
      * The algorithm for AddServer is as follows:
      * <ul>
@@ -119,119 +100,276 @@ class RaftActorServerConfigurationSupport {
      *     <li>Respond to caller with TIMEOUT.</li>
      * </ul>
      */
-    private void processAddServer(RaftActor raftActor){
-        LOG.debug("{}: In processAddServer", context.getId());
-
-        AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
-        CatchupFollowerInfo followerInfo = followerInfoQueue.peek();
-        AddServer addSrv = followerInfo.getAddServer();
-        context.addToPeers(addSrv.getNewServerId(), addSrv.getNewServerAddress());
-
-        // if voting member - initialize to VOTING_NOT_INITIALIZED
-        FollowerState initialState = addSrv.isVotingMember() ? FollowerState.VOTING_NOT_INITIALIZED :
-            FollowerState.NON_VOTING;
-        leader.addFollower(addSrv.getNewServerId(), initialState);
-
-        if(initialState == FollowerState.VOTING_NOT_INITIALIZED){
-            LOG.debug("Leader sending initiate capture snapshot to follower : {}", addSrv.getNewServerId());
-            leader.initiateCaptureSnapshot(addSrv.getNewServerId());
-            // schedule the catchup timeout timer
-            followerTimeout = context.getActorSystem().scheduler()
-               .scheduleOnce(new FiniteDuration(((context.getConfigParams().getElectionTimeOutInterval().toMillis()) * 2),
-                TimeUnit.MILLISECONDS),
-                context.getActor(), new FollowerCatchUpTimeout(addSrv.getNewServerId()),
-                context.getActorSystem().dispatcher(), context.getActor());
+    private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) {
+        LOG.debug("{}: onAddServer: {}", raftContext.getId(), addServer);
+
+        onNewOperation(raftActor, new AddServerContext(addServer, sender));
+    }
+
+    private void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
+        if (raftActor.isLeader()) {
+            currentOperationState.onNewOperation(raftActor, operationContext);
         } else {
-            LOG.debug("Directly persisting  the new server configuration : {}", addSrv.getNewServerId());
-            persistNewServerConfiguration(raftActor, followerInfo);
+            ActorSelection leader = raftActor.getLeader();
+            if (leader != null) {
+                LOG.debug("{}: Not leader - forwarding to leader {}", raftContext.getId(), leader);
+                leader.forward(operationContext.getOperation(), raftActor.getContext());
+            } else {
+                LOG.debug("{}: No leader - returning NO_LEADER reply", raftContext.getId());
+                operationContext.getClientRequestor().tell(operationContext.newReply(
+                        ServerChangeStatus.NO_LEADER, null), raftActor.self());
+            }
         }
     }
 
-    private boolean noLeaderOrForwardedToLeader(Object message, RaftActor raftActor, ActorRef sender) {
-        if (raftActor.isLeader()) {
-            return false;
+    /**
+     * Interface for a server operation FSM state.
+     */
+    private interface OperationState {
+        void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext);
+
+        void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout);
+
+        void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply);
+
+        void onApplyState(RaftActor raftActor, ApplyState applyState);
+    }
+
+    /**
+     * Interface for the initial state for a server operation.
+     */
+    private interface InitialOperationState {
+        void initiate(RaftActor raftActor);
+    }
+
+    /**
+     * Abstract base class for server operation FSM state. Handles common behavior for all states.
+     */
+    private abstract class AbstractOperationState implements OperationState {
+        @Override
+        public void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
+            // We're currently processing another operation so queue it to be processed later.
+
+            LOG.debug("{}: Server operation already in progress - queueing {}", raftContext.getId(),
+                    operationContext.getOperation());
+
+            pendingOperationsQueue.add(operationContext);
         }
 
-        ActorSelection leader = raftActor.getLeader();
-        if (leader != null) {
-            LOG.debug("Not leader - forwarding to leader {}", leader);
-            leader.forward(message, raftActor.getContext());
-        } else {
-            LOG.debug("No leader - returning NO_LEADER AddServerReply");
-            sender.tell(new AddServerReply(ServerChangeStatus.NO_LEADER, null), raftActor.self());
+        @Override
+        public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
+            LOG.debug("onFollowerCatchupTimeout should not be called in state {}", this);
         }
 
-        return true;
+        @Override
+        public void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply) {
+            LOG.debug("onUnInitializedFollowerSnapshotReply was called in state {}", this);
+        }
+
+        @Override
+        public void onApplyState(RaftActor raftActor, ApplyState applyState) {
+            LOG.debug("onApplyState was called in state {}", this);
+        }
+
+        protected void persistNewServerConfiguration(RaftActor raftActor, ServerOperationContext<?> operationContext){
+            List <String> newConfig = new ArrayList<String>(raftContext.getPeerAddresses().keySet());
+            newConfig.add(raftContext.getId());
+
+            LOG.debug("{}: New server configuration : {}", raftContext.getId(), newConfig);
+
+            ServerConfigurationPayload payload = new ServerConfigurationPayload(newConfig, Collections.<String>emptyList());
+
+            raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload);
+
+            currentOperationState = new Persisting(operationContext);
+        }
+
+        protected void operationComplete(RaftActor raftActor, ServerOperationContext<?> operationContext,
+                ServerChangeStatus status) {
+
+            LOG.debug("{}: Returning {} for operation {}", raftContext.getId(), status, operationContext.getOperation());
+
+            operationContext.getClientRequestor().tell(operationContext.newReply(status, raftActor.getLeaderId()),
+                    raftActor.self());
+
+            currentOperationState = IDLE;
+
+            ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
+            if(nextOperation != null) {
+                RaftActorServerConfigurationSupport.this.onNewOperation(raftActor, nextOperation);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return getClass().getSimpleName();
+        }
     }
 
-    private void onUnInitializedFollowerSnapshotReply(UnInitializedFollowerSnapshotReply reply,
-                                                 RaftActor raftActor, ActorRef sender){
-        CatchupFollowerInfo followerInfo = followerInfoQueue.peek();
-        // Sanity check - it's possible we get a reply after it timed out.
-        if(followerInfo == null) {
-            return;
+    /**
+     * The state when no server operation is in progress. It immediately initiates new server operations.
+     */
+    private class Idle extends AbstractOperationState {
+        @Override
+        public void onNewOperation(RaftActor raftActor, ServerOperationContext<?> operationContext) {
+            operationContext.newInitialOperationState(RaftActorServerConfigurationSupport.this).initiate(raftActor);
         }
 
-        String followerId = reply.getFollowerId();
-        AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
-        FollowerLogInformation followerLogInformation = leader.getFollower(followerId);
-        stopFollowerTimer();
-        followerLogInformation.setFollowerState(FollowerState.VOTING);
-        leader.updateMinReplicaCountAndMinIsolatedLeaderPeerCount();
+        @Override
+        public void onApplyState(RaftActor raftActor, ApplyState applyState) {
+            // Noop - we override b/c ApplyState is called normally for followers in the idle state.
+        }
+    }
+
+    /**
+     * The state when a new server configuration is being persisted and replicated.
+     */
+    private class Persisting extends AbstractOperationState {
+        private final ServerOperationContext<?> operationContext;
 
-        persistNewServerConfiguration(raftActor, followerInfo);
+        Persisting(ServerOperationContext<?> operationContext) {
+            this.operationContext = operationContext;
+        }
+
+        @Override
+        public void onApplyState(RaftActor raftActor, ApplyState applyState) {
+            // Sanity check - we could get an ApplyState from a previous operation that timed out so make
+            // sure it's meant for us.
+            if(operationContext.getContextId().equals(applyState.getIdentifier())) {
+                LOG.info("{}: {} has been successfully replicated to a majority of followers",
+                        applyState.getReplicatedLogEntry().getData());
+
+                operationComplete(raftActor, operationContext, ServerChangeStatus.OK);
+            }
+        }
     }
 
-    private void persistNewServerConfiguration(RaftActor raftActor, CatchupFollowerInfo followerInfo){
-        List <String> cNew = new ArrayList<String>(context.getPeerAddresses().keySet());
-        cNew.add(context.getId());
+    /**
+     * Abstract base class for an AddServer operation state.
+     */
+    private abstract class AddServerState extends AbstractOperationState {
+        private final AddServerContext addServerContext;
 
-        LOG.debug("New server configuration : {}",  cNew.toString());
+        AddServerState(AddServerContext addServerContext) {
+            this.addServerContext = addServerContext;
+        }
 
-        ServerConfigurationPayload servPayload = new ServerConfigurationPayload(cNew, Collections.<String>emptyList());
+        AddServerContext getAddServerContext() {
+            return addServerContext;
+        }
+    }
 
-        raftActor.persistData(followerInfo.getClientRequestor(), followerInfo.getContextId(), servPayload);
-   }
+    /**
+     * The initial state for the AddServer operation. It adds the new follower as a peer and initiates
+     * snapshot capture, if necessary.
+     */
+    private class InitialAddServerState extends AddServerState implements InitialOperationState {
+        InitialAddServerState(AddServerContext addServerContext) {
+            super(addServerContext);
+        }
+
+        @Override
+        public void initiate(RaftActor raftActor) {
+            AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
+
+            AddServer addServer = getAddServerContext().getOperation();
+
+            LOG.debug("{}: Initiating {}", raftContext.getId(), addServer);
+
+            raftContext.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress());
+
+            // if voting member - initialize to VOTING_NOT_INITIALIZED
+            FollowerState initialState = addServer.isVotingMember() ? FollowerState.VOTING_NOT_INITIALIZED :
+                FollowerState.NON_VOTING;
+            leader.addFollower(addServer.getNewServerId(), initialState);
 
-   private void stopFollowerTimer() {
-        if (followerTimeout != null && !followerTimeout.isCancelled()) {
-            followerTimeout.cancel();
+            if(initialState == FollowerState.VOTING_NOT_INITIALIZED){
+                LOG.debug("{}: Leader sending initiate capture snapshot to new follower {}", raftContext.getId(),
+                        addServer.getNewServerId());
+
+                leader.initiateCaptureSnapshot(addServer.getNewServerId());
+
+                // schedule the install snapshot timeout timer
+                Cancellable installSnapshotTimer = raftContext.getActorSystem().scheduler().scheduleOnce(
+                        new FiniteDuration(((raftContext.getConfigParams().getElectionTimeOutInterval().toMillis()) * 2),
+                                TimeUnit.MILLISECONDS), raftContext.getActor(),
+                                new FollowerCatchUpTimeout(addServer.getNewServerId()),
+                                raftContext.getActorSystem().dispatcher(), raftContext.getActor());
+
+                currentOperationState = new InstallingSnapshot(getAddServerContext(), installSnapshotTimer);
+            } else {
+                LOG.debug("{}: New follower is non-voting - directly persisting new server configuration",
+                        raftContext.getId());
+
+                persistNewServerConfiguration(raftActor, getAddServerContext());
+            }
         }
-   }
+    }
 
-   private void onFollowerCatchupTimeout(RaftActor raftActor, ActorRef sender, String serverId){
-        LOG.debug("onFollowerCatchupTimeout: {}",  serverId);
-        AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
-        // cleanup
-        context.removePeer(serverId);
-        leader.removeFollower(serverId);
-        LOG.warn("Timeout occured for new server {} while installing snapshot", serverId);
-        respondToClient(raftActor,ServerChangeStatus.TIMEOUT);
-   }
+    /**
+     * The AddServer operation state for when the catch-up snapshot is being installed. It handles successful
+     * reply or timeout.
+     */
+    private class InstallingSnapshot extends AddServerState {
+        private final Cancellable installSnapshotTimer;
+
+        InstallingSnapshot(AddServerContext addServerContext, Cancellable installSnapshotTimer) {
+            super(addServerContext);
+            this.installSnapshotTimer = Preconditions.checkNotNull(installSnapshotTimer);
+        }
+
+        @Override
+        public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
+            String serverId = followerTimeout.getNewServerId();
+
+            LOG.debug("{}: onFollowerCatchupTimeout: {}", raftContext.getId(), serverId);
+
+            AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
+
+            // cleanup
+            raftContext.removePeer(serverId);
+            leader.removeFollower(serverId);
 
-   private void respondToClient(RaftActor raftActor, ServerChangeStatus result){
-        // remove the entry from the queue
-        CatchupFollowerInfo fInfo = followerInfoQueue.remove();
+            LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(), serverId);
 
-        // get the sender
-        ActorRef toClient = fInfo.getClientRequestor();
+            operationComplete(raftActor, getAddServerContext(), ServerChangeStatus.TIMEOUT);
+        }
+
+        @Override
+        public void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply) {
+            LOG.debug("{}: onUnInitializedFollowerSnapshotReply: {}", raftContext.getId(), reply);
+
+            String followerId = reply.getFollowerId();
+
+            // Sanity check to guard against receiving an UnInitializedFollowerSnapshotReply from a prior
+            // add server operation that timed out.
+            if(getAddServerContext().getOperation().getNewServerId().equals(followerId)) {
+                AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
+                FollowerLogInformation followerLogInformation = leader.getFollower(followerId);
 
-        toClient.tell(new AddServerReply(result, raftActor.getLeaderId()), raftActor.self());
-        LOG.debug("Response returned is {} for server {} ",  result, fInfo.getAddServer().getNewServerId());
-        if(!followerInfoQueue.isEmpty()){
-            processAddServer(raftActor);
+                installSnapshotTimer.cancel();
+
+                followerLogInformation.setFollowerState(FollowerState.VOTING);
+                leader.updateMinReplicaCountAndMinIsolatedLeaderPeerCount();
+
+                persistNewServerConfiguration(raftActor, getAddServerContext());
+            }
         }
-   }
+    }
 
-    // maintain sender actorRef
-    private static class CatchupFollowerInfo {
-        private final AddServer addServer;
+    /**
+     * Stores context information for a server operation.
+     *
+     * @param <T> the operation type
+     */
+    private static abstract class ServerOperationContext<T> {
+        private final T operation;
         private final ActorRef clientRequestor;
         private final String contextId;
 
-        CatchupFollowerInfo(AddServer addSrv, ActorRef cliReq){
-            addServer = addSrv;
-            clientRequestor = cliReq;
+        ServerOperationContext(T operation, ActorRef clientRequestor){
+            this.operation = operation;
+            this.clientRequestor = clientRequestor;
             contextId = UUID.randomUUID().toString();
         }
 
@@ -239,12 +377,35 @@ class RaftActorServerConfigurationSupport {
             return contextId;
         }
 
-        AddServer getAddServer(){
-            return addServer;
+        T getOperation() {
+            return operation;
         }
 
-        ActorRef getClientRequestor(){
+        ActorRef getClientRequestor() {
             return clientRequestor;
         }
+
+        abstract Object newReply(ServerChangeStatus status, String leaderId);
+
+        abstract InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support);
+    }
+
+    /**
+     * Stores context information for an AddServer operation.
+     */
+    private static class AddServerContext extends ServerOperationContext<AddServer> {
+        AddServerContext(AddServer addServer, ActorRef clientRequestor) {
+            super(addServer, clientRequestor);
+        }
+
+        @Override
+        Object newReply(ServerChangeStatus status, String leaderId) {
+            return new AddServerReply(status, leaderId);
+        }
+
+        @Override
+        InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
+            return support.new InitialAddServerState(this);
+        }
     }
 }
index 3c5ad0428fd90ba59e3aabfea701a46f8a7af6cb..d4612dbedafb91f7015e7d66caf8d2735f45a8f2 100644 (file)
@@ -157,7 +157,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         minIsolatedLeaderPeerCount = minReplicationCount > 0 ? (minReplicationCount - 1) : 0;
     }
 
-    public int getMinIsolatedLeaderPeerCount(){
+    protected int getMinIsolatedLeaderPeerCount(){
         return minIsolatedLeaderPeerCount;
     }
 
@@ -433,7 +433,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     }
                     wasLastChunk = true;
                     FollowerState followerState = followerLogInformation.getFollowerState();
-                    if(followerState ==  FollowerState.VOTING_NOT_INITIALIZED){
+                    if(followerState == FollowerState.VOTING_NOT_INITIALIZED){
                         UnInitializedFollowerSnapshotReply unInitFollowerSnapshotSuccess =
                                              new UnInitializedFollowerSnapshotReply(followerId);
                         context.getActor().tell(unInitFollowerSnapshotSuccess, context.getActor());
index 787bd74629b0c643400e49deb32a40468c52e50a..b20671f9d8de3ca4b8ec83b402107aa5763fb5b3 100644 (file)
@@ -10,6 +10,8 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
 import akka.actor.Cancellable;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
@@ -492,14 +494,19 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     }
 
     public void applyServerConfiguration(ServerConfigurationPayload serverConfig) {
-        for(String peerId: context.getPeerAddresses().keySet()) {
-            context.removePeer(peerId);
-        }
-
+        Map<String, String> currentPeers = new HashMap<>(context.getPeerAddresses());
         for(String peerId: serverConfig.getNewServerConfig()) {
             if(!getId().equals(peerId)) {
-                context.addToPeers(peerId, null);
+                if(!currentPeers.containsKey(peerId)) {
+                    context.addToPeers(peerId, null);
+                } else {
+                    currentPeers.remove(peerId);
+                }
             }
         }
+
+        for(String peerIdToRemove: currentPeers.keySet()) {
+            context.removePeer(peerIdToRemove);
+        }
     }
 }
index 670070279459d5c9a0baa92d75f2cb86cb3f1fb9..365e3a0b3c61feb440d0dc9eb4ce46cc835af890 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014 Dell Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015 Dell Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
index 50f20705ca2e1d94eb3bf3707f04ed9efbf93149..4d66fa6d4d3d0fbf4dac4bf006e2f259c63e614e 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014 Dell Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015 Dell Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -14,10 +14,16 @@ package org.opendaylight.controller.cluster.raft.messages;
 public class  UnInitializedFollowerSnapshotReply {
     private final String followerId;
 
-    public UnInitializedFollowerSnapshotReply(String follId){
-       this.followerId = follId;
+    public UnInitializedFollowerSnapshotReply(String followerId){
+       this.followerId = followerId;
     }
+
     public String getFollowerId() {
         return followerId;
     }
+
+    @Override
+    public String toString() {
+        return "UnInitializedFollowerSnapshotReply [followerId=" + followerId + "]";
+    }
 }
index 5c9ba13b8beda3e439df5dce505f2d96c747e1be..2528c8aad49e034b9d60ac74f2b1fef310b59290 100644 (file)
@@ -59,6 +59,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
     static final String LEADER_ID = "leader";
     static final String FOLLOWER_ID = "follower";
     static final String NEW_SERVER_ID = "new-server";
+    static final String NEW_SERVER_ID2 = "new-server2";
     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
     private static final DataPersistenceProvider NO_PERSISTENCE = new NonPersistentDataProvider();
 
@@ -70,8 +71,8 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
     private TestActorRef<MockNewFollowerRaftActor> newFollowerRaftActor;
     private TestActorRef<MessageCollectorActor> newFollowerCollectorActor;
-
     private RaftActorContext newFollowerActorContext;
+
     private final JavaTestKit testKit = new JavaTestKit(getSystem());
 
     @Before
@@ -79,10 +80,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         InMemoryJournal.clear();
         InMemorySnapshotStore.clear();
 
-        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
-        configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
-        configParams.setElectionTimeoutFactor(100000);
-        configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+        DefaultConfigParamsImpl configParams = newFollowerConfigParams();
 
         newFollowerCollectorActor = actorFactory.createTestActor(
                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
@@ -90,7 +88,20 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         newFollowerRaftActor = actorFactory.createTestActor(MockNewFollowerRaftActor.props(
                 configParams, newFollowerCollectorActor).withDispatcher(Dispatchers.DefaultDispatcherId()),
                 actorFactory.generateActorId(NEW_SERVER_ID));
-        newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
+
+        try {
+            newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
+        } catch (Exception e) {
+            newFollowerActorContext = newFollowerRaftActor.underlyingActor().getRaftActorContext();
+        }
+    }
+
+    private DefaultConfigParamsImpl newFollowerConfigParams() {
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+        configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+        configParams.setElectionTimeoutFactor(100000);
+        configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+        return configParams;
     }
 
     @After
@@ -156,9 +167,6 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, FOLLOWER_ID),
                 newFollowerActorContext.getPeerAddresses().keySet());
 
-        clearMessages(followerActor);
-        clearMessages(newFollowerCollectorActor);
-
         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
         expectFirstMatching(followerActor, ApplyState.class);
 
@@ -205,8 +213,6 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         // Verify ServerConfigurationPayload entry in the new follower
 
-        clearMessages(newFollowerCollectorActor);
-
         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
         assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
         verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID);
@@ -218,7 +224,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
     }
 
     @Test
-    public void testAddServerAsNonVoting() throws Exception {
+    public void testAddServersAsNonVoting() throws Exception {
         RaftActorContext initialActorContext = new MockRaftActorContext();
         initialActorContext.setCommitIndex(-1);
         initialActorContext.setLastApplied(-1);
@@ -256,7 +262,81 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID),
                 newFollowerActorContext.getPeerAddresses().keySet());
 
-        MessageCollectorActor.assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.SERIALIZABLE_CLASS, 500);
+        MessageCollectorActor.assertNoneMatching(newFollowerCollectorActor, InstallSnapshot.class, 500);
+
+        // Add another non-voting server.
+
+        RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
+        Follower newFollower2 = new Follower(follower2ActorContext);
+        followerActor.underlyingActor().setBehavior(newFollower2);
+
+        leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit.getRef());
+
+        addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
+        assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
+
+        assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
+        assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
+        verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
+                LEADER_ID, NEW_SERVER_ID, NEW_SERVER_ID2);
+    }
+
+    @Test
+    public void testAddServerWithOperationInProgress() throws Exception {
+        RaftActorContext initialActorContext = new MockRaftActorContext();
+        initialActorContext.setCommitIndex(-1);
+        initialActorContext.setLastApplied(-1);
+        initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+        TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+                MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
+                        initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(LEADER_ID));
+
+        MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
+        RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+
+        RaftActorContext follower2ActorContext = newFollowerContext(NEW_SERVER_ID2, followerActor);
+        Follower newFollower2 = new Follower(follower2ActorContext);
+        followerActor.underlyingActor().setBehavior(newFollower2);
+
+        MockNewFollowerRaftActor newFollowerRaftActorInstance = newFollowerRaftActor.underlyingActor();
+        newFollowerRaftActorInstance.setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
+
+        leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
+
+        // Wait for leader's install snapshot and capture it
+
+        Object installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
+
+        JavaTestKit testKit2 = new JavaTestKit(getSystem());
+        leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
+
+        newFollowerRaftActorInstance.setDropMessageOfType(null);
+        newFollowerRaftActor.tell(installSnapshot, leaderActor);
+
+        AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
+
+        addServerReply = testKit2.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
+
+        // Verify ServerConfigurationPayload entries in leader's log
+
+        assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
+        assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
+        verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
+                LEADER_ID, NEW_SERVER_ID, NEW_SERVER_ID2);
+
+        // Verify ServerConfigurationPayload entry in the new follower
+
+        MessageCollectorActor.expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
+
+        assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
+               newFollowerActorContext.getPeerAddresses().keySet());
     }
 
     @Test
@@ -275,6 +355,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
 
         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
 
@@ -340,6 +421,9 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         RaftActorContext followerActorContext = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
                 id, termInfo, -1, -1,
                 ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG);
+        followerActorContext.setCommitIndex(-1);
+        followerActorContext.setLastApplied(-1);
+        followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
 
         return followerActorContext;
     }
@@ -380,7 +464,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
             configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
-            configParams.setElectionTimeoutFactor(1);
+            configParams.setElectionTimeoutFactor(10);
             return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
         }
     }
@@ -400,11 +484,10 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         @Override
         public void handleCommand(Object message) {
-            if(dropMessageOfType != null && dropMessageOfType.equals(message.getClass())) {
-                return;
+            if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
+                super.handleCommand(message);
             }
 
-            super.handleCommand(message);
             collectorActor.tell(message, getSender());
         }