Use BatchedModifications message in place of ReadyTransaction message
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.Mapper;
13 import akka.dispatch.OnComplete;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.Optional;
16 import com.google.common.base.Preconditions;
17 import com.google.common.collect.Lists;
18 import com.google.common.util.concurrent.CheckedFuture;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.SettableFuture;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.Semaphore;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 import java.util.concurrent.atomic.AtomicLong;
33 import javax.annotation.concurrent.GuardedBy;
34 import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
35 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
36 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
37 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
38 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
39 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
40 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
41 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
42 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
43 import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
44 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
45 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
46 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
47 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
48 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51 import scala.concurrent.Future;
52 import scala.concurrent.Promise;
53 import scala.concurrent.duration.FiniteDuration;
54
55 /**
56  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
57  * <p>
58  * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
59  * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
60  * be created on each of those shards by the TransactionProxy
61  *</p>
62  * <p>
63  * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
64  * shards will be executed.
65  * </p>
66  */
67 public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIdentifier> implements DOMStoreReadWriteTransaction {
68
69     public static enum TransactionType {
70         READ_ONLY,
71         WRITE_ONLY,
72         READ_WRITE;
73
74         // Cache all values
75         private static final TransactionType[] VALUES = values();
76
77         public static TransactionType fromInt(final int type) {
78             try {
79                 return VALUES[type];
80             } catch (IndexOutOfBoundsException e) {
81                 throw new IllegalArgumentException("In TransactionType enum value " + type, e);
82             }
83         }
84     }
85
86     private static enum TransactionState {
87         OPEN,
88         READY,
89         CLOSED,
90     }
91
92     static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
93                                                               new Mapper<Throwable, Throwable>() {
94         @Override
95         public Throwable apply(Throwable failure) {
96             return failure;
97         }
98     };
99
100     private static final AtomicLong counter = new AtomicLong();
101
102     private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
103
104     /**
105      * Time interval in between transaction create retries.
106      */
107     private static final FiniteDuration CREATE_TX_TRY_INTERVAL =
108             FiniteDuration.create(1, TimeUnit.SECONDS);
109
110     /**
111      * Stores the remote Tx actors for each requested data store path to be used by the
112      * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
113      * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
114      * remoteTransactionActors list so they will be visible to the thread accessing the
115      * PhantomReference.
116      */
117     List<ActorSelection> remoteTransactionActors;
118     volatile AtomicBoolean remoteTransactionActorsMB;
119
120     /**
121      * Stores the create transaction results per shard.
122      */
123     private final Map<String, TransactionFutureCallback> txFutureCallbackMap = new HashMap<>();
124
125     private final TransactionType transactionType;
126     final ActorContext actorContext;
127     private final String transactionChainId;
128     private final SchemaContext schemaContext;
129     private TransactionState state = TransactionState.OPEN;
130
131     private volatile boolean initialized;
132     private Semaphore operationLimiter;
133     private OperationCompleter operationCompleter;
134
135     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
136         this(actorContext, transactionType, "");
137     }
138
139     public TransactionProxy(ActorContext actorContext, TransactionType transactionType, String transactionChainId) {
140         super(createIdentifier(actorContext));
141         this.actorContext = Preconditions.checkNotNull(actorContext,
142             "actorContext should not be null");
143         this.transactionType = Preconditions.checkNotNull(transactionType,
144             "transactionType should not be null");
145         this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
146             "schemaContext should not be null");
147         this.transactionChainId = transactionChainId;
148
149         LOG.debug("Created txn {} of type {} on chain {}", getIdentifier(), transactionType, transactionChainId);
150     }
151
152     private static TransactionIdentifier createIdentifier(ActorContext actorContext) {
153         String memberName = actorContext.getCurrentMemberName();
154         if (memberName == null) {
155             memberName = "UNKNOWN-MEMBER";
156         }
157
158         return new TransactionIdentifier(memberName, counter.getAndIncrement());
159     }
160
161     @VisibleForTesting
162     boolean hasTransactionContext() {
163         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
164             TransactionContext transactionContext = txFutureCallback.getTransactionContext();
165             if(transactionContext != null) {
166                 return true;
167             }
168         }
169
170         return false;
171     }
172
173     private boolean isRootPath(YangInstanceIdentifier path){
174         return !path.getPathArguments().iterator().hasNext();
175     }
176
177     @Override
178     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
179
180         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
181                 "Read operation on write-only transaction is not allowed");
182
183         LOG.debug("Tx {} read {}", getIdentifier(), path);
184
185         final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
186
187         if(isRootPath(path)){
188             readAllData(path, proxyFuture);
189         } else {
190             throttleOperation();
191
192             TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
193             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
194                 @Override
195                 public void invoke(TransactionContext transactionContext) {
196                     transactionContext.readData(path, proxyFuture);
197                 }
198             });
199
200         }
201
202         return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
203     }
204
205     private void readAllData(final YangInstanceIdentifier path,
206                              final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
207         Set<String> allShardNames = actorContext.getConfiguration().getAllShardNames();
208         List<SettableFuture<Optional<NormalizedNode<?, ?>>>> futures = new ArrayList<>(allShardNames.size());
209
210         for(String shardName : allShardNames){
211             final SettableFuture<Optional<NormalizedNode<?, ?>>> subProxyFuture = SettableFuture.create();
212
213             throttleOperation();
214
215             TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(shardName);
216             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
217                 @Override
218                 public void invoke(TransactionContext transactionContext) {
219                     transactionContext.readData(path, subProxyFuture);
220                 }
221             });
222
223             futures.add(subProxyFuture);
224         }
225
226         final ListenableFuture<List<Optional<NormalizedNode<?, ?>>>> future = Futures.allAsList(futures);
227
228         future.addListener(new Runnable() {
229             @Override
230             public void run() {
231                 try {
232                     proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(),
233                             future.get(), actorContext.getSchemaContext()));
234                 } catch (InterruptedException | ExecutionException e) {
235                     proxyFuture.setException(e);
236                 }
237             }
238         }, actorContext.getActorSystem().dispatcher());
239     }
240
241     @Override
242     public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
243
244         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
245                 "Exists operation on write-only transaction is not allowed");
246
247         LOG.debug("Tx {} exists {}", getIdentifier(), path);
248
249         throttleOperation();
250
251         final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
252
253         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
254         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
255             @Override
256             public void invoke(TransactionContext transactionContext) {
257                 transactionContext.dataExists(path, proxyFuture);
258             }
259         });
260
261         return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
262     }
263
264     private void checkModificationState() {
265         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
266                 "Modification operation on read-only transaction is not allowed");
267         Preconditions.checkState(state == TransactionState.OPEN,
268                 "Transaction is sealed - further modifications are not allowed");
269     }
270
271     private void throttleOperation() {
272         throttleOperation(1);
273     }
274
275     private void throttleOperation(int acquirePermits) {
276         if(!initialized) {
277             // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
278             operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
279             operationCompleter = new OperationCompleter(operationLimiter);
280
281             // Make sure we write this last because it's volatile and will also publish the non-volatile writes
282             // above as well so they'll be visible to other threads.
283             initialized = true;
284         }
285
286         try {
287             if(!operationLimiter.tryAcquire(acquirePermits,
288                     actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
289                 LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
290             }
291         } catch (InterruptedException e) {
292             if(LOG.isDebugEnabled()) {
293                 LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e);
294             } else {
295                 LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
296             }
297         }
298     }
299
300     @Override
301     public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
302
303         checkModificationState();
304
305         LOG.debug("Tx {} write {}", getIdentifier(), path);
306
307         throttleOperation();
308
309         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
310         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
311             @Override
312             public void invoke(TransactionContext transactionContext) {
313                 transactionContext.writeData(path, data);
314             }
315         });
316     }
317
318     @Override
319     public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
320
321         checkModificationState();
322
323         LOG.debug("Tx {} merge {}", getIdentifier(), path);
324
325         throttleOperation();
326
327         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
328         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
329             @Override
330             public void invoke(TransactionContext transactionContext) {
331                 transactionContext.mergeData(path, data);
332             }
333         });
334     }
335
336     @Override
337     public void delete(final YangInstanceIdentifier path) {
338
339         checkModificationState();
340
341         LOG.debug("Tx {} delete {}", getIdentifier(), path);
342
343         throttleOperation();
344
345         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
346         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
347             @Override
348             public void invoke(TransactionContext transactionContext) {
349                 transactionContext.deleteData(path);
350             }
351         });
352     }
353
354     private boolean seal(final TransactionState newState) {
355         if (state == TransactionState.OPEN) {
356             state = newState;
357             return true;
358         } else {
359             return false;
360         }
361     }
362
363     @Override
364     public AbstractThreePhaseCommitCohort ready() {
365         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
366                 "Read-only transactions cannot be readied");
367
368         final boolean success = seal(TransactionState.READY);
369         Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state);
370
371         LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(),
372                     txFutureCallbackMap.size());
373
374         if (txFutureCallbackMap.isEmpty()) {
375             TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
376             return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
377         }
378
379         throttleOperation(txFutureCallbackMap.size());
380
381         List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txFutureCallbackMap.size());
382         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
383
384             LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(),
385                         txFutureCallback.getShardName(), transactionChainId);
386
387             final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
388             final Future<ActorSelection> future;
389             if (transactionContext != null) {
390                 // avoid the creation of a promise and a TransactionOperation
391                 future = transactionContext.readyTransaction();
392             } else {
393                 final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
394                 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
395                     @Override
396                     public void invoke(TransactionContext transactionContext) {
397                         promise.completeWith(transactionContext.readyTransaction());
398                     }
399                 });
400                 future = promise.future();
401             }
402
403             cohortFutures.add(future);
404         }
405
406         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
407             getIdentifier().toString());
408     }
409
410     @Override
411     public void close() {
412         if (!seal(TransactionState.CLOSED)) {
413             if (state == TransactionState.CLOSED) {
414                 // Idempotent no-op as per AutoCloseable recommendation
415                 return;
416             }
417
418             throw new IllegalStateException(String.format("Transaction %s is ready, it cannot be closed",
419                 getIdentifier()));
420         }
421
422         for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
423             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
424                 @Override
425                 public void invoke(TransactionContext transactionContext) {
426                     transactionContext.closeTransaction();
427                 }
428             });
429         }
430
431         txFutureCallbackMap.clear();
432
433         if(remoteTransactionActorsMB != null) {
434             remoteTransactionActors.clear();
435             remoteTransactionActorsMB.set(true);
436         }
437     }
438
439     private String shardNameFromIdentifier(YangInstanceIdentifier path){
440         return ShardStrategyFactory.getStrategy(path).findShard(path);
441     }
442
443     protected Future<ActorSelection> sendFindPrimaryShardAsync(String shardName) {
444         return actorContext.findPrimaryShardAsync(shardName);
445     }
446
447     private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
448         String shardName = shardNameFromIdentifier(path);
449         return getOrCreateTxFutureCallback(shardName);
450     }
451
452     private TransactionFutureCallback getOrCreateTxFutureCallback(String shardName) {
453         TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
454         if(txFutureCallback == null) {
455             Future<ActorSelection> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
456
457             final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(shardName);
458
459             txFutureCallback = newTxFutureCallback;
460             txFutureCallbackMap.put(shardName, txFutureCallback);
461
462             findPrimaryFuture.onComplete(new OnComplete<ActorSelection>() {
463                 @Override
464                 public void onComplete(Throwable failure, ActorSelection primaryShard) {
465                     if(failure != null) {
466                         newTxFutureCallback.createTransactionContext(failure, null);
467                     } else {
468                         newTxFutureCallback.setPrimaryShard(primaryShard);
469                     }
470                 }
471             }, actorContext.getClientDispatcher());
472         }
473
474         return txFutureCallback;
475     }
476
477     public String getTransactionChainId() {
478         return transactionChainId;
479     }
480
481     protected ActorContext getActorContext() {
482         return actorContext;
483     }
484
485     /**
486      * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
487      * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
488      * retry task after a short delay.
489      * <p>
490      * The end result from a completed CreateTransaction message is a TransactionContext that is
491      * used to perform transaction operations. Transaction operations that occur before the
492      * CreateTransaction completes are cache and executed once the CreateTransaction completes,
493      * successfully or not.
494      */
495     private class TransactionFutureCallback extends OnComplete<Object> {
496
497         /**
498          * The list of transaction operations to execute once the CreateTransaction completes.
499          */
500         @GuardedBy("txOperationsOnComplete")
501         private final List<TransactionOperation> txOperationsOnComplete = Lists.newArrayList();
502
503         /**
504          * The TransactionContext resulting from the CreateTransaction reply.
505          */
506         private volatile TransactionContext transactionContext;
507
508         /**
509          * The target primary shard.
510          */
511         private volatile ActorSelection primaryShard;
512
513         private volatile int createTxTries = (int) (actorContext.getDatastoreContext().
514                 getShardLeaderElectionTimeout().duration().toMillis() /
515                 CREATE_TX_TRY_INTERVAL.toMillis());
516
517         private final String shardName;
518
519         TransactionFutureCallback(String shardName) {
520             this.shardName = shardName;
521         }
522
523         String getShardName() {
524             return shardName;
525         }
526
527         TransactionContext getTransactionContext() {
528             return transactionContext;
529         }
530
531
532         /**
533          * Sets the target primary shard and initiates a CreateTransaction try.
534          */
535         void setPrimaryShard(ActorSelection primaryShard) {
536             this.primaryShard = primaryShard;
537
538             if(transactionType == TransactionType.WRITE_ONLY &&
539                     actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
540                 LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
541                     getIdentifier(), primaryShard);
542
543                 // For write-only Tx's we prepare the transaction modifications directly on the shard actor
544                 // to avoid the overhead of creating a separate transaction actor.
545                 // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow.
546                 executeTxOperatonsOnComplete(createValidTransactionContext(this.primaryShard,
547                         this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION));
548             } else {
549                 tryCreateTransaction();
550             }
551         }
552
553         /**
554          * Adds a TransactionOperation to be executed after the CreateTransaction completes.
555          */
556         void addTxOperationOnComplete(TransactionOperation operation) {
557             boolean invokeOperation = true;
558             synchronized(txOperationsOnComplete) {
559                 if(transactionContext == null) {
560                     LOG.debug("Tx {} Adding operation on complete", getIdentifier());
561
562                     invokeOperation = false;
563                     txOperationsOnComplete.add(operation);
564                 }
565             }
566
567             if(invokeOperation) {
568                 operation.invoke(transactionContext);
569             }
570         }
571
572         void enqueueTransactionOperation(final TransactionOperation op) {
573
574             if (transactionContext != null) {
575                 op.invoke(transactionContext);
576             } else {
577                 // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
578                 // callback to be executed after the Tx is created.
579                 addTxOperationOnComplete(op);
580             }
581         }
582
583         /**
584          * Performs a CreateTransaction try async.
585          */
586         private void tryCreateTransaction() {
587             if(LOG.isDebugEnabled()) {
588                 LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(), primaryShard);
589             }
590
591             Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(),
592                     TransactionProxy.this.transactionType.ordinal(),
593                     getTransactionChainId()).toSerializable();
594
595             Future<Object> createTxFuture = actorContext.executeOperationAsync(primaryShard, serializedCreateMessage);
596
597             createTxFuture.onComplete(this, actorContext.getClientDispatcher());
598         }
599
600         @Override
601         public void onComplete(Throwable failure, Object response) {
602             if(failure instanceof NoShardLeaderException) {
603                 // There's no leader for the shard yet - schedule and try again, unless we're out
604                 // of retries. Note: createTxTries is volatile as it may be written by different
605                 // threads however not concurrently, therefore decrementing it non-atomically here
606                 // is ok.
607                 if(--createTxTries > 0) {
608                     LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry",
609                         getIdentifier(), shardName);
610
611                     actorContext.getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL,
612                             new Runnable() {
613                                 @Override
614                                 public void run() {
615                                     tryCreateTransaction();
616                                 }
617                             }, actorContext.getClientDispatcher());
618                     return;
619                 }
620             }
621
622             createTransactionContext(failure, response);
623         }
624
625         private void createTransactionContext(Throwable failure, Object response) {
626             // Mainly checking for state violation here to perform a volatile read of "initialized" to
627             // ensure updates to operationLimter et al are visible to this thread (ie we're doing
628             // "piggy-back" synchronization here).
629             Preconditions.checkState(initialized, "Tx was not propertly initialized.");
630
631             // Create the TransactionContext from the response or failure. Store the new
632             // TransactionContext locally until we've completed invoking the
633             // TransactionOperations. This avoids thread timing issues which could cause
634             // out-of-order TransactionOperations. Eg, on a modification operation, if the
635             // TransactionContext is non-null, then we directly call the TransactionContext.
636             // However, at the same time, the code may be executing the cached
637             // TransactionOperations. So to avoid thus timing, we don't publish the
638             // TransactionContext until after we've executed all cached TransactionOperations.
639             TransactionContext localTransactionContext;
640             if(failure != null) {
641                 LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
642
643                 localTransactionContext = new NoOpTransactionContext(failure, getIdentifier(), operationLimiter);
644             } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
645                 localTransactionContext = createValidTransactionContext(
646                         CreateTransactionReply.fromSerializable(response));
647             } else {
648                 IllegalArgumentException exception = new IllegalArgumentException(String.format(
649                         "Invalid reply type %s for CreateTransaction", response.getClass()));
650
651                 localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), operationLimiter);
652             }
653
654             executeTxOperatonsOnComplete(localTransactionContext);
655         }
656
657         private void executeTxOperatonsOnComplete(TransactionContext localTransactionContext) {
658             while(true) {
659                 // Access to txOperationsOnComplete and transactionContext must be protected and atomic
660                 // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
661                 // issues and ensure no TransactionOperation is missed and that they are processed
662                 // in the order they occurred.
663
664                 // We'll make a local copy of the txOperationsOnComplete list to handle re-entrancy
665                 // in case a TransactionOperation results in another transaction operation being
666                 // queued (eg a put operation from a client read Future callback that is notified
667                 // synchronously).
668                 Collection<TransactionOperation> operationsBatch = null;
669                 synchronized(txOperationsOnComplete) {
670                     if(txOperationsOnComplete.isEmpty()) {
671                         // We're done invoking the TransactionOperations so we can now publish the
672                         // TransactionContext.
673                         transactionContext = localTransactionContext;
674                         break;
675                     }
676
677                     operationsBatch = new ArrayList<>(txOperationsOnComplete);
678                     txOperationsOnComplete.clear();
679                 }
680
681                 // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
682                 // A slight down-side is that we need to re-acquire the lock below but this should
683                 // be negligible.
684                 for(TransactionOperation oper: operationsBatch) {
685                     oper.invoke(localTransactionContext);
686                 }
687             }
688         }
689
690         private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
691             LOG.debug("Tx {} Received {}", getIdentifier(), reply);
692
693             return createValidTransactionContext(actorContext.actorSelection(reply.getTransactionPath()),
694                     reply.getTransactionPath(), reply.getVersion());
695         }
696
697         private TransactionContext createValidTransactionContext(ActorSelection transactionActor,
698                 String transactionPath, short remoteTransactionVersion) {
699
700             if (transactionType == TransactionType.READ_ONLY) {
701                 // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
702                 // to close the remote Tx's when this instance is no longer in use and is garbage
703                 // collected.
704
705                 if(remoteTransactionActorsMB == null) {
706                     remoteTransactionActors = Lists.newArrayList();
707                     remoteTransactionActorsMB = new AtomicBoolean();
708
709                     TransactionProxyCleanupPhantomReference.track(TransactionProxy.this);
710                 }
711
712                 // Add the actor to the remoteTransactionActors list for access by the
713                 // cleanup PhantonReference.
714                 remoteTransactionActors.add(transactionActor);
715
716                 // Write to the memory barrier volatile to publish the above update to the
717                 // remoteTransactionActors list for thread visibility.
718                 remoteTransactionActorsMB.set(true);
719             }
720
721             // TxActor is always created where the leader of the shard is.
722             // Check if TxActor is created in the same node
723             boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
724
725             if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
726                 return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
727                         transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
728                         operationCompleter);
729             } else {
730                 return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
731                         actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
732             }
733         }
734     }
735 }