2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
16 import akka.actor.ActorRef;
17 import akka.actor.Props;
18 import akka.actor.Status;
19 import akka.pattern.Patterns;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import com.google.common.collect.ImmutableMap;
23 import java.util.Arrays;
24 import java.util.Collections;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.concurrent.TimeUnit;
28 import org.junit.Test;
29 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
31 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
32 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
33 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
34 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
35 import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
36 import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
37 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
38 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
39 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
40 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
41 import scala.concurrent.Await;
42 import scala.concurrent.Future;
43 import scala.concurrent.duration.FiniteDuration;
46 * Tests leadership transfer end-to-end.
48 * @author Thomas Pantelis
50 public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrationTest {
52 private final String follower3Id = factory.generateActorId("follower");
53 private TestActorRef<MessageCollectorActor> leaderNotifierActor;
54 private TestActorRef<MessageCollectorActor> follower1NotifierActor;
55 private TestActorRef<MessageCollectorActor> follower2NotifierActor;
56 private TestActorRef<MessageCollectorActor> follower3NotifierActor;
57 private TestActorRef<TestRaftActor> follower3Actor;
58 private ActorRef follower3CollectorActor;
59 private ActorRef requestLeadershipResultCollectorActor;
62 public void testLeaderTransferOnShutDown() throws Exception {
63 testLog.info("testLeaderTransferOnShutDown starting");
67 sendPayloadWithFollower2Lagging();
69 sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1();
71 sendShutDown(follower2Actor);
73 testLog.info("testLeaderTransferOnShutDown ending");
76 private void sendShutDown(final ActorRef actor) throws Exception {
77 testLog.info("sendShutDown for {} starting", actor.path());
79 FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
80 Future<Boolean> stopFuture = Patterns.gracefulStop(actor, duration, Shutdown.INSTANCE);
82 Boolean stopped = Await.result(stopFuture, duration);
83 assertEquals("Stopped", Boolean.TRUE, stopped);
85 testLog.info("sendShutDown for {} ending", actor.path());
88 private void sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1() throws Exception {
89 testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 starting");
91 clearMessages(leaderNotifierActor);
92 clearMessages(follower1NotifierActor);
93 clearMessages(follower2NotifierActor);
94 clearMessages(follower3NotifierActor);
96 // Simulate a delay for follower2 in receiving the LeaderTransitioning message with null leader id.
97 final TestRaftActor follower2Instance = follower2Actor.underlyingActor();
98 follower2Instance.startDropMessages(LeaderTransitioning.class);
100 FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
101 final Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE);
103 verifyRaftState(follower1Actor, RaftState.Leader);
105 Boolean stopped = Await.result(stopFuture, duration);
106 assertEquals("Stopped", Boolean.TRUE, stopped);
108 // Re-enable LeaderTransitioning messages to follower2.
109 final LeaderTransitioning leaderTransitioning = expectFirstMatching(follower2CollectorActor,
110 LeaderTransitioning.class);
111 follower2Instance.stopDropMessages(LeaderTransitioning.class);
113 follower2Instance.stopDropMessages(AppendEntries.class);
114 ApplyState applyState = expectFirstMatching(follower2CollectorActor, ApplyState.class);
115 assertEquals("Apply sate index", 0, applyState.getReplicatedLogEntry().getIndex());
117 // Now send the LeaderTransitioning to follower2 after it has received AppendEntries from the new leader.
118 follower2Actor.tell(leaderTransitioning, ActorRef.noSender());
120 verifyLeaderStateChangedMessages(leaderNotifierActor, null, follower1Id);
121 verifyLeaderStateChangedMessages(follower1NotifierActor, null, follower1Id);
122 // follower2 should only get 1 LeaderStateChanged with the new leaderId - the LeaderTransitioning message
123 // should not generate a LeaderStateChanged with null leaderId since it arrived after the new leaderId was set.
124 verifyLeaderStateChangedMessages(follower2NotifierActor, follower1Id);
125 verifyLeaderStateChangedMessages(follower3NotifierActor, null, follower1Id);
127 testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 ending");
130 private void sendPayloadWithFollower2Lagging() {
131 testLog.info("sendPayloadWithFollower2Lagging starting");
133 follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
135 sendPayloadData(leaderActor, "zero");
137 expectFirstMatching(leaderCollectorActor, ApplyState.class);
138 expectFirstMatching(follower1CollectorActor, ApplyState.class);
139 expectFirstMatching(follower3CollectorActor, ApplyState.class);
141 testLog.info("sendPayloadWithFollower2Lagging ending");
144 private void createRaftActors() {
145 testLog.info("createRaftActors starting");
147 final Snapshot snapshot = Snapshot.create(EmptyState.INSTANCE, Collections.emptyList(), -1, -1, -1, -1,
148 1, null, new org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload(
149 Arrays.asList(new ServerInfo(leaderId, true), new ServerInfo(follower1Id, true),
150 new ServerInfo(follower2Id, true), new ServerInfo(follower3Id, false))));
152 InMemorySnapshotStore.addSnapshot(leaderId, snapshot);
153 InMemorySnapshotStore.addSnapshot(follower1Id, snapshot);
154 InMemorySnapshotStore.addSnapshot(follower2Id, snapshot);
155 InMemorySnapshotStore.addSnapshot(follower3Id, snapshot);
157 follower1NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
158 factory.generateActorId(follower1Id + "-notifier"));
159 follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
160 ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id),
161 follower3Id, testActorPath(follower3Id)))
162 .config(newFollowerConfigParams()).roleChangeNotifier(follower1NotifierActor));
164 follower2NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
165 factory.generateActorId(follower2Id + "-notifier"));
166 follower2Actor = newTestRaftActor(follower2Id,TestRaftActor.newBuilder().peerAddresses(
167 ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString(),
168 follower3Id, testActorPath(follower3Id)))
169 .config(newFollowerConfigParams()).roleChangeNotifier(follower2NotifierActor));
171 follower3NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
172 factory.generateActorId(follower3Id + "-notifier"));
173 follower3Actor = newTestRaftActor(follower3Id,TestRaftActor.newBuilder().peerAddresses(
174 ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString(),
175 follower2Id, follower2Actor.path().toString()))
176 .config(newFollowerConfigParams()).roleChangeNotifier(follower3NotifierActor));
178 peerAddresses = ImmutableMap.<String, String>builder()
179 .put(follower1Id, follower1Actor.path().toString())
180 .put(follower2Id, follower2Actor.path().toString())
181 .put(follower3Id, follower3Actor.path().toString()).build();
183 leaderConfigParams = newLeaderConfigParams();
184 leaderConfigParams.setElectionTimeoutFactor(3);
185 leaderNotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
186 factory.generateActorId(leaderId + "-notifier"));
187 leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses)
188 .config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
190 follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
191 follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
192 follower3CollectorActor = follower3Actor.underlyingActor().collectorActor();
193 leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
195 leaderContext = leaderActor.underlyingActor().getRaftActorContext();
197 waitUntilLeader(leaderActor);
199 testLog.info("createRaftActors starting");
202 private static void verifyRaftState(final ActorRef raftActor, final RaftState expState) {
203 verifyRaftState(raftActor, rs -> assertEquals("getRaftState", expState.toString(), rs.getRaftState()));
206 private static void verifyLeaderStateChangedMessages(final TestActorRef<MessageCollectorActor> notifierActor,
207 final String... expLeaderIds) {
208 List<LeaderStateChanged> leaderStateChanges = expectMatching(notifierActor, LeaderStateChanged.class,
209 expLeaderIds.length);
211 Collections.reverse(leaderStateChanges);
212 Iterator<LeaderStateChanged> actual = leaderStateChanges.iterator();
213 for (int i = expLeaderIds.length - 1; i >= 0; i--) {
214 assertEquals("getLeaderId", expLeaderIds[i], actual.next().getLeaderId());
219 public void testLeaderTransferAborted() throws Exception {
220 testLog.info("testLeaderTransferAborted starting");
224 leaderActor.underlyingActor().startDropMessages(AppendEntriesReply.class);
226 sendShutDown(leaderActor);
228 verifyRaftState(follower1Actor, RaftState.Follower);
229 verifyRaftState(follower2Actor, RaftState.Follower);
230 verifyRaftState(follower3Actor, RaftState.Follower);
232 testLog.info("testLeaderTransferOnShutDown ending");
236 public void testLeaderTransferSkippedOnShutdownWithNoFollowers() throws Exception {
237 testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers starting");
239 leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().config(newLeaderConfigParams()));
240 waitUntilLeader(leaderActor);
242 sendShutDown(leaderActor);
244 testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers ending");
247 private void sendFollower2RequestLeadershipTransferToLeader() {
248 testLog.info("sendFollower2RequestLeadershipTransferToLeader starting");
251 new RequestLeadership(follower2Id, requestLeadershipResultCollectorActor), ActorRef.noSender());
253 testLog.info("sendFollower2RequestLeadershipTransferToLeader ending");
256 private void createRequestLeadershipResultCollectorActor() {
257 testLog.info("createRequestLeadershipResultCollectorActor starting");
259 requestLeadershipResultCollectorActor = factory.createActor(MessageCollectorActor.props());
261 testLog.info("createRequestLeadershipResultCollectorActor ending");
265 public void testSuccessfulRequestLeadershipTransferToFollower2() {
266 testLog.info("testSuccessfulRequestLeadershipTransferToFollower2 starting");
269 createRequestLeadershipResultCollectorActor();
271 sendFollower2RequestLeadershipTransferToLeader();
273 verifyRaftState(follower2Actor, RaftState.Leader);
275 expectMatching(requestLeadershipResultCollectorActor, Status.Success.class, 1);
277 testLog.info("testSuccessfulRequestLeadershipTransferToFollower2 ending");
281 public void testRequestLeadershipTransferToFollower2WithFollower2Lagging() {
282 testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging starting");
285 createRequestLeadershipResultCollectorActor();
287 sendPayloadWithFollower2Lagging();
289 sendFollower2RequestLeadershipTransferToLeader();
291 verifyRaftState(follower1Actor, RaftState.Follower);
292 verifyRaftState(follower2Actor, RaftState.Follower);
293 verifyRaftState(follower3Actor, RaftState.Follower);
295 Status.Failure failure = expectFirstMatching(requestLeadershipResultCollectorActor, Status.Failure.class);
296 assertTrue(failure.cause() instanceof LeadershipTransferFailedException);
298 testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging ending");
303 public void testRequestLeadershipTransferToFollower2WithFollower2Shutdown() throws Exception {
304 testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown starting");
307 createRequestLeadershipResultCollectorActor();
309 sendShutDown(follower2Actor);
311 sendFollower2RequestLeadershipTransferToLeader();
313 verifyRaftState(follower1Actor, RaftState.Follower);
314 verifyRaftState(follower3Actor, RaftState.Follower);
316 Status.Failure failure = expectFirstMatching(requestLeadershipResultCollectorActor, Status.Failure.class);
317 assertTrue(failure.cause() instanceof LeadershipTransferFailedException);
319 testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown ending");
323 public void testRequestLeadershipTransferToFollower2WithOtherFollowersDown() throws Exception {
324 testLog.info("testRequestLeadershipTransferToFollower2WithOtherFollowersDown starting");
327 createRequestLeadershipResultCollectorActor();
329 factory.killActor(follower1Actor, new JavaTestKit(getSystem()));
330 factory.killActor(follower3Actor, new JavaTestKit(getSystem()));
332 sendFollower2RequestLeadershipTransferToLeader();
334 expectFirstMatching(requestLeadershipResultCollectorActor, Status.Success.class);
336 verifyRaftState(follower2Actor, RaftState.Leader);
337 verifyRaftState(leaderActor, RaftState.Follower);
339 testLog.info("testRequestLeadershipTransferToFollower2WithOtherFollowersDown ending");