Fix followerDistributedDataStore tear down
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / LeadershipTransferIntegrationTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
15
16 import akka.actor.ActorRef;
17 import akka.actor.Status;
18 import akka.pattern.Patterns;
19 import akka.testkit.TestActorRef;
20 import akka.testkit.javadsl.TestKit;
21 import java.util.Collections;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.concurrent.TimeUnit;
26 import org.junit.Test;
27 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
28 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
29 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
30 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
31 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
32 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
33 import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
34 import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
35 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
36 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
37 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
38 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
39 import scala.concurrent.Await;
40 import scala.concurrent.Future;
41 import scala.concurrent.duration.FiniteDuration;
42
43 /**
44  * Tests leadership transfer end-to-end.
45  *
46  * @author Thomas Pantelis
47  */
48 public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrationTest {
49
50     private final String follower3Id = factory.generateActorId("follower");
51     private ActorRef leaderNotifierActor;
52     private ActorRef follower1NotifierActor;
53     private ActorRef follower2NotifierActor;
54     private ActorRef follower3NotifierActor;
55     private TestActorRef<TestRaftActor> follower3Actor;
56     private ActorRef follower3CollectorActor;
57     private ActorRef requestLeadershipResultCollectorActor;
58
59     @Test
60     public void testLeaderTransferOnShutDown() throws Exception {
61         testLog.info("testLeaderTransferOnShutDown starting");
62
63         createRaftActors();
64
65         sendPayloadWithFollower2Lagging();
66
67         sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1();
68
69         sendShutDown(follower2Actor);
70
71         testLog.info("testLeaderTransferOnShutDown ending");
72     }
73
74     private void sendShutDown(final ActorRef actor) throws Exception {
75         testLog.info("sendShutDown for {} starting", actor.path());
76
77         FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
78         Future<Boolean> stopFuture = Patterns.gracefulStop(actor, duration, Shutdown.INSTANCE);
79
80         Boolean stopped = Await.result(stopFuture, duration);
81         assertEquals("Stopped", Boolean.TRUE, stopped);
82
83         testLog.info("sendShutDown for {} ending", actor.path());
84     }
85
86     private void sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1() throws Exception {
87         testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 starting");
88
89         clearMessages(leaderNotifierActor);
90         clearMessages(follower1NotifierActor);
91         clearMessages(follower2NotifierActor);
92         clearMessages(follower3NotifierActor);
93
94         // Simulate a delay for follower2 in receiving the LeaderTransitioning message with null leader id.
95         final TestRaftActor follower2Instance = follower2Actor.underlyingActor();
96         follower2Instance.startDropMessages(LeaderTransitioning.class);
97
98         FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
99         final Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE);
100
101         verifyRaftState(follower1Actor, RaftState.Leader);
102
103         Boolean stopped = Await.result(stopFuture, duration);
104         assertEquals("Stopped", Boolean.TRUE, stopped);
105
106         // Re-enable LeaderTransitioning messages to follower2.
107         final LeaderTransitioning leaderTransitioning = expectFirstMatching(follower2CollectorActor,
108                 LeaderTransitioning.class);
109         follower2Instance.stopDropMessages(LeaderTransitioning.class);
110
111         follower2Instance.stopDropMessages(AppendEntries.class);
112         ApplyState applyState = expectFirstMatching(follower2CollectorActor, ApplyState.class);
113         assertEquals("Apply sate index", 0, applyState.getReplicatedLogEntry().getIndex());
114
115         // Now send the LeaderTransitioning to follower2 after it has received AppendEntries from the new leader.
116         follower2Actor.tell(leaderTransitioning, ActorRef.noSender());
117
118         verifyLeaderStateChangedMessages(leaderNotifierActor, null, follower1Id);
119         verifyLeaderStateChangedMessages(follower1NotifierActor, null, follower1Id);
120         // follower2 should only get 1 LeaderStateChanged with the new leaderId - the LeaderTransitioning message
121         // should not generate a LeaderStateChanged with null leaderId since it arrived after the new leaderId was set.
122         verifyLeaderStateChangedMessages(follower2NotifierActor, follower1Id);
123         verifyLeaderStateChangedMessages(follower3NotifierActor, null, follower1Id);
124
125         testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 ending");
126     }
127
128     private void sendPayloadWithFollower2Lagging() {
129         testLog.info("sendPayloadWithFollower2Lagging starting");
130
131         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
132
133         sendPayloadData(leaderActor, "zero");
134
135         expectFirstMatching(leaderCollectorActor, ApplyState.class);
136         expectFirstMatching(follower1CollectorActor, ApplyState.class);
137         expectFirstMatching(follower3CollectorActor, ApplyState.class);
138
139         testLog.info("sendPayloadWithFollower2Lagging ending");
140     }
141
142     private void createRaftActors() {
143         testLog.info("createRaftActors starting");
144
145         final Snapshot snapshot = Snapshot.create(EmptyState.INSTANCE, List.of(), -1, -1, -1, -1,
146                 1, null, new org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload(
147                         List.of(new ServerInfo(leaderId, true), new ServerInfo(follower1Id, true),
148                                 new ServerInfo(follower2Id, true), new ServerInfo(follower3Id, false))));
149
150         InMemorySnapshotStore.addSnapshot(leaderId, snapshot);
151         InMemorySnapshotStore.addSnapshot(follower1Id, snapshot);
152         InMemorySnapshotStore.addSnapshot(follower2Id, snapshot);
153         InMemorySnapshotStore.addSnapshot(follower3Id, snapshot);
154
155         follower1NotifierActor = factory.createActor(MessageCollectorActor.props(),
156                 factory.generateActorId(follower1Id + "-notifier"));
157         follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
158                 Map.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id),
159                         follower3Id, testActorPath(follower3Id)))
160                 .config(newFollowerConfigParams()).roleChangeNotifier(follower1NotifierActor));
161
162         follower2NotifierActor = factory.createActor(MessageCollectorActor.props(),
163                 factory.generateActorId(follower2Id + "-notifier"));
164         follower2Actor = newTestRaftActor(follower2Id,TestRaftActor.newBuilder().peerAddresses(
165                 Map.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString(),
166                         follower3Id, testActorPath(follower3Id)))
167                 .config(newFollowerConfigParams()).roleChangeNotifier(follower2NotifierActor));
168
169         follower3NotifierActor = factory.createActor(MessageCollectorActor.props(),
170                 factory.generateActorId(follower3Id + "-notifier"));
171         follower3Actor = newTestRaftActor(follower3Id,TestRaftActor.newBuilder().peerAddresses(
172                 Map.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString(),
173                         follower2Id, follower2Actor.path().toString()))
174                 .config(newFollowerConfigParams()).roleChangeNotifier(follower3NotifierActor));
175
176         peerAddresses = Map.of(
177                 follower1Id, follower1Actor.path().toString(),
178                 follower2Id, follower2Actor.path().toString(),
179                 follower3Id, follower3Actor.path().toString());
180
181         leaderConfigParams = newLeaderConfigParams();
182         leaderConfigParams.setElectionTimeoutFactor(3);
183         leaderNotifierActor = factory.createActor(MessageCollectorActor.props(),
184                 factory.generateActorId(leaderId + "-notifier"));
185         leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses)
186                 .config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
187
188         follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
189         follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
190         follower3CollectorActor = follower3Actor.underlyingActor().collectorActor();
191         leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
192
193         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
194
195         waitUntilLeader(leaderActor);
196
197         testLog.info("createRaftActors starting");
198     }
199
200     private static void verifyRaftState(final ActorRef raftActor, final RaftState expState) {
201         verifyRaftState(raftActor, rs -> assertEquals("getRaftState", expState.toString(), rs.getRaftState()));
202     }
203
204     private static void verifyLeaderStateChangedMessages(final ActorRef notifierActor,
205             final String... expLeaderIds) {
206         List<LeaderStateChanged> leaderStateChanges = expectMatching(notifierActor, LeaderStateChanged.class,
207                 expLeaderIds.length);
208
209         Collections.reverse(leaderStateChanges);
210         Iterator<LeaderStateChanged> actual = leaderStateChanges.iterator();
211         for (int i = expLeaderIds.length - 1; i >= 0; i--) {
212             assertEquals("getLeaderId", expLeaderIds[i], actual.next().getLeaderId());
213         }
214     }
215
216     @Test
217     public void testLeaderTransferAborted() throws Exception {
218         testLog.info("testLeaderTransferAborted starting");
219
220         createRaftActors();
221
222         leaderActor.underlyingActor().startDropMessages(AppendEntriesReply.class);
223
224         sendShutDown(leaderActor);
225
226         verifyRaftState(follower1Actor, RaftState.Follower);
227         verifyRaftState(follower2Actor, RaftState.Follower);
228         verifyRaftState(follower3Actor, RaftState.Follower);
229
230         testLog.info("testLeaderTransferOnShutDown ending");
231     }
232
233     @Test
234     public void testLeaderTransferSkippedOnShutdownWithNoFollowers() throws Exception {
235         testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers starting");
236
237         leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().config(newLeaderConfigParams()));
238         waitUntilLeader(leaderActor);
239
240         sendShutDown(leaderActor);
241
242         testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers ending");
243     }
244
245     private void sendFollower2RequestLeadershipTransferToLeader() {
246         testLog.info("sendFollower2RequestLeadershipTransferToLeader starting");
247
248         leaderActor.tell(
249                 new RequestLeadership(follower2Id, requestLeadershipResultCollectorActor), ActorRef.noSender());
250
251         testLog.info("sendFollower2RequestLeadershipTransferToLeader ending");
252     }
253
254     private void createRequestLeadershipResultCollectorActor() {
255         testLog.info("createRequestLeadershipResultCollectorActor starting");
256
257         requestLeadershipResultCollectorActor = factory.createActor(MessageCollectorActor.props());
258
259         testLog.info("createRequestLeadershipResultCollectorActor ending");
260     }
261
262     @Test
263     public void testSuccessfulRequestLeadershipTransferToFollower2() {
264         testLog.info("testSuccessfulRequestLeadershipTransferToFollower2 starting");
265
266         createRaftActors();
267         createRequestLeadershipResultCollectorActor();
268
269         sendFollower2RequestLeadershipTransferToLeader();
270
271         verifyRaftState(follower2Actor, RaftState.Leader);
272
273         expectMatching(requestLeadershipResultCollectorActor, Status.Success.class, 1);
274
275         testLog.info("testSuccessfulRequestLeadershipTransferToFollower2 ending");
276     }
277
278     @Test
279     public void testRequestLeadershipTransferToFollower2WithFollower2Lagging() {
280         testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging starting");
281
282         createRaftActors();
283         createRequestLeadershipResultCollectorActor();
284
285         sendPayloadWithFollower2Lagging();
286
287         sendFollower2RequestLeadershipTransferToLeader();
288
289         verifyRaftState(follower1Actor, RaftState.Follower);
290         verifyRaftState(follower2Actor, RaftState.Follower);
291         verifyRaftState(follower3Actor, RaftState.Follower);
292
293         Status.Failure failure = expectFirstMatching(requestLeadershipResultCollectorActor, Status.Failure.class);
294         assertTrue(failure.cause() instanceof LeadershipTransferFailedException);
295
296         testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Lagging ending");
297     }
298
299
300     @Test
301     public void testRequestLeadershipTransferToFollower2WithFollower2Shutdown() throws Exception {
302         testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown starting");
303
304         createRaftActors();
305         createRequestLeadershipResultCollectorActor();
306
307         sendShutDown(follower2Actor);
308
309         sendFollower2RequestLeadershipTransferToLeader();
310
311         verifyRaftState(follower1Actor, RaftState.Follower);
312         verifyRaftState(follower3Actor, RaftState.Follower);
313
314         Status.Failure failure = expectFirstMatching(requestLeadershipResultCollectorActor, Status.Failure.class);
315         assertTrue(failure.cause() instanceof LeadershipTransferFailedException);
316
317         testLog.info("testRequestLeadershipTransferToFollower2WithFollower2Shutdown ending");
318     }
319
320     @Test
321     public void testRequestLeadershipTransferToFollower2WithOtherFollowersDown() {
322         testLog.info("testRequestLeadershipTransferToFollower2WithOtherFollowersDown starting");
323
324         createRaftActors();
325         createRequestLeadershipResultCollectorActor();
326
327         factory.killActor(follower1Actor, new TestKit(getSystem()));
328         factory.killActor(follower3Actor, new TestKit(getSystem()));
329
330         sendFollower2RequestLeadershipTransferToLeader();
331
332         expectFirstMatching(requestLeadershipResultCollectorActor, Status.Success.class);
333
334         verifyRaftState(follower2Actor, RaftState.Leader);
335         verifyRaftState(leaderActor, RaftState.Follower);
336
337         testLog.info("testRequestLeadershipTransferToFollower2WithOtherFollowersDown ending");
338     }
339 }