public void testClientDispatcherIsGlobalDispatcher(){
ActorContext actorContext =
new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class), DatastoreContext.newBuilder().build());
+ mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
ActorContext actorContext =
new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class), DatastoreContext.newBuilder().build());
+ mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
new JavaTestKit(getSystem()) {{
ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
mock(Configuration.class), DatastoreContext.newBuilder().
- operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build());
+ operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(), new PrimaryShardInfoFutureCache());
assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
assertEquals("getTransactionCommitOperationTimeout", 7,
final String expPrimaryPath = "akka://test-system/find-primary-shard";
ActorContext actorContext =
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
- mock(Configuration.class), dataStoreContext) {
+ mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath));
assertEquals(cachedInfo, actual);
- // Wait for 200 Milliseconds. The cached entry should have been removed.
-
- Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ actorContext.getPrimaryShardInfoCache().remove("foobar");
cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
final String expPrimaryPath = "akka://test-system/find-primary-shard";
ActorContext actorContext =
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
- mock(Configuration.class), dataStoreContext) {
+ mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
assertEquals(cachedInfo, actual);
- // Wait for 200 Milliseconds. The cached entry should have been removed.
-
- Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ actorContext.getPrimaryShardInfoCache().remove("foobar");
cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
ActorContext actorContext =
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
- mock(Configuration.class), dataStoreContext) {
+ mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
return Futures.successful((Object) new PrimaryNotFoundException("not found"));
ActorContext actorContext =
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
- mock(Configuration.class), dataStoreContext) {
+ mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
return Futures.successful((Object) new NotInitializedException("not iniislized"));
ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
mock(ClusterWrapper.class), mockConfig,
- DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build());
+ DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), new PrimaryShardInfoFutureCache());
actorContext.broadcast(new TestMessage());