Merge "Create odl-nsf-service feature, which excludes neutron feature"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
index 5e53b29db13f7fff0accf1397dc691a1f071d8a6..fa2f9187d6059f1585a1475531556d8563db3c5a 100644 (file)
@@ -10,6 +10,7 @@ import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
@@ -24,12 +25,17 @@ import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -59,6 +65,7 @@ import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
@@ -69,6 +76,7 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
+import scala.concurrent.Promise;
 import scala.concurrent.duration.Duration;
 
 @SuppressWarnings("resource")
@@ -118,14 +126,16 @@ public class TransactionProxyTest {
 
         schemaContext = TestModel.createTestContext();
 
-        DatastoreContext dataStoreContext = DatastoreContext.newBuilder().build();
+        DatastoreContext dataStoreContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).build();
 
         doReturn(getSystem()).when(mockActorContext).getActorSystem();
+        doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
         doReturn(dataStoreContext).when(mockActorContext).getDatastoreContext();
+        doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
 
         ShardStrategyFactory.setConfiguration(configuration);
     }
@@ -358,12 +368,20 @@ public class TransactionProxyTest {
         return mergeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
     }
 
+    private Future<Object> incompleteFuture(){
+        return mock(Future.class);
+    }
+
     private Future<MergeDataReply> mergeDataReply() {
         return Futures.successful(new MergeDataReply());
     }
 
+    private Future<Object> deleteSerializedDataReply(short version) {
+        return Futures.successful(new DeleteDataReply().toSerializable(version));
+    }
+
     private Future<Object> deleteSerializedDataReply() {
-        return Futures.successful(new DeleteDataReply().toSerializable());
+        return deleteSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
     }
 
     private Future<DeleteDataReply> deleteDataReply() {
@@ -382,8 +400,7 @@ public class TransactionProxyTest {
             .build();
     }
 
-    private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
-            TransactionType type, int transactionVersion) {
+    private ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem) {
         ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
         doReturn(actorSystem.actorSelection(actorRef.path())).
                 when(mockActorContext).actorSelection(actorRef.path().toString());
@@ -391,6 +408,17 @@ public class TransactionProxyTest {
         doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
 
+        doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
+
+        doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
+        return actorRef;
+    }
+
+    private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
+            TransactionType type, int transactionVersion) {
+        ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem);
+
         doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext).
                 executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
                         eqCreateTransaction(memberName, type));
@@ -756,6 +784,62 @@ public class TransactionProxyTest {
                 WriteDataReply.class);
     }
 
+    @Test
+    public void testWriteAfterAsyncRead() throws Throwable {
+        ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem());
+
+        Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
+        doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
+                eq(getSystem().actorSelection(actorRef.path())),
+                eqCreateTransaction(memberName, READ_WRITE));
+
+        doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedReadData());
+
+        final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+
+        final TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+        final CountDownLatch readComplete = new CountDownLatch(1);
+        final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
+        com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
+                new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
+                    @Override
+                    public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
+                        try {
+                            transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+                        } catch (Exception e) {
+                            caughtEx.set(e);
+                        } finally {
+                            readComplete.countDown();
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(Throwable t) {
+                        caughtEx.set(t);
+                        readComplete.countDown();
+                    }
+                });
+
+        createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
+
+        Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
+
+        if(caughtEx.get() != null) {
+            throw caughtEx.get();
+        }
+
+        verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+
+        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+                WriteDataReply.class);
+    }
+
     @Test(expected=IllegalStateException.class)
     public void testWritePreConditionCheck() {
 
@@ -814,7 +898,7 @@ public class TransactionProxyTest {
                 eq(actorSelection(actorRef)), eqSerializedDeleteData());
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                DeleteDataReply.SERIALIZABLE_CLASS);
+                DeleteDataReply.class);
     }
 
     private void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy,
@@ -1222,4 +1306,425 @@ public class TransactionProxyTest {
 
         verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path()));
     }
+
+    private static interface TransactionProxyOperation {
+        void run(TransactionProxy transactionProxy);
+    }
+
+    private void throttleOperation(TransactionProxyOperation operation) {
+        throttleOperation(operation, 1, true);
+    }
+
+    private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
+        ActorSystem actorSystem = getSystem();
+        ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+
+        doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
+        doReturn(actorSystem.actorSelection(shardActorRef.path())).
+                when(mockActorContext).actorSelection(shardActorRef.path().toString());
+
+        if(shardFound) {
+            doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
+                    when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+        } else {
+            doReturn(Futures.failed(new Exception("not found")))
+                    .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+        }
+
+        String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
+        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
+                .setTransactionId("txn-1")
+                .setTransactionActorPath(actorPath)
+                .build();
+
+        doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
+                executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+                        eqCreateTransaction(memberName, READ_WRITE));
+
+        doReturn(true).when(mockActorContext).isPathLocal(actorPath);
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+        long start = System.nanoTime();
+
+        operation.run(transactionProxy);
+
+        long end = System.nanoTime();
+
+        Assert.assertTrue(String.format("took less time than expected %s was %s",
+                TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()),
+                (end-start)), (end - start) > TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()));
+
+    }
+
+    private void completeOperation(TransactionProxyOperation operation){
+        completeOperation(operation, true);
+    }
+
+    private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
+        ActorSystem actorSystem = getSystem();
+        ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+
+        doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
+        doReturn(actorSystem.actorSelection(shardActorRef.path())).
+                when(mockActorContext).actorSelection(shardActorRef.path().toString());
+
+        if(shardFound) {
+            doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
+                    when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+        } else {
+            doReturn(Futures.failed(new Exception("not found")))
+                    .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+        }
+
+        String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
+        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
+                .setTransactionId("txn-1")
+                .setTransactionActorPath(actorPath)
+                .build();
+
+        doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
+                executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+                        eqCreateTransaction(memberName, READ_WRITE));
+
+        doReturn(true).when(mockActorContext).isPathLocal(actorPath);
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+        long start = System.nanoTime();
+
+        operation.run(transactionProxy);
+
+        long end = System.nanoTime();
+
+        Assert.assertTrue(String.format("took more time than expected %s was %s",
+                TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()),
+                (end-start)), (end - start) <= TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()));
+    }
+
+    public void testWriteThrottling(boolean shardFound){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+            }
+        }, 1, shardFound);
+    }
+
+    @Test
+    public void testWriteThrottlingWhenShardFound(){
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+            }
+        });
+
+    }
+
+    @Test
+    public void testWriteThrottlingWhenShardNotFound(){
+        // Confirm that there is no throttling when the Shard is not found
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+            }
+        }, false);
+
+    }
+
+
+    @Test
+    public void testWriteCompletion(){
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqSerializedWriteData(nodeToWrite));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+            }
+        });
+
+    }
+
+    @Test
+    public void testMergeThrottlingWhenShardFound(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqMergeData(nodeToMerge));
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+            }
+        });
+    }
+
+    @Test
+    public void testMergeThrottlingWhenShardNotFound(){
+
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqMergeData(nodeToMerge));
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+            }
+        }, false);
+    }
+
+    @Test
+    public void testMergeCompletion(){
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqMergeData(nodeToMerge));
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+            }
+        });
+
+    }
+
+    @Test
+    public void testDeleteThrottlingWhenShardFound(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDeleteData());
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+            }
+        });
+    }
+
+
+    @Test
+    public void testDeleteThrottlingWhenShardNotFound(){
+
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDeleteData());
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+            }
+        }, false);
+    }
+
+    @Test
+    public void testDeleteCompletion(){
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDeleteData());
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+            }
+        });
+
+    }
+
+    @Test
+    public void testReadThrottlingWhenShardFound(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqReadData());
+
+                transactionProxy.read(TestModel.TEST_PATH);
+
+                transactionProxy.read(TestModel.TEST_PATH);
+            }
+        });
+    }
+
+    @Test
+    public void testReadThrottlingWhenShardNotFound(){
+
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqReadData());
+
+                transactionProxy.read(TestModel.TEST_PATH);
+
+                transactionProxy.read(TestModel.TEST_PATH);
+            }
+        }, false);
+    }
+
+
+    @Test
+    public void testReadCompletion(){
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqReadData());
+
+                transactionProxy.read(TestModel.TEST_PATH);
+
+                transactionProxy.read(TestModel.TEST_PATH);
+            }
+        });
+
+    }
+
+    @Test
+    public void testExistsThrottlingWhenShardFound(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDataExists());
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+            }
+        });
+    }
+
+    @Test
+    public void testExistsThrottlingWhenShardNotFound(){
+
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDataExists());
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+            }
+        }, false);
+    }
+
+
+    @Test
+    public void testExistsCompletion(){
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDataExists());
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+            }
+        });
+
+    }
+
+    @Test
+    public void testReadyThrottling(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), any(ReadyTransaction.class));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.ready();
+            }
+        });
+    }
+
+    @Test
+    public void testReadyThrottlingWithTwoTransactionContexts(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+                NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
+
+                doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(carsNode));
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), any(ReadyTransaction.class));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.write(TestModel.TEST_PATH, carsNode);
+
+                transactionProxy.ready();
+            }
+        }, 2, true);
+    }
 }