/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.cluster.raft.behaviors;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.dispatch.Dispatchers;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.TestActorFactory;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
38 * Abstract base for a leader election scenario test.
40 * @author Thomas Pantelis
42 public class AbstractLeaderElectionScenarioTest {
43 static final int HEARTBEAT_INTERVAL = 50;
45 static class MemberActor extends MessageCollectorActor {
47 volatile RaftActorBehavior behavior;
48 Map<Class<?>, CountDownLatch> messagesReceivedLatches = new ConcurrentHashMap<>();
49 Map<Class<?>, Boolean> dropMessagesToBehavior = new ConcurrentHashMap<>();
50 CountDownLatch behaviorStateChangeLatch;
52 public static Props props() {
53 return Props.create(MemberActor.class).withDispatcher(Dispatchers.DefaultDispatcherId());
57 public void onReceive(Object message) throws Exception {
58 // Ignore scheduled SendHeartBeat messages.
59 if(message instanceof SendHeartBeat) {
64 if(behavior != null && !dropMessagesToBehavior.containsKey(message.getClass())) {
65 final RaftActorBehavior nextBehavior = behavior.handleMessage(getSender(), message);
66 if (nextBehavior != null) {
67 RaftActorBehavior oldBehavior = behavior;
68 behavior = nextBehavior;
69 if(behavior != oldBehavior && behaviorStateChangeLatch != null) {
70 behaviorStateChangeLatch.countDown();
75 super.onReceive(message);
77 CountDownLatch latch = messagesReceivedLatches.get(message.getClass());
84 void expectBehaviorStateChange() {
85 behaviorStateChangeLatch = new CountDownLatch(1);
88 void waitForBehaviorStateChange() {
89 assertTrue("Expected behavior state change",
90 Uninterruptibles.awaitUninterruptibly(behaviorStateChangeLatch, 5, TimeUnit.SECONDS));
93 void expectMessageClass(Class<?> expClass, int expCount) {
94 messagesReceivedLatches.put(expClass, new CountDownLatch(expCount));
97 void waitForExpectedMessages(Class<?> expClass) {
98 CountDownLatch latch = messagesReceivedLatches.get(expClass);
99 assertNotNull("No messages received for " + expClass, latch);
100 assertTrue("Missing messages of type " + expClass,
101 Uninterruptibles.awaitUninterruptibly(latch, 5, TimeUnit.SECONDS));
104 void dropMessagesToBehavior(Class<?> msgClass) {
105 dropMessagesToBehavior(msgClass, 1);
108 void dropMessagesToBehavior(Class<?> msgClass, int expCount) {
109 expectMessageClass(msgClass, expCount);
110 dropMessagesToBehavior.put(msgClass, Boolean.TRUE);
113 void clearDropMessagesToBehavior() {
114 dropMessagesToBehavior.clear();
118 public void clear() {
119 behaviorStateChangeLatch = null;
120 clearDropMessagesToBehavior();
121 messagesReceivedLatches.clear();
125 void forwardCapturedMessageToBehavior(Class<?> msgClass, ActorRef sender) throws Exception {
126 Object message = getFirstMatching(getSelf(), msgClass);
127 assertNotNull("Message of type " + msgClass + " not received", message);
128 getSelf().tell(message, sender);
131 void forwardCapturedMessagesToBehavior(Class<?> msgClass, ActorRef sender) throws Exception {
132 for(Object m: getAllMatching(getSelf(), msgClass)) {
133 getSelf().tell(m, sender);
137 <T> T getCapturedMessage(Class<T> msgClass) throws Exception {
138 T message = getFirstMatching(getSelf(), msgClass);
139 assertNotNull("Message of type " + msgClass + " not received", message);
144 protected final Logger testLog = LoggerFactory.getLogger(MockRaftActorContext.class);
145 protected final ActorSystem system = ActorSystem.create("test");
146 protected final TestActorFactory factory = new TestActorFactory(system);
147 protected TestActorRef<MemberActor> member1ActorRef;
148 protected TestActorRef<MemberActor> member2ActorRef;
149 protected TestActorRef<MemberActor> member3ActorRef;
150 protected MemberActor member1Actor;
151 protected MemberActor member2Actor;
152 protected MemberActor member3Actor;
153 protected MockRaftActorContext member1Context;
154 protected MockRaftActorContext member2Context;
155 protected MockRaftActorContext member3Context;
158 public void setup() throws Exception {
159 member1ActorRef = newMemberActor("member1");
160 member2ActorRef = newMemberActor("member2");
161 member3ActorRef = newMemberActor("member3");
163 member1Actor = member1ActorRef.underlyingActor();
164 member2Actor = member2ActorRef.underlyingActor();
165 member3Actor = member3ActorRef.underlyingActor();
169 public void tearDown() throws Exception {
171 if (member1Actor.behavior != null) {
172 member1Actor.behavior.close();
174 if (member2Actor.behavior != null) {
175 member2Actor.behavior.close();
177 if (member3Actor.behavior != null) {
178 member3Actor.behavior.close();
181 JavaTestKit.shutdownActorSystem(system);
184 DefaultConfigParamsImpl newConfigParams() {
185 DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
186 configParams.setHeartBeatInterval(new FiniteDuration(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS));
187 configParams.setElectionTimeoutFactor(100000);
188 configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
192 MockRaftActorContext newRaftActorContext(String id, ActorRef actor,
193 Map<String, String> peerAddresses) {
194 MockRaftActorContext context = new MockRaftActorContext(id, system, actor);
195 context.setPeerAddresses(peerAddresses);
196 context.getTermInformation().updateAndPersist(1, "");
200 void verifyBehaviorState(String name, MemberActor actor, RaftState expState) {
201 assertEquals(name + " behavior state", expState, actor.behavior.state());
204 void initializeLeaderBehavior(MemberActor actor, MockRaftActorContext context, int numActiveFollowers) throws Exception {
205 // Leader sends immediate heartbeats - we don't care about it so ignore it.
206 // Sometimes the initial AppendEntries messages go to dead letters, probably b/c the follower actors
207 // haven't been fully created/initialized by akka. So we try up to 3 times to create the Leader as
210 Leader leader = null;
211 AssertionError lastAssertError = null;
212 for(int i = 1; i <= 3; i++) {
213 actor.expectMessageClass(AppendEntriesReply.class, numActiveFollowers);
215 leader = new Leader(context);
217 actor.waitForExpectedMessages(AppendEntriesReply.class);
218 lastAssertError = null;
220 } catch (AssertionError e) {
225 if(lastAssertError != null) {
226 throw lastAssertError;
229 context.setCurrentBehavior(leader);
231 // Delay assignment here so the AppendEntriesReply isn't forwarded to the behavior.
232 actor.behavior = leader;
234 actor.forwardCapturedMessagesToBehavior(AppendEntriesReply.class, ActorRef.noSender());
239 TestActorRef<MemberActor> newMemberActor(String name) throws Exception {
240 TestActorRef<MemberActor> actor = factory.createTestActor(MemberActor.props().
241 withDispatcher(Dispatchers.DefaultDispatcherId()), name);
242 MessageCollectorActor.waitUntilReady(actor);
246 void sendHeartbeat(TestActorRef<MemberActor> leaderActor) {
247 Uninterruptibles.sleepUninterruptibly(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
248 leaderActor.underlyingActor().behavior.handleMessage(leaderActor, SendHeartBeat.INSTANCE);