7ed45956c17f77f16cc6f60493de4097680d31c1
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / LeadershipTransferIntegrationTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
15
16 import akka.actor.ActorRef;
17 import akka.actor.Props;
18 import akka.actor.Status;
19 import akka.pattern.Patterns;
20 import akka.testkit.TestActorRef;
21 import com.google.common.collect.ImmutableMap;
22 import java.util.Collections;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.concurrent.TimeUnit;
26 import org.junit.Test;
27 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
28 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
29 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
30 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
31 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
32 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
33 import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
34 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
35 import scala.concurrent.Await;
36 import scala.concurrent.Future;
37 import scala.concurrent.duration.FiniteDuration;
38
39 /**
40  * Tests leadership transfer end-to-end.
41  *
42  * @author Thomas Pantelis
43  */
44 public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrationTest {
45
46     private final String follower3Id = factory.generateActorId("follower");
47     private TestActorRef<MessageCollectorActor> leaderNotifierActor;
48     private TestActorRef<MessageCollectorActor> follower1NotifierActor;
49     private TestActorRef<MessageCollectorActor> follower2NotifierActor;
50     private TestActorRef<MessageCollectorActor> follower3NotifierActor;
51     private TestActorRef<TestRaftActor> follower3Actor;
52     private ActorRef follower3CollectorActor;
53     private ActorRef requestLeadershipResultCollectorActor;
54
55     @Test
56     public void testLeaderTransferOnShutDown() throws Exception {
57         testLog.info("testLeaderTransferOnShutDown starting");
58
59         createRaftActors();
60
61         sendPayloadWithFollower2Lagging();
62
63         sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1();
64
65         sendShutDown(follower2Actor);
66
67         testLog.info("testLeaderTransferOnShutDown ending");
68     }
69
70     private void sendShutDown(ActorRef actor) throws Exception {
71         testLog.info("sendShutDown for {} starting", actor.path());
72
73         FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
74         Future<Boolean> stopFuture = Patterns.gracefulStop(actor, duration, Shutdown.INSTANCE);
75
76         Boolean stopped = Await.result(stopFuture, duration);
77         assertEquals("Stopped", Boolean.TRUE, stopped);
78
79         testLog.info("sendShutDown for {} ending", actor.path());
80     }
81
82     private void sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1() throws Exception {
83         testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 starting");
84
85         clearMessages(leaderNotifierActor);
86         clearMessages(follower1NotifierActor);
87         clearMessages(follower2NotifierActor);
88         clearMessages(follower3NotifierActor);
89
90         // Simulate a delay for follower2 in receiving the LeaderTransitioning message with null leader id.
91         final TestRaftActor follower2Instance = follower2Actor.underlyingActor();
92         follower2Instance.startDropMessages(LeaderTransitioning.class);
93
94         FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
95         final Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE);
96
97         verifyRaftState(follower1Actor, RaftState.Leader);
98
99         Boolean stopped = Await.result(stopFuture, duration);
100         assertEquals("Stopped", Boolean.TRUE, stopped);
101
102         // Re-enable LeaderTransitioning messages to follower2.
103         final LeaderTransitioning leaderTransitioning = expectFirstMatching(follower2CollectorActor,
104                 LeaderTransitioning.class);
105         follower2Instance.stopDropMessages(LeaderTransitioning.class);
106
107         follower2Instance.stopDropMessages(AppendEntries.class);
108         ApplyState applyState = expectFirstMatching(follower2CollectorActor, ApplyState.class);
109         assertEquals("Apply sate index", 0, applyState.getReplicatedLogEntry().getIndex());
110
111         // Now send the LeaderTransitioning to follower2 after it has received AppendEntries from the new leader.
112         follower2Actor.tell(leaderTransitioning, ActorRef.noSender());
113
114         verifyLeaderStateChangedMessages(leaderNotifierActor, null, follower1Id);
115         verifyLeaderStateChangedMessages(follower1NotifierActor, null, follower1Id);
116         // follower2 should only get 1 LeaderStateChanged with the new leaderId - the LeaderTransitioning message
117         // should not generate a LeaderStateChanged with null leaderId since it arrived after the new leaderId was set.
118         verifyLeaderStateChangedMessages(follower2NotifierActor, follower1Id);
119         verifyLeaderStateChangedMessages(follower3NotifierActor, null, follower1Id);
120
121         testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 ending");
122     }
123
124     private void sendPayloadWithFollower2Lagging() {
125         testLog.info("sendPayloadWithFollower2Lagging starting");
126
127         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
128
129         sendPayloadData(leaderActor, "zero");
130
131         expectFirstMatching(leaderCollectorActor, ApplyState.class);
132         expectFirstMatching(follower1CollectorActor, ApplyState.class);
133         expectFirstMatching(follower3CollectorActor, ApplyState.class);
134
135         testLog.info("sendPayloadWithFollower2Lagging ending");
136     }
137
138     private void createRaftActors() {
139         testLog.info("createRaftActors starting");
140
141         follower1NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
142                 factory.generateActorId(follower1Id + "-notifier"));
143         follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
144                 ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id),
145                         follower3Id, testActorPath(follower3Id)))
146                 .config(newFollowerConfigParams()).roleChangeNotifier(follower1NotifierActor));
147
148         follower2NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
149                 factory.generateActorId(follower2Id + "-notifier"));
150         follower2Actor = newTestRaftActor(follower2Id,TestRaftActor.newBuilder().peerAddresses(
151                 ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString(),
152                         follower3Id, testActorPath(follower3Id)))
153                 .config(newFollowerConfigParams()).roleChangeNotifier(follower2NotifierActor));
154
155         follower3NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
156                 factory.generateActorId(follower3Id + "-notifier"));
157         follower3Actor = newTestRaftActor(follower3Id,TestRaftActor.newBuilder().peerAddresses(
158                 ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString(),
159                         follower2Id, follower2Actor.path().toString()))
160                 .config(newFollowerConfigParams()).roleChangeNotifier(follower3NotifierActor));
161
162         peerAddresses = ImmutableMap.<String, String>builder()
163                 .put(follower1Id, follower1Actor.path().toString())
164                 .put(follower2Id, follower2Actor.path().toString())
165                 .put(follower3Id, follower3Actor.path().toString()).build();
166
167         leaderConfigParams = newLeaderConfigParams();
168         leaderConfigParams.setElectionTimeoutFactor(3);
169         leaderNotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
170                 factory.generateActorId(leaderId + "-notifier"));
171         leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses)
172                 .config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
173
174         follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
175         follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
176         follower3CollectorActor = follower3Actor.underlyingActor().collectorActor();
177         leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
178
179         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
180         leaderContext.getPeerInfo(follower3Id).setVotingState(VotingState.NON_VOTING);
181
182         waitUntilLeader(leaderActor);
183
184         testLog.info("createRaftActors starting");
185     }
186
187     private static void verifyRaftState(ActorRef raftActor, final RaftState expState) {
188         verifyRaftState(raftActor, rs -> assertEquals("getRaftState", expState.toString(), rs.getRaftState()));
189     }
190
191     private void verifyLeaderStateChangedMessages(TestActorRef<MessageCollectorActor> notifierActor,
192             String... expLeaderIds) {
193         List<LeaderStateChanged> leaderStateChanges = expectMatching(notifierActor, LeaderStateChanged.class,
194                 expLeaderIds.length);
195
196         Collections.reverse(leaderStateChanges);
197         Iterator<LeaderStateChanged> actual = leaderStateChanges.iterator();
198         for (int i = expLeaderIds.length - 1; i >= 0; i--) {
199             assertEquals("getLeaderId", expLeaderIds[i], actual.next().getLeaderId());
200         }
201     }
202
203     @Test
204     public void testLeaderTransferAborted() throws Exception {
205         testLog.info("testLeaderTransferAborted starting");
206
207         createRaftActors();
208
209         leaderActor.underlyingActor().startDropMessages(AppendEntriesReply.class);
210
211         sendShutDown(leaderActor);
212
213         verifyRaftState(follower1Actor, RaftState.Follower);
214         verifyRaftState(follower2Actor, RaftState.Follower);
215         verifyRaftState(follower3Actor, RaftState.Follower);
216
217         testLog.info("testLeaderTransferOnShutDown ending");
218     }
219
220     @Test
221     public void testLeaderTransferSkippedOnShutdownWithNoFollowers() throws Exception {
222         testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers starting");
223
224         leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().config(newLeaderConfigParams()));
225         waitUntilLeader(leaderActor);
226
227         sendShutDown(leaderActor);
228
229         testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers ending");
230     }
231
232     private void sendFollower2RequestLeadershipTransferToLeader() {
233         testLog.info("sendFollower2RequestLeadershipTransferToLeader starting");
234
235         leaderActor.tell(
236                 new RequestLeadership(follower2Id, requestLeadershipResultCollectorActor), ActorRef.noSender());
237
238         testLog.info("sendFollower2RequestLeadershipTransferToLeader ending");
239     }
240
241     private void createRequestLeadershipResultCollectorActor() {
242         testLog.info("createRequestLeadershipResultCollectorActor starting");
243
244         requestLeadershipResultCollectorActor = factory.createActor(MessageCollectorActor.props());
245
246         testLog.info("createRequestLeadershipResultCollectorActor ending");
247     }
248
249     @Test
250     public void testSuccessfulRequestLeadershipTransferToFollower2() {
251         testLog.info("testSuccessfulRequestLeadershipTransferToFollower2 starting");
252
253         createRaftActors();
254         createRequestLeadershipResultCollectorActor();
255
256         sendFollower2RequestLeadershipTransferToLeader();
257
258         verifyRaftState(follower2Actor, RaftState.Leader);
259
260         expectMatching(requestLeadershipResultCollectorActor, Status.Success.class, 1);
261
262         testLog.info("testSuccessfulRequestLeadershipTransferToFollower2 ending");
263     }
264
265     @Test
266     public void testRequestLeadershipTransferToFollower2WithFollower2Lagging() {
267         testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging starting");
268
269         createRaftActors();
270         createRequestLeadershipResultCollectorActor();
271
272         sendPayloadWithFollower2Lagging();
273
274         sendFollower2RequestLeadershipTransferToLeader();
275
276         verifyRaftState(follower1Actor, RaftState.Follower);
277         verifyRaftState(follower2Actor, RaftState.Follower);
278         verifyRaftState(follower3Actor, RaftState.Follower);
279
280         Status.Failure failure = expectFirstMatching(requestLeadershipResultCollectorActor, Status.Failure.class);
281         assertTrue(failure.cause() instanceof LeadershipTransferFailedException);
282
283         testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging ending");
284     }
285
286     @Test
287     public void testRequestLeadershipTransferToFollower2WithFollower2Shutdown() throws Exception {
288         testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown starting");
289
290         createRaftActors();
291         createRequestLeadershipResultCollectorActor();
292
293         sendShutDown(follower2Actor);
294
295         sendFollower2RequestLeadershipTransferToLeader();
296
297         verifyRaftState(follower1Actor, RaftState.Follower);
298         verifyRaftState(follower3Actor, RaftState.Follower);
299
300         Status.Failure failure = expectFirstMatching(requestLeadershipResultCollectorActor, Status.Failure.class);
301         assertTrue(failure.cause() instanceof LeadershipTransferFailedException);
302
303         testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown ending");
304     }
305 }