Fix timing issue in PartitionedCandidateOnStartupElection*Test
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeaderElectionScenarioTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSystem;
16 import akka.actor.Props;
17 import akka.actor.Status;
18 import akka.dispatch.Dispatchers;
19 import akka.pattern.Patterns;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import akka.util.Timeout;
23 import com.google.common.base.Throwables;
24 import com.google.common.util.concurrent.Uninterruptibles;
25 import java.util.Map;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.TimeUnit;
29 import org.junit.After;
30 import org.junit.Before;
31 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
32 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
33 import org.opendaylight.controller.cluster.raft.RaftState;
34 import org.opendaylight.controller.cluster.raft.TestActorFactory;
35 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
36 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
37 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 import scala.concurrent.Await;
41 import scala.concurrent.duration.Duration;
42 import scala.concurrent.duration.FiniteDuration;
43
44 /**
45  * Abstract base for a leader election scenario test.
46  *
47  * @author Thomas Pantelis
48  */
49 public class AbstractLeaderElectionScenarioTest {
50     static final int HEARTBEAT_INTERVAL = 50;
51
52     static class MemberActor extends MessageCollectorActor {
53         private volatile RaftActorBehavior behavior;
54         Map<Class<?>, CountDownLatch> messagesReceivedLatches = new ConcurrentHashMap<>();
55         Map<Class<?>, Boolean> dropMessagesToBehavior = new ConcurrentHashMap<>();
56         CountDownLatch behaviorStateChangeLatch;
57
58         public static Props props() {
59             return Props.create(MemberActor.class).withDispatcher(Dispatchers.DefaultDispatcherId());
60         }
61
62         @Override
63         public void onReceive(Object message) throws Exception {
64             // Ignore scheduled SendHeartBeat messages.
65             if (message instanceof SendHeartBeat) {
66                 return;
67             }
68
69             if (message instanceof SetBehavior) {
70                 behavior = ((SetBehavior)message).behavior;
71                 ((SetBehavior)message).context.setCurrentBehavior(behavior);
72                 return;
73             }
74
75             if (message instanceof GetBehaviorState) {
76                 if (behavior != null) {
77                     getSender().tell(behavior.state(), self());
78                 } else {
79                     getSender().tell(new Status.Failure(new IllegalStateException(
80                             "RaftActorBehavior is not set in MemberActor")), self());
81                 }
82             }
83
84             if (message instanceof SendImmediateHeartBeat) {
85                 message = SendHeartBeat.INSTANCE;
86             }
87
88             try {
89                 if (behavior != null && !dropMessagesToBehavior.containsKey(message.getClass())) {
90                     final RaftActorBehavior nextBehavior = behavior.handleMessage(getSender(), message);
91                     if (nextBehavior != null) {
92                         RaftActorBehavior oldBehavior = behavior;
93                         behavior = nextBehavior;
94                         if (behavior != oldBehavior && behaviorStateChangeLatch != null) {
95                             behaviorStateChangeLatch.countDown();
96                         }
97                     }
98                 }
99             } finally {
100                 super.onReceive(message);
101
102                 CountDownLatch latch = messagesReceivedLatches.get(message.getClass());
103                 if (latch != null) {
104                     latch.countDown();
105                 }
106             }
107         }
108
109         @Override
110         public void postStop() throws Exception {
111             super.postStop();
112
113             if (behavior != null) {
114                 behavior.close();
115             }
116         }
117
118         void expectBehaviorStateChange() {
119             behaviorStateChangeLatch = new CountDownLatch(1);
120         }
121
122         void waitForBehaviorStateChange() {
123             assertTrue("Expected behavior state change",
124                     Uninterruptibles.awaitUninterruptibly(behaviorStateChangeLatch, 5, TimeUnit.SECONDS));
125         }
126
127         void expectMessageClass(Class<?> expClass, int expCount) {
128             messagesReceivedLatches.put(expClass, new CountDownLatch(expCount));
129         }
130
131         void waitForExpectedMessages(Class<?> expClass) {
132             CountDownLatch latch = messagesReceivedLatches.get(expClass);
133             assertNotNull("No messages received for " + expClass, latch);
134             assertTrue("Missing messages of type " + expClass,
135                     Uninterruptibles.awaitUninterruptibly(latch, 5, TimeUnit.SECONDS));
136         }
137
138         void dropMessagesToBehavior(Class<?> msgClass) {
139             dropMessagesToBehavior(msgClass, 1);
140         }
141
142         void dropMessagesToBehavior(Class<?> msgClass, int expCount) {
143             expectMessageClass(msgClass, expCount);
144             dropMessagesToBehavior.put(msgClass, Boolean.TRUE);
145         }
146
147         void clearDropMessagesToBehavior() {
148             dropMessagesToBehavior.clear();
149         }
150
151         @Override
152         public void clear() {
153             behaviorStateChangeLatch = null;
154             clearDropMessagesToBehavior();
155             messagesReceivedLatches.clear();
156             super.clear();
157         }
158
159         void forwardCapturedMessageToBehavior(Class<?> msgClass, ActorRef sender) throws Exception {
160             Object message = getFirstMatching(getSelf(), msgClass);
161             assertNotNull("Message of type " + msgClass + " not received", message);
162             getSelf().tell(message, sender);
163         }
164
165         void forwardCapturedMessagesToBehavior(Class<?> msgClass, ActorRef sender) throws Exception {
166             for (Object m: getAllMatching(getSelf(), msgClass)) {
167                 getSelf().tell(m, sender);
168             }
169         }
170
171         <T> T getCapturedMessage(Class<T> msgClass) throws Exception {
172             T message = getFirstMatching(getSelf(), msgClass);
173             assertNotNull("Message of type " + msgClass + " not received", message);
174             return message;
175         }
176     }
177
178     static class SendImmediateHeartBeat {
179         public static final SendImmediateHeartBeat INSTANCE = new SendImmediateHeartBeat();
180
181         private SendImmediateHeartBeat() {
182         }
183     }
184
185     static class GetBehaviorState {
186         public static final GetBehaviorState INSTANCE = new GetBehaviorState();
187
188         private GetBehaviorState() {
189         }
190     }
191
192     static class SetBehavior {
193         RaftActorBehavior behavior;
194         MockRaftActorContext context;
195
196         SetBehavior(RaftActorBehavior behavior, MockRaftActorContext context) {
197             this.behavior = behavior;
198             this.context = context;
199         }
200     }
201
202     protected final Logger testLog = LoggerFactory.getLogger(MockRaftActorContext.class);
203     protected final ActorSystem system = ActorSystem.create("test");
204     protected final TestActorFactory factory = new TestActorFactory(system);
205     protected TestActorRef<MemberActor> member1ActorRef;
206     protected TestActorRef<MemberActor> member2ActorRef;
207     protected TestActorRef<MemberActor> member3ActorRef;
208     protected MemberActor member1Actor;
209     protected MemberActor member2Actor;
210     protected MemberActor member3Actor;
211     protected MockRaftActorContext member1Context;
212     protected MockRaftActorContext member2Context;
213     protected MockRaftActorContext member3Context;
214
215     @Before
216     public void setup() throws Exception {
217         member1ActorRef = newMemberActor("member1");
218         member2ActorRef = newMemberActor("member2");
219         member3ActorRef = newMemberActor("member3");
220
221         member1Actor = member1ActorRef.underlyingActor();
222         member2Actor = member2ActorRef.underlyingActor();
223         member3Actor = member3ActorRef.underlyingActor();
224     }
225
226     @After
227     public void tearDown() throws Exception {
228         JavaTestKit.shutdownActorSystem(system);
229     }
230
231     DefaultConfigParamsImpl newConfigParams() {
232         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
233         configParams.setHeartBeatInterval(new FiniteDuration(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS));
234         configParams.setElectionTimeoutFactor(100000);
235         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
236         return configParams;
237     }
238
239     MockRaftActorContext newRaftActorContext(String id, ActorRef actor,
240             Map<String, String> peerAddresses) {
241         MockRaftActorContext context = new MockRaftActorContext(id, system, actor);
242         context.setPeerAddresses(peerAddresses);
243         context.getTermInformation().updateAndPersist(1, "");
244         return context;
245     }
246
247     @SuppressWarnings("checkstyle:IllegalCatch")
248     void verifyBehaviorState(String name, MemberActor actor, RaftState expState) {
249         try {
250             RaftState actualState = (RaftState) Await.result(Patterns.ask(actor.self(), GetBehaviorState.INSTANCE,
251                     Timeout.apply(5, TimeUnit.SECONDS)), Duration.apply(5, TimeUnit.SECONDS));
252             assertEquals(name + " behavior state", expState, actualState);
253         } catch (Exception e) {
254             Throwables.propagate(e);
255         }
256     }
257
258     void initializeLeaderBehavior(MemberActor actor, MockRaftActorContext context, int numActiveFollowers)
259             throws Exception {
260         // Leader sends immediate heartbeats - we don't care about it so ignore it.
261         // Sometimes the initial AppendEntries messages go to dead letters, probably b/c the follower actors
262         // haven't been fully created/initialized by akka. So we try up to 3 times to create the Leader as
263         // a workaround.
264
265         Leader leader = null;
266         AssertionError lastAssertError = null;
267         for (int i = 1; i <= 3; i++) {
268             actor.expectMessageClass(AppendEntriesReply.class, numActiveFollowers);
269
270             leader = new Leader(context);
271             try {
272                 actor.waitForExpectedMessages(AppendEntriesReply.class);
273                 lastAssertError = null;
274                 break;
275             } catch (AssertionError e) {
276                 lastAssertError = e;
277             }
278         }
279
280         if (lastAssertError != null) {
281             throw lastAssertError;
282         }
283
284         context.setCurrentBehavior(leader);
285
286         // Delay assignment of the leader behavior so the AppendEntriesReply isn't forwarded to the behavior.
287         actor.self().tell(new SetBehavior(leader, context), ActorRef.noSender());
288
289         actor.forwardCapturedMessagesToBehavior(AppendEntriesReply.class, ActorRef.noSender());
290         actor.clear();
291
292     }
293
294     TestActorRef<MemberActor> newMemberActor(String name) throws Exception {
295         TestActorRef<MemberActor> actor = factory.createTestActor(MemberActor.props()
296                 .withDispatcher(Dispatchers.DefaultDispatcherId()), name);
297         MessageCollectorActor.waitUntilReady(actor);
298         return actor;
299     }
300
301     void sendHeartbeat(TestActorRef<MemberActor> leaderActor) {
302         Uninterruptibles.sleepUninterruptibly(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
303         leaderActor.tell(SendImmediateHeartBeat.INSTANCE, ActorRef.noSender());
304     }
305 }