+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public static <T> List<T> expectMatching(final ActorRef actor, final Class<T> clazz, final int count) {
+ return expectMatching(actor, clazz, count, msg -> true);
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public static <T> List<T> expectMatching(final ActorRef actor, final Class<T> clazz, final int count,
+ final Predicate<T> matcher) {
+ int timeout = 5000;
+ Exception lastEx = null;
+ List<T> messages = Collections.emptyList();
+ for (int i = 0; i < timeout / 50; i++) {
+ try {
+ messages = getAllMatching(actor, clazz);
+ Iterables.removeIf(messages, Predicates.not(matcher));
+ if (messages.size() >= count) {
+ return messages;
+ }
+
+ lastEx = null;
+ } catch (Exception e) {
+ lastEx = e;
+ }
+
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+
+ throw new AssertionError(String.format("Expected %d messages of type %s. Actual received was %d: %s", count,
+ clazz, messages.size(), messages), lastEx);
+ }
+
+ public static <T> T expectFirstMatching(final ActorRef actor, final Class<T> clazz) {
+ return expectFirstMatching(actor, clazz, 5000);
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public static <T> T expectFirstMatching(final ActorRef actor, final Class<T> clazz, final long timeout) {
+ Exception lastEx = null;
+ int count = (int) (timeout / 50);
+ for (int i = 0; i < count; i++) {
+ try {
+ T message = getFirstMatching(actor, clazz);
+ if (message != null) {
+ return message;
+ }
+
+ lastEx = null;
+ } catch (Exception e) {
+ lastEx = e;
+ }
+
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+
+ throw new AssertionError(actor + ": Did not receive message of type " + clazz + ", Actual received was "
+ + getAllMessages(actor), lastEx);
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public static <T> T expectFirstMatching(final ActorRef actor, final Class<T> clazz, final Predicate<T> matcher) {
+ int timeout = 5000;
+ Exception lastEx = null;
+ T lastMessage = null;
+ for (int i = 0; i < timeout / 50; i++) {
+ try {
+ List<T> messages = getAllMatching(actor, clazz);
+ for (T msg : messages) {
+ if (matcher.apply(msg)) {
+ return msg;
+ }
+
+ lastMessage = msg;
+ }
+
+ lastEx = null;
+ } catch (Exception e) {
+ lastEx = e;
+ }
+
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+
+ throw new AssertionError(String.format("Expected specific message of type %s. Last message received was: %s",
+ clazz, lastMessage), lastEx);
+ }
+
+ public static <T> void assertNoneMatching(final ActorRef actor, final Class<T> clazz) {
+ assertNoneMatching(actor, clazz, 5000);
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public static <T> void assertNoneMatching(final ActorRef actor, final Class<T> clazz, final long timeout) {
+ Exception lastEx = null;
+ int count = (int) (timeout / 50);
+ for (int i = 0; i < count; i++) {
+ try {
+ T message = getFirstMatching(actor, clazz);
+ if (message != null) {
+ Assert.fail("Unexpected message received" + message.toString());
+ return;
+ }
+
+ lastEx = null;
+ } catch (Exception e) {
+ lastEx = e;
+ }
+
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+
+ if (lastEx != null) {
+ Throwables.throwIfUnchecked(lastEx);
+ throw new RuntimeException(lastEx);
+ }
+
+ return;
+ }
+
+
+ public static <T> List<T> getAllMatching(final ActorRef actor, final Class<T> clazz) {