import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
-public class ActorContextTest extends AbstractActorTest {
+public class ActorUtilsTest extends AbstractActorTest {
- static final Logger LOG = LoggerFactory.getLogger(ActorContextTest.class);
+ static final Logger LOG = LoggerFactory.getLogger(ActorUtilsTest.class);
private static class TestMessage {
}
ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
- ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+ ActorUtils actorUtils = new ActorUtils(getSystem(), shardManagerActorRef,
mock(ClusterWrapper.class), mock(Configuration.class));
- Optional<ActorRef> out = actorContext.findLocalShard("default");
+ Optional<ActorRef> out = actorUtils.findLocalShard("default");
assertEquals(shardActorRef, out.get());
public void testFindLocalShardWithShardNotFound() {
ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(false, null));
- ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef, mock(ClusterWrapper.class),
+ ActorUtils actorUtils = new ActorUtils(getSystem(), shardManagerActorRef, mock(ClusterWrapper.class),
mock(Configuration.class));
- Optional<ActorRef> out = actorContext.findLocalShard("default");
+ Optional<ActorRef> out = actorUtils.findLocalShard("default");
assertFalse(out.isPresent());
}
ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
- ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+ ActorUtils actorUtils = new ActorUtils(getSystem(), shardManagerActorRef,
mock(ClusterWrapper.class), mock(Configuration.class));
- ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
+ ActorSelection actor = actorUtils.actorSelection(shardActorRef.path());
- Object out = actorContext.executeOperation(actor, "hello");
+ Object out = actorUtils.executeOperation(actor, "hello");
assertEquals("hello", out);
}
ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
- ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+ ActorUtils actorUtils = new ActorUtils(getSystem(), shardManagerActorRef,
mock(ClusterWrapper.class), mock(Configuration.class));
- ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
+ ActorSelection actor = actorUtils.actorSelection(shardActorRef.path());
- Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
+ Future<Object> future = actorUtils.executeOperationAsync(actor, "hello");
Object result = Await.result(future, FiniteDuration.create(3, TimeUnit.SECONDS));
assertEquals("Result", "hello", result);
@Test
public void testIsPathLocal() {
MockClusterWrapper clusterWrapper = new MockClusterWrapper();
- ActorContext actorContext = null;
+ ActorUtils actorUtils = null;
- actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
- assertFalse(actorContext.isPathLocal(null));
- assertFalse(actorContext.isPathLocal(""));
+ actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertFalse(actorUtils.isPathLocal(null));
+ assertFalse(actorUtils.isPathLocal(""));
clusterWrapper.setSelfAddress(null);
- actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
- assertFalse(actorContext.isPathLocal(""));
+ actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertFalse(actorUtils.isPathLocal(""));
// even if the path is in local format, match the primary path (first 3 elements) and return true
clusterWrapper.setSelfAddress(new Address("akka", "test"));
- actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
- assertTrue(actorContext.isPathLocal("akka://test/user/$a"));
+ actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertTrue(actorUtils.isPathLocal("akka://test/user/$a"));
clusterWrapper.setSelfAddress(new Address("akka", "test"));
- actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
- assertTrue(actorContext.isPathLocal("akka://test/user/$a"));
+ actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertTrue(actorUtils.isPathLocal("akka://test/user/$a"));
clusterWrapper.setSelfAddress(new Address("akka", "test"));
- actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
- assertTrue(actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
+ actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertTrue(actorUtils.isPathLocal("akka://test/user/token2/token3/$a"));
// self address of remote format,but Tx path local format.
clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
- actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
- assertTrue(actorContext.isPathLocal("akka://system/user/shardmanager/shard/transaction"));
+ actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertTrue(actorUtils.isPathLocal("akka://system/user/shardmanager/shard/transaction"));
// self address of local format,but Tx path remote format.
clusterWrapper.setSelfAddress(new Address("akka", "system"));
- actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
- assertFalse(actorContext.isPathLocal("akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
+ actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertFalse(actorUtils.isPathLocal("akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
//local path but not same
clusterWrapper.setSelfAddress(new Address("akka", "test"));
- actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
- assertTrue(actorContext.isPathLocal("akka://test1/user/$a"));
+ actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertTrue(actorUtils.isPathLocal("akka://test1/user/$a"));
//ip and port same
clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
- actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
- assertTrue(actorContext.isPathLocal("akka://system@127.0.0.1:2550/"));
+ actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertTrue(actorUtils.isPathLocal("akka://system@127.0.0.1:2550/"));
// forward-slash missing in address
clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
- actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
- assertFalse(actorContext.isPathLocal("akka://system@127.0.0.1:2550"));
+ actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertFalse(actorUtils.isPathLocal("akka://system@127.0.0.1:2550"));
//ips differ
clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
- actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
- assertFalse(actorContext.isPathLocal("akka://system@127.1.0.1:2550/"));
+ actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertFalse(actorUtils.isPathLocal("akka://system@127.1.0.1:2550/"));
//ports differ
clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
- actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
- assertFalse(actorContext.isPathLocal("akka://system@127.0.0.1:2551/"));
+ actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
+ assertFalse(actorUtils.isPathLocal("akka://system@127.0.0.1:2551/"));
}
@Test
public void testClientDispatcherIsGlobalDispatcher() {
- ActorContext actorContext = new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+ ActorUtils actorUtils = new ActorUtils(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
- assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
+ assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorUtils.getClientDispatcher());
}
@Test
ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers",
ConfigFactory.load("application-with-custom-dispatchers.conf"));
- ActorContext actorContext = new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
+ ActorUtils actorUtils = new ActorUtils(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
- assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
+ assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorUtils.getClientDispatcher());
actorSystem.terminate();
}
@Test
public void testSetDatastoreContext() {
final TestKit testKit = new TestKit(getSystem());
- ActorContext actorContext = new ActorContext(getSystem(), testKit.getRef(),
+ ActorUtils actorUtils = new ActorUtils(getSystem(), testKit.getRef(),
mock(ClusterWrapper.class), mock(Configuration.class), DatastoreContext.newBuilder()
.operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(),
new PrimaryShardInfoFutureCache());
- assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
+ assertEquals("getOperationDuration", 5, actorUtils.getOperationDuration().toSeconds());
assertEquals("getTransactionCommitOperationTimeout", 7,
- actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
+ actorUtils.getTransactionCommitOperationTimeout().duration().toSeconds());
DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6)
.shardTransactionCommitTimeoutInSeconds(8).build();
DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext();
- actorContext.setDatastoreContext(mockContextFactory);
+ actorUtils.setDatastoreContext(mockContextFactory);
testKit.expectMsgClass(Duration.ofSeconds(5), DatastoreContextFactory.class);
- Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
+ Assert.assertSame("getDatastoreContext", newContext, actorUtils.getDatastoreContext());
- assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
+ assertEquals("getOperationDuration", 6, actorUtils.getOperationDuration().toSeconds());
assertEquals("getTransactionCommitOperationTimeout", 8,
- actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
+ actorUtils.getTransactionCommitOperationTimeout().duration().toSeconds());
}
@Test
final String expPrimaryPath = "akka://test-system/find-primary-shard";
final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION;
- ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+ ActorUtils actorUtils = new ActorUtils(getSystem(), shardManager, mock(ClusterWrapper.class),
mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
@Override
protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
}
};
- Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+ Future<PrimaryShardInfo> foobar = actorUtils.findPrimaryShardAsync("foobar");
PrimaryShardInfo actual = Await.result(foobar, FiniteDuration.apply(5000, TimeUnit.MILLISECONDS));
assertNotNull(actual);
expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion());
- Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+ Future<PrimaryShardInfo> cached = actorUtils.getPrimaryShardInfoCache().getIfPresent("foobar");
PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
assertEquals(cachedInfo, actual);
- actorContext.getPrimaryShardInfoCache().remove("foobar");
+ actorUtils.getPrimaryShardInfoCache().remove("foobar");
- cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+ cached = actorUtils.getPrimaryShardInfoCache().getIfPresent("foobar");
assertNull(cached);
}
final DataTree mockDataTree = Mockito.mock(DataTree.class);
final String expPrimaryPath = "akka://test-system/find-primary-shard";
- ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+ ActorUtils actorUtils = new ActorUtils(getSystem(), shardManager, mock(ClusterWrapper.class),
mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
@Override
protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
}
};
- Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+ Future<PrimaryShardInfo> foobar = actorUtils.findPrimaryShardAsync("foobar");
PrimaryShardInfo actual = Await.result(foobar, FiniteDuration.apply(5000, TimeUnit.MILLISECONDS));
assertNotNull(actual);
expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion());
- Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+ Future<PrimaryShardInfo> cached = actorUtils.getPrimaryShardInfoCache().getIfPresent("foobar");
PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
assertEquals(cachedInfo, actual);
- actorContext.getPrimaryShardInfoCache().remove("foobar");
+ actorUtils.getPrimaryShardInfoCache().remove("foobar");
- cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+ cached = actorUtils.getPrimaryShardInfoCache().getIfPresent("foobar");
assertNull(cached);
}
.logicalStoreType(LogicalDatastoreType.CONFIGURATION)
.shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
- ActorContext actorContext =
- new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+ ActorUtils actorUtils = new ActorUtils(getSystem(), shardManager, mock(ClusterWrapper.class),
mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
- @Override
- protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
- return Futures.successful(expectedException);
- }
- };
+ @Override
+ protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
+ return Futures.successful(expectedException);
+ }
+ };
- Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+ Future<PrimaryShardInfo> foobar = actorUtils.findPrimaryShardAsync("foobar");
try {
Await.result(foobar, FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
}
}
- Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+ Future<PrimaryShardInfo> cached = actorUtils.getPrimaryShardInfoCache().getIfPresent("foobar");
assertNull(cached);
}
doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).when(mockConfig)
.getAllShardNames();
- ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
+ ActorUtils actorUtils = new ActorUtils(getSystem(), shardManagerActorRef,
mock(ClusterWrapper.class), mockConfig,
DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(),
new PrimaryShardInfoFutureCache());
- actorContext.broadcast(v -> new TestMessage(), TestMessage.class);
+ actorUtils.broadcast(v -> new TestMessage(), TestMessage.class);
MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class);
MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class);