throttleOperation(operation, 1, true);
}
+ private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
+ throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()));
+ }
+
private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef){
return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
Optional.<DataTree>absent());
}
- private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
+ private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound, long expectedCompletionTime){
ActorSystem actorSystem = getSystem();
ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
- doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
+ // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy
+ // we now allow one extra permit to be allowed for ready
+ doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2).
+ shardBatchedModificationCount(outstandingOpsLimit-1).build()).when(mockActorContext).getDatastoreContext();
doReturn(actorSystem.actorSelection(shardActorRef.path())).
when(mockActorContext).actorSelection(shardActorRef.path().toString());
if(shardFound) {
doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+ doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
+ when(mockActorContext).findPrimaryShardAsync(eq("cars"));
+
} else {
doReturn(Futures.failed(new Exception("not found")))
.when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
long end = System.nanoTime();
- long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
- expected, (end-start)), (end - start) > expected);
+ expectedCompletionTime, (end-start)),
+ ((end - start) > expectedCompletionTime) && ((end - start) < expectedCompletionTime*2));
}
ActorSystem actorSystem = getSystem();
ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
- doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
-
doReturn(actorSystem.actorSelection(shardActorRef.path())).
when(mockActorContext).actorSelection(shardActorRef.path().toString());
ActorSystem actorSystem = getSystem();
ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
- doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
-
doReturn(actorSystem.actorSelection(shardActorRef.path())).
when(mockActorContext).actorSelection(shardActorRef.path().toString());
@Test
public void testWriteCompletionForLocalShard(){
- dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperationLocal(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testWriteThrottlingWhenShardFound(){
- dataStoreContextBuilder.shardBatchedModificationCount(1);
throttleOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testWriteThrottlingWhenShardNotFound(){
// Confirm that there is no throttling when the Shard is not found
- dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testWriteCompletion(){
- dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testMergeThrottlingWhenShardFound(){
- dataStoreContextBuilder.shardBatchedModificationCount(1);
throttleOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testMergeThrottlingWhenShardNotFound(){
- dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testMergeCompletion(){
- dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testMergeCompletionForLocalShard(){
- dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperationLocal(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testDeleteCompletionForLocalShard(){
- dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperationLocal(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testDeleteCompletion(){
- dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testReadyThrottlingWithTwoTransactionContexts(){
-
throttleOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
- transactionProxy.write(TestModel.TEST_PATH, carsNode);
+ // Trying to write to Cars will cause another transaction context to get created
+ transactionProxy.write(CarsModel.BASE_PATH, carsNode);
+ // Now ready should block for both transaction contexts
transactionProxy.ready();
}
- }, 2, true);
+ }, 1, true, TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()) * 2);
}
private void testModificationOperationBatching(TransactionType type) throws Exception {
doReturn(memberName).when(mockActorContext).getCurrentMemberName();
- doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
-
doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);