X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxy.java;h=82258b46a44b237748342b9d43e8373cb04c0dde;hp=e397ab501c064adf98c5ee6c2f6708f05eb6f2fe;hb=daaef05cbf70e6cbec9af181258faead6d9620a6;hpb=608760751ce7fcf4e84e86a8b33d43bc1d9984d6 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index e397ab501c..82258b46a4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -5,33 +5,28 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; -import akka.dispatch.Mapper; -import akka.dispatch.OnComplete; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; +import com.google.common.collect.Iterables; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator; @@ -42,284 +37,139 @@ import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; import scala.concurrent.Promise; /** - * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard - *

- * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during - * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will - * be created on each of those shards by the TransactionProxy - *

- *

- * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various - * shards will be executed. - *

+ * A transaction potentially spanning multiple backend shards. */ public class TransactionProxy extends AbstractDOMStoreTransaction implements DOMStoreReadWriteTransaction { - - public static enum TransactionType { - READ_ONLY, - WRITE_ONLY, - READ_WRITE; - - // Cache all values - private static final TransactionType[] VALUES = values(); - - public static TransactionType fromInt(final int type) { - try { - return VALUES[type]; - } catch (IndexOutOfBoundsException e) { - throw new IllegalArgumentException("In TransactionType enum value " + type, e); - } - } - } - private static enum TransactionState { OPEN, READY, CLOSED, } - - static final Mapper SAME_FAILURE_TRANSFORMER = - new Mapper() { - @Override - public Throwable apply(Throwable failure) { - return failure; - } - }; - - private static final AtomicLong counter = new AtomicLong(); - private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class); - /** - * Stores the remote Tx actors for each requested data store path to be used by the - * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The - * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the - * remoteTransactionActors list so they will be visible to the thread accessing the - * PhantomReference. - */ - List remoteTransactionActors; - volatile AtomicBoolean remoteTransactionActorsMB; - - /** - * Stores the create transaction results per shard. - */ - private final Map txFutureCallbackMap = new HashMap<>(); - - private final TransactionType transactionType; - final ActorContext actorContext; - private final String transactionChainId; - private final SchemaContext schemaContext; + private final Map txContextAdapters = new HashMap<>(); + private final AbstractTransactionContextFactory txContextFactory; + private final TransactionType type; private TransactionState state = TransactionState.OPEN; + private volatile OperationCompleter operationCompleter; + private volatile Semaphore operationLimiter; - private volatile boolean initialized; - private Semaphore operationLimiter; - private OperationCompleter operationCompleter; + @VisibleForTesting + public TransactionProxy(final AbstractTransactionContextFactory txContextFactory, final TransactionType type) { + super(txContextFactory.nextIdentifier(), false); + this.txContextFactory = txContextFactory; + this.type = Preconditions.checkNotNull(type); - public TransactionProxy(ActorContext actorContext, TransactionType transactionType) { - this(actorContext, transactionType, ""); + LOG.debug("New {} Tx - {}", type, getIdentifier()); } - public TransactionProxy(ActorContext actorContext, TransactionType transactionType, String transactionChainId) { - super(createIdentifier(actorContext)); - this.actorContext = Preconditions.checkNotNull(actorContext, - "actorContext should not be null"); - this.transactionType = Preconditions.checkNotNull(transactionType, - "transactionType should not be null"); - this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(), - "schemaContext should not be null"); - this.transactionChainId = transactionChainId; - - LOG.debug("Created txn {} of type {} on chain {}", getIdentifier(), transactionType, transactionChainId); - } + @Override + public CheckedFuture exists(final YangInstanceIdentifier path) { + Preconditions.checkState(type != TransactionType.WRITE_ONLY, "Reads from write-only transactions are not allowed"); - private static TransactionIdentifier createIdentifier(ActorContext actorContext) { - String memberName = actorContext.getCurrentMemberName(); - if (memberName == null) { - memberName = "UNKNOWN-MEMBER"; - } + LOG.debug("Tx {} exists {}", getIdentifier(), path); - return new TransactionIdentifier(memberName, counter.getAndIncrement()); - } + throttleOperation(); - @VisibleForTesting - boolean hasTransactionContext() { - for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { - TransactionContext transactionContext = txFutureCallback.getTransactionContext(); - if(transactionContext != null) { - return true; + final SettableFuture proxyFuture = SettableFuture.create(); + TransactionContextWrapper contextAdapter = getContextAdapter(path); + contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + transactionContext.dataExists(path, proxyFuture); } - } - - return false; - } + }); - private static boolean isRootPath(YangInstanceIdentifier path) { - return !path.getPathArguments().iterator().hasNext(); + return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); } @Override public CheckedFuture>, ReadFailedException> read(final YangInstanceIdentifier path) { - - Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY, - "Read operation on write-only transaction is not allowed"); + Preconditions.checkState(type != TransactionType.WRITE_ONLY, "Reads from write-only transactions are not allowed"); LOG.debug("Tx {} read {}", getIdentifier(), path); - final SettableFuture>> proxyFuture = SettableFuture.create(); - - if(isRootPath(path)){ - readAllData(path, proxyFuture); + if (YangInstanceIdentifier.EMPTY.equals(path)) { + return readAllData(); } else { throttleOperation(); - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.readData(path, proxyFuture); - } - }); - + return singleShardRead(shardNameFromIdentifier(path), path); } - - return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); } - private void readAllData(final YangInstanceIdentifier path, - final SettableFuture>> proxyFuture) { - Set allShardNames = actorContext.getConfiguration().getAllShardNames(); - List>>> futures = new ArrayList<>(allShardNames.size()); - - for(String shardName : allShardNames){ - final SettableFuture>> subProxyFuture = SettableFuture.create(); - - throttleOperation(); - - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(shardName); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.readData(path, subProxyFuture); - } - }); - - futures.add(subProxyFuture); - } - - final ListenableFuture>>> future = Futures.allAsList(futures); - - future.addListener(new Runnable() { - @Override - public void run() { - try { - proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(), - future.get(), actorContext.getSchemaContext())); - } catch (DataValidationFailedException | InterruptedException | ExecutionException e) { - proxyFuture.setException(e); - } - } - }, actorContext.getActorSystem().dispatcher()); - } - - @Override - public CheckedFuture exists(final YangInstanceIdentifier path) { - - Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY, - "Exists operation on write-only transaction is not allowed"); - - LOG.debug("Tx {} exists {}", getIdentifier(), path); - - throttleOperation(); - - final SettableFuture proxyFuture = SettableFuture.create(); - - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + private CheckedFuture>, ReadFailedException> singleShardRead( + final String shardName, final YangInstanceIdentifier path) { + final SettableFuture>> proxyFuture = SettableFuture.create(); + TransactionContextWrapper contextAdapter = getContextAdapter(shardName); + contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { - transactionContext.dataExists(path, proxyFuture); + transactionContext.readData(path, proxyFuture); } }); return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); } - private void checkModificationState() { - Preconditions.checkState(transactionType != TransactionType.READ_ONLY, - "Modification operation on read-only transaction is not allowed"); - Preconditions.checkState(state == TransactionState.OPEN, - "Transaction is sealed - further modifications are not allowed"); - } - - private void throttleOperation() { - throttleOperation(1); - } - - private void throttleOperation(int acquirePermits) { - if(!initialized) { - // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem - operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit()); - operationCompleter = new OperationCompleter(operationLimiter); + private CheckedFuture>, ReadFailedException> readAllData() { + final Set allShardNames = txContextFactory.getActorContext().getConfiguration().getAllShardNames(); + final Collection>, ReadFailedException>> futures = new ArrayList<>(allShardNames.size()); - // Make sure we write this last because it's volatile and will also publish the non-volatile writes - // above as well so they'll be visible to other threads. - initialized = true; + for (String shardName : allShardNames) { + futures.add(singleShardRead(shardName, YangInstanceIdentifier.EMPTY)); } - try { - if(!operationLimiter.tryAcquire(acquirePermits, - actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){ - LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier()); - } - } catch (InterruptedException e) { - if(LOG.isDebugEnabled()) { - LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e); - } else { - LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier()); + final ListenableFuture>>> listFuture = Futures.allAsList(futures); + final ListenableFuture>> aggregateFuture; + + aggregateFuture = Futures.transform(listFuture, new Function>>, Optional>>() { + @Override + public Optional> apply(final List>> input) { + try { + return NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.EMPTY, input, txContextFactory.getActorContext().getSchemaContext()); + } catch (DataValidationFailedException e) { + throw new IllegalArgumentException("Failed to aggregate", e); + } } - } - } + }); - final void ensureInitializied() { - Preconditions.checkState(initialized, "Transaction %s was not propertly initialized.", getIdentifier()); + return MappingCheckedFuture.create(aggregateFuture, ReadFailedException.MAPPER); } @Override - public void write(final YangInstanceIdentifier path, final NormalizedNode data) { - + public void delete(final YangInstanceIdentifier path) { checkModificationState(); - LOG.debug("Tx {} write {}", getIdentifier(), path); + LOG.debug("Tx {} delete {}", getIdentifier(), path); throttleOperation(); - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + TransactionContextWrapper contextAdapter = getContextAdapter(path); + contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { - transactionContext.writeData(path, data); + transactionContext.deleteData(path); } }); } @Override public void merge(final YangInstanceIdentifier path, final NormalizedNode data) { - checkModificationState(); LOG.debug("Tx {} merge {}", getIdentifier(), path); throttleOperation(); - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + TransactionContextWrapper contextAdapter = getContextAdapter(path); + contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { transactionContext.mergeData(path, data); @@ -328,23 +178,29 @@ public class TransactionProxy extends AbstractDOMStoreTransaction data) { checkModificationState(); - LOG.debug("Tx {} delete {}", getIdentifier(), path); + LOG.debug("Tx {} write {}", getIdentifier(), path); throttleOperation(); - TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + TransactionContextWrapper contextAdapter = getContextAdapter(path); + contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { - transactionContext.deleteData(path); + transactionContext.writeData(path, data); } }); } + private void checkModificationState() { + Preconditions.checkState(type != TransactionType.READ_ONLY, + "Modification operation on read-only transaction is not allowed"); + Preconditions.checkState(state == TransactionState.OPEN, + "Transaction is sealed - further modifications are not allowed"); + } + private boolean seal(final TransactionState newState) { if (state == TransactionState.OPEN) { state = newState; @@ -355,59 +211,88 @@ public class TransactionProxy extends AbstractDOMStoreTransaction ready() { - Preconditions.checkState(transactionType != TransactionType.READ_ONLY, - "Read-only transactions cannot be readied"); + public final void close() { + if (!seal(TransactionState.CLOSED)) { + Preconditions.checkState(state == TransactionState.CLOSED, "Transaction %s is ready, it cannot be closed", + getIdentifier()); + // Idempotent no-op as per AutoCloseable recommendation + return; + } + + for (TransactionContextWrapper contextAdapter : txContextAdapters.values()) { + contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + transactionContext.closeTransaction(); + } + }); + } + + + txContextAdapters.clear(); + } + + @Override + public final AbstractThreePhaseCommitCohort ready() { + Preconditions.checkState(type != TransactionType.READ_ONLY, "Read-only transactions cannot be readied"); final boolean success = seal(TransactionState.READY); Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state); - LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(), - txFutureCallbackMap.size()); - - if (txFutureCallbackMap.isEmpty()) { - TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext); - return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE; + LOG.debug("Tx {} Readying {} components for commit", getIdentifier(), txContextAdapters.size()); + + final AbstractThreePhaseCommitCohort ret; + switch (txContextAdapters.size()) { + case 0: + TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(txContextFactory.getActorContext()); + ret = NoOpDOMStoreThreePhaseCommitCohort.INSTANCE; + break; + case 1: + final Entry e = Iterables.getOnlyElement(txContextAdapters.entrySet()); + ret = createSingleCommitCohort(e.getKey(), e.getValue()); + break; + default: + ret = createMultiCommitCohort(txContextAdapters.entrySet()); } - throttleOperation(txFutureCallbackMap.size()); - - final boolean isSingleShard = txFutureCallbackMap.size() == 1; - return isSingleShard ? createSingleCommitCohort() : createMultiCommitCohort(); + txContextFactory.onTransactionReady(getIdentifier(), ret.getCohortFutures()); + return ret; } - @SuppressWarnings({ "rawtypes", "unchecked" }) - private AbstractThreePhaseCommitCohort createSingleCommitCohort() { - TransactionFutureCallback txFutureCallback = txFutureCallbackMap.values().iterator().next(); + private AbstractThreePhaseCommitCohort createSingleCommitCohort(final String shardName, + final TransactionContextWrapper contextAdapter) { + throttleOperation(); - LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(), - txFutureCallback.getShardName(), transactionChainId); + LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName); final OperationCallback.Reference operationCallbackRef = new OperationCallback.Reference(OperationCallback.NO_OP_CALLBACK); - final TransactionContext transactionContext = txFutureCallback.getTransactionContext(); + + final TransactionContext transactionContext = contextAdapter.getTransactionContext(); final Future future; - if (transactionContext != null) { - // avoid the creation of a promise and a TransactionOperation - future = getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef); - } else { + if (transactionContext == null) { final Promise promise = akka.dispatch.Futures.promise(); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { promise.completeWith(getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef)); } }); future = promise.future(); + } else { + // avoid the creation of a promise and a TransactionOperation + future = getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef); } - return new SingleCommitCohortProxy(actorContext, future, getIdentifier().toString(), operationCallbackRef); + return new SingleCommitCohortProxy(txContextFactory.getActorContext(), future, getIdentifier().toString(), + operationCallbackRef); } private Future getReadyOrDirectCommitFuture(TransactionContext transactionContext, OperationCallback.Reference operationCallbackRef) { - if(transactionContext.supportsDirectCommit()) { - TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(actorContext); + if (transactionContext.supportsDirectCommit()) { + TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback( + txContextFactory.getActorContext()); operationCallbackRef.set(rateLimitingCallback); rateLimitingCallback.run(); return transactionContext.directCommit(); @@ -416,153 +301,106 @@ public class TransactionProxy extends AbstractDOMStoreTransaction createMultiCommitCohort() { - List> cohortFutures = new ArrayList<>(txFutureCallbackMap.size()); - for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { + private AbstractThreePhaseCommitCohort createMultiCommitCohort( + final Set> txContextAdapterEntries) { - LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(), - txFutureCallback.getShardName(), transactionChainId); + throttleOperation(); + final List> cohortFutures = new ArrayList<>(txContextAdapterEntries.size()); + for (Entry e : txContextAdapterEntries) { + LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey()); - final TransactionContext transactionContext = txFutureCallback.getTransactionContext(); - final Future future; + TransactionContextWrapper contextAdapter = e.getValue(); + final TransactionContext transactionContext = contextAdapter.getTransactionContext(); + Future future; if (transactionContext != null) { // avoid the creation of a promise and a TransactionOperation future = transactionContext.readyTransaction(); } else { final Promise promise = akka.dispatch.Futures.promise(); - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { + contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { promise.completeWith(transactionContext.readyTransaction()); } }); + future = promise.future(); } cohortFutures.add(future); } - return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, getIdentifier().toString()); + return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohortFutures, getIdentifier().toString()); } - @Override - public void close() { - if (!seal(TransactionState.CLOSED)) { - if (state == TransactionState.CLOSED) { - // Idempotent no-op as per AutoCloseable recommendation - return; - } - - throw new IllegalStateException(String.format("Transaction %s is ready, it cannot be closed", - getIdentifier())); - } - - for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { - txFutureCallback.enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.closeTransaction(); - } - }); - } - - txFutureCallbackMap.clear(); - - if(remoteTransactionActorsMB != null) { - remoteTransactionActors.clear(); - remoteTransactionActorsMB.set(true); - } - } - - private String shardNameFromIdentifier(YangInstanceIdentifier path){ + private static String shardNameFromIdentifier(final YangInstanceIdentifier path) { return ShardStrategyFactory.getStrategy(path).findShard(path); } - protected Future sendFindPrimaryShardAsync(String shardName) { - return actorContext.findPrimaryShardAsync(shardName); + private TransactionContextWrapper getContextAdapter(final YangInstanceIdentifier path) { + return getContextAdapter(shardNameFromIdentifier(path)); } - final TransactionType getTransactionType() { - return transactionType; - } + private TransactionContextWrapper getContextAdapter(final String shardName) { + final TransactionContextWrapper existing = txContextAdapters.get(shardName); + if (existing != null) { + return existing; + } - final Semaphore getOperationLimiter() { - return operationLimiter; + final TransactionContextWrapper fresh = txContextFactory.newTransactionAdapter(this, shardName); + txContextAdapters.put(shardName, fresh); + return fresh; } - private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) { - String shardName = shardNameFromIdentifier(path); - return getOrCreateTxFutureCallback(shardName); + TransactionType getType() { + return type; } - private TransactionFutureCallback getOrCreateTxFutureCallback(String shardName) { - TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName); - if(txFutureCallback == null) { - Future findPrimaryFuture = sendFindPrimaryShardAsync(shardName); - - final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(this, shardName); + boolean isReady() { + return state != TransactionState.OPEN; + } - txFutureCallback = newTxFutureCallback; - txFutureCallbackMap.put(shardName, txFutureCallback); + ActorContext getActorContext() { + return txContextFactory.getActorContext(); + } - findPrimaryFuture.onComplete(new OnComplete() { - @Override - public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) { - if(failure != null) { - newTxFutureCallback.createTransactionContext(failure, null); - } else { - newTxFutureCallback.setPrimaryShard(primaryShardInfo.getPrimaryShardActor()); - } - } - }, actorContext.getClientDispatcher()); + OperationCompleter getCompleter() { + OperationCompleter ret = operationCompleter; + if (ret == null) { + final Semaphore s = getLimiter(); + ret = new OperationCompleter(s); + operationCompleter = ret; } - return txFutureCallback; + return ret; } - String getTransactionChainId() { - return transactionChainId; + Semaphore getLimiter() { + Semaphore ret = operationLimiter; + if (ret == null) { + // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem + ret = new Semaphore(getActorContext().getTransactionOutstandingOperationLimit()); + operationLimiter = ret; + } + return ret; } - protected ActorContext getActorContext() { - return actorContext; + void throttleOperation() { + throttleOperation(1); } - TransactionContext createValidTransactionContext(ActorSelection transactionActor, - String transactionPath, short remoteTransactionVersion) { - - if (transactionType == TransactionType.READ_ONLY) { - // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference - // to close the remote Tx's when this instance is no longer in use and is garbage - // collected. - - if(remoteTransactionActorsMB == null) { - remoteTransactionActors = Lists.newArrayList(); - remoteTransactionActorsMB = new AtomicBoolean(); - - TransactionProxyCleanupPhantomReference.track(TransactionProxy.this); + private void throttleOperation(int acquirePermits) { + try { + if (!getLimiter().tryAcquire(acquirePermits, + getActorContext().getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){ + LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier()); + } + } catch (InterruptedException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier(), e); + } else { + LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier()); } - - // Add the actor to the remoteTransactionActors list for access by the - // cleanup PhantonReference. - remoteTransactionActors.add(transactionActor); - - // Write to the memory barrier volatile to publish the above update to the - // remoteTransactionActors list for thread visibility. - remoteTransactionActorsMB.set(true); - } - - // TxActor is always created where the leader of the shard is. - // Check if TxActor is created in the same node - boolean isTxActorLocal = actorContext.isPathLocal(transactionPath); - - if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) { - return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(), - transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, - operationCompleter); - } else { - return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId, - actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter); } } }