8c7c9cb7c4a3ed640d5d04c0a58534a99a819abb
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeaderElectionScenarioTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSystem;
16 import akka.actor.Props;
17 import akka.actor.Status;
18 import akka.dispatch.ControlMessage;
19 import akka.dispatch.Dispatchers;
20 import akka.dispatch.Mailboxes;
21 import akka.pattern.Patterns;
22 import akka.testkit.JavaTestKit;
23 import akka.testkit.TestActorRef;
24 import akka.util.Timeout;
25 import com.google.common.base.Throwables;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import java.util.Map;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.TimeUnit;
31 import org.junit.After;
32 import org.junit.Before;
33 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
34 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
35 import org.opendaylight.controller.cluster.raft.RaftState;
36 import org.opendaylight.controller.cluster.raft.TestActorFactory;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
38 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
39 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42 import scala.concurrent.Await;
43 import scala.concurrent.duration.Duration;
44 import scala.concurrent.duration.FiniteDuration;
45
46 /**
47  * Abstract base for a leader election scenario test.
48  *
49  * @author Thomas Pantelis
50  */
51 public class AbstractLeaderElectionScenarioTest {
52     static final int HEARTBEAT_INTERVAL = 50;
53
54     static class MemberActor extends MessageCollectorActor {
55         private volatile RaftActorBehavior behavior;
56         Map<Class<?>, CountDownLatch> messagesReceivedLatches = new ConcurrentHashMap<>();
57         Map<Class<?>, Boolean> dropMessagesToBehavior = new ConcurrentHashMap<>();
58         CountDownLatch behaviorStateChangeLatch;
59
60         public static Props props() {
61             return Props.create(MemberActor.class).withDispatcher(Dispatchers.DefaultDispatcherId())
62                     .withMailbox(Mailboxes.DefaultMailboxId());
63         }
64
65         @Override
66         public void onReceive(Object message) throws Exception {
67             // Ignore scheduled SendHeartBeat messages.
68             if (message instanceof SendHeartBeat) {
69                 return;
70             }
71
72             if (message instanceof SetBehavior) {
73                 behavior = ((SetBehavior)message).behavior;
74                 ((SetBehavior)message).context.setCurrentBehavior(behavior);
75                 return;
76             }
77
78             if (message instanceof GetBehaviorState) {
79                 if (behavior != null) {
80                     getSender().tell(behavior.state(), self());
81                 } else {
82                     getSender().tell(new Status.Failure(new IllegalStateException(
83                             "RaftActorBehavior is not set in MemberActor")), self());
84                 }
85             }
86
87             if (message instanceof SendImmediateHeartBeat) {
88                 message = SendHeartBeat.INSTANCE;
89             }
90
91             try {
92                 if (behavior != null && !dropMessagesToBehavior.containsKey(message.getClass())) {
93                     final RaftActorBehavior nextBehavior = behavior.handleMessage(getSender(), message);
94                     if (nextBehavior != null) {
95                         RaftActorBehavior oldBehavior = behavior;
96                         behavior = nextBehavior;
97                         if (behavior != oldBehavior && behaviorStateChangeLatch != null) {
98                             behaviorStateChangeLatch.countDown();
99                         }
100                     }
101                 }
102             } finally {
103                 super.onReceive(message);
104
105                 CountDownLatch latch = messagesReceivedLatches.get(message.getClass());
106                 if (latch != null) {
107                     latch.countDown();
108                 }
109             }
110         }
111
112         @Override
113         public void postStop() throws Exception {
114             super.postStop();
115
116             if (behavior != null) {
117                 behavior.close();
118             }
119         }
120
121         void expectBehaviorStateChange() {
122             behaviorStateChangeLatch = new CountDownLatch(1);
123         }
124
125         void waitForBehaviorStateChange() {
126             assertTrue("Expected behavior state change",
127                     Uninterruptibles.awaitUninterruptibly(behaviorStateChangeLatch, 5, TimeUnit.SECONDS));
128         }
129
130         void expectMessageClass(Class<?> expClass, int expCount) {
131             messagesReceivedLatches.put(expClass, new CountDownLatch(expCount));
132         }
133
134         void waitForExpectedMessages(Class<?> expClass) {
135             CountDownLatch latch = messagesReceivedLatches.get(expClass);
136             assertNotNull("No messages received for " + expClass, latch);
137             assertTrue("Missing messages of type " + expClass,
138                     Uninterruptibles.awaitUninterruptibly(latch, 5, TimeUnit.SECONDS));
139         }
140
141         void dropMessagesToBehavior(Class<?> msgClass) {
142             dropMessagesToBehavior(msgClass, 1);
143         }
144
145         void dropMessagesToBehavior(Class<?> msgClass, int expCount) {
146             expectMessageClass(msgClass, expCount);
147             dropMessagesToBehavior.put(msgClass, Boolean.TRUE);
148         }
149
150         void clearDropMessagesToBehavior() {
151             dropMessagesToBehavior.clear();
152         }
153
154         @Override
155         public void clear() {
156             behaviorStateChangeLatch = null;
157             clearDropMessagesToBehavior();
158             messagesReceivedLatches.clear();
159             super.clear();
160         }
161
162         void forwardCapturedMessageToBehavior(Class<?> msgClass, ActorRef sender) throws Exception {
163             Object message = getFirstMatching(getSelf(), msgClass);
164             assertNotNull("Message of type " + msgClass + " not received", message);
165             getSelf().tell(message, sender);
166         }
167
168         void forwardCapturedMessagesToBehavior(Class<?> msgClass, ActorRef sender) throws Exception {
169             for (Object m: getAllMatching(getSelf(), msgClass)) {
170                 getSelf().tell(m, sender);
171             }
172         }
173
174         <T> T getCapturedMessage(Class<T> msgClass) throws Exception {
175             T message = getFirstMatching(getSelf(), msgClass);
176             assertNotNull("Message of type " + msgClass + " not received", message);
177             return message;
178         }
179     }
180
181     static class SendImmediateHeartBeat implements ControlMessage {
182         public static final SendImmediateHeartBeat INSTANCE = new SendImmediateHeartBeat();
183
184         private SendImmediateHeartBeat() {
185         }
186     }
187
188     static class GetBehaviorState implements ControlMessage {
189         public static final GetBehaviorState INSTANCE = new GetBehaviorState();
190
191         private GetBehaviorState() {
192         }
193     }
194
195     static class SetBehavior implements ControlMessage {
196         RaftActorBehavior behavior;
197         MockRaftActorContext context;
198
199         SetBehavior(RaftActorBehavior behavior, MockRaftActorContext context) {
200             this.behavior = behavior;
201             this.context = context;
202         }
203     }
204
205     protected final Logger testLog = LoggerFactory.getLogger(MockRaftActorContext.class);
206     protected final ActorSystem system = ActorSystem.create("test");
207     protected final TestActorFactory factory = new TestActorFactory(system);
208     protected TestActorRef<MemberActor> member1ActorRef;
209     protected TestActorRef<MemberActor> member2ActorRef;
210     protected TestActorRef<MemberActor> member3ActorRef;
211     protected MemberActor member1Actor;
212     protected MemberActor member2Actor;
213     protected MemberActor member3Actor;
214     protected MockRaftActorContext member1Context;
215     protected MockRaftActorContext member2Context;
216     protected MockRaftActorContext member3Context;
217
218     @Before
219     public void setup() throws Exception {
220         member1ActorRef = newMemberActor("member1");
221         member2ActorRef = newMemberActor("member2");
222         member3ActorRef = newMemberActor("member3");
223
224         member1Actor = member1ActorRef.underlyingActor();
225         member2Actor = member2ActorRef.underlyingActor();
226         member3Actor = member3ActorRef.underlyingActor();
227     }
228
229     @After
230     public void tearDown() throws Exception {
231         JavaTestKit.shutdownActorSystem(system);
232     }
233
234     DefaultConfigParamsImpl newConfigParams() {
235         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
236         configParams.setHeartBeatInterval(new FiniteDuration(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS));
237         configParams.setElectionTimeoutFactor(100000);
238         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
239         return configParams;
240     }
241
242     MockRaftActorContext newRaftActorContext(String id, ActorRef actor,
243             Map<String, String> peerAddresses) {
244         MockRaftActorContext context = new MockRaftActorContext(id, system, actor);
245         context.setPeerAddresses(peerAddresses);
246         context.getTermInformation().updateAndPersist(1, "");
247         return context;
248     }
249
250     @SuppressWarnings("checkstyle:IllegalCatch")
251     void verifyBehaviorState(String name, MemberActor actor, RaftState expState) {
252         try {
253             RaftState actualState = (RaftState) Await.result(Patterns.ask(actor.self(), GetBehaviorState.INSTANCE,
254                     Timeout.apply(5, TimeUnit.SECONDS)), Duration.apply(5, TimeUnit.SECONDS));
255             assertEquals(name + " behavior state", expState, actualState);
256         } catch (Exception e) {
257             Throwables.propagate(e);
258         }
259     }
260
261     void initializeLeaderBehavior(MemberActor actor, MockRaftActorContext context, int numActiveFollowers)
262             throws Exception {
263         // Leader sends immediate heartbeats - we don't care about it so ignore it.
264         // Sometimes the initial AppendEntries messages go to dead letters, probably b/c the follower actors
265         // haven't been fully created/initialized by akka. So we try up to 3 times to create the Leader as
266         // a workaround.
267
268         Leader leader = null;
269         AssertionError lastAssertError = null;
270         for (int i = 1; i <= 3; i++) {
271             actor.expectMessageClass(AppendEntriesReply.class, numActiveFollowers);
272
273             leader = new Leader(context);
274             try {
275                 actor.waitForExpectedMessages(AppendEntriesReply.class);
276                 lastAssertError = null;
277                 break;
278             } catch (AssertionError e) {
279                 lastAssertError = e;
280             }
281         }
282
283         if (lastAssertError != null) {
284             throw lastAssertError;
285         }
286
287         context.setCurrentBehavior(leader);
288
289         // Delay assignment of the leader behavior so the AppendEntriesReply isn't forwarded to the behavior.
290         actor.self().tell(new SetBehavior(leader, context), ActorRef.noSender());
291
292         actor.forwardCapturedMessagesToBehavior(AppendEntriesReply.class, ActorRef.noSender());
293         actor.clear();
294
295     }
296
297     TestActorRef<MemberActor> newMemberActor(String name) throws Exception {
298         TestActorRef<MemberActor> actor = factory.createTestActor(MemberActor.props()
299                 .withDispatcher(Dispatchers.DefaultDispatcherId()), name);
300         MessageCollectorActor.waitUntilReady(actor);
301         return actor;
302     }
303
304     void sendHeartbeat(TestActorRef<MemberActor> leaderActor) {
305         Uninterruptibles.sleepUninterruptibly(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
306         leaderActor.tell(SendImmediateHeartBeat.INSTANCE, ActorRef.noSender());
307     }
308 }