+
+ @Test
+ public void testFindPrimaryShardAsyncPrimaryFound() throws Exception {
+
+ TestActorRef<MessageCollectorActor> shardManager =
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+ DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+ doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+ doReturn("config").when(mockDataStoreContext).getDataStoreType();
+ doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+ mock(Configuration.class), mockDataStoreContext) {
+ @Override
+ protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+ return Futures.successful((Object) new PrimaryFound("akka://test-system/test"));
+ }
+ };
+
+
+ Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
+ ActorSelection actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
+
+ assertNotNull(actual);
+
+ Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+
+ ActorSelection cachedSelection = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
+
+ assertEquals(cachedSelection, actual);
+
+ // Wait for 200 Milliseconds. The cached entry should have been removed.
+
+ Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+
+ cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+
+ assertNull(cached);
+
+ }
+
+ @Test
+ public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
+
+ TestActorRef<MessageCollectorActor> shardManager =
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+ DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+ doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+ doReturn("config").when(mockDataStoreContext).getDataStoreType();
+ doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+ mock(Configuration.class), mockDataStoreContext) {
+ @Override
+ protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+ return Futures.successful((Object) new PrimaryNotFound("foobar"));
+ }
+ };
+
+
+ Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
+
+ try {
+ Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
+ fail("Expected PrimaryNotFoundException");
+ } catch(PrimaryNotFoundException e){
+
+ }
+
+ Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+
+ assertNull(cached);
+
+ }
+
+ @Test
+ public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
+
+ TestActorRef<MessageCollectorActor> shardManager =
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+ DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+ doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+ doReturn("config").when(mockDataStoreContext).getDataStoreType();
+ doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+ mock(Configuration.class), mockDataStoreContext) {
+ @Override
+ protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+ return Futures.successful((Object) new ActorNotInitialized());
+ }
+ };
+
+
+ Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
+
+ try {
+ Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
+ fail("Expected NotInitializedException");
+ } catch(NotInitializedException e){
+
+ }
+
+ Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+
+ assertNull(cached);
+
+ }
+