/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
15 import akka.actor.ActorRef;
16 import akka.actor.Props;
17 import akka.pattern.Patterns;
18 import akka.testkit.TestActorRef;
19 import com.google.common.collect.ImmutableMap;
20 import java.util.Collections;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.concurrent.TimeUnit;
24 import org.junit.Test;
25 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
26 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
27 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
28 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
29 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
30 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
31 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
32 import scala.concurrent.Await;
33 import scala.concurrent.Future;
34 import scala.concurrent.duration.FiniteDuration;
/**
 * Tests leadership transfer end-to-end.
 *
 * @author Thomas Pantelis
 */
41 public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrationTest {
43 private final String follower3Id = factory.generateActorId("follower");
44 private TestActorRef<MessageCollectorActor> leaderNotifierActor;
45 private TestActorRef<MessageCollectorActor> follower1NotifierActor;
46 private TestActorRef<MessageCollectorActor> follower2NotifierActor;
47 private TestActorRef<MessageCollectorActor> follower3NotifierActor;
48 private TestActorRef<TestRaftActor> follower3Actor;
49 private ActorRef follower3CollectorActor;
52 public void testLeaderTransferOnShutDown() throws Exception {
53 testLog.info("testLeaderTransferOnShutDown starting");
57 sendPayloadWithFollower2Lagging();
59 sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1();
61 sendShutDown(follower2Actor);
63 testLog.info("testLeaderTransferOnShutDown ending");
66 private void sendShutDown(ActorRef actor) throws Exception {
67 testLog.info("sendShutDown for {} starting", actor.path());
69 FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
70 Future<Boolean> stopFuture = Patterns.gracefulStop(actor, duration, Shutdown.INSTANCE);
72 Boolean stopped = Await.result(stopFuture, duration);
73 assertEquals("Stopped", Boolean.TRUE, stopped);
75 testLog.info("sendShutDown for {} ending", actor.path());
78 private void sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1() throws Exception {
79 testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 starting");
81 clearMessages(leaderNotifierActor);
82 clearMessages(follower1NotifierActor);
83 clearMessages(follower2NotifierActor);
84 clearMessages(follower3NotifierActor);
86 // Simulate a delay for follower2 in receiving the LeaderTransitioning message with null leader id.
87 final TestRaftActor follower2Instance = follower2Actor.underlyingActor();
88 follower2Instance.startDropMessages(LeaderTransitioning.class);
90 FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
91 final Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE);
93 verifyRaftState(follower1Actor, RaftState.Leader);
95 Boolean stopped = Await.result(stopFuture, duration);
96 assertEquals("Stopped", Boolean.TRUE, stopped);
98 // Re-enable LeaderTransitioning messages to follower2.
99 final LeaderTransitioning leaderTransitioning = expectFirstMatching(follower2CollectorActor,
100 LeaderTransitioning.class);
101 follower2Instance.stopDropMessages(LeaderTransitioning.class);
103 follower2Instance.stopDropMessages(AppendEntries.class);
104 ApplyState applyState = expectFirstMatching(follower2CollectorActor, ApplyState.class);
105 assertEquals("Apply sate index", 0, applyState.getReplicatedLogEntry().getIndex());
107 // Now send the LeaderTransitioning to follower2 after it has received AppendEntries from the new leader.
108 follower2Actor.tell(leaderTransitioning, ActorRef.noSender());
110 verifyLeaderStateChangedMessages(leaderNotifierActor, null, follower1Id);
111 verifyLeaderStateChangedMessages(follower1NotifierActor, null, follower1Id);
112 // follower2 should only get 1 LeaderStateChanged with the new leaderId - the LeaderTransitioning message
113 // should not generate a LeaderStateChanged with null leaderId since it arrived after the new leaderId was set.
114 verifyLeaderStateChangedMessages(follower2NotifierActor, follower1Id);
115 verifyLeaderStateChangedMessages(follower3NotifierActor, null, follower1Id);
117 testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 ending");
120 private void sendPayloadWithFollower2Lagging() {
121 testLog.info("sendPayloadWithFollower2Lagging starting");
123 follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
125 sendPayloadData(leaderActor, "zero");
127 expectFirstMatching(leaderCollectorActor, ApplyState.class);
128 expectFirstMatching(follower1CollectorActor, ApplyState.class);
129 expectFirstMatching(follower3CollectorActor, ApplyState.class);
131 testLog.info("sendPayloadWithFollower2Lagging ending");
134 private void createRaftActors() {
135 testLog.info("createRaftActors starting");
137 follower1NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
138 factory.generateActorId(follower1Id + "-notifier"));
139 follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
140 ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id),
141 follower3Id, testActorPath(follower3Id)))
142 .config(newFollowerConfigParams()).roleChangeNotifier(follower1NotifierActor));
144 follower2NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
145 factory.generateActorId(follower2Id + "-notifier"));
146 follower2Actor = newTestRaftActor(follower2Id,TestRaftActor.newBuilder().peerAddresses(
147 ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString(),
148 follower3Id, testActorPath(follower3Id)))
149 .config(newFollowerConfigParams()).roleChangeNotifier(follower2NotifierActor));
151 follower3NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
152 factory.generateActorId(follower3Id + "-notifier"));
153 follower3Actor = newTestRaftActor(follower3Id,TestRaftActor.newBuilder().peerAddresses(
154 ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString(),
155 follower2Id, follower2Actor.path().toString()))
156 .config(newFollowerConfigParams()).roleChangeNotifier(follower3NotifierActor));
158 peerAddresses = ImmutableMap.<String, String>builder()
159 .put(follower1Id, follower1Actor.path().toString())
160 .put(follower2Id, follower2Actor.path().toString())
161 .put(follower3Id, follower3Actor.path().toString()).build();
163 leaderConfigParams = newLeaderConfigParams();
164 leaderConfigParams.setElectionTimeoutFactor(3);
165 leaderNotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
166 factory.generateActorId(leaderId + "-notifier"));
167 leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses)
168 .config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
170 follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
171 follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
172 follower3CollectorActor = follower3Actor.underlyingActor().collectorActor();
173 leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
175 leaderContext = leaderActor.underlyingActor().getRaftActorContext();
176 leaderContext.getPeerInfo(follower3Id).setVotingState(VotingState.NON_VOTING);
178 waitUntilLeader(leaderActor);
180 testLog.info("createRaftActors starting");
183 private static void verifyRaftState(ActorRef raftActor, final RaftState expState) {
184 verifyRaftState(raftActor, rs -> assertEquals("getRaftState", expState.toString(), rs.getRaftState()));
187 private void verifyLeaderStateChangedMessages(TestActorRef<MessageCollectorActor> notifierActor,
188 String... expLeaderIds) {
189 List<LeaderStateChanged> leaderStateChanges = expectMatching(notifierActor, LeaderStateChanged.class,
190 expLeaderIds.length);
192 Collections.reverse(leaderStateChanges);
193 Iterator<LeaderStateChanged> actual = leaderStateChanges.iterator();
194 for (int i = expLeaderIds.length - 1; i >= 0; i--) {
195 assertEquals("getLeaderId", expLeaderIds[i], actual.next().getLeaderId());
200 public void testLeaderTransferAborted() throws Exception {
201 testLog.info("testLeaderTransferAborted starting");
205 leaderActor.underlyingActor().startDropMessages(AppendEntriesReply.class);
207 sendShutDown(leaderActor);
209 verifyRaftState(follower1Actor, RaftState.Follower);
210 verifyRaftState(follower2Actor, RaftState.Follower);
211 verifyRaftState(follower3Actor, RaftState.Follower);
213 testLog.info("testLeaderTransferOnShutDown ending");
217 public void testLeaderTransferSkippedOnShutdownWithNoFollowers() throws Exception {
218 testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers starting");
220 leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().config(newLeaderConfigParams()));
221 waitUntilLeader(leaderActor);
223 sendShutDown(leaderActor);
225 testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers ending");