X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxyTest.java;h=8a17ddf8c3a8a3c327c8fd16afee398085dfcdb3;hb=204f45f8b3233dbea87e2c8065914f0d2a0ded07;hp=f19bace5b99ce21862a04455241cf913a34fc350;hpb=95538aa1baf80a90f03e4ae6d8268c9db34b3bfa;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index f19bace5b9..8a17ddf8c3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -1,3 +1,11 @@ +/* + * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; @@ -8,6 +16,7 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -19,6 +28,7 @@ import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; import akka.dispatch.Futures; +import akka.util.Timeout; import com.google.common.base.Optional; import com.google.common.collect.Sets; import com.google.common.util.concurrent.CheckedFuture; @@ -34,6 +44,7 @@ import org.junit.Assert; import org.junit.Test; import org.mockito.InOrder; import org.mockito.Mockito; +import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; @@ -47,8 +58,8 @@ import org.opendaylight.controller.cluster.datastore.modification.DeleteModifica import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; -import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest; +import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; @@ -138,7 +149,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { } doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), any()); + any(ActorSelection.class), any(), any(Timeout.class)); TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY); @@ -216,7 +227,8 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync( - eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY)); + eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY), + any(Timeout.class)); TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY); @@ -334,7 +346,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { Promise createTxPromise = akka.dispatch.Futures.promise(); doReturn(createTxPromise).when(mockActorContext).executeOperationAsync( eq(getSystem().actorSelection(actorRef.path())), - eqCreateTransaction(memberName, READ_WRITE)); + eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class)); doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqSerializedReadData()); @@ -615,6 +627,73 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class); } + @Test + public void testReadyWithDebugContextEnabled() throws Exception { + dataStoreContextBuilder.transactionDebugContextEnabled(true); + + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); + + expectBatchedModificationsReady(actorRef, true); + + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE); + + transactionProxy.merge(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof DebugThreePhaseCommitCohort); + + verifyCohortFutures((DebugThreePhaseCommitCohort)ready, new CommitTransactionReply().toSerializable()); + } + + @Test + public void testReadyWithLocalTransaction() throws Exception { + ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class)); + + doReturn(getSystem().actorSelection(shardActorRef.path())). + when(mockActorContext).actorSelection(shardActorRef.path().toString()); + + doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, createDataTree()))). + when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); + + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY); + + expectReadyLocalTransaction(shardActorRef, true); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + assertTrue(ready instanceof SingleCommitCohortProxy); + verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable()); + } + + @Test + public void testReadyWithLocalTransactionWithFailure() throws Exception { + ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class)); + + doReturn(getSystem().actorSelection(shardActorRef.path())). + when(mockActorContext).actorSelection(shardActorRef.path().toString()); + + Optional mockDataTree = createDataTree(); + DataTreeModification mockModification = mockDataTree.get().takeSnapshot().newModification(); + doThrow(new RuntimeException("mock")).when(mockModification).ready(); + + doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, mockDataTree))). + when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); + + TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY); + + expectReadyLocalTransaction(shardActorRef, true); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + assertTrue(ready instanceof SingleCommitCohortProxy); + verifyCohortFutures((SingleCommitCohortProxy)ready, RuntimeException.class); + } + private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception { doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString()); @@ -779,20 +858,30 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { throttleOperation(operation, 1, true); } + private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){ + throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.MILLISECONDS.toNanos( + mockActorContext.getDatastoreContext().getOperationTimeoutInMillis())); + } + private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef){ - return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), Optional.absent()); + return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION, + Optional.absent()); } private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef, Optional dataTreeOptional){ - return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), dataTreeOptional); + return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION, + dataTreeOptional); } - private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){ + private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound, long expectedCompletionTime){ ActorSystem actorSystem = getSystem(); ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); - doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit(); + // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy + // we now allow one extra permit to be allowed for ready + doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2). + shardBatchedModificationCount(outstandingOpsLimit-1).build()).when(mockActorContext).getDatastoreContext(); doReturn(actorSystem.actorSelection(shardActorRef.path())). when(mockActorContext).actorSelection(shardActorRef.path().toString()); @@ -800,19 +889,19 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { if(shardFound) { doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))). when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); + doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))). + when(mockActorContext).findPrimaryShardAsync(eq("cars")); + } else { doReturn(Futures.failed(new Exception("not found"))) .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); } String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor"; - CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder(). - setTransactionId("txn-1").setTransactionActorPath(actorPath). - setMessageVersion(DataStoreVersions.CURRENT_VERSION).build(); - doReturn(Futures.successful(createTransactionReply)).when(mockActorContext). - executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), - eqCreateTransaction(memberName, READ_WRITE)); + doReturn(incompleteFuture()).when(mockActorContext). + executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), + eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class)); doReturn(true).when(mockActorContext).isPathLocal(actorPath); @@ -824,9 +913,9 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { long end = System.nanoTime(); - long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()); Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s", - expected, (end-start)), (end - start) > expected); + expectedCompletionTime, (end-start)), + ((end - start) > expectedCompletionTime) && ((end - start) < expectedCompletionTime*2)); } @@ -838,8 +927,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { ActorSystem actorSystem = getSystem(); ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); - doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit(); - doReturn(actorSystem.actorSelection(shardActorRef.path())). when(mockActorContext).actorSelection(shardActorRef.path().toString()); @@ -861,7 +948,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(Futures.successful(createTransactionReply)).when(mockActorContext). executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), - eqCreateTransaction(memberName, READ_WRITE)); + eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class)); doReturn(true).when(mockActorContext).isPathLocal(anyString()); @@ -873,7 +960,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { long end = System.nanoTime(); - long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()); + long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()); Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s", expected, (end-start)), (end - start) <= expected); } @@ -882,8 +969,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { ActorSystem actorSystem = getSystem(); ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); - doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit(); - doReturn(actorSystem.actorSelection(shardActorRef.path())). when(mockActorContext).actorSelection(shardActorRef.path().toString()); @@ -898,12 +983,12 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { long end = System.nanoTime(); - long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()); + long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()); Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s", expected, (end-start)), (end - start) <= expected); } - private Optional createDataTree(){ + private static Optional createDataTree(){ DataTree dataTree = mock(DataTree.class); Optional dataTreeOptional = Optional.of(dataTree); DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class); @@ -915,7 +1000,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { return dataTreeOptional; } - private Optional createDataTree(NormalizedNode readResponse){ + private static Optional createDataTree(NormalizedNode readResponse){ DataTree dataTree = mock(DataTree.class); Optional dataTreeOptional = Optional.of(dataTree); DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class); @@ -931,7 +1016,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testWriteCompletionForLocalShard(){ - dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperationLocal(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -947,7 +1031,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testWriteThrottlingWhenShardFound(){ - dataStoreContextBuilder.shardBatchedModificationCount(1); throttleOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -965,7 +1048,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testWriteThrottlingWhenShardNotFound(){ // Confirm that there is no throttling when the Shard is not found - dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -984,7 +1066,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testWriteCompletion(){ - dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -1001,7 +1082,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testMergeThrottlingWhenShardFound(){ - dataStoreContextBuilder.shardBatchedModificationCount(1); throttleOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -1018,7 +1098,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testMergeThrottlingWhenShardNotFound(){ - dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -1035,7 +1114,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testMergeCompletion(){ - dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -1053,7 +1131,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testMergeCompletionForLocalShard(){ - dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperationLocal(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -1101,7 +1178,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testDeleteCompletionForLocalShard(){ - dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperationLocal(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -1116,7 +1192,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testDeleteCompletion(){ - dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -1293,9 +1368,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { expectBatchedModifications(1); - doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), any(ReadyTransaction.class)); - transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); transactionProxy.ready(); @@ -1305,7 +1377,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testReadyThrottlingWithTwoTransactionContexts(){ - throttleOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -1319,11 +1390,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); - transactionProxy.write(TestModel.TEST_PATH, carsNode); + // Trying to write to Cars will cause another transaction context to get created + transactionProxy.write(CarsModel.BASE_PATH, carsNode); + // Now ready should block for both transaction contexts transactionProxy.ready(); } - }, 2, true); + }, 1, true, TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()) * 2); } private void testModificationOperationBatching(TransactionType type) throws Exception { @@ -1505,8 +1578,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(memberName).when(mockActorContext).getCurrentMemberName(); - doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit(); - doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher(); TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY); @@ -1557,7 +1628,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION))).when(mockActorContext). executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), - eqCreateTransaction(memberName, TransactionType.READ_ONLY)); + eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class)); doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync( eq(actorSelection(txActorRef)), eqSerializedReadData(YangInstanceIdentifier.builder().build()));