+ @Test
+ public void testSetDatastoreContext() {
+ new JavaTestKit(getSystem()) {{
+ ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
+ mock(Configuration.class), DatastoreContext.newBuilder().
+ operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build());
+
+ assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
+ assertEquals("getTransactionCommitOperationTimeout", 7,
+ actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
+
+ DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
+ shardTransactionCommitTimeoutInSeconds(8).build();
+
+ actorContext.setDatastoreContext(newContext);
+
+ expectMsgClass(duration("5 seconds"), DatastoreContext.class);
+
+ Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
+
+ assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
+ assertEquals("getTransactionCommitOperationTimeout", 8,
+ actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
+ }};
+ }
+
+ @Test
+ public void testFindPrimaryShardAsyncPrimaryFound() throws Exception {
+
+ TestActorRef<MessageCollectorActor> shardManager =
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+ DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+ doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+ doReturn("config").when(mockDataStoreContext).getDataStoreType();
+ doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+ mock(Configuration.class), mockDataStoreContext) {
+ @Override
+ protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+ return Futures.successful((Object) new PrimaryFound("akka://test-system/test"));
+ }
+ };
+
+
+ Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
+ ActorSelection actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
+
+ assertNotNull(actual);
+
+ Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+
+ ActorSelection cachedSelection = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
+
+ assertEquals(cachedSelection, actual);
+
+ // Wait for 200 Milliseconds. The cached entry should have been removed.
+
+ Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+
+ cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+
+ assertNull(cached);
+
+ }
+
+ @Test
+ public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
+
+ TestActorRef<MessageCollectorActor> shardManager =
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+ DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+ doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+ doReturn("config").when(mockDataStoreContext).getDataStoreType();
+ doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+ mock(Configuration.class), mockDataStoreContext) {
+ @Override
+ protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+ return Futures.successful((Object) new PrimaryNotFound("foobar"));
+ }
+ };
+
+
+ Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
+
+ try {
+ Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
+ fail("Expected PrimaryNotFoundException");
+ } catch(PrimaryNotFoundException e){
+
+ }
+
+ Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+
+ assertNull(cached);
+
+ }
+
+ @Test
+ public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
+
+ TestActorRef<MessageCollectorActor> shardManager =
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+ DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+ doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+ doReturn("config").when(mockDataStoreContext).getDataStoreType();
+ doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+ mock(Configuration.class), mockDataStoreContext) {
+ @Override
+ protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+ return Futures.successful((Object) new ActorNotInitialized());
+ }
+ };
+
+
+ Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
+
+ try {
+ Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
+ fail("Expected NotInitializedException");
+ } catch(NotInitializedException e){
+
+ }
+
+ Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+
+ assertNull(cached);
+
+ }
+