Use BatchedModifications message in place of ReadyTransaction message
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index 58b37be2a2727babd9b0305e868d85d90c079052..388dd9f4bda2fee3e82bba4660a137876cf9dbda 100644 (file)
@@ -12,34 +12,36 @@ import akka.actor.ActorSelection;
 import akka.dispatch.Mapper;
 import akka.dispatch.OnComplete;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.FinalizablePhantomReference;
-import com.google.common.base.FinalizableReferenceQueue;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -62,12 +64,29 @@ import scala.concurrent.duration.FiniteDuration;
  * shards will be executed.
  * </p>
  */
-public class TransactionProxy implements DOMStoreReadWriteTransaction {
+public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIdentifier> implements DOMStoreReadWriteTransaction {
 
     public static enum TransactionType {
         READ_ONLY,
         WRITE_ONLY,
-        READ_WRITE
+        READ_WRITE;
+
+        // Cache all values
+        private static final TransactionType[] VALUES = values();
+
+        public static TransactionType fromInt(final int type) {
+            try {
+                return VALUES[type];
+            } catch (IndexOutOfBoundsException e) {
+                throw new IllegalArgumentException("In TransactionType enum value " + type, e);
+            }
+        }
+    }
+
+    private static enum TransactionState {
+        OPEN,
+        READY,
+        CLOSED,
     }
 
     static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
@@ -88,72 +107,6 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     private static final FiniteDuration CREATE_TX_TRY_INTERVAL =
             FiniteDuration.create(1, TimeUnit.SECONDS);
 
-    /**
-     * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
-     * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
-     * trickery to clean up its internal thread when the bundle is unloaded.
-     */
-    private static final FinalizableReferenceQueue phantomReferenceQueue =
-                                                                  new FinalizableReferenceQueue();
-
-    /**
-     * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
-     * necessary because PhantomReferences need a hard reference so they're not garbage collected.
-     * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
-     * and thus becomes eligible for garbage collection.
-     */
-    private static final Map<TransactionProxyCleanupPhantomReference,
-                             TransactionProxyCleanupPhantomReference> phantomReferenceCache =
-                                                                        new ConcurrentHashMap<>();
-
-    /**
-     * A PhantomReference that closes remote transactions for a TransactionProxy when it's
-     * garbage collected. This is used for read-only transactions as they're not explicitly closed
-     * by clients. So the only way to detect that a transaction is no longer in use and it's safe
-     * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
-     * but TransactionProxy instances should generally be short-lived enough to avoid being moved
-     * to the old generation space and thus should be cleaned up in a timely manner as the GC
-     * runs on the young generation (eden, swap1...) space much more frequently.
-     */
-    private static class TransactionProxyCleanupPhantomReference
-                                           extends FinalizablePhantomReference<TransactionProxy> {
-
-        private final List<ActorSelection> remoteTransactionActors;
-        private final AtomicBoolean remoteTransactionActorsMB;
-        private final ActorContext actorContext;
-        private final TransactionIdentifier identifier;
-
-        protected TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
-            super(referent, phantomReferenceQueue);
-
-            // Note we need to cache the relevant fields from the TransactionProxy as we can't
-            // have a hard reference to the TransactionProxy instance itself.
-
-            remoteTransactionActors = referent.remoteTransactionActors;
-            remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
-            actorContext = referent.actorContext;
-            identifier = referent.identifier;
-        }
-
-        @Override
-        public void finalizeReferent() {
-            LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
-                    remoteTransactionActors.size(), identifier);
-
-            phantomReferenceCache.remove(this);
-
-            // Access the memory barrier volatile to ensure all previous updates to the
-            // remoteTransactionActors list are visible to this thread.
-
-            if(remoteTransactionActorsMB.get()) {
-                for(ActorSelection actor : remoteTransactionActors) {
-                    LOG.trace("Sending CloseTransaction to {}", actor);
-                    actorContext.sendOperationAsync(actor, CloseTransaction.INSTANCE.toSerializable());
-                }
-            }
-        }
-    }
-
     /**
      * Stores the remote Tx actors for each requested data store path to be used by the
      * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
@@ -161,8 +114,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
      * remoteTransactionActors list so they will be visible to the thread accessing the
      * PhantomReference.
      */
-    private List<ActorSelection> remoteTransactionActors;
-    private AtomicBoolean remoteTransactionActorsMB;
+    List<ActorSelection> remoteTransactionActors;
+    volatile AtomicBoolean remoteTransactionActorsMB;
 
     /**
      * Stores the create transaction results per shard.
@@ -170,20 +123,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     private final Map<String, TransactionFutureCallback> txFutureCallbackMap = new HashMap<>();
 
     private final TransactionType transactionType;
-    private final ActorContext actorContext;
-    private final TransactionIdentifier identifier;
+    final ActorContext actorContext;
     private final String transactionChainId;
     private final SchemaContext schemaContext;
-    private boolean inReadyState;
-    private final Semaphore operationLimiter;
-    private final OperationCompleter operationCompleter;
+    private TransactionState state = TransactionState.OPEN;
+
+    private volatile boolean initialized;
+    private Semaphore operationLimiter;
+    private OperationCompleter operationCompleter;
 
     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
         this(actorContext, transactionType, "");
     }
 
-    public TransactionProxy(ActorContext actorContext, TransactionType transactionType,
-            String transactionChainId) {
+    public TransactionProxy(ActorContext actorContext, TransactionType transactionType, String transactionChainId) {
+        super(createIdentifier(actorContext));
         this.actorContext = Preconditions.checkNotNull(actorContext,
             "actorContext should not be null");
         this.transactionType = Preconditions.checkNotNull(transactionType,
@@ -192,45 +146,16 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             "schemaContext should not be null");
         this.transactionChainId = transactionChainId;
 
-        String memberName = actorContext.getCurrentMemberName();
-        if(memberName == null){
-            memberName = "UNKNOWN-MEMBER";
-        }
-
-        this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
-            counter.getAndIncrement()).build();
-
-        if(transactionType == TransactionType.READ_ONLY) {
-            // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
-            // to close the remote Tx's when this instance is no longer in use and is garbage
-            // collected.
-
-            remoteTransactionActors = Lists.newArrayList();
-            remoteTransactionActorsMB = new AtomicBoolean();
-
-            TransactionProxyCleanupPhantomReference cleanup =
-                new TransactionProxyCleanupPhantomReference(this);
-            phantomReferenceCache.put(cleanup, cleanup);
-        }
-
-        // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
-        this.operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
-        this.operationCompleter = new OperationCompleter(operationLimiter);
-
-        LOG.debug("Created txn {} of type {} on chain {}", identifier, transactionType, transactionChainId);
+        LOG.debug("Created txn {} of type {} on chain {}", getIdentifier(), transactionType, transactionChainId);
     }
 
-    @VisibleForTesting
-    List<Future<Object>> getRecordedOperationFutures() {
-        List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
-        for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
-            TransactionContext transactionContext = txFutureCallback.getTransactionContext();
-            if(transactionContext != null) {
-                recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
-            }
+    private static TransactionIdentifier createIdentifier(ActorContext actorContext) {
+        String memberName = actorContext.getCurrentMemberName();
+        if (memberName == null) {
+            memberName = "UNKNOWN-MEMBER";
         }
 
-        return recordedOperationFutures;
+        return new TransactionIdentifier(memberName, counter.getAndIncrement());
     }
 
     @VisibleForTesting
@@ -245,36 +170,81 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         return false;
     }
 
+    private boolean isRootPath(YangInstanceIdentifier path){
+        return !path.getPathArguments().iterator().hasNext();
+    }
+
     @Override
     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
 
         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
                 "Read operation on write-only transaction is not allowed");
 
-        LOG.debug("Tx {} read {}", identifier, path);
-
-        throttleOperation();
+        LOG.debug("Tx {} read {}", getIdentifier(), path);
 
         final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
 
-        TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
-            @Override
-            public void invoke(TransactionContext transactionContext) {
-                transactionContext.readData(path, proxyFuture);
-            }
-        });
+        if(isRootPath(path)){
+            readAllData(path, proxyFuture);
+        } else {
+            throttleOperation();
+
+            TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
+            txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+                @Override
+                public void invoke(TransactionContext transactionContext) {
+                    transactionContext.readData(path, proxyFuture);
+                }
+            });
+
+        }
 
         return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
     }
 
+    private void readAllData(final YangInstanceIdentifier path,
+                             final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
+        Set<String> allShardNames = actorContext.getConfiguration().getAllShardNames();
+        List<SettableFuture<Optional<NormalizedNode<?, ?>>>> futures = new ArrayList<>(allShardNames.size());
+
+        for(String shardName : allShardNames){
+            final SettableFuture<Optional<NormalizedNode<?, ?>>> subProxyFuture = SettableFuture.create();
+
+            throttleOperation();
+
+            TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(shardName);
+            txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+                @Override
+                public void invoke(TransactionContext transactionContext) {
+                    transactionContext.readData(path, subProxyFuture);
+                }
+            });
+
+            futures.add(subProxyFuture);
+        }
+
+        final ListenableFuture<List<Optional<NormalizedNode<?, ?>>>> future = Futures.allAsList(futures);
+
+        future.addListener(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(),
+                            future.get(), actorContext.getSchemaContext()));
+                } catch (InterruptedException | ExecutionException e) {
+                    proxyFuture.setException(e);
+                }
+            }
+        }, actorContext.getActorSystem().dispatcher());
+    }
+
     @Override
     public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
 
         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
                 "Exists operation on write-only transaction is not allowed");
 
-        LOG.debug("Tx {} exists {}", identifier, path);
+        LOG.debug("Tx {} exists {}", getIdentifier(), path);
 
         throttleOperation();
 
@@ -294,7 +264,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     private void checkModificationState() {
         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
                 "Modification operation on read-only transaction is not allowed");
-        Preconditions.checkState(!inReadyState,
+        Preconditions.checkState(state == TransactionState.OPEN,
                 "Transaction is sealed - further modifications are not allowed");
     }
 
@@ -303,6 +273,16 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     }
 
     private void throttleOperation(int acquirePermits) {
+        if(!initialized) {
+            // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
+            operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
+            operationCompleter = new OperationCompleter(operationLimiter);
+
+            // Make sure we write this last because it's volatile and will also publish the non-volatile writes
+            // above as well so they'll be visible to other threads.
+            initialized = true;
+        }
+
         try {
             if(!operationLimiter.tryAcquire(acquirePermits,
                     actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
@@ -317,13 +297,12 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         }
     }
 
-
     @Override
     public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
 
         checkModificationState();
 
-        LOG.debug("Tx {} write {}", identifier, path);
+        LOG.debug("Tx {} write {}", getIdentifier(), path);
 
         throttleOperation();
 
@@ -341,7 +320,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         checkModificationState();
 
-        LOG.debug("Tx {} merge {}", identifier, path);
+        LOG.debug("Tx {} merge {}", getIdentifier(), path);
 
         throttleOperation();
 
@@ -359,7 +338,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         checkModificationState();
 
-        LOG.debug("Tx {} delete {}", identifier, path);
+        LOG.debug("Tx {} delete {}", getIdentifier(), path);
 
         throttleOperation();
 
@@ -372,23 +351,37 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         });
     }
 
-    @Override
-    public DOMStoreThreePhaseCommitCohort ready() {
-
-        checkModificationState();
+    private boolean seal(final TransactionState newState) {
+        if (state == TransactionState.OPEN) {
+            state = newState;
+            return true;
+        } else {
+            return false;
+        }
+    }
 
-        throttleOperation(txFutureCallbackMap.size());
+    @Override
+    public AbstractThreePhaseCommitCohort ready() {
+        Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
+                "Read-only transactions cannot be readied");
 
-        inReadyState = true;
+        final boolean success = seal(TransactionState.READY);
+        Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state);
 
-        LOG.debug("Tx {} Readying {} transactions for commit", identifier,
+        LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(),
                     txFutureCallbackMap.size());
 
-        List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
+        if (txFutureCallbackMap.isEmpty()) {
+            TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
+            return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
+        }
 
+        throttleOperation(txFutureCallbackMap.size());
+
+        List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txFutureCallbackMap.size());
         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
 
-            LOG.debug("Tx {} Readying transaction for shard {} chain {}", identifier,
+            LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(),
                         txFutureCallback.getShardName(), transactionChainId);
 
             final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
@@ -410,39 +403,22 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             cohortFutures.add(future);
         }
 
-        onTransactionReady(cohortFutures);
-
         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
-                identifier.toString());
-    }
-
-    /**
-     * Method for derived classes to be notified when the transaction has been readied.
-     *
-     * @param cohortFutures the cohort Futures for each shard transaction.
-     */
-    protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
-    }
-
-    /**
-     * Method called to send a CreateTransaction message to a shard.
-     *
-     * @param shard the shard actor to send to
-     * @param serializedCreateMessage the serialized message to send
-     * @return the response Future
-     */
-    protected Future<Object> sendCreateTransaction(ActorSelection shard,
-            Object serializedCreateMessage) {
-        return actorContext.executeOperationAsync(shard, serializedCreateMessage);
-    }
-
-    @Override
-    public Object getIdentifier() {
-        return this.identifier;
+            getIdentifier().toString());
     }
 
     @Override
     public void close() {
+        if (!seal(TransactionState.CLOSED)) {
+            if (state == TransactionState.CLOSED) {
+                // Idempotent no-op as per AutoCloseable recommendation
+                return;
+            }
+
+            throw new IllegalStateException(String.format("Transaction %s is ready, it cannot be closed",
+                getIdentifier()));
+        }
+
         for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
                 @Override
@@ -454,7 +430,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         txFutureCallbackMap.clear();
 
-        if(transactionType == TransactionType.READ_ONLY) {
+        if(remoteTransactionActorsMB != null) {
             remoteTransactionActors.clear();
             remoteTransactionActorsMB.set(true);
         }
@@ -464,14 +440,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         return ShardStrategyFactory.getStrategy(path).findShard(path);
     }
 
+    protected Future<ActorSelection> sendFindPrimaryShardAsync(String shardName) {
+        return actorContext.findPrimaryShardAsync(shardName);
+    }
+
     private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
         String shardName = shardNameFromIdentifier(path);
+        return getOrCreateTxFutureCallback(shardName);
+    }
+
+    private TransactionFutureCallback getOrCreateTxFutureCallback(String shardName) {
         TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
         if(txFutureCallback == null) {
-            Future<ActorSelection> findPrimaryFuture = actorContext.findPrimaryShardAsync(shardName);
+            Future<ActorSelection> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
 
-            final TransactionFutureCallback newTxFutureCallback =
-                    new TransactionFutureCallback(shardName);
+            final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(shardName);
 
             txFutureCallback = newTxFutureCallback;
             txFutureCallbackMap.put(shardName, txFutureCallback);
@@ -480,7 +463,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 @Override
                 public void onComplete(Throwable failure, ActorSelection primaryShard) {
                     if(failure != null) {
-                        newTxFutureCallback.onComplete(failure, null);
+                        newTxFutureCallback.createTransactionContext(failure, null);
                     } else {
                         newTxFutureCallback.setPrimaryShard(primaryShard);
                     }
@@ -499,13 +482,6 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         return actorContext;
     }
 
-    /**
-     * Interfaces for transaction operations to be invoked later.
-     */
-    private static interface TransactionOperation {
-        void invoke(TransactionContext transactionContext);
-    }
-
     /**
      * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
      * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
@@ -557,10 +533,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
          * Sets the target primary shard and initiates a CreateTransaction try.
          */
         void setPrimaryShard(ActorSelection primaryShard) {
-            LOG.debug("Tx {} Primary shard found - trying create transaction", identifier);
-
             this.primaryShard = primaryShard;
-            tryCreateTransaction();
+
+            if(transactionType == TransactionType.WRITE_ONLY &&
+                    actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
+                LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
+                    getIdentifier(), primaryShard);
+
+                // For write-only Tx's we prepare the transaction modifications directly on the shard actor
+                // to avoid the overhead of creating a separate transaction actor.
+                // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow.
+                executeTxOperatonsOnComplete(createValidTransactionContext(this.primaryShard,
+                        this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION));
+            } else {
+                tryCreateTransaction();
+            }
         }
 
         /**
@@ -570,7 +557,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             boolean invokeOperation = true;
             synchronized(txOperationsOnComplete) {
                 if(transactionContext == null) {
-                    LOG.debug("Tx {} Adding operation on complete {}", identifier);
+                    LOG.debug("Tx {} Adding operation on complete", getIdentifier());
 
                     invokeOperation = false;
                     txOperationsOnComplete.add(operation);
@@ -597,10 +584,15 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
          * Performs a CreateTransaction try async.
          */
         private void tryCreateTransaction() {
-            Future<Object> createTxFuture = sendCreateTransaction(primaryShard,
-                    new CreateTransaction(identifier.toString(),
-                            TransactionProxy.this.transactionType.ordinal(),
-                            getTransactionChainId()).toSerializable());
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(), primaryShard);
+            }
+
+            Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(),
+                    TransactionProxy.this.transactionType.ordinal(),
+                    getTransactionChainId()).toSerializable();
+
+            Future<Object> createTxFuture = actorContext.executeOperationAsync(primaryShard, serializedCreateMessage);
 
             createTxFuture.onComplete(this, actorContext.getClientDispatcher());
         }
@@ -614,7 +606,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 // is ok.
                 if(--createTxTries > 0) {
                     LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry",
-                            identifier, shardName);
+                        getIdentifier(), shardName);
 
                     actorContext.getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL,
                             new Runnable() {
@@ -627,6 +619,15 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 }
             }
 
+            createTransactionContext(failure, response);
+        }
+
+        private void createTransactionContext(Throwable failure, Object response) {
+            // Mainly checking for state violation here to perform a volatile read of "initialized" to
+            // ensure updates to operationLimter et al are visible to this thread (ie we're doing
+            // "piggy-back" synchronization here).
+            Preconditions.checkState(initialized, "Tx was not propertly initialized.");
+
             // Create the TransactionContext from the response or failure. Store the new
             // TransactionContext locally until we've completed invoking the
             // TransactionOperations. This avoids thread timing issues which could cause
@@ -637,18 +638,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             // TransactionContext until after we've executed all cached TransactionOperations.
             TransactionContext localTransactionContext;
             if(failure != null) {
-                LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
-                        failure.getMessage());
+                LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
 
-                localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter);
-            } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
+                localTransactionContext = new NoOpTransactionContext(failure, getIdentifier(), operationLimiter);
+            } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
                 localTransactionContext = createValidTransactionContext(
                         CreateTransactionReply.fromSerializable(response));
             } else {
                 IllegalArgumentException exception = new IllegalArgumentException(String.format(
                         "Invalid reply type %s for CreateTransaction", response.getClass()));
 
-                localTransactionContext = new NoOpTransactionContext(exception, identifier, operationLimiter);
+                localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), operationLimiter);
             }
 
             executeTxOperatonsOnComplete(localTransactionContext);
@@ -688,13 +688,27 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         }
 
         private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
-            String transactionPath = reply.getTransactionPath();
+            LOG.debug("Tx {} Received {}", getIdentifier(), reply);
 
-            LOG.debug("Tx {} Received {}", identifier, reply);
+            return createValidTransactionContext(actorContext.actorSelection(reply.getTransactionPath()),
+                    reply.getTransactionPath(), reply.getVersion());
+        }
 
-            ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
+        private TransactionContext createValidTransactionContext(ActorSelection transactionActor,
+                String transactionPath, short remoteTransactionVersion) {
 
             if (transactionType == TransactionType.READ_ONLY) {
+                // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
+                // to close the remote Tx's when this instance is no longer in use and is garbage
+                // collected.
+
+                if(remoteTransactionActorsMB == null) {
+                    remoteTransactionActors = Lists.newArrayList();
+                    remoteTransactionActorsMB = new AtomicBoolean();
+
+                    TransactionProxyCleanupPhantomReference.track(TransactionProxy.this);
+                }
+
                 // Add the actor to the remoteTransactionActors list for access by the
                 // cleanup PhantonReference.
                 remoteTransactionActors.add(transactionActor);
@@ -708,12 +722,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             // Check if TxActor is created in the same node
             boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
 
-            if(reply.getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
-                return new TransactionContextImpl(transactionPath, transactionActor, identifier,
-                    actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
+            if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
+                return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
+                        transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
+                        operationCompleter);
             } else {
-                return new LegacyTransactionContextImpl(transactionPath, transactionActor, identifier,
-                        actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
+                return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
+                        actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
             }
         }
     }