import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
+import akka.actor.Status;
import akka.dispatch.Dispatchers;
+import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
+import akka.util.Timeout;
+import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
/**
static final int HEARTBEAT_INTERVAL = 50;
static class MemberActor extends MessageCollectorActor {
-
- volatile RaftActorBehavior behavior;
+ private volatile RaftActorBehavior behavior;
Map<Class<?>, CountDownLatch> messagesReceivedLatches = new ConcurrentHashMap<>();
Map<Class<?>, Boolean> dropMessagesToBehavior = new ConcurrentHashMap<>();
CountDownLatch behaviorStateChangeLatch;
return;
}
+ if (message instanceof SetBehavior) {
+ behavior = ((SetBehavior)message).behavior;
+ ((SetBehavior)message).context.setCurrentBehavior(behavior);
+ return;
+ }
+
+ if (message instanceof GetBehaviorState) {
+ if (behavior != null) {
+ getSender().tell(behavior.state(), self());
+ } else {
+ getSender().tell(new Status.Failure(new IllegalStateException(
+ "RaftActorBehavior is not set in MemberActor")), self());
+ }
+ }
+
+ if (message instanceof SendImmediateHeartBeat) {
+ message = SendHeartBeat.INSTANCE;
+ }
+
try {
if (behavior != null && !dropMessagesToBehavior.containsKey(message.getClass())) {
final RaftActorBehavior nextBehavior = behavior.handleMessage(getSender(), message);
}
}
+ @Override
+ public void postStop() throws Exception {
+ super.postStop();
+
+ if (behavior != null) {
+ behavior.close();
+ }
+ }
+
void expectBehaviorStateChange() {
behaviorStateChangeLatch = new CountDownLatch(1);
}
}
}
+ static class SendImmediateHeartBeat {
+ public static final SendImmediateHeartBeat INSTANCE = new SendImmediateHeartBeat();
+
+ private SendImmediateHeartBeat() {
+ }
+ }
+
+ static class GetBehaviorState {
+ public static final GetBehaviorState INSTANCE = new GetBehaviorState();
+
+ private GetBehaviorState() {
+ }
+ }
+
+ static class SetBehavior {
+ RaftActorBehavior behavior;
+ MockRaftActorContext context;
+
+ SetBehavior(RaftActorBehavior behavior, MockRaftActorContext context) {
+ this.behavior = behavior;
+ this.context = context;
+ }
+ }
+
protected final Logger testLog = LoggerFactory.getLogger(MockRaftActorContext.class);
protected final ActorSystem system = ActorSystem.create("test");
protected final TestActorFactory factory = new TestActorFactory(system);
@After
public void tearDown() throws Exception {
-
- if (member1Actor.behavior != null) {
- member1Actor.behavior.close();
- }
- if (member2Actor.behavior != null) {
- member2Actor.behavior.close();
- }
- if (member3Actor.behavior != null) {
- member3Actor.behavior.close();
- }
-
JavaTestKit.shutdownActorSystem(system);
}
return context;
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
void verifyBehaviorState(String name, MemberActor actor, RaftState expState) {
- assertEquals(name + " behavior state", expState, actor.behavior.state());
+ try {
+ RaftState actualState = (RaftState) Await.result(Patterns.ask(actor.self(), GetBehaviorState.INSTANCE,
+ Timeout.apply(5, TimeUnit.SECONDS)), Duration.apply(5, TimeUnit.SECONDS));
+ assertEquals(name + " behavior state", expState, actualState);
+ } catch (Exception e) {
+ Throwables.propagate(e);
+ }
}
void initializeLeaderBehavior(MemberActor actor, MockRaftActorContext context, int numActiveFollowers)
context.setCurrentBehavior(leader);
- // Delay assignment here so the AppendEntriesReply isn't forwarded to the behavior.
- actor.behavior = leader;
+ // Delay assignment of the leader behavior so the AppendEntriesReply isn't forwarded to the behavior.
+ actor.self().tell(new SetBehavior(leader, context), ActorRef.noSender());
actor.forwardCapturedMessagesToBehavior(AppendEntriesReply.class, ActorRef.noSender());
actor.clear();
void sendHeartbeat(TestActorRef<MemberActor> leaderActor) {
Uninterruptibles.sleepUninterruptibly(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
- leaderActor.underlyingActor().behavior.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+ leaderActor.tell(SendImmediateHeartBeat.INSTANCE, ActorRef.noSender());
}
}