X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxy.java;h=98e115efc6270342e4f54ce99f5dcf0e95d99bc8;hp=f12fdd99eab792080069f38bebe1a721a71f1b32;hb=5b69c8e66b12a29a36457955cac4a45affd7c73f;hpb=228af4aa1ef1a802fd24e7e010f3bba959ee03dd diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index f12fdd99ea..98e115efc6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -5,563 +5,312 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.datastore; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorSelection; -import akka.dispatch.Mapper; -import akka.dispatch.OnComplete; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; -import java.util.HashMap; +import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl; -import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; -import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.TransactionModificationOperation.DeleteOperation; +import org.opendaylight.controller.cluster.datastore.TransactionModificationOperation.MergeOperation; +import org.opendaylight.controller.cluster.datastore.TransactionModificationOperation.WriteOperation; +import org.opendaylight.controller.cluster.datastore.messages.AbstractRead; +import org.opendaylight.controller.cluster.datastore.messages.DataExists; +import org.opendaylight.controller.cluster.datastore.messages.ReadData; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; -import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; +import org.opendaylight.mdsal.dom.spi.store.AbstractDOMStoreTransaction; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; import scala.concurrent.Promise; /** - * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard - *

- * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during - * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will - * be created on each of those shards by the TransactionProxy - *

- *

- * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various - * shards will be executed. - *

+ * A transaction potentially spanning multiple backend shards. */ -public class TransactionProxy extends AbstractDOMStoreTransaction implements DOMStoreReadWriteTransaction { - - public static enum TransactionType { - READ_ONLY, - WRITE_ONLY, - READ_WRITE; - - // Cache all values - private static final TransactionType[] VALUES = values(); - - public static TransactionType fromInt(final int type) { - try { - return VALUES[type]; - } catch (IndexOutOfBoundsException e) { - throw new IllegalArgumentException("In TransactionType enum value " + type, e); - } - } - } - - private static enum TransactionState { +public class TransactionProxy extends AbstractDOMStoreTransaction + implements DOMStoreReadWriteTransaction { + private enum TransactionState { OPEN, READY, CLOSED, } - static final Mapper SAME_FAILURE_TRANSFORMER = - new Mapper() { - @Override - public Throwable apply(Throwable failure) { - return failure; - } - }; - - private static final AtomicLong counter = new AtomicLong(); - private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class); - /** - * Stores the remote Tx actors for each requested data store path to be used by the - * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The - * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the - * remoteTransactionActors list so they will be visible to the thread accessing the - * PhantomReference. - */ - List remoteTransactionActors; - volatile AtomicBoolean remoteTransactionActorsMB; - - /** - * Stores the create transaction results per shard. - */ - private final Map txFutureCallbackMap = new HashMap<>(); - - private final TransactionType transactionType; - final ActorContext actorContext; - private final String transactionChainId; - private final SchemaContext schemaContext; + private final Map txContextWrappers = new TreeMap<>(); + private final AbstractTransactionContextFactory txContextFactory; + private final TransactionType type; private TransactionState state = TransactionState.OPEN; - private volatile boolean initialized; - private Semaphore operationLimiter; - private OperationCompleter operationCompleter; - - public TransactionProxy(ActorContext actorContext, TransactionType transactionType) { - this(actorContext, transactionType, ""); - } - - public TransactionProxy(ActorContext actorContext, TransactionType transactionType, String transactionChainId) { - super(createIdentifier(actorContext)); - this.actorContext = Preconditions.checkNotNull(actorContext, - "actorContext should not be null"); - this.transactionType = Preconditions.checkNotNull(transactionType, - "transactionType should not be null"); - this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(), - "schemaContext should not be null"); - this.transactionChainId = transactionChainId; - - LOG.debug("Created txn {} of type {} on chain {}", getIdentifier(), transactionType, transactionChainId); - } - - private static TransactionIdentifier createIdentifier(ActorContext actorContext) { - String memberName = actorContext.getCurrentMemberName(); - if (memberName == null) { - memberName = "UNKNOWN-MEMBER"; - } - - return new TransactionIdentifier(memberName, counter.getAndIncrement()); - } - @VisibleForTesting - boolean hasTransactionContext() { - for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { - TransactionContext transactionContext = txFutureCallback.getTransactionContext(); - if(transactionContext != null) { - return true; - } - } + public TransactionProxy(final AbstractTransactionContextFactory txContextFactory, final TransactionType type) { + super(txContextFactory.nextIdentifier(), txContextFactory.getActorUtils().getDatastoreContext() + .isTransactionDebugContextEnabled()); + this.txContextFactory = txContextFactory; + this.type = requireNonNull(type); - return false; - } - - private boolean isRootPath(YangInstanceIdentifier path){ - return !path.getPathArguments().iterator().hasNext(); + LOG.debug("New {} Tx - {}", type, getIdentifier()); } @Override - public CheckedFuture>, ReadFailedException> read(final YangInstanceIdentifier path) { - - Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY, - "Read operation on write-only transaction is not allowed"); - - LOG.debug("Tx {} read {}", getIdentifier(), path); - - final SettableFuture>> proxyFuture = SettableFuture.create(); - - if(isRootPath(path)){ - readAllData(path, proxyFuture); - } else { - throttleOperation(); - - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.readData(path, proxyFuture); - } - }); - - } - - return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); - } - - private void readAllData(final YangInstanceIdentifier path, - final SettableFuture>> proxyFuture) { - Set allShardNames = actorContext.getConfiguration().getAllShardNames(); - List>>> futures = new ArrayList<>(allShardNames.size()); - - for(String shardName : allShardNames){ - final SettableFuture>> subProxyFuture = SettableFuture.create(); - - throttleOperation(); - - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(shardName); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.readData(path, subProxyFuture); - } - }); - - futures.add(subProxyFuture); - } - - final ListenableFuture>>> future = Futures.allAsList(futures); - - future.addListener(new Runnable() { - @Override - public void run() { - try { - proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(), - future.get(), actorContext.getSchemaContext())); - } catch (DataValidationFailedException | InterruptedException | ExecutionException e) { - proxyFuture.setException(e); - } - } - }, actorContext.getActorSystem().dispatcher()); + public FluentFuture exists(final YangInstanceIdentifier path) { + return executeRead(shardNameFromIdentifier(path), new DataExists(path, DataStoreVersions.CURRENT_VERSION)); } - @Override - public CheckedFuture exists(final YangInstanceIdentifier path) { - - Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY, - "Exists operation on write-only transaction is not allowed"); - - LOG.debug("Tx {} exists {}", getIdentifier(), path); - - throttleOperation(); + private FluentFuture executeRead(final String shardName, final AbstractRead readCmd) { + checkState(type != TransactionType.WRITE_ONLY, "Reads from write-only transactions are not allowed"); - final SettableFuture proxyFuture = SettableFuture.create(); + LOG.trace("Tx {} {} {}", getIdentifier(), readCmd.getClass().getSimpleName(), readCmd.getPath()); - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + final SettableFuture proxyFuture = SettableFuture.create(); + TransactionContextWrapper contextWrapper = getContextWrapper(shardName); + contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() { @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.dataExists(path, proxyFuture); + public void invoke(final TransactionContext transactionContext, final Boolean havePermit) { + transactionContext.executeRead(readCmd, proxyFuture, havePermit); } }); - return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); + return FluentFuture.from(proxyFuture); } - private void checkModificationState() { - Preconditions.checkState(transactionType != TransactionType.READ_ONLY, - "Modification operation on read-only transaction is not allowed"); - Preconditions.checkState(state == TransactionState.OPEN, - "Transaction is sealed - further modifications are not allowed"); + @Override + public FluentFuture>> read(final YangInstanceIdentifier path) { + checkState(type != TransactionType.WRITE_ONLY, "Reads from write-only transactions are not allowed"); + requireNonNull(path, "path should not be null"); + + LOG.trace("Tx {} read {}", getIdentifier(), path); + return path.isEmpty() ? readAllData() : singleShardRead(shardNameFromIdentifier(path), path); } - private void throttleOperation() { - throttleOperation(1); + private FluentFuture>> singleShardRead( + final String shardName, final YangInstanceIdentifier path) { + return executeRead(shardName, new ReadData(path, DataStoreVersions.CURRENT_VERSION)); } - private void throttleOperation(int acquirePermits) { - if(!initialized) { - // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem - operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit()); - operationCompleter = new OperationCompleter(operationLimiter); + private FluentFuture>> readAllData() { + final Set allShardNames = txContextFactory.getActorUtils().getConfiguration().getAllShardNames(); + final Collection>>> futures = new ArrayList<>(allShardNames.size()); - // Make sure we write this last because it's volatile and will also publish the non-volatile writes - // above as well so they'll be visible to other threads. - initialized = true; + for (String shardName : allShardNames) { + futures.add(singleShardRead(shardName, YangInstanceIdentifier.empty())); } - try { - if(!operationLimiter.tryAcquire(acquirePermits, - actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){ - LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier()); - } - } catch (InterruptedException e) { - if(LOG.isDebugEnabled()) { - LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e); - } else { - LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier()); + final ListenableFuture>>> listFuture = Futures.allAsList(futures); + final ListenableFuture>> aggregateFuture; + + aggregateFuture = Futures.transform(listFuture, input -> { + try { + return NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.empty(), input, + txContextFactory.getActorUtils().getSchemaContext(), + txContextFactory.getActorUtils().getDatastoreContext().getLogicalStoreType()); + } catch (DataValidationFailedException e) { + throw new IllegalArgumentException("Failed to aggregate", e); } - } - } + }, MoreExecutors.directExecutor()); - final void ensureInitializied() { - Preconditions.checkState(initialized, "Transaction %s was not propertly initialized.", getIdentifier()); + return FluentFuture.from(aggregateFuture); } @Override - public void write(final YangInstanceIdentifier path, final NormalizedNode data) { - - checkModificationState(); - - LOG.debug("Tx {} write {}", getIdentifier(), path); - - throttleOperation(); + public void delete(final YangInstanceIdentifier path) { + checkModificationState("delete", path); - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.writeData(path, data); - } - }); + executeModification(new DeleteOperation(path)); } @Override public void merge(final YangInstanceIdentifier path, final NormalizedNode data) { + checkModificationState("merge", path); - checkModificationState(); - - LOG.debug("Tx {} merge {}", getIdentifier(), path); - - throttleOperation(); - - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.mergeData(path, data); - } - }); + executeModification(new MergeOperation(path, data)); } @Override - public void delete(final YangInstanceIdentifier path) { - - checkModificationState(); + public void write(final YangInstanceIdentifier path, final NormalizedNode data) { + checkModificationState("write", path); - LOG.debug("Tx {} delete {}", getIdentifier(), path); + executeModification(new WriteOperation(path, data)); + } - throttleOperation(); + private void executeModification(final TransactionModificationOperation operation) { + getContextWrapper(operation.path()).maybeExecuteTransactionOperation(operation); + } - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.deleteData(path); - } - }); + private void checkModificationState(final String opName, final YangInstanceIdentifier path) { + checkState(type != TransactionType.READ_ONLY, "Modification operation on read-only transaction is not allowed"); + checkState(state == TransactionState.OPEN, "Transaction is sealed - further modifications are not allowed"); + LOG.trace("Tx {} {} {}", getIdentifier(), opName, path); } private boolean seal(final TransactionState newState) { if (state == TransactionState.OPEN) { state = newState; return true; - } else { - return false; } + return false; } @Override - public AbstractThreePhaseCommitCohort ready() { - Preconditions.checkState(transactionType != TransactionType.READ_ONLY, - "Read-only transactions cannot be readied"); + public final void close() { + if (!seal(TransactionState.CLOSED)) { + checkState(state == TransactionState.CLOSED, "Transaction %s is ready, it cannot be closed", + getIdentifier()); + // Idempotent no-op as per AutoCloseable recommendation + return; + } - final boolean success = seal(TransactionState.READY); - Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state); + for (TransactionContextWrapper contextWrapper : txContextWrappers.values()) { + contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() { + @Override + public void invoke(final TransactionContext transactionContext, final Boolean havePermit) { + transactionContext.closeTransaction(); + } + }); + } - LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(), - txFutureCallbackMap.size()); - if (txFutureCallbackMap.isEmpty()) { - TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext); - return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE; + txContextWrappers.clear(); + } + + @Override + public final AbstractThreePhaseCommitCohort ready() { + checkState(type != TransactionType.READ_ONLY, "Read-only transactions cannot be readied"); + + final boolean success = seal(TransactionState.READY); + checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state); + + LOG.debug("Tx {} Readying {} components for commit", getIdentifier(), txContextWrappers.size()); + + final AbstractThreePhaseCommitCohort ret; + switch (txContextWrappers.size()) { + case 0: + ret = NoOpDOMStoreThreePhaseCommitCohort.INSTANCE; + break; + case 1: + final Entry e = Iterables.getOnlyElement( + txContextWrappers.entrySet()); + ret = createSingleCommitCohort(e.getKey(), e.getValue()); + break; + default: + ret = createMultiCommitCohort(); } - throttleOperation(txFutureCallbackMap.size()); + txContextFactory.onTransactionReady(getIdentifier(), ret.getCohortFutures()); - final boolean isSingleShard = txFutureCallbackMap.size() == 1; - return isSingleShard ? createSingleCommitCohort() : createMultiCommitCohort(); + final Throwable debugContext = getDebugContext(); + return debugContext == null ? ret : new DebugThreePhaseCommitCohort(getIdentifier(), ret, debugContext); } @SuppressWarnings({ "rawtypes", "unchecked" }) - private AbstractThreePhaseCommitCohort createSingleCommitCohort() { - TransactionFutureCallback txFutureCallback = txFutureCallbackMap.values().iterator().next(); + private AbstractThreePhaseCommitCohort createSingleCommitCohort(final String shardName, + final TransactionContextWrapper contextWrapper) { - LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(), - txFutureCallback.getShardName(), transactionChainId); + LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName); final OperationCallback.Reference operationCallbackRef = new OperationCallback.Reference(OperationCallback.NO_OP_CALLBACK); - final TransactionContext transactionContext = txFutureCallback.getTransactionContext(); + + final TransactionContext transactionContext = contextWrapper.getTransactionContext(); final Future future; - if (transactionContext != null) { - // avoid the creation of a promise and a TransactionOperation - future = getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef); - } else { + if (transactionContext == null) { final Promise promise = akka.dispatch.Futures.promise(); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() { @Override - public void invoke(TransactionContext transactionContext) { - promise.completeWith(getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef)); + public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) { + promise.completeWith(getDirectCommitFuture(newTransactionContext, operationCallbackRef, + havePermit)); } }); future = promise.future(); + } else { + // avoid the creation of a promise and a TransactionOperation + future = getDirectCommitFuture(transactionContext, operationCallbackRef, null); } - return new SingleCommitCohortProxy(actorContext, future, getIdentifier().toString(), operationCallbackRef); + return new SingleCommitCohortProxy(txContextFactory.getActorUtils(), future, getIdentifier(), + operationCallbackRef); } - private Future getReadyOrDirectCommitFuture(TransactionContext transactionContext, - OperationCallback.Reference operationCallbackRef) { - if(transactionContext.supportsDirectCommit()) { - TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(actorContext); - operationCallbackRef.set(rateLimitingCallback); - rateLimitingCallback.run(); - return transactionContext.directCommit(); - } else { - return transactionContext.readyTransaction(); - } + private Future getDirectCommitFuture(final TransactionContext transactionContext, + final OperationCallback.Reference operationCallbackRef, final Boolean havePermit) { + TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback( + txContextFactory.getActorUtils()); + operationCallbackRef.set(rateLimitingCallback); + rateLimitingCallback.run(); + return transactionContext.directCommit(havePermit); } private AbstractThreePhaseCommitCohort createMultiCommitCohort() { - List> cohortFutures = new ArrayList<>(txFutureCallbackMap.size()); - for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { - - LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(), - txFutureCallback.getShardName(), transactionChainId); - - final TransactionContext transactionContext = txFutureCallback.getTransactionContext(); - final Future future; - if (transactionContext != null) { - // avoid the creation of a promise and a TransactionOperation - future = transactionContext.readyTransaction(); - } else { - final Promise promise = akka.dispatch.Futures.promise(); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - promise.completeWith(transactionContext.readyTransaction()); - } - }); - future = promise.future(); - } - cohortFutures.add(future); - } - - return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, getIdentifier().toString()); - } - - @Override - public void close() { - if (!seal(TransactionState.CLOSED)) { - if (state == TransactionState.CLOSED) { - // Idempotent no-op as per AutoCloseable recommendation - return; - } - - throw new IllegalStateException(String.format("Transaction %s is ready, it cannot be closed", - getIdentifier())); - } + final List cohorts = new ArrayList<>(txContextWrappers.size()); + final Optional> shardNames = Optional.of(new TreeSet<>(txContextWrappers.keySet())); + for (Entry e : txContextWrappers.entrySet()) { + LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey()); - for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.closeTransaction(); - } - }); - } + final TransactionContextWrapper wrapper = e.getValue(); - txFutureCallbackMap.clear(); - - if(remoteTransactionActorsMB != null) { - remoteTransactionActors.clear(); - remoteTransactionActorsMB.set(true); + // The remote tx version is obtained the via TransactionContext which may not be available yet so + // we pass a Supplier to dynamically obtain it. Once the ready Future is resolved the + // TransactionContext is available. + cohorts.add(new ThreePhaseCommitCohortProxy.CohortInfo(wrapper.readyTransaction(shardNames), + () -> wrapper.getTransactionContext().getTransactionVersion())); } - } - - private String shardNameFromIdentifier(YangInstanceIdentifier path){ - return ShardStrategyFactory.getStrategy(path).findShard(path); - } - protected Future sendFindPrimaryShardAsync(String shardName) { - return actorContext.findPrimaryShardAsync(shardName); + return new ThreePhaseCommitCohortProxy(txContextFactory.getActorUtils(), cohorts, getIdentifier()); } - final TransactionType getTransactionType() { - return transactionType; + private String shardNameFromIdentifier(final YangInstanceIdentifier path) { + return txContextFactory.getActorUtils().getShardStrategyFactory().getStrategy(path).findShard(path); } - final Semaphore getOperationLimiter() { - return operationLimiter; + private TransactionContextWrapper getContextWrapper(final YangInstanceIdentifier path) { + return getContextWrapper(shardNameFromIdentifier(path)); } - private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) { - String shardName = shardNameFromIdentifier(path); - return getOrCreateTxFutureCallback(shardName); - } - - private TransactionFutureCallback getOrCreateTxFutureCallback(String shardName) { - TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName); - if(txFutureCallback == null) { - Future findPrimaryFuture = sendFindPrimaryShardAsync(shardName); - - final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(this, shardName); - - txFutureCallback = newTxFutureCallback; - txFutureCallbackMap.put(shardName, txFutureCallback); - - findPrimaryFuture.onComplete(new OnComplete() { - @Override - public void onComplete(Throwable failure, ActorSelection primaryShard) { - if(failure != null) { - newTxFutureCallback.createTransactionContext(failure, null); - } else { - newTxFutureCallback.setPrimaryShard(primaryShard); - } - } - }, actorContext.getClientDispatcher()); + private TransactionContextWrapper getContextWrapper(final String shardName) { + final TransactionContextWrapper existing = txContextWrappers.get(shardName); + if (existing != null) { + return existing; } - return txFutureCallback; + final TransactionContextWrapper fresh = txContextFactory.newTransactionContextWrapper(this, shardName); + txContextWrappers.put(shardName, fresh); + return fresh; } - String getTransactionChainId() { - return transactionChainId; + TransactionType getType() { + return type; } - protected ActorContext getActorContext() { - return actorContext; + boolean isReady() { + return state != TransactionState.OPEN; } - TransactionContext createValidTransactionContext(ActorSelection transactionActor, - String transactionPath, short remoteTransactionVersion) { - - if (transactionType == TransactionType.READ_ONLY) { - // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference - // to close the remote Tx's when this instance is no longer in use and is garbage - // collected. - - if(remoteTransactionActorsMB == null) { - remoteTransactionActors = Lists.newArrayList(); - remoteTransactionActorsMB = new AtomicBoolean(); - - TransactionProxyCleanupPhantomReference.track(TransactionProxy.this); - } - - // Add the actor to the remoteTransactionActors list for access by the - // cleanup PhantonReference. - remoteTransactionActors.add(transactionActor); - - // Write to the memory barrier volatile to publish the above update to the - // remoteTransactionActors list for thread visibility. - remoteTransactionActorsMB.set(true); - } - - // TxActor is always created where the leader of the shard is. - // Check if TxActor is created in the same node - boolean isTxActorLocal = actorContext.isPathLocal(transactionPath); - - if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) { - return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(), - transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, - operationCompleter); - } else { - return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId, - actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter); - } + ActorUtils getActorUtils() { + return txContextFactory.getActorUtils(); } }