X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Futils%2FMessageCollectorActor.java;h=62f163fb7d0270a2415e934d193136d610e01ca3;hp=58928453b4d2332edd01407ca30e28a6bf292929;hb=c31509c7a6630e54a9f9749a643fed5e1a1ad380;hpb=879a57936375ca3dec48c5bf52b0b5988c807bae diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java index 58928453b4..62f163fb7d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java @@ -9,42 +9,53 @@ package org.opendaylight.controller.cluster.raft.utils; import akka.actor.ActorRef; +import akka.actor.Props; import akka.actor.UntypedActor; import akka.pattern.Patterns; import akka.util.Timeout; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Uninterruptibles; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.junit.Assert; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; - public class MessageCollectorActor extends UntypedActor { - private List messages = new ArrayList<>(); + private static final String ARE_YOU_READY = "ARE_YOU_READY"; + + private final List messages = new ArrayList<>(); @Override public void onReceive(Object message) throws Exception { + if(message.equals(ARE_YOU_READY)) { + getSender().tell("yes", getSelf()); + return; + } + if(message instanceof String){ if("get-all-messages".equals(message)){ - getSender().tell(messages, getSelf()); + getSender().tell(new ArrayList<>(messages), getSelf()); } - } else { + } else if(message != null) { messages.add(message); } } + public void clear() { + messages.clear(); + } + public static List getAllMessages(ActorRef actor) throws Exception { FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS); Timeout operationTimeout = new Timeout(operationDuration); Future future = Patterns.ask(actor, "get-all-messages", operationTimeout); - try { - return (List) Await.result(future, operationDuration); - } catch (Exception e) { - throw e; - } + return (List) Await.result(future, operationDuration); } /** @@ -53,16 +64,68 @@ public class MessageCollectorActor extends UntypedActor { * @param clazz * @return */ - public static Object getFirstMatching(ActorRef actor, Class clazz) throws Exception { + public static T getFirstMatching(ActorRef actor, Class clazz) throws Exception { List allMessages = getAllMessages(actor); for(Object message : allMessages){ if(message.getClass().equals(clazz)){ - return message; + return (T) message; } } return null; } + public static T expectFirstMatching(ActorRef actor, Class clazz) { + return expectFirstMatching(actor, clazz, 5000); + } + + public static T expectFirstMatching(ActorRef actor, Class clazz, long timeout) { + int count = (int) (timeout / 50); + for(int i = 0; i < count; i++) { + try { + T message = getFirstMatching(actor, clazz); + if(message != null) { + return message; + } + } catch (Exception e) {} + + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } + + Assert.fail("Did not receive message of type " + clazz); + return null; + } + + public static List getAllMatching(ActorRef actor, Class clazz) throws Exception { + List allMessages = getAllMessages(actor); + + List output = Lists.newArrayList(); + + for(Object message : allMessages){ + if(message.getClass().equals(clazz)){ + output.add((T) message); + } + } + + return output; + } + + public static void waitUntilReady(ActorRef actor) throws Exception { + long timeout = 500; + FiniteDuration duration = Duration.create(timeout, TimeUnit.MILLISECONDS); + for(int i = 0; i < 10; i++) { + try { + Await.ready(Patterns.ask(actor, ARE_YOU_READY, timeout), duration); + return; + } catch (TimeoutException e) { + } + } + + throw new TimeoutException("Actor not ready in time."); + } + + public static Props props() { + return Props.create(MessageCollectorActor.class); + } }