X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxy.java;h=b774fc47876dcf978adacb15cf4a6e9a6e0be4da;hp=361a221dd5d8589eb2c04cbdd42ce94f4737493e;hb=f7da1de9c83827bbb9c13632e5715c2e6720ae21;hpb=cac7138ddd81d1cc801d4b6cd9bc4372e0a8ddd3 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 361a221dd5..b774fc4787 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -5,343 +5,181 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; -import akka.dispatch.Mapper; -import akka.dispatch.OnComplete; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; +import com.google.common.base.Supplier; +import com.google.common.collect.Iterables; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl; -import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; -import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.AbstractRead; +import org.opendaylight.controller.cluster.datastore.messages.DataExists; +import org.opendaylight.controller.cluster.datastore.messages.ReadData; +import org.opendaylight.controller.cluster.datastore.modification.AbstractModification; +import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; +import org.opendaylight.controller.cluster.datastore.modification.MergeModification; +import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; -import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; +import org.opendaylight.mdsal.common.api.MappingCheckedFuture; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; import scala.concurrent.Promise; /** - * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard - *

- * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during - * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will - * be created on each of those shards by the TransactionProxy - *

- *

- * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various - * shards will be executed. - *

+ * A transaction potentially spanning multiple backend shards. */ -public class TransactionProxy extends AbstractDOMStoreTransaction implements DOMStoreReadWriteTransaction { - - public static enum TransactionType { - READ_ONLY, - WRITE_ONLY, - READ_WRITE; - - // Cache all values - private static final TransactionType[] VALUES = values(); - - public static TransactionType fromInt(final int type) { - try { - return VALUES[type]; - } catch (IndexOutOfBoundsException e) { - throw new IllegalArgumentException("In TransactionType enum value " + type, e); - } - } - } - - private static enum TransactionState { +public class TransactionProxy extends AbstractDOMStoreTransaction + implements DOMStoreReadWriteTransaction { + private enum TransactionState { OPEN, READY, CLOSED, } - static final Mapper SAME_FAILURE_TRANSFORMER = - new Mapper() { - @Override - public Throwable apply(Throwable failure) { - return failure; - } - }; - - private static final AtomicLong counter = new AtomicLong(); - private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class); - /** - * Stores the remote Tx actors for each requested data store path to be used by the - * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The - * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the - * remoteTransactionActors list so they will be visible to the thread accessing the - * PhantomReference. - */ - List remoteTransactionActors; - volatile AtomicBoolean remoteTransactionActorsMB; - - /** - * Stores the create transaction results per shard. - */ - private final Map txFutureCallbackMap = new HashMap<>(); - - private final TransactionType transactionType; - final ActorContext actorContext; - private final String transactionChainId; - private final SchemaContext schemaContext; + private final Map txContextWrappers = new HashMap<>(); + private final AbstractTransactionContextFactory txContextFactory; + private final TransactionType type; private TransactionState state = TransactionState.OPEN; - private volatile boolean initialized; - private Semaphore operationLimiter; - private OperationCompleter operationCompleter; + @VisibleForTesting + public TransactionProxy(final AbstractTransactionContextFactory txContextFactory, final TransactionType type) { + super(txContextFactory.nextIdentifier(), txContextFactory.getActorContext().getDatastoreContext() + .isTransactionDebugContextEnabled()); + this.txContextFactory = txContextFactory; + this.type = Preconditions.checkNotNull(type); - public TransactionProxy(ActorContext actorContext, TransactionType transactionType) { - this(actorContext, transactionType, ""); + LOG.debug("New {} Tx - {}", type, getIdentifier()); } - public TransactionProxy(ActorContext actorContext, TransactionType transactionType, String transactionChainId) { - super(createIdentifier(actorContext)); - this.actorContext = Preconditions.checkNotNull(actorContext, - "actorContext should not be null"); - this.transactionType = Preconditions.checkNotNull(transactionType, - "transactionType should not be null"); - this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(), - "schemaContext should not be null"); - this.transactionChainId = transactionChainId; - - LOG.debug("Created txn {} of type {} on chain {}", getIdentifier(), transactionType, transactionChainId); + @Override + public CheckedFuture exists(final YangInstanceIdentifier path) { + return executeRead(shardNameFromIdentifier(path), new DataExists(path, DataStoreVersions.CURRENT_VERSION)); } - private static TransactionIdentifier createIdentifier(ActorContext actorContext) { - String memberName = actorContext.getCurrentMemberName(); - if (memberName == null) { - memberName = "UNKNOWN-MEMBER"; - } + private CheckedFuture executeRead(final String shardName, + final AbstractRead readCmd) { + Preconditions.checkState(type != TransactionType.WRITE_ONLY, + "Reads from write-only transactions are not allowed"); - return new TransactionIdentifier(memberName, counter.getAndIncrement()); - } + LOG.trace("Tx {} {} {}", getIdentifier(), readCmd.getClass().getSimpleName(), readCmd.getPath()); - @VisibleForTesting - boolean hasTransactionContext() { - for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { - TransactionContext transactionContext = txFutureCallback.getTransactionContext(); - if(transactionContext != null) { - return true; + final SettableFuture proxyFuture = SettableFuture.create(); + TransactionContextWrapper contextWrapper = getContextWrapper(shardName); + contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() { + @Override + public void invoke(final TransactionContext transactionContext) { + transactionContext.executeRead(readCmd, proxyFuture); } - } - - return false; - } + }); - private static boolean isRootPath(YangInstanceIdentifier path) { - return !path.getPathArguments().iterator().hasNext(); + return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); } @Override public CheckedFuture>, ReadFailedException> read(final YangInstanceIdentifier path) { + Preconditions.checkState(type != TransactionType.WRITE_ONLY, + "Reads from write-only transactions are not allowed"); - Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY, - "Read operation on write-only transaction is not allowed"); - - LOG.debug("Tx {} read {}", getIdentifier(), path); - - final SettableFuture>> proxyFuture = SettableFuture.create(); + LOG.trace("Tx {} read {}", getIdentifier(), path); - if(isRootPath(path)){ - readAllData(path, proxyFuture); + if (YangInstanceIdentifier.EMPTY.equals(path)) { + return readAllData(); } else { - throttleOperation(); - - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.readData(path, proxyFuture); - } - }); - + return singleShardRead(shardNameFromIdentifier(path), path); } - - return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); } - private void readAllData(final YangInstanceIdentifier path, - final SettableFuture>> proxyFuture) { - Set allShardNames = actorContext.getConfiguration().getAllShardNames(); - List>>> futures = new ArrayList<>(allShardNames.size()); - - for(String shardName : allShardNames){ - final SettableFuture>> subProxyFuture = SettableFuture.create(); + private CheckedFuture>, ReadFailedException> singleShardRead( + final String shardName, final YangInstanceIdentifier path) { + return executeRead(shardName, new ReadData(path, DataStoreVersions.CURRENT_VERSION)); + } - throttleOperation(); + private CheckedFuture>, ReadFailedException> readAllData() { + final Set allShardNames = txContextFactory.getActorContext().getConfiguration().getAllShardNames(); + final Collection>, ReadFailedException>> futures = + new ArrayList<>(allShardNames.size()); - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(shardName); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.readData(path, subProxyFuture); - } - }); - - futures.add(subProxyFuture); + for (String shardName : allShardNames) { + futures.add(singleShardRead(shardName, YangInstanceIdentifier.EMPTY)); } - final ListenableFuture>>> future = Futures.allAsList(futures); + final ListenableFuture>>> listFuture = Futures.allAsList(futures); + final ListenableFuture>> aggregateFuture; - future.addListener(new Runnable() { - @Override - public void run() { + aggregateFuture = Futures.transform(listFuture, + (Function>>, Optional>>) input -> { try { - proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(), - future.get(), actorContext.getSchemaContext())); - } catch (DataValidationFailedException | InterruptedException | ExecutionException e) { - proxyFuture.setException(e); + return NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.EMPTY, input, + txContextFactory.getActorContext().getSchemaContext(), + txContextFactory.getActorContext().getDatastoreContext().getLogicalStoreType()); + } catch (DataValidationFailedException e) { + throw new IllegalArgumentException("Failed to aggregate", e); } - } - }, actorContext.getActorSystem().dispatcher()); - } - - @Override - public CheckedFuture exists(final YangInstanceIdentifier path) { - - Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY, - "Exists operation on write-only transaction is not allowed"); - - LOG.debug("Tx {} exists {}", getIdentifier(), path); - - throttleOperation(); - - final SettableFuture proxyFuture = SettableFuture.create(); - - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.dataExists(path, proxyFuture); - } - }); - - return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); - } - - private void checkModificationState() { - Preconditions.checkState(transactionType != TransactionType.READ_ONLY, - "Modification operation on read-only transaction is not allowed"); - Preconditions.checkState(state == TransactionState.OPEN, - "Transaction is sealed - further modifications are not allowed"); - } + }, MoreExecutors.directExecutor()); - private void throttleOperation() { - throttleOperation(1); + return MappingCheckedFuture.create(aggregateFuture, ReadFailedException.MAPPER); } - private void throttleOperation(int acquirePermits) { - if(!initialized) { - // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem - operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit()); - operationCompleter = new OperationCompleter(operationLimiter); - - // Make sure we write this last because it's volatile and will also publish the non-volatile writes - // above as well so they'll be visible to other threads. - initialized = true; - } - - try { - if(!operationLimiter.tryAcquire(acquirePermits, - actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){ - LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier()); - } - } catch (InterruptedException e) { - if(LOG.isDebugEnabled()) { - LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e); - } else { - LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier()); - } - } + @Override + public void delete(final YangInstanceIdentifier path) { + executeModification(new DeleteModification(path)); } - final void ensureInitializied() { - Preconditions.checkState(initialized, "Transaction %s was not propertly initialized.", getIdentifier()); + @Override + public void merge(final YangInstanceIdentifier path, final NormalizedNode data) { + executeModification(new MergeModification(path, data)); } @Override public void write(final YangInstanceIdentifier path, final NormalizedNode data) { - - checkModificationState(); - - LOG.debug("Tx {} write {}", getIdentifier(), path); - - throttleOperation(); - - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.writeData(path, data); - } - }); + executeModification(new WriteModification(path, data)); } - @Override - public void merge(final YangInstanceIdentifier path, final NormalizedNode data) { - + private void executeModification(final AbstractModification modification) { checkModificationState(); - LOG.debug("Tx {} merge {}", getIdentifier(), path); + LOG.trace("Tx {} executeModification {} {}", getIdentifier(), modification.getClass().getSimpleName(), + modification.getPath()); - throttleOperation(); - - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + TransactionContextWrapper contextWrapper = getContextWrapper(modification.getPath()); + contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() { @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.mergeData(path, data); + protected void invoke(final TransactionContext transactionContext) { + transactionContext.executeModification(modification); } }); } - @Override - public void delete(final YangInstanceIdentifier path) { - - checkModificationState(); - - LOG.debug("Tx {} delete {}", getIdentifier(), path); - - throttleOperation(); - - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.deleteData(path); - } - }); + private void checkModificationState() { + Preconditions.checkState(type != TransactionType.READ_ONLY, + "Modification operation on read-only transaction is not allowed"); + Preconditions.checkState(state == TransactionState.OPEN, + "Transaction is sealed - further modifications are not allowed"); } private boolean seal(final TransactionState newState) { @@ -354,214 +192,142 @@ public class TransactionProxy extends AbstractDOMStoreTransaction ready() { - Preconditions.checkState(transactionType != TransactionType.READ_ONLY, - "Read-only transactions cannot be readied"); + public final void close() { + if (!seal(TransactionState.CLOSED)) { + Preconditions.checkState(state == TransactionState.CLOSED, "Transaction %s is ready, it cannot be closed", + getIdentifier()); + // Idempotent no-op as per AutoCloseable recommendation + return; + } + + for (TransactionContextWrapper contextWrapper : txContextWrappers.values()) { + contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() { + @Override + public void invoke(final TransactionContext transactionContext) { + transactionContext.closeTransaction(); + } + }); + } + + + txContextWrappers.clear(); + } + + @Override + public final AbstractThreePhaseCommitCohort ready() { + Preconditions.checkState(type != TransactionType.READ_ONLY, "Read-only transactions cannot be readied"); final boolean success = seal(TransactionState.READY); Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state); - LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(), - txFutureCallbackMap.size()); - - if (txFutureCallbackMap.isEmpty()) { - TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext); - return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE; + LOG.debug("Tx {} Readying {} components for commit", getIdentifier(), txContextWrappers.size()); + + final AbstractThreePhaseCommitCohort ret; + switch (txContextWrappers.size()) { + case 0: + ret = NoOpDOMStoreThreePhaseCommitCohort.INSTANCE; + break; + case 1: + final Entry e = Iterables.getOnlyElement( + txContextWrappers.entrySet()); + ret = createSingleCommitCohort(e.getKey(), e.getValue()); + break; + default: + ret = createMultiCommitCohort(txContextWrappers.entrySet()); } - throttleOperation(txFutureCallbackMap.size()); + txContextFactory.onTransactionReady(getIdentifier(), ret.getCohortFutures()); - final boolean isSingleShard = txFutureCallbackMap.size() == 1; - return isSingleShard ? createSingleCommitCohort() : createMultiCommitCohort(); + final Throwable debugContext = getDebugContext(); + return debugContext == null ? ret : new DebugThreePhaseCommitCohort(getIdentifier(), ret, debugContext); } @SuppressWarnings({ "rawtypes", "unchecked" }) - private AbstractThreePhaseCommitCohort createSingleCommitCohort() { - TransactionFutureCallback txFutureCallback = txFutureCallbackMap.values().iterator().next(); + private AbstractThreePhaseCommitCohort createSingleCommitCohort(final String shardName, + final TransactionContextWrapper contextWrapper) { - LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(), - txFutureCallback.getShardName(), transactionChainId); + LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName); final OperationCallback.Reference operationCallbackRef = new OperationCallback.Reference(OperationCallback.NO_OP_CALLBACK); - final TransactionContext transactionContext = txFutureCallback.getTransactionContext(); + + final TransactionContext transactionContext = contextWrapper.getTransactionContext(); final Future future; - if (transactionContext != null) { - // avoid the creation of a promise and a TransactionOperation - future = getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef); - } else { + if (transactionContext == null) { final Promise promise = akka.dispatch.Futures.promise(); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() { @Override - public void invoke(TransactionContext transactionContext) { - promise.completeWith(getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef)); + public void invoke(final TransactionContext newTransactionContext) { + promise.completeWith(getDirectCommitFuture(newTransactionContext, operationCallbackRef)); } }); future = promise.future(); - } - - return new SingleCommitCohortProxy(actorContext, future, getIdentifier().toString(), operationCallbackRef); - } - - private Future getReadyOrDirectCommitFuture(TransactionContext transactionContext, - OperationCallback.Reference operationCallbackRef) { - if(transactionContext.supportsDirectCommit()) { - TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(actorContext); - operationCallbackRef.set(rateLimitingCallback); - rateLimitingCallback.run(); - return transactionContext.directCommit(); } else { - return transactionContext.readyTransaction(); + // avoid the creation of a promise and a TransactionOperation + future = getDirectCommitFuture(transactionContext, operationCallbackRef); } - } - - private AbstractThreePhaseCommitCohort createMultiCommitCohort() { - List> cohortFutures = new ArrayList<>(txFutureCallbackMap.size()); - for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { - - LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(), - txFutureCallback.getShardName(), transactionChainId); - - final TransactionContext transactionContext = txFutureCallback.getTransactionContext(); - final Future future; - if (transactionContext != null) { - // avoid the creation of a promise and a TransactionOperation - future = transactionContext.readyTransaction(); - } else { - final Promise promise = akka.dispatch.Futures.promise(); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - promise.completeWith(transactionContext.readyTransaction()); - } - }); - future = promise.future(); - } - cohortFutures.add(future); - } + return new SingleCommitCohortProxy(txContextFactory.getActorContext(), future, getIdentifier(), + operationCallbackRef); + } - return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, getIdentifier().toString()); + private Future getDirectCommitFuture(final TransactionContext transactionContext, + final OperationCallback.Reference operationCallbackRef) { + TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback( + txContextFactory.getActorContext()); + operationCallbackRef.set(rateLimitingCallback); + rateLimitingCallback.run(); + return transactionContext.directCommit(); } - @Override - public void close() { - if (!seal(TransactionState.CLOSED)) { - if (state == TransactionState.CLOSED) { - // Idempotent no-op as per AutoCloseable recommendation - return; - } + private AbstractThreePhaseCommitCohort createMultiCommitCohort( + final Set> txContextWrapperEntries) { - throw new IllegalStateException(String.format("Transaction %s is ready, it cannot be closed", - getIdentifier())); - } + final List cohorts = new ArrayList<>(txContextWrapperEntries.size()); + for (Entry e : txContextWrapperEntries) { + LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey()); - for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.closeTransaction(); - } - }); - } + final TransactionContextWrapper wrapper = e.getValue(); - txFutureCallbackMap.clear(); + // The remote tx version is obtained the via TransactionContext which may not be available yet so + // we pass a Supplier to dynamically obtain it. Once the ready Future is resolved the + // TransactionContext is available. + Supplier txVersionSupplier = () -> wrapper.getTransactionContext().getTransactionVersion(); - if(remoteTransactionActorsMB != null) { - remoteTransactionActors.clear(); - remoteTransactionActorsMB.set(true); + cohorts.add(new ThreePhaseCommitCohortProxy.CohortInfo(wrapper.readyTransaction(), txVersionSupplier)); } - } - - private String shardNameFromIdentifier(YangInstanceIdentifier path){ - return ShardStrategyFactory.getStrategy(path).findShard(path); - } - - protected Future sendFindPrimaryShardAsync(String shardName) { - return actorContext.findPrimaryShardAsync(shardName); - } - final TransactionType getTransactionType() { - return transactionType; + return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohorts, getIdentifier()); } - final Semaphore getOperationLimiter() { - return operationLimiter; + private String shardNameFromIdentifier(final YangInstanceIdentifier path) { + return txContextFactory.getActorContext().getShardStrategyFactory().getStrategy(path).findShard(path); } - private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) { - String shardName = shardNameFromIdentifier(path); - return getOrCreateTxFutureCallback(shardName); + private TransactionContextWrapper getContextWrapper(final YangInstanceIdentifier path) { + return getContextWrapper(shardNameFromIdentifier(path)); } - private TransactionFutureCallback getOrCreateTxFutureCallback(String shardName) { - TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName); - if(txFutureCallback == null) { - Future findPrimaryFuture = sendFindPrimaryShardAsync(shardName); - - final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(this, shardName); - - txFutureCallback = newTxFutureCallback; - txFutureCallbackMap.put(shardName, txFutureCallback); - - findPrimaryFuture.onComplete(new OnComplete() { - @Override - public void onComplete(Throwable failure, ActorSelection primaryShard) { - if(failure != null) { - newTxFutureCallback.createTransactionContext(failure, null); - } else { - newTxFutureCallback.setPrimaryShard(primaryShard); - } - } - }, actorContext.getClientDispatcher()); + private TransactionContextWrapper getContextWrapper(final String shardName) { + final TransactionContextWrapper existing = txContextWrappers.get(shardName); + if (existing != null) { + return existing; } - return txFutureCallback; + final TransactionContextWrapper fresh = txContextFactory.newTransactionContextWrapper(this, shardName); + txContextWrappers.put(shardName, fresh); + return fresh; } - String getTransactionChainId() { - return transactionChainId; + TransactionType getType() { + return type; } - protected ActorContext getActorContext() { - return actorContext; + boolean isReady() { + return state != TransactionState.OPEN; } - TransactionContext createValidTransactionContext(ActorSelection transactionActor, - String transactionPath, short remoteTransactionVersion) { - - if (transactionType == TransactionType.READ_ONLY) { - // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference - // to close the remote Tx's when this instance is no longer in use and is garbage - // collected. - - if(remoteTransactionActorsMB == null) { - remoteTransactionActors = Lists.newArrayList(); - remoteTransactionActorsMB = new AtomicBoolean(); - - TransactionProxyCleanupPhantomReference.track(TransactionProxy.this); - } - - // Add the actor to the remoteTransactionActors list for access by the - // cleanup PhantonReference. - remoteTransactionActors.add(transactionActor); - - // Write to the memory barrier volatile to publish the above update to the - // remoteTransactionActors list for thread visibility. - remoteTransactionActorsMB.set(true); - } - - // TxActor is always created where the leader of the shard is. - // Check if TxActor is created in the same node - boolean isTxActorLocal = actorContext.isPathLocal(transactionPath); - - if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) { - return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(), - transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, - operationCompleter); - } else { - return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId, - actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter); - } + ActorContext getActorContext() { + return txContextFactory.getActorContext(); } }