Further Guava Optional cleanups
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / LeadershipTransferIntegrationTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
15
16 import akka.actor.ActorRef;
17 import akka.actor.Status;
18 import akka.pattern.Patterns;
19 import akka.testkit.TestActorRef;
20 import akka.testkit.javadsl.TestKit;
21 import com.google.common.collect.ImmutableMap;
22 import java.util.Arrays;
23 import java.util.Collections;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.concurrent.TimeUnit;
27 import org.junit.Test;
28 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
29 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
30 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
31 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
32 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
33 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
34 import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
35 import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
36 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
37 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
38 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
39 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
40 import scala.concurrent.Await;
41 import scala.concurrent.Future;
42 import scala.concurrent.duration.FiniteDuration;
43
44 /**
45  * Tests leadership transfer end-to-end.
46  *
47  * @author Thomas Pantelis
48  */
49 public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrationTest {
50
51     private final String follower3Id = factory.generateActorId("follower");
52     private ActorRef leaderNotifierActor;
53     private ActorRef follower1NotifierActor;
54     private ActorRef follower2NotifierActor;
55     private ActorRef follower3NotifierActor;
56     private TestActorRef<TestRaftActor> follower3Actor;
57     private ActorRef follower3CollectorActor;
58     private ActorRef requestLeadershipResultCollectorActor;
59
60     @Test
61     public void testLeaderTransferOnShutDown() throws Exception {
62         testLog.info("testLeaderTransferOnShutDown starting");
63
64         createRaftActors();
65
66         sendPayloadWithFollower2Lagging();
67
68         sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1();
69
70         sendShutDown(follower2Actor);
71
72         testLog.info("testLeaderTransferOnShutDown ending");
73     }
74
75     private void sendShutDown(final ActorRef actor) throws Exception {
76         testLog.info("sendShutDown for {} starting", actor.path());
77
78         FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
79         Future<Boolean> stopFuture = Patterns.gracefulStop(actor, duration, Shutdown.INSTANCE);
80
81         Boolean stopped = Await.result(stopFuture, duration);
82         assertEquals("Stopped", Boolean.TRUE, stopped);
83
84         testLog.info("sendShutDown for {} ending", actor.path());
85     }
86
87     private void sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1() throws Exception {
88         testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 starting");
89
90         clearMessages(leaderNotifierActor);
91         clearMessages(follower1NotifierActor);
92         clearMessages(follower2NotifierActor);
93         clearMessages(follower3NotifierActor);
94
95         // Simulate a delay for follower2 in receiving the LeaderTransitioning message with null leader id.
96         final TestRaftActor follower2Instance = follower2Actor.underlyingActor();
97         follower2Instance.startDropMessages(LeaderTransitioning.class);
98
99         FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
100         final Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE);
101
102         verifyRaftState(follower1Actor, RaftState.Leader);
103
104         Boolean stopped = Await.result(stopFuture, duration);
105         assertEquals("Stopped", Boolean.TRUE, stopped);
106
107         // Re-enable LeaderTransitioning messages to follower2.
108         final LeaderTransitioning leaderTransitioning = expectFirstMatching(follower2CollectorActor,
109                 LeaderTransitioning.class);
110         follower2Instance.stopDropMessages(LeaderTransitioning.class);
111
112         follower2Instance.stopDropMessages(AppendEntries.class);
113         ApplyState applyState = expectFirstMatching(follower2CollectorActor, ApplyState.class);
114         assertEquals("Apply sate index", 0, applyState.getReplicatedLogEntry().getIndex());
115
116         // Now send the LeaderTransitioning to follower2 after it has received AppendEntries from the new leader.
117         follower2Actor.tell(leaderTransitioning, ActorRef.noSender());
118
119         verifyLeaderStateChangedMessages(leaderNotifierActor, null, follower1Id);
120         verifyLeaderStateChangedMessages(follower1NotifierActor, null, follower1Id);
121         // follower2 should only get 1 LeaderStateChanged with the new leaderId - the LeaderTransitioning message
122         // should not generate a LeaderStateChanged with null leaderId since it arrived after the new leaderId was set.
123         verifyLeaderStateChangedMessages(follower2NotifierActor, follower1Id);
124         verifyLeaderStateChangedMessages(follower3NotifierActor, null, follower1Id);
125
126         testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 ending");
127     }
128
129     private void sendPayloadWithFollower2Lagging() {
130         testLog.info("sendPayloadWithFollower2Lagging starting");
131
132         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
133
134         sendPayloadData(leaderActor, "zero");
135
136         expectFirstMatching(leaderCollectorActor, ApplyState.class);
137         expectFirstMatching(follower1CollectorActor, ApplyState.class);
138         expectFirstMatching(follower3CollectorActor, ApplyState.class);
139
140         testLog.info("sendPayloadWithFollower2Lagging ending");
141     }
142
143     private void createRaftActors() {
144         testLog.info("createRaftActors starting");
145
146         final Snapshot snapshot = Snapshot.create(EmptyState.INSTANCE, Collections.emptyList(), -1, -1, -1, -1,
147                 1, null, new org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload(
148                         Arrays.asList(new ServerInfo(leaderId, true), new ServerInfo(follower1Id, true),
149                                 new ServerInfo(follower2Id, true), new ServerInfo(follower3Id, false))));
150
151         InMemorySnapshotStore.addSnapshot(leaderId, snapshot);
152         InMemorySnapshotStore.addSnapshot(follower1Id, snapshot);
153         InMemorySnapshotStore.addSnapshot(follower2Id, snapshot);
154         InMemorySnapshotStore.addSnapshot(follower3Id, snapshot);
155
156         follower1NotifierActor = factory.createActor(MessageCollectorActor.props(),
157                 factory.generateActorId(follower1Id + "-notifier"));
158         follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
159                 ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id),
160                         follower3Id, testActorPath(follower3Id)))
161                 .config(newFollowerConfigParams()).roleChangeNotifier(follower1NotifierActor));
162
163         follower2NotifierActor = factory.createActor(MessageCollectorActor.props(),
164                 factory.generateActorId(follower2Id + "-notifier"));
165         follower2Actor = newTestRaftActor(follower2Id,TestRaftActor.newBuilder().peerAddresses(
166                 ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString(),
167                         follower3Id, testActorPath(follower3Id)))
168                 .config(newFollowerConfigParams()).roleChangeNotifier(follower2NotifierActor));
169
170         follower3NotifierActor = factory.createActor(MessageCollectorActor.props(),
171                 factory.generateActorId(follower3Id + "-notifier"));
172         follower3Actor = newTestRaftActor(follower3Id,TestRaftActor.newBuilder().peerAddresses(
173                 ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString(),
174                         follower2Id, follower2Actor.path().toString()))
175                 .config(newFollowerConfigParams()).roleChangeNotifier(follower3NotifierActor));
176
177         peerAddresses = ImmutableMap.<String, String>builder()
178                 .put(follower1Id, follower1Actor.path().toString())
179                 .put(follower2Id, follower2Actor.path().toString())
180                 .put(follower3Id, follower3Actor.path().toString()).build();
181
182         leaderConfigParams = newLeaderConfigParams();
183         leaderConfigParams.setElectionTimeoutFactor(3);
184         leaderNotifierActor = factory.createActor(MessageCollectorActor.props(),
185                 factory.generateActorId(leaderId + "-notifier"));
186         leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses)
187                 .config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
188
189         follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
190         follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
191         follower3CollectorActor = follower3Actor.underlyingActor().collectorActor();
192         leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
193
194         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
195
196         waitUntilLeader(leaderActor);
197
198         testLog.info("createRaftActors starting");
199     }
200
201     private static void verifyRaftState(final ActorRef raftActor, final RaftState expState) {
202         verifyRaftState(raftActor, rs -> assertEquals("getRaftState", expState.toString(), rs.getRaftState()));
203     }
204
205     private static void verifyLeaderStateChangedMessages(final ActorRef notifierActor,
206             final String... expLeaderIds) {
207         List<LeaderStateChanged> leaderStateChanges = expectMatching(notifierActor, LeaderStateChanged.class,
208                 expLeaderIds.length);
209
210         Collections.reverse(leaderStateChanges);
211         Iterator<LeaderStateChanged> actual = leaderStateChanges.iterator();
212         for (int i = expLeaderIds.length - 1; i >= 0; i--) {
213             assertEquals("getLeaderId", expLeaderIds[i], actual.next().getLeaderId());
214         }
215     }
216
217     @Test
218     public void testLeaderTransferAborted() throws Exception {
219         testLog.info("testLeaderTransferAborted starting");
220
221         createRaftActors();
222
223         leaderActor.underlyingActor().startDropMessages(AppendEntriesReply.class);
224
225         sendShutDown(leaderActor);
226
227         verifyRaftState(follower1Actor, RaftState.Follower);
228         verifyRaftState(follower2Actor, RaftState.Follower);
229         verifyRaftState(follower3Actor, RaftState.Follower);
230
231         testLog.info("testLeaderTransferOnShutDown ending");
232     }
233
234     @Test
235     public void testLeaderTransferSkippedOnShutdownWithNoFollowers() throws Exception {
236         testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers starting");
237
238         leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().config(newLeaderConfigParams()));
239         waitUntilLeader(leaderActor);
240
241         sendShutDown(leaderActor);
242
243         testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers ending");
244     }
245
246     private void sendFollower2RequestLeadershipTransferToLeader() {
247         testLog.info("sendFollower2RequestLeadershipTransferToLeader starting");
248
249         leaderActor.tell(
250                 new RequestLeadership(follower2Id, requestLeadershipResultCollectorActor), ActorRef.noSender());
251
252         testLog.info("sendFollower2RequestLeadershipTransferToLeader ending");
253     }
254
255     private void createRequestLeadershipResultCollectorActor() {
256         testLog.info("createRequestLeadershipResultCollectorActor starting");
257
258         requestLeadershipResultCollectorActor = factory.createActor(MessageCollectorActor.props());
259
260         testLog.info("createRequestLeadershipResultCollectorActor ending");
261     }
262
263     @Test
264     public void testSuccessfulRequestLeadershipTransferToFollower2() {
265         testLog.info("testSuccessfulRequestLeadershipTransferToFollower2 starting");
266
267         createRaftActors();
268         createRequestLeadershipResultCollectorActor();
269
270         sendFollower2RequestLeadershipTransferToLeader();
271
272         verifyRaftState(follower2Actor, RaftState.Leader);
273
274         expectMatching(requestLeadershipResultCollectorActor, Status.Success.class, 1);
275
276         testLog.info("testSuccessfulRequestLeadershipTransferToFollower2 ending");
277     }
278
279     @Test
280     public void testRequestLeadershipTransferToFollower2WithFollower2Lagging() {
281         testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging starting");
282
283         createRaftActors();
284         createRequestLeadershipResultCollectorActor();
285
286         sendPayloadWithFollower2Lagging();
287
288         sendFollower2RequestLeadershipTransferToLeader();
289
290         verifyRaftState(follower1Actor, RaftState.Follower);
291         verifyRaftState(follower2Actor, RaftState.Follower);
292         verifyRaftState(follower3Actor, RaftState.Follower);
293
294         Status.Failure failure = expectFirstMatching(requestLeadershipResultCollectorActor, Status.Failure.class);
295         assertTrue(failure.cause() instanceof LeadershipTransferFailedException);
296
297         testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging ending");
298     }
299
300
301     @Test
302     public void testRequestLeadershipTransferToFollower2WithFollower2Shutdown() throws Exception {
303         testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown starting");
304
305         createRaftActors();
306         createRequestLeadershipResultCollectorActor();
307
308         sendShutDown(follower2Actor);
309
310         sendFollower2RequestLeadershipTransferToLeader();
311
312         verifyRaftState(follower1Actor, RaftState.Follower);
313         verifyRaftState(follower3Actor, RaftState.Follower);
314
315         Status.Failure failure = expectFirstMatching(requestLeadershipResultCollectorActor, Status.Failure.class);
316         assertTrue(failure.cause() instanceof LeadershipTransferFailedException);
317
318         testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown ending");
319     }
320
321     @Test
322     public void testRequestLeadershipTransferToFollower2WithOtherFollowersDown() {
323         testLog.info("testRequestLeadershipTransferToFollower2WithOtherFollowersDown starting");
324
325         createRaftActors();
326         createRequestLeadershipResultCollectorActor();
327
328         factory.killActor(follower1Actor, new TestKit(getSystem()));
329         factory.killActor(follower3Actor, new TestKit(getSystem()));
330
331         sendFollower2RequestLeadershipTransferToLeader();
332
333         expectFirstMatching(requestLeadershipResultCollectorActor, Status.Success.class);
334
335         verifyRaftState(follower2Actor, RaftState.Leader);
336         verifyRaftState(leaderActor, RaftState.Follower);
337
338         testLog.info("testRequestLeadershipTransferToFollower2WithOtherFollowersDown ending");
339     }
340 }