Merge "Create transaction on the backend datastore only when neccessary"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / ActorContextTest.java
index 8426b03a37de93242b506df7fd9d4368d24a169f..bc80937897c97104a7b8a17ac1d0cf9eff672143 100644 (file)
@@ -1,36 +1,67 @@
 package org.opendaylight.controller.cluster.datastore.utils;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.actor.Address;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
+import akka.dispatch.Futures;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import akka.util.Timeout;
 import com.google.common.base.Optional;
-
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.ConfigFactory;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.time.StopWatch;
+import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
+import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
 
-import java.util.concurrent.TimeUnit;
+public class ActorContextTest extends AbstractActorTest{
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
+    static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);
 
-public class ActorContextTest extends AbstractActorTest{
+    private static class TestMessage {
+    }
 
     private static class MockShardManager extends UntypedActor {
 
         private final boolean found;
         private final ActorRef actorRef;
+        private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
 
         private MockShardManager(boolean found, ActorRef actorRef){
 
@@ -39,6 +70,18 @@ public class ActorContextTest extends AbstractActorTest{
         }
 
         @Override public void onReceive(Object message) throws Exception {
+            if(message instanceof FindPrimary) {
+                FindPrimary fp = (FindPrimary)message;
+                Object resp = findPrimaryResponses.get(fp.getShardName());
+                if(resp == null) {
+                    log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
+                } else {
+                    getSender().tell(resp, getSelf());
+                }
+
+                return;
+            }
+
             if(found){
                 getSender().tell(new LocalShardFound(actorRef), getSelf());
             } else {
@@ -46,15 +89,28 @@ public class ActorContextTest extends AbstractActorTest{
             }
         }
 
+        void addFindPrimaryResp(String shardName, Object resp) {
+            findPrimaryResponses.put(shardName, resp);
+        }
+
         private static Props props(final boolean found, final ActorRef actorRef){
             return Props.create(new MockShardManagerCreator(found, actorRef) );
         }
 
+        private static Props props(){
+            return Props.create(new MockShardManagerCreator() );
+        }
+
         @SuppressWarnings("serial")
         private static class MockShardManagerCreator implements Creator<MockShardManager> {
             final boolean found;
             final ActorRef actorRef;
 
+            MockShardManagerCreator() {
+                this.found = false;
+                this.actorRef = null;
+            }
+
             MockShardManagerCreator(boolean found, ActorRef actorRef) {
                 this.found = found;
                 this.actorRef = actorRef;
@@ -99,23 +155,15 @@ public class ActorContextTest extends AbstractActorTest{
     @Test
     public void testFindLocalShardWithShardNotFound(){
         new JavaTestKit(getSystem()) {{
+            ActorRef shardManagerActorRef = getSystem()
+                    .actorOf(MockShardManager.props(false, null));
 
-            new Within(duration("1 seconds")) {
-                @Override
-                protected void run() {
-
-                    ActorRef shardManagerActorRef = getSystem()
-                        .actorOf(MockShardManager.props(false, null));
-
-                    ActorContext actorContext =
-                        new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+            ActorContext actorContext =
+                    new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
                             mock(Configuration.class));
 
-                    Optional<ActorRef> out = actorContext.findLocalShard("default");
-                    assertTrue(!out.isPresent());
-                    expectNoMsg();
-                }
-            };
+            Optional<ActorRef> out = actorContext.findLocalShard("default");
+            assertTrue(!out.isPresent());
         }};
 
     }
@@ -123,63 +171,389 @@ public class ActorContextTest extends AbstractActorTest{
     @Test
     public void testExecuteRemoteOperation() {
         new JavaTestKit(getSystem()) {{
+            ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
 
-            new Within(duration("3 seconds")) {
-                @Override
-                protected void run() {
+            ActorRef shardManagerActorRef = getSystem()
+                    .actorOf(MockShardManager.props(true, shardActorRef));
 
-                    ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+            ActorContext actorContext =
+                    new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+                            mock(Configuration.class));
 
-                    ActorRef shardManagerActorRef = getSystem()
-                        .actorOf(MockShardManager.props(true, shardActorRef));
+            ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
 
-                    ActorContext actorContext =
-                        new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
-                            mock(Configuration.class));
+            Object out = actorContext.executeOperation(actor, "hello");
 
-                    ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
+            assertEquals("hello", out);
+        }};
+    }
 
-                    Object out = actorContext.executeOperation(actor, "hello");
+    @Test
+    public void testExecuteRemoteOperationAsync() {
+        new JavaTestKit(getSystem()) {{
+            ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
 
-                    assertEquals("hello", out);
+            ActorRef shardManagerActorRef = getSystem()
+                    .actorOf(MockShardManager.props(true, shardActorRef));
 
-                    expectNoMsg();
-                }
-            };
+            ActorContext actorContext =
+                    new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
+                            mock(Configuration.class));
+
+            ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
+
+            Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
+
+            try {
+                Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
+                assertEquals("Result", "hello", result);
+            } catch(Exception e) {
+                throw new AssertionError(e);
+            }
         }};
     }
 
     @Test
-    public void testExecuteRemoteOperationAsync() {
+    public void testIsPathLocal() {
+        MockClusterWrapper clusterWrapper = new MockClusterWrapper();
+        ActorContext actorContext = null;
+
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(false, actorContext.isPathLocal(null));
+        assertEquals(false, actorContext.isPathLocal(""));
+
+        clusterWrapper.setSelfAddress(null);
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(false, actorContext.isPathLocal(""));
+
+        // even if the path is in local format, match the primary path (first 3 elements) and return true
+        clusterWrapper.setSelfAddress(new Address("akka", "test"));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
+
+        clusterWrapper.setSelfAddress(new Address("akka", "test"));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
+
+        clusterWrapper.setSelfAddress(new Address("akka", "test"));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
+
+        // self address of remote format,but Tx path local format.
+        clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(true, actorContext.isPathLocal(
+            "akka://system/user/shardmanager/shard/transaction"));
+
+        // self address of local format,but Tx path remote format.
+        clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(false, actorContext.isPathLocal(
+            "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
+
+        //local path but not same
+        clusterWrapper.setSelfAddress(new Address("akka", "test"));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
+
+        //ip and port same
+        clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
+
+        // forward-slash missing in address
+        clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
+
+        //ips differ
+        clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
+
+        //ports differ
+        clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+        actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+        assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
+    }
+
+    @Test
+    public void testResolvePathForRemoteActor() {
+        ActorContext actorContext =
+                new ActorContext(getSystem(), mock(ActorRef.class), mock(
+                        ClusterWrapper.class),
+                        mock(Configuration.class));
+
+        String actual = actorContext.resolvePath(
+                "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
+                "akka://system/user/shardmanager/shard/transaction");
+
+        String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testResolvePathForLocalActor() {
+        ActorContext actorContext =
+                new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+                        mock(Configuration.class));
+
+        String actual = actorContext.resolvePath(
+                "akka://system/user/shardmanager/shard",
+                "akka://system/user/shardmanager/shard/transaction");
+
+        String expected = "akka://system/user/shardmanager/shard/transaction";
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testResolvePathForRemoteActorWithProperRemoteAddress() {
+        ActorContext actorContext =
+                new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+                        mock(Configuration.class));
+
+        String actual = actorContext.resolvePath(
+                "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
+                "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
+
+        String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testRateLimiting(){
+        DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+                transactionCreationInitialRateLimit(155L).build();
+
+        ActorContext actorContext =
+                new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+                        mock(Configuration.class), dataStoreContext);
+
+        // Check that the initial value is being picked up from DataStoreContext
+        assertEquals(dataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
+
+        actorContext.setTxCreationLimit(1.0);
+
+        assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
+
+
+        StopWatch watch = new StopWatch();
+
+        watch.start();
+
+        actorContext.acquireTxCreationPermit();
+        actorContext.acquireTxCreationPermit();
+        actorContext.acquireTxCreationPermit();
+
+        watch.stop();
+
+        assertTrue("did not take as much time as expected", watch.getTime() > 1000);
+    }
+
+    @Test
+    public void testClientDispatcherIsGlobalDispatcher(){
+        ActorContext actorContext =
+                new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+                        mock(Configuration.class), DatastoreContext.newBuilder().build());
+
+        assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
+
+    }
+
+    @Test
+    public void testClientDispatcherIsNotGlobalDispatcher(){
+        ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
+
+        ActorContext actorContext =
+                new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
+                        mock(Configuration.class), DatastoreContext.newBuilder().build());
+
+        assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
+
+        actorSystem.shutdown();
+
+    }
+
+    @Test
+    public void testSetDatastoreContext() {
         new JavaTestKit(getSystem()) {{
+            ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
+                            mock(Configuration.class), DatastoreContext.newBuilder().
+                                operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build());
 
-            new Within(duration("3 seconds")) {
-                @Override
-                protected void run() {
+            assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
+            assertEquals("getTransactionCommitOperationTimeout", 7,
+                    actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
 
-                    ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+            DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
+                    shardTransactionCommitTimeoutInSeconds(8).build();
 
-                    ActorRef shardManagerActorRef = getSystem()
-                        .actorOf(MockShardManager.props(true, shardActorRef));
+            actorContext.setDatastoreContext(newContext);
 
-                    ActorContext actorContext =
-                        new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
-                            mock(Configuration.class));
+            expectMsgClass(duration("5 seconds"), DatastoreContext.class);
 
-                    ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
+            Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
 
-                    Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
+            assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
+            assertEquals("getTransactionCommitOperationTimeout", 8,
+                    actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
+        }};
+    }
 
-                    try {
-                        Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
-                        assertEquals("Result", "hello", result);
-                    } catch(Exception e) {
-                        throw new AssertionError(e);
-                    }
+    @Test
+    public void testFindPrimaryShardAsyncPrimaryFound() throws Exception {
 
-                    expectNoMsg();
-                }
-            };
+            TestActorRef<MessageCollectorActor> shardManager =
+                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+            DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+                    shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
+
+            final String expPrimaryPath = "akka://test-system/find-primary-shard";
+            ActorContext actorContext =
+                    new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+                            mock(Configuration.class), dataStoreContext) {
+                        @Override
+                        protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+                            return Futures.successful((Object) new PrimaryFound(expPrimaryPath));
+                        }
+                    };
+
+
+            Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+            PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
+
+            assertNotNull(actual);
+            assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
+            assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
+                    expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
+
+            Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+
+            PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
+
+            assertEquals(cachedInfo, actual);
+
+            // Wait for 200 Milliseconds. The cached entry should have been removed.
+
+            Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+
+            cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+
+            assertNull(cached);
+
+    }
+
+    @Test
+    public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
+
+            TestActorRef<MessageCollectorActor> shardManager =
+                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+            DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+                    shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
+
+            ActorContext actorContext =
+                    new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+                            mock(Configuration.class), dataStoreContext) {
+                        @Override
+                        protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+                            return Futures.successful((Object) new PrimaryNotFoundException("not found"));
+                        }
+                    };
+
+
+            Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+
+            try {
+                Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
+                fail("Expected PrimaryNotFoundException");
+            } catch(PrimaryNotFoundException e){
+
+            }
+
+            Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+
+            assertNull(cached);
+    }
+
+    @Test
+    public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
+
+            TestActorRef<MessageCollectorActor> shardManager =
+                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+            DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+                    shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
+
+            ActorContext actorContext =
+                    new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+                            mock(Configuration.class), dataStoreContext) {
+                        @Override
+                        protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+                            return Futures.successful((Object) new NotInitializedException("not iniislized"));
+                        }
+                    };
+
+
+            Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+
+            try {
+                Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
+                fail("Expected NotInitializedException");
+            } catch(NotInitializedException e){
+
+            }
+
+            Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+
+            assertNull(cached);
+    }
+
+    @Test
+    public void testBroadcast() {
+        new JavaTestKit(getSystem()) {{
+            ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+            ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+            TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
+            MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
+            shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()));
+            shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()));
+            shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
+
+            Configuration mockConfig = mock(Configuration.class);
+            doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
+                    when(mockConfig).getAllShardNames();
+
+            ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+                    mock(ClusterWrapper.class), mockConfig,
+                    DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build());
+
+            actorContext.broadcast(new TestMessage());
+
+            expectFirstMatching(shardActorRef1, TestMessage.class);
+            expectFirstMatching(shardActorRef2, TestMessage.class);
         }};
     }
+
+    private <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
+        int count = 5000 / 50;
+        for(int i = 0; i < count; i++) {
+            try {
+                T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz);
+                if(message != null) {
+                    return message;
+                }
+            } catch (Exception e) {}
+
+            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+        }
+
+        Assert.fail("Did not receive message of type " + clazz);
+        return null;
+    }
 }