import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
+import akka.dispatch.ControlMessage;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.base.Predicate;
public class MessageCollectorActor extends UntypedActor {
private static final String ARE_YOU_READY = "ARE_YOU_READY";
public static final String GET_ALL_MESSAGES = "messages";
- private static final String CLEAR_MESSAGES = "clear-messages";
+
+ private static final Object CLEAR_MESSAGES = new ControlMessage() {
+ @Override
+ public String toString() {
+ return "clear-messages";
+ }
+ };
private final List<Object> messages = new ArrayList<>();
} else if (GET_ALL_MESSAGES.equals(message)) {
getSender().tell(new ArrayList<>(messages), getSelf());
} else if (CLEAR_MESSAGES.equals(message)) {
- clear();
+ messages.clear();
} else if (message != null) {
messages.add(message);
}
}
- public void clear() {
- messages.clear();
- }
-
@SuppressWarnings({"unchecked", "checkstyle:illegalCatch"})
- private static List<Object> getAllMessages(final ActorRef actor) {
+ public static List<Object> getAllMessages(final ActorRef actor) {
FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
Timeout operationTimeout = new Timeout(operationDuration);
Future<Object> future = Patterns.ask(actor, GET_ALL_MESSAGES, operationTimeout);
Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
}
- throw new AssertionError("Did not receive message of type " + clazz, lastEx);
+ throw new AssertionError(actor + ": Did not receive message of type " + clazz + ", Actual received was "
+ + getAllMessages(actor), lastEx);
}
@SuppressWarnings("checkstyle:IllegalCatch")