/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.raft.behaviors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Status;
import akka.dispatch.Dispatchers;
import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.TestActorFactory;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
/**
 * Abstract base for a leader election scenario test.
 *
 * @author Thomas Pantelis
 */
49 public class AbstractLeaderElectionScenarioTest {
// Heartbeat interval in milliseconds: newConfigParams() passes it to setHeartBeatInterval with
// TimeUnit.MILLISECONDS, and sendHeartbeat() sleeps for this long before triggering a heartbeat.
50 static final int HEARTBEAT_INTERVAL = 50;
52 static class MemberActor extends MessageCollectorActor {
53 private volatile RaftActorBehavior behavior;
54 Map<Class<?>, CountDownLatch> messagesReceivedLatches = new ConcurrentHashMap<>();
55 Map<Class<?>, Boolean> dropMessagesToBehavior = new ConcurrentHashMap<>();
56 CountDownLatch behaviorStateChangeLatch;
58 public static Props props() {
59 return Props.create(MemberActor.class).withDispatcher(Dispatchers.DefaultDispatcherId());
63 public void onReceive(Object message) throws Exception {
64 // Ignore scheduled SendHeartBeat messages.
65 if (message instanceof SendHeartBeat) {
69 if (message instanceof SetBehavior) {
70 behavior = ((SetBehavior)message).behavior;
71 ((SetBehavior)message).context.setCurrentBehavior(behavior);
75 if (message instanceof GetBehaviorState) {
76 if (behavior != null) {
77 getSender().tell(behavior.state(), self());
79 getSender().tell(new Status.Failure(new IllegalStateException(
80 "RaftActorBehavior is not set in MemberActor")), self());
84 if (message instanceof SendImmediateHeartBeat) {
85 message = SendHeartBeat.INSTANCE;
89 if (behavior != null && !dropMessagesToBehavior.containsKey(message.getClass())) {
90 final RaftActorBehavior nextBehavior = behavior.handleMessage(getSender(), message);
91 if (nextBehavior != null) {
92 RaftActorBehavior oldBehavior = behavior;
93 behavior = nextBehavior;
94 if (behavior != oldBehavior && behaviorStateChangeLatch != null) {
95 behaviorStateChangeLatch.countDown();
100 super.onReceive(message);
102 CountDownLatch latch = messagesReceivedLatches.get(message.getClass());
// Actor lifecycle hook; its name and signature match Akka's postStop callback.
110 public void postStop() throws Exception {
// NOTE(review): the statement guarded by this null check is not visible in this view; presumably
// it releases the current behavior - confirm against the full file.
113 if (behavior != null) {
118 void expectBehaviorStateChange() {
119 behaviorStateChangeLatch = new CountDownLatch(1);
122 void waitForBehaviorStateChange() {
123 assertTrue("Expected behavior state change",
124 Uninterruptibles.awaitUninterruptibly(behaviorStateChangeLatch, 5, TimeUnit.SECONDS));
127 void expectMessageClass(Class<?> expClass, int expCount) {
128 messagesReceivedLatches.put(expClass, new CountDownLatch(expCount));
131 void waitForExpectedMessages(Class<?> expClass) {
132 CountDownLatch latch = messagesReceivedLatches.get(expClass);
133 assertNotNull("No messages received for " + expClass, latch);
134 assertTrue("Missing messages of type " + expClass,
135 Uninterruptibles.awaitUninterruptibly(latch, 5, TimeUnit.SECONDS));
138 void dropMessagesToBehavior(Class<?> msgClass) {
139 dropMessagesToBehavior(msgClass, 1);
142 void dropMessagesToBehavior(Class<?> msgClass, int expCount) {
143 expectMessageClass(msgClass, expCount);
144 dropMessagesToBehavior.put(msgClass, Boolean.TRUE);
147 void clearDropMessagesToBehavior() {
148 dropMessagesToBehavior.clear();
152 public void clear() {
153 behaviorStateChangeLatch = null;
154 clearDropMessagesToBehavior();
155 messagesReceivedLatches.clear();
159 void forwardCapturedMessageToBehavior(Class<?> msgClass, ActorRef sender) throws Exception {
160 Object message = getFirstMatching(getSelf(), msgClass);
161 assertNotNull("Message of type " + msgClass + " not received", message);
162 getSelf().tell(message, sender);
165 void forwardCapturedMessagesToBehavior(Class<?> msgClass, ActorRef sender) throws Exception {
166 for (Object m: getAllMatching(getSelf(), msgClass)) {
167 getSelf().tell(m, sender);
171 <T> T getCapturedMessage(Class<T> msgClass) throws Exception {
172 T message = getFirstMatching(getSelf(), msgClass);
173 assertNotNull("Message of type " + msgClass + " not received", message);
178 static class SendImmediateHeartBeat {
179 public static final SendImmediateHeartBeat INSTANCE = new SendImmediateHeartBeat();
181 private SendImmediateHeartBeat() {
185 static class GetBehaviorState {
186 public static final GetBehaviorState INSTANCE = new GetBehaviorState();
188 private GetBehaviorState() {
192 static class SetBehavior {
193 RaftActorBehavior behavior;
194 MockRaftActorContext context;
196 SetBehavior(RaftActorBehavior behavior, MockRaftActorContext context) {
197 this.behavior = behavior;
198 this.context = context;
// Logger for test output.
// NOTE(review): the logger is named after MockRaftActorContext rather than this class - confirm
// that is intentional.
202 protected final Logger testLog = LoggerFactory.getLogger(MockRaftActorContext.class);
// Actor system named "test", created with each fixture instance; tearDown() shuts it down.
203 protected final ActorSystem system = ActorSystem.create("test");
// Actor factory bound to the actor system above; newMemberActor() creates the member actors with it.
204 protected final TestActorFactory factory = new TestActorFactory(system);
// References to the three member actors, assigned in setup().
205 protected TestActorRef<MemberActor> member1ActorRef;
206 protected TestActorRef<MemberActor> member2ActorRef;
207 protected TestActorRef<MemberActor> member3ActorRef;
// Underlying MemberActor instances of the references above, assigned in setup().
208 protected MemberActor member1Actor;
209 protected MemberActor member2Actor;
210 protected MemberActor member3Actor;
// Per-member Raft contexts. They are not assigned anywhere in the visible code - presumably the
// concrete scenario tests create them via newRaftActorContext().
211 protected MockRaftActorContext member1Context;
212 protected MockRaftActorContext member2Context;
213 protected MockRaftActorContext member3Context;
216 public void setup() throws Exception {
217 member1ActorRef = newMemberActor("member1");
218 member2ActorRef = newMemberActor("member2");
219 member3ActorRef = newMemberActor("member3");
221 member1Actor = member1ActorRef.underlyingActor();
222 member2Actor = member2ActorRef.underlyingActor();
223 member3Actor = member3ActorRef.underlyingActor();
227 public void tearDown() throws Exception {
228 JavaTestKit.shutdownActorSystem(system);
231 DefaultConfigParamsImpl newConfigParams() {
232 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
233 configParams.setHeartBeatInterval(new FiniteDuration(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS));
234 configParams.setElectionTimeoutFactor(100000);
235 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
239 MockRaftActorContext newRaftActorContext(String id, ActorRef actor,
240 Map<String, String> peerAddresses) {
241 MockRaftActorContext context = new MockRaftActorContext(id, system, actor);
242 context.setPeerAddresses(peerAddresses);
243 context.getTermInformation().updateAndPersist(1, "");
247 @SuppressWarnings("checkstyle:IllegalCatch")
248 void verifyBehaviorState(String name, MemberActor actor, RaftState expState) {
250 RaftState actualState = (RaftState) Await.result(Patterns.ask(actor.self(), GetBehaviorState.INSTANCE,
251 Timeout.apply(5, TimeUnit.SECONDS)), Duration.apply(5, TimeUnit.SECONDS));
252 assertEquals(name + " behavior state", expState, actualState);
253 } catch (Exception e) {
254 Throwables.propagate(e);
258 void initializeLeaderBehavior(MemberActor actor, MockRaftActorContext context, int numActiveFollowers)
260 // Leader sends immediate heartbeats - we don't care about it so ignore it.
261 // Sometimes the initial AppendEntries messages go to dead letters, probably b/c the follower actors
262 // haven't been fully created/initialized by akka. So we try up to 3 times to create the Leader as
265 Leader leader = null;
266 AssertionError lastAssertError = null;
267 for (int i = 1; i <= 3; i++) {
268 actor.expectMessageClass(AppendEntriesReply.class, numActiveFollowers);
270 leader = new Leader(context);
272 actor.waitForExpectedMessages(AppendEntriesReply.class);
273 lastAssertError = null;
275 } catch (AssertionError e) {
280 if (lastAssertError != null) {
281 throw lastAssertError;
284 context.setCurrentBehavior(leader);
286 // Delay assignment of the leader behavior so the AppendEntriesReply isn't forwarded to the behavior.
287 actor.self().tell(new SetBehavior(leader, context), ActorRef.noSender());
289 actor.forwardCapturedMessagesToBehavior(AppendEntriesReply.class, ActorRef.noSender());
294 TestActorRef<MemberActor> newMemberActor(String name) throws Exception {
295 TestActorRef<MemberActor> actor = factory.createTestActor(MemberActor.props()
296 .withDispatcher(Dispatchers.DefaultDispatcherId()), name);
297 MessageCollectorActor.waitUntilReady(actor);
301 void sendHeartbeat(TestActorRef<MemberActor> leaderActor) {
302 Uninterruptibles.sleepUninterruptibly(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
303 leaderActor.tell(SendImmediateHeartBeat.INSTANCE, ActorRef.noSender());