+ @Test
+ public void testBroadcast() {
+ new TestKit(getSystem()) {
+ {
+ ActorRef shardActorRef1 = getSystem().actorOf(MessageCollectorActor.props());
+ ActorRef shardActorRef2 = getSystem().actorOf(MessageCollectorActor.props());
+
+ TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(),
+ MockShardManager.props());
+ MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
+ shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(
+ shardActorRef1.path().toString(), DataStoreVersions.CURRENT_VERSION));
+ shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(
+ shardActorRef2.path().toString(), DataStoreVersions.CURRENT_VERSION));
+ shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
+
+ Configuration mockConfig = mock(Configuration.class);
+ doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).when(mockConfig)
+ .getAllShardNames();
+
+ ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+ mock(ClusterWrapper.class), mockConfig,
+ DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(),
+ new PrimaryShardInfoFutureCache());
+
+ actorContext.broadcast(v -> new TestMessage(), TestMessage.class);
+
+ MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class);
+ MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class);
+ }
+ };
+ }