Merge "BUG 2889 : migration of netconf-cli to NormalizedNode api's"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.Mapper;
13 import akka.dispatch.OnComplete;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.Optional;
16 import com.google.common.base.Preconditions;
17 import com.google.common.collect.Lists;
18 import com.google.common.util.concurrent.CheckedFuture;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.concurrent.Semaphore;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicBoolean;
28 import java.util.concurrent.atomic.AtomicLong;
29 import javax.annotation.concurrent.GuardedBy;
30 import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
31 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
32 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
33 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
36 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
37 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
38 import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
39 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
40 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
41 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
42 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
43 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46 import scala.concurrent.Future;
47 import scala.concurrent.Promise;
48 import scala.concurrent.duration.FiniteDuration;
49
50 /**
51  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
52  * <p>
53  * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
54  * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
55  * be created on each of those shards by the TransactionProxy
56  *</p>
57  * <p>
58  * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
59  * shards will be executed.
60  * </p>
61  */
62 public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIdentifier> implements DOMStoreReadWriteTransaction {
63
64     public static enum TransactionType {
65         READ_ONLY,
66         WRITE_ONLY,
67         READ_WRITE;
68
69         // Cache all values
70         private static final TransactionType[] VALUES = values();
71
72         public static TransactionType fromInt(final int type) {
73             try {
74                 return VALUES[type];
75             } catch (IndexOutOfBoundsException e) {
76                 throw new IllegalArgumentException("In TransactionType enum value " + type, e);
77             }
78         }
79     }
80
81     private static enum TransactionState {
82         OPEN,
83         READY,
84         CLOSED,
85     }
86
87     static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
88                                                               new Mapper<Throwable, Throwable>() {
89         @Override
90         public Throwable apply(Throwable failure) {
91             return failure;
92         }
93     };
94
95     private static final AtomicLong counter = new AtomicLong();
96
97     private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
98
99     /**
100      * Time interval in between transaction create retries.
101      */
102     private static final FiniteDuration CREATE_TX_TRY_INTERVAL =
103             FiniteDuration.create(1, TimeUnit.SECONDS);
104
105     /**
106      * Stores the remote Tx actors for each requested data store path to be used by the
107      * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
108      * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
109      * remoteTransactionActors list so they will be visible to the thread accessing the
110      * PhantomReference.
111      */
112     List<ActorSelection> remoteTransactionActors;
113     volatile AtomicBoolean remoteTransactionActorsMB;
114
115     /**
116      * Stores the create transaction results per shard.
117      */
118     private final Map<String, TransactionFutureCallback> txFutureCallbackMap = new HashMap<>();
119
120     private final TransactionType transactionType;
121     final ActorContext actorContext;
122     private final String transactionChainId;
123     private final SchemaContext schemaContext;
124     private TransactionState state = TransactionState.OPEN;
125
126     private volatile boolean initialized;
127     private Semaphore operationLimiter;
128     private OperationCompleter operationCompleter;
129
130     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
131         this(actorContext, transactionType, "");
132     }
133
134     public TransactionProxy(ActorContext actorContext, TransactionType transactionType, String transactionChainId) {
135         super(createIdentifier(actorContext));
136         this.actorContext = Preconditions.checkNotNull(actorContext,
137             "actorContext should not be null");
138         this.transactionType = Preconditions.checkNotNull(transactionType,
139             "transactionType should not be null");
140         this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
141             "schemaContext should not be null");
142         this.transactionChainId = transactionChainId;
143
144         LOG.debug("Created txn {} of type {} on chain {}", getIdentifier(), transactionType, transactionChainId);
145     }
146
147     private static TransactionIdentifier createIdentifier(ActorContext actorContext) {
148         String memberName = actorContext.getCurrentMemberName();
149         if (memberName == null) {
150             memberName = "UNKNOWN-MEMBER";
151         }
152
153         return new TransactionIdentifier(memberName, counter.getAndIncrement());
154     }
155
156     @VisibleForTesting
157     List<Future<Object>> getRecordedOperationFutures() {
158         List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
159         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
160             TransactionContext transactionContext = txFutureCallback.getTransactionContext();
161             if (transactionContext != null) {
162                 transactionContext.copyRecordedOperationFutures(recordedOperationFutures);
163             }
164         }
165
166         return recordedOperationFutures;
167     }
168
169     @VisibleForTesting
170     boolean hasTransactionContext() {
171         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
172             TransactionContext transactionContext = txFutureCallback.getTransactionContext();
173             if(transactionContext != null) {
174                 return true;
175             }
176         }
177
178         return false;
179     }
180
181     @Override
182     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
183
184         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
185                 "Read operation on write-only transaction is not allowed");
186
187         LOG.debug("Tx {} read {}", getIdentifier(), path);
188
189         throttleOperation();
190
191         final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
192
193         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
194         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
195             @Override
196             public void invoke(TransactionContext transactionContext) {
197                 transactionContext.readData(path, proxyFuture);
198             }
199         });
200
201         return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
202     }
203
204     @Override
205     public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
206
207         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
208                 "Exists operation on write-only transaction is not allowed");
209
210         LOG.debug("Tx {} exists {}", getIdentifier(), path);
211
212         throttleOperation();
213
214         final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
215
216         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
217         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
218             @Override
219             public void invoke(TransactionContext transactionContext) {
220                 transactionContext.dataExists(path, proxyFuture);
221             }
222         });
223
224         return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
225     }
226
227     private void checkModificationState() {
228         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
229                 "Modification operation on read-only transaction is not allowed");
230         Preconditions.checkState(state == TransactionState.OPEN,
231                 "Transaction is sealed - further modifications are not allowed");
232     }
233
234     private void throttleOperation() {
235         throttleOperation(1);
236     }
237
238     private void throttleOperation(int acquirePermits) {
239         if(!initialized) {
240             // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
241             operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
242             operationCompleter = new OperationCompleter(operationLimiter);
243
244             // Make sure we write this last because it's volatile and will also publish the non-volatile writes
245             // above as well so they'll be visible to other threads.
246             initialized = true;
247         }
248
249         try {
250             if(!operationLimiter.tryAcquire(acquirePermits,
251                     actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
252                 LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
253             }
254         } catch (InterruptedException e) {
255             if(LOG.isDebugEnabled()) {
256                 LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e);
257             } else {
258                 LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
259             }
260         }
261     }
262
263     @Override
264     public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
265
266         checkModificationState();
267
268         LOG.debug("Tx {} write {}", getIdentifier(), path);
269
270         throttleOperation();
271
272         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
273         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
274             @Override
275             public void invoke(TransactionContext transactionContext) {
276                 transactionContext.writeData(path, data);
277             }
278         });
279     }
280
281     @Override
282     public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
283
284         checkModificationState();
285
286         LOG.debug("Tx {} merge {}", getIdentifier(), path);
287
288         throttleOperation();
289
290         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
291         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
292             @Override
293             public void invoke(TransactionContext transactionContext) {
294                 transactionContext.mergeData(path, data);
295             }
296         });
297     }
298
299     @Override
300     public void delete(final YangInstanceIdentifier path) {
301
302         checkModificationState();
303
304         LOG.debug("Tx {} delete {}", getIdentifier(), path);
305
306         throttleOperation();
307
308         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
309         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
310             @Override
311             public void invoke(TransactionContext transactionContext) {
312                 transactionContext.deleteData(path);
313             }
314         });
315     }
316
317     private boolean seal(final TransactionState newState) {
318         if (state == TransactionState.OPEN) {
319             state = newState;
320             return true;
321         } else {
322             return false;
323         }
324     }
325
326     @Override
327     public AbstractThreePhaseCommitCohort ready() {
328         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
329                 "Read-only transactions cannot be readied");
330
331         final boolean success = seal(TransactionState.READY);
332         Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state);
333
334         LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(),
335                     txFutureCallbackMap.size());
336
337         if (txFutureCallbackMap.isEmpty()) {
338             TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
339             return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
340         }
341
342         throttleOperation(txFutureCallbackMap.size());
343
344         List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txFutureCallbackMap.size());
345         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
346
347             LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(),
348                         txFutureCallback.getShardName(), transactionChainId);
349
350             final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
351             final Future<ActorSelection> future;
352             if (transactionContext != null) {
353                 // avoid the creation of a promise and a TransactionOperation
354                 future = transactionContext.readyTransaction();
355             } else {
356                 final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
357                 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
358                     @Override
359                     public void invoke(TransactionContext transactionContext) {
360                         promise.completeWith(transactionContext.readyTransaction());
361                     }
362                 });
363                 future = promise.future();
364             }
365
366             cohortFutures.add(future);
367         }
368
369         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
370             getIdentifier().toString());
371     }
372
373     @Override
374     public void close() {
375         if (!seal(TransactionState.CLOSED)) {
376             if (state == TransactionState.CLOSED) {
377                 // Idempotent no-op as per AutoCloseable recommendation
378                 return;
379             }
380
381             throw new IllegalStateException(String.format("Transaction %s is ready, it cannot be closed",
382                 getIdentifier()));
383         }
384
385         for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
386             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
387                 @Override
388                 public void invoke(TransactionContext transactionContext) {
389                     transactionContext.closeTransaction();
390                 }
391             });
392         }
393
394         txFutureCallbackMap.clear();
395
396         if(remoteTransactionActorsMB != null) {
397             remoteTransactionActors.clear();
398             remoteTransactionActorsMB.set(true);
399         }
400     }
401
402     private String shardNameFromIdentifier(YangInstanceIdentifier path){
403         return ShardStrategyFactory.getStrategy(path).findShard(path);
404     }
405
406     protected Future<ActorSelection> sendFindPrimaryShardAsync(String shardName) {
407         return actorContext.findPrimaryShardAsync(shardName);
408     }
409
410     private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
411         String shardName = shardNameFromIdentifier(path);
412         TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
413         if(txFutureCallback == null) {
414             Future<ActorSelection> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
415
416             final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(shardName);
417
418             txFutureCallback = newTxFutureCallback;
419             txFutureCallbackMap.put(shardName, txFutureCallback);
420
421             findPrimaryFuture.onComplete(new OnComplete<ActorSelection>() {
422                 @Override
423                 public void onComplete(Throwable failure, ActorSelection primaryShard) {
424                     if(failure != null) {
425                         newTxFutureCallback.createTransactionContext(failure, null);
426                     } else {
427                         newTxFutureCallback.setPrimaryShard(primaryShard);
428                     }
429                 }
430             }, actorContext.getClientDispatcher());
431         }
432
433         return txFutureCallback;
434     }
435
436     public String getTransactionChainId() {
437         return transactionChainId;
438     }
439
440     protected ActorContext getActorContext() {
441         return actorContext;
442     }
443
444     /**
445      * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
446      * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
447      * retry task after a short delay.
448      * <p>
449      * The end result from a completed CreateTransaction message is a TransactionContext that is
450      * used to perform transaction operations. Transaction operations that occur before the
451      * CreateTransaction completes are cache and executed once the CreateTransaction completes,
452      * successfully or not.
453      */
454     private class TransactionFutureCallback extends OnComplete<Object> {
455
456         /**
457          * The list of transaction operations to execute once the CreateTransaction completes.
458          */
459         @GuardedBy("txOperationsOnComplete")
460         private final List<TransactionOperation> txOperationsOnComplete = Lists.newArrayList();
461
462         /**
463          * The TransactionContext resulting from the CreateTransaction reply.
464          */
465         private volatile TransactionContext transactionContext;
466
467         /**
468          * The target primary shard.
469          */
470         private volatile ActorSelection primaryShard;
471
472         private volatile int createTxTries = (int) (actorContext.getDatastoreContext().
473                 getShardLeaderElectionTimeout().duration().toMillis() /
474                 CREATE_TX_TRY_INTERVAL.toMillis());
475
476         private final String shardName;
477
478         TransactionFutureCallback(String shardName) {
479             this.shardName = shardName;
480         }
481
482         String getShardName() {
483             return shardName;
484         }
485
486         TransactionContext getTransactionContext() {
487             return transactionContext;
488         }
489
490
491         /**
492          * Sets the target primary shard and initiates a CreateTransaction try.
493          */
494         void setPrimaryShard(ActorSelection primaryShard) {
495             this.primaryShard = primaryShard;
496
497             if(transactionType == TransactionType.WRITE_ONLY &&
498                     actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
499                 LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
500                     getIdentifier(), primaryShard);
501
502                 // For write-only Tx's we prepare the transaction modifications directly on the shard actor
503                 // to avoid the overhead of creating a separate transaction actor.
504                 // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow.
505                 executeTxOperatonsOnComplete(createValidTransactionContext(this.primaryShard,
506                         this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION));
507             } else {
508                 tryCreateTransaction();
509             }
510         }
511
512         /**
513          * Adds a TransactionOperation to be executed after the CreateTransaction completes.
514          */
515         void addTxOperationOnComplete(TransactionOperation operation) {
516             boolean invokeOperation = true;
517             synchronized(txOperationsOnComplete) {
518                 if(transactionContext == null) {
519                     LOG.debug("Tx {} Adding operation on complete", getIdentifier());
520
521                     invokeOperation = false;
522                     txOperationsOnComplete.add(operation);
523                 }
524             }
525
526             if(invokeOperation) {
527                 operation.invoke(transactionContext);
528             }
529         }
530
531         void enqueueTransactionOperation(final TransactionOperation op) {
532
533             if (transactionContext != null) {
534                 op.invoke(transactionContext);
535             } else {
536                 // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
537                 // callback to be executed after the Tx is created.
538                 addTxOperationOnComplete(op);
539             }
540         }
541
542         /**
543          * Performs a CreateTransaction try async.
544          */
545         private void tryCreateTransaction() {
546             if(LOG.isDebugEnabled()) {
547                 LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(), primaryShard);
548             }
549
550             Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(),
551                     TransactionProxy.this.transactionType.ordinal(),
552                     getTransactionChainId()).toSerializable();
553
554             Future<Object> createTxFuture = actorContext.executeOperationAsync(primaryShard, serializedCreateMessage);
555
556             createTxFuture.onComplete(this, actorContext.getClientDispatcher());
557         }
558
559         @Override
560         public void onComplete(Throwable failure, Object response) {
561             if(failure instanceof NoShardLeaderException) {
562                 // There's no leader for the shard yet - schedule and try again, unless we're out
563                 // of retries. Note: createTxTries is volatile as it may be written by different
564                 // threads however not concurrently, therefore decrementing it non-atomically here
565                 // is ok.
566                 if(--createTxTries > 0) {
567                     LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry",
568                         getIdentifier(), shardName);
569
570                     actorContext.getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL,
571                             new Runnable() {
572                                 @Override
573                                 public void run() {
574                                     tryCreateTransaction();
575                                 }
576                             }, actorContext.getClientDispatcher());
577                     return;
578                 }
579             }
580
581             createTransactionContext(failure, response);
582         }
583
584         private void createTransactionContext(Throwable failure, Object response) {
585             // Mainly checking for state violation here to perform a volatile read of "initialized" to
586             // ensure updates to operationLimter et al are visible to this thread (ie we're doing
587             // "piggy-back" synchronization here).
588             Preconditions.checkState(initialized, "Tx was not propertly initialized.");
589
590             // Create the TransactionContext from the response or failure. Store the new
591             // TransactionContext locally until we've completed invoking the
592             // TransactionOperations. This avoids thread timing issues which could cause
593             // out-of-order TransactionOperations. Eg, on a modification operation, if the
594             // TransactionContext is non-null, then we directly call the TransactionContext.
595             // However, at the same time, the code may be executing the cached
596             // TransactionOperations. So to avoid thus timing, we don't publish the
597             // TransactionContext until after we've executed all cached TransactionOperations.
598             TransactionContext localTransactionContext;
599             if(failure != null) {
600                 LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
601
602                 localTransactionContext = new NoOpTransactionContext(failure, getIdentifier(), operationLimiter);
603             } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
604                 localTransactionContext = createValidTransactionContext(
605                         CreateTransactionReply.fromSerializable(response));
606             } else {
607                 IllegalArgumentException exception = new IllegalArgumentException(String.format(
608                         "Invalid reply type %s for CreateTransaction", response.getClass()));
609
610                 localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), operationLimiter);
611             }
612
613             executeTxOperatonsOnComplete(localTransactionContext);
614         }
615
616         private void executeTxOperatonsOnComplete(TransactionContext localTransactionContext) {
617             while(true) {
618                 // Access to txOperationsOnComplete and transactionContext must be protected and atomic
619                 // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
620                 // issues and ensure no TransactionOperation is missed and that they are processed
621                 // in the order they occurred.
622
623                 // We'll make a local copy of the txOperationsOnComplete list to handle re-entrancy
624                 // in case a TransactionOperation results in another transaction operation being
625                 // queued (eg a put operation from a client read Future callback that is notified
626                 // synchronously).
627                 Collection<TransactionOperation> operationsBatch = null;
628                 synchronized(txOperationsOnComplete) {
629                     if(txOperationsOnComplete.isEmpty()) {
630                         // We're done invoking the TransactionOperations so we can now publish the
631                         // TransactionContext.
632                         transactionContext = localTransactionContext;
633                         break;
634                     }
635
636                     operationsBatch = new ArrayList<>(txOperationsOnComplete);
637                     txOperationsOnComplete.clear();
638                 }
639
640                 // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
641                 // A slight down-side is that we need to re-acquire the lock below but this should
642                 // be negligible.
643                 for(TransactionOperation oper: operationsBatch) {
644                     oper.invoke(localTransactionContext);
645                 }
646             }
647         }
648
649         private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
650             LOG.debug("Tx {} Received {}", getIdentifier(), reply);
651
652             return createValidTransactionContext(actorContext.actorSelection(reply.getTransactionPath()),
653                     reply.getTransactionPath(), reply.getVersion());
654         }
655
656         private TransactionContext createValidTransactionContext(ActorSelection transactionActor,
657                 String transactionPath, short remoteTransactionVersion) {
658
659             if (transactionType == TransactionType.READ_ONLY) {
660                 // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
661                 // to close the remote Tx's when this instance is no longer in use and is garbage
662                 // collected.
663
664                 if(remoteTransactionActorsMB == null) {
665                     remoteTransactionActors = Lists.newArrayList();
666                     remoteTransactionActorsMB = new AtomicBoolean();
667
668                     TransactionProxyCleanupPhantomReference.track(TransactionProxy.this);
669                 }
670
671                 // Add the actor to the remoteTransactionActors list for access by the
672                 // cleanup PhantonReference.
673                 remoteTransactionActors.add(transactionActor);
674
675                 // Write to the memory barrier volatile to publish the above update to the
676                 // remoteTransactionActors list for thread visibility.
677                 remoteTransactionActorsMB.set(true);
678             }
679
680             // TxActor is always created where the leader of the shard is.
681             // Check if TxActor is created in the same node
682             boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
683
684             if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
685                 return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
686                         transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
687                         operationCompleter);
688             } else if (transactionType == TransactionType.WRITE_ONLY &&
689                     actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
690                 return new WriteOnlyTransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
691                     actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
692             } else {
693                 return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
694                         actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
695             }
696         }
697     }
698 }