/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
16 import akka.actor.ActorRef;
17 import akka.actor.Status;
18 import akka.pattern.Patterns;
19 import akka.testkit.TestActorRef;
20 import akka.testkit.javadsl.TestKit;
21 import com.google.common.collect.ImmutableMap;
22 import java.util.Arrays;
23 import java.util.Collections;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.concurrent.TimeUnit;
27 import org.junit.Test;
28 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
29 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
30 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
31 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
32 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
33 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
34 import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
35 import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
36 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
37 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
38 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
39 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
40 import scala.concurrent.Await;
41 import scala.concurrent.Future;
42 import scala.concurrent.duration.FiniteDuration;
/**
 * Tests leadership transfer end-to-end.
 *
 * @author Thomas Pantelis
 */
49 public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrationTest {
51 private final String follower3Id = factory.generateActorId("follower");
52 private ActorRef leaderNotifierActor;
53 private ActorRef follower1NotifierActor;
54 private ActorRef follower2NotifierActor;
55 private ActorRef follower3NotifierActor;
56 private TestActorRef<TestRaftActor> follower3Actor;
57 private ActorRef follower3CollectorActor;
58 private ActorRef requestLeadershipResultCollectorActor;
61 public void testLeaderTransferOnShutDown() throws Exception {
62 testLog.info("testLeaderTransferOnShutDown starting");
66 sendPayloadWithFollower2Lagging();
68 sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1();
70 sendShutDown(follower2Actor);
72 testLog.info("testLeaderTransferOnShutDown ending");
75 private void sendShutDown(final ActorRef actor) throws Exception {
76 testLog.info("sendShutDown for {} starting", actor.path());
78 FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
79 Future<Boolean> stopFuture = Patterns.gracefulStop(actor, duration, Shutdown.INSTANCE);
81 Boolean stopped = Await.result(stopFuture, duration);
82 assertEquals("Stopped", Boolean.TRUE, stopped);
84 testLog.info("sendShutDown for {} ending", actor.path());
87 private void sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1() throws Exception {
88 testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 starting");
90 clearMessages(leaderNotifierActor);
91 clearMessages(follower1NotifierActor);
92 clearMessages(follower2NotifierActor);
93 clearMessages(follower3NotifierActor);
95 // Simulate a delay for follower2 in receiving the LeaderTransitioning message with null leader id.
96 final TestRaftActor follower2Instance = follower2Actor.underlyingActor();
97 follower2Instance.startDropMessages(LeaderTransitioning.class);
99 FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
100 final Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE);
102 verifyRaftState(follower1Actor, RaftState.Leader);
104 Boolean stopped = Await.result(stopFuture, duration);
105 assertEquals("Stopped", Boolean.TRUE, stopped);
107 // Re-enable LeaderTransitioning messages to follower2.
108 final LeaderTransitioning leaderTransitioning = expectFirstMatching(follower2CollectorActor,
109 LeaderTransitioning.class);
110 follower2Instance.stopDropMessages(LeaderTransitioning.class);
112 follower2Instance.stopDropMessages(AppendEntries.class);
113 ApplyState applyState = expectFirstMatching(follower2CollectorActor, ApplyState.class);
114 assertEquals("Apply sate index", 0, applyState.getReplicatedLogEntry().getIndex());
116 // Now send the LeaderTransitioning to follower2 after it has received AppendEntries from the new leader.
117 follower2Actor.tell(leaderTransitioning, ActorRef.noSender());
119 verifyLeaderStateChangedMessages(leaderNotifierActor, null, follower1Id);
120 verifyLeaderStateChangedMessages(follower1NotifierActor, null, follower1Id);
121 // follower2 should only get 1 LeaderStateChanged with the new leaderId - the LeaderTransitioning message
122 // should not generate a LeaderStateChanged with null leaderId since it arrived after the new leaderId was set.
123 verifyLeaderStateChangedMessages(follower2NotifierActor, follower1Id);
124 verifyLeaderStateChangedMessages(follower3NotifierActor, null, follower1Id);
126 testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 ending");
129 private void sendPayloadWithFollower2Lagging() {
130 testLog.info("sendPayloadWithFollower2Lagging starting");
132 follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
134 sendPayloadData(leaderActor, "zero");
136 expectFirstMatching(leaderCollectorActor, ApplyState.class);
137 expectFirstMatching(follower1CollectorActor, ApplyState.class);
138 expectFirstMatching(follower3CollectorActor, ApplyState.class);
140 testLog.info("sendPayloadWithFollower2Lagging ending");
143 private void createRaftActors() {
144 testLog.info("createRaftActors starting");
146 final Snapshot snapshot = Snapshot.create(EmptyState.INSTANCE, Collections.emptyList(), -1, -1, -1, -1,
147 1, null, new org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload(
148 Arrays.asList(new ServerInfo(leaderId, true), new ServerInfo(follower1Id, true),
149 new ServerInfo(follower2Id, true), new ServerInfo(follower3Id, false))));
151 InMemorySnapshotStore.addSnapshot(leaderId, snapshot);
152 InMemorySnapshotStore.addSnapshot(follower1Id, snapshot);
153 InMemorySnapshotStore.addSnapshot(follower2Id, snapshot);
154 InMemorySnapshotStore.addSnapshot(follower3Id, snapshot);
156 follower1NotifierActor = factory.createActor(MessageCollectorActor.props(),
157 factory.generateActorId(follower1Id + "-notifier"));
158 follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
159 ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id),
160 follower3Id, testActorPath(follower3Id)))
161 .config(newFollowerConfigParams()).roleChangeNotifier(follower1NotifierActor));
163 follower2NotifierActor = factory.createActor(MessageCollectorActor.props(),
164 factory.generateActorId(follower2Id + "-notifier"));
165 follower2Actor = newTestRaftActor(follower2Id,TestRaftActor.newBuilder().peerAddresses(
166 ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString(),
167 follower3Id, testActorPath(follower3Id)))
168 .config(newFollowerConfigParams()).roleChangeNotifier(follower2NotifierActor));
170 follower3NotifierActor = factory.createActor(MessageCollectorActor.props(),
171 factory.generateActorId(follower3Id + "-notifier"));
172 follower3Actor = newTestRaftActor(follower3Id,TestRaftActor.newBuilder().peerAddresses(
173 ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString(),
174 follower2Id, follower2Actor.path().toString()))
175 .config(newFollowerConfigParams()).roleChangeNotifier(follower3NotifierActor));
177 peerAddresses = ImmutableMap.<String, String>builder()
178 .put(follower1Id, follower1Actor.path().toString())
179 .put(follower2Id, follower2Actor.path().toString())
180 .put(follower3Id, follower3Actor.path().toString()).build();
182 leaderConfigParams = newLeaderConfigParams();
183 leaderConfigParams.setElectionTimeoutFactor(3);
184 leaderNotifierActor = factory.createActor(MessageCollectorActor.props(),
185 factory.generateActorId(leaderId + "-notifier"));
186 leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses)
187 .config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
189 follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
190 follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
191 follower3CollectorActor = follower3Actor.underlyingActor().collectorActor();
192 leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
194 leaderContext = leaderActor.underlyingActor().getRaftActorContext();
196 waitUntilLeader(leaderActor);
198 testLog.info("createRaftActors starting");
201 private static void verifyRaftState(final ActorRef raftActor, final RaftState expState) {
202 verifyRaftState(raftActor, rs -> assertEquals("getRaftState", expState.toString(), rs.getRaftState()));
205 private static void verifyLeaderStateChangedMessages(final ActorRef notifierActor,
206 final String... expLeaderIds) {
207 List<LeaderStateChanged> leaderStateChanges = expectMatching(notifierActor, LeaderStateChanged.class,
208 expLeaderIds.length);
210 Collections.reverse(leaderStateChanges);
211 Iterator<LeaderStateChanged> actual = leaderStateChanges.iterator();
212 for (int i = expLeaderIds.length - 1; i >= 0; i--) {
213 assertEquals("getLeaderId", expLeaderIds[i], actual.next().getLeaderId());
218 public void testLeaderTransferAborted() throws Exception {
219 testLog.info("testLeaderTransferAborted starting");
223 leaderActor.underlyingActor().startDropMessages(AppendEntriesReply.class);
225 sendShutDown(leaderActor);
227 verifyRaftState(follower1Actor, RaftState.Follower);
228 verifyRaftState(follower2Actor, RaftState.Follower);
229 verifyRaftState(follower3Actor, RaftState.Follower);
231 testLog.info("testLeaderTransferOnShutDown ending");
235 public void testLeaderTransferSkippedOnShutdownWithNoFollowers() throws Exception {
236 testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers starting");
238 leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().config(newLeaderConfigParams()));
239 waitUntilLeader(leaderActor);
241 sendShutDown(leaderActor);
243 testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers ending");
246 private void sendFollower2RequestLeadershipTransferToLeader() {
247 testLog.info("sendFollower2RequestLeadershipTransferToLeader starting");
250 new RequestLeadership(follower2Id, requestLeadershipResultCollectorActor), ActorRef.noSender());
252 testLog.info("sendFollower2RequestLeadershipTransferToLeader ending");
255 private void createRequestLeadershipResultCollectorActor() {
256 testLog.info("createRequestLeadershipResultCollectorActor starting");
258 requestLeadershipResultCollectorActor = factory.createActor(MessageCollectorActor.props());
260 testLog.info("createRequestLeadershipResultCollectorActor ending");
264 public void testSuccessfulRequestLeadershipTransferToFollower2() {
265 testLog.info("testSuccessfulRequestLeadershipTransferToFollower2 starting");
268 createRequestLeadershipResultCollectorActor();
270 sendFollower2RequestLeadershipTransferToLeader();
272 verifyRaftState(follower2Actor, RaftState.Leader);
274 expectMatching(requestLeadershipResultCollectorActor, Status.Success.class, 1);
276 testLog.info("testSuccessfulRequestLeadershipTransferToFollower2 ending");
280 public void testRequestLeadershipTransferToFollower2WithFollower2Lagging() {
281 testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging starting");
284 createRequestLeadershipResultCollectorActor();
286 sendPayloadWithFollower2Lagging();
288 sendFollower2RequestLeadershipTransferToLeader();
290 verifyRaftState(follower1Actor, RaftState.Follower);
291 verifyRaftState(follower2Actor, RaftState.Follower);
292 verifyRaftState(follower3Actor, RaftState.Follower);
294 Status.Failure failure = expectFirstMatching(requestLeadershipResultCollectorActor, Status.Failure.class);
295 assertTrue(failure.cause() instanceof LeadershipTransferFailedException);
297 testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging ending");
302 public void testRequestLeadershipTransferToFollower2WithFollower2Shutdown() throws Exception {
303 testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown starting");
306 createRequestLeadershipResultCollectorActor();
308 sendShutDown(follower2Actor);
310 sendFollower2RequestLeadershipTransferToLeader();
312 verifyRaftState(follower1Actor, RaftState.Follower);
313 verifyRaftState(follower3Actor, RaftState.Follower);
315 Status.Failure failure = expectFirstMatching(requestLeadershipResultCollectorActor, Status.Failure.class);
316 assertTrue(failure.cause() instanceof LeadershipTransferFailedException);
318 testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown ending");
322 public void testRequestLeadershipTransferToFollower2WithOtherFollowersDown() {
323 testLog.info("testRequestLeadershipTransferToFollower2WithOtherFollowersDown starting");
326 createRequestLeadershipResultCollectorActor();
328 factory.killActor(follower1Actor, new TestKit(getSystem()));
329 factory.killActor(follower3Actor, new TestKit(getSystem()));
331 sendFollower2RequestLeadershipTransferToLeader();
333 expectFirstMatching(requestLeadershipResultCollectorActor, Status.Success.class);
335 verifyRaftState(follower2Actor, RaftState.Leader);
336 verifyRaftState(leaderActor, RaftState.Follower);
338 testLog.info("testRequestLeadershipTransferToFollower2WithOtherFollowersDown ending");