X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeaderElectionScenarioTest.java;h=8f0cc998030b66b2d19157da6df197812f240a3f;hp=36f5fd502e3c9cc3d0559dc2c22ab1fd04b0feea;hb=35235f427f3a056f85fe83ddd1133e67540328f7;hpb=f276ae33b951d173b51c467bb7bb1a5f5cf9a1e6 diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeaderElectionScenarioTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeaderElectionScenarioTest.java index 36f5fd502e..8f0cc99803 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeaderElectionScenarioTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeaderElectionScenarioTest.java @@ -10,17 +10,24 @@ package org.opendaylight.controller.cluster.raft.behaviors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; + import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; +import akka.actor.Status; +import akka.dispatch.ControlMessage; import akka.dispatch.Dispatchers; -import akka.testkit.JavaTestKit; +import akka.dispatch.Mailboxes; +import akka.pattern.Patterns; import akka.testkit.TestActorRef; +import akka.testkit.javadsl.TestKit; +import akka.util.Timeout; import com.google.common.util.concurrent.Uninterruptibles; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.junit.After; import org.junit.Before; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; @@ -32,6 +39,7 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Await; import scala.concurrent.duration.FiniteDuration; /** @@ -43,30 +51,49 @@ public class AbstractLeaderElectionScenarioTest { static final int HEARTBEAT_INTERVAL = 50; static class MemberActor extends MessageCollectorActor { - - volatile RaftActorBehavior behavior; + private volatile RaftActorBehavior behavior; Map, CountDownLatch> messagesReceivedLatches = new ConcurrentHashMap<>(); Map, Boolean> dropMessagesToBehavior = new ConcurrentHashMap<>(); CountDownLatch behaviorStateChangeLatch; public static Props props() { - return Props.create(MemberActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()); + return Props.create(MemberActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()) + .withMailbox(Mailboxes.DefaultMailboxId()); } @Override public void onReceive(Object message) throws Exception { // Ignore scheduled SendHeartBeat messages. - if(message instanceof SendHeartBeat) { + if (message instanceof SendHeartBeat) { + return; + } + + if (message instanceof SetBehavior) { + behavior = ((SetBehavior)message).behavior; + ((SetBehavior)message).context.setCurrentBehavior(behavior); return; } + if (message instanceof GetBehaviorState) { + if (behavior != null) { + getSender().tell(behavior.state(), self()); + } else { + getSender().tell(new Status.Failure(new IllegalStateException( + "RaftActorBehavior is not set in MemberActor")), self()); + } + } + + if (message instanceof SendImmediateHeartBeat) { + message = SendHeartBeat.INSTANCE; + } + try { - if(behavior != null && !dropMessagesToBehavior.containsKey(message.getClass())) { + if (behavior != null && !dropMessagesToBehavior.containsKey(message.getClass())) { final RaftActorBehavior nextBehavior = behavior.handleMessage(getSender(), message); if (nextBehavior != null) { RaftActorBehavior oldBehavior = behavior; behavior = nextBehavior; - if(behavior != oldBehavior && behaviorStateChangeLatch != null) { + if (behavior != oldBehavior && behaviorStateChangeLatch != null) { behaviorStateChangeLatch.countDown(); } } @@ -75,12 +102,21 @@ public class AbstractLeaderElectionScenarioTest { super.onReceive(message); CountDownLatch latch = messagesReceivedLatches.get(message.getClass()); - if(latch != null) { + if (latch != null) { latch.countDown(); } } } + @Override + public void postStop() throws Exception { + super.postStop(); + + if (behavior != null) { + behavior.close(); + } + } + void expectBehaviorStateChange() { behaviorStateChangeLatch = new CountDownLatch(1); } @@ -90,22 +126,22 @@ public class AbstractLeaderElectionScenarioTest { Uninterruptibles.awaitUninterruptibly(behaviorStateChangeLatch, 5, TimeUnit.SECONDS)); } - void expectMessageClass(Class expClass, int expCount) { + void expectMessageClass(final Class expClass, final int expCount) { messagesReceivedLatches.put(expClass, new CountDownLatch(expCount)); } - void waitForExpectedMessages(Class expClass) { + void waitForExpectedMessages(final Class expClass) { CountDownLatch latch = messagesReceivedLatches.get(expClass); assertNotNull("No messages received for " + expClass, latch); assertTrue("Missing messages of type " + expClass, Uninterruptibles.awaitUninterruptibly(latch, 5, TimeUnit.SECONDS)); } - void dropMessagesToBehavior(Class msgClass) { + void dropMessagesToBehavior(final Class msgClass) { dropMessagesToBehavior(msgClass, 1); } - void dropMessagesToBehavior(Class msgClass, int expCount) { + void dropMessagesToBehavior(final Class msgClass, final int expCount) { expectMessageClass(msgClass, expCount); dropMessagesToBehavior.put(msgClass, Boolean.TRUE); } @@ -114,33 +150,56 @@ public class AbstractLeaderElectionScenarioTest { dropMessagesToBehavior.clear(); } - @Override public void clear() { behaviorStateChangeLatch = null; clearDropMessagesToBehavior(); messagesReceivedLatches.clear(); - super.clear(); + clearMessages(getSelf()); } - void forwardCapturedMessageToBehavior(Class msgClass, ActorRef sender) throws Exception { + void forwardCapturedMessageToBehavior(final Class msgClass, final ActorRef sender) { Object message = getFirstMatching(getSelf(), msgClass); assertNotNull("Message of type " + msgClass + " not received", message); getSelf().tell(message, sender); } - void forwardCapturedMessagesToBehavior(Class msgClass, ActorRef sender) throws Exception { - for(Object m: getAllMatching(getSelf(), msgClass)) { + void forwardCapturedMessagesToBehavior(final Class msgClass, final ActorRef sender) { + for (Object m: getAllMatching(getSelf(), msgClass)) { getSelf().tell(m, sender); } } - T getCapturedMessage(Class msgClass) throws Exception { + T getCapturedMessage(final Class msgClass) { T message = getFirstMatching(getSelf(), msgClass); assertNotNull("Message of type " + msgClass + " not received", message); return message; } } + static final class SendImmediateHeartBeat implements ControlMessage { + static final SendImmediateHeartBeat INSTANCE = new SendImmediateHeartBeat(); + + private SendImmediateHeartBeat() { + } + } + + static final class GetBehaviorState implements ControlMessage { + static final GetBehaviorState INSTANCE = new GetBehaviorState(); + + private GetBehaviorState() { + } + } + + static class SetBehavior implements ControlMessage { + RaftActorBehavior behavior; + MockRaftActorContext context; + + SetBehavior(final RaftActorBehavior behavior, final MockRaftActorContext context) { + this.behavior = behavior; + this.context = context; + } + } + protected final Logger testLog = LoggerFactory.getLogger(MockRaftActorContext.class); protected final ActorSystem system = ActorSystem.create("test"); protected final TestActorFactory factory = new TestActorFactory(system); @@ -166,19 +225,8 @@ public class AbstractLeaderElectionScenarioTest { } @After - public void tearDown() throws Exception { - - if (member1Actor.behavior != null) { - member1Actor.behavior.close(); - } - if (member2Actor.behavior != null) { - member2Actor.behavior.close(); - } - if (member3Actor.behavior != null) { - member3Actor.behavior.close(); - } - - JavaTestKit.shutdownActorSystem(system); + public void tearDown() { + TestKit.shutdownActorSystem(system); } DefaultConfigParamsImpl newConfigParams() { @@ -189,44 +237,73 @@ public class AbstractLeaderElectionScenarioTest { return configParams; } - MockRaftActorContext newRaftActorContext(String id, ActorRef actor, - Map peerAddresses) { + MockRaftActorContext newRaftActorContext(final String id, final ActorRef actor, + final Map peerAddresses) { MockRaftActorContext context = new MockRaftActorContext(id, system, actor); context.setPeerAddresses(peerAddresses); context.getTermInformation().updateAndPersist(1, ""); return context; } - void verifyBehaviorState(String name, MemberActor actor, RaftState expState) { - assertEquals(name + " behavior state", expState, actor.behavior.state()); + @SuppressWarnings("checkstyle:IllegalCatch") + void verifyBehaviorState(final String name, final MemberActor actor, final RaftState expState) { + RaftState actualState; + try { + actualState = (RaftState) Await.result(Patterns.ask(actor.self(), GetBehaviorState.INSTANCE, + Timeout.apply(5, TimeUnit.SECONDS)), FiniteDuration.create(5, TimeUnit.SECONDS)); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + assertEquals(name + " behavior state", expState, actualState); } - void initializeLeaderBehavior(MemberActor actor, MockRaftActorContext context, int numActiveFollowers) throws Exception { + void initializeLeaderBehavior(final MemberActor actor, final MockRaftActorContext context, + final int numActiveFollowers) { // Leader sends immediate heartbeats - we don't care about it so ignore it. + // Sometimes the initial AppendEntries messages go to dead letters, probably b/c the follower actors + // haven't been fully created/initialized by akka. So we try up to 3 times to create the Leader as + // a workaround. + + Leader leader = null; + AssertionError lastAssertError = null; + for (int i = 1; i <= 3; i++) { + actor.expectMessageClass(AppendEntriesReply.class, numActiveFollowers); - actor.expectMessageClass(AppendEntriesReply.class, numActiveFollowers); + leader = new Leader(context); + try { + actor.waitForExpectedMessages(AppendEntriesReply.class); + lastAssertError = null; + break; + } catch (AssertionError e) { + lastAssertError = e; + } + } + + if (lastAssertError != null) { + throw lastAssertError; + } - Leader leader = new Leader(context); context.setCurrentBehavior(leader); - actor.waitForExpectedMessages(AppendEntriesReply.class); - // Delay assignment here so the AppendEntriesReply isn't forwarded to the behavior. - actor.behavior = leader; + // Delay assignment of the leader behavior so the AppendEntriesReply isn't forwarded to the behavior. + actor.self().tell(new SetBehavior(leader, context), ActorRef.noSender()); actor.forwardCapturedMessagesToBehavior(AppendEntriesReply.class, ActorRef.noSender()); actor.clear(); } - TestActorRef newMemberActor(String name) throws Exception { - TestActorRef actor = factory.createTestActor(MemberActor.props(). - withDispatcher(Dispatchers.DefaultDispatcherId()), name); + TestActorRef newMemberActor(final String name) throws TimeoutException, InterruptedException { + TestActorRef actor = factory.createTestActor(MemberActor.props() + .withDispatcher(Dispatchers.DefaultDispatcherId()), name); MessageCollectorActor.waitUntilReady(actor); return actor; } - void sendHeartbeat(TestActorRef leaderActor) { + void sendHeartbeat(final TestActorRef leaderActor) { Uninterruptibles.sleepUninterruptibly(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS); - leaderActor.underlyingActor().behavior.handleMessage(leaderActor, SendHeartBeat.INSTANCE); + leaderActor.tell(SendImmediateHeartBeat.INSTANCE, ActorRef.noSender()); } }