2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
16 import akka.actor.ActorRef;
17 import akka.actor.Props;
18 import akka.actor.Status;
19 import akka.pattern.Patterns;
20 import akka.testkit.TestActorRef;
21 import com.google.common.collect.ImmutableMap;
22 import java.util.Collections;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.concurrent.TimeUnit;
26 import org.junit.Test;
27 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
28 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
29 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
30 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
31 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
32 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
33 import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
34 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
35 import scala.concurrent.Await;
36 import scala.concurrent.Future;
37 import scala.concurrent.duration.FiniteDuration;
40 * Tests leadership transfer end-to-end.
42 * @author Thomas Pantelis
44 public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrationTest {
46 private final String follower3Id = factory.generateActorId("follower");
47 private TestActorRef<MessageCollectorActor> leaderNotifierActor;
48 private TestActorRef<MessageCollectorActor> follower1NotifierActor;
49 private TestActorRef<MessageCollectorActor> follower2NotifierActor;
50 private TestActorRef<MessageCollectorActor> follower3NotifierActor;
51 private TestActorRef<TestRaftActor> follower3Actor;
52 private ActorRef follower3CollectorActor;
53 private ActorRef requestLeadershipResultCollectorActor;
56 public void testLeaderTransferOnShutDown() throws Exception {
57 testLog.info("testLeaderTransferOnShutDown starting");
61 sendPayloadWithFollower2Lagging();
63 sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1();
65 sendShutDown(follower2Actor);
67 testLog.info("testLeaderTransferOnShutDown ending");
70 private void sendShutDown(ActorRef actor) throws Exception {
71 testLog.info("sendShutDown for {} starting", actor.path());
73 FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
74 Future<Boolean> stopFuture = Patterns.gracefulStop(actor, duration, Shutdown.INSTANCE);
76 Boolean stopped = Await.result(stopFuture, duration);
77 assertEquals("Stopped", Boolean.TRUE, stopped);
79 testLog.info("sendShutDown for {} ending", actor.path());
82 private void sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1() throws Exception {
83 testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 starting");
85 clearMessages(leaderNotifierActor);
86 clearMessages(follower1NotifierActor);
87 clearMessages(follower2NotifierActor);
88 clearMessages(follower3NotifierActor);
90 // Simulate a delay for follower2 in receiving the LeaderTransitioning message with null leader id.
91 final TestRaftActor follower2Instance = follower2Actor.underlyingActor();
92 follower2Instance.startDropMessages(LeaderTransitioning.class);
94 FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
95 final Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE);
97 verifyRaftState(follower1Actor, RaftState.Leader);
99 Boolean stopped = Await.result(stopFuture, duration);
100 assertEquals("Stopped", Boolean.TRUE, stopped);
102 // Re-enable LeaderTransitioning messages to follower2.
103 final LeaderTransitioning leaderTransitioning = expectFirstMatching(follower2CollectorActor,
104 LeaderTransitioning.class);
105 follower2Instance.stopDropMessages(LeaderTransitioning.class);
107 follower2Instance.stopDropMessages(AppendEntries.class);
108 ApplyState applyState = expectFirstMatching(follower2CollectorActor, ApplyState.class);
109 assertEquals("Apply sate index", 0, applyState.getReplicatedLogEntry().getIndex());
111 // Now send the LeaderTransitioning to follower2 after it has received AppendEntries from the new leader.
112 follower2Actor.tell(leaderTransitioning, ActorRef.noSender());
114 verifyLeaderStateChangedMessages(leaderNotifierActor, null, follower1Id);
115 verifyLeaderStateChangedMessages(follower1NotifierActor, null, follower1Id);
116 // follower2 should only get 1 LeaderStateChanged with the new leaderId - the LeaderTransitioning message
117 // should not generate a LeaderStateChanged with null leaderId since it arrived after the new leaderId was set.
118 verifyLeaderStateChangedMessages(follower2NotifierActor, follower1Id);
119 verifyLeaderStateChangedMessages(follower3NotifierActor, null, follower1Id);
121 testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 ending");
124 private void sendPayloadWithFollower2Lagging() {
125 testLog.info("sendPayloadWithFollower2Lagging starting");
127 follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
129 sendPayloadData(leaderActor, "zero");
131 expectFirstMatching(leaderCollectorActor, ApplyState.class);
132 expectFirstMatching(follower1CollectorActor, ApplyState.class);
133 expectFirstMatching(follower3CollectorActor, ApplyState.class);
135 testLog.info("sendPayloadWithFollower2Lagging ending");
138 private void createRaftActors() {
139 testLog.info("createRaftActors starting");
141 follower1NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
142 factory.generateActorId(follower1Id + "-notifier"));
143 follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
144 ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id),
145 follower3Id, testActorPath(follower3Id)))
146 .config(newFollowerConfigParams()).roleChangeNotifier(follower1NotifierActor));
148 follower2NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
149 factory.generateActorId(follower2Id + "-notifier"));
150 follower2Actor = newTestRaftActor(follower2Id,TestRaftActor.newBuilder().peerAddresses(
151 ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString(),
152 follower3Id, testActorPath(follower3Id)))
153 .config(newFollowerConfigParams()).roleChangeNotifier(follower2NotifierActor));
155 follower3NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
156 factory.generateActorId(follower3Id + "-notifier"));
157 follower3Actor = newTestRaftActor(follower3Id,TestRaftActor.newBuilder().peerAddresses(
158 ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString(),
159 follower2Id, follower2Actor.path().toString()))
160 .config(newFollowerConfigParams()).roleChangeNotifier(follower3NotifierActor));
162 peerAddresses = ImmutableMap.<String, String>builder()
163 .put(follower1Id, follower1Actor.path().toString())
164 .put(follower2Id, follower2Actor.path().toString())
165 .put(follower3Id, follower3Actor.path().toString()).build();
167 leaderConfigParams = newLeaderConfigParams();
168 leaderConfigParams.setElectionTimeoutFactor(3);
169 leaderNotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
170 factory.generateActorId(leaderId + "-notifier"));
171 leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses)
172 .config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
174 follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
175 follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
176 follower3CollectorActor = follower3Actor.underlyingActor().collectorActor();
177 leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
179 leaderContext = leaderActor.underlyingActor().getRaftActorContext();
180 leaderContext.getPeerInfo(follower3Id).setVotingState(VotingState.NON_VOTING);
182 waitUntilLeader(leaderActor);
184 testLog.info("createRaftActors starting");
187 private static void verifyRaftState(ActorRef raftActor, final RaftState expState) {
188 verifyRaftState(raftActor, rs -> assertEquals("getRaftState", expState.toString(), rs.getRaftState()));
191 private void verifyLeaderStateChangedMessages(TestActorRef<MessageCollectorActor> notifierActor,
192 String... expLeaderIds) {
193 List<LeaderStateChanged> leaderStateChanges = expectMatching(notifierActor, LeaderStateChanged.class,
194 expLeaderIds.length);
196 Collections.reverse(leaderStateChanges);
197 Iterator<LeaderStateChanged> actual = leaderStateChanges.iterator();
198 for (int i = expLeaderIds.length - 1; i >= 0; i--) {
199 assertEquals("getLeaderId", expLeaderIds[i], actual.next().getLeaderId());
204 public void testLeaderTransferAborted() throws Exception {
205 testLog.info("testLeaderTransferAborted starting");
209 leaderActor.underlyingActor().startDropMessages(AppendEntriesReply.class);
211 sendShutDown(leaderActor);
213 verifyRaftState(follower1Actor, RaftState.Follower);
214 verifyRaftState(follower2Actor, RaftState.Follower);
215 verifyRaftState(follower3Actor, RaftState.Follower);
217 testLog.info("testLeaderTransferOnShutDown ending");
221 public void testLeaderTransferSkippedOnShutdownWithNoFollowers() throws Exception {
222 testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers starting");
224 leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().config(newLeaderConfigParams()));
225 waitUntilLeader(leaderActor);
227 sendShutDown(leaderActor);
229 testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers ending");
232 private void sendFollower2RequestLeadershipTransferToLeader() {
233 testLog.info("sendFollower2RequestLeadershipTransferToLeader starting");
236 new RequestLeadership(follower2Id, requestLeadershipResultCollectorActor), ActorRef.noSender());
238 testLog.info("sendFollower2RequestLeadershipTransferToLeader ending");
241 private void createRequestLeadershipResultCollectorActor() {
242 testLog.info("createRequestLeadershipResultCollectorActor starting");
244 requestLeadershipResultCollectorActor = factory.createActor(MessageCollectorActor.props());
246 testLog.info("createRequestLeadershipResultCollectorActor ending");
250 public void testSuccessfulRequestLeadershipTransferToFollower2() {
251 testLog.info("testSuccessfulRequestLeadershipTransferToFollower2 starting");
254 createRequestLeadershipResultCollectorActor();
256 sendFollower2RequestLeadershipTransferToLeader();
258 verifyRaftState(follower2Actor, RaftState.Leader);
260 expectMatching(requestLeadershipResultCollectorActor, Status.Success.class, 1);
262 testLog.info("testSuccessfulRequestLeadershipTransferToFollower2 ending");
266 public void testRequestLeadershipTransferToFollower2WithFollower2Lagging() {
267 testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging starting");
270 createRequestLeadershipResultCollectorActor();
272 sendPayloadWithFollower2Lagging();
274 sendFollower2RequestLeadershipTransferToLeader();
276 verifyRaftState(follower1Actor, RaftState.Follower);
277 verifyRaftState(follower2Actor, RaftState.Follower);
278 verifyRaftState(follower3Actor, RaftState.Follower);
280 Status.Failure failure = expectFirstMatching(requestLeadershipResultCollectorActor, Status.Failure.class);
281 assertTrue(failure.cause() instanceof LeadershipTransferFailedException);
283 testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging ending");
287 public void testRequestLeadershipTransferToFollower2WithFollower2Shutdown() throws Exception {
288 testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown starting");
291 createRequestLeadershipResultCollectorActor();
293 sendShutDown(follower2Actor);
295 sendFollower2RequestLeadershipTransferToLeader();
297 verifyRaftState(follower1Actor, RaftState.Follower);
298 verifyRaftState(follower3Actor, RaftState.Follower);
300 Status.Failure failure = expectFirstMatching(requestLeadershipResultCollectorActor, Status.Failure.class);
301 assertTrue(failure.cause() instanceof LeadershipTransferFailedException);
303 testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown ending");