+
+ private static void testFindPrimaryExceptions(final Object expectedException) throws Exception {
+ TestActorRef<MessageCollectorActor> shardManager =
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
+ logicalStoreType(LogicalDatastoreType.CONFIGURATION).
+ shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
+
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+ mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
+ @Override
+ protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
+ return Futures.successful(expectedException);
+ }
+ };
+
+ Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+
+ try {
+ Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
+ fail("Expected" + expectedException.getClass().toString());
+ } catch(Exception e){
+ if(!expectedException.getClass().isInstance(e)) {
+ fail("Expected Exception of type " + expectedException.getClass().toString());
+ }
+ }
+
+ Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+
+ assertNull(cached);
+ }
+
+ @Test
+ public void testBroadcast() {
+ new JavaTestKit(getSystem()) {{
+ ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
+ MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
+ shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString(),
+ DataStoreVersions.CURRENT_VERSION));
+ shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString(),
+ DataStoreVersions.CURRENT_VERSION));
+ shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
+
+ Configuration mockConfig = mock(Configuration.class);
+ doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
+ when(mockConfig).getAllShardNames();
+
+ ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+ mock(ClusterWrapper.class), mockConfig,
+ DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), new PrimaryShardInfoFutureCache());
+
+ actorContext.broadcast(new Function<Short, Object>() {
+ @Override
+ public Object apply(final Short v) {
+ return new TestMessage();
+ }
+ }, TestMessage.class);
+
+ MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class);
+ MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class);
+ }};
+ }
+