CDS: Split out TransactionFutureCallback 59/17859/6
authorRobert Varga <rovarga@cisco.com>
Tue, 7 Apr 2015 15:13:30 +0000 (17:13 +0200)
committerRobert Varga <nite@hq.sk>
Fri, 10 Apr 2015 15:02:43 +0000 (15:02 +0000)
This moves the TransactionFutureCallback inner class out of
TransactionProxy, so its interaction with TransactionProxy is defined
using methods.

Change-Id: Ib07bf91e32074b721552c8ef4952bbd40369283a
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionFutureCallback.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionFutureCallback.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionFutureCallback.java
new file mode 100644 (file)
index 0000000..a8a93c5
--- /dev/null
@@ -0,0 +1,264 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import akka.dispatch.OnComplete;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
+ * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
+ * retry task after a short delay.
+ * <p>
+ * The end result from a completed CreateTransaction message is a TransactionContext that is
+ * used to perform transaction operations. Transaction operations that occur before the
+ * CreateTransaction completes are cache and executed once the CreateTransaction completes,
+ * successfully or not.
+ */
+final class TransactionFutureCallback extends OnComplete<Object> {
+    private static final Logger LOG = LoggerFactory.getLogger(TransactionFutureCallback.class);
+
+    /**
+     * Time interval in between transaction create retries.
+     */
+    private static final FiniteDuration CREATE_TX_TRY_INTERVAL = FiniteDuration.create(1, TimeUnit.SECONDS);
+
+    /**
+     * The list of transaction operations to execute once the CreateTransaction completes.
+     */
+    @GuardedBy("txOperationsOnComplete")
+    private final List<TransactionOperation> txOperationsOnComplete = Lists.newArrayList();
+    private final TransactionProxy proxy;
+    private final String shardName;
+
+    /**
+     * The TransactionContext resulting from the CreateTransaction reply.
+     */
+    private volatile TransactionContext transactionContext;
+
+    /**
+     * The target primary shard.
+     */
+    private volatile ActorSelection primaryShard;
+    private volatile int createTxTries;
+
+    TransactionFutureCallback(final TransactionProxy proxy, final String shardName) {
+        this.proxy = Preconditions.checkNotNull(proxy);
+        this.shardName = shardName;
+        createTxTries = (int) (proxy.getActorContext().getDatastoreContext().
+                getShardLeaderElectionTimeout().duration().toMillis() /
+                CREATE_TX_TRY_INTERVAL.toMillis());
+    }
+
+    String getShardName() {
+        return shardName;
+    }
+
+    TransactionContext getTransactionContext() {
+        return transactionContext;
+    }
+
+    private TransactionType getTransactionType() {
+        return proxy.getTransactionType();
+    }
+
+    private TransactionIdentifier getIdentifier() {
+        return proxy.getIdentifier();
+    }
+
+    private ActorContext getActorContext() {
+        return proxy.getActorContext();
+    }
+
+    private Semaphore getOperationLimiter() {
+        return proxy.getOperationLimiter();
+    }
+
+    /**
+     * Sets the target primary shard and initiates a CreateTransaction try.
+     */
+    void setPrimaryShard(ActorSelection primaryShard) {
+        this.primaryShard = primaryShard;
+
+        if (getTransactionType() == TransactionType.WRITE_ONLY &&
+                getActorContext().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
+            LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
+                getIdentifier(), primaryShard);
+
+            // For write-only Tx's we prepare the transaction modifications directly on the shard actor
+            // to avoid the overhead of creating a separate transaction actor.
+            // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow.
+            executeTxOperatonsOnComplete(proxy.createValidTransactionContext(this.primaryShard,
+                    this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION));
+        } else {
+            tryCreateTransaction();
+        }
+    }
+
+    /**
+     * Adds a TransactionOperation to be executed after the CreateTransaction completes.
+     */
+    private void addTxOperationOnComplete(TransactionOperation operation) {
+        boolean invokeOperation = true;
+        synchronized(txOperationsOnComplete) {
+            if(transactionContext == null) {
+                LOG.debug("Tx {} Adding operation on complete", getIdentifier());
+
+                invokeOperation = false;
+                txOperationsOnComplete.add(operation);
+            }
+        }
+
+        if(invokeOperation) {
+            operation.invoke(transactionContext);
+        }
+    }
+
+    void enqueueTransactionOperation(final TransactionOperation op) {
+
+        if (transactionContext != null) {
+            op.invoke(transactionContext);
+        } else {
+            // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
+            // callback to be executed after the Tx is created.
+            addTxOperationOnComplete(op);
+        }
+    }
+
+    /**
+     * Performs a CreateTransaction try async.
+     */
+    private void tryCreateTransaction() {
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(), primaryShard);
+        }
+
+        Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(),
+            getTransactionType().ordinal(), proxy.getTransactionChainId()).toSerializable();
+
+        Future<Object> createTxFuture = getActorContext().executeOperationAsync(primaryShard, serializedCreateMessage);
+
+        createTxFuture.onComplete(this, getActorContext().getClientDispatcher());
+    }
+
+    @Override
+    public void onComplete(Throwable failure, Object response) {
+        if(failure instanceof NoShardLeaderException) {
+            // There's no leader for the shard yet - schedule and try again, unless we're out
+            // of retries. Note: createTxTries is volatile as it may be written by different
+            // threads however not concurrently, therefore decrementing it non-atomically here
+            // is ok.
+            if(--createTxTries > 0) {
+                LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry",
+                    getIdentifier(), shardName);
+
+                getActorContext().getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL,
+                        new Runnable() {
+                            @Override
+                            public void run() {
+                                tryCreateTransaction();
+                            }
+                        }, getActorContext().getClientDispatcher());
+                return;
+            }
+        }
+
+        createTransactionContext(failure, response);
+    }
+
+    void createTransactionContext(Throwable failure, Object response) {
+        // Mainly checking for state violation here to perform a volatile read of "initialized" to
+        // ensure updates to operationLimter et al are visible to this thread (ie we're doing
+        // "piggy-back" synchronization here).
+        proxy.ensureInitializied();
+
+        // Create the TransactionContext from the response or failure. Store the new
+        // TransactionContext locally until we've completed invoking the
+        // TransactionOperations. This avoids thread timing issues which could cause
+        // out-of-order TransactionOperations. Eg, on a modification operation, if the
+        // TransactionContext is non-null, then we directly call the TransactionContext.
+        // However, at the same time, the code may be executing the cached
+        // TransactionOperations. So to avoid thus timing, we don't publish the
+        // TransactionContext until after we've executed all cached TransactionOperations.
+        TransactionContext localTransactionContext;
+        if(failure != null) {
+            LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
+
+            localTransactionContext = new NoOpTransactionContext(failure, getIdentifier(), getOperationLimiter());
+        } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
+            localTransactionContext = createValidTransactionContext(
+                    CreateTransactionReply.fromSerializable(response));
+        } else {
+            IllegalArgumentException exception = new IllegalArgumentException(String.format(
+                    "Invalid reply type %s for CreateTransaction", response.getClass()));
+
+            localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), getOperationLimiter());
+        }
+
+        executeTxOperatonsOnComplete(localTransactionContext);
+    }
+
+    private void executeTxOperatonsOnComplete(TransactionContext localTransactionContext) {
+        while(true) {
+            // Access to txOperationsOnComplete and transactionContext must be protected and atomic
+            // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
+            // issues and ensure no TransactionOperation is missed and that they are processed
+            // in the order they occurred.
+
+            // We'll make a local copy of the txOperationsOnComplete list to handle re-entrancy
+            // in case a TransactionOperation results in another transaction operation being
+            // queued (eg a put operation from a client read Future callback that is notified
+            // synchronously).
+            Collection<TransactionOperation> operationsBatch = null;
+            synchronized(txOperationsOnComplete) {
+                if(txOperationsOnComplete.isEmpty()) {
+                    // We're done invoking the TransactionOperations so we can now publish the
+                    // TransactionContext.
+                    transactionContext = localTransactionContext;
+                    break;
+                }
+
+                operationsBatch = new ArrayList<>(txOperationsOnComplete);
+                txOperationsOnComplete.clear();
+            }
+
+            // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
+            // A slight down-side is that we need to re-acquire the lock below but this should
+            // be negligible.
+            for(TransactionOperation oper: operationsBatch) {
+                oper.invoke(localTransactionContext);
+            }
+        }
+    }
+
+    private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
+        LOG.debug("Tx {} Received {}", getIdentifier(), reply);
+
+        return proxy.createValidTransactionContext(getActorContext().actorSelection(reply.getTransactionPath()),
+                reply.getTransactionPath(), reply.getVersion());
+    }
+
+}
\ No newline at end of file
index 388dd9f4bda2fee3e82bba4660a137876cf9dbda..a0987cd5d63943979985527794dbe8471c0afe78 100644 (file)
@@ -20,7 +20,6 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -30,12 +29,8 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
 import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
-import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
@@ -50,7 +45,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
-import scala.concurrent.duration.FiniteDuration;
 
 /**
  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
 
 /**
  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
@@ -101,12 +95,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
     private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
 
 
     private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
 
-    /**
-     * Time interval in between transaction create retries.
-     */
-    private static final FiniteDuration CREATE_TX_TRY_INTERVAL =
-            FiniteDuration.create(1, TimeUnit.SECONDS);
-
     /**
      * Stores the remote Tx actors for each requested data store path to be used by the
      * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
     /**
      * Stores the remote Tx actors for each requested data store path to be used by the
      * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
@@ -297,6 +285,10 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         }
     }
 
         }
     }
 
+    final void ensureInitializied() {
+        Preconditions.checkState(initialized, "Transaction %s was not propertly initialized.", getIdentifier());
+    }
+
     @Override
     public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
 
     @Override
     public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
 
@@ -444,6 +436,14 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return actorContext.findPrimaryShardAsync(shardName);
     }
 
         return actorContext.findPrimaryShardAsync(shardName);
     }
 
+    final TransactionType getTransactionType() {
+        return transactionType;
+    }
+
+    final Semaphore getOperationLimiter() {
+        return operationLimiter;
+    }
+
     private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
         String shardName = shardNameFromIdentifier(path);
         return getOrCreateTxFutureCallback(shardName);
     private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
         String shardName = shardNameFromIdentifier(path);
         return getOrCreateTxFutureCallback(shardName);
@@ -454,7 +454,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         if(txFutureCallback == null) {
             Future<ActorSelection> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
 
         if(txFutureCallback == null) {
             Future<ActorSelection> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
 
-            final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(shardName);
+            final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(this, shardName);
 
             txFutureCallback = newTxFutureCallback;
             txFutureCallbackMap.put(shardName, txFutureCallback);
 
             txFutureCallback = newTxFutureCallback;
             txFutureCallbackMap.put(shardName, txFutureCallback);
@@ -474,7 +474,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return txFutureCallback;
     }
 
         return txFutureCallback;
     }
 
-    public String getTransactionChainId() {
+    String getTransactionChainId() {
         return transactionChainId;
     }
 
         return transactionChainId;
     }
 
@@ -482,254 +482,41 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return actorContext;
     }
 
         return actorContext;
     }
 
-    /**
-     * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
-     * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
-     * retry task after a short delay.
-     * <p>
-     * The end result from a completed CreateTransaction message is a TransactionContext that is
-     * used to perform transaction operations. Transaction operations that occur before the
-     * CreateTransaction completes are cache and executed once the CreateTransaction completes,
-     * successfully or not.
-     */
-    private class TransactionFutureCallback extends OnComplete<Object> {
-
-        /**
-         * The list of transaction operations to execute once the CreateTransaction completes.
-         */
-        @GuardedBy("txOperationsOnComplete")
-        private final List<TransactionOperation> txOperationsOnComplete = Lists.newArrayList();
-
-        /**
-         * The TransactionContext resulting from the CreateTransaction reply.
-         */
-        private volatile TransactionContext transactionContext;
-
-        /**
-         * The target primary shard.
-         */
-        private volatile ActorSelection primaryShard;
-
-        private volatile int createTxTries = (int) (actorContext.getDatastoreContext().
-                getShardLeaderElectionTimeout().duration().toMillis() /
-                CREATE_TX_TRY_INTERVAL.toMillis());
-
-        private final String shardName;
-
-        TransactionFutureCallback(String shardName) {
-            this.shardName = shardName;
-        }
-
-        String getShardName() {
-            return shardName;
-        }
-
-        TransactionContext getTransactionContext() {
-            return transactionContext;
-        }
-
-
-        /**
-         * Sets the target primary shard and initiates a CreateTransaction try.
-         */
-        void setPrimaryShard(ActorSelection primaryShard) {
-            this.primaryShard = primaryShard;
-
-            if(transactionType == TransactionType.WRITE_ONLY &&
-                    actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
-                LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
-                    getIdentifier(), primaryShard);
-
-                // For write-only Tx's we prepare the transaction modifications directly on the shard actor
-                // to avoid the overhead of creating a separate transaction actor.
-                // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow.
-                executeTxOperatonsOnComplete(createValidTransactionContext(this.primaryShard,
-                        this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION));
-            } else {
-                tryCreateTransaction();
-            }
-        }
-
-        /**
-         * Adds a TransactionOperation to be executed after the CreateTransaction completes.
-         */
-        void addTxOperationOnComplete(TransactionOperation operation) {
-            boolean invokeOperation = true;
-            synchronized(txOperationsOnComplete) {
-                if(transactionContext == null) {
-                    LOG.debug("Tx {} Adding operation on complete", getIdentifier());
-
-                    invokeOperation = false;
-                    txOperationsOnComplete.add(operation);
-                }
-            }
-
-            if(invokeOperation) {
-                operation.invoke(transactionContext);
-            }
-        }
-
-        void enqueueTransactionOperation(final TransactionOperation op) {
-
-            if (transactionContext != null) {
-                op.invoke(transactionContext);
-            } else {
-                // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
-                // callback to be executed after the Tx is created.
-                addTxOperationOnComplete(op);
-            }
-        }
-
-        /**
-         * Performs a CreateTransaction try async.
-         */
-        private void tryCreateTransaction() {
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(), primaryShard);
-            }
-
-            Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(),
-                    TransactionProxy.this.transactionType.ordinal(),
-                    getTransactionChainId()).toSerializable();
-
-            Future<Object> createTxFuture = actorContext.executeOperationAsync(primaryShard, serializedCreateMessage);
-
-            createTxFuture.onComplete(this, actorContext.getClientDispatcher());
-        }
-
-        @Override
-        public void onComplete(Throwable failure, Object response) {
-            if(failure instanceof NoShardLeaderException) {
-                // There's no leader for the shard yet - schedule and try again, unless we're out
-                // of retries. Note: createTxTries is volatile as it may be written by different
-                // threads however not concurrently, therefore decrementing it non-atomically here
-                // is ok.
-                if(--createTxTries > 0) {
-                    LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry",
-                        getIdentifier(), shardName);
-
-                    actorContext.getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL,
-                            new Runnable() {
-                                @Override
-                                public void run() {
-                                    tryCreateTransaction();
-                                }
-                            }, actorContext.getClientDispatcher());
-                    return;
-                }
-            }
-
-            createTransactionContext(failure, response);
-        }
-
-        private void createTransactionContext(Throwable failure, Object response) {
-            // Mainly checking for state violation here to perform a volatile read of "initialized" to
-            // ensure updates to operationLimter et al are visible to this thread (ie we're doing
-            // "piggy-back" synchronization here).
-            Preconditions.checkState(initialized, "Tx was not propertly initialized.");
-
-            // Create the TransactionContext from the response or failure. Store the new
-            // TransactionContext locally until we've completed invoking the
-            // TransactionOperations. This avoids thread timing issues which could cause
-            // out-of-order TransactionOperations. Eg, on a modification operation, if the
-            // TransactionContext is non-null, then we directly call the TransactionContext.
-            // However, at the same time, the code may be executing the cached
-            // TransactionOperations. So to avoid thus timing, we don't publish the
-            // TransactionContext until after we've executed all cached TransactionOperations.
-            TransactionContext localTransactionContext;
-            if(failure != null) {
-                LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
-
-                localTransactionContext = new NoOpTransactionContext(failure, getIdentifier(), operationLimiter);
-            } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
-                localTransactionContext = createValidTransactionContext(
-                        CreateTransactionReply.fromSerializable(response));
-            } else {
-                IllegalArgumentException exception = new IllegalArgumentException(String.format(
-                        "Invalid reply type %s for CreateTransaction", response.getClass()));
-
-                localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), operationLimiter);
-            }
+    TransactionContext createValidTransactionContext(ActorSelection transactionActor,
+            String transactionPath, short remoteTransactionVersion) {
 
 
-            executeTxOperatonsOnComplete(localTransactionContext);
-        }
+        if (transactionType == TransactionType.READ_ONLY) {
+            // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
+            // to close the remote Tx's when this instance is no longer in use and is garbage
+            // collected.
 
 
-        private void executeTxOperatonsOnComplete(TransactionContext localTransactionContext) {
-            while(true) {
-                // Access to txOperationsOnComplete and transactionContext must be protected and atomic
-                // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
-                // issues and ensure no TransactionOperation is missed and that they are processed
-                // in the order they occurred.
-
-                // We'll make a local copy of the txOperationsOnComplete list to handle re-entrancy
-                // in case a TransactionOperation results in another transaction operation being
-                // queued (eg a put operation from a client read Future callback that is notified
-                // synchronously).
-                Collection<TransactionOperation> operationsBatch = null;
-                synchronized(txOperationsOnComplete) {
-                    if(txOperationsOnComplete.isEmpty()) {
-                        // We're done invoking the TransactionOperations so we can now publish the
-                        // TransactionContext.
-                        transactionContext = localTransactionContext;
-                        break;
-                    }
-
-                    operationsBatch = new ArrayList<>(txOperationsOnComplete);
-                    txOperationsOnComplete.clear();
-                }
+            if(remoteTransactionActorsMB == null) {
+                remoteTransactionActors = Lists.newArrayList();
+                remoteTransactionActorsMB = new AtomicBoolean();
 
 
-                // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
-                // A slight down-side is that we need to re-acquire the lock below but this should
-                // be negligible.
-                for(TransactionOperation oper: operationsBatch) {
-                    oper.invoke(localTransactionContext);
-                }
+                TransactionProxyCleanupPhantomReference.track(TransactionProxy.this);
             }
             }
-        }
 
 
-        private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
-            LOG.debug("Tx {} Received {}", getIdentifier(), reply);
+            // Add the actor to the remoteTransactionActors list for access by the
+            // cleanup PhantonReference.
+            remoteTransactionActors.add(transactionActor);
 
 
-            return createValidTransactionContext(actorContext.actorSelection(reply.getTransactionPath()),
-                    reply.getTransactionPath(), reply.getVersion());
+            // Write to the memory barrier volatile to publish the above update to the
+            // remoteTransactionActors list for thread visibility.
+            remoteTransactionActorsMB.set(true);
         }
 
         }
 
-        private TransactionContext createValidTransactionContext(ActorSelection transactionActor,
-                String transactionPath, short remoteTransactionVersion) {
-
-            if (transactionType == TransactionType.READ_ONLY) {
-                // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
-                // to close the remote Tx's when this instance is no longer in use and is garbage
-                // collected.
-
-                if(remoteTransactionActorsMB == null) {
-                    remoteTransactionActors = Lists.newArrayList();
-                    remoteTransactionActorsMB = new AtomicBoolean();
+        // TxActor is always created where the leader of the shard is.
+        // Check if TxActor is created in the same node
+        boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
 
 
-                    TransactionProxyCleanupPhantomReference.track(TransactionProxy.this);
-                }
-
-                // Add the actor to the remoteTransactionActors list for access by the
-                // cleanup PhantonReference.
-                remoteTransactionActors.add(transactionActor);
-
-                // Write to the memory barrier volatile to publish the above update to the
-                // remoteTransactionActors list for thread visibility.
-                remoteTransactionActorsMB.set(true);
-            }
-
-            // TxActor is always created where the leader of the shard is.
-            // Check if TxActor is created in the same node
-            boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
-
-            if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
-                return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
-                        transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
-                        operationCompleter);
-            } else {
-                return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
-                        actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
-            }
+        if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
+            return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
+                    transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
+                    operationCompleter);
+        } else {
+            return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
+                    actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
         }
     }
 }
         }
     }
 }