import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import org.junit.Before;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
@Override
public void onReceive(Object message) throws Exception {
// Ignore scheduled SendHeartBeat messages.
- if(message instanceof SendHeartBeat) {
+ if (message instanceof SendHeartBeat) {
return;
}
try {
- if(behavior != null && !dropMessagesToBehavior.containsKey(message.getClass())) {
- RaftActorBehavior oldBehavior = behavior;
- behavior = behavior.handleMessage(getSender(), message);
- if(behavior != oldBehavior && behaviorStateChangeLatch != null) {
- behaviorStateChangeLatch.countDown();
+ if (behavior != null && !dropMessagesToBehavior.containsKey(message.getClass())) {
+ final RaftActorBehavior nextBehavior = behavior.handleMessage(getSender(), message);
+ if (nextBehavior != null) {
+ RaftActorBehavior oldBehavior = behavior;
+ behavior = nextBehavior;
+ if (behavior != oldBehavior && behaviorStateChangeLatch != null) {
+ behaviorStateChangeLatch.countDown();
+ }
}
}
} finally {
super.onReceive(message);
CountDownLatch latch = messagesReceivedLatches.get(message.getClass());
- if(latch != null) {
+ if (latch != null) {
latch.countDown();
}
}
}
void forwardCapturedMessagesToBehavior(Class<?> msgClass, ActorRef sender) throws Exception {
- for(Object m: getAllMatching(getSelf(), msgClass)) {
+ for (Object m: getAllMatching(getSelf(), msgClass)) {
getSelf().tell(m, sender);
}
}
<T> T getCapturedMessage(Class<T> msgClass) throws Exception {
- Object message = getFirstMatching(getSelf(), msgClass);
+ T message = getFirstMatching(getSelf(), msgClass);
assertNotNull("Message of type " + msgClass + " not received", message);
- return (T) message;
+ return message;
}
}
protected final Logger testLog = LoggerFactory.getLogger(MockRaftActorContext.class);
protected final ActorSystem system = ActorSystem.create("test");
+ protected final TestActorFactory factory = new TestActorFactory(system);
protected TestActorRef<MemberActor> member1ActorRef;
protected TestActorRef<MemberActor> member2ActorRef;
protected TestActorRef<MemberActor> member3ActorRef;
}
@After
- public void tearDown() {
+ public void tearDown() throws Exception {
+
+ if (member1Actor.behavior != null) {
+ member1Actor.behavior.close();
+ }
+ if (member2Actor.behavior != null) {
+ member2Actor.behavior.close();
+ }
+ if (member3Actor.behavior != null) {
+ member3Actor.behavior.close();
+ }
+
JavaTestKit.shutdownActorSystem(system);
}
assertEquals(name + " behavior state", expState, actor.behavior.state());
}
- void initializeLeaderBehavior(MemberActor actor, RaftActorContext context,
- int numActiveFollowers) throws Exception {
+ void initializeLeaderBehavior(MemberActor actor, MockRaftActorContext context, int numActiveFollowers)
+ throws Exception {
// Leader sends immediate heartbeats - we don't care about it so ignore it.
+ // Sometimes the initial AppendEntries messages go to dead letters, probably b/c the follower actors
+ // haven't been fully created/initialized by akka. So we try up to 3 times to create the Leader as
+ // a workaround.
+
+ Leader leader = null;
+ AssertionError lastAssertError = null;
+ for (int i = 1; i <= 3; i++) {
+ actor.expectMessageClass(AppendEntriesReply.class, numActiveFollowers);
- actor.expectMessageClass(AppendEntriesReply.class, numActiveFollowers);
- Leader leader = new Leader(context);
- actor.waitForExpectedMessages(AppendEntriesReply.class);
+ leader = new Leader(context);
+ try {
+ actor.waitForExpectedMessages(AppendEntriesReply.class);
+ lastAssertError = null;
+ break;
+ } catch (AssertionError e) {
+ lastAssertError = e;
+ }
+ }
+
+ if (lastAssertError != null) {
+ throw lastAssertError;
+ }
+
+ context.setCurrentBehavior(leader);
+
+ // Delay assignment here so the AppendEntriesReply isn't forwarded to the behavior.
actor.behavior = leader;
actor.forwardCapturedMessagesToBehavior(AppendEntriesReply.class, ActorRef.noSender());
actor.clear();
+
}
TestActorRef<MemberActor> newMemberActor(String name) throws Exception {
- TestActorRef<MemberActor> actor = TestActorRef.create(system, MemberActor.props(), name);
+ TestActorRef<MemberActor> actor = factory.createTestActor(MemberActor.props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()), name);
MessageCollectorActor.waitUntilReady(actor);
return actor;
}
void sendHeartbeat(TestActorRef<MemberActor> leaderActor) {
Uninterruptibles.sleepUninterruptibly(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
- leaderActor.underlyingActor().behavior.handleMessage(leaderActor, new SendHeartBeat());
+ leaderActor.underlyingActor().behavior.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
}
}