Fix warnings in sal-akka-raft test classes
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeaderElectionScenarioTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSystem;
16 import akka.actor.Props;
17 import akka.dispatch.Dispatchers;
18 import akka.testkit.JavaTestKit;
19 import akka.testkit.TestActorRef;
20 import com.google.common.util.concurrent.Uninterruptibles;
21 import java.util.Map;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.CountDownLatch;
24 import java.util.concurrent.TimeUnit;
25 import org.junit.After;
26 import org.junit.Before;
27 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
28 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
29 import org.opendaylight.controller.cluster.raft.RaftState;
30 import org.opendaylight.controller.cluster.raft.TestActorFactory;
31 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
32 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
33 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36 import scala.concurrent.duration.FiniteDuration;
37
38 /**
39  * Abstract base for a leader election scenario test.
40  *
41  * @author Thomas Pantelis
42  */
43 public class AbstractLeaderElectionScenarioTest {
44     static final int HEARTBEAT_INTERVAL = 50;
45
46     static class MemberActor extends MessageCollectorActor {
47
48         volatile RaftActorBehavior behavior;
49         Map<Class<?>, CountDownLatch> messagesReceivedLatches = new ConcurrentHashMap<>();
50         Map<Class<?>, Boolean> dropMessagesToBehavior = new ConcurrentHashMap<>();
51         CountDownLatch behaviorStateChangeLatch;
52
53         public static Props props() {
54             return Props.create(MemberActor.class).withDispatcher(Dispatchers.DefaultDispatcherId());
55         }
56
57         @Override
58         public void onReceive(Object message) throws Exception {
59             // Ignore scheduled SendHeartBeat messages.
60             if (message instanceof SendHeartBeat) {
61                 return;
62             }
63
64             try {
65                 if (behavior != null && !dropMessagesToBehavior.containsKey(message.getClass())) {
66                     final RaftActorBehavior nextBehavior = behavior.handleMessage(getSender(), message);
67                     if (nextBehavior != null) {
68                         RaftActorBehavior oldBehavior = behavior;
69                         behavior = nextBehavior;
70                         if (behavior != oldBehavior && behaviorStateChangeLatch != null) {
71                             behaviorStateChangeLatch.countDown();
72                         }
73                     }
74                 }
75             } finally {
76                 super.onReceive(message);
77
78                 CountDownLatch latch = messagesReceivedLatches.get(message.getClass());
79                 if (latch != null) {
80                     latch.countDown();
81                 }
82             }
83         }
84
85         void expectBehaviorStateChange() {
86             behaviorStateChangeLatch = new CountDownLatch(1);
87         }
88
89         void waitForBehaviorStateChange() {
90             assertTrue("Expected behavior state change",
91                     Uninterruptibles.awaitUninterruptibly(behaviorStateChangeLatch, 5, TimeUnit.SECONDS));
92         }
93
94         void expectMessageClass(Class<?> expClass, int expCount) {
95             messagesReceivedLatches.put(expClass, new CountDownLatch(expCount));
96         }
97
98         void waitForExpectedMessages(Class<?> expClass) {
99             CountDownLatch latch = messagesReceivedLatches.get(expClass);
100             assertNotNull("No messages received for " + expClass, latch);
101             assertTrue("Missing messages of type " + expClass,
102                     Uninterruptibles.awaitUninterruptibly(latch, 5, TimeUnit.SECONDS));
103         }
104
105         void dropMessagesToBehavior(Class<?> msgClass) {
106             dropMessagesToBehavior(msgClass, 1);
107         }
108
109         void dropMessagesToBehavior(Class<?> msgClass, int expCount) {
110             expectMessageClass(msgClass, expCount);
111             dropMessagesToBehavior.put(msgClass, Boolean.TRUE);
112         }
113
114         void clearDropMessagesToBehavior() {
115             dropMessagesToBehavior.clear();
116         }
117
118         @Override
119         public void clear() {
120             behaviorStateChangeLatch = null;
121             clearDropMessagesToBehavior();
122             messagesReceivedLatches.clear();
123             super.clear();
124         }
125
126         void forwardCapturedMessageToBehavior(Class<?> msgClass, ActorRef sender) throws Exception {
127             Object message = getFirstMatching(getSelf(), msgClass);
128             assertNotNull("Message of type " + msgClass + " not received", message);
129             getSelf().tell(message, sender);
130         }
131
132         void forwardCapturedMessagesToBehavior(Class<?> msgClass, ActorRef sender) throws Exception {
133             for (Object m: getAllMatching(getSelf(), msgClass)) {
134                 getSelf().tell(m, sender);
135             }
136         }
137
138         <T> T getCapturedMessage(Class<T> msgClass) throws Exception {
139             T message = getFirstMatching(getSelf(), msgClass);
140             assertNotNull("Message of type " + msgClass + " not received", message);
141             return message;
142         }
143     }
144
145     protected final Logger testLog = LoggerFactory.getLogger(MockRaftActorContext.class);
146     protected final ActorSystem system = ActorSystem.create("test");
147     protected final TestActorFactory factory = new TestActorFactory(system);
148     protected TestActorRef<MemberActor> member1ActorRef;
149     protected TestActorRef<MemberActor> member2ActorRef;
150     protected TestActorRef<MemberActor> member3ActorRef;
151     protected MemberActor member1Actor;
152     protected MemberActor member2Actor;
153     protected MemberActor member3Actor;
154     protected MockRaftActorContext member1Context;
155     protected MockRaftActorContext member2Context;
156     protected MockRaftActorContext member3Context;
157
158     @Before
159     public void setup() throws Exception {
160         member1ActorRef = newMemberActor("member1");
161         member2ActorRef = newMemberActor("member2");
162         member3ActorRef = newMemberActor("member3");
163
164         member1Actor = member1ActorRef.underlyingActor();
165         member2Actor = member2ActorRef.underlyingActor();
166         member3Actor = member3ActorRef.underlyingActor();
167     }
168
169     @After
170     public void tearDown() throws Exception {
171
172         if (member1Actor.behavior != null) {
173             member1Actor.behavior.close();
174         }
175         if (member2Actor.behavior != null) {
176             member2Actor.behavior.close();
177         }
178         if (member3Actor.behavior != null) {
179             member3Actor.behavior.close();
180         }
181
182         JavaTestKit.shutdownActorSystem(system);
183     }
184
185     DefaultConfigParamsImpl newConfigParams() {
186         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
187         configParams.setHeartBeatInterval(new FiniteDuration(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS));
188         configParams.setElectionTimeoutFactor(100000);
189         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
190         return configParams;
191     }
192
193     MockRaftActorContext newRaftActorContext(String id, ActorRef actor,
194             Map<String, String> peerAddresses) {
195         MockRaftActorContext context = new MockRaftActorContext(id, system, actor);
196         context.setPeerAddresses(peerAddresses);
197         context.getTermInformation().updateAndPersist(1, "");
198         return context;
199     }
200
201     void verifyBehaviorState(String name, MemberActor actor, RaftState expState) {
202         assertEquals(name + " behavior state", expState, actor.behavior.state());
203     }
204
205     void initializeLeaderBehavior(MemberActor actor, MockRaftActorContext context, int numActiveFollowers)
206             throws Exception {
207         // Leader sends immediate heartbeats - we don't care about it so ignore it.
208         // Sometimes the initial AppendEntries messages go to dead letters, probably b/c the follower actors
209         // haven't been fully created/initialized by akka. So we try up to 3 times to create the Leader as
210         // a workaround.
211
212         Leader leader = null;
213         AssertionError lastAssertError = null;
214         for (int i = 1; i <= 3; i++) {
215             actor.expectMessageClass(AppendEntriesReply.class, numActiveFollowers);
216
217             leader = new Leader(context);
218             try {
219                 actor.waitForExpectedMessages(AppendEntriesReply.class);
220                 lastAssertError = null;
221                 break;
222             } catch (AssertionError e) {
223                 lastAssertError = e;
224             }
225         }
226
227         if (lastAssertError != null) {
228             throw lastAssertError;
229         }
230
231         context.setCurrentBehavior(leader);
232
233         // Delay assignment here so the AppendEntriesReply isn't forwarded to the behavior.
234         actor.behavior = leader;
235
236         actor.forwardCapturedMessagesToBehavior(AppendEntriesReply.class, ActorRef.noSender());
237         actor.clear();
238
239     }
240
241     TestActorRef<MemberActor> newMemberActor(String name) throws Exception {
242         TestActorRef<MemberActor> actor = factory.createTestActor(MemberActor.props()
243                 .withDispatcher(Dispatchers.DefaultDispatcherId()), name);
244         MessageCollectorActor.waitUntilReady(actor);
245         return actor;
246     }
247
248     void sendHeartbeat(TestActorRef<MemberActor> leaderActor) {
249         Uninterruptibles.sleepUninterruptibly(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
250         leaderActor.underlyingActor().behavior.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
251     }
252 }