Add wait state for AddServer if snapshot in progress 25/28925/3
authorTom Pantelis <tpanteli@brocade.com>
Wed, 28 Oct 2015 20:06:23 +0000 (16:06 -0400)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 29 Oct 2015 20:38:39 +0000 (20:38 +0000)
It is possible a snapshot capture could be in progress when we
attempt to initiate snapshot capture on AddServer. I added a wait
state to the FSM and a new message, SnapshotComplete, that is sent
by the SnapshotManager.

Added more unit test cases.

Change-Id: I119a264e03686ea70f7834e551c2fb45dd39f903
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SnapshotComplete.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java

index aba94f653efdc1bc455d1a66b1ac88f48e8fb0f9..9e40c98e6b079692ef320f1270da9d185a81168c 100644 (file)
@@ -20,6 +20,7 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
@@ -64,6 +65,9 @@ class RaftActorServerConfigurationSupport {
             return true;
         } else if(message instanceof ApplyState) {
             return onApplyState((ApplyState) message, raftActor);
+        } else if(message instanceof SnapshotComplete) {
+            currentOperationState.onSnapshotComplete(raftActor);
+            return false;
         } else {
             return false;
         }
@@ -133,6 +137,8 @@ class RaftActorServerConfigurationSupport {
         void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply);
 
         void onApplyState(RaftActor raftActor, ApplyState applyState);
+
+        void onSnapshotComplete(RaftActor raftActor);
     }
 
     /**
@@ -171,6 +177,10 @@ class RaftActorServerConfigurationSupport {
             LOG.debug("onApplyState was called in state {}", this);
         }
 
+        @Override
+        public void onSnapshotComplete(RaftActor raftActor) {
+        }
+
         protected void persistNewServerConfiguration(RaftActor raftActor, ServerOperationContext<?> operationContext){
             Collection<PeerInfo> peers = raftContext.getPeers();
             List<ServerInfo> newConfig = new ArrayList<>(peers.size() + 1);
@@ -262,6 +272,32 @@ class RaftActorServerConfigurationSupport {
         AddServerContext getAddServerContext() {
             return addServerContext;
         }
+
+        Cancellable newInstallSnapshotTimer(RaftActor raftActor) {
+            return raftContext.getActorSystem().scheduler().scheduleOnce(
+                    new FiniteDuration(((raftContext.getConfigParams().getElectionTimeOutInterval().toMillis()) * 2),
+                            TimeUnit.MILLISECONDS), raftContext.getActor(),
+                            new FollowerCatchUpTimeout(addServerContext.getOperation().getNewServerId()),
+                            raftContext.getActorSystem().dispatcher(), raftContext.getActor());
+        }
+
+        void handleOnFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
+            String serverId = followerTimeout.getNewServerId();
+
+            LOG.debug("{}: onFollowerCatchupTimeout for new server {}", raftContext.getId(), serverId);
+
+            // cleanup
+            raftContext.removePeer(serverId);
+
+            boolean isLeader = raftActor.isLeader();
+            if(isLeader) {
+                AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
+                leader.removeFollower(serverId);
+            }
+
+            operationComplete(raftActor, getAddServerContext(),
+                    isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER);
+        }
     }
 
     /**
@@ -288,19 +324,19 @@ class RaftActorServerConfigurationSupport {
             leader.addFollower(addServer.getNewServerId());
 
             if(votingState == VotingState.VOTING_NOT_INITIALIZED){
-                LOG.debug("{}: Leader sending initiate capture snapshot to new follower {}", raftContext.getId(),
-                        addServer.getNewServerId());
-
-                leader.initiateCaptureSnapshot(addServer.getNewServerId());
-
                 // schedule the install snapshot timeout timer
-                Cancellable installSnapshotTimer = raftContext.getActorSystem().scheduler().scheduleOnce(
-                        new FiniteDuration(((raftContext.getConfigParams().getElectionTimeOutInterval().toMillis()) * 2),
-                                TimeUnit.MILLISECONDS), raftContext.getActor(),
-                                new FollowerCatchUpTimeout(addServer.getNewServerId()),
-                                raftContext.getActorSystem().dispatcher(), raftContext.getActor());
-
-                currentOperationState = new InstallingSnapshot(getAddServerContext(), installSnapshotTimer);
+                Cancellable installSnapshotTimer = newInstallSnapshotTimer(raftActor);
+                if(leader.initiateCaptureSnapshot(addServer.getNewServerId())) {
+                    LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
+                            addServer.getNewServerId());
+
+                    currentOperationState = new InstallingSnapshot(getAddServerContext(), installSnapshotTimer);
+                } else {
+                    LOG.debug("{}: Snapshot already in progress - waiting for completion", raftContext.getId());
+
+                    currentOperationState = new WaitingForPriorSnapshotComplete(getAddServerContext(),
+                            installSnapshotTimer);
+                }
             } else {
                 LOG.debug("{}: New follower is non-voting - directly persisting new server configuration",
                         raftContext.getId());
@@ -324,19 +360,10 @@ class RaftActorServerConfigurationSupport {
 
         @Override
         public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
-            String serverId = followerTimeout.getNewServerId();
-
-            LOG.debug("{}: onFollowerCatchupTimeout: {}", raftContext.getId(), serverId);
-
-            AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
-
-            // cleanup
-            raftContext.removePeer(serverId);
-            leader.removeFollower(serverId);
+            handleOnFollowerCatchupTimeout(raftActor, followerTimeout);
 
-            LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(), serverId);
-
-            operationComplete(raftActor, getAddServerContext(), ServerChangeStatus.TIMEOUT);
+            LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(),
+                    followerTimeout.getNewServerId());
         }
 
         @Override
@@ -347,7 +374,7 @@ class RaftActorServerConfigurationSupport {
 
             // Sanity check to guard against receiving an UnInitializedFollowerSnapshotReply from a prior
             // add server operation that timed out.
-            if(getAddServerContext().getOperation().getNewServerId().equals(followerId)) {
+            if(getAddServerContext().getOperation().getNewServerId().equals(followerId) && raftActor.isLeader()) {
                 AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
                 raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING);
                 leader.updateMinReplicaCount();
@@ -355,7 +382,53 @@ class RaftActorServerConfigurationSupport {
                 persistNewServerConfiguration(raftActor, getAddServerContext());
 
                 installSnapshotTimer.cancel();
+            } else {
+                LOG.debug("{}: Dropping UnInitializedFollowerSnapshotReply for server {}: {}",
+                        raftContext.getId(), followerId,
+                        !raftActor.isLeader() ? "not leader" : "server Id doesn't match");
+            }
+        }
+    }
+
+    /**
+     * The AddServer operation state for when there is a snapshot already in progress. When the current
+     * snapshot completes, it initiates an install snapshot.
+     */
+    private class WaitingForPriorSnapshotComplete extends AddServerState {
+        private final Cancellable snapshotTimer;
+
+        WaitingForPriorSnapshotComplete(AddServerContext addServerContext, Cancellable snapshotTimer) {
+            super(addServerContext);
+            this.snapshotTimer = Preconditions.checkNotNull(snapshotTimer);
+        }
+
+        @Override
+        public void onSnapshotComplete(RaftActor raftActor) {
+            LOG.debug("{}: onSnapshotComplete", raftContext.getId());
+
+            if(!raftActor.isLeader()) {
+                LOG.debug("{}: No longer the leader", raftContext.getId());
+                return;
             }
+
+            AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
+            if(leader.initiateCaptureSnapshot(getAddServerContext().getOperation().getNewServerId())) {
+                LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(),
+                        getAddServerContext().getOperation().getNewServerId());
+
+                currentOperationState = new InstallingSnapshot(getAddServerContext(),
+                        newInstallSnapshotTimer(raftActor));
+
+                snapshotTimer.cancel();
+            }
+        }
+
+        @Override
+        public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) {
+            handleOnFollowerCatchupTimeout(raftActor, followerTimeout);
+
+            LOG.warn("{}: Timeout occured for new server {} while waiting for prior snapshot to complete",
+                    raftContext.getId(), followerTimeout.getNewServerId());
         }
     }
 
index 26d8c0af084a5233437896539df7d5db6f439ad8..4dbe9ee9a0f9553e279aa4af7b4abb8682c697a3 100644 (file)
@@ -15,6 +15,7 @@ import java.util.List;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.slf4j.Logger;
 
@@ -404,9 +405,7 @@ public class SnapshotManager implements SnapshotState {
 
             context.getPersistenceProvider().deleteMessages(lastSequenceNumber);
 
-            lastSequenceNumber = -1;
-            applySnapshot = null;
-            SnapshotManager.this.currentState = IDLE;
+            snapshotComplete();
         }
 
         @Override
@@ -424,9 +423,15 @@ public class SnapshotManager implements SnapshotState {
                 applySnapshot.getCallback().onFailure();
             }
 
+            snapshotComplete();
+        }
+
+        private void snapshotComplete() {
             lastSequenceNumber = -1;
             applySnapshot = null;
             SnapshotManager.this.currentState = IDLE;
+
+            context.getActor().tell(SnapshotComplete.INSTANCE, context.getActor());
         }
 
         @Override
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SnapshotComplete.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SnapshotComplete.java
new file mode 100644 (file)
index 0000000..d0329ed
--- /dev/null
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+/**
+ * Internal message sent when a snapshot capture is complete.
+ *
+ * @author Thomas Pantelis
+ */
+public class SnapshotComplete {
+    public static final SnapshotComplete INSTANCE = new SnapshotComplete();
+
+    private SnapshotComplete() {
+    }
+}
index b3644a7343c64606fbf5502dc49152113bce2e23..497d98cdce13cded64be94696591ffdb171f22d3 100644 (file)
@@ -399,12 +399,20 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
 
         if (followerToSnapshot == null) {
-            LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader",
+            LOG.error("{}: FollowerToSnapshot not found for follower {} in InstallSnapshotReply",
                     logName(), followerId);
             return;
         }
 
         FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+        if(followerLogInformation == null) {
+            // This can happen during AddServer if it times out.
+            LOG.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply",
+                    logName(), followerId);
+            mapFollowerToSnapshot.remove(followerId);
+            return;
+        }
+
         followerLogInformation.markFollowerActive();
 
         if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
@@ -623,17 +631,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      * then send the existing snapshot in chunks to the follower.
      * @param followerId
      */
-    public void initiateCaptureSnapshot(String followerId) {
+    public boolean initiateCaptureSnapshot(String followerId) {
         if (snapshot.isPresent()) {
             // if a snapshot is present in the memory, most likely another install is in progress
             // no need to capture snapshot.
             // This could happen if another follower needs an install when one is going on.
             final ActorSelection followerActor = context.getPeerActorSelection(followerId);
             sendSnapshotChunk(followerActor, followerId);
-
-
+            return true;
         } else {
-            context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+            return context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
                     this.getReplicatedToAllIndex(), followerId);
         }
     }
index da7b4eddf4951db8890964c12531b4f1e8d22f1e..883680b06d2f66d4a1e0d9a75bceb37434beebe1 100644 (file)
@@ -33,14 +33,17 @@ import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.Serve
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
@@ -314,12 +317,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         Object installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class);
 
+        // Send a second AddServer - should get queued
         JavaTestKit testKit2 = new JavaTestKit(getSystem());
         leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef());
 
+        // Continue the first AddServer
         newFollowerRaftActorInstance.setDropMessageOfType(null);
         newFollowerRaftActor.tell(installSnapshot, leaderActor);
 
+        // Verify both complete successfully
         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
         assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
 
@@ -343,9 +349,54 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
     }
 
     @Test
-    public void testAddServerWithInstallSnapshotTimeout() throws Exception {
-        newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
+    public void testAddServerWithPriorSnapshotInProgress() throws Exception {
+        RaftActorContext initialActorContext = new MockRaftActorContext();
+        initialActorContext.setCommitIndex(-1);
+        initialActorContext.setLastApplied(-1);
+        initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+        TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+                MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
+                        initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(LEADER_ID));
+
+        MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
+        RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+
+        TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
+                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(LEADER_ID + "Collector"));
+        leaderRaftActor.setCollectorActor(leaderCollectorActor);
+
+        // Drop commit message for now to delay snapshot completion
+        leaderRaftActor.setDropMessageOfType(String.class);
+
+        leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
+
+        String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
+
+        leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
 
+        leaderRaftActor.setDropMessageOfType(null);
+        leaderActor.tell(commitMsg, leaderActor);
+
+        AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
+        assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
+
+        expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class);
+
+        // Verify ServerConfigurationPayload entry in leader's log
+
+        assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
+        assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
+        verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
+                votingServer(NEW_SERVER_ID));
+    }
+
+    @Test
+    public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
         RaftActorContext initialActorContext = new MockRaftActorContext();
         initialActorContext.setCommitIndex(-1);
         initialActorContext.setLastApplied(-1);
@@ -358,13 +409,149 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
         RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+
         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
 
+        TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
+                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(LEADER_ID + "Collector"));
+        leaderRaftActor.setCollectorActor(leaderCollectorActor);
+
+        // Drop commit message so the snapshot doesn't complete.
+        leaderRaftActor.setDropMessageOfType(String.class);
+
+        leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
+
         leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
 
         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
 
+        assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
+    }
+
+    @Test
+    public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
+        RaftActorContext initialActorContext = new MockRaftActorContext();
+        initialActorContext.setCommitIndex(-1);
+        initialActorContext.setLastApplied(-1);
+        initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+        TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+                MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
+                        initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(LEADER_ID));
+
+        MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
+        RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
+
+        TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
+                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(LEADER_ID + "Collector"));
+        leaderRaftActor.setCollectorActor(leaderCollectorActor);
+
+        // Drop the commit message so the snapshot doesn't complete yet.
+        leaderRaftActor.setDropMessageOfType(String.class);
+
+        leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor);
+
+        leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
+
+        String commitMsg = expectFirstMatching(leaderCollectorActor, String.class);
+
+        // Change the leader behavior to follower
+        leaderActor.tell(new Follower(leaderActorContext), leaderActor);
+
+        // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior
+        // snapshot completes. This will prevent the invalid snapshot from completing and fail the
+        // isCapturing assertion below.
+        leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class);
+
+        // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader
+        leaderActor.tell(commitMsg, leaderActor);
+
+        leaderActor.tell(new FollowerCatchUpTimeout(NEW_SERVER_ID), leaderActor);
+
+        AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
+
+        assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
+        assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
+    }
+
+    @Test
+    public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
+        RaftActorContext initialActorContext = new MockRaftActorContext();
+        initialActorContext.setCommitIndex(-1);
+        initialActorContext.setLastApplied(-1);
+        initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+        TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+                MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
+                        initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(LEADER_ID));
+
+        MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
+        RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8);
+
+        TestActorRef<MessageCollectorActor> leaderCollectorActor = actorFactory.createTestActor(
+                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(LEADER_ID + "Collector"));
+        leaderRaftActor.setCollectorActor(leaderCollectorActor);
+
+        // Drop the UnInitializedFollowerSnapshotReply to delay it.
+        leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class);
+
+        leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
+
+        UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor,
+                UnInitializedFollowerSnapshotReply.class);
+
+        // Prevent election timeout when the leader switches to follower
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100);
+
+        // Change the leader behavior to follower
+        leaderActor.tell(new Follower(leaderActorContext), leaderActor);
+
+        // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op
+        leaderRaftActor.setDropMessageOfType(null);
+        leaderActor.tell(snapshotReply, leaderActor);
+
+        AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
+
+        assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
+    }
+
+    @Test
+    public void testAddServerWithInstallSnapshotTimeout() throws Exception {
+        RaftActorContext initialActorContext = new MockRaftActorContext();
+        initialActorContext.setCommitIndex(-1);
+        initialActorContext.setLastApplied(-1);
+        initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+        TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+                MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
+                        initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(LEADER_ID));
+
+        MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
+        RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
+
+        // Drop the InstallSnapshot message so it times out
+        newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS);
+
+        leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
+
+        leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor);
+
+        AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
+
         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
         assertEquals("Leader followers size", 0,
                 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
@@ -439,10 +626,41 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         return followerActorContext;
     }
 
-    public static class MockLeaderRaftActor extends MockRaftActor {
+    static abstract class AbstractMockRaftActor extends MockRaftActor {
+        private volatile TestActorRef<MessageCollectorActor> collectorActor;
+        private volatile Class<?> dropMessageOfType;
+
+        AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
+                DataPersistenceProvider dataPersistenceProvider, TestActorRef<MessageCollectorActor> collectorActor) {
+            super(id, peerAddresses, config, dataPersistenceProvider);
+            this.collectorActor = collectorActor;
+        }
+
+        void setDropMessageOfType(Class<?> dropMessageOfType) {
+            this.dropMessageOfType = dropMessageOfType;
+        }
+
+        void setCollectorActor(TestActorRef<MessageCollectorActor> collectorActor) {
+            this.collectorActor = collectorActor;
+        }
+
+        @Override
+        public void handleCommand(Object message) {
+            if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
+                super.handleCommand(message);
+            }
+
+            if(collectorActor != null) {
+                collectorActor.tell(message, getSender());
+            }
+        }
+    }
+
+    public static class MockLeaderRaftActor extends AbstractMockRaftActor {
         public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
                 RaftActorContext fromContext) {
-            super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE);
+            super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
+            setPersistence(false);
 
             RaftActorContext context = getRaftActorContext();
             for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
@@ -480,26 +698,9 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         }
     }
 
-    public static class MockNewFollowerRaftActor extends MockRaftActor {
-        private final TestActorRef<MessageCollectorActor> collectorActor;
-        private volatile Class<?> dropMessageOfType;
-
+    public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
         public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
-            super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), null);
-            this.collectorActor = collectorActor;
-        }
-
-        void setDropMessageOfType(Class<?> dropMessageOfType) {
-            this.dropMessageOfType = dropMessageOfType;
-        }
-
-        @Override
-        public void handleCommand(Object message) {
-            if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
-                super.handleCommand(message);
-            }
-
-            collectorActor.tell(message, getSender());
+            super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), null, collectorActor);
         }
 
         static Props props(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
index ef1a92641613c1df24b6316c2ff7bb30f68fdc65..27596dff74f6a0b83ebab2775cf507d7515900fb 100644 (file)
@@ -36,6 +36,7 @@ import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.SnapshotManager.LastAppliedTermInformationReader;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.slf4j.LoggerFactory;
@@ -427,6 +428,8 @@ public class SnapshotManagerTest extends AbstractActorTest {
         assertEquals(90, criteriaCaptor.getValue().maxSequenceNr()); // sequenceNumber = 100
                                                                      // config snapShotBatchCount = 10
                                                                      // therefore maxSequenceNumber = 90
+
+        MessageCollectorActor.expectFirstMatching(actorRef, SnapshotComplete.class);
     }
 
     @Test
@@ -489,6 +492,8 @@ public class SnapshotManagerTest extends AbstractActorTest {
         snapshotManager.rollback();
 
         verify(mockReplicatedLog).snapshotRollback();
+
+        MessageCollectorActor.expectFirstMatching(actorRef, SnapshotComplete.class);
     }