X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxy.java;h=82258b46a44b237748342b9d43e8373cb04c0dde;hp=59c9298499c4ed0a58961b57fe40019f15214de1;hb=daaef05cbf70e6cbec9af181258faead6d9620a6;hpb=78527e81f8cc82140af5cb2649863a597f380291 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 59c9298499..82258b46a4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -5,196 +5,84 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; -import akka.dispatch.Mapper; -import akka.dispatch.OnComplete; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; +import com.google.common.collect.Iterables; import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import javax.annotation.concurrent.GuardedBy; -import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl; -import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; -import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; -import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; import scala.concurrent.Promise; -import scala.concurrent.duration.FiniteDuration; /** - * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard - *

- * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during - * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will - * be created on each of those shards by the TransactionProxy - *

- *

- * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various - * shards will be executed. - *

+ * A transaction potentially spanning multiple backend shards. */ public class TransactionProxy extends AbstractDOMStoreTransaction implements DOMStoreReadWriteTransaction { - - public static enum TransactionType { - READ_ONLY, - WRITE_ONLY, - READ_WRITE; - - // Cache all values - private static final TransactionType[] VALUES = values(); - - public static TransactionType fromInt(final int type) { - try { - return VALUES[type]; - } catch (IndexOutOfBoundsException e) { - throw new IllegalArgumentException("In TransactionType enum value " + type, e); - } - } - } - private static enum TransactionState { OPEN, READY, CLOSED, } - - static final Mapper SAME_FAILURE_TRANSFORMER = - new Mapper() { - @Override - public Throwable apply(Throwable failure) { - return failure; - } - }; - - private static final AtomicLong counter = new AtomicLong(); - private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class); - /** - * Time interval in between transaction create retries. - */ - private static final FiniteDuration CREATE_TX_TRY_INTERVAL = - FiniteDuration.create(1, TimeUnit.SECONDS); - - /** - * Stores the remote Tx actors for each requested data store path to be used by the - * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The - * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the - * remoteTransactionActors list so they will be visible to the thread accessing the - * PhantomReference. - */ - List remoteTransactionActors; - volatile AtomicBoolean remoteTransactionActorsMB; - - /** - * Stores the create transaction results per shard. - */ - private final Map txFutureCallbackMap = new HashMap<>(); - - private final TransactionType transactionType; - final ActorContext actorContext; - private final String transactionChainId; - private final SchemaContext schemaContext; + private final Map txContextAdapters = new HashMap<>(); + private final AbstractTransactionContextFactory txContextFactory; + private final TransactionType type; private TransactionState state = TransactionState.OPEN; - - private volatile boolean initialized; - private Semaphore operationLimiter; - private OperationCompleter operationCompleter; - - public TransactionProxy(ActorContext actorContext, TransactionType transactionType) { - this(actorContext, transactionType, ""); - } - - public TransactionProxy(ActorContext actorContext, TransactionType transactionType, String transactionChainId) { - super(createIdentifier(actorContext)); - this.actorContext = Preconditions.checkNotNull(actorContext, - "actorContext should not be null"); - this.transactionType = Preconditions.checkNotNull(transactionType, - "transactionType should not be null"); - this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(), - "schemaContext should not be null"); - this.transactionChainId = transactionChainId; - - LOG.debug("Created txn {} of type {} on chain {}", getIdentifier(), transactionType, transactionChainId); - } - - private static TransactionIdentifier createIdentifier(ActorContext actorContext) { - String memberName = actorContext.getCurrentMemberName(); - if (memberName == null) { - memberName = "UNKNOWN-MEMBER"; - } - - return new TransactionIdentifier(memberName, counter.getAndIncrement()); - } - - @VisibleForTesting - List> getRecordedOperationFutures() { - List> recordedOperationFutures = Lists.newArrayList(); - for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { - TransactionContext transactionContext = txFutureCallback.getTransactionContext(); - if (transactionContext != null) { - transactionContext.copyRecordedOperationFutures(recordedOperationFutures); - } - } - - return recordedOperationFutures; - } + private volatile OperationCompleter operationCompleter; + private volatile Semaphore operationLimiter; @VisibleForTesting - boolean hasTransactionContext() { - for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { - TransactionContext transactionContext = txFutureCallback.getTransactionContext(); - if(transactionContext != null) { - return true; - } - } + public TransactionProxy(final AbstractTransactionContextFactory txContextFactory, final TransactionType type) { + super(txContextFactory.nextIdentifier(), false); + this.txContextFactory = txContextFactory; + this.type = Preconditions.checkNotNull(type); - return false; + LOG.debug("New {} Tx - {}", type, getIdentifier()); } @Override - public CheckedFuture>, ReadFailedException> read(final YangInstanceIdentifier path) { - - Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY, - "Read operation on write-only transaction is not allowed"); + public CheckedFuture exists(final YangInstanceIdentifier path) { + Preconditions.checkState(type != TransactionType.WRITE_ONLY, "Reads from write-only transactions are not allowed"); - LOG.debug("Tx {} read {}", getIdentifier(), path); + LOG.debug("Tx {} exists {}", getIdentifier(), path); throttleOperation(); - final SettableFuture>> proxyFuture = SettableFuture.create(); - - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + final SettableFuture proxyFuture = SettableFuture.create(); + TransactionContextWrapper contextAdapter = getContextAdapter(path); + contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { - transactionContext.readData(path, proxyFuture); + transactionContext.dataExists(path, proxyFuture); } }); @@ -202,93 +90,86 @@ public class TransactionProxy extends AbstractDOMStoreTransaction exists(final YangInstanceIdentifier path) { - - Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY, - "Exists operation on write-only transaction is not allowed"); + public CheckedFuture>, ReadFailedException> read(final YangInstanceIdentifier path) { + Preconditions.checkState(type != TransactionType.WRITE_ONLY, "Reads from write-only transactions are not allowed"); - LOG.debug("Tx {} exists {}", getIdentifier(), path); + LOG.debug("Tx {} read {}", getIdentifier(), path); - throttleOperation(); + if (YangInstanceIdentifier.EMPTY.equals(path)) { + return readAllData(); + } else { + throttleOperation(); - final SettableFuture proxyFuture = SettableFuture.create(); + return singleShardRead(shardNameFromIdentifier(path), path); + } + } - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + private CheckedFuture>, ReadFailedException> singleShardRead( + final String shardName, final YangInstanceIdentifier path) { + final SettableFuture>> proxyFuture = SettableFuture.create(); + TransactionContextWrapper contextAdapter = getContextAdapter(shardName); + contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { - transactionContext.dataExists(path, proxyFuture); + transactionContext.readData(path, proxyFuture); } }); return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); } - private void checkModificationState() { - Preconditions.checkState(transactionType != TransactionType.READ_ONLY, - "Modification operation on read-only transaction is not allowed"); - Preconditions.checkState(state == TransactionState.OPEN, - "Transaction is sealed - further modifications are not allowed"); - } - - private void throttleOperation() { - throttleOperation(1); - } + private CheckedFuture>, ReadFailedException> readAllData() { + final Set allShardNames = txContextFactory.getActorContext().getConfiguration().getAllShardNames(); + final Collection>, ReadFailedException>> futures = new ArrayList<>(allShardNames.size()); - private void throttleOperation(int acquirePermits) { - if(!initialized) { - // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem - operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit()); - operationCompleter = new OperationCompleter(operationLimiter); - - // Make sure we write this last because it's volatile and will also publish the non-volatile writes - // above as well so they'll be visible to other threads. - initialized = true; + for (String shardName : allShardNames) { + futures.add(singleShardRead(shardName, YangInstanceIdentifier.EMPTY)); } - try { - if(!operationLimiter.tryAcquire(acquirePermits, - actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){ - LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier()); - } - } catch (InterruptedException e) { - if(LOG.isDebugEnabled()) { - LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e); - } else { - LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier()); + final ListenableFuture>>> listFuture = Futures.allAsList(futures); + final ListenableFuture>> aggregateFuture; + + aggregateFuture = Futures.transform(listFuture, new Function>>, Optional>>() { + @Override + public Optional> apply(final List>> input) { + try { + return NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.EMPTY, input, txContextFactory.getActorContext().getSchemaContext()); + } catch (DataValidationFailedException e) { + throw new IllegalArgumentException("Failed to aggregate", e); + } } - } + }); + + return MappingCheckedFuture.create(aggregateFuture, ReadFailedException.MAPPER); } @Override - public void write(final YangInstanceIdentifier path, final NormalizedNode data) { - + public void delete(final YangInstanceIdentifier path) { checkModificationState(); - LOG.debug("Tx {} write {}", getIdentifier(), path); + LOG.debug("Tx {} delete {}", getIdentifier(), path); throttleOperation(); - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + TransactionContextWrapper contextAdapter = getContextAdapter(path); + contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { - transactionContext.writeData(path, data); + transactionContext.deleteData(path); } }); } @Override public void merge(final YangInstanceIdentifier path, final NormalizedNode data) { - checkModificationState(); LOG.debug("Tx {} merge {}", getIdentifier(), path); throttleOperation(); - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + TransactionContextWrapper contextAdapter = getContextAdapter(path); + contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { transactionContext.mergeData(path, data); @@ -297,23 +178,29 @@ public class TransactionProxy extends AbstractDOMStoreTransaction data) { checkModificationState(); - LOG.debug("Tx {} delete {}", getIdentifier(), path); + LOG.debug("Tx {} write {}", getIdentifier(), path); throttleOperation(); - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + TransactionContextWrapper contextAdapter = getContextAdapter(path); + contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { - transactionContext.deleteData(path); + transactionContext.writeData(path, data); } }); } + private void checkModificationState() { + Preconditions.checkState(type != TransactionType.READ_ONLY, + "Modification operation on read-only transaction is not allowed"); + Preconditions.checkState(state == TransactionState.OPEN, + "Transaction is sealed - further modifications are not allowed"); + } + private boolean seal(final TransactionState newState) { if (state == TransactionState.OPEN) { state = newState; @@ -324,66 +211,16 @@ public class TransactionProxy extends AbstractDOMStoreTransaction> cohortFutures = new ArrayList<>(txFutureCallbackMap.size()); - for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { - - LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(), - txFutureCallback.getShardName(), transactionChainId); - - final TransactionContext transactionContext = txFutureCallback.getTransactionContext(); - final Future future; - if (transactionContext != null) { - // avoid the creation of a promise and a TransactionOperation - future = transactionContext.readyTransaction(); - } else { - final Promise promise = akka.dispatch.Futures.promise(); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - promise.completeWith(transactionContext.readyTransaction()); - } - }); - future = promise.future(); - } - - cohortFutures.add(future); - } - - return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, - getIdentifier().toString()); - } - - @Override - public void close() { + public final void close() { if (!seal(TransactionState.CLOSED)) { - if (state == TransactionState.CLOSED) { - // Idempotent no-op as per AutoCloseable recommendation - return; - } - - throw new IllegalStateException(String.format("Transaction %s is ready, it cannot be closed", - getIdentifier())); + Preconditions.checkState(state == TransactionState.CLOSED, "Transaction %s is ready, it cannot be closed", + getIdentifier()); + // Idempotent no-op as per AutoCloseable recommendation + return; } - for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + for (TransactionContextWrapper contextAdapter : txContextAdapters.values()) { + contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { transactionContext.closeTransaction(); @@ -391,307 +228,178 @@ public class TransactionProxy extends AbstractDOMStoreTransaction sendFindPrimaryShardAsync(String shardName) { - return actorContext.findPrimaryShardAsync(shardName); + txContextAdapters.clear(); } - private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) { - String shardName = shardNameFromIdentifier(path); - TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName); - if(txFutureCallback == null) { - Future findPrimaryFuture = sendFindPrimaryShardAsync(shardName); - - final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(shardName); + @Override + public final AbstractThreePhaseCommitCohort ready() { + Preconditions.checkState(type != TransactionType.READ_ONLY, "Read-only transactions cannot be readied"); - txFutureCallback = newTxFutureCallback; - txFutureCallbackMap.put(shardName, txFutureCallback); + final boolean success = seal(TransactionState.READY); + Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state); - findPrimaryFuture.onComplete(new OnComplete() { - @Override - public void onComplete(Throwable failure, ActorSelection primaryShard) { - if(failure != null) { - newTxFutureCallback.createTransactionContext(failure, null); - } else { - newTxFutureCallback.setPrimaryShard(primaryShard); - } - } - }, actorContext.getClientDispatcher()); + LOG.debug("Tx {} Readying {} components for commit", getIdentifier(), txContextAdapters.size()); + + final AbstractThreePhaseCommitCohort ret; + switch (txContextAdapters.size()) { + case 0: + TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(txContextFactory.getActorContext()); + ret = NoOpDOMStoreThreePhaseCommitCohort.INSTANCE; + break; + case 1: + final Entry e = Iterables.getOnlyElement(txContextAdapters.entrySet()); + ret = createSingleCommitCohort(e.getKey(), e.getValue()); + break; + default: + ret = createMultiCommitCohort(txContextAdapters.entrySet()); } - return txFutureCallback; + txContextFactory.onTransactionReady(getIdentifier(), ret.getCohortFutures()); + return ret; } - public String getTransactionChainId() { - return transactionChainId; - } - - protected ActorContext getActorContext() { - return actorContext; - } + private AbstractThreePhaseCommitCohort createSingleCommitCohort(final String shardName, + final TransactionContextWrapper contextAdapter) { + throttleOperation(); - /** - * Implements a Future OnComplete callback for a CreateTransaction message. This class handles - * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a - * retry task after a short delay. - *

- * The end result from a completed CreateTransaction message is a TransactionContext that is - * used to perform transaction operations. Transaction operations that occur before the - * CreateTransaction completes are cache and executed once the CreateTransaction completes, - * successfully or not. - */ - private class TransactionFutureCallback extends OnComplete { - - /** - * The list of transaction operations to execute once the CreateTransaction completes. - */ - @GuardedBy("txOperationsOnComplete") - private final List txOperationsOnComplete = Lists.newArrayList(); - - /** - * The TransactionContext resulting from the CreateTransaction reply. - */ - private volatile TransactionContext transactionContext; - - /** - * The target primary shard. - */ - private volatile ActorSelection primaryShard; - - private volatile int createTxTries = (int) (actorContext.getDatastoreContext(). - getShardLeaderElectionTimeout().duration().toMillis() / - CREATE_TX_TRY_INTERVAL.toMillis()); - - private final String shardName; - - TransactionFutureCallback(String shardName) { - this.shardName = shardName; - } + LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName); - String getShardName() { - return shardName; - } + final OperationCallback.Reference operationCallbackRef = + new OperationCallback.Reference(OperationCallback.NO_OP_CALLBACK); - TransactionContext getTransactionContext() { - return transactionContext; + final TransactionContext transactionContext = contextAdapter.getTransactionContext(); + final Future future; + if (transactionContext == null) { + final Promise promise = akka.dispatch.Futures.promise(); + contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + promise.completeWith(getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef)); + } + }); + future = promise.future(); + } else { + // avoid the creation of a promise and a TransactionOperation + future = getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef); } + return new SingleCommitCohortProxy(txContextFactory.getActorContext(), future, getIdentifier().toString(), + operationCallbackRef); + } - /** - * Sets the target primary shard and initiates a CreateTransaction try. - */ - void setPrimaryShard(ActorSelection primaryShard) { - this.primaryShard = primaryShard; - - if(transactionType == TransactionType.WRITE_ONLY && - actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) { - LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context", - getIdentifier(), primaryShard); - - // For write-only Tx's we prepare the transaction modifications directly on the shard actor - // to avoid the overhead of creating a separate transaction actor. - // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow. - executeTxOperatonsOnComplete(createValidTransactionContext(this.primaryShard, - this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION)); - } else { - tryCreateTransaction(); - } + private Future getReadyOrDirectCommitFuture(TransactionContext transactionContext, + OperationCallback.Reference operationCallbackRef) { + if (transactionContext.supportsDirectCommit()) { + TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback( + txContextFactory.getActorContext()); + operationCallbackRef.set(rateLimitingCallback); + rateLimitingCallback.run(); + return transactionContext.directCommit(); + } else { + return transactionContext.readyTransaction(); } + } - /** - * Adds a TransactionOperation to be executed after the CreateTransaction completes. - */ - void addTxOperationOnComplete(TransactionOperation operation) { - boolean invokeOperation = true; - synchronized(txOperationsOnComplete) { - if(transactionContext == null) { - LOG.debug("Tx {} Adding operation on complete", getIdentifier()); - - invokeOperation = false; - txOperationsOnComplete.add(operation); - } - } - - if(invokeOperation) { - operation.invoke(transactionContext); - } - } + private AbstractThreePhaseCommitCohort createMultiCommitCohort( + final Set> txContextAdapterEntries) { - void enqueueTransactionOperation(final TransactionOperation op) { + throttleOperation(); + final List> cohortFutures = new ArrayList<>(txContextAdapterEntries.size()); + for (Entry e : txContextAdapterEntries) { + LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey()); + TransactionContextWrapper contextAdapter = e.getValue(); + final TransactionContext transactionContext = contextAdapter.getTransactionContext(); + Future future; if (transactionContext != null) { - op.invoke(transactionContext); + // avoid the creation of a promise and a TransactionOperation + future = transactionContext.readyTransaction(); } else { - // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future - // callback to be executed after the Tx is created. - addTxOperationOnComplete(op); - } - } + final Promise promise = akka.dispatch.Futures.promise(); + contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + promise.completeWith(transactionContext.readyTransaction()); + } + }); - /** - * Performs a CreateTransaction try async. - */ - private void tryCreateTransaction() { - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(), primaryShard); + future = promise.future(); } - Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(), - TransactionProxy.this.transactionType.ordinal(), - getTransactionChainId()).toSerializable(); - - Future createTxFuture = actorContext.executeOperationAsync(primaryShard, serializedCreateMessage); - - createTxFuture.onComplete(this, actorContext.getClientDispatcher()); + cohortFutures.add(future); } - @Override - public void onComplete(Throwable failure, Object response) { - if(failure instanceof NoShardLeaderException) { - // There's no leader for the shard yet - schedule and try again, unless we're out - // of retries. Note: createTxTries is volatile as it may be written by different - // threads however not concurrently, therefore decrementing it non-atomically here - // is ok. - if(--createTxTries > 0) { - LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry", - getIdentifier(), shardName); - - actorContext.getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL, - new Runnable() { - @Override - public void run() { - tryCreateTransaction(); - } - }, actorContext.getClientDispatcher()); - return; - } - } - - createTransactionContext(failure, response); - } + return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohortFutures, getIdentifier().toString()); + } - private void createTransactionContext(Throwable failure, Object response) { - // Mainly checking for state violation here to perform a volatile read of "initialized" to - // ensure updates to operationLimter et al are visible to this thread (ie we're doing - // "piggy-back" synchronization here). - Preconditions.checkState(initialized, "Tx was not propertly initialized."); - - // Create the TransactionContext from the response or failure. Store the new - // TransactionContext locally until we've completed invoking the - // TransactionOperations. This avoids thread timing issues which could cause - // out-of-order TransactionOperations. Eg, on a modification operation, if the - // TransactionContext is non-null, then we directly call the TransactionContext. - // However, at the same time, the code may be executing the cached - // TransactionOperations. So to avoid thus timing, we don't publish the - // TransactionContext until after we've executed all cached TransactionOperations. - TransactionContext localTransactionContext; - if(failure != null) { - LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure); - - localTransactionContext = new NoOpTransactionContext(failure, getIdentifier(), operationLimiter); - } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) { - localTransactionContext = createValidTransactionContext( - CreateTransactionReply.fromSerializable(response)); - } else { - IllegalArgumentException exception = new IllegalArgumentException(String.format( - "Invalid reply type %s for CreateTransaction", response.getClass())); + private static String shardNameFromIdentifier(final YangInstanceIdentifier path) { + return ShardStrategyFactory.getStrategy(path).findShard(path); + } - localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), operationLimiter); - } + private TransactionContextWrapper getContextAdapter(final YangInstanceIdentifier path) { + return getContextAdapter(shardNameFromIdentifier(path)); + } - executeTxOperatonsOnComplete(localTransactionContext); + private TransactionContextWrapper getContextAdapter(final String shardName) { + final TransactionContextWrapper existing = txContextAdapters.get(shardName); + if (existing != null) { + return existing; } - private void executeTxOperatonsOnComplete(TransactionContext localTransactionContext) { - while(true) { - // Access to txOperationsOnComplete and transactionContext must be protected and atomic - // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing - // issues and ensure no TransactionOperation is missed and that they are processed - // in the order they occurred. - - // We'll make a local copy of the txOperationsOnComplete list to handle re-entrancy - // in case a TransactionOperation results in another transaction operation being - // queued (eg a put operation from a client read Future callback that is notified - // synchronously). - Collection operationsBatch = null; - synchronized(txOperationsOnComplete) { - if(txOperationsOnComplete.isEmpty()) { - // We're done invoking the TransactionOperations so we can now publish the - // TransactionContext. - transactionContext = localTransactionContext; - break; - } + final TransactionContextWrapper fresh = txContextFactory.newTransactionAdapter(this, shardName); + txContextAdapters.put(shardName, fresh); + return fresh; + } - operationsBatch = new ArrayList<>(txOperationsOnComplete); - txOperationsOnComplete.clear(); - } + TransactionType getType() { + return type; + } - // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. - // A slight down-side is that we need to re-acquire the lock below but this should - // be negligible. - for(TransactionOperation oper: operationsBatch) { - oper.invoke(localTransactionContext); - } - } - } + boolean isReady() { + return state != TransactionState.OPEN; + } - private TransactionContext createValidTransactionContext(CreateTransactionReply reply) { - LOG.debug("Tx {} Received {}", getIdentifier(), reply); + ActorContext getActorContext() { + return txContextFactory.getActorContext(); + } - return createValidTransactionContext(actorContext.actorSelection(reply.getTransactionPath()), - reply.getTransactionPath(), reply.getVersion()); + OperationCompleter getCompleter() { + OperationCompleter ret = operationCompleter; + if (ret == null) { + final Semaphore s = getLimiter(); + ret = new OperationCompleter(s); + operationCompleter = ret; } - private TransactionContext createValidTransactionContext(ActorSelection transactionActor, - String transactionPath, short remoteTransactionVersion) { - - if (transactionType == TransactionType.READ_ONLY) { - // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference - // to close the remote Tx's when this instance is no longer in use and is garbage - // collected. - - if(remoteTransactionActorsMB == null) { - remoteTransactionActors = Lists.newArrayList(); - remoteTransactionActorsMB = new AtomicBoolean(); + return ret; + } - TransactionProxyCleanupPhantomReference.track(TransactionProxy.this); - } + Semaphore getLimiter() { + Semaphore ret = operationLimiter; + if (ret == null) { + // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem + ret = new Semaphore(getActorContext().getTransactionOutstandingOperationLimit()); + operationLimiter = ret; + } + return ret; + } - // Add the actor to the remoteTransactionActors list for access by the - // cleanup PhantonReference. - remoteTransactionActors.add(transactionActor); + void throttleOperation() { + throttleOperation(1); + } - // Write to the memory barrier volatile to publish the above update to the - // remoteTransactionActors list for thread visibility. - remoteTransactionActorsMB.set(true); + private void throttleOperation(int acquirePermits) { + try { + if (!getLimiter().tryAcquire(acquirePermits, + getActorContext().getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){ + LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier()); } - - // TxActor is always created where the leader of the shard is. - // Check if TxActor is created in the same node - boolean isTxActorLocal = actorContext.isPathLocal(transactionPath); - - if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) { - return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(), - transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, - operationCompleter); - } else if (transactionType == TransactionType.WRITE_ONLY && - actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) { - return new WriteOnlyTransactionContextImpl(transactionActor, getIdentifier(), transactionChainId, - actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter); + } catch (InterruptedException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier(), e); } else { - return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId, - actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter); + LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier()); } } }