CDS: introduce AbstractThreePhaseCommitCohort
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.Mapper;
13 import akka.dispatch.OnComplete;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.FinalizablePhantomReference;
16 import com.google.common.base.FinalizableReferenceQueue;
17 import com.google.common.base.Optional;
18 import com.google.common.base.Preconditions;
19 import com.google.common.collect.Lists;
20 import com.google.common.util.concurrent.CheckedFuture;
21 import com.google.common.util.concurrent.SettableFuture;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicLong;
32 import javax.annotation.concurrent.GuardedBy;
33 import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
34 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
35 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
36 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
37 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
38 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
39 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
40 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
41 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
42 import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
43 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
44 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
45 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
46 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
47 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50 import scala.concurrent.Future;
51 import scala.concurrent.Promise;
52 import scala.concurrent.duration.FiniteDuration;
53
54 /**
55  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
56  * <p>
57  * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
58  * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
59  * be created on each of those shards by the TransactionProxy
60  *</p>
61  * <p>
62  * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
63  * shards will be executed.
64  * </p>
65  */
66 public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIdentifier> implements DOMStoreReadWriteTransaction {
67
68     public static enum TransactionType {
69         READ_ONLY,
70         WRITE_ONLY,
71         READ_WRITE;
72
73         // Cache all values
74         private static final TransactionType[] VALUES = values();
75
76         public static TransactionType fromInt(final int type) {
77             try {
78                 return VALUES[type];
79             } catch (IndexOutOfBoundsException e) {
80                 throw new IllegalArgumentException("In TransactionType enum value " + type, e);
81             }
82         }
83     }
84
85     private static enum TransactionState {
86         OPEN,
87         READY,
88         CLOSED,
89     }
90
91     static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
92                                                               new Mapper<Throwable, Throwable>() {
93         @Override
94         public Throwable apply(Throwable failure) {
95             return failure;
96         }
97     };
98
    // Supplies the numeric component of each new TransactionIdentifier (see createIdentifier).
    private static final AtomicLong counter = new AtomicLong();

    private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);

    /**
     * Time interval in between transaction create retries.
     */
    private static final FiniteDuration CREATE_TX_TRY_INTERVAL =
            FiniteDuration.create(1, TimeUnit.SECONDS);

    /**
     * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
     * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
     * trickery to clean up its internal thread when the bundle is unloaded.
     */
    private static final FinalizableReferenceQueue phantomReferenceQueue =
                                                                  new FinalizableReferenceQueue();

    /**
     * This stores the TransactionProxyCleanupPhantomReference instances statically. This is
     * necessary because PhantomReferences need a hard reference so they're not garbage collected.
     * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
     * and thus becomes eligible for garbage collection.
     */
    private static final Map<TransactionProxyCleanupPhantomReference,
                             TransactionProxyCleanupPhantomReference> phantomReferenceCache =
                                                                        new ConcurrentHashMap<>();
126
127     /**
128      * A PhantomReference that closes remote transactions for a TransactionProxy when it's
129      * garbage collected. This is used for read-only transactions as they're not explicitly closed
130      * by clients. So the only way to detect that a transaction is no longer in use and it's safe
131      * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
132      * but TransactionProxy instances should generally be short-lived enough to avoid being moved
133      * to the old generation space and thus should be cleaned up in a timely manner as the GC
134      * runs on the young generation (eden, swap1...) space much more frequently.
135      */
136     private static class TransactionProxyCleanupPhantomReference
137                                            extends FinalizablePhantomReference<TransactionProxy> {
138
139         private final List<ActorSelection> remoteTransactionActors;
140         private final AtomicBoolean remoteTransactionActorsMB;
141         private final ActorContext actorContext;
142         private final TransactionIdentifier identifier;
143
144         protected TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
145             super(referent, phantomReferenceQueue);
146
147             // Note we need to cache the relevant fields from the TransactionProxy as we can't
148             // have a hard reference to the TransactionProxy instance itself.
149
150             remoteTransactionActors = referent.remoteTransactionActors;
151             remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
152             actorContext = referent.actorContext;
153             identifier = referent.getIdentifier();
154         }
155
156         @Override
157         public void finalizeReferent() {
158             LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
159                     remoteTransactionActors.size(), identifier);
160
161             phantomReferenceCache.remove(this);
162
163             // Access the memory barrier volatile to ensure all previous updates to the
164             // remoteTransactionActors list are visible to this thread.
165
166             if(remoteTransactionActorsMB.get()) {
167                 for(ActorSelection actor : remoteTransactionActors) {
168                     LOG.trace("Sending CloseTransaction to {}", actor);
169                     actorContext.sendOperationAsync(actor, CloseTransaction.INSTANCE.toSerializable());
170                 }
171             }
172         }
173     }
174
    /**
     * Stores the remote Tx actors for each requested data store path to be used by the
     * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
     * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
     * remoteTransactionActors list so they will be visible to the thread accessing the
     * PhantomReference.
     */
    private List<ActorSelection> remoteTransactionActors;
    private volatile AtomicBoolean remoteTransactionActorsMB;

    /**
     * Stores the create transaction results per shard, keyed by shard name.
     */
    private final Map<String, TransactionFutureCallback> txFutureCallbackMap = new HashMap<>();

    private final TransactionType transactionType;
    private final ActorContext actorContext;
    // Id of the chain this transaction belongs to; the two-argument constructor passes "".
    private final String transactionChainId;
    // Taken from the ActorContext at construction time; never null.
    private final SchemaContext schemaContext;
    // Starts OPEN; seal() moves it to READY or CLOSED.
    private TransactionState state = TransactionState.OPEN;

    // Set to true by throttleOperation(int) after operationLimiter and operationCompleter have
    // been created. Being volatile, the write also publishes those two non-volatile fields.
    private volatile boolean initialized;
    private Semaphore operationLimiter;
    private OperationCompleter operationCompleter;
199
    /**
     * Creates a proxy using the empty string as its transaction chain id.
     *
     * @param actorContext context used to talk to the shards
     * @param transactionType kind of access this transaction performs
     */
    public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
        this(actorContext, transactionType, "");
    }
203
204     public TransactionProxy(ActorContext actorContext, TransactionType transactionType, String transactionChainId) {
205         super(createIdentifier(actorContext));
206         this.actorContext = Preconditions.checkNotNull(actorContext,
207             "actorContext should not be null");
208         this.transactionType = Preconditions.checkNotNull(transactionType,
209             "transactionType should not be null");
210         this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
211             "schemaContext should not be null");
212         this.transactionChainId = transactionChainId;
213
214         LOG.debug("Created txn {} of type {} on chain {}", getIdentifier(), transactionType, transactionChainId);
215     }
216
217     private static TransactionIdentifier createIdentifier(ActorContext actorContext) {
218         String memberName = actorContext.getCurrentMemberName();
219         if (memberName == null) {
220             memberName = "UNKNOWN-MEMBER";
221         }
222
223         return new TransactionIdentifier(memberName, counter.getAndIncrement());
224     }
225
226     @VisibleForTesting
227     List<Future<Object>> getRecordedOperationFutures() {
228         List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
229         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
230             TransactionContext transactionContext = txFutureCallback.getTransactionContext();
231             if (transactionContext != null) {
232                 transactionContext.copyRecordedOperationFutures(recordedOperationFutures);
233             }
234         }
235
236         return recordedOperationFutures;
237     }
238
239     @VisibleForTesting
240     boolean hasTransactionContext() {
241         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
242             TransactionContext transactionContext = txFutureCallback.getTransactionContext();
243             if(transactionContext != null) {
244                 return true;
245             }
246         }
247
248         return false;
249     }
250
251     @Override
252     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
253
254         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
255                 "Read operation on write-only transaction is not allowed");
256
257         LOG.debug("Tx {} read {}", getIdentifier(), path);
258
259         throttleOperation();
260
261         final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
262
263         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
264         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
265             @Override
266             public void invoke(TransactionContext transactionContext) {
267                 transactionContext.readData(path, proxyFuture);
268             }
269         });
270
271         return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
272     }
273
274     @Override
275     public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
276
277         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
278                 "Exists operation on write-only transaction is not allowed");
279
280         LOG.debug("Tx {} exists {}", getIdentifier(), path);
281
282         throttleOperation();
283
284         final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
285
286         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
287         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
288             @Override
289             public void invoke(TransactionContext transactionContext) {
290                 transactionContext.dataExists(path, proxyFuture);
291             }
292         });
293
294         return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
295     }
296
297     private void checkModificationState() {
298         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
299                 "Modification operation on read-only transaction is not allowed");
300         Preconditions.checkState(state == TransactionState.OPEN,
301                 "Transaction is sealed - further modifications are not allowed");
302     }
303
    /**
     * Throttles a single operation, i.e. requests one permit.
     */
    private void throttleOperation() {
        throttleOperation(1);
    }
307
308     private void throttleOperation(int acquirePermits) {
309         if(!initialized) {
310             // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
311             operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
312             operationCompleter = new OperationCompleter(operationLimiter);
313
314             // Make sure we write this last because it's volatile and will also publish the non-volatile writes
315             // above as well so they'll be visible to other threads.
316             initialized = true;
317         }
318
319         try {
320             if(!operationLimiter.tryAcquire(acquirePermits,
321                     actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
322                 LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
323             }
324         } catch (InterruptedException e) {
325             if(LOG.isDebugEnabled()) {
326                 LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e);
327             } else {
328                 LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
329             }
330         }
331     }
332
333     @Override
334     public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
335
336         checkModificationState();
337
338         LOG.debug("Tx {} write {}", getIdentifier(), path);
339
340         throttleOperation();
341
342         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
343         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
344             @Override
345             public void invoke(TransactionContext transactionContext) {
346                 transactionContext.writeData(path, data);
347             }
348         });
349     }
350
351     @Override
352     public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
353
354         checkModificationState();
355
356         LOG.debug("Tx {} merge {}", getIdentifier(), path);
357
358         throttleOperation();
359
360         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
361         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
362             @Override
363             public void invoke(TransactionContext transactionContext) {
364                 transactionContext.mergeData(path, data);
365             }
366         });
367     }
368
369     @Override
370     public void delete(final YangInstanceIdentifier path) {
371
372         checkModificationState();
373
374         LOG.debug("Tx {} delete {}", getIdentifier(), path);
375
376         throttleOperation();
377
378         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
379         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
380             @Override
381             public void invoke(TransactionContext transactionContext) {
382                 transactionContext.deleteData(path);
383             }
384         });
385     }
386
387     private boolean seal(final TransactionState newState) {
388         if (state == TransactionState.OPEN) {
389             state = newState;
390             return true;
391         } else {
392             return false;
393         }
394     }
395
396     @Override
397     public AbstractThreePhaseCommitCohort ready() {
398         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
399                 "Read-only transactions cannot be readied");
400
401         final boolean success = seal(TransactionState.READY);
402         Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state);
403
404         LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(),
405                     txFutureCallbackMap.size());
406
407         if (txFutureCallbackMap.isEmpty()) {
408             TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
409             return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
410         }
411
412         throttleOperation(txFutureCallbackMap.size());
413
414         List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txFutureCallbackMap.size());
415         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
416
417             LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(),
418                         txFutureCallback.getShardName(), transactionChainId);
419
420             final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
421             final Future<ActorSelection> future;
422             if (transactionContext != null) {
423                 // avoid the creation of a promise and a TransactionOperation
424                 future = transactionContext.readyTransaction();
425             } else {
426                 final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
427                 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
428                     @Override
429                     public void invoke(TransactionContext transactionContext) {
430                         promise.completeWith(transactionContext.readyTransaction());
431                     }
432                 });
433                 future = promise.future();
434             }
435
436             cohortFutures.add(future);
437         }
438
439         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
440             getIdentifier().toString());
441     }
442
443     @Override
444     public void close() {
445         if (!seal(TransactionState.CLOSED)) {
446             if (state == TransactionState.CLOSED) {
447                 // Idempotent no-op as per AutoCloseable recommendation
448                 return;
449             }
450
451             throw new IllegalStateException(String.format("Transaction %s is ready, it cannot be closed",
452                 getIdentifier()));
453         }
454
455         for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
456             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
457                 @Override
458                 public void invoke(TransactionContext transactionContext) {
459                     transactionContext.closeTransaction();
460                 }
461             });
462         }
463
464         txFutureCallbackMap.clear();
465
466         if(remoteTransactionActorsMB != null) {
467             remoteTransactionActors.clear();
468             remoteTransactionActorsMB.set(true);
469         }
470     }
471
    /**
     * Resolves the name of the shard for {@code path} by asking the shard strategy that
     * ShardStrategyFactory selects for that path.
     *
     * @param path the data path
     * @return the shard name reported by the strategy
     */
    private String shardNameFromIdentifier(YangInstanceIdentifier path){
        return ShardStrategyFactory.getStrategy(path).findShard(path);
    }
475
    /**
     * Asynchronously looks up the primary shard for {@code shardName} via the actor context.
     *
     * @param shardName name of the shard
     * @return the future returned by the actor context's lookup
     */
    protected Future<ActorSelection> sendFindPrimaryShardAsync(String shardName) {
        return actorContext.findPrimaryShardAsync(shardName);
    }
479
480     private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
481         String shardName = shardNameFromIdentifier(path);
482         TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
483         if(txFutureCallback == null) {
484             Future<ActorSelection> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
485
486             final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(shardName);
487
488             txFutureCallback = newTxFutureCallback;
489             txFutureCallbackMap.put(shardName, txFutureCallback);
490
491             findPrimaryFuture.onComplete(new OnComplete<ActorSelection>() {
492                 @Override
493                 public void onComplete(Throwable failure, ActorSelection primaryShard) {
494                     if(failure != null) {
495                         newTxFutureCallback.createTransactionContext(failure, null);
496                     } else {
497                         newTxFutureCallback.setPrimaryShard(primaryShard);
498                     }
499                 }
500             }, actorContext.getClientDispatcher());
501         }
502
503         return txFutureCallback;
504     }
505
    /**
     * @return the transaction chain id this proxy was constructed with
     */
    public String getTransactionChainId() {
        return transactionChainId;
    }
509
    /**
     * @return the actor context this proxy was constructed with
     */
    protected ActorContext getActorContext() {
        return actorContext;
    }
513
514     /**
515      * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
516      * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
517      * retry task after a short delay.
518      * <p>
519      * The end result from a completed CreateTransaction message is a TransactionContext that is
520      * used to perform transaction operations. Transaction operations that occur before the
521      * CreateTransaction completes are cached and executed once the CreateTransaction completes,
522      * successfully or not.
523      */
524     private class TransactionFutureCallback extends OnComplete<Object> {
525
        /**
         * The list of transaction operations to execute once the CreateTransaction completes.
         */
        @GuardedBy("txOperationsOnComplete")
        private final List<TransactionOperation> txOperationsOnComplete = Lists.newArrayList();

        /**
         * The TransactionContext resulting from the CreateTransaction reply. Stays null until
         * it is published, which is why callers must be prepared for a null value.
         */
        private volatile TransactionContext transactionContext;

        /**
         * The target primary shard.
         */
        private volatile ActorSelection primaryShard;

        /**
         * Remaining CreateTransaction tries: the shard leader election timeout divided by the
         * retry interval, both in milliseconds. Decremented in onComplete on every
         * NoShardLeaderException.
         */
        private volatile int createTxTries = (int) (actorContext.getDatastoreContext().
                getShardLeaderElectionTimeout().duration().toMillis() /
                CREATE_TX_TRY_INTERVAL.toMillis());

        /**
         * Name of the shard this callback belongs to.
         */
        private final String shardName;
547
        /**
         * @param shardName name of the shard this callback is created for
         */
        TransactionFutureCallback(String shardName) {
            this.shardName = shardName;
        }
551
        /**
         * @return the name of the shard this callback was created for
         */
        String getShardName() {
            return shardName;
        }
555
        /**
         * @return the published TransactionContext, or null if none has been published yet
         */
        TransactionContext getTransactionContext() {
            return transactionContext;
        }
559
560
561         /**
562          * Sets the target primary shard and initiates a CreateTransaction try.
563          */
564         void setPrimaryShard(ActorSelection primaryShard) {
565             this.primaryShard = primaryShard;
566
567             if(transactionType == TransactionType.WRITE_ONLY &&
568                     actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
569                 LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
570                     getIdentifier(), primaryShard);
571
572                 // For write-only Tx's we prepare the transaction modifications directly on the shard actor
573                 // to avoid the overhead of creating a separate transaction actor.
574                 // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow.
575                 executeTxOperatonsOnComplete(createValidTransactionContext(this.primaryShard,
576                         this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION));
577             } else {
578                 tryCreateTransaction();
579             }
580         }
581
582         /**
583          * Adds a TransactionOperation to be executed after the CreateTransaction completes.
584          */
585         void addTxOperationOnComplete(TransactionOperation operation) {
586             boolean invokeOperation = true;
587             synchronized(txOperationsOnComplete) {
588                 if(transactionContext == null) {
589                     LOG.debug("Tx {} Adding operation on complete", getIdentifier());
590
591                     invokeOperation = false;
592                     txOperationsOnComplete.add(operation);
593                 }
594             }
595
596             if(invokeOperation) {
597                 operation.invoke(transactionContext);
598             }
599         }
600
601         void enqueueTransactionOperation(final TransactionOperation op) {
602
603             if (transactionContext != null) {
604                 op.invoke(transactionContext);
605             } else {
606                 // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
607                 // callback to be executed after the Tx is created.
608                 addTxOperationOnComplete(op);
609             }
610         }
611
612         /**
613          * Performs a CreateTransaction try async.
614          */
615         private void tryCreateTransaction() {
616             if(LOG.isDebugEnabled()) {
617                 LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(), primaryShard);
618             }
619
620             Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(),
621                     TransactionProxy.this.transactionType.ordinal(),
622                     getTransactionChainId()).toSerializable();
623
624             Future<Object> createTxFuture = actorContext.executeOperationAsync(primaryShard, serializedCreateMessage);
625
626             createTxFuture.onComplete(this, actorContext.getClientDispatcher());
627         }
628
629         @Override
630         public void onComplete(Throwable failure, Object response) {
631             if(failure instanceof NoShardLeaderException) {
632                 // There's no leader for the shard yet - schedule and try again, unless we're out
633                 // of retries. Note: createTxTries is volatile as it may be written by different
634                 // threads however not concurrently, therefore decrementing it non-atomically here
635                 // is ok.
636                 if(--createTxTries > 0) {
637                     LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry",
638                         getIdentifier(), shardName);
639
640                     actorContext.getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL,
641                             new Runnable() {
642                                 @Override
643                                 public void run() {
644                                     tryCreateTransaction();
645                                 }
646                             }, actorContext.getClientDispatcher());
647                     return;
648                 }
649             }
650
651             createTransactionContext(failure, response);
652         }
653
654         private void createTransactionContext(Throwable failure, Object response) {
655             // Mainly checking for state violation here to perform a volatile read of "initialized" to
656             // ensure updates to operationLimter et al are visible to this thread (ie we're doing
657             // "piggy-back" synchronization here).
658             Preconditions.checkState(initialized, "Tx was not propertly initialized.");
659
660             // Create the TransactionContext from the response or failure. Store the new
661             // TransactionContext locally until we've completed invoking the
662             // TransactionOperations. This avoids thread timing issues which could cause
663             // out-of-order TransactionOperations. Eg, on a modification operation, if the
664             // TransactionContext is non-null, then we directly call the TransactionContext.
665             // However, at the same time, the code may be executing the cached
666             // TransactionOperations. So to avoid thus timing, we don't publish the
667             // TransactionContext until after we've executed all cached TransactionOperations.
668             TransactionContext localTransactionContext;
669             if(failure != null) {
670                 LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
671
672                 localTransactionContext = new NoOpTransactionContext(failure, getIdentifier(), operationLimiter);
673             } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
674                 localTransactionContext = createValidTransactionContext(
675                         CreateTransactionReply.fromSerializable(response));
676             } else {
677                 IllegalArgumentException exception = new IllegalArgumentException(String.format(
678                         "Invalid reply type %s for CreateTransaction", response.getClass()));
679
680                 localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), operationLimiter);
681             }
682
683             executeTxOperatonsOnComplete(localTransactionContext);
684         }
685
686         private void executeTxOperatonsOnComplete(TransactionContext localTransactionContext) {
687             while(true) {
688                 // Access to txOperationsOnComplete and transactionContext must be protected and atomic
689                 // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
690                 // issues and ensure no TransactionOperation is missed and that they are processed
691                 // in the order they occurred.
692
693                 // We'll make a local copy of the txOperationsOnComplete list to handle re-entrancy
694                 // in case a TransactionOperation results in another transaction operation being
695                 // queued (eg a put operation from a client read Future callback that is notified
696                 // synchronously).
697                 Collection<TransactionOperation> operationsBatch = null;
698                 synchronized(txOperationsOnComplete) {
699                     if(txOperationsOnComplete.isEmpty()) {
700                         // We're done invoking the TransactionOperations so we can now publish the
701                         // TransactionContext.
702                         transactionContext = localTransactionContext;
703                         break;
704                     }
705
706                     operationsBatch = new ArrayList<>(txOperationsOnComplete);
707                     txOperationsOnComplete.clear();
708                 }
709
710                 // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
711                 // A slight down-side is that we need to re-acquire the lock below but this should
712                 // be negligible.
713                 for(TransactionOperation oper: operationsBatch) {
714                     oper.invoke(localTransactionContext);
715                 }
716             }
717         }
718
719         private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
720             LOG.debug("Tx {} Received {}", getIdentifier(), reply);
721
722             return createValidTransactionContext(actorContext.actorSelection(reply.getTransactionPath()),
723                     reply.getTransactionPath(), reply.getVersion());
724         }
725
726         private TransactionContext createValidTransactionContext(ActorSelection transactionActor,
727                 String transactionPath, short remoteTransactionVersion) {
728
729             if (transactionType == TransactionType.READ_ONLY) {
730                 // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
731                 // to close the remote Tx's when this instance is no longer in use and is garbage
732                 // collected.
733
734                 if(remoteTransactionActorsMB == null) {
735                     remoteTransactionActors = Lists.newArrayList();
736                     remoteTransactionActorsMB = new AtomicBoolean();
737
738                     TransactionProxyCleanupPhantomReference cleanup =
739                             new TransactionProxyCleanupPhantomReference(TransactionProxy.this);
740                     phantomReferenceCache.put(cleanup, cleanup);
741                 }
742
743                 // Add the actor to the remoteTransactionActors list for access by the
744                 // cleanup PhantonReference.
745                 remoteTransactionActors.add(transactionActor);
746
747                 // Write to the memory barrier volatile to publish the above update to the
748                 // remoteTransactionActors list for thread visibility.
749                 remoteTransactionActorsMB.set(true);
750             }
751
752             // TxActor is always created where the leader of the shard is.
753             // Check if TxActor is created in the same node
754             boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
755
756             if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
757                 return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
758                         transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
759                         operationCompleter);
760             } else if (transactionType == TransactionType.WRITE_ONLY &&
761                     actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
762                 return new WriteOnlyTransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
763                     actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
764             } else {
765                 return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
766                         actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
767             }
768         }
769     }
770 }