X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Futils%2FMessageCollectorActor.java;h=8d3377310581571aad360eb06c4cc7137be80340;hb=refs%2Fchanges%2F49%2F78449%2F2;hp=6ea7a20924d4480b0497c02e715eefddee81556c;hpb=73dc2546de1386c977a9e4efd8e589e3db223cc9;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java index 6ea7a20924..8d33773105 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java @@ -5,16 +5,18 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.raft.utils; import akka.actor.ActorRef; import akka.actor.Props; -import akka.actor.UntypedActor; +import akka.actor.UntypedAbstractActor; +import akka.dispatch.ControlMessage; import akka.pattern.Patterns; import akka.util.Timeout; import com.google.common.base.Predicate; -import com.google.common.collect.Lists; +import com.google.common.base.Predicates; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; import com.google.common.util.concurrent.Uninterruptibles; import java.util.ArrayList; import java.util.Collections; @@ -22,153 +24,215 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.junit.Assert; -import org.opendaylight.controller.cluster.raft.SerializationUtils; import scala.concurrent.Await; import scala.concurrent.Future; -import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; - -public class MessageCollectorActor extends UntypedActor { +public class MessageCollectorActor extends UntypedAbstractActor { private static final String ARE_YOU_READY = "ARE_YOU_READY"; - private static final String GET_ALL_MESSAGES = "get-all-messages"; - private static final String CLEAR_MESSAGES = "clear-messages"; + public static final String GET_ALL_MESSAGES = "messages"; + + private static final Object CLEAR_MESSAGES = new ControlMessage() { + @Override + public String toString() { + return "clear-messages"; + } + }; private final List messages = new ArrayList<>(); - @Override public void onReceive(Object message) throws Exception { - if(message.equals(ARE_YOU_READY)) { + @Override public void onReceive(final Object message) throws Exception { + if (ARE_YOU_READY.equals(message)) { getSender().tell("yes", getSelf()); - return; - } - - if(GET_ALL_MESSAGES.equals(message)) { + } else if (GET_ALL_MESSAGES.equals(message)) { getSender().tell(new ArrayList<>(messages), getSelf()); - } else if(CLEAR_MESSAGES.equals(message)) { + } else if (CLEAR_MESSAGES.equals(message)) { messages.clear(); - } else if(message != null) { - messages.add(SerializationUtils.fromSerializable(message)); + } else if (message != null) { + messages.add(message); } } - public void clear() { - messages.clear(); - } - - public static List getAllMessages(ActorRef actor) throws Exception { - FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS); + @SuppressWarnings({"unchecked", "checkstyle:illegalCatch"}) + public static List getAllMessages(final ActorRef actor) { + FiniteDuration operationDuration = FiniteDuration.create(5, TimeUnit.SECONDS); Timeout operationTimeout = new Timeout(operationDuration); Future future = Patterns.ask(actor, GET_ALL_MESSAGES, operationTimeout); - return (List) Await.result(future, operationDuration); + try { + return (List) Await.result(future, operationDuration); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } } - public static void clearMessages(ActorRef actor) { + public static void clearMessages(final ActorRef actor) { actor.tell(CLEAR_MESSAGES, ActorRef.noSender()); } /** - * Get the first message that matches the specified class - * @param actor - * @param clazz - * @return + * Get the first message that matches the specified class. + * + * @param actor the MessageCollectorActor reference + * @param clazz the class to match + * @return the first matching message */ - public static T getFirstMatching(ActorRef actor, Class clazz) throws Exception { + public static T getFirstMatching(final ActorRef actor, final Class clazz) { List allMessages = getAllMessages(actor); - for(Object message : allMessages){ - if(message.getClass().equals(clazz)){ - return (T) message; + for (Object message : allMessages) { + if (message.getClass().equals(clazz)) { + return clazz.cast(message); } } return null; } - public static List expectMatching(ActorRef actor, Class clazz, int count) { + @SuppressWarnings("checkstyle:IllegalCatch") + public static List expectMatching(final ActorRef actor, final Class clazz, final int count) { + return expectMatching(actor, clazz, count, msg -> true); + } + + @SuppressWarnings("checkstyle:IllegalCatch") + public static List expectMatching(final ActorRef actor, final Class clazz, final int count, + final Predicate matcher) { int timeout = 5000; + Exception lastEx = null; List messages = Collections.emptyList(); - for(int i = 0; i < timeout / 50; i++) { + for (int i = 0; i < timeout / 50; i++) { try { messages = getAllMatching(actor, clazz); - if(messages.size() >= count) { + Iterables.removeIf(messages, Predicates.not(matcher)); + if (messages.size() >= count) { return messages; } - } catch (Exception e) {} + + lastEx = null; + } catch (Exception e) { + lastEx = e; + } Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); } - Assert.fail(String.format("Expected %d messages of type %s. Actual received was %d: %s", count, clazz, - messages.size(), messages)); - return null; + throw new AssertionError(String.format("Expected %d messages of type %s. Actual received was %d: %s", count, + clazz, messages.size(), messages), lastEx); } - public static T expectFirstMatching(ActorRef actor, Class clazz) { + public static T expectFirstMatching(final ActorRef actor, final Class clazz) { return expectFirstMatching(actor, clazz, 5000); } - public static T expectFirstMatching(ActorRef actor, Class clazz, long timeout) { + @SuppressWarnings("checkstyle:IllegalCatch") + public static T expectFirstMatching(final ActorRef actor, final Class clazz, final long timeout) { + Exception lastEx = null; int count = (int) (timeout / 50); - for(int i = 0; i < count; i++) { + for (int i = 0; i < count; i++) { try { T message = getFirstMatching(actor, clazz); - if(message != null) { + if (message != null) { return message; } - } catch (Exception e) {} + + lastEx = null; + } catch (Exception e) { + lastEx = e; + } Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); } - Assert.fail("Did not receive message of type " + clazz); - return null; + throw new AssertionError(actor + ": Did not receive message of type " + clazz + ", Actual received was " + + getAllMessages(actor), lastEx); } - public static T expectFirstMatching(ActorRef actor, Class clazz, Predicate matcher) { + @SuppressWarnings("checkstyle:IllegalCatch") + public static T expectFirstMatching(final ActorRef actor, final Class clazz, final Predicate matcher) { int timeout = 5000; + Exception lastEx = null; T lastMessage = null; - for(int i = 0; i < timeout / 50; i++) { + for (int i = 0; i < timeout / 50; i++) { try { List messages = getAllMatching(actor, clazz); - for(T msg: messages) { - if(matcher.apply(msg)) { + for (T msg : messages) { + if (matcher.apply(msg)) { return msg; } lastMessage = msg; } - } catch (Exception e) {} + + lastEx = null; + } catch (Exception e) { + lastEx = e; + } Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); } - Assert.fail(String.format("Expected specific message of type %s. Last message received was: %s", clazz, lastMessage)); - return null; + throw new AssertionError(String.format("Expected specific message of type %s. Last message received was: %s", + clazz, lastMessage), lastEx); + } + + public static void assertNoneMatching(final ActorRef actor, final Class clazz) { + assertNoneMatching(actor, clazz, 5000); + } + + @SuppressWarnings("checkstyle:IllegalCatch") + public static void assertNoneMatching(final ActorRef actor, final Class clazz, final long timeout) { + Exception lastEx = null; + int count = (int) (timeout / 50); + for (int i = 0; i < count; i++) { + try { + T message = getFirstMatching(actor, clazz); + if (message != null) { + Assert.fail("Unexpected message received" + message.toString()); + return; + } + + lastEx = null; + } catch (Exception e) { + lastEx = e; + } + + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } + + if (lastEx != null) { + Throwables.throwIfUnchecked(lastEx); + throw new RuntimeException(lastEx); + } + + return; } - public static List getAllMatching(ActorRef actor, Class clazz) throws Exception { + + public static List getAllMatching(final ActorRef actor, final Class clazz) { List allMessages = getAllMessages(actor); - List output = Lists.newArrayList(); + List output = new ArrayList<>(); - for(Object message : allMessages){ - if(message.getClass().equals(clazz)){ - output.add((T) message); + for (Object message : allMessages) { + if (message.getClass().equals(clazz)) { + output.add(clazz.cast(message)); } } return output; } - public static void waitUntilReady(ActorRef actor) throws Exception { + public static void waitUntilReady(final ActorRef actor) throws TimeoutException, InterruptedException { long timeout = 500; - FiniteDuration duration = Duration.create(timeout, TimeUnit.MILLISECONDS); - for(int i = 0; i < 10; i++) { + FiniteDuration duration = FiniteDuration.create(timeout, TimeUnit.MILLISECONDS); + for (int i = 0; i < 10; i++) { try { Await.ready(Patterns.ask(actor, ARE_YOU_READY, timeout), duration); return; } catch (TimeoutException e) { + // will fall through below } }