Migrate to java.time.Duration
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / ActorContextTest.java
index cb0864ffaf8d04d2fa59813dc4b6fe553e4ca8a2..2ca2192ef7a4df655e5e584b02ae7b7445dad8cc 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.datastore.utils;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -23,16 +24,17 @@ import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.Props;
-import akka.actor.UntypedActor;
+import akka.actor.UntypedAbstractActor;
 import akka.dispatch.Futures;
 import akka.japi.Creator;
-import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
+import akka.testkit.javadsl.TestKit;
 import akka.util.Timeout;
 import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.typesafe.config.ConfigFactory;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -57,13 +59,12 @@ import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
 import org.opendaylight.controller.cluster.raft.utils.EchoActor;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 public class ActorContextTest extends AbstractActorTest {
@@ -73,7 +74,7 @@ public class ActorContextTest extends AbstractActorTest {
     private static class TestMessage {
     }
 
-    private static class MockShardManager extends UntypedActor {
+    private static final class MockShardManager extends UntypedAbstractActor {
 
         private final boolean found;
         private final ActorRef actorRef;
@@ -85,7 +86,7 @@ public class ActorContextTest extends AbstractActorTest {
             this.actorRef = actorRef;
         }
 
-        @Override public void onReceive(final Object message) throws Exception {
+        @Override public void onReceive(final Object message) {
             if (message instanceof FindPrimary) {
                 FindPrimary fp = (FindPrimary)message;
                 Object resp = findPrimaryResponses.get(fp.getShardName());
@@ -110,11 +111,11 @@ public class ActorContextTest extends AbstractActorTest {
         }
 
         private static Props props(final boolean found, final ActorRef actorRef) {
-            return Props.create(new MockShardManagerCreator(found, actorRef) );
+            return Props.create(new MockShardManagerCreator(found, actorRef));
         }
 
         private static Props props() {
-            return Props.create(new MockShardManagerCreator() );
+            return Props.create(new MockShardManagerCreator());
         }
 
         @SuppressWarnings("serial")
@@ -133,7 +134,7 @@ public class ActorContextTest extends AbstractActorTest {
             }
 
             @Override
-            public MockShardManager create() throws Exception {
+            public MockShardManager create() {
                 return new MockShardManager(found, actorRef);
             }
         }
@@ -141,92 +142,66 @@ public class ActorContextTest extends AbstractActorTest {
 
     @Test
     public void testFindLocalShardWithShardFound() {
-        new JavaTestKit(getSystem()) {
-            {
-                new Within(duration("1 seconds")) {
-                    @Override
-                    protected void run() {
+        final TestKit testKit = new TestKit(getSystem());
+        testKit.within(Duration.ofSeconds(1), () -> {
+            ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
 
-                        ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+            ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
 
-                        ActorRef shardManagerActorRef = getSystem()
-                                .actorOf(MockShardManager.props(true, shardActorRef));
+            ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+                mock(ClusterWrapper.class), mock(Configuration.class));
 
-                        ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
-                                mock(ClusterWrapper.class), mock(Configuration.class));
+            Optional<ActorRef> out = actorContext.findLocalShard("default");
 
-                        Optional<ActorRef> out = actorContext.findLocalShard("default");
-
-                        assertEquals(shardActorRef, out.get());
-
-                        expectNoMsg();
-                    }
-                };
-            }
-        };
+            assertEquals(shardActorRef, out.get());
 
+            testKit.expectNoMessage();
+            return null;
+        });
     }
 
     @Test
     public void testFindLocalShardWithShardNotFound() {
-        new JavaTestKit(getSystem()) {
-            {
-                ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(false, null));
+        ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(false, null));
 
-                ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
-                        mock(ClusterWrapper.class), mock(Configuration.class));
-
-                Optional<ActorRef> out = actorContext.findLocalShard("default");
-                assertTrue(!out.isPresent());
-            }
-        };
+        ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef, mock(ClusterWrapper.class),
+            mock(Configuration.class));
 
+        Optional<ActorRef> out = actorContext.findLocalShard("default");
+        assertFalse(out.isPresent());
     }
 
     @Test
     public void testExecuteRemoteOperation() {
-        new JavaTestKit(getSystem()) {
-            {
-                ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+        ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
 
-                ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
+        ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
 
-                ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
-                        mock(ClusterWrapper.class), mock(Configuration.class));
+        ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+            mock(ClusterWrapper.class), mock(Configuration.class));
 
-                ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
+        ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
 
-                Object out = actorContext.executeOperation(actor, "hello");
+        Object out = actorContext.executeOperation(actor, "hello");
 
-                assertEquals("hello", out);
-            }
-        };
+        assertEquals("hello", out);
     }
 
     @Test
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    public void testExecuteRemoteOperationAsync() {
-        new JavaTestKit(getSystem()) {
-            {
-                ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+    public void testExecuteRemoteOperationAsync() throws Exception {
+        ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
 
-                ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
+        ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
 
-                ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
-                        mock(ClusterWrapper.class), mock(Configuration.class));
+        ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+            mock(ClusterWrapper.class), mock(Configuration.class));
 
-                ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
+        ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
 
-                Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
+        Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
 
-                try {
-                    Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
-                    assertEquals("Result", "hello", result);
-                } catch (Exception e) {
-                    throw new AssertionError(e);
-                }
-            }
-        };
+        Object result = Await.result(future, FiniteDuration.create(3, TimeUnit.SECONDS));
+        assertEquals("Result", "hello", result);
     }
 
     @Test
@@ -235,62 +210,60 @@ public class ActorContextTest extends AbstractActorTest {
         ActorContext actorContext = null;
 
         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
-        assertEquals(false, actorContext.isPathLocal(null));
-        assertEquals(false, actorContext.isPathLocal(""));
+        assertFalse(actorContext.isPathLocal(null));
+        assertFalse(actorContext.isPathLocal(""));
 
         clusterWrapper.setSelfAddress(null);
         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
-        assertEquals(false, actorContext.isPathLocal(""));
+        assertFalse(actorContext.isPathLocal(""));
 
         // even if the path is in local format, match the primary path (first 3 elements) and return true
         clusterWrapper.setSelfAddress(new Address("akka", "test"));
         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
-        assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
+        assertTrue(actorContext.isPathLocal("akka://test/user/$a"));
 
         clusterWrapper.setSelfAddress(new Address("akka", "test"));
         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
-        assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
+        assertTrue(actorContext.isPathLocal("akka://test/user/$a"));
 
         clusterWrapper.setSelfAddress(new Address("akka", "test"));
         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
-        assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
+        assertTrue(actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
 
         // self address of remote format,but Tx path local format.
         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
-        assertEquals(true, actorContext.isPathLocal(
-            "akka://system/user/shardmanager/shard/transaction"));
+        assertTrue(actorContext.isPathLocal("akka://system/user/shardmanager/shard/transaction"));
 
         // self address of local format,but Tx path remote format.
         clusterWrapper.setSelfAddress(new Address("akka", "system"));
         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
-        assertEquals(false, actorContext.isPathLocal(
-            "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
+        assertFalse(actorContext.isPathLocal("akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
 
         //local path but not same
         clusterWrapper.setSelfAddress(new Address("akka", "test"));
         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
-        assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
+        assertTrue(actorContext.isPathLocal("akka://test1/user/$a"));
 
         //ip and port same
         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
-        assertEquals(true, actorContext.isPathLocal("akka://system@127.0.0.1:2550/"));
+        assertTrue(actorContext.isPathLocal("akka://system@127.0.0.1:2550/"));
 
         // forward-slash missing in address
         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
-        assertEquals(false, actorContext.isPathLocal("akka://system@127.0.0.1:2550"));
+        assertFalse(actorContext.isPathLocal("akka://system@127.0.0.1:2550"));
 
         //ips differ
         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
-        assertEquals(false, actorContext.isPathLocal("akka://system@127.1.0.1:2550/"));
+        assertFalse(actorContext.isPathLocal("akka://system@127.1.0.1:2550/"));
 
         //ports differ
         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
-        assertEquals(false, actorContext.isPathLocal("akka://system@127.0.0.1:2551/"));
+        assertFalse(actorContext.isPathLocal("akka://system@127.0.0.1:2551/"));
     }
 
     @Test
@@ -316,41 +289,37 @@ public class ActorContextTest extends AbstractActorTest {
 
     @Test
     public void testSetDatastoreContext() {
-        new JavaTestKit(getSystem()) {
-            {
-                ActorContext actorContext = new ActorContext(getSystem(), getRef(),
-                        mock(ClusterWrapper.class), mock(Configuration.class), DatastoreContext.newBuilder()
-                                .operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(),
-                        new PrimaryShardInfoFutureCache());
+        final TestKit testKit = new TestKit(getSystem());
+        ActorContext actorContext = new ActorContext(getSystem(), testKit.getRef(),
+            mock(ClusterWrapper.class), mock(Configuration.class), DatastoreContext.newBuilder()
+            .operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(),
+            new PrimaryShardInfoFutureCache());
 
-                assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
-                assertEquals("getTransactionCommitOperationTimeout", 7,
-                        actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
+        assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
+        assertEquals("getTransactionCommitOperationTimeout", 7,
+            actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
 
-                DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6)
-                        .shardTransactionCommitTimeoutInSeconds(8).build();
+        DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6)
+                .shardTransactionCommitTimeoutInSeconds(8).build();
 
-                DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
-                Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext();
+        DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
+        Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext();
 
-                actorContext.setDatastoreContext(mockContextFactory);
+        actorContext.setDatastoreContext(mockContextFactory);
 
-                expectMsgClass(duration("5 seconds"), DatastoreContextFactory.class);
+        testKit.expectMsgClass(Duration.ofSeconds(5), DatastoreContextFactory.class);
 
-                Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
+        Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
 
-                assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
-                assertEquals("getTransactionCommitOperationTimeout", 8,
-                        actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
-            }
-        };
+        assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
+        assertEquals("getTransactionCommitOperationTimeout", 8,
+            actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
     }
 
     @Test
     public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
 
-        TestActorRef<MessageCollectorActor> shardManager = TestActorRef.create(getSystem(),
-                Props.create(MessageCollectorActor.class));
+        ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
 
         DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
                 .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
@@ -367,10 +336,10 @@ public class ActorContextTest extends AbstractActorTest {
         };
 
         Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
-        PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
+        PrimaryShardInfo actual = Await.result(foobar, FiniteDuration.apply(5000, TimeUnit.MILLISECONDS));
 
         assertNotNull(actual);
-        assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
+        assertFalse("LocalShardDataTree present", actual.getLocalShardDataTree().isPresent());
         assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
                 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
         assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion());
@@ -391,8 +360,7 @@ public class ActorContextTest extends AbstractActorTest {
     @Test
     public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
 
-        TestActorRef<MessageCollectorActor> shardManager = TestActorRef.create(getSystem(),
-                Props.create(MessageCollectorActor.class));
+        ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
 
         DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
                 .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
@@ -409,10 +377,10 @@ public class ActorContextTest extends AbstractActorTest {
         };
 
         Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
-        PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
+        PrimaryShardInfo actual = Await.result(foobar, FiniteDuration.apply(5000, TimeUnit.MILLISECONDS));
 
         assertNotNull(actual);
-        assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
+        assertTrue("LocalShardDataTree present", actual.getLocalShardDataTree().isPresent());
         assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
         assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
                 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
@@ -432,19 +400,18 @@ public class ActorContextTest extends AbstractActorTest {
     }
 
     @Test
-    public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
+    public void testFindPrimaryShardAsyncPrimaryNotFound() {
         testFindPrimaryExceptions(new PrimaryNotFoundException("not found"));
     }
 
     @Test
-    public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
+    public void testFindPrimaryShardAsyncActorNotInitialized() {
         testFindPrimaryExceptions(new NotInitializedException("not initialized"));
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private static void testFindPrimaryExceptions(final Object expectedException) throws Exception {
-        TestActorRef<MessageCollectorActor> shardManager =
-            TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+    private static void testFindPrimaryExceptions(final Object expectedException) {
+        ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
 
         DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
                 .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
@@ -462,7 +429,7 @@ public class ActorContextTest extends AbstractActorTest {
         Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
 
         try {
-            Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
+            Await.result(foobar, FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
             fail("Expected" + expectedException.getClass().toString());
         } catch (Exception e) {
             if (!expectedException.getClass().isInstance(e)) {
@@ -477,34 +444,30 @@ public class ActorContextTest extends AbstractActorTest {
 
     @Test
     public void testBroadcast() {
-        new JavaTestKit(getSystem()) {
-            {
-                ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
-                ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
-
-                TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(),
-                        MockShardManager.props());
-                MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
-                shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(
-                        shardActorRef1.path().toString(), DataStoreVersions.CURRENT_VERSION));
-                shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(
-                        shardActorRef2.path().toString(), DataStoreVersions.CURRENT_VERSION));
-                shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
-
-                Configuration mockConfig = mock(Configuration.class);
-                doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).when(mockConfig)
-                        .getAllShardNames();
-
-                ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
-                        mock(ClusterWrapper.class), mockConfig,
-                        DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(),
-                        new PrimaryShardInfoFutureCache());
-
-                actorContext.broadcast(v -> new TestMessage(), TestMessage.class);
-
-                MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class);
-                MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class);
-            }
-        };
+        ActorRef shardActorRef1 = getSystem().actorOf(MessageCollectorActor.props());
+        ActorRef shardActorRef2 = getSystem().actorOf(MessageCollectorActor.props());
+
+        TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(),
+            MockShardManager.props());
+        MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
+        shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(
+            shardActorRef1.path().toString(), DataStoreVersions.CURRENT_VERSION));
+        shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(
+            shardActorRef2.path().toString(), DataStoreVersions.CURRENT_VERSION));
+        shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
+
+        Configuration mockConfig = mock(Configuration.class);
+        doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).when(mockConfig)
+        .getAllShardNames();
+
+        ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+            mock(ClusterWrapper.class), mockConfig,
+            DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(),
+            new PrimaryShardInfoFutureCache());
+
+        actorContext.broadcast(v -> new TestMessage(), TestMessage.class);
+
+        MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class);
+        MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class);
     }
 }