Merge "Bug 1576: Handle remote failures for write Tx async"
authorMoiz Raja <moraja@cisco.com>
Tue, 26 Aug 2014 18:33:11 +0000 (18:33 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 26 Aug 2014 18:33:11 +0000 (18:33 +0000)
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java

index fc455b193e27118f6dcfcc2de93032f2676c5619..c557118b1e8f92234d597ce6372cda2674f6a6bd 100644 (file)
@@ -13,6 +13,7 @@ import akka.actor.ActorSelection;
 import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
@@ -31,6 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import scala.concurrent.Future;
+import scala.runtime.AbstractFunction1;
 
 import java.util.Collections;
 import java.util.List;
@@ -40,32 +42,79 @@ import java.util.List;
  */
 public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort{
 
-    private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
+    private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
 
     private final ActorContext actorContext;
-    private final List<ActorPath> cohortPaths;
+    private final List<Future<ActorPath>> cohortPathFutures;
+    private volatile List<ActorPath> cohortPaths;
     private final String transactionId;
 
-    public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<ActorPath> cohortPaths,
-            String transactionId) {
+    public ThreePhaseCommitCohortProxy(ActorContext actorContext,
+            List<Future<ActorPath>> cohortPathFutures, String transactionId) {
         this.actorContext = actorContext;
-        this.cohortPaths = cohortPaths;
+        this.cohortPathFutures = cohortPathFutures;
         this.transactionId = transactionId;
     }
 
+    private Future<Void> buildCohortPathsList() {
+
+        Future<Iterable<ActorPath>> combinedFutures = Futures.sequence(cohortPathFutures,
+                actorContext.getActorSystem().dispatcher());
+
+        return combinedFutures.transform(new AbstractFunction1<Iterable<ActorPath>, Void>() {
+            @Override
+            public Void apply(Iterable<ActorPath> paths) {
+                cohortPaths = Lists.newArrayList(paths);
+
+                LOG.debug("Tx {} successfully built cohort path list: {}",
+                        transactionId, cohortPaths);
+                return null;
+            }
+        }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
+    }
+
     @Override
     public ListenableFuture<Boolean> canCommit() {
-        LOG.debug("txn {} canCommit", transactionId);
+        LOG.debug("Tx {} canCommit", transactionId);
+
+        final SettableFuture<Boolean> returnFuture = SettableFuture.create();
+
+        // The first phase of canCommit is to gather the list of cohort actor paths that will
+        // participate in the commit. buildCohortPathsList combines the cohort path Futures into
+        // one Future which we wait on asynchronously here. The cohort actor paths are
+        // extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
+        // and passed to us from upstream processing. If any one fails then  we'll fail canCommit.
+
+        buildCohortPathsList().onComplete(new OnComplete<Void>() {
+            @Override
+            public void onComplete(Throwable failure, Void notUsed) throws Throwable {
+                if(failure != null) {
+                    LOG.debug("Tx {}: a cohort path Future failed: {}", transactionId, failure);
+                    returnFuture.setException(failure);
+                } else {
+                    finishCanCommit(returnFuture);
+                }
+            }
+        }, actorContext.getActorSystem().dispatcher());
+
+        return returnFuture;
+    }
+
+    private void finishCanCommit(final SettableFuture<Boolean> returnFuture) {
+
+        LOG.debug("Tx {} finishCanCommit", transactionId);
+
+        // The last phase of canCommit is to invoke all the cohort actors asynchronously to perform
+        // their canCommit processing. If any one fails then we'll fail canCommit.
 
         Future<Iterable<Object>> combinedFuture =
                 invokeCohorts(new CanCommitTransaction().toSerializable());
 
-        final SettableFuture<Boolean> returnFuture = SettableFuture.create();
-
         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
             @Override
             public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
                 if(failure != null) {
+                    LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
                     returnFuture.setException(failure);
                     return;
                 }
@@ -87,18 +136,18 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                     }
                 }
 
+                LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
+
                 returnFuture.set(Boolean.valueOf(result));
             }
         }, actorContext.getActorSystem().dispatcher());
-
-        return returnFuture;
     }
 
     private Future<Iterable<Object>> invokeCohorts(Object message) {
         List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohortPaths.size());
         for(ActorPath actorPath : cohortPaths) {
 
-            LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
+            LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, actorPath);
 
             ActorSelection cohort = actorContext.actorSelection(actorPath);
 
@@ -111,39 +160,73 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
 
     @Override
     public ListenableFuture<Void> preCommit() {
-        LOG.debug("txn {} preCommit", transactionId);
-        return voidOperation(new PreCommitTransaction().toSerializable(),
+        return voidOperation("preCommit",  new PreCommitTransaction().toSerializable(),
                 PreCommitTransactionReply.SERIALIZABLE_CLASS, true);
     }
 
     @Override
     public ListenableFuture<Void> abort() {
-        LOG.debug("txn {} abort", transactionId);
-
         // Note - we pass false for propagateException. In the front-end data broker, this method
         // is called when one of the 3 phases fails with an exception. We'd rather have that
         // original exception propagated to the client. If our abort fails and we propagate the
         // exception then that exception will supersede and suppress the original exception. But
         // it's the original exception that is the root cause and of more interest to the client.
 
-        return voidOperation(new AbortTransaction().toSerializable(),
+        return voidOperation("abort", new AbortTransaction().toSerializable(),
                 AbortTransactionReply.SERIALIZABLE_CLASS, false);
     }
 
     @Override
     public ListenableFuture<Void> commit() {
-        LOG.debug("txn {} commit", transactionId);
-        return voidOperation(new CommitTransaction().toSerializable(),
+        return voidOperation("commit",  new CommitTransaction().toSerializable(),
                 CommitTransactionReply.SERIALIZABLE_CLASS, true);
     }
 
-    private ListenableFuture<Void> voidOperation(final Object message,
+    private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
             final Class<?> expectedResponseClass, final boolean propagateException) {
 
-        Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
+        LOG.debug("Tx {} {}", transactionId, operationName);
 
         final SettableFuture<Void> returnFuture = SettableFuture.create();
 
+        // The cohort actor list should already be built at this point by the canCommit phase but,
+        // if not for some reason, we'll try to build it here.
+
+        if(cohortPaths != null) {
+            finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
+                    returnFuture);
+        } else {
+            buildCohortPathsList().onComplete(new OnComplete<Void>() {
+                @Override
+                public void onComplete(Throwable failure, Void notUsed) throws Throwable {
+                    if(failure != null) {
+                        LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId,
+                                operationName, failure);
+
+                        if(propagateException) {
+                            returnFuture.setException(failure);
+                        } else {
+                            returnFuture.set(null);
+                        }
+                    } else {
+                        finishVoidOperation(operationName, message, expectedResponseClass,
+                                propagateException, returnFuture);
+                    }
+                }
+            }, actorContext.getActorSystem().dispatcher());
+        }
+
+        return returnFuture;
+    }
+
+    private void finishVoidOperation(final String operationName, final Object message,
+            final Class<?> expectedResponseClass, final boolean propagateException,
+            final SettableFuture<Void> returnFuture) {
+
+        LOG.debug("Tx {} finish {}", transactionId, operationName);
+
+        Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
+
         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
             @Override
             public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
@@ -161,6 +244,9 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                 }
 
                 if(exceptionToPropagate != null) {
+                    LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
+                            operationName, exceptionToPropagate);
+
                     if(propagateException) {
                         // We don't log the exception here to avoid redundant logging since we're
                         // propagating to the caller in MD-SAL core who will log it.
@@ -174,15 +260,15 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                         returnFuture.set(null);
                     }
                 } else {
+                    LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
                     returnFuture.set(null);
                 }
             }
         }, actorContext.getActorSystem().dispatcher());
-
-        return returnFuture;
     }
 
-    public List<ActorPath> getCohortPaths() {
-        return Collections.unmodifiableList(this.cohortPaths);
+    @VisibleForTesting
+    List<Future<ActorPath>> getCohortPathFutures() {
+        return Collections.unmodifiableList(cohortPathFutures);
     }
 }
index 6183c489c4cdbc56a1ba9ac0c3c3939963277edb..2e7b2feb85bd97c7b80d09b8156f4b44c6fb343e 100644 (file)
@@ -9,13 +9,13 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorPath;
-import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
-import akka.actor.Props;
 import akka.dispatch.OnComplete;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
@@ -45,9 +45,10 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import scala.Function1;
 import scala.concurrent.Future;
+import scala.runtime.AbstractFunction1;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -72,6 +73,14 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         READ_WRITE
     }
 
+    static Function1<Throwable, Throwable> SAME_FAILURE_TRANSFORMER = new AbstractFunction1<
+                                                                          Throwable, Throwable>() {
+        @Override
+        public Throwable apply(Throwable failure) {
+            return failure;
+        }
+    };
+
     private static final AtomicLong counter = new AtomicLong();
 
     private static final Logger
@@ -103,6 +112,16 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
     }
 
+    @VisibleForTesting
+    List<Future<Object>> getRecordedOperationFutures() {
+        List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
+        for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
+            recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
+        }
+
+        return recordedOperationFutures;
+    }
+
     @Override
     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
             final YangInstanceIdentifier path) {
@@ -110,7 +129,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
                 "Read operation on write-only transaction is not allowed");
 
-        LOG.debug("txn {} read {}", identifier, path);
+        LOG.debug("Tx {} read {}", identifier, path);
 
         createTransactionIfMissing(actorContext, path);
 
@@ -123,7 +142,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
                 "Exists operation on write-only transaction is not allowed");
 
-        LOG.debug("txn {} exists {}", identifier, path);
+        LOG.debug("Tx {} exists {}", identifier, path);
 
         createTransactionIfMissing(actorContext, path);
 
@@ -142,7 +161,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         checkModificationState();
 
-        LOG.debug("txn {} write {}", identifier, path);
+        LOG.debug("Tx {} write {}", identifier, path);
 
         createTransactionIfMissing(actorContext, path);
 
@@ -154,7 +173,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         checkModificationState();
 
-        LOG.debug("txn {} merge {}", identifier, path);
+        LOG.debug("Tx {} merge {}", identifier, path);
 
         createTransactionIfMissing(actorContext, path);
 
@@ -166,7 +185,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         checkModificationState();
 
-        LOG.debug("txn {} delete {}", identifier, path);
+        LOG.debug("Tx {} delete {}", identifier, path);
 
         createTransactionIfMissing(actorContext, path);
 
@@ -180,31 +199,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         inReadyState = true;
 
-        List<ActorPath> cohortPaths = new ArrayList<>();
-
-        LOG.debug("txn {} Trying to get {} transactions ready for commit", identifier,
+        LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
                 remoteTransactionPaths.size());
 
+        List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
+
         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
 
-            LOG.debug("txn {} Readying transaction for shard {}", identifier,
+            LOG.debug("Tx {} Readying transaction for shard {}", identifier,
                     transactionContext.getShardName());
 
-            Object result = transactionContext.readyTransaction();
-
-            if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){
-                ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
-                        actorContext.getActorSystem(),result);
-                String resolvedCohortPath = transactionContext.getResolvedCohortPath(
-                        reply.getCohortPath().toString());
-                cohortPaths.add(actorContext.actorFor(resolvedCohortPath));
-            } else {
-                LOG.error("Was expecting {} but got {}", ReadyTransactionReply.SERIALIZABLE_CLASS,
-                        result.getClass());
-            }
+            cohortPathFutures.add(transactionContext.readyTransaction());
         }
 
-        return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier.toString());
+        return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures,
+                identifier.toString());
     }
 
     @Override
@@ -249,7 +258,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
                 String transactionPath = reply.getTransactionPath();
 
-                LOG.debug("txn {} Received transaction path = {}", identifier, transactionPath);
+                LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
 
                 ActorSelection transactionActor =
                     actorContext.actorSelection(transactionPath);
@@ -259,11 +268,11 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
                 remoteTransactionPaths.put(shardName, transactionContext);
             } else {
-                LOG.error("Was expecting {} but got {}", CreateTransactionReply.SERIALIZABLE_CLASS,
-                        response.getClass());
+                throw new IllegalArgumentException(String.format(
+                        "Invalid reply type {} for CreateTransaction", response.getClass()));
             }
         } catch(Exception e){
-            LOG.error("txn {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
+            LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
             remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e));
         }
     }
@@ -271,11 +280,11 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     private interface TransactionContext {
         String getShardName();
 
-        String getResolvedCohortPath(String cohortPath);
+        void closeTransaction();
 
-        public void closeTransaction();
+        Future<ActorPath> readyTransaction();
 
-        public Object readyTransaction();
+        void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
 
         void deleteData(YangInstanceIdentifier path);
 
@@ -284,23 +293,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
                 final YangInstanceIdentifier path);
 
-        void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
-
         CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);
-    }
 
+        List<Future<Object>> getRecordedOperationFutures();
+    }
 
-    private class TransactionContextImpl implements TransactionContext {
-        private final String shardName;
-        private final String actorPath;
-        private final ActorSelection actor;
+    private abstract class AbstractTransactionContext implements TransactionContext {
 
+        protected final String shardName;
+        protected final List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
 
-        private TransactionContextImpl(String shardName, String actorPath,
-            ActorSelection actor) {
+        AbstractTransactionContext(String shardName) {
             this.shardName = shardName;
-            this.actorPath = actorPath;
-            this.actor = actor;
         }
 
         @Override
@@ -308,53 +312,193 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             return shardName;
         }
 
+        @Override
+        public List<Future<Object>> getRecordedOperationFutures() {
+            return recordedOperationFutures;
+        }
+    }
+
+    private class TransactionContextImpl extends AbstractTransactionContext {
+        private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
+
+        private final String actorPath;
+        private final ActorSelection actor;
+
+        private TransactionContextImpl(String shardName, String actorPath,
+            ActorSelection actor) {
+            super(shardName);
+            this.actorPath = actorPath;
+            this.actor = actor;
+        }
+
         private ActorSelection getActor() {
             return actor;
         }
 
-        @Override
-        public String getResolvedCohortPath(String cohortPath) {
+        private String getResolvedCohortPath(String cohortPath) {
             return actorContext.resolvePath(actorPath, cohortPath);
         }
 
         @Override
         public void closeTransaction() {
+            LOG.debug("Tx {} closeTransaction called", identifier);
             actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable());
         }
 
         @Override
-        public Object readyTransaction() {
-            return actorContext.executeRemoteOperation(getActor(),
+        public Future<ActorPath> readyTransaction() {
+            LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
+                    identifier, recordedOperationFutures.size());
+
+            // Send the ReadyTransaction message to the Tx actor.
+
+            final Future<Object> replyFuture = actorContext.executeRemoteOperationAsync(getActor(),
                     new ReadyTransaction().toSerializable(), ActorContext.ASK_DURATION);
+
+            // Combine all the previously recorded put/merge/delete operation reply Futures and the
+            // ReadyTransactionReply Future into one Future. If any one fails then the combined
+            // Future will fail. We need all prior operations and the ready operation to succeed
+            // in order to attempt commit.
+
+            List<Future<Object>> futureList =
+                    Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
+            futureList.addAll(recordedOperationFutures);
+            futureList.add(replyFuture);
+
+            Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
+                    actorContext.getActorSystem().dispatcher());
+
+            // Transform the combined Future into a Future that returns the cohort actor path from
+            // the ReadyTransactionReply. That's the end result of the ready operation.
+
+            return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorPath>() {
+                @Override
+                public ActorPath apply(Iterable<Object> notUsed) {
+
+                    LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
+                            identifier);
+
+                    // At this point all the Futures succeeded and we need to extract the cohort
+                    // actor path from the ReadyTransactionReply. For the recorded operations, they
+                    // don't return any data so we're only interested that they completed
+                    // successfully. We could be paranoid and verify the correct reply types but
+                    // that really should never happen so it's not worth the overhead of
+                    // de-serializing each reply.
+
+                    // Note the Future get call here won't block as it's complete.
+                    Object serializedReadyReply = replyFuture.value().get().get();
+                    if(serializedReadyReply.getClass().equals(
+                                                     ReadyTransactionReply.SERIALIZABLE_CLASS)) {
+                        ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
+                                actorContext.getActorSystem(), serializedReadyReply);
+
+                        String resolvedCohortPath = getResolvedCohortPath(
+                                reply.getCohortPath().toString());
+
+                        LOG.debug("Tx {} readyTransaction: resolved cohort path {}",
+                                identifier, resolvedCohortPath);
+
+                        return actorContext.actorFor(resolvedCohortPath);
+                    } else {
+                        // Throwing an exception here will fail the Future.
+
+                        throw new IllegalArgumentException(String.format("Invalid reply type {}",
+                                serializedReadyReply.getClass()));
+                    }
+                }
+            }, SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
         }
 
         @Override
         public void deleteData(YangInstanceIdentifier path) {
-            actorContext.sendRemoteOperationAsync(getActor(), new DeleteData(path).toSerializable() );
+            LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+            recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
+                    new DeleteData(path).toSerializable(), ActorContext.ASK_DURATION ));
         }
 
         @Override
         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-            actorContext.sendRemoteOperationAsync(getActor(),
-                    new MergeData(path, data, schemaContext).toSerializable());
+            LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+            recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
+                    new MergeData(path, data, schemaContext).toSerializable(),
+                    ActorContext.ASK_DURATION));
+        }
+
+        @Override
+        public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+            LOG.debug("Tx {} writeData called path = {}", identifier, path);
+            recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
+                    new WriteData(path, data, schemaContext).toSerializable(),
+                    ActorContext.ASK_DURATION));
         }
 
         @Override
         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
-            final YangInstanceIdentifier path) {
+                final YangInstanceIdentifier path) {
+
+            LOG.debug("Tx {} readData called path = {}", identifier, path);
 
             final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
 
+            // If there were any previous recorded put/merge/delete operation reply Futures then we
+            // must wait for them to successfully complete. This is necessary to honor the read
+            // uncommitted semantics of the public API contract. If any one fails then fail the read.
+
+            if(recordedOperationFutures.isEmpty()) {
+                finishReadData(path, returnFuture);
+            } else {
+                LOG.debug("Tx {} readData: verifying {} previous recorded operations",
+                        identifier, recordedOperationFutures.size());
+
+                // Note: we make a copy of recordedOperationFutures to be on the safe side in case
+                // Futures#sequence accesses the passed List on a different thread, as
+                // recordedOperationFutures is not synchronized.
+
+                Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
+                        Lists.newArrayList(recordedOperationFutures),
+                        actorContext.getActorSystem().dispatcher());
+                OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
+                    @Override
+                    public void onComplete(Throwable failure, Iterable<Object> notUsed)
+                            throws Throwable {
+                        if(failure != null) {
+                            LOG.debug("Tx {} readData: a recorded operation failed: {}",
+                                    identifier, failure);
+
+                            returnFuture.setException(new ReadFailedException(
+                                    "The read could not be performed because a previous put, merge,"
+                                    + "or delete operation failed", failure));
+                        } else {
+                            finishReadData(path, returnFuture);
+                        }
+                    }
+                };
+
+                combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+            }
+
+            return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
+        }
+
+        private void finishReadData(final YangInstanceIdentifier path,
+                final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
+
+            LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
+
             OnComplete<Object> onComplete = new OnComplete<Object>() {
                 @Override
-                public void onComplete(Throwable failure, Object response) throws Throwable {
+                public void onComplete(Throwable failure, Object readResponse) throws Throwable {
                     if(failure != null) {
+                        LOG.debug("Tx {} read operation failed: {}", identifier, failure);
+
                         returnFuture.setException(new ReadFailedException(
                                 "Error reading data for path " + path, failure));
                     } else {
-                        if (response.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
+                        LOG.debug("Tx {} read operation succeeded", identifier, failure);
+
+                        if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
                             ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,
-                                    path, response);
+                                    path, readResponse);
                             if (reply.getNormalizedNode() == null) {
                                 returnFuture.set(Optional.<NormalizedNode<?, ?>>absent());
                             } else {
@@ -369,32 +513,76 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 }
             };
 
-            Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
+            Future<Object> readFuture = actorContext.executeRemoteOperationAsync(getActor(),
                     new ReadData(path).toSerializable(), ActorContext.ASK_DURATION);
-            future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
-
-            return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
-        }
-
-        @Override
-        public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-            actorContext.sendRemoteOperationAsync(getActor(),
-                    new WriteData(path, data, schemaContext).toSerializable());
+            readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
         }
 
         @Override
         public CheckedFuture<Boolean, ReadFailedException> dataExists(
                 final YangInstanceIdentifier path) {
 
+            LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+
             final SettableFuture<Boolean> returnFuture = SettableFuture.create();
 
+            // If there were any previous recorded put/merge/delete operation reply Futures then we
+            // must wait for them to successfully complete. This is necessary to honor the read
+            // uncommitted semantics of the public API contract. If any one fails then fail this
+            // request.
+
+            if(recordedOperationFutures.isEmpty()) {
+                finishDataExists(path, returnFuture);
+            } else {
+                LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
+                        identifier, recordedOperationFutures.size());
+
+                // Note: we make a copy of recordedOperationFutures to be on the safe side in case
+                // Futures#sequence accesses the passed List on a different thread, as
+                // recordedOperationFutures is not synchronized.
+
+                Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
+                        Lists.newArrayList(recordedOperationFutures),
+                        actorContext.getActorSystem().dispatcher());
+                OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
+                    @Override
+                    public void onComplete(Throwable failure, Iterable<Object> notUsed)
+                            throws Throwable {
+                        if(failure != null) {
+                            LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
+                                    identifier, failure);
+
+                            returnFuture.setException(new ReadFailedException(
+                                    "The data exists could not be performed because a previous "
+                                    + "put, merge, or delete operation failed", failure));
+                        } else {
+                            finishDataExists(path, returnFuture);
+                        }
+                    }
+                };
+
+                combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+            }
+
+            return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
+        }
+
+        private void finishDataExists(final YangInstanceIdentifier path,
+                final SettableFuture<Boolean> returnFuture) {
+
+            LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
+
             OnComplete<Object> onComplete = new OnComplete<Object>() {
                 @Override
                 public void onComplete(Throwable failure, Object response) throws Throwable {
                     if(failure != null) {
+                        LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
+
                         returnFuture.setException(new ReadFailedException(
-                                "Error checking exists for path " + path, failure));
+                                "Error checking data exists for path " + path, failure));
                     } else {
+                        LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
+
                         if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
                             returnFuture.set(Boolean.valueOf(DataExistsReply.
                                         fromSerializable(response).exists()));
@@ -409,80 +597,60 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
                     new DataExists(path).toSerializable(), ActorContext.ASK_DURATION);
             future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
-
-            return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
         }
     }
 
-    private class NoOpTransactionContext implements TransactionContext {
+    private class NoOpTransactionContext extends AbstractTransactionContext {
 
-        private final Logger
-            LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
+        private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
 
-        private final String shardName;
         private final Exception failure;
 
-        private ActorRef cohort;
-
         public NoOpTransactionContext(String shardName, Exception failure){
-            this.shardName = shardName;
+            super(shardName);
             this.failure = failure;
         }
 
         @Override
-        public String getShardName() {
-            return  shardName;
-
+        public void closeTransaction() {
+            LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
         }
 
         @Override
-        public String getResolvedCohortPath(String cohortPath) {
-            return cohort.path().toString();
+        public Future<ActorPath> readyTransaction() {
+            LOG.debug("Tx {} readyTransaction called", identifier);
+            return akka.dispatch.Futures.failed(failure);
         }
 
         @Override
-        public void closeTransaction() {
-            LOG.warn("txn {} closeTransaction called", identifier);
-        }
-
-        @Override public Object readyTransaction() {
-            LOG.warn("txn {} readyTransaction called", identifier);
-            cohort = actorContext.getActorSystem().actorOf(Props.create(NoOpCohort.class));
-            return new ReadyTransactionReply(cohort.path()).toSerializable();
+        public void deleteData(YangInstanceIdentifier path) {
+            LOG.debug("Tx {} deleteData called path = {}", identifier, path);
         }
 
         @Override
-        public void deleteData(YangInstanceIdentifier path) {
-            LOG.warn("txt {} deleteData called path = {}", identifier, path);
+        public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+            LOG.debug("Tx {} mergeData called path = {}", identifier, path);
         }
 
         @Override
-        public void mergeData(YangInstanceIdentifier path,
-            NormalizedNode<?, ?> data) {
-            LOG.warn("txn {} mergeData called path = {}", identifier, path);
+        public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+            LOG.debug("Tx {} writeData called path = {}", identifier, path);
         }
 
         @Override
         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
             YangInstanceIdentifier path) {
-            LOG.warn("txn {} readData called path = {}", identifier, path);
+            LOG.debug("Tx {} readData called path = {}", identifier, path);
             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
                     "Error reading data for path " + path, failure));
         }
 
-        @Override public void writeData(YangInstanceIdentifier path,
-            NormalizedNode<?, ?> data) {
-            LOG.warn("txn {} writeData called path = {}", identifier, path);
-        }
-
-        @Override public CheckedFuture<Boolean, ReadFailedException> dataExists(
+        @Override
+        public CheckedFuture<Boolean, ReadFailedException> dataExists(
             YangInstanceIdentifier path) {
-            LOG.warn("txn {} dataExists called path = {}", identifier, path);
+            LOG.debug("Tx {} dataExists called path = {}", identifier, path);
             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
                     "Error checking exists for path " + path, failure));
         }
     }
-
-
-
 }
index 87231f08849ed02398472bb792a40a48753a96be..adb12b298e99260b6f33a6c309f3c6eb16ca78bb 100644 (file)
@@ -7,8 +7,9 @@ import akka.dispatch.Futures;
 
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
+
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.isA;
@@ -31,14 +32,21 @@ import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransacti
 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
+import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+
+import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
+    @SuppressWarnings("serial")
+    static class TestException extends RuntimeException {
+    }
+
     @Mock
     private ActorContext actorContext;
 
@@ -49,15 +57,28 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         doReturn(getSystem()).when(actorContext).getActorSystem();
     }
 
-    private ThreePhaseCommitCohortProxy setupProxy(int nCohorts) {
-        List<ActorPath> cohorts = Lists.newArrayList();
+    private Future<ActorPath> newCohortPath() {
+        ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
+        doReturn(mock(ActorSelection.class)).when(actorContext).actorSelection(path);
+        return Futures.successful(path);
+    }
+
+    private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
+        List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
         for(int i = 1; i <= nCohorts; i++) {
-            ActorPath path = getSystem().actorOf(Props.create(MessageCollectorActor.class)).path();
-            cohorts.add(path);
-            doReturn(mock(ActorSelection.class)).when(actorContext).actorSelection(path);
+            cohortPathFutures.add(newCohortPath());
         }
 
-        return new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
+        return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, "txn-1");
+    }
+
+    private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
+            throws Exception {
+        List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
+        cohortPathFutures.add(newCohortPath());
+        cohortPathFutures.add(Futures.<ActorPath>failed(new TestException()));
+
+        return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, "txn-1");
     }
 
     private void setupMockActorContext(Class<?> requestType, Object... responses) {
@@ -80,6 +101,16 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
                 any(ActorSelection.class), isA(requestType), any(FiniteDuration.class));
     }
 
+    private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
+
+        try {
+            future.get(5, TimeUnit.SECONDS);
+            fail("Expected ExecutionException");
+        } catch(ExecutionException e) {
+            throw e.getCause();
+        }
+    }
+
     @Test
     public void testCanCommitWithOneCohort() throws Exception {
 
@@ -90,14 +121,14 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
         ListenableFuture<Boolean> future = proxy.canCommit();
 
-        assertEquals("canCommit", true, future.get());
+        assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
 
         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
                 new CanCommitTransactionReply(false));
 
         future = proxy.canCommit();
 
-        assertEquals("canCommit", false, future.get());
+        assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
 
         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
     }
@@ -112,7 +143,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
         ListenableFuture<Boolean> future = proxy.canCommit();
 
-        assertEquals("canCommit", true, future.get());
+        assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
 
         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
     }
@@ -128,19 +159,19 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
         ListenableFuture<Boolean> future = proxy.canCommit();
 
-        assertEquals("canCommit", false, future.get());
+        assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
 
         verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
     }
 
-    @Test(expected = ExecutionException.class)
-    public void testCanCommitWithExceptionFailure() throws Exception {
+    @Test(expected = TestException.class)
+    public void testCanCommitWithExceptionFailure() throws Throwable {
 
         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
 
-        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
+        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException());
 
-        proxy.canCommit().get();
+        propagateExecutionExceptionCause(proxy.canCommit());
     }
 
     @Test(expected = ExecutionException.class)
@@ -151,7 +182,19 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
                 new PreCommitTransactionReply());
 
-        proxy.canCommit().get();
+        proxy.canCommit().get(5, TimeUnit.SECONDS);
+    }
+
+    @Test(expected = TestException.class)
+    public void testCanCommitWithFailedCohortPath() throws Throwable {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
+
+        try {
+            propagateExecutionExceptionCause(proxy.canCommit());
+        } finally {
+            verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS);
+        }
     }
 
     @Test
@@ -161,7 +204,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
                 new PreCommitTransactionReply());
 
-        proxy.preCommit().get();
+        proxy.preCommit().get(5, TimeUnit.SECONDS);
 
         verifyCohortInvocations(1, PreCommitTransaction.SERIALIZABLE_CLASS);
     }
@@ -173,7 +216,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
                 new PreCommitTransactionReply(), new RuntimeException("mock"));
 
-        proxy.preCommit().get();
+        proxy.preCommit().get(5, TimeUnit.SECONDS);
     }
 
     @Test
@@ -182,7 +225,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
         setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
 
-        proxy.abort().get();
+        proxy.abort().get(5, TimeUnit.SECONDS);
 
         verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
     }
@@ -194,11 +237,22 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
 
         // The exception should not get propagated.
-        proxy.abort().get();
+        proxy.abort().get(5, TimeUnit.SECONDS);
 
         verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
     }
 
+    @Test
+    public void testAbortWithFailedCohortPath() throws Throwable {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
+
+        // The exception should not get propagated.
+        proxy.abort().get(5, TimeUnit.SECONDS);
+
+        verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS);
+    }
+
     @Test
     public void testCommit() throws Exception {
 
@@ -207,39 +261,64 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
                 new CommitTransactionReply());
 
-        proxy.commit().get();
+        proxy.commit().get(5, TimeUnit.SECONDS);
 
         verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
     }
 
-    @Test(expected = ExecutionException.class)
-    public void testCommitWithFailure() throws Exception {
+    @Test(expected = TestException.class)
+    public void testCommitWithFailure() throws Throwable {
 
         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
 
         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
-                new RuntimeException("mock"));
+                new TestException());
 
-        proxy.commit().get();
+        propagateExecutionExceptionCause(proxy.commit());
     }
 
     @Test(expected = ExecutionException.class)
-    public void teseCommitWithInvalidResponseType() throws Exception {
+    public void testCommitWithInvalidResponseType() throws Exception {
 
         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
 
         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
 
-        proxy.commit().get();
+        proxy.commit().get(5, TimeUnit.SECONDS);
+    }
+
+    @Test(expected = TestException.class)
+    public void testCommitWithFailedCohortPath() throws Throwable {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
+
+        try {
+            propagateExecutionExceptionCause(proxy.commit());
+        } finally {
+            verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
+        }
     }
 
     @Test
-    public void testGetCohortPaths() {
+    public void testAllThreePhasesSuccessful() throws Exception {
 
         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
 
-        List<ActorPath> paths = proxy.getCohortPaths();
-        assertNotNull("getCohortPaths returned null", paths);
-        assertEquals("getCohortPaths size", 2, paths.size());
+        setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
+                new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
+
+        setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
+                new PreCommitTransactionReply(), new PreCommitTransactionReply());
+
+        setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
+                new CommitTransactionReply(), new CommitTransactionReply());
+
+        proxy.canCommit().get(5, TimeUnit.SECONDS);
+        proxy.preCommit().get(5, TimeUnit.SECONDS);
+        proxy.commit().get(5, TimeUnit.SECONDS);
+
+        verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
+        verifyCohortInvocations(2, PreCommitTransaction.SERIALIZABLE_CLASS);
+        verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
     }
 }
index 14696f786e7e36888be3b5517c14ae4b9779340f..6b11a24e9cedbb3a8234fb6e337f7750a413721f 100644 (file)
@@ -9,7 +9,9 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
 import akka.dispatch.Futures;
+
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -29,12 +31,15 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
+import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
+import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@@ -48,10 +53,12 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
+import scala.concurrent.Await;
 import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Matchers.any;
@@ -62,6 +69,7 @@ import static org.mockito.Mockito.argThat;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.times;
 
 @SuppressWarnings("resource")
 public class TransactionProxyTest extends AbstractActorTest {
@@ -71,7 +79,7 @@ public class TransactionProxyTest extends AbstractActorTest {
     }
 
     static interface Invoker {
-        void invoke(TransactionProxy proxy) throws Exception;
+        CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception;
     }
 
     private final Configuration configuration = new MockConfiguration();
@@ -90,6 +98,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         schemaContext = TestModel.createTestContext();
 
         doReturn(getSystem()).when(mockActorContext).getActorSystem();
+        doReturn(memberName).when(mockActorContext).getCurrentMemberName();
 
         ShardStrategyFactory.setConfiguration(configuration);
     }
@@ -112,8 +121,8 @@ public class TransactionProxyTest extends AbstractActorTest {
         ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
             @Override
             public boolean matches(Object argument) {
-                DataExists obj = DataExists.fromSerializable(argument);
-                return obj.getPath().equals(TestModel.TEST_PATH);
+                return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
+                       DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
             }
         };
 
@@ -124,8 +133,8 @@ public class TransactionProxyTest extends AbstractActorTest {
         ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
             @Override
             public boolean matches(Object argument) {
-                ReadData obj = ReadData.fromSerializable(argument);
-                return obj.getPath().equals(TestModel.TEST_PATH);
+                return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
+                       ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
             }
         };
 
@@ -136,6 +145,10 @@ public class TransactionProxyTest extends AbstractActorTest {
         ArgumentMatcher<WriteData> matcher = new ArgumentMatcher<WriteData>() {
             @Override
             public boolean matches(Object argument) {
+                if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
+                    return false;
+                }
+
                 WriteData obj = WriteData.fromSerializable(argument, schemaContext);
                 return obj.getPath().equals(TestModel.TEST_PATH) &&
                        obj.getData().equals(nodeToWrite);
@@ -149,6 +162,10 @@ public class TransactionProxyTest extends AbstractActorTest {
         ArgumentMatcher<MergeData> matcher = new ArgumentMatcher<MergeData>() {
             @Override
             public boolean matches(Object argument) {
+                if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) {
+                    return false;
+                }
+
                 MergeData obj = MergeData.fromSerializable(argument, schemaContext);
                 return obj.getPath().equals(TestModel.TEST_PATH) &&
                        obj.getData().equals(nodeToWrite);
@@ -162,27 +179,38 @@ public class TransactionProxyTest extends AbstractActorTest {
         ArgumentMatcher<DeleteData> matcher = new ArgumentMatcher<DeleteData>() {
             @Override
             public boolean matches(Object argument) {
-                DeleteData obj = DeleteData.fromSerializable(argument);
-                return obj.getPath().equals(TestModel.TEST_PATH);
+                return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
+                       DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
             }
         };
 
         return argThat(matcher);
     }
 
-    private Object readyTxReply(ActorPath path) {
-        return new ReadyTransactionReply(path).toSerializable();
+    private Future<Object> readyTxReply(ActorPath path) {
+        return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
     }
 
     private Future<Object> readDataReply(NormalizedNode<?, ?> data) {
-        return Futures.successful(new ReadDataReply(schemaContext, data)
-                .toSerializable());
+        return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable());
     }
 
     private Future<Object> dataExistsReply(boolean exists) {
         return Futures.successful(new DataExistsReply(exists).toSerializable());
     }
 
+    private Future<Object> writeDataReply() {
+        return Futures.successful(new WriteDataReply().toSerializable());
+    }
+
+    private Future<Object> mergeDataReply() {
+        return Futures.successful(new MergeDataReply().toSerializable());
+    }
+
+    private Future<Object> deleteDataReply() {
+        return Futures.successful(new DeleteDataReply().toSerializable());
+    }
+
     private ActorSelection actorSelection(ActorRef actorRef) {
         return getSystem().actorSelection(actorRef.path());
     }
@@ -201,7 +229,6 @@ public class TransactionProxyTest extends AbstractActorTest {
         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
         doReturn(getSystem().actorSelection(actorRef.path())).
                 when(mockActorContext).actorSelection(actorRef.path().toString());
-        doReturn(memberName).when(mockActorContext).getCurrentMemberName();
         doReturn(createTransactionReply(actorRef)).when(mockActorContext).
                 executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD),
                         eqCreateTransaction(memberName, type), anyDuration());
@@ -212,6 +239,17 @@ public class TransactionProxyTest extends AbstractActorTest {
         return actorRef;
     }
 
+    private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
+            throws Throwable {
+
+        try {
+            future.checkedGet(5, TimeUnit.SECONDS);
+            fail("Expected ReadFailedException");
+        } catch(ReadFailedException e) {
+            throw e.getCause();
+        }
+    }
+
     @Test
     public void testRead() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
@@ -240,7 +278,7 @@ public class TransactionProxyTest extends AbstractActorTest {
     }
 
     @Test(expected = ReadFailedException.class)
-    public void testReadWhenAnInvalidMessageIsSentInReply() throws Exception {
+    public void testReadWithInvalidReplyMessageType() throws Exception {
         setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
@@ -256,19 +294,13 @@ public class TransactionProxyTest extends AbstractActorTest {
     public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
         setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
-        doThrow(new TestException()).when(mockActorContext).
+        doReturn(Futures.failed(new TestException())).when(mockActorContext).
                 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_ONLY, schemaContext);
 
-        try {
-            transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
-            fail("Expected ReadFailedException");
-        } catch(ReadFailedException e) {
-            // Expected - throw cause - expects TestException.
-            throw e.getCause();
-        }
+        propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
     }
 
     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
@@ -280,20 +312,14 @@ public class TransactionProxyTest extends AbstractActorTest {
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_ONLY, schemaContext);
 
-        try {
-            invoker.invoke(transactionProxy);
-            fail("Expected ReadFailedException");
-        } catch(ReadFailedException e) {
-            // Expected - throw cause - expects TestException.
-            throw e.getCause();
-        }
+        propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
     }
 
     private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable {
         testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() {
             @Override
-            public void invoke(TransactionProxy proxy) throws Exception {
-                proxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
+            public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
+                return proxy.read(TestModel.TEST_PATH);
             }
         });
     }
@@ -314,6 +340,71 @@ public class TransactionProxyTest extends AbstractActorTest {
         testReadWithExceptionOnInitialCreateTransaction(new TestException());
     }
 
+    @Test(expected = TestException.class)
+    public void testReadWithPriorRecordingOperationFailure() throws Throwable {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
+
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+        doReturn(Futures.failed(new TestException())).when(mockActorContext).
+                executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
+                        anyDuration());
+
+        doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                READ_WRITE, schemaContext);
+
+        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+        transactionProxy.delete(TestModel.TEST_PATH);
+
+        try {
+            propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
+        } finally {
+            verify(mockActorContext, times(0)).executeRemoteOperationAsync(
+                    eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+        }
+    }
+
+    @Test
+    public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
+
+        NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqWriteData(expectedNode), anyDuration());
+
+        doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                READ_WRITE, schemaContext);
+
+        transactionProxy.write(TestModel.TEST_PATH, expectedNode);
+
+        Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
+                TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
+
+        assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+
+        assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
+    }
+
+    @Test(expected=IllegalStateException.class)
+    public void testReadPreConditionCheck() {
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                WRITE_ONLY, schemaContext);
+
+        transactionProxy.read(TestModel.TEST_PATH);
+    }
+
     @Test
     public void testExists() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
@@ -340,14 +431,14 @@ public class TransactionProxyTest extends AbstractActorTest {
     public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
         testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() {
             @Override
-            public void invoke(TransactionProxy proxy) throws Exception {
-                proxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
+            public CheckedFuture<?, ReadFailedException> invoke(TransactionProxy proxy) throws Exception {
+                return proxy.exists(TestModel.TEST_PATH);
             }
         });
     }
 
     @Test(expected = ReadFailedException.class)
-    public void testExistsWhenAnInvalidMessageIsSentInReply() throws Exception {
+    public void testExistsWithInvalidReplyMessageType() throws Exception {
         setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
@@ -363,62 +454,206 @@ public class TransactionProxyTest extends AbstractActorTest {
     public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
         setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
-        doThrow(new TestException()).when(mockActorContext).
+        doReturn(Futures.failed(new TestException())).when(mockActorContext).
                 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_ONLY, schemaContext);
 
+        propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
+    }
+
+    @Test(expected = TestException.class)
+    public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
+
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+        doReturn(Futures.failed(new TestException())).when(mockActorContext).
+                executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
+                        anyDuration());
+
+        doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                READ_WRITE, schemaContext);
+
+        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+        transactionProxy.delete(TestModel.TEST_PATH);
+
         try {
-            transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
-            fail("Expected ReadFailedException");
-        } catch(ReadFailedException e) {
-            // Expected - throw cause - expects TestException.
-            throw e.getCause();
+            propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
+        } finally {
+            verify(mockActorContext, times(0)).executeRemoteOperationAsync(
+                    eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
         }
     }
 
     @Test
-    public void testWrite() throws Exception {
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+    public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
+
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+        doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                READ_WRITE, schemaContext);
+
+        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+        Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
+
+        assertEquals("Exists response", true, exists);
+    }
+
+    @Test(expected=IllegalStateException.class)
+    public void testxistsPreConditionCheck() {
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 WRITE_ONLY, schemaContext);
 
+        transactionProxy.exists(TestModel.TEST_PATH);
+    }
+
+    private void verifyRecordingOperationFutures(List<Future<Object>> futures,
+            Class<?>... expResultTypes) throws Exception {
+        assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
+
+        int i = 0;
+        for( Future<Object> future: futures) {
+            assertNotNull("Recording operation Future is null", future);
+
+            Class<?> expResultType = expResultTypes[i++];
+            if(Throwable.class.isAssignableFrom(expResultType)) {
+                try {
+                    Await.result(future, Duration.create(5, TimeUnit.SECONDS));
+                    fail("Expected exception from recording operation Future");
+                } catch(Exception e) {
+                    // Expected
+                }
+            } else {
+                assertEquals("Recording operation Future result type", expResultType,
+                             Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
+            }
+        }
+    }
+
+    @Test
+    public void testWrite() throws Exception {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
+        doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                WRITE_ONLY, schemaContext);
+
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
-        verify(mockActorContext).sendRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
+        verify(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+                WriteDataReply.SERIALIZABLE_CLASS);
+    }
+
+    @Test(expected=IllegalStateException.class)
+    public void testWritePreConditionCheck() {
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                READ_ONLY, schemaContext);
+
+        transactionProxy.write(TestModel.TEST_PATH,
+                ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+    }
+
+    @Test(expected=IllegalStateException.class)
+    public void testWriteAfterReadyPreConditionCheck() {
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                WRITE_ONLY, schemaContext);
+
+        transactionProxy.ready();
+
+        transactionProxy.write(TestModel.TEST_PATH,
+                ImmutableNodes.containerNode(TestModel.TEST_QNAME));
     }
 
     @Test
     public void testMerge() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
 
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 WRITE_ONLY, schemaContext);
 
-        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
 
-        verify(mockActorContext).sendRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
+        verify(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+
+        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+                MergeDataReply.SERIALIZABLE_CLASS);
     }
 
     @Test
     public void testDelete() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
 
+        doReturn(deleteDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
+
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 WRITE_ONLY, schemaContext);
 
         transactionProxy.delete(TestModel.TEST_PATH);
 
-        verify(mockActorContext).sendRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqDeleteData());
+        verify(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
+
+        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+                DeleteDataReply.SERIALIZABLE_CLASS);
+    }
+
+    private void verifyCohortPathFutures(ThreePhaseCommitCohortProxy proxy,
+            Object... expReplies) throws Exception {
+        assertEquals("getReadyOperationFutures size", expReplies.length,
+                proxy.getCohortPathFutures().size());
+
+        int i = 0;
+        for( Future<ActorPath> future: proxy.getCohortPathFutures()) {
+            assertNotNull("Ready operation Future is null", future);
+
+            Object expReply = expReplies[i++];
+            if(expReply instanceof ActorPath) {
+                ActorPath actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
+                assertEquals("Cohort actor path", expReply, actual);
+            } else {
+                // Expecting exception.
+                try {
+                    Await.result(future, Duration.create(5, TimeUnit.SECONDS));
+                    fail("Expected exception from ready operation Future");
+                } catch(Exception e) {
+                    // Expected
+                }
+            }
+        }
     }
 
     @SuppressWarnings("unchecked")
@@ -426,10 +661,15 @@ public class TransactionProxyTest extends AbstractActorTest {
     public void testReady() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
 
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
                 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
 
-        doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperation(
+        doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+        doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
@@ -437,13 +677,139 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         transactionProxy.read(TestModel.TEST_PATH);
 
+        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+                WriteDataReply.SERIALIZABLE_CLASS);
+
+        verifyCohortPathFutures(proxy, actorRef.path());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testReadyWithRecordingOperationFailure() throws Exception {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+
+        doReturn(Futures.failed(new TestException())).when(mockActorContext).
+                executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite),
+                        anyDuration());
+
+        doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                WRITE_ONLY, schemaContext);
+
+        transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+
+        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+                MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
+
+        verifyCohortPathFutures(proxy, TestException.class);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testReadyWithReplyFailure() throws Exception {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+
+        doReturn(Futures.failed(new TestException())).when(mockActorContext).
+                executeRemoteOperationAsync(eq(actorSelection(actorRef)),
+                        isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                WRITE_ONLY, schemaContext);
+
+        transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+
+        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+                MergeDataReply.SERIALIZABLE_CLASS);
+
+        verifyCohortPathFutures(proxy, TestException.class);
+    }
+
+    @Test
+    public void testReadyWithInitialCreateTransactionFailure() throws Exception {
+
+        doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
+                anyString(), any(), anyDuration());
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                WRITE_ONLY, schemaContext);
+
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+
+        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+        transactionProxy.delete(TestModel.TEST_PATH);
+
+        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+        verifyCohortPathFutures(proxy, PrimaryNotFoundException.class);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testReadyWithInvalidReplyMessageType() throws Exception {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+
+        doReturn(Futures.successful(new Object())).when(mockActorContext).
+                executeRemoteOperationAsync(eq(actorSelection(actorRef)),
+                        isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                WRITE_ONLY, schemaContext);
+
+        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
 
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
-        assertEquals("getCohortPaths", Arrays.asList(actorRef.path()), proxy.getCohortPaths());
+        verifyCohortPathFutures(proxy, IllegalArgumentException.class);
     }
 
     @Test