import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang.time.StopWatch;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
assertEquals(expected, actual);
}
- @Test
- public void testRateLimiting(){
- DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
- transactionCreationInitialRateLimit(155L).build();
-
- ActorContext actorContext =
- new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class), dataStoreContext);
-
- // Check that the initial value is being picked up from DataStoreContext
- assertEquals(dataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
-
- actorContext.setTxCreationLimit(1.0);
-
- assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
-
-
- StopWatch watch = new StopWatch();
-
- watch.start();
-
- actorContext.acquireTxCreationPermit();
- actorContext.acquireTxCreationPermit();
- actorContext.acquireTxCreationPermit();
-
- watch.stop();
-
- assertTrue("did not take as much time as expected", watch.getTime() > 1000);
- }
@Test
public void testClientDispatcherIsGlobalDispatcher(){
ActorContext actorContext =
new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class), DatastoreContext.newBuilder().build());
+ mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
ActorContext actorContext =
new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class), DatastoreContext.newBuilder().build());
+ mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
new JavaTestKit(getSystem()) {{
ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
mock(Configuration.class), DatastoreContext.newBuilder().
- operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build());
+ operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(), new PrimaryShardInfoFutureCache());
assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
assertEquals("getTransactionCommitOperationTimeout", 7,
final String expPrimaryPath = "akka://test-system/find-primary-shard";
ActorContext actorContext =
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
- mock(Configuration.class), dataStoreContext) {
+ mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath));
assertEquals(cachedInfo, actual);
- // Wait for 200 Milliseconds. The cached entry should have been removed.
-
- Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ actorContext.getPrimaryShardInfoCache().remove("foobar");
cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
final String expPrimaryPath = "akka://test-system/find-primary-shard";
ActorContext actorContext =
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
- mock(Configuration.class), dataStoreContext) {
+ mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
assertEquals(cachedInfo, actual);
- // Wait for 200 Milliseconds. The cached entry should have been removed.
-
- Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ actorContext.getPrimaryShardInfoCache().remove("foobar");
cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
@Test
public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
-
- TestActorRef<MessageCollectorActor> shardManager =
- TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
-
- DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
- shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
-
- ActorContext actorContext =
- new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
- mock(Configuration.class), dataStoreContext) {
- @Override
- protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
- return Futures.successful((Object) new PrimaryNotFoundException("not found"));
- }
- };
-
-
- Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
-
- try {
- Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
- fail("Expected PrimaryNotFoundException");
- } catch(PrimaryNotFoundException e){
-
- }
-
- Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
-
- assertNull(cached);
+ testFindPrimaryExceptions(new PrimaryNotFoundException("not found"));
}
@Test
public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
+ testFindPrimaryExceptions(new NotInitializedException("not initialized"));
+ }
- TestActorRef<MessageCollectorActor> shardManager =
- TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
-
- DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
- shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
-
- ActorContext actorContext =
- new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
- mock(Configuration.class), dataStoreContext) {
- @Override
- protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
- return Futures.successful((Object) new NotInitializedException("not iniislized"));
- }
- };
+ private void testFindPrimaryExceptions(final Object expectedException) throws Exception {
+ TestActorRef<MessageCollectorActor> shardManager =
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+ DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+ shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
- Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+ ActorContext actorContext =
+ new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+ mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
+ @Override
+ protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+ return Futures.successful(expectedException);
+ }
+ };
- try {
- Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
- fail("Expected NotInitializedException");
- } catch(NotInitializedException e){
+ Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+ try {
+ Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
+ fail("Expected" + expectedException.getClass().toString());
+ } catch(Exception e){
+ if(!expectedException.getClass().isInstance(e)) {
+ fail("Expected Exception of type " + expectedException.getClass().toString());
}
+ }
- Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+ Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
- assertNull(cached);
+ assertNull(cached);
}
@Test
ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
mock(ClusterWrapper.class), mockConfig,
- DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build());
+ DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), new PrimaryShardInfoFutureCache());
actorContext.broadcast(new TestMessage());