From 230831c9e7a9628ea401f69ebdbf46fb43d407af Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Wed, 28 Oct 2015 16:06:23 -0400 Subject: [PATCH] Add wait state for AddServer if snapshot in progress It is possible a snapshot capture could be in progress when we attempt to initiate snapshot capture on AddServer. I added a wait state to the FSM and a new message, SnapshotComplete, that is sent by the SnapshotManager. Added more unit test cases. Change-Id: I119a264e03686ea70f7834e551c2fb45dd39f903 Signed-off-by: Tom Pantelis --- .../RaftActorServerConfigurationSupport.java | 123 +++++++-- .../cluster/raft/SnapshotManager.java | 11 +- .../raft/base/messages/SnapshotComplete.java | 20 ++ .../raft/behaviors/AbstractLeader.java | 17 +- ...ftActorServerConfigurationSupportTest.java | 247 ++++++++++++++++-- .../cluster/raft/SnapshotManagerTest.java | 5 + 6 files changed, 367 insertions(+), 56 deletions(-) create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SnapshotComplete.java diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java index aba94f653e..9e40c98e6b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java @@ -20,6 +20,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete; import 
org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; import org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; @@ -64,6 +65,9 @@ class RaftActorServerConfigurationSupport { return true; } else if(message instanceof ApplyState) { return onApplyState((ApplyState) message, raftActor); + } else if(message instanceof SnapshotComplete) { + currentOperationState.onSnapshotComplete(raftActor); + return false; } else { return false; } @@ -133,6 +137,8 @@ class RaftActorServerConfigurationSupport { void onUnInitializedFollowerSnapshotReply(RaftActor raftActor, UnInitializedFollowerSnapshotReply reply); void onApplyState(RaftActor raftActor, ApplyState applyState); + + void onSnapshotComplete(RaftActor raftActor); } /** @@ -171,6 +177,10 @@ class RaftActorServerConfigurationSupport { LOG.debug("onApplyState was called in state {}", this); } + @Override + public void onSnapshotComplete(RaftActor raftActor) { + } + protected void persistNewServerConfiguration(RaftActor raftActor, ServerOperationContext operationContext){ Collection peers = raftContext.getPeers(); List newConfig = new ArrayList<>(peers.size() + 1); @@ -262,6 +272,32 @@ class RaftActorServerConfigurationSupport { AddServerContext getAddServerContext() { return addServerContext; } + + Cancellable newInstallSnapshotTimer(RaftActor raftActor) { + return raftContext.getActorSystem().scheduler().scheduleOnce( + new FiniteDuration(((raftContext.getConfigParams().getElectionTimeOutInterval().toMillis()) * 2), + TimeUnit.MILLISECONDS), raftContext.getActor(), + new FollowerCatchUpTimeout(addServerContext.getOperation().getNewServerId()), + raftContext.getActorSystem().dispatcher(), raftContext.getActor()); + } + + void handleOnFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) { + String serverId = followerTimeout.getNewServerId(); + + LOG.debug("{}: onFollowerCatchupTimeout for new server 
{}", raftContext.getId(), serverId); + + // cleanup + raftContext.removePeer(serverId); + + boolean isLeader = raftActor.isLeader(); + if(isLeader) { + AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); + leader.removeFollower(serverId); + } + + operationComplete(raftActor, getAddServerContext(), + isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER); + } } /** @@ -288,19 +324,19 @@ class RaftActorServerConfigurationSupport { leader.addFollower(addServer.getNewServerId()); if(votingState == VotingState.VOTING_NOT_INITIALIZED){ - LOG.debug("{}: Leader sending initiate capture snapshot to new follower {}", raftContext.getId(), - addServer.getNewServerId()); - - leader.initiateCaptureSnapshot(addServer.getNewServerId()); - // schedule the install snapshot timeout timer - Cancellable installSnapshotTimer = raftContext.getActorSystem().scheduler().scheduleOnce( - new FiniteDuration(((raftContext.getConfigParams().getElectionTimeOutInterval().toMillis()) * 2), - TimeUnit.MILLISECONDS), raftContext.getActor(), - new FollowerCatchUpTimeout(addServer.getNewServerId()), - raftContext.getActorSystem().dispatcher(), raftContext.getActor()); - - currentOperationState = new InstallingSnapshot(getAddServerContext(), installSnapshotTimer); + Cancellable installSnapshotTimer = newInstallSnapshotTimer(raftActor); + if(leader.initiateCaptureSnapshot(addServer.getNewServerId())) { + LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(), + addServer.getNewServerId()); + + currentOperationState = new InstallingSnapshot(getAddServerContext(), installSnapshotTimer); + } else { + LOG.debug("{}: Snapshot already in progress - waiting for completion", raftContext.getId()); + + currentOperationState = new WaitingForPriorSnapshotComplete(getAddServerContext(), + installSnapshotTimer); + } } else { LOG.debug("{}: New follower is non-voting - directly persisting new server configuration", raftContext.getId()); @@ -324,19 +360,10 
@@ class RaftActorServerConfigurationSupport { @Override public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) { - String serverId = followerTimeout.getNewServerId(); - - LOG.debug("{}: onFollowerCatchupTimeout: {}", raftContext.getId(), serverId); - - AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); - - // cleanup - raftContext.removePeer(serverId); - leader.removeFollower(serverId); + handleOnFollowerCatchupTimeout(raftActor, followerTimeout); - LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(), serverId); - - operationComplete(raftActor, getAddServerContext(), ServerChangeStatus.TIMEOUT); + LOG.warn("{}: Timeout occured for new server {} while installing snapshot", raftContext.getId(), + followerTimeout.getNewServerId()); } @Override @@ -347,7 +374,7 @@ class RaftActorServerConfigurationSupport { // Sanity check to guard against receiving an UnInitializedFollowerSnapshotReply from a prior // add server operation that timed out. - if(getAddServerContext().getOperation().getNewServerId().equals(followerId)) { + if(getAddServerContext().getOperation().getNewServerId().equals(followerId) && raftActor.isLeader()) { AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); raftContext.getPeerInfo(followerId).setVotingState(VotingState.VOTING); leader.updateMinReplicaCount(); @@ -355,7 +382,53 @@ class RaftActorServerConfigurationSupport { persistNewServerConfiguration(raftActor, getAddServerContext()); installSnapshotTimer.cancel(); + } else { + LOG.debug("{}: Dropping UnInitializedFollowerSnapshotReply for server {}: {}", + raftContext.getId(), followerId, + !raftActor.isLeader() ? "not leader" : "server Id doesn't match"); + } + } + } + + /** + * The AddServer operation state for when there is a snapshot already in progress. When the current + * snapshot completes, it initiates an install snapshot. 
+ */ + private class WaitingForPriorSnapshotComplete extends AddServerState { + private final Cancellable snapshotTimer; + + WaitingForPriorSnapshotComplete(AddServerContext addServerContext, Cancellable snapshotTimer) { + super(addServerContext); + this.snapshotTimer = Preconditions.checkNotNull(snapshotTimer); + } + + @Override + public void onSnapshotComplete(RaftActor raftActor) { + LOG.debug("{}: onSnapshotComplete", raftContext.getId()); + + if(!raftActor.isLeader()) { + LOG.debug("{}: No longer the leader", raftContext.getId()); + return; } + + AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); + if(leader.initiateCaptureSnapshot(getAddServerContext().getOperation().getNewServerId())) { + LOG.debug("{}: Initiating capture snapshot for new server {}", raftContext.getId(), + getAddServerContext().getOperation().getNewServerId()); + + currentOperationState = new InstallingSnapshot(getAddServerContext(), + newInstallSnapshotTimer(raftActor)); + + snapshotTimer.cancel(); + } + } + + @Override + public void onFollowerCatchupTimeout(RaftActor raftActor, FollowerCatchUpTimeout followerTimeout) { + handleOnFollowerCatchupTimeout(raftActor, followerTimeout); + + LOG.warn("{}: Timeout occured for new server {} while waiting for prior snapshot to complete", + raftContext.getId(), followerTimeout.getNewServerId()); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java index 26d8c0af08..4dbe9ee9a0 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -15,6 +15,7 @@ import java.util.List; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import 
org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.slf4j.Logger; @@ -404,9 +405,7 @@ public class SnapshotManager implements SnapshotState { context.getPersistenceProvider().deleteMessages(lastSequenceNumber); - lastSequenceNumber = -1; - applySnapshot = null; - SnapshotManager.this.currentState = IDLE; + snapshotComplete(); } @Override @@ -424,9 +423,15 @@ public class SnapshotManager implements SnapshotState { applySnapshot.getCallback().onFailure(); } + snapshotComplete(); + } + + private void snapshotComplete() { lastSequenceNumber = -1; applySnapshot = null; SnapshotManager.this.currentState = IDLE; + + context.getActor().tell(SnapshotComplete.INSTANCE, context.getActor()); } @Override diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SnapshotComplete.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SnapshotComplete.java new file mode 100644 index 0000000000..d0329ede59 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SnapshotComplete.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft.base.messages; + +/** + * Internal message sent when a snapshot capture is complete. 
+ * + * @author Thomas Pantelis + */ +public class SnapshotComplete { + public static final SnapshotComplete INSTANCE = new SnapshotComplete(); + + private SnapshotComplete() { + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index b3644a7343..497d98cdce 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -399,12 +399,20 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); if (followerToSnapshot == null) { - LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader", + LOG.error("{}: FollowerToSnapshot not found for follower {} in InstallSnapshotReply", logName(), followerId); return; } FollowerLogInformation followerLogInformation = followerToLog.get(followerId); + if(followerLogInformation == null) { + // This can happen during AddServer if it times out. + LOG.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply", + logName(), followerId); + mapFollowerToSnapshot.remove(followerId); + return; + } + followerLogInformation.markFollowerActive(); if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) { @@ -623,17 +631,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * then send the existing snapshot in chunks to the follower. 
* @param followerId */ - public void initiateCaptureSnapshot(String followerId) { + public boolean initiateCaptureSnapshot(String followerId) { if (snapshot.isPresent()) { // if a snapshot is present in the memory, most likely another install is in progress // no need to capture snapshot. // This could happen if another follower needs an install when one is going on. final ActorSelection followerActor = context.getPeerActorSelection(followerId); sendSnapshotChunk(followerActor, followerId); - - + return true; } else { - context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), + return context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), this.getReplicatedToAllIndex(), followerId); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java index da7b4eddf4..883680b06d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java @@ -33,14 +33,17 @@ import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.Serve import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; +import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot; import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.Leader; import 
org.opendaylight.controller.cluster.raft.messages.AddServer; import org.opendaylight.controller.cluster.raft.messages.AddServerReply; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; +import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; @@ -314,12 +317,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { Object installSnapshot = expectFirstMatching(newFollowerCollectorActor, InstallSnapshot.class); + // Send a second AddServer - should get queued JavaTestKit testKit2 = new JavaTestKit(getSystem()); leaderActor.tell(new AddServer(NEW_SERVER_ID2, followerActor.path().toString(), false), testKit2.getRef()); + // Continue the first AddServer newFollowerRaftActorInstance.setDropMessageOfType(null); newFollowerRaftActor.tell(installSnapshot, leaderActor); + // Verify both complete successfully AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus()); @@ -343,9 +349,54 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { } @Test - public void testAddServerWithInstallSnapshotTimeout() throws Exception { - newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS); + public void testAddServerWithPriorSnapshotInProgress() throws Exception { + RaftActorContext initialActorContext = new MockRaftActorContext(); + 
initialActorContext.setCommitIndex(-1); + initialActorContext.setLastApplied(-1); + initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + + TestActorRef leaderActor = actorFactory.createTestActor( + MockLeaderRaftActor.props(ImmutableMap.of(), + initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(LEADER_ID)); + + MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor(); + RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext(); + + TestActorRef leaderCollectorActor = actorFactory.createTestActor( + MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(LEADER_ID + "Collector")); + leaderRaftActor.setCollectorActor(leaderCollectorActor); + + // Drop commit message for now to delay snapshot completion + leaderRaftActor.setDropMessageOfType(String.class); + + leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor); + + String commitMsg = expectFirstMatching(leaderCollectorActor, String.class); + + leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef()); + leaderRaftActor.setDropMessageOfType(null); + leaderActor.tell(commitMsg, leaderActor); + + AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); + assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus()); + assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint()); + + expectFirstMatching(newFollowerCollectorActor, ApplySnapshot.class); + + // Verify ServerConfigurationPayload entry in leader's log + + assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex()); + assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex()); + assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied()); + 
verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID), + votingServer(NEW_SERVER_ID)); + } + + @Test + public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception { RaftActorContext initialActorContext = new MockRaftActorContext(); initialActorContext.setCommitIndex(-1); initialActorContext.setLastApplied(-1); @@ -358,13 +409,149 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor(); RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1); + TestActorRef leaderCollectorActor = actorFactory.createTestActor( + MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(LEADER_ID + "Collector")); + leaderRaftActor.setCollectorActor(leaderCollectorActor); + + // Drop commit message so the snapshot doesn't complete. 
+ leaderRaftActor.setDropMessageOfType(String.class); + + leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor); + leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef()); AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus()); + assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size()); + } + + @Test + public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception { + RaftActorContext initialActorContext = new MockRaftActorContext(); + initialActorContext.setCommitIndex(-1); + initialActorContext.setLastApplied(-1); + initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + + TestActorRef leaderActor = actorFactory.createTestActor( + MockLeaderRaftActor.props(ImmutableMap.of(), + initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(LEADER_ID)); + + MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor(); + RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100); + + TestActorRef leaderCollectorActor = actorFactory.createTestActor( + MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(LEADER_ID + "Collector")); + leaderRaftActor.setCollectorActor(leaderCollectorActor); + + // Drop the commit message so the snapshot doesn't complete yet. 
+ leaderRaftActor.setDropMessageOfType(String.class); + + leaderActor.tell(new InitiateCaptureSnapshot(), leaderActor); + + leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef()); + + String commitMsg = expectFirstMatching(leaderCollectorActor, String.class); + + // Change the leader behavior to follower + leaderActor.tell(new Follower(leaderActorContext), leaderActor); + + // Drop CaptureSnapshotReply in case install snapshot is incorrectly initiated after the prior + // snapshot completes. This will prevent the invalid snapshot from completing and fail the + // isCapturing assertion below. + leaderRaftActor.setDropMessageOfType(CaptureSnapshotReply.class); + + // Complete the prior snapshot - this should be a no-op b/c it's no longer the leader + leaderActor.tell(commitMsg, leaderActor); + + leaderActor.tell(new FollowerCatchUpTimeout(NEW_SERVER_ID), leaderActor); + + AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); + assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus()); + + assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size()); + assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing()); + } + + @Test + public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception { + RaftActorContext initialActorContext = new MockRaftActorContext(); + initialActorContext.setCommitIndex(-1); + initialActorContext.setLastApplied(-1); + initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + + TestActorRef leaderActor = actorFactory.createTestActor( + MockLeaderRaftActor.props(ImmutableMap.of(), + initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(LEADER_ID)); + + MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor(); + RaftActorContext 
leaderActorContext = leaderRaftActor.getRaftActorContext(); + + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(8); + + TestActorRef leaderCollectorActor = actorFactory.createTestActor( + MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(LEADER_ID + "Collector")); + leaderRaftActor.setCollectorActor(leaderCollectorActor); + + // Drop the UnInitializedFollowerSnapshotReply to delay it. + leaderRaftActor.setDropMessageOfType(UnInitializedFollowerSnapshotReply.class); + + leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef()); + + UnInitializedFollowerSnapshotReply snapshotReply = expectFirstMatching(leaderCollectorActor, + UnInitializedFollowerSnapshotReply.class); + + // Prevent election timeout when the leader switches to follower + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(100); + + // Change the leader behavior to follower + leaderActor.tell(new Follower(leaderActorContext), leaderActor); + + // Send the captured UnInitializedFollowerSnapshotReply - should be a no-op + leaderRaftActor.setDropMessageOfType(null); + leaderActor.tell(snapshotReply, leaderActor); + + AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); + assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus()); + + assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size()); + } + + @Test + public void testAddServerWithInstallSnapshotTimeout() throws Exception { + RaftActorContext initialActorContext = new MockRaftActorContext(); + initialActorContext.setCommitIndex(-1); + initialActorContext.setLastApplied(-1); + initialActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + + TestActorRef leaderActor = actorFactory.createTestActor( + 
MockLeaderRaftActor.props(ImmutableMap.of(), + initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(LEADER_ID)); + + MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor(); + RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1); + + // Drop the InstallSnapshot message so it times out + newFollowerRaftActor.underlyingActor().setDropMessageOfType(InstallSnapshot.SERIALIZABLE_CLASS); + + leaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef()); + + leaderActor.tell(new UnInitializedFollowerSnapshotReply("bogus"), leaderActor); + + AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); + assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus()); + assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size()); assertEquals("Leader followers size", 0, ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size()); @@ -439,10 +626,41 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { return followerActorContext; } - public static class MockLeaderRaftActor extends MockRaftActor { + static abstract class AbstractMockRaftActor extends MockRaftActor { + private volatile TestActorRef collectorActor; + private volatile Class dropMessageOfType; + + AbstractMockRaftActor(String id, Map peerAddresses, Optional config, + DataPersistenceProvider dataPersistenceProvider, TestActorRef collectorActor) { + super(id, peerAddresses, config, dataPersistenceProvider); + this.collectorActor = collectorActor; + } + + void setDropMessageOfType(Class dropMessageOfType) { + this.dropMessageOfType = dropMessageOfType; + } + + void setCollectorActor(TestActorRef collectorActor) { + this.collectorActor = collectorActor; + } 
+ + @Override + public void handleCommand(Object message) { + if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) { + super.handleCommand(message); + } + + if(collectorActor != null) { + collectorActor.tell(message, getSender()); + } + } + } + + public static class MockLeaderRaftActor extends AbstractMockRaftActor { public MockLeaderRaftActor(Map peerAddresses, ConfigParams config, RaftActorContext fromContext) { - super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE); + super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null); + setPersistence(false); RaftActorContext context = getRaftActorContext(); for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) { @@ -480,26 +698,9 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { } } - public static class MockNewFollowerRaftActor extends MockRaftActor { - private final TestActorRef collectorActor; - private volatile Class dropMessageOfType; - + public static class MockNewFollowerRaftActor extends AbstractMockRaftActor { public MockNewFollowerRaftActor(ConfigParams config, TestActorRef collectorActor) { - super(NEW_SERVER_ID, Maps.newHashMap(), Optional.of(config), null); - this.collectorActor = collectorActor; - } - - void setDropMessageOfType(Class dropMessageOfType) { - this.dropMessageOfType = dropMessageOfType; - } - - @Override - public void handleCommand(Object message) { - if(dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) { - super.handleCommand(message); - } - - collectorActor.tell(message, getSender()); + super(NEW_SERVER_ID, Maps.newHashMap(), Optional.of(config), null, collectorActor); } static Props props(ConfigParams config, TestActorRef collectorActor) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java 
b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java index ef1a926416..27596dff74 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java @@ -36,6 +36,7 @@ import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.raft.SnapshotManager.LastAppliedTermInformationReader; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.slf4j.LoggerFactory; @@ -427,6 +428,8 @@ public class SnapshotManagerTest extends AbstractActorTest { assertEquals(90, criteriaCaptor.getValue().maxSequenceNr()); // sequenceNumber = 100 // config snapShotBatchCount = 10 // therefore maxSequenceNumber = 90 + + MessageCollectorActor.expectFirstMatching(actorRef, SnapshotComplete.class); } @Test @@ -489,6 +492,8 @@ public class SnapshotManagerTest extends AbstractActorTest { snapshotManager.rollback(); verify(mockReplicatedLog).snapshotRollback(); + + MessageCollectorActor.expectFirstMatching(actorRef, SnapshotComplete.class); } -- 2.36.6