*/
import akka.actor.Actor;
+import akka.actor.ActorIdentity;
import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
+import akka.actor.Identify;
import akka.actor.PoisonPill;
import akka.actor.Props;
+import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.Uninterruptibles;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
private void verifyActorReady(ActorRef actorRef) {
// Sometimes we see messages go to dead letters soon after creation - it seems the actor isn't quite
// in a state yet to receive messages or isn't actually created yet. This seems to happen with
- // actorSelection so, to alleviate it, we use an actorSelection and call resolveOne with retries to
- // ensure it's ready.
+ // actorSelection so, to alleviate it, we use an actorSelection and send an Identify message with
+ // retries to ensure it's ready.
- int tries = 1;
- while(true) {
+ Timeout timeout = new Timeout(100, TimeUnit.MILLISECONDS);
+ Throwable lastError = null;
+ Stopwatch sw = Stopwatch.createStarted();
+ while(sw.elapsed(TimeUnit.SECONDS) <= 10) {
try {
- Timeout timeout = new Timeout(100, TimeUnit.MILLISECONDS);
- Future<ActorRef> future = system.actorSelection(actorRef.path()).resolveOne(timeout);
- Await.ready(future, timeout.duration());
- break;
- } catch (Exception e) {
- if(tries++ > 20) {
- throw new RuntimeException(e);
- }
+ ActorSelection actorSelection = system.actorSelection(actorRef.path().toString());
+ Future<Object> future = Patterns.ask(actorSelection, new Identify(""), timeout);
+ ActorIdentity reply = (ActorIdentity)Await.result(future, timeout.duration());
+ Assert.assertNotNull("Identify returned null", reply.getRef());
+ return;
+ } catch (Exception | AssertionError e) {
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ lastError = e;
}
}
+
+ throw new RuntimeException(lastError);
}
/**