/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.raft;
10 import static akka.pattern.Patterns.ask;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNull;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
15 import akka.actor.ActorRef;
16 import akka.actor.Props;
17 import akka.pattern.Patterns;
18 import akka.testkit.TestActorRef;
19 import akka.util.Timeout;
20 import com.google.common.base.Stopwatch;
21 import com.google.common.collect.ImmutableMap;
22 import com.google.common.util.concurrent.Uninterruptibles;
23 import java.util.concurrent.TimeUnit;
24 import org.junit.Test;
25 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
26 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
27 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
28 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
29 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
30 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
31 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
32 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
33 import scala.concurrent.Await;
34 import scala.concurrent.Future;
35 import scala.concurrent.duration.FiniteDuration;
38 * Tests leadership transfer end-to-end.
40 * @author Thomas Pantelis
42 public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrationTest {
44 private final String follower3Id = factory.generateActorId("follower");
45 private TestActorRef<MessageCollectorActor> leaderNotifierActor;
46 private TestActorRef<MessageCollectorActor> follower1NotifierActor;
47 private TestActorRef<MessageCollectorActor> follower2NotifierActor;
48 private TestActorRef<MessageCollectorActor> follower3NotifierActor;
49 private TestActorRef<TestRaftActor> follower3Actor;
50 private ActorRef follower3CollectorActor;
53 public void testLeaderTransferOnShutDown() throws Throwable {
54 testLog.info("testLeaderTransferOnShutDown starting");
58 sendPayloadWithFollower2Lagging();
60 sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1();
62 sendShutDown(follower2Actor);
64 testLog.info("testLeaderTransferOnShutDown ending");
67 private void sendShutDown(ActorRef actor) throws Exception {
68 testLog.info("sendShutDown for {} starting", actor.path());
70 FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
71 Future<Boolean> stopFuture = Patterns.gracefulStop(actor, duration, Shutdown.INSTANCE);
73 Boolean stopped = Await.result(stopFuture, duration);
74 assertEquals("Stopped", Boolean.TRUE, stopped);
76 testLog.info("sendShutDown for {} ending", actor.path());
79 private void sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1() throws Throwable {
80 testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 starting");
82 clearMessages(leaderNotifierActor);
83 clearMessages(follower1NotifierActor);
84 clearMessages(follower2NotifierActor);
85 clearMessages(follower3NotifierActor);
87 FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
88 Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE);
90 assertNullLeaderIdChange(leaderNotifierActor);
91 assertNullLeaderIdChange(follower1NotifierActor);
92 assertNullLeaderIdChange(follower2NotifierActor);
93 assertNullLeaderIdChange(follower3NotifierActor);
95 verifyRaftState(follower1Actor, RaftState.Leader);
97 Boolean stopped = Await.result(stopFuture, duration);
98 assertEquals("Stopped", Boolean.TRUE, stopped);
100 follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
101 ApplyState applyState = expectFirstMatching(follower2CollectorActor, ApplyState.class);
102 assertEquals("Apply sate index", 0, applyState.getReplicatedLogEntry().getIndex());
104 testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 ending");
107 private void sendPayloadWithFollower2Lagging() {
108 testLog.info("sendPayloadWithFollower2Lagging starting");
110 follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
112 sendPayloadData(leaderActor, "zero");
114 expectFirstMatching(leaderCollectorActor, ApplyState.class);
115 expectFirstMatching(follower1CollectorActor, ApplyState.class);
116 expectFirstMatching(follower3CollectorActor, ApplyState.class);
118 testLog.info("sendPayloadWithFollower2Lagging ending");
121 private void createRaftActors() {
122 testLog.info("createRaftActors starting");
124 follower1NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
125 factory.generateActorId(follower1Id + "-notifier"));
126 follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
127 ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id),
128 follower3Id, testActorPath(follower3Id))).
129 config(newFollowerConfigParams()).roleChangeNotifier(follower1NotifierActor));
131 follower2NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
132 factory.generateActorId(follower2Id + "-notifier"));
133 follower2Actor = newTestRaftActor(follower2Id,TestRaftActor.newBuilder().peerAddresses(
134 ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString(),
135 follower3Id, testActorPath(follower3Id))).
136 config(newFollowerConfigParams()).roleChangeNotifier(follower2NotifierActor));
138 follower3NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
139 factory.generateActorId(follower3Id + "-notifier"));
140 follower3Actor = newTestRaftActor(follower3Id,TestRaftActor.newBuilder().peerAddresses(
141 ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString(),
142 follower2Id, follower2Actor.path().toString())).
143 config(newFollowerConfigParams()).roleChangeNotifier(follower3NotifierActor));
145 peerAddresses = ImmutableMap.<String, String>builder().
146 put(follower1Id, follower1Actor.path().toString()).
147 put(follower2Id, follower2Actor.path().toString()).
148 put(follower3Id, follower3Actor.path().toString()).build();
150 leaderConfigParams = newLeaderConfigParams();
151 leaderConfigParams.setElectionTimeoutFactor(3);
152 leaderNotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
153 factory.generateActorId(leaderId + "-notifier"));
154 leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses).
155 config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
157 follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
158 follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
159 follower3CollectorActor = follower3Actor.underlyingActor().collectorActor();
160 leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
162 leaderContext = leaderActor.underlyingActor().getRaftActorContext();
163 leaderContext.getPeerInfo(follower3Id).setVotingState(VotingState.NON_VOTING);
165 waitUntilLeader(leaderActor);
167 testLog.info("createRaftActors starting");
170 private static void verifyRaftState(ActorRef raftActor, final RaftState expState) throws Throwable {
171 Timeout timeout = new Timeout(500, TimeUnit.MILLISECONDS);
172 Throwable lastError = null;
173 Stopwatch sw = Stopwatch.createStarted();
174 while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
176 OnDemandRaftState raftState = (OnDemandRaftState)Await.result(ask(raftActor,
177 GetOnDemandRaftState.INSTANCE, timeout), timeout.duration());
178 assertEquals("getRaftState", expState.toString(), raftState.getRaftState());
180 } catch (Exception | AssertionError e) {
182 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
189 private static void assertNullLeaderIdChange(TestActorRef<MessageCollectorActor> notifierActor) {
190 LeaderStateChanged change = expectFirstMatching(notifierActor, LeaderStateChanged.class);
191 assertNull("Expected null leader Id", change.getLeaderId());
195 public void testLeaderTransferAborted() throws Throwable {
196 testLog.info("testLeaderTransferAborted starting");
200 leaderActor.underlyingActor().startDropMessages(AppendEntriesReply.class);
202 sendShutDown(leaderActor);
204 verifyRaftState(follower1Actor, RaftState.Follower);
205 verifyRaftState(follower2Actor, RaftState.Follower);
206 verifyRaftState(follower3Actor, RaftState.Follower);
208 testLog.info("testLeaderTransferOnShutDown ending");
212 public void testLeaderTransferSkippedOnShutdownWithNoFollowers() throws Throwable {
213 testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers starting");
215 leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().config(newLeaderConfigParams()));
216 waitUntilLeader(leaderActor);
218 sendShutDown(leaderActor);
220 testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers ending");