/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.cluster.raft;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;

import akka.actor.ActorRef;
import akka.actor.Status;
import akka.pattern.Patterns;
import akka.testkit.TestActorRef;
import akka.testkit.javadsl.TestKit;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

44 * Tests leadership transfer end-to-end.
46 * @author Thomas Pantelis
48 public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrationTest {
50 private final String follower3Id = factory.generateActorId("follower");
51 private ActorRef leaderNotifierActor;
52 private ActorRef follower1NotifierActor;
53 private ActorRef follower2NotifierActor;
54 private ActorRef follower3NotifierActor;
55 private TestActorRef<TestRaftActor> follower3Actor;
56 private ActorRef follower3CollectorActor;
57 private ActorRef requestLeadershipResultCollectorActor;
60 public void testLeaderTransferOnShutDown() throws Exception {
61 testLog.info("testLeaderTransferOnShutDown starting");
65 sendPayloadWithFollower2Lagging();
67 sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1();
69 sendShutDown(follower2Actor);
71 testLog.info("testLeaderTransferOnShutDown ending");
74 private void sendShutDown(final ActorRef actor) throws Exception {
75 testLog.info("sendShutDown for {} starting", actor.path());
77 FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
78 Future<Boolean> stopFuture = Patterns.gracefulStop(actor, duration, Shutdown.INSTANCE);
80 Boolean stopped = Await.result(stopFuture, duration);
81 assertEquals("Stopped", Boolean.TRUE, stopped);
83 testLog.info("sendShutDown for {} ending", actor.path());
86 private void sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1() throws Exception {
87 testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 starting");
89 clearMessages(leaderNotifierActor);
90 clearMessages(follower1NotifierActor);
91 clearMessages(follower2NotifierActor);
92 clearMessages(follower3NotifierActor);
94 // Simulate a delay for follower2 in receiving the LeaderTransitioning message with null leader id.
95 final TestRaftActor follower2Instance = follower2Actor.underlyingActor();
96 follower2Instance.startDropMessages(LeaderTransitioning.class);
98 FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
99 final Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE);
101 verifyRaftState(follower1Actor, RaftState.Leader);
103 Boolean stopped = Await.result(stopFuture, duration);
104 assertEquals("Stopped", Boolean.TRUE, stopped);
106 // Re-enable LeaderTransitioning messages to follower2.
107 final LeaderTransitioning leaderTransitioning = expectFirstMatching(follower2CollectorActor,
108 LeaderTransitioning.class);
109 follower2Instance.stopDropMessages(LeaderTransitioning.class);
111 follower2Instance.stopDropMessages(AppendEntries.class);
112 ApplyState applyState = expectFirstMatching(follower2CollectorActor, ApplyState.class);
113 assertEquals("Apply sate index", 0, applyState.getReplicatedLogEntry().getIndex());
115 // Now send the LeaderTransitioning to follower2 after it has received AppendEntries from the new leader.
116 follower2Actor.tell(leaderTransitioning, ActorRef.noSender());
118 verifyLeaderStateChangedMessages(leaderNotifierActor, null, follower1Id);
119 verifyLeaderStateChangedMessages(follower1NotifierActor, null, follower1Id);
120 // follower2 should only get 1 LeaderStateChanged with the new leaderId - the LeaderTransitioning message
121 // should not generate a LeaderStateChanged with null leaderId since it arrived after the new leaderId was set.
122 verifyLeaderStateChangedMessages(follower2NotifierActor, follower1Id);
123 verifyLeaderStateChangedMessages(follower3NotifierActor, null, follower1Id);
125 testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 ending");
128 private void sendPayloadWithFollower2Lagging() {
129 testLog.info("sendPayloadWithFollower2Lagging starting");
131 follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
133 sendPayloadData(leaderActor, "zero");
135 expectFirstMatching(leaderCollectorActor, ApplyState.class);
136 expectFirstMatching(follower1CollectorActor, ApplyState.class);
137 expectFirstMatching(follower3CollectorActor, ApplyState.class);
139 testLog.info("sendPayloadWithFollower2Lagging ending");
142 private void createRaftActors() {
143 testLog.info("createRaftActors starting");
145 final Snapshot snapshot = Snapshot.create(EmptyState.INSTANCE, List.of(), -1, -1, -1, -1,
146 1, null, new org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload(
147 List.of(new ServerInfo(leaderId, true), new ServerInfo(follower1Id, true),
148 new ServerInfo(follower2Id, true), new ServerInfo(follower3Id, false))));
150 InMemorySnapshotStore.addSnapshot(leaderId, snapshot);
151 InMemorySnapshotStore.addSnapshot(follower1Id, snapshot);
152 InMemorySnapshotStore.addSnapshot(follower2Id, snapshot);
153 InMemorySnapshotStore.addSnapshot(follower3Id, snapshot);
155 follower1NotifierActor = factory.createActor(MessageCollectorActor.props(),
156 factory.generateActorId(follower1Id + "-notifier"));
157 follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
158 Map.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id),
159 follower3Id, testActorPath(follower3Id)))
160 .config(newFollowerConfigParams()).roleChangeNotifier(follower1NotifierActor));
162 follower2NotifierActor = factory.createActor(MessageCollectorActor.props(),
163 factory.generateActorId(follower2Id + "-notifier"));
164 follower2Actor = newTestRaftActor(follower2Id,TestRaftActor.newBuilder().peerAddresses(
165 Map.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString(),
166 follower3Id, testActorPath(follower3Id)))
167 .config(newFollowerConfigParams()).roleChangeNotifier(follower2NotifierActor));
169 follower3NotifierActor = factory.createActor(MessageCollectorActor.props(),
170 factory.generateActorId(follower3Id + "-notifier"));
171 follower3Actor = newTestRaftActor(follower3Id,TestRaftActor.newBuilder().peerAddresses(
172 Map.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString(),
173 follower2Id, follower2Actor.path().toString()))
174 .config(newFollowerConfigParams()).roleChangeNotifier(follower3NotifierActor));
176 peerAddresses = Map.of(
177 follower1Id, follower1Actor.path().toString(),
178 follower2Id, follower2Actor.path().toString(),
179 follower3Id, follower3Actor.path().toString());
181 leaderConfigParams = newLeaderConfigParams();
182 leaderConfigParams.setElectionTimeoutFactor(3);
183 leaderNotifierActor = factory.createActor(MessageCollectorActor.props(),
184 factory.generateActorId(leaderId + "-notifier"));
185 leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses)
186 .config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
188 follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
189 follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
190 follower3CollectorActor = follower3Actor.underlyingActor().collectorActor();
191 leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
193 leaderContext = leaderActor.underlyingActor().getRaftActorContext();
195 waitUntilLeader(leaderActor);
197 testLog.info("createRaftActors starting");
200 private static void verifyRaftState(final ActorRef raftActor, final RaftState expState) {
201 verifyRaftState(raftActor, rs -> assertEquals("getRaftState", expState.toString(), rs.getRaftState()));
204 private static void verifyLeaderStateChangedMessages(final ActorRef notifierActor,
205 final String... expLeaderIds) {
206 List<LeaderStateChanged> leaderStateChanges = expectMatching(notifierActor, LeaderStateChanged.class,
207 expLeaderIds.length);
209 Collections.reverse(leaderStateChanges);
210 Iterator<LeaderStateChanged> actual = leaderStateChanges.iterator();
211 for (int i = expLeaderIds.length - 1; i >= 0; i--) {
212 assertEquals("getLeaderId", expLeaderIds[i], actual.next().getLeaderId());
217 public void testLeaderTransferAborted() throws Exception {
218 testLog.info("testLeaderTransferAborted starting");
222 leaderActor.underlyingActor().startDropMessages(AppendEntriesReply.class);
224 sendShutDown(leaderActor);
226 verifyRaftState(follower1Actor, RaftState.Follower);
227 verifyRaftState(follower2Actor, RaftState.Follower);
228 verifyRaftState(follower3Actor, RaftState.Follower);
230 testLog.info("testLeaderTransferOnShutDown ending");
234 public void testLeaderTransferSkippedOnShutdownWithNoFollowers() throws Exception {
235 testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers starting");
237 leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().config(newLeaderConfigParams()));
238 waitUntilLeader(leaderActor);
240 sendShutDown(leaderActor);
242 testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers ending");
245 private void sendFollower2RequestLeadershipTransferToLeader() {
246 testLog.info("sendFollower2RequestLeadershipTransferToLeader starting");
249 new RequestLeadership(follower2Id, requestLeadershipResultCollectorActor), ActorRef.noSender());
251 testLog.info("sendFollower2RequestLeadershipTransferToLeader ending");
254 private void createRequestLeadershipResultCollectorActor() {
255 testLog.info("createRequestLeadershipResultCollectorActor starting");
257 requestLeadershipResultCollectorActor = factory.createActor(MessageCollectorActor.props());
259 testLog.info("createRequestLeadershipResultCollectorActor ending");
263 public void testSuccessfulRequestLeadershipTransferToFollower2() {
264 testLog.info("testSuccessfulRequestLeadershipTransferToFollower2 starting");
267 createRequestLeadershipResultCollectorActor();
269 sendFollower2RequestLeadershipTransferToLeader();
271 verifyRaftState(follower2Actor, RaftState.Leader);
273 expectMatching(requestLeadershipResultCollectorActor, Status.Success.class, 1);
275 testLog.info("testSuccessfulRequestLeadershipTransferToFollower2 ending");
279 public void testRequestLeadershipTransferToFollower2WithFollower2Lagging() {
280 testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging starting");
283 createRequestLeadershipResultCollectorActor();
285 sendPayloadWithFollower2Lagging();
287 sendFollower2RequestLeadershipTransferToLeader();
289 verifyRaftState(follower1Actor, RaftState.Follower);
290 verifyRaftState(follower2Actor, RaftState.Follower);
291 verifyRaftState(follower3Actor, RaftState.Follower);
293 Status.Failure failure = expectFirstMatching(requestLeadershipResultCollectorActor, Status.Failure.class);
294 assertTrue(failure.cause() instanceof LeadershipTransferFailedException);
296 testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging ending");
301 public void testRequestLeadershipTransferToFollower2WithFollower2Shutdown() throws Exception {
302 testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown starting");
305 createRequestLeadershipResultCollectorActor();
307 sendShutDown(follower2Actor);
309 sendFollower2RequestLeadershipTransferToLeader();
311 verifyRaftState(follower1Actor, RaftState.Follower);
312 verifyRaftState(follower3Actor, RaftState.Follower);
314 Status.Failure failure = expectFirstMatching(requestLeadershipResultCollectorActor, Status.Failure.class);
315 assertTrue(failure.cause() instanceof LeadershipTransferFailedException);
317 testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown ending");
321 public void testRequestLeadershipTransferToFollower2WithOtherFollowersDown() {
322 testLog.info("testRequestLeadershipTransferToFollower2WithOtherFollowersDown starting");
325 createRequestLeadershipResultCollectorActor();
327 factory.killActor(follower1Actor, new TestKit(getSystem()));
328 factory.killActor(follower3Actor, new TestKit(getSystem()));
330 sendFollower2RequestLeadershipTransferToLeader();
332 expectFirstMatching(requestLeadershipResultCollectorActor, Status.Success.class);
334 verifyRaftState(follower2Actor, RaftState.Leader);
335 verifyRaftState(leaderActor, RaftState.Follower);
337 testLog.info("testRequestLeadershipTransferToFollower2WithOtherFollowersDown ending");