import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
-import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.junit.After;
import org.junit.Before;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
Uninterruptibles.awaitUninterruptibly(behaviorStateChangeLatch, 5, TimeUnit.SECONDS));
}
- void expectMessageClass(Class<?> expClass, int expCount) {
+ void expectMessageClass(final Class<?> expClass, final int expCount) {
messagesReceivedLatches.put(expClass, new CountDownLatch(expCount));
}
- void waitForExpectedMessages(Class<?> expClass) {
+ void waitForExpectedMessages(final Class<?> expClass) {
CountDownLatch latch = messagesReceivedLatches.get(expClass);
assertNotNull("No messages received for " + expClass, latch);
assertTrue("Missing messages of type " + expClass,
Uninterruptibles.awaitUninterruptibly(latch, 5, TimeUnit.SECONDS));
}
- void dropMessagesToBehavior(Class<?> msgClass) {
+ void dropMessagesToBehavior(final Class<?> msgClass) {
dropMessagesToBehavior(msgClass, 1);
}
- void dropMessagesToBehavior(Class<?> msgClass, int expCount) {
+ void dropMessagesToBehavior(final Class<?> msgClass, final int expCount) {
expectMessageClass(msgClass, expCount);
dropMessagesToBehavior.put(msgClass, Boolean.TRUE);
}
dropMessagesToBehavior.clear();
}
- @Override
public void clear() {
behaviorStateChangeLatch = null;
clearDropMessagesToBehavior();
messagesReceivedLatches.clear();
- super.clear();
+ clearMessages(getSelf());
}
- void forwardCapturedMessageToBehavior(Class<?> msgClass, ActorRef sender) throws Exception {
+ void forwardCapturedMessageToBehavior(final Class<?> msgClass, final ActorRef sender) {
Object message = getFirstMatching(getSelf(), msgClass);
assertNotNull("Message of type " + msgClass + " not received", message);
getSelf().tell(message, sender);
}
- void forwardCapturedMessagesToBehavior(Class<?> msgClass, ActorRef sender) throws Exception {
+ void forwardCapturedMessagesToBehavior(final Class<?> msgClass, final ActorRef sender) {
for (Object m: getAllMatching(getSelf(), msgClass)) {
getSelf().tell(m, sender);
}
}
- <T> T getCapturedMessage(Class<T> msgClass) throws Exception {
+ <T> T getCapturedMessage(final Class<T> msgClass) {
T message = getFirstMatching(getSelf(), msgClass);
assertNotNull("Message of type " + msgClass + " not received", message);
return message;
}
}
- static class SendImmediateHeartBeat implements ControlMessage {
- public static final SendImmediateHeartBeat INSTANCE = new SendImmediateHeartBeat();
+ static final class SendImmediateHeartBeat implements ControlMessage {
+ static final SendImmediateHeartBeat INSTANCE = new SendImmediateHeartBeat();
private SendImmediateHeartBeat() {
}
}
- static class GetBehaviorState implements ControlMessage {
- public static final GetBehaviorState INSTANCE = new GetBehaviorState();
+ static final class GetBehaviorState implements ControlMessage {
+ static final GetBehaviorState INSTANCE = new GetBehaviorState();
private GetBehaviorState() {
}
RaftActorBehavior behavior;
MockRaftActorContext context;
- SetBehavior(RaftActorBehavior behavior, MockRaftActorContext context) {
+ SetBehavior(final RaftActorBehavior behavior, final MockRaftActorContext context) {
this.behavior = behavior;
this.context = context;
}
return configParams;
}
- MockRaftActorContext newRaftActorContext(String id, ActorRef actor,
- Map<String, String> peerAddresses) {
+ MockRaftActorContext newRaftActorContext(final String id, final ActorRef actor,
+ final Map<String, String> peerAddresses) {
MockRaftActorContext context = new MockRaftActorContext(id, system, actor);
context.setPeerAddresses(peerAddresses);
context.getTermInformation().updateAndPersist(1, "");
}
@SuppressWarnings("checkstyle:IllegalCatch")
- void verifyBehaviorState(String name, MemberActor actor, RaftState expState) {
+ void verifyBehaviorState(final String name, final MemberActor actor, final RaftState expState) {
+ RaftState actualState;
try {
- RaftState actualState = (RaftState) Await.result(Patterns.ask(actor.self(), GetBehaviorState.INSTANCE,
- Timeout.apply(5, TimeUnit.SECONDS)), Duration.apply(5, TimeUnit.SECONDS));
- assertEquals(name + " behavior state", expState, actualState);
+ actualState = (RaftState) Await.result(Patterns.ask(actor.self(), GetBehaviorState.INSTANCE,
+ Timeout.apply(5, TimeUnit.SECONDS)), Duration.apply(5, TimeUnit.SECONDS));
+ } catch (RuntimeException e) {
+ throw e;
} catch (Exception e) {
- Throwables.propagate(e);
+ throw new RuntimeException(e);
}
+ assertEquals(name + " behavior state", expState, actualState);
}
- void initializeLeaderBehavior(MemberActor actor, MockRaftActorContext context, int numActiveFollowers)
- throws Exception {
+ void initializeLeaderBehavior(final MemberActor actor, final MockRaftActorContext context,
+ final int numActiveFollowers) {
// Leader sends immediate heartbeats - we don't care about it so ignore it.
// Sometimes the initial AppendEntries messages go to dead letters, probably b/c the follower actors
// haven't been fully created/initialized by akka. So we try up to 3 times to create the Leader as
}
- TestActorRef<MemberActor> newMemberActor(String name) throws Exception {
+ TestActorRef<MemberActor> newMemberActor(final String name) throws TimeoutException, InterruptedException {
TestActorRef<MemberActor> actor = factory.createTestActor(MemberActor.props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), name);
MessageCollectorActor.waitUntilReady(actor);
return actor;
}
- void sendHeartbeat(TestActorRef<MemberActor> leaderActor) {
+ void sendHeartbeat(final TestActorRef<MemberActor> leaderActor) {
Uninterruptibles.sleepUninterruptibly(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
leaderActor.tell(SendImmediateHeartBeat.INSTANCE, ActorRef.noSender());
}