import akka.actor.Props;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
+import akka.util.Timeout;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
/**
* TestActorFactory provides methods to create both normal and test actors and to kill them when the factory is closed
*/
public ActorRef createActor(Props props){
ActorRef actorRef = system.actorOf(props);
- createdActors.add(actorRef);
- return actorRef;
+ return addActor(actorRef);
}
/**
*/
public ActorRef createActor(Props props, String actorId){
ActorRef actorRef = system.actorOf(props, actorId);
- createdActors.add(actorRef);
- return actorRef;
+ return addActor(actorRef);
}
/**
* @param <T>
* @return
*/
+ @SuppressWarnings("unchecked")
public <T extends Actor> TestActorRef<T> createTestActor(Props props, String actorId){
TestActorRef<T> actorRef = TestActorRef.create(system, props, actorId);
+ return (TestActorRef<T>) addActor(actorRef);
+ }
+
+ private <T extends ActorRef> ActorRef addActor(T actorRef) {
createdActors.add(actorRef);
+ verifyActorReady(actorRef);
return actorRef;
}
+ private void verifyActorReady(ActorRef actorRef) {
+ // Sometimes we see messages go to dead letters soon after creation - it seems the actor isn't quite
+ // in a state yet to receive messages or isn't actually created yet. This seems to happen with
+ // actorSelection so, to alleviate it, we use an actorSelection and call resolveOne with retries to
+ // ensure it's ready.
+
+ int tries = 1;
+ while(true) {
+ try {
+ Timeout timeout = new Timeout(100, TimeUnit.MILLISECONDS);
+ Future<ActorRef> future = system.actorSelection(actorRef.path()).resolveOne(timeout);
+ Await.ready(future, timeout.duration());
+ break;
+ } catch (Exception e) {
+ if(tries++ > 20) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
/**
* Create a test actor with an auto-generated name
* @param props
* @param <T>
* @return
*/
+ @SuppressWarnings("unchecked")
public <T extends Actor> TestActorRef<T> createTestActor(Props props){
TestActorRef<T> actorRef = TestActorRef.create(system, props);
- createdActors.add(actorRef);
- return actorRef;
+ return (TestActorRef<T>) addActor(actorRef);
}
/**
return prefix + actorCount++;
}
+ public void killActor(ActorRef actor, JavaTestKit kit) {
+ killActor(actor, kit, true);
+ }
+
+ public String createTestActorPath(String actorId){
+ return "akka://test/user/" + actorId;
+ }
+
+ private void killActor(ActorRef actor, JavaTestKit kit, boolean remove) {
+ LOG.info("Killing actor {}", actor);
+ kit.watch(actor);
+ actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ kit.expectTerminated(JavaTestKit.duration("5 seconds"), actor);
+
+ if(remove) {
+ createdActors.remove(actor);
+ }
+ }
+
@Override
public void close() {
- new JavaTestKit(system) {{
- for(ActorRef actor : createdActors) {
- watch(actor);
- LOG.info("Killing actor {}", actor);
- actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
- expectTerminated(duration("5 seconds"), actor);
- }
- }};
+ JavaTestKit kit = new JavaTestKit(system);
+ for(ActorRef actor : createdActors) {
+ killActor(actor, kit, false);
+ }
}
}
\ No newline at end of file