+ public void testFindLocalShardWithShardNotFound() {
+ new JavaTestKit(getSystem()) {
+ {
+ ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(false, null));
+
+ ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+ mock(ClusterWrapper.class), mock(Configuration.class));
+
+ Optional<ActorRef> out = actorContext.findLocalShard("default");
+ assertTrue(!out.isPresent());
+ }
+ };
+
+ }
+
+ @Test
+ public void testExecuteRemoteOperation() {
+ new JavaTestKit(getSystem()) {
+ {
+ ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+
+ ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
+
+ ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+ mock(ClusterWrapper.class), mock(Configuration.class));
+
+ ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
+
+ Object out = actorContext.executeOperation(actor, "hello");
+
+ assertEquals("hello", out);
+ }
+ };
+ }
+
+ @Test
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public void testExecuteRemoteOperationAsync() {
+ new JavaTestKit(getSystem()) {
+ {
+ ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
+
+ ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
+
+ ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+ mock(ClusterWrapper.class), mock(Configuration.class));
+
+ ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
+
+ Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
+
+ try {
+ Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
+ assertEquals("Result", "hello", result);
+ } catch (Exception e) {
+ throw new AssertionError(e);
+ }
+ }
+ };
+ }
+
+ @Test
+ public void testIsPathLocal() {
+ MockClusterWrapper clusterWrapper = new MockClusterWrapper();
+ ActorContext actorContext = null;
+
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(false, actorContext.isPathLocal(null));
+ assertEquals(false, actorContext.isPathLocal(""));
+
+ clusterWrapper.setSelfAddress(null);
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(false, actorContext.isPathLocal(""));
+
+ // even if the path is in local format, match the primary path (first 3 elements) and return true
+ clusterWrapper.setSelfAddress(new Address("akka", "test"));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
+
+ clusterWrapper.setSelfAddress(new Address("akka", "test"));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
+
+ clusterWrapper.setSelfAddress(new Address("akka", "test"));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
+
+ // self address of remote format,but Tx path local format.
+ clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(true, actorContext.isPathLocal(
+ "akka://system/user/shardmanager/shard/transaction"));
+
+ // self address of local format,but Tx path remote format.
+ clusterWrapper.setSelfAddress(new Address("akka", "system"));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(false, actorContext.isPathLocal(
+ "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
+
+ //local path but not same
+ clusterWrapper.setSelfAddress(new Address("akka", "test"));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
+
+ //ip and port same
+ clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(true, actorContext.isPathLocal("akka://system@127.0.0.1:2550/"));
+
+ // forward-slash missing in address
+ clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(false, actorContext.isPathLocal("akka://system@127.0.0.1:2550"));
+
+ //ips differ
+ clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(false, actorContext.isPathLocal("akka://system@127.1.0.1:2550/"));
+
+ //ports differ
+ clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
+ actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertEquals(false, actorContext.isPathLocal("akka://system@127.0.0.1:2551/"));
+ }
+
+ @Test
+ public void testClientDispatcherIsGlobalDispatcher() {
+ ActorContext actorContext = new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+ mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
+
+ assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
+ }
+
+ @Test
+ public void testClientDispatcherIsNotGlobalDispatcher() {
+ ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers",
+ ConfigFactory.load("application-with-custom-dispatchers.conf"));
+
+ ActorContext actorContext = new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
+ mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
+
+ assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
+
+ actorSystem.terminate();
+ }
+
+ @Test
+ public void testSetDatastoreContext() {
+ new JavaTestKit(getSystem()) {
+ {
+ ActorContext actorContext = new ActorContext(getSystem(), getRef(),
+ mock(ClusterWrapper.class), mock(Configuration.class), DatastoreContext.newBuilder()
+ .operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(),
+ new PrimaryShardInfoFutureCache());
+
+ assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
+ assertEquals("getTransactionCommitOperationTimeout", 7,
+ actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
+
+ DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6)
+ .shardTransactionCommitTimeoutInSeconds(8).build();
+
+ DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
+ Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext();
+
+ actorContext.setDatastoreContext(mockContextFactory);
+
+ expectMsgClass(duration("5 seconds"), DatastoreContextFactory.class);
+
+ Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
+
+ assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
+ assertEquals("getTransactionCommitOperationTimeout", 8,
+ actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
+ }
+ };
+ }
+
+ @Test
+ public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
+
+ ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
+
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
+ .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
+ .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
+
+ final String expPrimaryPath = "akka://test-system/find-primary-shard";
+ final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION;
+ ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+ mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
+ @Override
+ protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
+ return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion));
+ }
+ };
+
+ Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+ PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
+
+ assertNotNull(actual);
+ assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
+ assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
+ expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
+ assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion());
+
+ Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+
+ PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
+
+ assertEquals(cachedInfo, actual);
+
+ actorContext.getPrimaryShardInfoCache().remove("foobar");
+
+ cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+
+ assertNull(cached);
+ }
+
+ @Test
+ public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
+
+ ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
+
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
+ .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
+ .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
+
+ final DataTree mockDataTree = Mockito.mock(DataTree.class);
+ final String expPrimaryPath = "akka://test-system/find-primary-shard";
+ ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+ mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
+ @Override
+ protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
+ return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
+ }
+ };
+
+ Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+ PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
+
+ assertNotNull(actual);
+ assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
+ assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
+ assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
+ expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
+ assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion());
+
+ Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+
+ PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
+
+ assertEquals(cachedInfo, actual);
+
+ actorContext.getPrimaryShardInfoCache().remove("foobar");
+
+ cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+
+ assertNull(cached);
+ }
+
+ @Test
+ public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
+ testFindPrimaryExceptions(new PrimaryNotFoundException("not found"));
+ }
+
+ @Test
+ public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
+ testFindPrimaryExceptions(new NotInitializedException("not initialized"));
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private static void testFindPrimaryExceptions(final Object expectedException) throws Exception {
+ ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
+
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
+ .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
+ .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
+