Merge "BUG-1848 : OrderedNormalizedNodeWriter implementation"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.Mapper;
13 import akka.dispatch.OnComplete;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.Optional;
16 import com.google.common.base.Preconditions;
17 import com.google.common.collect.Lists;
18 import com.google.common.util.concurrent.CheckedFuture;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.SettableFuture;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.Semaphore;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 import java.util.concurrent.atomic.AtomicLong;
33 import javax.annotation.concurrent.GuardedBy;
34 import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
35 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
36 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
37 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
38 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
39 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
40 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
41 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
42 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
43 import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
44 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
45 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
46 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
47 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
48 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51 import scala.concurrent.Future;
52 import scala.concurrent.Promise;
53 import scala.concurrent.duration.FiniteDuration;
54
/**
 * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard.
 * <p>
 * Creating a transaction on the consumer side will create one instance of a transaction proxy. If, during
 * the transaction, reads and writes are done on data that belongs to different shards, then a separate
 * transaction will be created on each of those shards by the TransactionProxy.
 * </p>
 * <p>
 * The TransactionProxy does not make any guarantees about atomicity or the order in which the transactions
 * on the various shards will be executed.
 * </p>
 */
67 public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIdentifier> implements DOMStoreReadWriteTransaction {
68
69     public static enum TransactionType {
70         READ_ONLY,
71         WRITE_ONLY,
72         READ_WRITE;
73
74         // Cache all values
75         private static final TransactionType[] VALUES = values();
76
77         public static TransactionType fromInt(final int type) {
78             try {
79                 return VALUES[type];
80             } catch (IndexOutOfBoundsException e) {
81                 throw new IllegalArgumentException("In TransactionType enum value " + type, e);
82             }
83         }
84     }
85
86     private static enum TransactionState {
87         OPEN,
88         READY,
89         CLOSED,
90     }
91
92     static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
93                                                               new Mapper<Throwable, Throwable>() {
94         @Override
95         public Throwable apply(Throwable failure) {
96             return failure;
97         }
98     };
99
100     private static final AtomicLong counter = new AtomicLong();
101
102     private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
103
104     /**
105      * Time interval in between transaction create retries.
106      */
107     private static final FiniteDuration CREATE_TX_TRY_INTERVAL =
108             FiniteDuration.create(1, TimeUnit.SECONDS);
109
110     /**
111      * Stores the remote Tx actors for each requested data store path to be used by the
112      * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
113      * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
114      * remoteTransactionActors list so they will be visible to the thread accessing the
115      * PhantomReference.
116      */
117     List<ActorSelection> remoteTransactionActors;
118     volatile AtomicBoolean remoteTransactionActorsMB;
119
120     /**
121      * Stores the create transaction results per shard.
122      */
123     private final Map<String, TransactionFutureCallback> txFutureCallbackMap = new HashMap<>();
124
125     private final TransactionType transactionType;
126     final ActorContext actorContext;
127     private final String transactionChainId;
128     private final SchemaContext schemaContext;
129     private TransactionState state = TransactionState.OPEN;
130
131     private volatile boolean initialized;
132     private Semaphore operationLimiter;
133     private OperationCompleter operationCompleter;
134
135     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
136         this(actorContext, transactionType, "");
137     }
138
139     public TransactionProxy(ActorContext actorContext, TransactionType transactionType, String transactionChainId) {
140         super(createIdentifier(actorContext));
141         this.actorContext = Preconditions.checkNotNull(actorContext,
142             "actorContext should not be null");
143         this.transactionType = Preconditions.checkNotNull(transactionType,
144             "transactionType should not be null");
145         this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
146             "schemaContext should not be null");
147         this.transactionChainId = transactionChainId;
148
149         LOG.debug("Created txn {} of type {} on chain {}", getIdentifier(), transactionType, transactionChainId);
150     }
151
152     private static TransactionIdentifier createIdentifier(ActorContext actorContext) {
153         String memberName = actorContext.getCurrentMemberName();
154         if (memberName == null) {
155             memberName = "UNKNOWN-MEMBER";
156         }
157
158         return new TransactionIdentifier(memberName, counter.getAndIncrement());
159     }
160
161     @VisibleForTesting
162     List<Future<Object>> getRecordedOperationFutures() {
163         List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
164         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
165             TransactionContext transactionContext = txFutureCallback.getTransactionContext();
166             if (transactionContext != null) {
167                 transactionContext.copyRecordedOperationFutures(recordedOperationFutures);
168             }
169         }
170
171         return recordedOperationFutures;
172     }
173
174     @VisibleForTesting
175     boolean hasTransactionContext() {
176         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
177             TransactionContext transactionContext = txFutureCallback.getTransactionContext();
178             if(transactionContext != null) {
179                 return true;
180             }
181         }
182
183         return false;
184     }
185
186     private boolean isRootPath(YangInstanceIdentifier path){
187         return !path.getPathArguments().iterator().hasNext();
188     }
189
190     @Override
191     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
192
193         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
194                 "Read operation on write-only transaction is not allowed");
195
196         LOG.debug("Tx {} read {}", getIdentifier(), path);
197
198         final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
199
200         if(isRootPath(path)){
201             readAllData(path, proxyFuture);
202         } else {
203             throttleOperation();
204
205             TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
206             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
207                 @Override
208                 public void invoke(TransactionContext transactionContext) {
209                     transactionContext.readData(path, proxyFuture);
210                 }
211             });
212
213         }
214
215         return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
216     }
217
218     private void readAllData(final YangInstanceIdentifier path,
219                              final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
220         Set<String> allShardNames = actorContext.getConfiguration().getAllShardNames();
221         List<SettableFuture<Optional<NormalizedNode<?, ?>>>> futures = new ArrayList<>(allShardNames.size());
222
223         for(String shardName : allShardNames){
224             final SettableFuture<Optional<NormalizedNode<?, ?>>> subProxyFuture = SettableFuture.create();
225
226             throttleOperation();
227
228             TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(shardName);
229             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
230                 @Override
231                 public void invoke(TransactionContext transactionContext) {
232                     transactionContext.readData(path, subProxyFuture);
233                 }
234             });
235
236             futures.add(subProxyFuture);
237         }
238
239         final ListenableFuture<List<Optional<NormalizedNode<?, ?>>>> future = Futures.allAsList(futures);
240
241         future.addListener(new Runnable() {
242             @Override
243             public void run() {
244                 try {
245                     proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(),
246                             future.get(), actorContext.getSchemaContext()));
247                 } catch (InterruptedException | ExecutionException e) {
248                     proxyFuture.setException(e);
249                 }
250             }
251         }, actorContext.getActorSystem().dispatcher());
252     }
253
254     @Override
255     public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
256
257         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
258                 "Exists operation on write-only transaction is not allowed");
259
260         LOG.debug("Tx {} exists {}", getIdentifier(), path);
261
262         throttleOperation();
263
264         final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
265
266         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
267         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
268             @Override
269             public void invoke(TransactionContext transactionContext) {
270                 transactionContext.dataExists(path, proxyFuture);
271             }
272         });
273
274         return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
275     }
276
277     private void checkModificationState() {
278         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
279                 "Modification operation on read-only transaction is not allowed");
280         Preconditions.checkState(state == TransactionState.OPEN,
281                 "Transaction is sealed - further modifications are not allowed");
282     }
283
284     private void throttleOperation() {
285         throttleOperation(1);
286     }
287
288     private void throttleOperation(int acquirePermits) {
289         if(!initialized) {
290             // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
291             operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
292             operationCompleter = new OperationCompleter(operationLimiter);
293
294             // Make sure we write this last because it's volatile and will also publish the non-volatile writes
295             // above as well so they'll be visible to other threads.
296             initialized = true;
297         }
298
299         try {
300             if(!operationLimiter.tryAcquire(acquirePermits,
301                     actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
302                 LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
303             }
304         } catch (InterruptedException e) {
305             if(LOG.isDebugEnabled()) {
306                 LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e);
307             } else {
308                 LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
309             }
310         }
311     }
312
313     @Override
314     public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
315
316         checkModificationState();
317
318         LOG.debug("Tx {} write {}", getIdentifier(), path);
319
320         throttleOperation();
321
322         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
323         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
324             @Override
325             public void invoke(TransactionContext transactionContext) {
326                 transactionContext.writeData(path, data);
327             }
328         });
329     }
330
331     @Override
332     public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
333
334         checkModificationState();
335
336         LOG.debug("Tx {} merge {}", getIdentifier(), path);
337
338         throttleOperation();
339
340         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
341         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
342             @Override
343             public void invoke(TransactionContext transactionContext) {
344                 transactionContext.mergeData(path, data);
345             }
346         });
347     }
348
349     @Override
350     public void delete(final YangInstanceIdentifier path) {
351
352         checkModificationState();
353
354         LOG.debug("Tx {} delete {}", getIdentifier(), path);
355
356         throttleOperation();
357
358         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
359         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
360             @Override
361             public void invoke(TransactionContext transactionContext) {
362                 transactionContext.deleteData(path);
363             }
364         });
365     }
366
367     private boolean seal(final TransactionState newState) {
368         if (state == TransactionState.OPEN) {
369             state = newState;
370             return true;
371         } else {
372             return false;
373         }
374     }
375
376     @Override
377     public AbstractThreePhaseCommitCohort ready() {
378         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
379                 "Read-only transactions cannot be readied");
380
381         final boolean success = seal(TransactionState.READY);
382         Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state);
383
384         LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(),
385                     txFutureCallbackMap.size());
386
387         if (txFutureCallbackMap.isEmpty()) {
388             TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
389             return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
390         }
391
392         throttleOperation(txFutureCallbackMap.size());
393
394         List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txFutureCallbackMap.size());
395         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
396
397             LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(),
398                         txFutureCallback.getShardName(), transactionChainId);
399
400             final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
401             final Future<ActorSelection> future;
402             if (transactionContext != null) {
403                 // avoid the creation of a promise and a TransactionOperation
404                 future = transactionContext.readyTransaction();
405             } else {
406                 final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
407                 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
408                     @Override
409                     public void invoke(TransactionContext transactionContext) {
410                         promise.completeWith(transactionContext.readyTransaction());
411                     }
412                 });
413                 future = promise.future();
414             }
415
416             cohortFutures.add(future);
417         }
418
419         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
420             getIdentifier().toString());
421     }
422
423     @Override
424     public void close() {
425         if (!seal(TransactionState.CLOSED)) {
426             if (state == TransactionState.CLOSED) {
427                 // Idempotent no-op as per AutoCloseable recommendation
428                 return;
429             }
430
431             throw new IllegalStateException(String.format("Transaction %s is ready, it cannot be closed",
432                 getIdentifier()));
433         }
434
435         for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
436             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
437                 @Override
438                 public void invoke(TransactionContext transactionContext) {
439                     transactionContext.closeTransaction();
440                 }
441             });
442         }
443
444         txFutureCallbackMap.clear();
445
446         if(remoteTransactionActorsMB != null) {
447             remoteTransactionActors.clear();
448             remoteTransactionActorsMB.set(true);
449         }
450     }
451
452     private String shardNameFromIdentifier(YangInstanceIdentifier path){
453         return ShardStrategyFactory.getStrategy(path).findShard(path);
454     }
455
456     protected Future<ActorSelection> sendFindPrimaryShardAsync(String shardName) {
457         return actorContext.findPrimaryShardAsync(shardName);
458     }
459
460     private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
461         String shardName = shardNameFromIdentifier(path);
462         return getOrCreateTxFutureCallback(shardName);
463     }
464
465     private TransactionFutureCallback getOrCreateTxFutureCallback(String shardName) {
466         TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
467         if(txFutureCallback == null) {
468             Future<ActorSelection> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
469
470             final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(shardName);
471
472             txFutureCallback = newTxFutureCallback;
473             txFutureCallbackMap.put(shardName, txFutureCallback);
474
475             findPrimaryFuture.onComplete(new OnComplete<ActorSelection>() {
476                 @Override
477                 public void onComplete(Throwable failure, ActorSelection primaryShard) {
478                     if(failure != null) {
479                         newTxFutureCallback.createTransactionContext(failure, null);
480                     } else {
481                         newTxFutureCallback.setPrimaryShard(primaryShard);
482                     }
483                 }
484             }, actorContext.getClientDispatcher());
485         }
486
487         return txFutureCallback;
488     }
489
490     public String getTransactionChainId() {
491         return transactionChainId;
492     }
493
494     protected ActorContext getActorContext() {
495         return actorContext;
496     }
497
498     /**
499      * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
500      * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
501      * retry task after a short delay.
502      * <p>
503      * The end result from a completed CreateTransaction message is a TransactionContext that is
504      * used to perform transaction operations. Transaction operations that occur before the
505      * CreateTransaction completes are cache and executed once the CreateTransaction completes,
506      * successfully or not.
507      */
508     private class TransactionFutureCallback extends OnComplete<Object> {
509
510         /**
511          * The list of transaction operations to execute once the CreateTransaction completes.
512          */
513         @GuardedBy("txOperationsOnComplete")
514         private final List<TransactionOperation> txOperationsOnComplete = Lists.newArrayList();
515
516         /**
517          * The TransactionContext resulting from the CreateTransaction reply.
518          */
519         private volatile TransactionContext transactionContext;
520
521         /**
522          * The target primary shard.
523          */
524         private volatile ActorSelection primaryShard;
525
526         private volatile int createTxTries = (int) (actorContext.getDatastoreContext().
527                 getShardLeaderElectionTimeout().duration().toMillis() /
528                 CREATE_TX_TRY_INTERVAL.toMillis());
529
530         private final String shardName;
531
532         TransactionFutureCallback(String shardName) {
533             this.shardName = shardName;
534         }
535
536         String getShardName() {
537             return shardName;
538         }
539
540         TransactionContext getTransactionContext() {
541             return transactionContext;
542         }
543
544
545         /**
546          * Sets the target primary shard and initiates a CreateTransaction try.
547          */
548         void setPrimaryShard(ActorSelection primaryShard) {
549             this.primaryShard = primaryShard;
550
551             if(transactionType == TransactionType.WRITE_ONLY &&
552                     actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
553                 LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
554                     getIdentifier(), primaryShard);
555
556                 // For write-only Tx's we prepare the transaction modifications directly on the shard actor
557                 // to avoid the overhead of creating a separate transaction actor.
558                 // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow.
559                 executeTxOperatonsOnComplete(createValidTransactionContext(this.primaryShard,
560                         this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION));
561             } else {
562                 tryCreateTransaction();
563             }
564         }
565
566         /**
567          * Adds a TransactionOperation to be executed after the CreateTransaction completes.
568          */
569         void addTxOperationOnComplete(TransactionOperation operation) {
570             boolean invokeOperation = true;
571             synchronized(txOperationsOnComplete) {
572                 if(transactionContext == null) {
573                     LOG.debug("Tx {} Adding operation on complete", getIdentifier());
574
575                     invokeOperation = false;
576                     txOperationsOnComplete.add(operation);
577                 }
578             }
579
580             if(invokeOperation) {
581                 operation.invoke(transactionContext);
582             }
583         }
584
585         void enqueueTransactionOperation(final TransactionOperation op) {
586
587             if (transactionContext != null) {
588                 op.invoke(transactionContext);
589             } else {
590                 // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
591                 // callback to be executed after the Tx is created.
592                 addTxOperationOnComplete(op);
593             }
594         }
595
596         /**
597          * Performs a CreateTransaction try async.
598          */
599         private void tryCreateTransaction() {
600             if(LOG.isDebugEnabled()) {
601                 LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(), primaryShard);
602             }
603
604             Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(),
605                     TransactionProxy.this.transactionType.ordinal(),
606                     getTransactionChainId()).toSerializable();
607
608             Future<Object> createTxFuture = actorContext.executeOperationAsync(primaryShard, serializedCreateMessage);
609
610             createTxFuture.onComplete(this, actorContext.getClientDispatcher());
611         }
612
613         @Override
614         public void onComplete(Throwable failure, Object response) {
615             if(failure instanceof NoShardLeaderException) {
616                 // There's no leader for the shard yet - schedule and try again, unless we're out
617                 // of retries. Note: createTxTries is volatile as it may be written by different
618                 // threads however not concurrently, therefore decrementing it non-atomically here
619                 // is ok.
620                 if(--createTxTries > 0) {
621                     LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry",
622                         getIdentifier(), shardName);
623
624                     actorContext.getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL,
625                             new Runnable() {
626                                 @Override
627                                 public void run() {
628                                     tryCreateTransaction();
629                                 }
630                             }, actorContext.getClientDispatcher());
631                     return;
632                 }
633             }
634
635             createTransactionContext(failure, response);
636         }
637
638         private void createTransactionContext(Throwable failure, Object response) {
639             // Mainly checking for state violation here to perform a volatile read of "initialized" to
640             // ensure updates to operationLimter et al are visible to this thread (ie we're doing
641             // "piggy-back" synchronization here).
642             Preconditions.checkState(initialized, "Tx was not propertly initialized.");
643
644             // Create the TransactionContext from the response or failure. Store the new
645             // TransactionContext locally until we've completed invoking the
646             // TransactionOperations. This avoids thread timing issues which could cause
647             // out-of-order TransactionOperations. Eg, on a modification operation, if the
648             // TransactionContext is non-null, then we directly call the TransactionContext.
649             // However, at the same time, the code may be executing the cached
650             // TransactionOperations. So to avoid thus timing, we don't publish the
651             // TransactionContext until after we've executed all cached TransactionOperations.
652             TransactionContext localTransactionContext;
653             if(failure != null) {
654                 LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
655
656                 localTransactionContext = new NoOpTransactionContext(failure, getIdentifier(), operationLimiter);
657             } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
658                 localTransactionContext = createValidTransactionContext(
659                         CreateTransactionReply.fromSerializable(response));
660             } else {
661                 IllegalArgumentException exception = new IllegalArgumentException(String.format(
662                         "Invalid reply type %s for CreateTransaction", response.getClass()));
663
664                 localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), operationLimiter);
665             }
666
667             executeTxOperatonsOnComplete(localTransactionContext);
668         }
669
670         private void executeTxOperatonsOnComplete(TransactionContext localTransactionContext) {
671             while(true) {
672                 // Access to txOperationsOnComplete and transactionContext must be protected and atomic
673                 // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
674                 // issues and ensure no TransactionOperation is missed and that they are processed
675                 // in the order they occurred.
676
677                 // We'll make a local copy of the txOperationsOnComplete list to handle re-entrancy
678                 // in case a TransactionOperation results in another transaction operation being
679                 // queued (eg a put operation from a client read Future callback that is notified
680                 // synchronously).
681                 Collection<TransactionOperation> operationsBatch = null;
682                 synchronized(txOperationsOnComplete) {
683                     if(txOperationsOnComplete.isEmpty()) {
684                         // We're done invoking the TransactionOperations so we can now publish the
685                         // TransactionContext.
686                         transactionContext = localTransactionContext;
687                         break;
688                     }
689
690                     operationsBatch = new ArrayList<>(txOperationsOnComplete);
691                     txOperationsOnComplete.clear();
692                 }
693
694                 // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
695                 // A slight down-side is that we need to re-acquire the lock below but this should
696                 // be negligible.
697                 for(TransactionOperation oper: operationsBatch) {
698                     oper.invoke(localTransactionContext);
699                 }
700             }
701         }
702
703         private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
704             LOG.debug("Tx {} Received {}", getIdentifier(), reply);
705
706             return createValidTransactionContext(actorContext.actorSelection(reply.getTransactionPath()),
707                     reply.getTransactionPath(), reply.getVersion());
708         }
709
710         private TransactionContext createValidTransactionContext(ActorSelection transactionActor,
711                 String transactionPath, short remoteTransactionVersion) {
712
713             if (transactionType == TransactionType.READ_ONLY) {
714                 // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
715                 // to close the remote Tx's when this instance is no longer in use and is garbage
716                 // collected.
717
718                 if(remoteTransactionActorsMB == null) {
719                     remoteTransactionActors = Lists.newArrayList();
720                     remoteTransactionActorsMB = new AtomicBoolean();
721
722                     TransactionProxyCleanupPhantomReference.track(TransactionProxy.this);
723                 }
724
725                 // Add the actor to the remoteTransactionActors list for access by the
726                 // cleanup PhantonReference.
727                 remoteTransactionActors.add(transactionActor);
728
729                 // Write to the memory barrier volatile to publish the above update to the
730                 // remoteTransactionActors list for thread visibility.
731                 remoteTransactionActorsMB.set(true);
732             }
733
734             // TxActor is always created where the leader of the shard is.
735             // Check if TxActor is created in the same node
736             boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
737
738             if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
739                 return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
740                         transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
741                         operationCompleter);
742             } else if (transactionType == TransactionType.WRITE_ONLY &&
743                     actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
744                 return new WriteOnlyTransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
745                     actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
746             } else {
747                 return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
748                         actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
749             }
750         }
751     }
752 }