Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeaderElectionScenarioTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSystem;
16 import akka.actor.Props;
17 import akka.actor.Status;
18 import akka.dispatch.ControlMessage;
19 import akka.dispatch.Dispatchers;
20 import akka.dispatch.Mailboxes;
21 import akka.pattern.Patterns;
22 import akka.testkit.TestActorRef;
23 import akka.testkit.javadsl.TestKit;
24 import akka.util.Timeout;
25 import com.google.common.util.concurrent.Uninterruptibles;
26 import java.util.Map;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.TimeoutException;
31 import org.junit.After;
32 import org.junit.Before;
33 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
34 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
35 import org.opendaylight.controller.cluster.raft.RaftState;
36 import org.opendaylight.controller.cluster.raft.TestActorFactory;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
38 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
39 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42 import scala.concurrent.Await;
43 import scala.concurrent.duration.FiniteDuration;
44
45 /**
46  * Abstract base for a leader election scenario test.
47  *
48  * @author Thomas Pantelis
49  */
50 public class AbstractLeaderElectionScenarioTest {
51     static final int HEARTBEAT_INTERVAL = 50;
52
53     static class MemberActor extends MessageCollectorActor {
54         private volatile RaftActorBehavior behavior;
55         Map<Class<?>, CountDownLatch> messagesReceivedLatches = new ConcurrentHashMap<>();
56         Map<Class<?>, Boolean> dropMessagesToBehavior = new ConcurrentHashMap<>();
57         CountDownLatch behaviorStateChangeLatch;
58
59         public static Props props() {
60             return Props.create(MemberActor.class).withDispatcher(Dispatchers.DefaultDispatcherId())
61                     .withMailbox(Mailboxes.DefaultMailboxId());
62         }
63
64         @Override
65         public void onReceive(Object message) throws Exception {
66             // Ignore scheduled SendHeartBeat messages.
67             if (message instanceof SendHeartBeat) {
68                 return;
69             }
70
71             if (message instanceof SetBehavior) {
72                 behavior = ((SetBehavior)message).behavior;
73                 ((SetBehavior)message).context.setCurrentBehavior(behavior);
74                 return;
75             }
76
77             if (message instanceof GetBehaviorState) {
78                 if (behavior != null) {
79                     getSender().tell(behavior.state(), self());
80                 } else {
81                     getSender().tell(new Status.Failure(new IllegalStateException(
82                             "RaftActorBehavior is not set in MemberActor")), self());
83                 }
84             }
85
86             if (message instanceof SendImmediateHeartBeat) {
87                 message = SendHeartBeat.INSTANCE;
88             }
89
90             try {
91                 if (behavior != null && !dropMessagesToBehavior.containsKey(message.getClass())) {
92                     final RaftActorBehavior nextBehavior = behavior.handleMessage(getSender(), message);
93                     if (nextBehavior != null) {
94                         RaftActorBehavior oldBehavior = behavior;
95                         behavior = nextBehavior;
96                         if (behavior != oldBehavior && behaviorStateChangeLatch != null) {
97                             behaviorStateChangeLatch.countDown();
98                         }
99                     }
100                 }
101             } finally {
102                 super.onReceive(message);
103
104                 CountDownLatch latch = messagesReceivedLatches.get(message.getClass());
105                 if (latch != null) {
106                     latch.countDown();
107                 }
108             }
109         }
110
111         @Override
112         public void postStop() throws Exception {
113             super.postStop();
114
115             if (behavior != null) {
116                 behavior.close();
117             }
118         }
119
120         void expectBehaviorStateChange() {
121             behaviorStateChangeLatch = new CountDownLatch(1);
122         }
123
124         void waitForBehaviorStateChange() {
125             assertTrue("Expected behavior state change",
126                     Uninterruptibles.awaitUninterruptibly(behaviorStateChangeLatch, 5, TimeUnit.SECONDS));
127         }
128
129         void expectMessageClass(final Class<?> expClass, final int expCount) {
130             messagesReceivedLatches.put(expClass, new CountDownLatch(expCount));
131         }
132
133         void waitForExpectedMessages(final Class<?> expClass) {
134             CountDownLatch latch = messagesReceivedLatches.get(expClass);
135             assertNotNull("No messages received for " + expClass, latch);
136             assertTrue("Missing messages of type " + expClass,
137                     Uninterruptibles.awaitUninterruptibly(latch, 5, TimeUnit.SECONDS));
138         }
139
140         void dropMessagesToBehavior(final Class<?> msgClass) {
141             dropMessagesToBehavior(msgClass, 1);
142         }
143
144         void dropMessagesToBehavior(final Class<?> msgClass, final int expCount) {
145             expectMessageClass(msgClass, expCount);
146             dropMessagesToBehavior.put(msgClass, Boolean.TRUE);
147         }
148
149         void clearDropMessagesToBehavior() {
150             dropMessagesToBehavior.clear();
151         }
152
153         public void clear() {
154             behaviorStateChangeLatch = null;
155             clearDropMessagesToBehavior();
156             messagesReceivedLatches.clear();
157             clearMessages(getSelf());
158         }
159
160         void forwardCapturedMessageToBehavior(final Class<?> msgClass, final ActorRef sender) {
161             Object message = getFirstMatching(getSelf(), msgClass);
162             assertNotNull("Message of type " + msgClass + " not received", message);
163             getSelf().tell(message, sender);
164         }
165
166         void forwardCapturedMessagesToBehavior(final Class<?> msgClass, final ActorRef sender) {
167             for (Object m: getAllMatching(getSelf(), msgClass)) {
168                 getSelf().tell(m, sender);
169             }
170         }
171
172         <T> T getCapturedMessage(final Class<T> msgClass) {
173             T message = getFirstMatching(getSelf(), msgClass);
174             assertNotNull("Message of type " + msgClass + " not received", message);
175             return message;
176         }
177     }
178
179     static final class SendImmediateHeartBeat implements ControlMessage {
180         static final SendImmediateHeartBeat INSTANCE = new SendImmediateHeartBeat();
181
182         private SendImmediateHeartBeat() {
183         }
184     }
185
186     static final class GetBehaviorState implements ControlMessage {
187         static final GetBehaviorState INSTANCE = new GetBehaviorState();
188
189         private GetBehaviorState() {
190         }
191     }
192
193     static class SetBehavior implements ControlMessage {
194         RaftActorBehavior behavior;
195         MockRaftActorContext context;
196
197         SetBehavior(final RaftActorBehavior behavior, final MockRaftActorContext context) {
198             this.behavior = behavior;
199             this.context = context;
200         }
201     }
202
203     protected final Logger testLog = LoggerFactory.getLogger(MockRaftActorContext.class);
204     protected final ActorSystem system = ActorSystem.create("test");
205     protected final TestActorFactory factory = new TestActorFactory(system);
206     protected TestActorRef<MemberActor> member1ActorRef;
207     protected TestActorRef<MemberActor> member2ActorRef;
208     protected TestActorRef<MemberActor> member3ActorRef;
209     protected MemberActor member1Actor;
210     protected MemberActor member2Actor;
211     protected MemberActor member3Actor;
212     protected MockRaftActorContext member1Context;
213     protected MockRaftActorContext member2Context;
214     protected MockRaftActorContext member3Context;
215
216     @Before
217     public void setup() throws Exception {
218         member1ActorRef = newMemberActor("member1");
219         member2ActorRef = newMemberActor("member2");
220         member3ActorRef = newMemberActor("member3");
221
222         member1Actor = member1ActorRef.underlyingActor();
223         member2Actor = member2ActorRef.underlyingActor();
224         member3Actor = member3ActorRef.underlyingActor();
225     }
226
227     @After
228     public void tearDown() {
229         TestKit.shutdownActorSystem(system);
230     }
231
232     DefaultConfigParamsImpl newConfigParams() {
233         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
234         configParams.setHeartBeatInterval(new FiniteDuration(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS));
235         configParams.setElectionTimeoutFactor(100000);
236         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
237         return configParams;
238     }
239
240     MockRaftActorContext newRaftActorContext(final String id, final ActorRef actor,
241             final Map<String, String> peerAddresses) {
242         MockRaftActorContext context = new MockRaftActorContext(id, system, actor);
243         context.setPeerAddresses(peerAddresses);
244         context.getTermInformation().updateAndPersist(1, "");
245         return context;
246     }
247
248     @SuppressWarnings("checkstyle:IllegalCatch")
249     void verifyBehaviorState(final String name, final MemberActor actor, final RaftState expState) {
250         RaftState actualState;
251         try {
252             actualState = (RaftState) Await.result(Patterns.ask(actor.self(), GetBehaviorState.INSTANCE,
253                 Timeout.apply(5, TimeUnit.SECONDS)), FiniteDuration.create(5, TimeUnit.SECONDS));
254         } catch (RuntimeException e) {
255             throw e;
256         } catch (Exception e) {
257             throw new RuntimeException(e);
258         }
259         assertEquals(name + " behavior state", expState, actualState);
260     }
261
262     void initializeLeaderBehavior(final MemberActor actor, final MockRaftActorContext context,
263             final int numActiveFollowers) {
264         // Leader sends immediate heartbeats - we don't care about it so ignore it.
265         // Sometimes the initial AppendEntries messages go to dead letters, probably b/c the follower actors
266         // haven't been fully created/initialized by akka. So we try up to 3 times to create the Leader as
267         // a workaround.
268
269         Leader leader = null;
270         AssertionError lastAssertError = null;
271         for (int i = 1; i <= 3; i++) {
272             actor.expectMessageClass(AppendEntriesReply.class, numActiveFollowers);
273
274             leader = new Leader(context);
275             try {
276                 actor.waitForExpectedMessages(AppendEntriesReply.class);
277                 lastAssertError = null;
278                 break;
279             } catch (AssertionError e) {
280                 lastAssertError = e;
281             }
282         }
283
284         if (lastAssertError != null) {
285             throw lastAssertError;
286         }
287
288         context.setCurrentBehavior(leader);
289
290         // Delay assignment of the leader behavior so the AppendEntriesReply isn't forwarded to the behavior.
291         actor.self().tell(new SetBehavior(leader, context), ActorRef.noSender());
292
293         actor.forwardCapturedMessagesToBehavior(AppendEntriesReply.class, ActorRef.noSender());
294         actor.clear();
295
296     }
297
298     TestActorRef<MemberActor> newMemberActor(final String name) throws TimeoutException, InterruptedException {
299         TestActorRef<MemberActor> actor = factory.createTestActor(MemberActor.props()
300                 .withDispatcher(Dispatchers.DefaultDispatcherId()), name);
301         MessageCollectorActor.waitUntilReady(actor);
302         return actor;
303     }
304
305     void sendHeartbeat(final TestActorRef<MemberActor> leaderActor) {
306         Uninterruptibles.sleepUninterruptibly(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
307         leaderActor.tell(SendImmediateHeartBeat.INSTANCE, ActorRef.noSender());
308     }
309 }