+ @Test
+ public void testWriteAfterAsyncRead() throws Throwable {
+ ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem());
+
+ Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
+ doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
+ eq(getSystem().actorSelection(actorRef.path())),
+ eqCreateTransaction(memberName, READ_WRITE));
+
+ doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedReadData());
+
+ final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+
+ final TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+ final CountDownLatch readComplete = new CountDownLatch(1);
+ final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
+ com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
+ new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
+ @Override
+ public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
+ try {
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+ } catch (Exception e) {
+ caughtEx.set(e);
+ } finally {
+ readComplete.countDown();
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ caughtEx.set(t);
+ readComplete.countDown();
+ }
+ });
+
+ createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
+
+ Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
+
+ if(caughtEx.get() != null) {
+ throw caughtEx.get();
+ }
+
+ verify(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ WriteDataReply.class);
+ }
+