2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft.behaviors;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSystem;
16 import akka.actor.Props;
17 import akka.actor.Status;
18 import akka.dispatch.ControlMessage;
19 import akka.dispatch.Dispatchers;
20 import akka.dispatch.Mailboxes;
21 import akka.pattern.Patterns;
22 import akka.testkit.TestActorRef;
23 import akka.testkit.javadsl.TestKit;
24 import akka.util.Timeout;
25 import com.google.common.util.concurrent.Uninterruptibles;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.TimeoutException;
31 import org.junit.After;
32 import org.junit.Before;
33 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
34 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
35 import org.opendaylight.controller.cluster.raft.RaftState;
36 import org.opendaylight.controller.cluster.raft.TestActorFactory;
37 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
38 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
39 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42 import scala.concurrent.Await;
43 import scala.concurrent.duration.FiniteDuration;
46 * Abstract base for a leader election scenario test.
48 * @author Thomas Pantelis
// Base class for leader election scenario tests. It supplies three MemberActor test actors, the
// message types used to drive them and helper methods for configuring and verifying Raft behaviors.
50 public class AbstractLeaderElectionScenarioTest {
// Heartbeat interval in milliseconds: used for the Raft config heartbeat interval and as the pause
// taken by sendHeartbeat() before it triggers a heartbeat.
51 static final int HEARTBEAT_INTERVAL = 50;
// Test actor standing in for a Raft member: it extends MessageCollectorActor and delegates incoming
// messages to a RaftActorBehavior, with hooks for dropping message classes and awaiting events.
53 static class MemberActor extends MessageCollectorActor {
// Behavior that incoming messages are delegated to. Replaced when a SetBehavior message arrives or
// when handleMessage() returns a non-null behavior. Declared volatile.
54 private volatile RaftActorBehavior behavior;
// One latch per expected message class, registered by expectMessageClass() and awaited by
// waitForExpectedMessages().
55 Map<Class<?>, CountDownLatch> messagesReceivedLatches = new ConcurrentHashMap<>();
// Message classes present as keys here are not passed to the behavior. Only key presence is
// consulted; the value stored is always Boolean.TRUE.
56 Map<Class<?>, Boolean> dropMessagesToBehavior = new ConcurrentHashMap<>();
// Created by expectBehaviorStateChange(), counted down in onReceive() when the behavior instance
// changes, and reset to null by clear().
57 CountDownLatch behaviorStateChangeLatch;
59 public static Props props() {
60 return Props.create(MemberActor.class).withDispatcher(Dispatchers.DefaultDispatcherId())
61 .withMailbox(Mailboxes.DefaultMailboxId());
/**
 * Entry point for every message delivered to this actor. Test control messages (SetBehavior,
 * GetBehaviorState, SendImmediateHeartBeat) are interpreted here; other messages are handed to the
 * current behavior unless their class has been registered for dropping, and are then passed to the
 * superclass.
 *
 * NOTE(review): this listing omits several lines of the method. No statement is shown that ends the
 * SendHeartBeat, SetBehavior or GetBehaviorState branches, and nothing is shown after the latch
 * lookup at the end. Confirm those details against the complete file.
 *
 * @param message the received message
 * @throws Exception declared by the signature; the visible code does not show which call raises it
 */
65 public void onReceive(Object message) throws Exception {
66 // Ignore scheduled SendHeartBeat messages.
67 if (message instanceof SendHeartBeat) {
// Test control message: install the supplied behavior and register it with the supplied context.
71 if (message instanceof SetBehavior) {
72 behavior = ((SetBehavior)message).behavior;
73 ((SetBehavior)message).context.setCurrentBehavior(behavior);
// Test control message: reply to the sender with the behavior's state, or with a Status.Failure
// wrapping an IllegalStateException when no behavior has been set yet.
77 if (message instanceof GetBehaviorState) {
78 if (behavior != null) {
79 getSender().tell(behavior.state(), self());
81 getSender().tell(new Status.Failure(new IllegalStateException(
82 "RaftActorBehavior is not set in MemberActor")), self());
// Test control message: substitute the SendHeartBeat singleton so the code below treats it as a
// heartbeat request.
86 if (message instanceof SendImmediateHeartBeat) {
87 message = SendHeartBeat.INSTANCE;
// Delegate to the behavior only when one is set and this message class is not being dropped.
91 if (behavior != null && !dropMessagesToBehavior.containsKey(message.getClass())) {
92 final RaftActorBehavior nextBehavior = behavior.handleMessage(getSender(), message);
// A non-null result becomes the current behavior; the state-change latch (if one is set) is
// counted down only when the result is a different instance from the previous behavior.
93 if (nextBehavior != null) {
94 RaftActorBehavior oldBehavior = behavior;
95 behavior = nextBehavior;
96 if (behavior != oldBehavior && behaviorStateChangeLatch != null) {
97 behaviorStateChangeLatch.countDown();
// Hand the message to MessageCollectorActor as well.
102 super.onReceive(message);
// Look up the latch registered for this message class.
// NOTE(review): the statements that use the latch are not shown - presumably it is counted down
// when non-null; confirm.
104 CountDownLatch latch = messagesReceivedLatches.get(message.getClass());
/**
 * Actor stop hook.
 *
 * NOTE(review): only the null check on the behavior is visible in this listing; what is done with a
 * non-null behavior (presumably releasing it) and whether the superclass hook is invoked cannot be
 * seen here - confirm against the complete file.
 *
 * @throws Exception declared by the signature
 */
112 public void postStop() throws Exception {
// Only act on the behavior when one has been set.
115 if (behavior != null) {
120 void expectBehaviorStateChange() {
121 behaviorStateChangeLatch = new CountDownLatch(1);
124 void waitForBehaviorStateChange() {
125 assertTrue("Expected behavior state change",
126 Uninterruptibles.awaitUninterruptibly(behaviorStateChangeLatch, 5, TimeUnit.SECONDS));
129 void expectMessageClass(final Class<?> expClass, final int expCount) {
130 messagesReceivedLatches.put(expClass, new CountDownLatch(expCount));
133 void waitForExpectedMessages(final Class<?> expClass) {
134 CountDownLatch latch = messagesReceivedLatches.get(expClass);
135 assertNotNull("No messages received for " + expClass, latch);
136 assertTrue("Missing messages of type " + expClass,
137 Uninterruptibles.awaitUninterruptibly(latch, 5, TimeUnit.SECONDS));
140 void dropMessagesToBehavior(final Class<?> msgClass) {
141 dropMessagesToBehavior(msgClass, 1);
144 void dropMessagesToBehavior(final Class<?> msgClass, final int expCount) {
145 expectMessageClass(msgClass, expCount);
146 dropMessagesToBehavior.put(msgClass, Boolean.TRUE);
149 void clearDropMessagesToBehavior() {
150 dropMessagesToBehavior.clear();
153 public void clear() {
154 behaviorStateChangeLatch = null;
155 clearDropMessagesToBehavior();
156 messagesReceivedLatches.clear();
157 clearMessages(getSelf());
160 void forwardCapturedMessageToBehavior(final Class<?> msgClass, final ActorRef sender) {
161 Object message = getFirstMatching(getSelf(), msgClass);
162 assertNotNull("Message of type " + msgClass + " not received", message);
163 getSelf().tell(message, sender);
166 void forwardCapturedMessagesToBehavior(final Class<?> msgClass, final ActorRef sender) {
167 for (Object m: getAllMatching(getSelf(), msgClass)) {
168 getSelf().tell(m, sender);
172 <T> T getCapturedMessage(final Class<T> msgClass) {
173 T message = getFirstMatching(getSelf(), msgClass);
174 assertNotNull("Message of type " + msgClass + " not received", message);
179 static final class SendImmediateHeartBeat implements ControlMessage {
180 static final SendImmediateHeartBeat INSTANCE = new SendImmediateHeartBeat();
182 private SendImmediateHeartBeat() {
186 static final class GetBehaviorState implements ControlMessage {
187 static final GetBehaviorState INSTANCE = new GetBehaviorState();
189 private GetBehaviorState() {
193 static class SetBehavior implements ControlMessage {
194 RaftActorBehavior behavior;
195 MockRaftActorContext context;
197 SetBehavior(final RaftActorBehavior behavior, final MockRaftActorContext context) {
198 this.behavior = behavior;
199 this.context = context;
// Logger for test output.
// NOTE(review): it is created for MockRaftActorContext rather than this class - confirm intended.
203 protected final Logger testLog = LoggerFactory.getLogger(MockRaftActorContext.class);
// Actor system named "test"; an instance field, so one is created per test class instance. It is
// shut down in tearDown().
204 protected final ActorSystem system = ActorSystem.create("test");
// Factory used by newMemberActor() to create the member test actors in the system above.
205 protected final TestActorFactory factory = new TestActorFactory(system);
// References to the three member actors, assigned in setup().
206 protected TestActorRef<MemberActor> member1ActorRef;
207 protected TestActorRef<MemberActor> member2ActorRef;
208 protected TestActorRef<MemberActor> member3ActorRef;
// The MemberActor instances underlying the references above, assigned in setup().
209 protected MemberActor member1Actor;
210 protected MemberActor member2Actor;
211 protected MemberActor member3Actor;
// Raft contexts for the three members. They are not assigned anywhere in the visible code -
// presumably subclasses populate them (e.g. via newRaftActorContext()); confirm.
212 protected MockRaftActorContext member1Context;
213 protected MockRaftActorContext member2Context;
214 protected MockRaftActorContext member3Context;
217 public void setup() throws Exception {
218 member1ActorRef = newMemberActor("member1");
219 member2ActorRef = newMemberActor("member2");
220 member3ActorRef = newMemberActor("member3");
222 member1Actor = member1ActorRef.underlyingActor();
223 member2Actor = member2ActorRef.underlyingActor();
224 member3Actor = member3ActorRef.underlyingActor();
228 public void tearDown() {
229 TestKit.shutdownActorSystem(system);
232 DefaultConfigParamsImpl newConfigParams() {
233 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
234 configParams.setHeartBeatInterval(new FiniteDuration(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS));
235 configParams.setElectionTimeoutFactor(100000);
236 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
240 MockRaftActorContext newRaftActorContext(final String id, final ActorRef actor,
241 final Map<String, String> peerAddresses) {
242 MockRaftActorContext context = new MockRaftActorContext(id, system, actor);
243 context.setPeerAddresses(peerAddresses);
244 context.getTermInformation().updateAndPersist(1, "");
/**
 * Asks the given member actor for the state of its current behavior and asserts that it equals the
 * expected state.
 *
 * NOTE(review): this listing omits lines of the method (the opening of the try block and the body
 * of the first catch) - confirm against the complete file.
 *
 * @param name label used as the prefix of the assertion message
 * @param actor the member actor to query
 * @param expState the expected Raft state
 */
248 @SuppressWarnings("checkstyle:IllegalCatch")
249 void verifyBehaviorState(final String name, final MemberActor actor, final RaftState expState) {
250 RaftState actualState;
// Send GetBehaviorState via the ask pattern and block for the reply; the ask timeout and the await
// limit are both 5 seconds. The reply is cast to RaftState.
252 actualState = (RaftState) Await.result(Patterns.ask(actor.self(), GetBehaviorState.INSTANCE,
253 Timeout.apply(5, TimeUnit.SECONDS)), FiniteDuration.create(5, TimeUnit.SECONDS));
// NOTE(review): the body of this catch is not shown - presumably the exception is rethrown as-is;
// confirm.
254 } catch (RuntimeException e) {
// Any other exception is wrapped in a RuntimeException that keeps the original as its cause.
256 } catch (Exception e) {
257 throw new RuntimeException(e);
259 assertEquals(name + " behavior state", expState, actualState);
/**
 * Creates a Leader behavior for the given context, waits for AppendEntriesReply messages from the
 * active followers (making up to three attempts), then installs the leader in the member actor.
 *
 * NOTE(review): this listing omits lines of the method (the try opening, the statement after a
 * successful wait, the catch body and possibly trailing statements) - confirm against the complete
 * file.
 *
 * @param actor the member actor that is to act as leader
 * @param context the Raft context the Leader is created with
 * @param numActiveFollowers the number of AppendEntriesReply messages to wait for on each attempt
 */
262 void initializeLeaderBehavior(final MemberActor actor, final MockRaftActorContext context,
263 final int numActiveFollowers) {
264 // Leader sends immediate heartbeats - we don't care about it so ignore it.
265 // Sometimes the initial AppendEntries messages go to dead letters, probably b/c the follower actors
266 // haven't been fully created/initialized by akka. So we try up to 3 times to create the Leader as
// NOTE(review): the sentence above is cut off in this listing - presumably it ends "a workaround";
// confirm.
269 Leader leader = null;
270 AssertionError lastAssertError = null;
// Each attempt registers a fresh expectation, creates a new Leader and waits for the replies.
271 for (int i = 1; i <= 3; i++) {
272 actor.expectMessageClass(AppendEntriesReply.class, numActiveFollowers);
274 leader = new Leader(context);
276 actor.waitForExpectedMessages(AppendEntriesReply.class);
// The wait succeeded, so any failure recorded by an earlier attempt is discarded.
277 lastAssertError = null;
// waitForExpectedMessages() reports a timeout as an AssertionError.
// NOTE(review): the catch body is not shown - presumably it records the error in lastAssertError;
// confirm.
279 } catch (AssertionError e) {
// Rethrow the recorded assertion failure, if there is one.
284 if (lastAssertError != null) {
285 throw lastAssertError;
288 context.setCurrentBehavior(leader);
290 // Delay assignment of the leader behavior so the AppendEntriesReply isn't forwarded to the behavior.
// The behavior is installed by sending SetBehavior to the actor rather than assigning it directly.
291 actor.self().tell(new SetBehavior(leader, context), ActorRef.noSender());
// Re-send the AppendEntriesReply messages collected above to the actor, with no sender.
293 actor.forwardCapturedMessagesToBehavior(AppendEntriesReply.class, ActorRef.noSender());
298 TestActorRef<MemberActor> newMemberActor(final String name) throws TimeoutException, InterruptedException {
299 TestActorRef<MemberActor> actor = factory.createTestActor(MemberActor.props()
300 .withDispatcher(Dispatchers.DefaultDispatcherId()), name);
301 MessageCollectorActor.waitUntilReady(actor);
305 void sendHeartbeat(final TestActorRef<MemberActor> leaderActor) {
306 Uninterruptibles.sleepUninterruptibly(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
307 leaderActor.tell(SendImmediateHeartBeat.INSTANCE, ActorRef.noSender());