/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorSelection;
import akka.dispatch.Mapper;
import akka.dispatch.OnComplete;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.FinalizablePhantomReference;
import com.google.common.base.FinalizableReferenceQueue;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;
/**
 * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard.
 * <p>
 * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
 * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
 * be created on each of those shards by the TransactionProxy.
 * </p>
 * <p>
 * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
 * shards will be executed.
 * </p>
 */
68 public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIdentifier> implements DOMStoreReadWriteTransaction {
/**
 * The kinds of transaction a proxy can represent.
 *
 * NOTE(review): the constant list was missing from the damaged listing and has been
 * restored. READ_ONLY and WRITE_ONLY are referenced elsewhere in this file; READ_WRITE
 * and the declaration order are assumed. The ordinal is sent in the CreateTransaction
 * message, so confirm the order against the upstream source before relying on it.
 */
public enum TransactionType {
    READ_ONLY,
    WRITE_ONLY,
    READ_WRITE;

    // Cache all values so that fromInt() does not clone the values() array on every call.
    private static final TransactionType[] VALUES = values();

    /**
     * Converts an integer (the constant's ordinal) back into the enum constant.
     *
     * @param type the ordinal of the wanted constant
     * @return the constant declared at position {@code type}
     * @throws IllegalArgumentException if {@code type} is not a valid ordinal; the
     *         original {@link IndexOutOfBoundsException} is attached as the cause
     */
    public static TransactionType fromInt(final int type) {
        try {
            return VALUES[type];
        } catch (IndexOutOfBoundsException e) {
            throw new IllegalArgumentException("In TransactionType enum value " + type, e);
        }
    }
}
87 private static enum TransactionState {
93 static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
94 new Mapper<Throwable, Throwable>() {
96 public Throwable apply(Throwable failure) {
101 private static final AtomicLong counter = new AtomicLong();
103 private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
106 * Time interval in between transaction create retries.
108 private static final FiniteDuration CREATE_TX_TRY_INTERVAL =
109 FiniteDuration.create(1, TimeUnit.SECONDS);
112 * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
113 * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
114 * trickery to clean up its internal thread when the bundle is unloaded.
116 private static final FinalizableReferenceQueue phantomReferenceQueue =
117 new FinalizableReferenceQueue();
120 * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
121 * necessary because PhantomReferences need a hard reference so they're not garbage collected.
122 * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
123 * and thus becomes eligible for garbage collection.
125 private static final Map<TransactionProxyCleanupPhantomReference,
126 TransactionProxyCleanupPhantomReference> phantomReferenceCache =
127 new ConcurrentHashMap<>();
130 * A PhantomReference that closes remote transactions for a TransactionProxy when it's
131 * garbage collected. This is used for read-only transactions as they're not explicitly closed
132 * by clients. So the only way to detect that a transaction is no longer in use and it's safe
133 * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
134 * but TransactionProxy instances should generally be short-lived enough to avoid being moved
135 * to the old generation space and thus should be cleaned up in a timely manner as the GC
136 * runs on the young generation (eden, swap1...) space much more frequently.
138 private static class TransactionProxyCleanupPhantomReference
139 extends FinalizablePhantomReference<TransactionProxy> {
141 private final List<ActorSelection> remoteTransactionActors;
142 private final AtomicBoolean remoteTransactionActorsMB;
143 private final ActorContext actorContext;
144 private final TransactionIdentifier identifier;
146 protected TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
147 super(referent, phantomReferenceQueue);
149 // Note we need to cache the relevant fields from the TransactionProxy as we can't
150 // have a hard reference to the TransactionProxy instance itself.
152 remoteTransactionActors = referent.remoteTransactionActors;
153 remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
154 actorContext = referent.actorContext;
155 identifier = referent.getIdentifier();
159 public void finalizeReferent() {
160 LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
161 remoteTransactionActors.size(), identifier);
163 phantomReferenceCache.remove(this);
165 // Access the memory barrier volatile to ensure all previous updates to the
166 // remoteTransactionActors list are visible to this thread.
168 if(remoteTransactionActorsMB.get()) {
169 for(ActorSelection actor : remoteTransactionActors) {
170 LOG.trace("Sending CloseTransaction to {}", actor);
171 actorContext.sendOperationAsync(actor, CloseTransaction.INSTANCE.toSerializable());
178 * Stores the remote Tx actors for each requested data store path to be used by the
179 * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
180 * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
181 * remoteTransactionActors list so they will be visible to the thread accessing the
184 private List<ActorSelection> remoteTransactionActors;
185 private volatile AtomicBoolean remoteTransactionActorsMB;
188 * Stores the create transaction results per shard.
190 private final Map<String, TransactionFutureCallback> txFutureCallbackMap = new HashMap<>();
192 private final TransactionType transactionType;
193 private final ActorContext actorContext;
194 private final String transactionChainId;
195 private final SchemaContext schemaContext;
196 private TransactionState state = TransactionState.OPEN;
198 private volatile boolean initialized;
199 private Semaphore operationLimiter;
200 private OperationCompleter operationCompleter;
/**
 * Creates a proxy with an empty transaction chain id.
 *
 * @param actorContext the context used to reach the shards
 * @param transactionType the kind of transaction to create
 */
public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
    this(actorContext, transactionType, "");
}
/**
 * Creates a proxy that belongs to the given transaction chain.
 *
 * @param actorContext the context used to reach the shards; must not be null
 * @param transactionType the kind of transaction to create; must not be null
 * @param transactionChainId the id of the owning chain, stored as given
 * @throws NullPointerException if a required argument, or the schema context obtained
 *         from {@code actorContext}, is null
 */
public TransactionProxy(ActorContext actorContext, TransactionType transactionType, String transactionChainId) {
    super(createIdentifier(actorContext));

    this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
    this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null");
    this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
            "schemaContext should not be null");
    this.transactionChainId = transactionChainId;

    LOG.debug("Created txn {} of type {} on chain {}", getIdentifier(), transactionType, transactionChainId);
}
219 private static TransactionIdentifier createIdentifier(ActorContext actorContext) {
220 String memberName = actorContext.getCurrentMemberName();
221 if (memberName == null) {
222 memberName = "UNKNOWN-MEMBER";
225 return new TransactionIdentifier(memberName, counter.getAndIncrement());
229 List<Future<Object>> getRecordedOperationFutures() {
230 List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
231 for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
232 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
233 if (transactionContext != null) {
234 transactionContext.copyRecordedOperationFutures(recordedOperationFutures);
238 return recordedOperationFutures;
242 boolean hasTransactionContext() {
243 for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
244 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
245 if(transactionContext != null) {
254 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
256 Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
257 "Read operation on write-only transaction is not allowed");
259 LOG.debug("Tx {} read {}", getIdentifier(), path);
263 final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
265 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
266 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
268 public void invoke(TransactionContext transactionContext) {
269 transactionContext.readData(path, proxyFuture);
273 return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
277 public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
279 Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
280 "Exists operation on write-only transaction is not allowed");
282 LOG.debug("Tx {} exists {}", getIdentifier(), path);
286 final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
288 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
289 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
291 public void invoke(TransactionContext transactionContext) {
292 transactionContext.dataExists(path, proxyFuture);
296 return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
299 private void checkModificationState() {
300 Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
301 "Modification operation on read-only transaction is not allowed");
302 Preconditions.checkState(state == TransactionState.OPEN,
303 "Transaction is sealed - further modifications are not allowed");
306 private void throttleOperation() {
307 throttleOperation(1);
310 private void throttleOperation(int acquirePermits) {
312 // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
313 operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
314 operationCompleter = new OperationCompleter(operationLimiter);
316 // Make sure we write this last because it's volatile and will also publish the non-volatile writes
317 // above as well so they'll be visible to other threads.
322 if(!operationLimiter.tryAcquire(acquirePermits,
323 actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
324 LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
326 } catch (InterruptedException e) {
327 if(LOG.isDebugEnabled()) {
328 LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e);
330 LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
336 public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
338 checkModificationState();
340 LOG.debug("Tx {} write {}", getIdentifier(), path);
344 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
345 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
347 public void invoke(TransactionContext transactionContext) {
348 transactionContext.writeData(path, data);
354 public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
356 checkModificationState();
358 LOG.debug("Tx {} merge {}", getIdentifier(), path);
362 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
363 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
365 public void invoke(TransactionContext transactionContext) {
366 transactionContext.mergeData(path, data);
372 public void delete(final YangInstanceIdentifier path) {
374 checkModificationState();
376 LOG.debug("Tx {} delete {}", getIdentifier(), path);
380 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
381 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
383 public void invoke(TransactionContext transactionContext) {
384 transactionContext.deleteData(path);
389 private boolean seal(final TransactionState newState) {
390 if (state == TransactionState.OPEN) {
399 public DOMStoreThreePhaseCommitCohort ready() {
400 Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
401 "Read-only transactions cannot be readied");
403 final boolean success = seal(TransactionState.READY);
404 Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state);
406 LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(),
407 txFutureCallbackMap.size());
409 if (txFutureCallbackMap.isEmpty()) {
410 onTransactionReady(Collections.<Future<ActorSelection>>emptyList());
411 TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
412 return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
415 throttleOperation(txFutureCallbackMap.size());
417 List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
419 for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
421 LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(),
422 txFutureCallback.getShardName(), transactionChainId);
424 final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
425 final Future<ActorSelection> future;
426 if (transactionContext != null) {
427 // avoid the creation of a promise and a TransactionOperation
428 future = transactionContext.readyTransaction();
430 final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
431 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
433 public void invoke(TransactionContext transactionContext) {
434 promise.completeWith(transactionContext.readyTransaction());
437 future = promise.future();
440 cohortFutures.add(future);
443 onTransactionReady(cohortFutures);
445 return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
446 getIdentifier().toString());
450 * Method for derived classes to be notified when the transaction has been readied.
452 * @param cohortFutures the cohort Futures for each shard transaction.
454 protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
458 public void close() {
459 if (!seal(TransactionState.CLOSED)) {
460 if (state == TransactionState.CLOSED) {
461 // Idempotent no-op as per AutoCloseable recommendation
465 throw new IllegalStateException(String.format("Transaction %s is ready, it cannot be closed",
469 for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
470 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
472 public void invoke(TransactionContext transactionContext) {
473 transactionContext.closeTransaction();
478 txFutureCallbackMap.clear();
480 if(remoteTransactionActorsMB != null) {
481 remoteTransactionActors.clear();
482 remoteTransactionActorsMB.set(true);
486 private String shardNameFromIdentifier(YangInstanceIdentifier path){
487 return ShardStrategyFactory.getStrategy(path).findShard(path);
490 protected Future<ActorSelection> sendFindPrimaryShardAsync(String shardName) {
491 return actorContext.findPrimaryShardAsync(shardName);
494 private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
495 String shardName = shardNameFromIdentifier(path);
496 TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
497 if(txFutureCallback == null) {
498 Future<ActorSelection> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
500 final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(shardName);
502 txFutureCallback = newTxFutureCallback;
503 txFutureCallbackMap.put(shardName, txFutureCallback);
505 findPrimaryFuture.onComplete(new OnComplete<ActorSelection>() {
507 public void onComplete(Throwable failure, ActorSelection primaryShard) {
508 if(failure != null) {
509 newTxFutureCallback.createTransactionContext(failure, null);
511 newTxFutureCallback.setPrimaryShard(primaryShard);
514 }, actorContext.getClientDispatcher());
517 return txFutureCallback;
520 public String getTransactionChainId() {
521 return transactionChainId;
524 protected ActorContext getActorContext() {
/**
 * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
 * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
 * retry task after a short delay.
 * <p>
 * The end result from a completed CreateTransaction message is a TransactionContext that is
 * used to perform transaction operations. Transaction operations that occur before the
 * CreateTransaction completes are cached and executed once the CreateTransaction completes,
 * successfully or not.
 */
538 private class TransactionFutureCallback extends OnComplete<Object> {
541 * The list of transaction operations to execute once the CreateTransaction completes.
543 @GuardedBy("txOperationsOnComplete")
544 private final List<TransactionOperation> txOperationsOnComplete = Lists.newArrayList();
547 * The TransactionContext resulting from the CreateTransaction reply.
549 private volatile TransactionContext transactionContext;
552 * The target primary shard.
554 private volatile ActorSelection primaryShard;
556 private volatile int createTxTries = (int) (actorContext.getDatastoreContext().
557 getShardLeaderElectionTimeout().duration().toMillis() /
558 CREATE_TX_TRY_INTERVAL.toMillis());
560 private final String shardName;
/**
 * @param shardName the name of the shard this callback creates a transaction on
 */
TransactionFutureCallback(String shardName) {
    this.shardName = shardName;
}
566 String getShardName() {
570 TransactionContext getTransactionContext() {
571 return transactionContext;
576 * Sets the target primary shard and initiates a CreateTransaction try.
578 void setPrimaryShard(ActorSelection primaryShard) {
579 this.primaryShard = primaryShard;
581 if(transactionType == TransactionType.WRITE_ONLY &&
582 actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
583 LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
584 getIdentifier(), primaryShard);
586 // For write-only Tx's we prepare the transaction modifications directly on the shard actor
587 // to avoid the overhead of creating a separate transaction actor.
588 // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow.
589 executeTxOperatonsOnComplete(createValidTransactionContext(this.primaryShard,
590 this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION));
592 tryCreateTransaction();
597 * Adds a TransactionOperation to be executed after the CreateTransaction completes.
599 void addTxOperationOnComplete(TransactionOperation operation) {
600 boolean invokeOperation = true;
601 synchronized(txOperationsOnComplete) {
602 if(transactionContext == null) {
603 LOG.debug("Tx {} Adding operation on complete", getIdentifier());
605 invokeOperation = false;
606 txOperationsOnComplete.add(operation);
610 if(invokeOperation) {
611 operation.invoke(transactionContext);
615 void enqueueTransactionOperation(final TransactionOperation op) {
617 if (transactionContext != null) {
618 op.invoke(transactionContext);
620 // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
621 // callback to be executed after the Tx is created.
622 addTxOperationOnComplete(op);
627 * Performs a CreateTransaction try async.
629 private void tryCreateTransaction() {
630 if(LOG.isDebugEnabled()) {
631 LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(), primaryShard);
634 Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(),
635 TransactionProxy.this.transactionType.ordinal(),
636 getTransactionChainId()).toSerializable();
638 Future<Object> createTxFuture = actorContext.executeOperationAsync(primaryShard, serializedCreateMessage);
640 createTxFuture.onComplete(this, actorContext.getClientDispatcher());
644 public void onComplete(Throwable failure, Object response) {
645 if(failure instanceof NoShardLeaderException) {
646 // There's no leader for the shard yet - schedule and try again, unless we're out
647 // of retries. Note: createTxTries is volatile as it may be written by different
648 // threads however not concurrently, therefore decrementing it non-atomically here
650 if(--createTxTries > 0) {
651 LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry",
652 getIdentifier(), shardName);
654 actorContext.getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL,
658 tryCreateTransaction();
660 }, actorContext.getClientDispatcher());
665 createTransactionContext(failure, response);
668 private void createTransactionContext(Throwable failure, Object response) {
669 // Mainly checking for state violation here to perform a volatile read of "initialized" to
670 // ensure updates to operationLimter et al are visible to this thread (ie we're doing
671 // "piggy-back" synchronization here).
672 Preconditions.checkState(initialized, "Tx was not propertly initialized.");
674 // Create the TransactionContext from the response or failure. Store the new
675 // TransactionContext locally until we've completed invoking the
676 // TransactionOperations. This avoids thread timing issues which could cause
677 // out-of-order TransactionOperations. Eg, on a modification operation, if the
678 // TransactionContext is non-null, then we directly call the TransactionContext.
679 // However, at the same time, the code may be executing the cached
680 // TransactionOperations. So to avoid thus timing, we don't publish the
681 // TransactionContext until after we've executed all cached TransactionOperations.
682 TransactionContext localTransactionContext;
683 if(failure != null) {
684 LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
686 localTransactionContext = new NoOpTransactionContext(failure, getIdentifier(), operationLimiter);
687 } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
688 localTransactionContext = createValidTransactionContext(
689 CreateTransactionReply.fromSerializable(response));
691 IllegalArgumentException exception = new IllegalArgumentException(String.format(
692 "Invalid reply type %s for CreateTransaction", response.getClass()));
694 localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), operationLimiter);
697 executeTxOperatonsOnComplete(localTransactionContext);
700 private void executeTxOperatonsOnComplete(TransactionContext localTransactionContext) {
702 // Access to txOperationsOnComplete and transactionContext must be protected and atomic
703 // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
704 // issues and ensure no TransactionOperation is missed and that they are processed
705 // in the order they occurred.
707 // We'll make a local copy of the txOperationsOnComplete list to handle re-entrancy
708 // in case a TransactionOperation results in another transaction operation being
709 // queued (eg a put operation from a client read Future callback that is notified
711 Collection<TransactionOperation> operationsBatch = null;
712 synchronized(txOperationsOnComplete) {
713 if(txOperationsOnComplete.isEmpty()) {
714 // We're done invoking the TransactionOperations so we can now publish the
715 // TransactionContext.
716 transactionContext = localTransactionContext;
720 operationsBatch = new ArrayList<>(txOperationsOnComplete);
721 txOperationsOnComplete.clear();
724 // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
725 // A slight down-side is that we need to re-acquire the lock below but this should
727 for(TransactionOperation oper: operationsBatch) {
728 oper.invoke(localTransactionContext);
733 private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
734 LOG.debug("Tx {} Received {}", getIdentifier(), reply);
736 return createValidTransactionContext(actorContext.actorSelection(reply.getTransactionPath()),
737 reply.getTransactionPath(), reply.getVersion());
740 private TransactionContext createValidTransactionContext(ActorSelection transactionActor,
741 String transactionPath, short remoteTransactionVersion) {
743 if (transactionType == TransactionType.READ_ONLY) {
744 // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
745 // to close the remote Tx's when this instance is no longer in use and is garbage
748 if(remoteTransactionActorsMB == null) {
749 remoteTransactionActors = Lists.newArrayList();
750 remoteTransactionActorsMB = new AtomicBoolean();
752 TransactionProxyCleanupPhantomReference cleanup =
753 new TransactionProxyCleanupPhantomReference(TransactionProxy.this);
754 phantomReferenceCache.put(cleanup, cleanup);
757 // Add the actor to the remoteTransactionActors list for access by the
758 // cleanup PhantonReference.
759 remoteTransactionActors.add(transactionActor);
761 // Write to the memory barrier volatile to publish the above update to the
762 // remoteTransactionActors list for thread visibility.
763 remoteTransactionActorsMB.set(true);
766 // TxActor is always created where the leader of the shard is.
767 // Check if TxActor is created in the same node
768 boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
770 if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
771 return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
772 transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
774 } else if (transactionType == TransactionType.WRITE_ONLY &&
775 actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
776 return new WriteOnlyTransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
777 actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
779 return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
780 actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);