3cc330a96aa101cab3c5b7c54462f3a8369b427d
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / LeadershipTransferIntegrationTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static akka.pattern.Patterns.ask;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNull;
13 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
14 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
15 import akka.actor.ActorRef;
16 import akka.actor.Props;
17 import akka.pattern.Patterns;
18 import akka.testkit.TestActorRef;
19 import akka.util.Timeout;
20 import com.google.common.base.Stopwatch;
21 import com.google.common.collect.ImmutableMap;
22 import com.google.common.util.concurrent.Uninterruptibles;
23 import java.util.concurrent.TimeUnit;
24 import org.junit.Test;
25 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
26 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
27 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
28 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
29 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
30 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
31 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
32 import scala.concurrent.Await;
33 import scala.concurrent.Future;
34 import scala.concurrent.duration.FiniteDuration;
35
36 /**
37  * Tests leadership transfer end-to-end.
38  *
39  * @author Thomas Pantelis
40  */
41 public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrationTest {
42
43     private TestActorRef<MessageCollectorActor> leaderNotifierActor;
44     private TestActorRef<MessageCollectorActor> follower1NotifierActor;
45     private TestActorRef<MessageCollectorActor> follower2NotifierActor;
46
47     @Test
48     public void testLeaderTransferOnShutDown() throws Throwable {
49         testLog.info("testLeaderTransferOnShutDown starting");
50
51         createRaftActors();
52
53         sendPayloadWithFollower2Lagging();
54
55         sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1();
56
57         sendShutDown(follower2Actor);
58
59         testLog.info("testLeaderTransferOnShutDown ending");
60     }
61
62     private void sendShutDown(ActorRef actor) throws Exception {
63         testLog.info("sendShutDown for {} starting", actor.path());
64
65         FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
66         Future<Boolean> stopFuture = Patterns.gracefulStop(actor, duration, new Shutdown());
67
68         Boolean stopped = Await.result(stopFuture, duration);
69         assertEquals("Stopped", Boolean.TRUE, stopped);
70
71         testLog.info("sendShutDown for {} ending", actor.path());
72     }
73
74     private void sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1() throws Throwable {
75         testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 starting");
76
77         clearMessages(leaderNotifierActor);
78         clearMessages(follower1NotifierActor);
79         clearMessages(follower2NotifierActor);
80
81         FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
82         Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, new Shutdown());
83
84         assertNullLeaderIdChange(leaderNotifierActor);
85         assertNullLeaderIdChange(follower1NotifierActor);
86         assertNullLeaderIdChange(follower2NotifierActor);
87
88         verifyRaftState(follower1Actor, RaftState.Leader);
89
90         Boolean stopped = Await.result(stopFuture, duration);
91         assertEquals("Stopped", Boolean.TRUE, stopped);
92
93         follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
94         ApplyState applyState = expectFirstMatching(follower2CollectorActor, ApplyState.class);
95         assertEquals("Apply sate index", 0, applyState.getReplicatedLogEntry().getIndex());
96
97         testLog.info("sendShutDownToLeaderAndVerifyLeadershipTransferToFollower1 ending");
98     }
99
100     private void sendPayloadWithFollower2Lagging() {
101         testLog.info("sendPayloadWithFollower2Lagging starting");
102
103         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
104
105         sendPayloadData(leaderActor, "zero");
106
107         expectFirstMatching(leaderCollectorActor, ApplyState.class);
108         expectFirstMatching(follower1CollectorActor, ApplyState.class);
109
110         testLog.info("sendPayloadWithFollower2Lagging ending");
111     }
112
113     private void createRaftActors() {
114         testLog.info("createRaftActors starting");
115
116         follower1NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
117                 factory.generateActorId(follower1Id + "-notifier"));
118         follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
119                 ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id))).
120                     config(newFollowerConfigParams()).roleChangeNotifier(follower1NotifierActor));
121
122         follower2NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
123                 factory.generateActorId(follower2Id + "-notifier"));
124         follower2Actor = newTestRaftActor(follower2Id,TestRaftActor.newBuilder().peerAddresses(
125                 ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString())).
126                     config(newFollowerConfigParams()).roleChangeNotifier(follower2NotifierActor));
127
128         peerAddresses = ImmutableMap.<String, String>builder().
129                 put(follower1Id, follower1Actor.path().toString()).
130                 put(follower2Id, follower2Actor.path().toString()).build();
131
132         leaderConfigParams = newLeaderConfigParams();
133         leaderConfigParams.setElectionTimeoutFactor(3);
134         leaderNotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
135                 factory.generateActorId(leaderId + "-notifier"));
136         leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses).
137                 config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
138
139         follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
140         follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
141         leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
142
143         leaderContext = leaderActor.underlyingActor().getRaftActorContext();
144
145         waitUntilLeader(leaderActor);
146
147         testLog.info("createRaftActors starting");
148     }
149
150     private void verifyRaftState(ActorRef raftActor, final RaftState expState) throws Throwable {
151         Timeout timeout = new Timeout(500, TimeUnit.MILLISECONDS);
152         Throwable lastError = null;
153         Stopwatch sw = Stopwatch.createStarted();
154         while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
155             try {
156                 OnDemandRaftState raftState = (OnDemandRaftState)Await.result(ask(raftActor,
157                         GetOnDemandRaftState.INSTANCE, timeout), timeout.duration());
158                 assertEquals("getRaftState", expState.toString(), raftState.getRaftState());
159                 return;
160             } catch (Exception | AssertionError e) {
161                 lastError = e;
162                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
163             }
164         }
165
166         throw lastError;
167     }
168
169     private void assertNullLeaderIdChange(TestActorRef<MessageCollectorActor> notifierActor) {
170         LeaderStateChanged change = expectFirstMatching(notifierActor, LeaderStateChanged.class);
171         assertNull("Expected null leader Id", change.getLeaderId());
172     }
173
174     @Test
175     public void testLeaderTransferAborted() throws Throwable {
176         testLog.info("testLeaderTransferAborted starting");
177
178         createRaftActors();
179
180         follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
181         follower2Actor.underlyingActor().startDropMessages(AppendEntries.class);
182
183         sendShutDown(leaderActor);
184
185         testLog.info("testLeaderTransferOnShutDown ending");
186     }
187
188     @Test
189     public void testLeaderTransferSkippedOnShutdownWithNoFollowers() throws Throwable {
190         testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers starting");
191
192         leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().config(newLeaderConfigParams()));
193         waitUntilLeader(leaderActor);
194
195         sendShutDown(leaderActor);
196
197         testLog.info("testLeaderTransferSkippedOnShutdownWithNoFollowers ending");
198     }
199 }