import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
+import akka.actor.Status;
+import akka.dispatch.ControlMessage;
import akka.dispatch.Dispatchers;
+import akka.dispatch.Mailboxes;
+import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
+import akka.util.Timeout;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.junit.After;
import org.junit.Before;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
/**
static final int HEARTBEAT_INTERVAL = 50;
static class MemberActor extends MessageCollectorActor {
-
- volatile RaftActorBehavior behavior;
+ private volatile RaftActorBehavior behavior;
Map<Class<?>, CountDownLatch> messagesReceivedLatches = new ConcurrentHashMap<>();
Map<Class<?>, Boolean> dropMessagesToBehavior = new ConcurrentHashMap<>();
CountDownLatch behaviorStateChangeLatch;
public static Props props() {
- return Props.create(MemberActor.class).withDispatcher(Dispatchers.DefaultDispatcherId());
+ return Props.create(MemberActor.class).withDispatcher(Dispatchers.DefaultDispatcherId())
+ .withMailbox(Mailboxes.DefaultMailboxId());
}
@Override
public void onReceive(Object message) throws Exception {
// Ignore scheduled SendHeartBeat messages.
- if(message instanceof SendHeartBeat) {
+ if (message instanceof SendHeartBeat) {
+ return;
+ }
+
+ if (message instanceof SetBehavior) {
+ behavior = ((SetBehavior)message).behavior;
+ ((SetBehavior)message).context.setCurrentBehavior(behavior);
return;
}
+ if (message instanceof GetBehaviorState) {
+ if (behavior != null) {
+ getSender().tell(behavior.state(), self());
+ } else {
+ getSender().tell(new Status.Failure(new IllegalStateException(
+ "RaftActorBehavior is not set in MemberActor")), self());
+ }
+ }
+
+ if (message instanceof SendImmediateHeartBeat) {
+ message = SendHeartBeat.INSTANCE;
+ }
+
try {
- if(behavior != null && !dropMessagesToBehavior.containsKey(message.getClass())) {
- RaftActorBehavior oldBehavior = behavior;
- behavior = behavior.handleMessage(getSender(), message);
- if(behavior != oldBehavior && behaviorStateChangeLatch != null) {
- behaviorStateChangeLatch.countDown();
+ if (behavior != null && !dropMessagesToBehavior.containsKey(message.getClass())) {
+ final RaftActorBehavior nextBehavior = behavior.handleMessage(getSender(), message);
+ if (nextBehavior != null) {
+ RaftActorBehavior oldBehavior = behavior;
+ behavior = nextBehavior;
+ if (behavior != oldBehavior && behaviorStateChangeLatch != null) {
+ behaviorStateChangeLatch.countDown();
+ }
}
}
} finally {
super.onReceive(message);
CountDownLatch latch = messagesReceivedLatches.get(message.getClass());
- if(latch != null) {
+ if (latch != null) {
latch.countDown();
}
}
}
+ @Override
+ public void postStop() throws Exception {
+ super.postStop();
+
+ if (behavior != null) {
+ behavior.close();
+ }
+ }
+
void expectBehaviorStateChange() {
behaviorStateChangeLatch = new CountDownLatch(1);
}
Uninterruptibles.awaitUninterruptibly(behaviorStateChangeLatch, 5, TimeUnit.SECONDS));
}
- void expectMessageClass(Class<?> expClass, int expCount) {
+ void expectMessageClass(final Class<?> expClass, final int expCount) {
messagesReceivedLatches.put(expClass, new CountDownLatch(expCount));
}
- void waitForExpectedMessages(Class<?> expClass) {
+ void waitForExpectedMessages(final Class<?> expClass) {
CountDownLatch latch = messagesReceivedLatches.get(expClass);
assertNotNull("No messages received for " + expClass, latch);
assertTrue("Missing messages of type " + expClass,
Uninterruptibles.awaitUninterruptibly(latch, 5, TimeUnit.SECONDS));
}
- void dropMessagesToBehavior(Class<?> msgClass) {
+ void dropMessagesToBehavior(final Class<?> msgClass) {
dropMessagesToBehavior(msgClass, 1);
}
- void dropMessagesToBehavior(Class<?> msgClass, int expCount) {
+ void dropMessagesToBehavior(final Class<?> msgClass, final int expCount) {
expectMessageClass(msgClass, expCount);
dropMessagesToBehavior.put(msgClass, Boolean.TRUE);
}
dropMessagesToBehavior.clear();
}
- @Override
public void clear() {
behaviorStateChangeLatch = null;
clearDropMessagesToBehavior();
messagesReceivedLatches.clear();
- super.clear();
+ clearMessages(getSelf());
}
- void forwardCapturedMessageToBehavior(Class<?> msgClass, ActorRef sender) throws Exception {
+ void forwardCapturedMessageToBehavior(final Class<?> msgClass, final ActorRef sender) {
Object message = getFirstMatching(getSelf(), msgClass);
assertNotNull("Message of type " + msgClass + " not received", message);
getSelf().tell(message, sender);
}
- void forwardCapturedMessagesToBehavior(Class<?> msgClass, ActorRef sender) throws Exception {
- for(Object m: getAllMatching(getSelf(), msgClass)) {
+ void forwardCapturedMessagesToBehavior(final Class<?> msgClass, final ActorRef sender) {
+ for (Object m: getAllMatching(getSelf(), msgClass)) {
getSelf().tell(m, sender);
}
}
- <T> T getCapturedMessage(Class<T> msgClass) throws Exception {
+ <T> T getCapturedMessage(final Class<T> msgClass) {
T message = getFirstMatching(getSelf(), msgClass);
assertNotNull("Message of type " + msgClass + " not received", message);
return message;
}
}
+ static final class SendImmediateHeartBeat implements ControlMessage {
+ static final SendImmediateHeartBeat INSTANCE = new SendImmediateHeartBeat();
+
+ private SendImmediateHeartBeat() {
+ }
+ }
+
+ static final class GetBehaviorState implements ControlMessage {
+ static final GetBehaviorState INSTANCE = new GetBehaviorState();
+
+ private GetBehaviorState() {
+ }
+ }
+
+ static class SetBehavior implements ControlMessage {
+ RaftActorBehavior behavior;
+ MockRaftActorContext context;
+
+ SetBehavior(final RaftActorBehavior behavior, final MockRaftActorContext context) {
+ this.behavior = behavior;
+ this.context = context;
+ }
+ }
+
protected final Logger testLog = LoggerFactory.getLogger(MockRaftActorContext.class);
protected final ActorSystem system = ActorSystem.create("test");
+ protected final TestActorFactory factory = new TestActorFactory(system);
protected TestActorRef<MemberActor> member1ActorRef;
protected TestActorRef<MemberActor> member2ActorRef;
protected TestActorRef<MemberActor> member3ActorRef;
@After
public void tearDown() throws Exception {
-
- if (member1Actor.behavior != null) {
- member1Actor.behavior.close();
- }
- if (member2Actor.behavior != null) {
- member2Actor.behavior.close();
- }
- if (member3Actor.behavior != null) {
- member3Actor.behavior.close();
- }
-
JavaTestKit.shutdownActorSystem(system);
}
return configParams;
}
- MockRaftActorContext newRaftActorContext(String id, ActorRef actor,
- Map<String, String> peerAddresses) {
+ MockRaftActorContext newRaftActorContext(final String id, final ActorRef actor,
+ final Map<String, String> peerAddresses) {
MockRaftActorContext context = new MockRaftActorContext(id, system, actor);
context.setPeerAddresses(peerAddresses);
context.getTermInformation().updateAndPersist(1, "");
return context;
}
- void verifyBehaviorState(String name, MemberActor actor, RaftState expState) {
- assertEquals(name + " behavior state", expState, actor.behavior.state());
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ void verifyBehaviorState(final String name, final MemberActor actor, final RaftState expState) {
+ RaftState actualState;
+ try {
+ actualState = (RaftState) Await.result(Patterns.ask(actor.self(), GetBehaviorState.INSTANCE,
+ Timeout.apply(5, TimeUnit.SECONDS)), Duration.apply(5, TimeUnit.SECONDS));
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ assertEquals(name + " behavior state", expState, actualState);
}
- void initializeLeaderBehavior(MemberActor actor, RaftActorContext context, int numActiveFollowers) throws Exception {
+ void initializeLeaderBehavior(final MemberActor actor, final MockRaftActorContext context,
+ final int numActiveFollowers) {
// Leader sends immediate heartbeats - we don't care about it so ignore it.
+ // Sometimes the initial AppendEntries messages go to dead letters, probably b/c the follower actors
+ // haven't been fully created/initialized by akka. So we try up to 3 times to create the Leader as
+ // a workaround.
+
+ Leader leader = null;
+ AssertionError lastAssertError = null;
+ for (int i = 1; i <= 3; i++) {
+ actor.expectMessageClass(AppendEntriesReply.class, numActiveFollowers);
+
+ leader = new Leader(context);
+ try {
+ actor.waitForExpectedMessages(AppendEntriesReply.class);
+ lastAssertError = null;
+ break;
+ } catch (AssertionError e) {
+ lastAssertError = e;
+ }
+ }
+
+ if (lastAssertError != null) {
+ throw lastAssertError;
+ }
- actor.expectMessageClass(AppendEntriesReply.class, numActiveFollowers);
+ context.setCurrentBehavior(leader);
- @SuppressWarnings("resource")
- Leader leader = new Leader(context);
- actor.waitForExpectedMessages(AppendEntriesReply.class);
- // Delay assignment here so the AppendEntriesReply isn't forwarded to the behavior.
- actor.behavior = leader;
+ // Delay assignment of the leader behavior so the AppendEntriesReply isn't forwarded to the behavior.
+ actor.self().tell(new SetBehavior(leader, context), ActorRef.noSender());
actor.forwardCapturedMessagesToBehavior(AppendEntriesReply.class, ActorRef.noSender());
actor.clear();
}
- TestActorRef<MemberActor> newMemberActor(String name) throws Exception {
- TestActorRef<MemberActor> actor = TestActorRef.create(system, MemberActor.props(), name);
+ TestActorRef<MemberActor> newMemberActor(final String name) throws TimeoutException, InterruptedException {
+ TestActorRef<MemberActor> actor = factory.createTestActor(MemberActor.props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), name);
MessageCollectorActor.waitUntilReady(actor);
return actor;
}
- void sendHeartbeat(TestActorRef<MemberActor> leaderActor) {
+ void sendHeartbeat(final TestActorRef<MemberActor> leaderActor) {
Uninterruptibles.sleepUninterruptibly(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
- leaderActor.underlyingActor().behavior.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+ leaderActor.tell(SendImmediateHeartBeat.INSTANCE, ActorRef.noSender());
}
}