2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorSelection;
12 import akka.dispatch.Mapper;
13 import akka.dispatch.OnComplete;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.Optional;
16 import com.google.common.base.Preconditions;
17 import com.google.common.collect.Lists;
18 import com.google.common.util.concurrent.CheckedFuture;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.SettableFuture;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.HashMap;
25 import java.util.List;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.Semaphore;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 import java.util.concurrent.atomic.AtomicLong;
33 import javax.annotation.concurrent.GuardedBy;
34 import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
35 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
36 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
37 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
38 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
39 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
40 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
41 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
42 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
43 import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
44 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
45 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
46 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
47 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
48 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51 import scala.concurrent.Future;
52 import scala.concurrent.Promise;
53 import scala.concurrent.duration.FiniteDuration;
56 * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
58 * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
59 * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
60 * be created on each of those shards by the TransactionProxy
63 * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
64 * shards will be executed.
67 public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIdentifier> implements DOMStoreReadWriteTransaction {
/**
 * The kinds of transaction a TransactionProxy can represent.
 * <p>
 * The ordinal of a constant is what gets placed in the CreateTransaction message, so the
 * declaration order is part of the contract.
 */
public enum TransactionType {
    // NOTE(review): constant order determines the ordinal sent to the shard - confirm it
    // matches the receiving side before re-ordering or inserting constants.
    READ_ONLY,
    WRITE_ONLY,
    READ_WRITE;

    // Cache all values so that fromInt does not clone the values() array on every call.
    private static final TransactionType[] VALUES = values();

    /**
     * Maps an ordinal back to its TransactionType.
     *
     * @param type the ordinal of the wanted constant
     * @return the constant whose ordinal is {@code type}
     * @throws IllegalArgumentException if {@code type} is negative or not a valid ordinal
     */
    public static TransactionType fromInt(final int type) {
        // Validate the range explicitly rather than relying on a caught
        // IndexOutOfBoundsException for control flow.
        if (type < 0 || type >= VALUES.length) {
            throw new IllegalArgumentException("In TransactionType enum value " + type);
        }

        return VALUES[type];
    }
}
86 private static enum TransactionState {
92 static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
93 new Mapper<Throwable, Throwable>() {
95 public Throwable apply(Throwable failure) {
100 private static final AtomicLong counter = new AtomicLong();
102 private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
105 * Time interval in between transaction create retries.
107 private static final FiniteDuration CREATE_TX_TRY_INTERVAL =
108 FiniteDuration.create(1, TimeUnit.SECONDS);
111 * Stores the remote Tx actors for each requested data store path to be used by the
112 * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
113 * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
114 * remoteTransactionActors list so they will be visible to the thread accessing the
117 List<ActorSelection> remoteTransactionActors;
118 volatile AtomicBoolean remoteTransactionActorsMB;
121 * Stores the create transaction results per shard.
123 private final Map<String, TransactionFutureCallback> txFutureCallbackMap = new HashMap<>();
125 private final TransactionType transactionType;
126 final ActorContext actorContext;
127 private final String transactionChainId;
128 private final SchemaContext schemaContext;
129 private TransactionState state = TransactionState.OPEN;
131 private volatile boolean initialized;
132 private Semaphore operationLimiter;
133 private OperationCompleter operationCompleter;
/**
 * Creates a proxy that uses the empty string as its transaction chain id.
 *
 * @param actorContext the context used to reach the shards
 * @param transactionType the kind of transaction to create
 */
public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
    this(actorContext, transactionType, "");
}
/**
 * Creates a proxy with a freshly generated identifier.
 *
 * @param actorContext the context used to reach the shards; must not be null and must
 *        supply a non-null schema context
 * @param transactionType the kind of transaction to create; must not be null
 * @param transactionChainId the id of the chain this transaction belongs to
 */
public TransactionProxy(ActorContext actorContext, TransactionType transactionType, String transactionChainId) {
    super(createIdentifier(actorContext));

    this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
    this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null");
    this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
            "schemaContext should not be null");
    this.transactionChainId = transactionChainId;

    LOG.debug("Created txn {} of type {} on chain {}", getIdentifier(), transactionType, transactionChainId);
}
152 private static TransactionIdentifier createIdentifier(ActorContext actorContext) {
153 String memberName = actorContext.getCurrentMemberName();
154 if (memberName == null) {
155 memberName = "UNKNOWN-MEMBER";
158 return new TransactionIdentifier(memberName, counter.getAndIncrement());
162 List<Future<Object>> getRecordedOperationFutures() {
163 List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
164 for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
165 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
166 if (transactionContext != null) {
167 transactionContext.copyRecordedOperationFutures(recordedOperationFutures);
171 return recordedOperationFutures;
175 boolean hasTransactionContext() {
176 for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
177 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
178 if(transactionContext != null) {
186 private boolean isRootPath(YangInstanceIdentifier path){
187 return !path.getPathArguments().iterator().hasNext();
191 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
193 Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
194 "Read operation on write-only transaction is not allowed");
196 LOG.debug("Tx {} read {}", getIdentifier(), path);
198 final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
200 if(isRootPath(path)){
201 readAllData(path, proxyFuture);
205 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
206 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
208 public void invoke(TransactionContext transactionContext) {
209 transactionContext.readData(path, proxyFuture);
215 return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
218 private void readAllData(final YangInstanceIdentifier path,
219 final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
220 Set<String> allShardNames = actorContext.getConfiguration().getAllShardNames();
221 List<SettableFuture<Optional<NormalizedNode<?, ?>>>> futures = new ArrayList<>(allShardNames.size());
223 for(String shardName : allShardNames){
224 final SettableFuture<Optional<NormalizedNode<?, ?>>> subProxyFuture = SettableFuture.create();
228 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(shardName);
229 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
231 public void invoke(TransactionContext transactionContext) {
232 transactionContext.readData(path, subProxyFuture);
236 futures.add(subProxyFuture);
239 final ListenableFuture<List<Optional<NormalizedNode<?, ?>>>> future = Futures.allAsList(futures);
241 future.addListener(new Runnable() {
245 proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(),
246 future.get(), actorContext.getSchemaContext()));
247 } catch (InterruptedException | ExecutionException e) {
248 proxyFuture.setException(e);
251 }, actorContext.getActorSystem().dispatcher());
255 public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
257 Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
258 "Exists operation on write-only transaction is not allowed");
260 LOG.debug("Tx {} exists {}", getIdentifier(), path);
264 final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
266 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
267 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
269 public void invoke(TransactionContext transactionContext) {
270 transactionContext.dataExists(path, proxyFuture);
274 return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
277 private void checkModificationState() {
278 Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
279 "Modification operation on read-only transaction is not allowed");
280 Preconditions.checkState(state == TransactionState.OPEN,
281 "Transaction is sealed - further modifications are not allowed");
284 private void throttleOperation() {
285 throttleOperation(1);
288 private void throttleOperation(int acquirePermits) {
290 // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
291 operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
292 operationCompleter = new OperationCompleter(operationLimiter);
294 // Make sure we write this last because it's volatile and will also publish the non-volatile writes
295 // above as well so they'll be visible to other threads.
300 if(!operationLimiter.tryAcquire(acquirePermits,
301 actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
302 LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
304 } catch (InterruptedException e) {
305 if(LOG.isDebugEnabled()) {
306 LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e);
308 LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
314 public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
316 checkModificationState();
318 LOG.debug("Tx {} write {}", getIdentifier(), path);
322 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
323 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
325 public void invoke(TransactionContext transactionContext) {
326 transactionContext.writeData(path, data);
332 public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
334 checkModificationState();
336 LOG.debug("Tx {} merge {}", getIdentifier(), path);
340 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
341 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
343 public void invoke(TransactionContext transactionContext) {
344 transactionContext.mergeData(path, data);
350 public void delete(final YangInstanceIdentifier path) {
352 checkModificationState();
354 LOG.debug("Tx {} delete {}", getIdentifier(), path);
358 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
359 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
361 public void invoke(TransactionContext transactionContext) {
362 transactionContext.deleteData(path);
367 private boolean seal(final TransactionState newState) {
368 if (state == TransactionState.OPEN) {
377 public AbstractThreePhaseCommitCohort ready() {
378 Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
379 "Read-only transactions cannot be readied");
381 final boolean success = seal(TransactionState.READY);
382 Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state);
384 LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(),
385 txFutureCallbackMap.size());
387 if (txFutureCallbackMap.isEmpty()) {
388 TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
389 return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
392 throttleOperation(txFutureCallbackMap.size());
394 List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txFutureCallbackMap.size());
395 for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
397 LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(),
398 txFutureCallback.getShardName(), transactionChainId);
400 final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
401 final Future<ActorSelection> future;
402 if (transactionContext != null) {
403 // avoid the creation of a promise and a TransactionOperation
404 future = transactionContext.readyTransaction();
406 final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
407 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
409 public void invoke(TransactionContext transactionContext) {
410 promise.completeWith(transactionContext.readyTransaction());
413 future = promise.future();
416 cohortFutures.add(future);
419 return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
420 getIdentifier().toString());
424 public void close() {
425 if (!seal(TransactionState.CLOSED)) {
426 if (state == TransactionState.CLOSED) {
427 // Idempotent no-op as per AutoCloseable recommendation
431 throw new IllegalStateException(String.format("Transaction %s is ready, it cannot be closed",
435 for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
436 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
438 public void invoke(TransactionContext transactionContext) {
439 transactionContext.closeTransaction();
444 txFutureCallbackMap.clear();
446 if(remoteTransactionActorsMB != null) {
447 remoteTransactionActors.clear();
448 remoteTransactionActorsMB.set(true);
452 private String shardNameFromIdentifier(YangInstanceIdentifier path){
453 return ShardStrategyFactory.getStrategy(path).findShard(path);
456 protected Future<ActorSelection> sendFindPrimaryShardAsync(String shardName) {
457 return actorContext.findPrimaryShardAsync(shardName);
460 private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
461 String shardName = shardNameFromIdentifier(path);
462 return getOrCreateTxFutureCallback(shardName);
465 private TransactionFutureCallback getOrCreateTxFutureCallback(String shardName) {
466 TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
467 if(txFutureCallback == null) {
468 Future<ActorSelection> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
470 final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(shardName);
472 txFutureCallback = newTxFutureCallback;
473 txFutureCallbackMap.put(shardName, txFutureCallback);
475 findPrimaryFuture.onComplete(new OnComplete<ActorSelection>() {
477 public void onComplete(Throwable failure, ActorSelection primaryShard) {
478 if(failure != null) {
479 newTxFutureCallback.createTransactionContext(failure, null);
481 newTxFutureCallback.setPrimaryShard(primaryShard);
484 }, actorContext.getClientDispatcher());
487 return txFutureCallback;
490 public String getTransactionChainId() {
491 return transactionChainId;
494 protected ActorContext getActorContext() {
499 * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
500 * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
501 * retry task after a short delay.
503 * The end result from a completed CreateTransaction message is a TransactionContext that is
504 * used to perform transaction operations. Transaction operations that occur before the
 * CreateTransaction completes are cached and executed once the CreateTransaction completes,
506 * successfully or not.
508 private class TransactionFutureCallback extends OnComplete<Object> {
511 * The list of transaction operations to execute once the CreateTransaction completes.
513 @GuardedBy("txOperationsOnComplete")
514 private final List<TransactionOperation> txOperationsOnComplete = Lists.newArrayList();
517 * The TransactionContext resulting from the CreateTransaction reply.
519 private volatile TransactionContext transactionContext;
522 * The target primary shard.
524 private volatile ActorSelection primaryShard;
526 private volatile int createTxTries = (int) (actorContext.getDatastoreContext().
527 getShardLeaderElectionTimeout().duration().toMillis() /
528 CREATE_TX_TRY_INTERVAL.toMillis());
530 private final String shardName;
/**
 * @param shardName the name of the shard this callback creates a transaction on
 */
TransactionFutureCallback(String shardName) {
    super();
    this.shardName = shardName;
}
536 String getShardName() {
540 TransactionContext getTransactionContext() {
541 return transactionContext;
546 * Sets the target primary shard and initiates a CreateTransaction try.
548 void setPrimaryShard(ActorSelection primaryShard) {
549 this.primaryShard = primaryShard;
551 if(transactionType == TransactionType.WRITE_ONLY &&
552 actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
553 LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
554 getIdentifier(), primaryShard);
556 // For write-only Tx's we prepare the transaction modifications directly on the shard actor
557 // to avoid the overhead of creating a separate transaction actor.
558 // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow.
559 executeTxOperatonsOnComplete(createValidTransactionContext(this.primaryShard,
560 this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION));
562 tryCreateTransaction();
567 * Adds a TransactionOperation to be executed after the CreateTransaction completes.
569 void addTxOperationOnComplete(TransactionOperation operation) {
570 boolean invokeOperation = true;
571 synchronized(txOperationsOnComplete) {
572 if(transactionContext == null) {
573 LOG.debug("Tx {} Adding operation on complete", getIdentifier());
575 invokeOperation = false;
576 txOperationsOnComplete.add(operation);
580 if(invokeOperation) {
581 operation.invoke(transactionContext);
585 void enqueueTransactionOperation(final TransactionOperation op) {
587 if (transactionContext != null) {
588 op.invoke(transactionContext);
590 // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
591 // callback to be executed after the Tx is created.
592 addTxOperationOnComplete(op);
597 * Performs a CreateTransaction try async.
599 private void tryCreateTransaction() {
600 if(LOG.isDebugEnabled()) {
601 LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(), primaryShard);
604 Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(),
605 TransactionProxy.this.transactionType.ordinal(),
606 getTransactionChainId()).toSerializable();
608 Future<Object> createTxFuture = actorContext.executeOperationAsync(primaryShard, serializedCreateMessage);
610 createTxFuture.onComplete(this, actorContext.getClientDispatcher());
614 public void onComplete(Throwable failure, Object response) {
615 if(failure instanceof NoShardLeaderException) {
616 // There's no leader for the shard yet - schedule and try again, unless we're out
617 // of retries. Note: createTxTries is volatile as it may be written by different
618 // threads however not concurrently, therefore decrementing it non-atomically here
620 if(--createTxTries > 0) {
621 LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry",
622 getIdentifier(), shardName);
624 actorContext.getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL,
628 tryCreateTransaction();
630 }, actorContext.getClientDispatcher());
635 createTransactionContext(failure, response);
638 private void createTransactionContext(Throwable failure, Object response) {
639 // Mainly checking for state violation here to perform a volatile read of "initialized" to
640 // ensure updates to operationLimter et al are visible to this thread (ie we're doing
641 // "piggy-back" synchronization here).
642 Preconditions.checkState(initialized, "Tx was not propertly initialized.");
644 // Create the TransactionContext from the response or failure. Store the new
645 // TransactionContext locally until we've completed invoking the
646 // TransactionOperations. This avoids thread timing issues which could cause
647 // out-of-order TransactionOperations. Eg, on a modification operation, if the
648 // TransactionContext is non-null, then we directly call the TransactionContext.
649 // However, at the same time, the code may be executing the cached
650 // TransactionOperations. So to avoid thus timing, we don't publish the
651 // TransactionContext until after we've executed all cached TransactionOperations.
652 TransactionContext localTransactionContext;
653 if(failure != null) {
654 LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
656 localTransactionContext = new NoOpTransactionContext(failure, getIdentifier(), operationLimiter);
657 } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
658 localTransactionContext = createValidTransactionContext(
659 CreateTransactionReply.fromSerializable(response));
661 IllegalArgumentException exception = new IllegalArgumentException(String.format(
662 "Invalid reply type %s for CreateTransaction", response.getClass()));
664 localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), operationLimiter);
667 executeTxOperatonsOnComplete(localTransactionContext);
670 private void executeTxOperatonsOnComplete(TransactionContext localTransactionContext) {
672 // Access to txOperationsOnComplete and transactionContext must be protected and atomic
673 // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
674 // issues and ensure no TransactionOperation is missed and that they are processed
675 // in the order they occurred.
677 // We'll make a local copy of the txOperationsOnComplete list to handle re-entrancy
678 // in case a TransactionOperation results in another transaction operation being
679 // queued (eg a put operation from a client read Future callback that is notified
681 Collection<TransactionOperation> operationsBatch = null;
682 synchronized(txOperationsOnComplete) {
683 if(txOperationsOnComplete.isEmpty()) {
684 // We're done invoking the TransactionOperations so we can now publish the
685 // TransactionContext.
686 transactionContext = localTransactionContext;
690 operationsBatch = new ArrayList<>(txOperationsOnComplete);
691 txOperationsOnComplete.clear();
694 // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
695 // A slight down-side is that we need to re-acquire the lock below but this should
697 for(TransactionOperation oper: operationsBatch) {
698 oper.invoke(localTransactionContext);
703 private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
704 LOG.debug("Tx {} Received {}", getIdentifier(), reply);
706 return createValidTransactionContext(actorContext.actorSelection(reply.getTransactionPath()),
707 reply.getTransactionPath(), reply.getVersion());
710 private TransactionContext createValidTransactionContext(ActorSelection transactionActor,
711 String transactionPath, short remoteTransactionVersion) {
713 if (transactionType == TransactionType.READ_ONLY) {
714 // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
715 // to close the remote Tx's when this instance is no longer in use and is garbage
718 if(remoteTransactionActorsMB == null) {
719 remoteTransactionActors = Lists.newArrayList();
720 remoteTransactionActorsMB = new AtomicBoolean();
722 TransactionProxyCleanupPhantomReference.track(TransactionProxy.this);
725 // Add the actor to the remoteTransactionActors list for access by the
726 // cleanup PhantonReference.
727 remoteTransactionActors.add(transactionActor);
729 // Write to the memory barrier volatile to publish the above update to the
730 // remoteTransactionActors list for thread visibility.
731 remoteTransactionActorsMB.set(true);
734 // TxActor is always created where the leader of the shard is.
735 // Check if TxActor is created in the same node
736 boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
738 if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
739 return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
740 transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
742 } else if (transactionType == TransactionType.WRITE_ONLY &&
743 actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
744 return new WriteOnlyTransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
745 actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
747 return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
748 actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);