Bug-2136 : Clustering : When a transaction is local then do not serialize the Reading...
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index f447f3c718980285d32e79c5a9b2579e26309971..715f48c3492156d1b14005462da2c26aacb1768c 100644 (file)
@@ -8,7 +8,6 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.actor.ActorPath;
 import akka.actor.ActorSelection;
 import akka.dispatch.OnComplete;
 
@@ -22,6 +21,7 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
 
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
@@ -71,6 +71,11 @@ import java.util.concurrent.atomic.AtomicLong;
  * </p>
  */
 public class TransactionProxy implements DOMStoreReadWriteTransaction {
+
+    private final TransactionChainProxy transactionChainProxy;
+
+
+
     public enum TransactionType {
         READ_ONLY,
         WRITE_ONLY,
@@ -151,8 +156,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             if(remoteTransactionActorsMB.get()) {
                 for(ActorSelection actor : remoteTransactionActors) {
                     LOG.trace("Sending CloseTransaction to {}", actor);
-                    actorContext.sendRemoteOperationAsync(actor,
-                            new CloseTransaction().toSerializable());
+                    actorContext.sendOperationAsync(actor,
+                        new CloseTransaction().toSerializable());
                 }
             }
         }
@@ -177,12 +182,27 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     private boolean inReadyState;
 
     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
+        this(actorContext, transactionType, null);
+    }
+
+    @VisibleForTesting
+    List<Future<Object>> getRecordedOperationFutures() {
+        List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
+        for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
+            recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
+        }
+
+        return recordedOperationFutures;
+    }
+
+    public TransactionProxy(ActorContext actorContext, TransactionType transactionType, TransactionChainProxy transactionChainProxy) {
         this.actorContext = Preconditions.checkNotNull(actorContext,
-                "actorContext should not be null");
+            "actorContext should not be null");
         this.transactionType = Preconditions.checkNotNull(transactionType,
-                "transactionType should not be null");
+            "transactionType should not be null");
         this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
-                "schemaContext should not be null");
+            "schemaContext should not be null");
+        this.transactionChainProxy = transactionChainProxy;
 
         String memberName = actorContext.getCurrentMemberName();
         if(memberName == null){
@@ -190,7 +210,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         }
 
         this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
-                counter.getAndIncrement()).build();
+            counter.getAndIncrement()).build();
 
         if(transactionType == TransactionType.READ_ONLY) {
             // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
@@ -201,21 +221,12 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             remoteTransactionActorsMB = new AtomicBoolean();
 
             TransactionProxyCleanupPhantomReference cleanup =
-                                              new TransactionProxyCleanupPhantomReference(this);
+                new TransactionProxyCleanupPhantomReference(this);
             phantomReferenceCache.put(cleanup, cleanup);
         }
-
-        LOG.debug("Created txn {} of type {}", identifier, transactionType);
-    }
-
-    @VisibleForTesting
-    List<Future<Object>> getRecordedOperationFutures() {
-        List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
-        for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
-            recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Created txn {} of type {}", identifier, transactionType);
         }
-
-        return recordedOperationFutures;
     }
 
     @Override
@@ -225,8 +236,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
                 "Read operation on write-only transaction is not allowed");
 
-        LOG.debug("Tx {} read {}", identifier, path);
-
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Tx {} read {}", identifier, path);
+        }
         createTransactionIfMissing(actorContext, path);
 
         return transactionContext(path).readData(path);
@@ -238,8 +250,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
                 "Exists operation on write-only transaction is not allowed");
 
-        LOG.debug("Tx {} exists {}", identifier, path);
-
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Tx {} exists {}", identifier, path);
+        }
         createTransactionIfMissing(actorContext, path);
 
         return transactionContext(path).dataExists(path);
@@ -249,7 +262,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
                 "Modification operation on read-only transaction is not allowed");
         Preconditions.checkState(!inReadyState,
-                "Transaction is sealed - further modifications are allowed");
+                "Transaction is sealed - further modifications are not allowed");
     }
 
     @Override
@@ -257,8 +270,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         checkModificationState();
 
-        LOG.debug("Tx {} write {}", identifier, path);
-
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Tx {} write {}", identifier, path);
+        }
         createTransactionIfMissing(actorContext, path);
 
         transactionContext(path).writeData(path, data);
@@ -269,8 +283,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         checkModificationState();
 
-        LOG.debug("Tx {} merge {}", identifier, path);
-
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Tx {} merge {}", identifier, path);
+        }
         createTransactionIfMissing(actorContext, path);
 
         transactionContext(path).mergeData(path, data);
@@ -280,9 +295,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     public void delete(YangInstanceIdentifier path) {
 
         checkModificationState();
-
-        LOG.debug("Tx {} delete {}", identifier, path);
-
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Tx {} delete {}", identifier, path);
+        }
         createTransactionIfMissing(actorContext, path);
 
         transactionContext(path).deleteData(path);
@@ -295,20 +310,26 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         inReadyState = true;
 
-        LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
                 remoteTransactionPaths.size());
-
-        List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
+        }
+        List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
 
         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
 
-            LOG.debug("Tx {} Readying transaction for shard {}", identifier,
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} Readying transaction for shard {}", identifier,
                     transactionContext.getShardName());
+            }
+            cohortFutures.add(transactionContext.readyTransaction());
+        }
 
-            cohortPathFutures.add(transactionContext.readyTransaction());
+        if(transactionChainProxy != null){
+            transactionChainProxy.onTransactionReady(cohortFutures);
         }
 
-        return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures,
+        return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
                 identifier.toString());
     }
 
@@ -340,32 +361,44 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         return ShardStrategyFactory.getStrategy(path).findShard(path);
     }
 
-    private void createTransactionIfMissing(ActorContext actorContext, YangInstanceIdentifier path) {
+    private void createTransactionIfMissing(ActorContext actorContext,
+        YangInstanceIdentifier path) {
+
+        if(transactionChainProxy != null){
+            transactionChainProxy.waitTillCurrentTransactionReady();
+        }
+
         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
 
         TransactionContext transactionContext =
             remoteTransactionPaths.get(shardName);
 
-        if(transactionContext != null){
+        if (transactionContext != null) {
             // A transaction already exists with that shard
             return;
         }
 
         try {
-            Object response = actorContext.executeShardOperation(shardName,
-                new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(),
-                ActorContext.ASK_DURATION);
+            Optional<ActorSelection> primaryShard = actorContext.findPrimaryShard(shardName);
+            if (!primaryShard.isPresent()) {
+                throw new PrimaryNotFoundException("Primary could not be found for shard " + shardName);
+            }
+
+            Object response = actorContext.executeOperation(primaryShard.get(),
+                new CreateTransaction(identifier.toString(), this.transactionType.ordinal(),
+                    getTransactionChainId()).toSerializable());
             if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
                 CreateTransactionReply reply =
                     CreateTransactionReply.fromSerializable(response);
 
                 String transactionPath = reply.getTransactionPath();
 
-                LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
-
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
+                }
                 ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
 
-                if(transactionType == TransactionType.READ_ONLY) {
+                if (transactionType == TransactionType.READ_ONLY) {
                     // Add the actor to the remoteTransactionActors list for access by the
                     // cleanup PhantonReference.
                     remoteTransactionActors.add(transactionActor);
@@ -375,26 +408,41 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                     remoteTransactionActorsMB.set(true);
                 }
 
+                // TxActor is always created where the leader of the shard is.
+                // Check if TxActor is created in the same node
+                boolean isTxActorLocal = actorContext.isLocalPath(transactionPath);
+
                 transactionContext = new TransactionContextImpl(shardName, transactionPath,
-                        transactionActor, identifier, actorContext, schemaContext);
+                    transactionActor, identifier, actorContext, schemaContext, isTxActorLocal);
 
                 remoteTransactionPaths.put(shardName, transactionContext);
             } else {
                 throw new IllegalArgumentException(String.format(
-                        "Invalid reply type {} for CreateTransaction", response.getClass()));
+                    "Invalid reply type {} for CreateTransaction", response.getClass()));
             }
-        } catch(Exception e){
-            LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
-            remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e, identifier));
+        } catch (Exception e) {
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
+            }
+            remoteTransactionPaths
+                .put(shardName, new NoOpTransactionContext(shardName, e, identifier));
+        }
+    }
+
+    public String getTransactionChainId() {
+        if(transactionChainProxy == null){
+            return "";
         }
+        return transactionChainProxy.getTransactionChainId();
     }
 
+
     private interface TransactionContext {
         String getShardName();
 
         void closeTransaction();
 
-        Future<ActorPath> readyTransaction();
+        Future<ActorSelection> readyTransaction();
 
         void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
 
@@ -439,40 +487,42 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         private final SchemaContext schemaContext;
         private final String actorPath;
         private final ActorSelection actor;
+        private final boolean isTxActorLocal;
 
         private TransactionContextImpl(String shardName, String actorPath,
                 ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext,
-                SchemaContext schemaContext) {
+                SchemaContext schemaContext, boolean isTxActorLocal) {
             super(shardName, identifier);
             this.actorPath = actorPath;
             this.actor = actor;
             this.actorContext = actorContext;
             this.schemaContext = schemaContext;
+            this.isTxActorLocal = isTxActorLocal;
         }
 
         private ActorSelection getActor() {
             return actor;
         }
 
-        private String getResolvedCohortPath(String cohortPath) {
-            return actorContext.resolvePath(actorPath, cohortPath);
-        }
-
         @Override
         public void closeTransaction() {
-            LOG.debug("Tx {} closeTransaction called", identifier);
-            actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable());
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} closeTransaction called", identifier);
+            }
+            actorContext.sendOperationAsync(getActor(), new CloseTransaction().toSerializable());
         }
 
         @Override
-        public Future<ActorPath> readyTransaction() {
-            LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
+        public Future<ActorSelection> readyTransaction() {
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
                     identifier, recordedOperationFutures.size());
-
+            }
             // Send the ReadyTransaction message to the Tx actor.
 
-            final Future<Object> replyFuture = actorContext.executeRemoteOperationAsync(getActor(),
-                    new ReadyTransaction().toSerializable(), ActorContext.ASK_DURATION);
+            ReadyTransaction readyTransaction = new ReadyTransaction();
+            final Future<Object> replyFuture = actorContext.executeOperationAsync(getActor(),
+                isTxActorLocal ? readyTransaction : readyTransaction.toSerializable());
 
             // Combine all the previously recorded put/merge/delete operation reply Futures and the
             // ReadyTransactionReply Future into one Future. If any one fails then the combined
@@ -490,13 +540,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             // Transform the combined Future into a Future that returns the cohort actor path from
             // the ReadyTransactionReply. That's the end result of the ready operation.
 
-            return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorPath>() {
+            return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorSelection>() {
                 @Override
-                public ActorPath apply(Iterable<Object> notUsed) {
-
-                    LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
+                public ActorSelection apply(Iterable<Object> notUsed) {
+                    if(LOG.isDebugEnabled()) {
+                        LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
                             identifier);
-
+                    }
                     // At this point all the Futures succeeded and we need to extract the cohort
                     // actor path from the ReadyTransactionReply. For the recorded operations, they
                     // don't return any data so we're only interested that they completed
@@ -506,21 +556,15 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
                     // Note the Future get call here won't block as it's complete.
                     Object serializedReadyReply = replyFuture.value().get().get();
-                    if(serializedReadyReply.getClass().equals(
-                                                     ReadyTransactionReply.SERIALIZABLE_CLASS)) {
-                        ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
-                                actorContext.getActorSystem(), serializedReadyReply);
+                    if (serializedReadyReply instanceof ReadyTransactionReply) {
+                        return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath());
 
-                        String resolvedCohortPath = getResolvedCohortPath(
-                                reply.getCohortPath().toString());
+                    } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
+                        ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
+                        return actorContext.actorSelection(reply.getCohortPath());
 
-                        LOG.debug("Tx {} readyTransaction: resolved cohort path {}",
-                                identifier, resolvedCohortPath);
-
-                        return actorContext.actorFor(resolvedCohortPath);
                     } else {
                         // Throwing an exception here will fail the Future.
-
                         throw new IllegalArgumentException(String.format("Invalid reply type {}",
                                 serializedReadyReply.getClass()));
                     }
@@ -530,33 +574,44 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         @Override
         public void deleteData(YangInstanceIdentifier path) {
-            LOG.debug("Tx {} deleteData called path = {}", identifier, path);
-            recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
-                    new DeleteData(path).toSerializable(), ActorContext.ASK_DURATION ));
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+            }
+
+            DeleteData deleteData = new DeleteData(path);
+            recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
+                isTxActorLocal ? deleteData : deleteData.toSerializable()));
         }
 
         @Override
         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-            LOG.debug("Tx {} mergeData called path = {}", identifier, path);
-            recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
-                    new MergeData(path, data, schemaContext).toSerializable(),
-                    ActorContext.ASK_DURATION));
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+            }
+
+            MergeData mergeData = new MergeData(path, data, schemaContext);
+            recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
+                isTxActorLocal ? mergeData : mergeData.toSerializable()));
         }
 
         @Override
         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-            LOG.debug("Tx {} writeData called path = {}", identifier, path);
-            recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
-                    new WriteData(path, data, schemaContext).toSerializable(),
-                    ActorContext.ASK_DURATION));
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} writeData called path = {}", identifier, path);
+            }
+
+            WriteData writeData = new WriteData(path, data, schemaContext);
+            recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
+                isTxActorLocal ? writeData : writeData.toSerializable()));
         }
 
         @Override
         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
                 final YangInstanceIdentifier path) {
 
-            LOG.debug("Tx {} readData called path = {}", identifier, path);
-
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} readData called path = {}", identifier, path);
+            }
             final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
 
             // If there were any previous recorded put/merge/delete operation reply Futures then we
@@ -566,9 +621,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             if(recordedOperationFutures.isEmpty()) {
                 finishReadData(path, returnFuture);
             } else {
-                LOG.debug("Tx {} readData: verifying {} previous recorded operations",
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Tx {} readData: verifying {} previous recorded operations",
                         identifier, recordedOperationFutures.size());
-
+                }
                 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
                 // Futures#sequence accesses the passed List on a different thread, as
                 // recordedOperationFutures is not synchronized.
@@ -576,14 +632,16 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
                         Lists.newArrayList(recordedOperationFutures),
                         actorContext.getActorSystem().dispatcher());
+
                 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
                     @Override
                     public void onComplete(Throwable failure, Iterable<Object> notUsed)
                             throws Throwable {
                         if(failure != null) {
-                            LOG.debug("Tx {} readData: a recorded operation failed: {}",
+                            if(LOG.isDebugEnabled()) {
+                                LOG.debug("Tx {} readData: a recorded operation failed: {}",
                                     identifier, failure);
-
+                            }
                             returnFuture.setException(new ReadFailedException(
                                     "The read could not be performed because a previous put, merge,"
                                     + "or delete operation failed", failure));
@@ -602,38 +660,44 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         private void finishReadData(final YangInstanceIdentifier path,
                 final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
 
-            LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
-
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
+            }
             OnComplete<Object> onComplete = new OnComplete<Object>() {
                 @Override
                 public void onComplete(Throwable failure, Object readResponse) throws Throwable {
                     if(failure != null) {
-                        LOG.debug("Tx {} read operation failed: {}", identifier, failure);
-
+                        if(LOG.isDebugEnabled()) {
+                            LOG.debug("Tx {} read operation failed: {}", identifier, failure);
+                        }
                         returnFuture.setException(new ReadFailedException(
                                 "Error reading data for path " + path, failure));
+
                     } else {
-                        LOG.debug("Tx {} read operation succeeded", identifier, failure);
-
-                        if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
-                            ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,
-                                    path, readResponse);
-                            if (reply.getNormalizedNode() == null) {
-                                returnFuture.set(Optional.<NormalizedNode<?, ?>>absent());
-                            } else {
-                                returnFuture.set(Optional.<NormalizedNode<?, ?>>of(
-                                        reply.getNormalizedNode()));
-                            }
+                        if(LOG.isDebugEnabled()) {
+                            LOG.debug("Tx {} read operation succeeded", identifier, failure);
+                        }
+
+                        if (readResponse instanceof ReadDataReply) {
+                            ReadDataReply reply = (ReadDataReply) readResponse;
+                            returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
+
+                        } else if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
+                            ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext, path, readResponse);
+                            returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
+
                         } else {
                             returnFuture.setException(new ReadFailedException(
-                                    "Invalid response reading data for path " + path));
+                                "Invalid response reading data for path " + path));
                         }
                     }
                 }
             };
 
-            Future<Object> readFuture = actorContext.executeRemoteOperationAsync(getActor(),
-                    new ReadData(path).toSerializable(), ActorContext.ASK_DURATION);
+            ReadData readData = new ReadData(path);
+            Future<Object> readFuture = actorContext.executeOperationAsync(getActor(),
+                isTxActorLocal ? readData : readData.toSerializable());
+
             readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
         }
 
@@ -641,8 +705,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         public CheckedFuture<Boolean, ReadFailedException> dataExists(
                 final YangInstanceIdentifier path) {
 
-            LOG.debug("Tx {} dataExists called path = {}", identifier, path);
-
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+            }
             final SettableFuture<Boolean> returnFuture = SettableFuture.create();
 
             // If there were any previous recorded put/merge/delete operation reply Futures then we
@@ -653,9 +718,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             if(recordedOperationFutures.isEmpty()) {
                 finishDataExists(path, returnFuture);
             } else {
-                LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
                         identifier, recordedOperationFutures.size());
-
+                }
                 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
                 // Futures#sequence accesses the passed List on a different thread, as
                 // recordedOperationFutures is not synchronized.
@@ -668,9 +734,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                     public void onComplete(Throwable failure, Iterable<Object> notUsed)
                             throws Throwable {
                         if(failure != null) {
-                            LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
+                            if(LOG.isDebugEnabled()) {
+                                LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
                                     identifier, failure);
-
+                            }
                             returnFuture.setException(new ReadFailedException(
                                     "The data exists could not be performed because a previous "
                                     + "put, merge, or delete operation failed", failure));
@@ -689,22 +756,29 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         private void finishDataExists(final YangInstanceIdentifier path,
                 final SettableFuture<Boolean> returnFuture) {
 
-            LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
-
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
+            }
             OnComplete<Object> onComplete = new OnComplete<Object>() {
                 @Override
                 public void onComplete(Throwable failure, Object response) throws Throwable {
                     if(failure != null) {
-                        LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
-
+                        if(LOG.isDebugEnabled()) {
+                            LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
+                        }
                         returnFuture.setException(new ReadFailedException(
                                 "Error checking data exists for path " + path, failure));
                     } else {
-                        LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
+                        if(LOG.isDebugEnabled()) {
+                            LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
+                        }
+
+                        if (response instanceof DataExistsReply) {
+                            returnFuture.set(Boolean.valueOf(((DataExistsReply) response).exists()));
+
+                        } else if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
+                            returnFuture.set(Boolean.valueOf(DataExistsReply.fromSerializable(response).exists()));
 
-                        if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
-                            returnFuture.set(Boolean.valueOf(DataExistsReply.
-                                        fromSerializable(response).exists()));
                         } else {
                             returnFuture.setException(new ReadFailedException(
                                     "Invalid response checking exists for path " + path));
@@ -713,8 +787,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 }
             };
 
-            Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
-                    new DataExists(path).toSerializable(), ActorContext.ASK_DURATION);
+            DataExists dataExists = new DataExists(path);
+            Future<Object> future = actorContext.executeOperationAsync(getActor(),
+                isTxActorLocal ? dataExists : dataExists.toSerializable());
+
             future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
         }
     }
@@ -733,34 +809,46 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         @Override
         public void closeTransaction() {
-            LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
+            }
         }
 
         @Override
-        public Future<ActorPath> readyTransaction() {
-            LOG.debug("Tx {} readyTransaction called", identifier);
+        public Future<ActorSelection> readyTransaction() {
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} readyTransaction called", identifier);
+            }
             return akka.dispatch.Futures.failed(failure);
         }
 
         @Override
         public void deleteData(YangInstanceIdentifier path) {
-            LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+            }
         }
 
         @Override
         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-            LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+            }
         }
 
         @Override
         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-            LOG.debug("Tx {} writeData called path = {}", identifier, path);
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} writeData called path = {}", identifier, path);
+            }
         }
 
         @Override
         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
             YangInstanceIdentifier path) {
-            LOG.debug("Tx {} readData called path = {}", identifier, path);
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} readData called path = {}", identifier, path);
+            }
             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
                     "Error reading data for path " + path, failure));
         }
@@ -768,7 +856,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         @Override
         public CheckedFuture<Boolean, ReadFailedException> dataExists(
             YangInstanceIdentifier path) {
-            LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+            }
             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
                     "Error checking exists for path " + path, failure));
         }