+ final DataTree mockDataTree = Mockito.mock(DataTree.class);
+ final String expPrimaryPath = "akka://test-system/find-primary-shard";
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+ mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
+ @Override
+ protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+ return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
+ }
+ };
+
+ Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+ PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
+
+ assertNotNull(actual);
+ assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
+ assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
+ assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
+ expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
+ assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion());
+
+ Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+
+ PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
+
+ assertEquals(cachedInfo, actual);
+
+ actorContext.getPrimaryShardInfoCache().remove("foobar");
+
+ cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+
+ assertNull(cached);
+ }
+
+ @Test
+ public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
+ testFindPrimaryExceptions(new PrimaryNotFoundException("not found"));
+ }
+
+ @Test
+ public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
+ testFindPrimaryExceptions(new NotInitializedException("not initialized"));
+ }
+
+ private void testFindPrimaryExceptions(final Object expectedException) throws Exception {
+ TestActorRef<MessageCollectorActor> shardManager =
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+ shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();