a2a7a12044a74e23bc181f65e450e14548059bec
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.Mapper;
13 import akka.dispatch.OnComplete;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.FinalizablePhantomReference;
16 import com.google.common.base.FinalizableReferenceQueue;
17 import com.google.common.base.Optional;
18 import com.google.common.base.Preconditions;
19 import com.google.common.collect.Lists;
20 import com.google.common.util.concurrent.CheckedFuture;
21 import com.google.common.util.concurrent.SettableFuture;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.Semaphore;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 import java.util.concurrent.atomic.AtomicLong;
33 import javax.annotation.concurrent.GuardedBy;
34 import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
35 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
36 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
37 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
38 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
39 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
40 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
41 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
42 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
43 import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
44 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
45 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
46 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
47 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
48 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
49 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52 import scala.concurrent.Future;
53 import scala.concurrent.Promise;
54 import scala.concurrent.duration.FiniteDuration;
55
56 /**
57  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
58  * <p>
59  * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
60  * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
61  * be created on each of those shards by the TransactionProxy
62  *</p>
63  * <p>
64  * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
65  * shards will be executed.
66  * </p>
67  */
68 public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIdentifier> implements DOMStoreReadWriteTransaction {
69
70     public static enum TransactionType {
71         READ_ONLY,
72         WRITE_ONLY,
73         READ_WRITE;
74
75         // Cache all values
76         private static final TransactionType[] VALUES = values();
77
78         public static TransactionType fromInt(final int type) {
79             try {
80                 return VALUES[type];
81             } catch (IndexOutOfBoundsException e) {
82                 throw new IllegalArgumentException("In TransactionType enum value " + type, e);
83             }
84         }
85     }
86
87     private static enum TransactionState {
88         OPEN,
89         READY,
90         CLOSED,
91     }
92
93     static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
94                                                               new Mapper<Throwable, Throwable>() {
95         @Override
96         public Throwable apply(Throwable failure) {
97             return failure;
98         }
99     };
100
101     private static final AtomicLong counter = new AtomicLong();
102
103     private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
104
105     /**
106      * Time interval in between transaction create retries.
107      */
108     private static final FiniteDuration CREATE_TX_TRY_INTERVAL =
109             FiniteDuration.create(1, TimeUnit.SECONDS);
110
111     /**
112      * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
113      * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
114      * trickery to clean up its internal thread when the bundle is unloaded.
115      */
116     private static final FinalizableReferenceQueue phantomReferenceQueue =
117                                                                   new FinalizableReferenceQueue();
118
119     /**
120      * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
121      * necessary because PhantomReferences need a hard reference so they're not garbage collected.
122      * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
123      * and thus becomes eligible for garbage collection.
124      */
125     private static final Map<TransactionProxyCleanupPhantomReference,
126                              TransactionProxyCleanupPhantomReference> phantomReferenceCache =
127                                                                         new ConcurrentHashMap<>();
128
129     /**
130      * A PhantomReference that closes remote transactions for a TransactionProxy when it's
131      * garbage collected. This is used for read-only transactions as they're not explicitly closed
132      * by clients. So the only way to detect that a transaction is no longer in use and it's safe
133      * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
134      * but TransactionProxy instances should generally be short-lived enough to avoid being moved
135      * to the old generation space and thus should be cleaned up in a timely manner as the GC
136      * runs on the young generation (eden, swap1...) space much more frequently.
137      */
138     private static class TransactionProxyCleanupPhantomReference
139                                            extends FinalizablePhantomReference<TransactionProxy> {
140
141         private final List<ActorSelection> remoteTransactionActors;
142         private final AtomicBoolean remoteTransactionActorsMB;
143         private final ActorContext actorContext;
144         private final TransactionIdentifier identifier;
145
146         protected TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
147             super(referent, phantomReferenceQueue);
148
149             // Note we need to cache the relevant fields from the TransactionProxy as we can't
150             // have a hard reference to the TransactionProxy instance itself.
151
152             remoteTransactionActors = referent.remoteTransactionActors;
153             remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
154             actorContext = referent.actorContext;
155             identifier = referent.getIdentifier();
156         }
157
158         @Override
159         public void finalizeReferent() {
160             LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
161                     remoteTransactionActors.size(), identifier);
162
163             phantomReferenceCache.remove(this);
164
165             // Access the memory barrier volatile to ensure all previous updates to the
166             // remoteTransactionActors list are visible to this thread.
167
168             if(remoteTransactionActorsMB.get()) {
169                 for(ActorSelection actor : remoteTransactionActors) {
170                     LOG.trace("Sending CloseTransaction to {}", actor);
171                     actorContext.sendOperationAsync(actor, CloseTransaction.INSTANCE.toSerializable());
172                 }
173             }
174         }
175     }
176
177     /**
178      * Stores the remote Tx actors for each requested data store path to be used by the
179      * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
180      * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
181      * remoteTransactionActors list so they will be visible to the thread accessing the
182      * PhantomReference.
183      */
184     private List<ActorSelection> remoteTransactionActors;
185     private volatile AtomicBoolean remoteTransactionActorsMB;
186
187     /**
188      * Stores the create transaction results per shard.
189      */
190     private final Map<String, TransactionFutureCallback> txFutureCallbackMap = new HashMap<>();
191
192     private final TransactionType transactionType;
193     private final ActorContext actorContext;
194     private final String transactionChainId;
195     private final SchemaContext schemaContext;
196     private TransactionState state = TransactionState.OPEN;
197
198     private volatile boolean initialized;
199     private Semaphore operationLimiter;
200     private OperationCompleter operationCompleter;
201
202     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
203         this(actorContext, transactionType, "");
204     }
205
206     public TransactionProxy(ActorContext actorContext, TransactionType transactionType, String transactionChainId) {
207         super(createIdentifier(actorContext));
208         this.actorContext = Preconditions.checkNotNull(actorContext,
209             "actorContext should not be null");
210         this.transactionType = Preconditions.checkNotNull(transactionType,
211             "transactionType should not be null");
212         this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
213             "schemaContext should not be null");
214         this.transactionChainId = transactionChainId;
215
216         LOG.debug("Created txn {} of type {} on chain {}", getIdentifier(), transactionType, transactionChainId);
217     }
218
219     private static TransactionIdentifier createIdentifier(ActorContext actorContext) {
220         String memberName = actorContext.getCurrentMemberName();
221         if (memberName == null) {
222             memberName = "UNKNOWN-MEMBER";
223         }
224
225         return new TransactionIdentifier(memberName, counter.getAndIncrement());
226     }
227
228     @VisibleForTesting
229     List<Future<Object>> getRecordedOperationFutures() {
230         List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
231         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
232             TransactionContext transactionContext = txFutureCallback.getTransactionContext();
233             if (transactionContext != null) {
234                 transactionContext.copyRecordedOperationFutures(recordedOperationFutures);
235             }
236         }
237
238         return recordedOperationFutures;
239     }
240
241     @VisibleForTesting
242     boolean hasTransactionContext() {
243         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
244             TransactionContext transactionContext = txFutureCallback.getTransactionContext();
245             if(transactionContext != null) {
246                 return true;
247             }
248         }
249
250         return false;
251     }
252
253     @Override
254     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
255
256         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
257                 "Read operation on write-only transaction is not allowed");
258
259         LOG.debug("Tx {} read {}", getIdentifier(), path);
260
261         throttleOperation();
262
263         final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
264
265         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
266         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
267             @Override
268             public void invoke(TransactionContext transactionContext) {
269                 transactionContext.readData(path, proxyFuture);
270             }
271         });
272
273         return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
274     }
275
276     @Override
277     public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
278
279         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
280                 "Exists operation on write-only transaction is not allowed");
281
282         LOG.debug("Tx {} exists {}", getIdentifier(), path);
283
284         throttleOperation();
285
286         final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
287
288         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
289         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
290             @Override
291             public void invoke(TransactionContext transactionContext) {
292                 transactionContext.dataExists(path, proxyFuture);
293             }
294         });
295
296         return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
297     }
298
299     private void checkModificationState() {
300         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
301                 "Modification operation on read-only transaction is not allowed");
302         Preconditions.checkState(state == TransactionState.OPEN,
303                 "Transaction is sealed - further modifications are not allowed");
304     }
305
306     private void throttleOperation() {
307         throttleOperation(1);
308     }
309
310     private void throttleOperation(int acquirePermits) {
311         if(!initialized) {
312             // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
313             operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
314             operationCompleter = new OperationCompleter(operationLimiter);
315
316             // Make sure we write this last because it's volatile and will also publish the non-volatile writes
317             // above as well so they'll be visible to other threads.
318             initialized = true;
319         }
320
321         try {
322             if(!operationLimiter.tryAcquire(acquirePermits,
323                     actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
324                 LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
325             }
326         } catch (InterruptedException e) {
327             if(LOG.isDebugEnabled()) {
328                 LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e);
329             } else {
330                 LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
331             }
332         }
333     }
334
335     @Override
336     public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
337
338         checkModificationState();
339
340         LOG.debug("Tx {} write {}", getIdentifier(), path);
341
342         throttleOperation();
343
344         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
345         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
346             @Override
347             public void invoke(TransactionContext transactionContext) {
348                 transactionContext.writeData(path, data);
349             }
350         });
351     }
352
353     @Override
354     public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
355
356         checkModificationState();
357
358         LOG.debug("Tx {} merge {}", getIdentifier(), path);
359
360         throttleOperation();
361
362         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
363         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
364             @Override
365             public void invoke(TransactionContext transactionContext) {
366                 transactionContext.mergeData(path, data);
367             }
368         });
369     }
370
371     @Override
372     public void delete(final YangInstanceIdentifier path) {
373
374         checkModificationState();
375
376         LOG.debug("Tx {} delete {}", getIdentifier(), path);
377
378         throttleOperation();
379
380         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
381         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
382             @Override
383             public void invoke(TransactionContext transactionContext) {
384                 transactionContext.deleteData(path);
385             }
386         });
387     }
388
389     private boolean seal(final TransactionState newState) {
390         if (state == TransactionState.OPEN) {
391             state = newState;
392             return true;
393         } else {
394             return false;
395         }
396     }
397
398     @Override
399     public DOMStoreThreePhaseCommitCohort ready() {
400         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
401                 "Read-only transactions cannot be readied");
402
403         final boolean success = seal(TransactionState.READY);
404         Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state);
405
406         LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(),
407                     txFutureCallbackMap.size());
408
409         if (txFutureCallbackMap.isEmpty()) {
410             onTransactionReady(Collections.<Future<ActorSelection>>emptyList());
411             TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
412             return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
413         }
414
415         throttleOperation(txFutureCallbackMap.size());
416
417         List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
418
419         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
420
421             LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(),
422                         txFutureCallback.getShardName(), transactionChainId);
423
424             final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
425             final Future<ActorSelection> future;
426             if (transactionContext != null) {
427                 // avoid the creation of a promise and a TransactionOperation
428                 future = transactionContext.readyTransaction();
429             } else {
430                 final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
431                 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
432                     @Override
433                     public void invoke(TransactionContext transactionContext) {
434                         promise.completeWith(transactionContext.readyTransaction());
435                     }
436                 });
437                 future = promise.future();
438             }
439
440             cohortFutures.add(future);
441         }
442
443         onTransactionReady(cohortFutures);
444
445         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
446             getIdentifier().toString());
447     }
448
449     /**
450      * Method for derived classes to be notified when the transaction has been readied.
451      *
452      * @param cohortFutures the cohort Futures for each shard transaction.
453      */
454     protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
455     }
456
457     @Override
458     public void close() {
459         if (!seal(TransactionState.CLOSED)) {
460             if (state == TransactionState.CLOSED) {
461                 // Idempotent no-op as per AutoCloseable recommendation
462                 return;
463             }
464
465             throw new IllegalStateException(String.format("Transaction %s is ready, it cannot be closed",
466                 getIdentifier()));
467         }
468
469         for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
470             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
471                 @Override
472                 public void invoke(TransactionContext transactionContext) {
473                     transactionContext.closeTransaction();
474                 }
475             });
476         }
477
478         txFutureCallbackMap.clear();
479
480         if(remoteTransactionActorsMB != null) {
481             remoteTransactionActors.clear();
482             remoteTransactionActorsMB.set(true);
483         }
484     }
485
486     private String shardNameFromIdentifier(YangInstanceIdentifier path){
487         return ShardStrategyFactory.getStrategy(path).findShard(path);
488     }
489
490     protected Future<ActorSelection> sendFindPrimaryShardAsync(String shardName) {
491         return actorContext.findPrimaryShardAsync(shardName);
492     }
493
494     private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
495         String shardName = shardNameFromIdentifier(path);
496         TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
497         if(txFutureCallback == null) {
498             Future<ActorSelection> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
499
500             final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(shardName);
501
502             txFutureCallback = newTxFutureCallback;
503             txFutureCallbackMap.put(shardName, txFutureCallback);
504
505             findPrimaryFuture.onComplete(new OnComplete<ActorSelection>() {
506                 @Override
507                 public void onComplete(Throwable failure, ActorSelection primaryShard) {
508                     if(failure != null) {
509                         newTxFutureCallback.createTransactionContext(failure, null);
510                     } else {
511                         newTxFutureCallback.setPrimaryShard(primaryShard);
512                     }
513                 }
514             }, actorContext.getClientDispatcher());
515         }
516
517         return txFutureCallback;
518     }
519
520     public String getTransactionChainId() {
521         return transactionChainId;
522     }
523
524     protected ActorContext getActorContext() {
525         return actorContext;
526     }
527
528     /**
529      * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
530      * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
531      * retry task after a short delay.
532      * <p>
533      * The end result from a completed CreateTransaction message is a TransactionContext that is
534      * used to perform transaction operations. Transaction operations that occur before the
535      * CreateTransaction completes are cache and executed once the CreateTransaction completes,
536      * successfully or not.
537      */
538     private class TransactionFutureCallback extends OnComplete<Object> {
539
540         /**
541          * The list of transaction operations to execute once the CreateTransaction completes.
542          */
543         @GuardedBy("txOperationsOnComplete")
544         private final List<TransactionOperation> txOperationsOnComplete = Lists.newArrayList();
545
546         /**
547          * The TransactionContext resulting from the CreateTransaction reply.
548          */
549         private volatile TransactionContext transactionContext;
550
551         /**
552          * The target primary shard.
553          */
554         private volatile ActorSelection primaryShard;
555
556         private volatile int createTxTries = (int) (actorContext.getDatastoreContext().
557                 getShardLeaderElectionTimeout().duration().toMillis() /
558                 CREATE_TX_TRY_INTERVAL.toMillis());
559
560         private final String shardName;
561
562         TransactionFutureCallback(String shardName) {
563             this.shardName = shardName;
564         }
565
566         String getShardName() {
567             return shardName;
568         }
569
570         TransactionContext getTransactionContext() {
571             return transactionContext;
572         }
573
574
575         /**
576          * Sets the target primary shard and initiates a CreateTransaction try.
577          */
578         void setPrimaryShard(ActorSelection primaryShard) {
579             this.primaryShard = primaryShard;
580
581             if(transactionType == TransactionType.WRITE_ONLY &&
582                     actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
583                 LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
584                     getIdentifier(), primaryShard);
585
586                 // For write-only Tx's we prepare the transaction modifications directly on the shard actor
587                 // to avoid the overhead of creating a separate transaction actor.
588                 // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow.
589                 executeTxOperatonsOnComplete(createValidTransactionContext(this.primaryShard,
590                         this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION));
591             } else {
592                 tryCreateTransaction();
593             }
594         }
595
596         /**
597          * Adds a TransactionOperation to be executed after the CreateTransaction completes.
598          */
599         void addTxOperationOnComplete(TransactionOperation operation) {
600             boolean invokeOperation = true;
601             synchronized(txOperationsOnComplete) {
602                 if(transactionContext == null) {
603                     LOG.debug("Tx {} Adding operation on complete", getIdentifier());
604
605                     invokeOperation = false;
606                     txOperationsOnComplete.add(operation);
607                 }
608             }
609
610             if(invokeOperation) {
611                 operation.invoke(transactionContext);
612             }
613         }
614
615         void enqueueTransactionOperation(final TransactionOperation op) {
616
617             if (transactionContext != null) {
618                 op.invoke(transactionContext);
619             } else {
620                 // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
621                 // callback to be executed after the Tx is created.
622                 addTxOperationOnComplete(op);
623             }
624         }
625
626         /**
627          * Performs a CreateTransaction try async.
628          */
629         private void tryCreateTransaction() {
630             if(LOG.isDebugEnabled()) {
631                 LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(), primaryShard);
632             }
633
634             Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(),
635                     TransactionProxy.this.transactionType.ordinal(),
636                     getTransactionChainId()).toSerializable();
637
638             Future<Object> createTxFuture = actorContext.executeOperationAsync(primaryShard, serializedCreateMessage);
639
640             createTxFuture.onComplete(this, actorContext.getClientDispatcher());
641         }
642
643         @Override
644         public void onComplete(Throwable failure, Object response) {
645             if(failure instanceof NoShardLeaderException) {
646                 // There's no leader for the shard yet - schedule and try again, unless we're out
647                 // of retries. Note: createTxTries is volatile as it may be written by different
648                 // threads however not concurrently, therefore decrementing it non-atomically here
649                 // is ok.
650                 if(--createTxTries > 0) {
651                     LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry",
652                         getIdentifier(), shardName);
653
654                     actorContext.getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL,
655                             new Runnable() {
656                                 @Override
657                                 public void run() {
658                                     tryCreateTransaction();
659                                 }
660                             }, actorContext.getClientDispatcher());
661                     return;
662                 }
663             }
664
665             createTransactionContext(failure, response);
666         }
667
668         private void createTransactionContext(Throwable failure, Object response) {
669             // Mainly checking for state violation here to perform a volatile read of "initialized" to
670             // ensure updates to operationLimter et al are visible to this thread (ie we're doing
671             // "piggy-back" synchronization here).
672             Preconditions.checkState(initialized, "Tx was not propertly initialized.");
673
674             // Create the TransactionContext from the response or failure. Store the new
675             // TransactionContext locally until we've completed invoking the
676             // TransactionOperations. This avoids thread timing issues which could cause
677             // out-of-order TransactionOperations. Eg, on a modification operation, if the
678             // TransactionContext is non-null, then we directly call the TransactionContext.
679             // However, at the same time, the code may be executing the cached
680             // TransactionOperations. So to avoid thus timing, we don't publish the
681             // TransactionContext until after we've executed all cached TransactionOperations.
682             TransactionContext localTransactionContext;
683             if(failure != null) {
684                 LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
685
686                 localTransactionContext = new NoOpTransactionContext(failure, getIdentifier(), operationLimiter);
687             } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
688                 localTransactionContext = createValidTransactionContext(
689                         CreateTransactionReply.fromSerializable(response));
690             } else {
691                 IllegalArgumentException exception = new IllegalArgumentException(String.format(
692                         "Invalid reply type %s for CreateTransaction", response.getClass()));
693
694                 localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), operationLimiter);
695             }
696
697             executeTxOperatonsOnComplete(localTransactionContext);
698         }
699
700         private void executeTxOperatonsOnComplete(TransactionContext localTransactionContext) {
701             while(true) {
702                 // Access to txOperationsOnComplete and transactionContext must be protected and atomic
703                 // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
704                 // issues and ensure no TransactionOperation is missed and that they are processed
705                 // in the order they occurred.
706
707                 // We'll make a local copy of the txOperationsOnComplete list to handle re-entrancy
708                 // in case a TransactionOperation results in another transaction operation being
709                 // queued (eg a put operation from a client read Future callback that is notified
710                 // synchronously).
711                 Collection<TransactionOperation> operationsBatch = null;
712                 synchronized(txOperationsOnComplete) {
713                     if(txOperationsOnComplete.isEmpty()) {
714                         // We're done invoking the TransactionOperations so we can now publish the
715                         // TransactionContext.
716                         transactionContext = localTransactionContext;
717                         break;
718                     }
719
720                     operationsBatch = new ArrayList<>(txOperationsOnComplete);
721                     txOperationsOnComplete.clear();
722                 }
723
724                 // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
725                 // A slight down-side is that we need to re-acquire the lock below but this should
726                 // be negligible.
727                 for(TransactionOperation oper: operationsBatch) {
728                     oper.invoke(localTransactionContext);
729                 }
730             }
731         }
732
733         private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
734             LOG.debug("Tx {} Received {}", getIdentifier(), reply);
735
736             return createValidTransactionContext(actorContext.actorSelection(reply.getTransactionPath()),
737                     reply.getTransactionPath(), reply.getVersion());
738         }
739
740         private TransactionContext createValidTransactionContext(ActorSelection transactionActor,
741                 String transactionPath, short remoteTransactionVersion) {
742
743             if (transactionType == TransactionType.READ_ONLY) {
744                 // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
745                 // to close the remote Tx's when this instance is no longer in use and is garbage
746                 // collected.
747
748                 if(remoteTransactionActorsMB == null) {
749                     remoteTransactionActors = Lists.newArrayList();
750                     remoteTransactionActorsMB = new AtomicBoolean();
751
752                     TransactionProxyCleanupPhantomReference cleanup =
753                             new TransactionProxyCleanupPhantomReference(TransactionProxy.this);
754                     phantomReferenceCache.put(cleanup, cleanup);
755                 }
756
757                 // Add the actor to the remoteTransactionActors list for access by the
758                 // cleanup PhantonReference.
759                 remoteTransactionActors.add(transactionActor);
760
761                 // Write to the memory barrier volatile to publish the above update to the
762                 // remoteTransactionActors list for thread visibility.
763                 remoteTransactionActorsMB.set(true);
764             }
765
766             // TxActor is always created where the leader of the shard is.
767             // Check if TxActor is created in the same node
768             boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
769
770             if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
771                 return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
772                         transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
773                         operationCompleter);
774             } else if (transactionType == TransactionType.WRITE_ONLY &&
775                     actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
776                 return new WriteOnlyTransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
777                     actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
778             } else {
779                 return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
780                         actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
781             }
782         }
783     }
784 }