Bug 3020: Use leader version in LeaderStateChanged
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
index 6cf63157e16ff7f37033ee3142f4ca1c06e84df8..26d51cbae3bf2df728ec1fa9e3322d0151b114b4 100644 (file)
@@ -59,6 +59,8 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Promise;
@@ -613,6 +615,25 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
     }
 
+    @Test
+    public void testReadyWithDebugContextEnabled() throws Exception {
+        dataStoreContextBuilder.transactionDebugContextEnabled(true);
+
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+
+        expectBatchedModificationsReady(actorRef, true);
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
+
+        transactionProxy.merge(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+        assertTrue(ready instanceof DebugThreePhaseCommitCohort);
+
+        verifyCohortFutures((DebugThreePhaseCommitCohort)ready, new CommitTransactionReply().toSerializable());
+    }
+
     private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
         doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
 
@@ -778,9 +799,16 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
     }
 
     private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef){
-        return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), Optional.<DataTree>absent());
+        return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
+                Optional.<DataTree>absent());
+    }
+
+    private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef, Optional<DataTree> dataTreeOptional){
+        return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
+                dataTreeOptional);
     }
 
+
     private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
         ActorSystem actorSystem = getSystem();
         ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
@@ -871,6 +899,73 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
                 expected, (end-start)), (end - start) <= expected);
     }
 
+    private void completeOperationLocal(TransactionProxyOperation operation, Optional<DataTree> dataTreeOptional){
+        ActorSystem actorSystem = getSystem();
+        ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+
+        doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
+        doReturn(actorSystem.actorSelection(shardActorRef.path())).
+                when(mockActorContext).actorSelection(shardActorRef.path().toString());
+
+        doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, dataTreeOptional))).
+                when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
+
+        long start = System.nanoTime();
+
+        operation.run(transactionProxy);
+
+        long end = System.nanoTime();
+
+        long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
+        Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
+                expected, (end-start)), (end - start) <= expected);
+    }
+
+    private Optional<DataTree> createDataTree(){
+        DataTree dataTree = mock(DataTree.class);
+        Optional<DataTree> dataTreeOptional = Optional.of(dataTree);
+        DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
+        DataTreeModification dataTreeModification = mock(DataTreeModification.class);
+
+        doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
+        doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
+
+        return dataTreeOptional;
+    }
+
+    private Optional<DataTree> createDataTree(NormalizedNode readResponse){
+        DataTree dataTree = mock(DataTree.class);
+        Optional<DataTree> dataTreeOptional = Optional.of(dataTree);
+        DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
+        DataTreeModification dataTreeModification = mock(DataTreeModification.class);
+
+        doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
+        doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
+        doReturn(Optional.of(readResponse)).when(dataTreeModification).readNode(any(YangInstanceIdentifier.class));
+
+        return dataTreeOptional;
+    }
+
+
+    @Test
+    public void testWriteCompletionForLocalShard(){
+        dataStoreContextBuilder.shardBatchedModificationCount(1);
+        completeOperationLocal(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+            }
+        }, createDataTree());
+    }
+
     @Test
     public void testWriteThrottlingWhenShardFound(){
         dataStoreContextBuilder.shardBatchedModificationCount(1);
@@ -977,6 +1072,23 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     }
 
+    @Test
+    public void testMergeCompletionForLocalShard(){
+        dataStoreContextBuilder.shardBatchedModificationCount(1);
+        completeOperationLocal(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+
+            }
+        }, createDataTree());
+    }
+
+
     @Test
     public void testDeleteThrottlingWhenShardFound(){
 
@@ -1008,6 +1120,21 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         }, false);
     }
 
+    @Test
+    public void testDeleteCompletionForLocalShard(){
+        dataStoreContextBuilder.shardBatchedModificationCount(1);
+        completeOperationLocal(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+            }
+        }, createDataTree());
+
+    }
+
     @Test
     public void testDeleteCompletion(){
         dataStoreContextBuilder.shardBatchedModificationCount(1);
@@ -1075,6 +1202,33 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     }
 
+    @Test
+    public void testReadCompletionForLocalShard(){
+        final NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        completeOperationLocal(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                transactionProxy.read(TestModel.TEST_PATH);
+
+                transactionProxy.read(TestModel.TEST_PATH);
+            }
+        }, createDataTree(nodeToRead));
+
+    }
+
+    @Test
+    public void testReadCompletionForLocalShardWhenExceptionOccurs(){
+        completeOperationLocal(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                transactionProxy.read(TestModel.TEST_PATH);
+
+                transactionProxy.read(TestModel.TEST_PATH);
+            }
+        }, createDataTree());
+
+    }
+
     @Test
     public void testExistsThrottlingWhenShardFound(){
 
@@ -1124,6 +1278,32 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     }
 
+    @Test
+    public void testExistsCompletionForLocalShard(){
+        final NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        completeOperationLocal(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                transactionProxy.exists(TestModel.TEST_PATH);
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+            }
+        }, createDataTree(nodeToRead));
+
+    }
+
+    @Test
+    public void testExistsCompletionForLocalShardWhenExceptionOccurs(){
+        completeOperationLocal(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                transactionProxy.exists(TestModel.TEST_PATH);
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+            }
+        }, createDataTree());
+
+    }
     @Test
     public void testReadyThrottling(){