import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
private final AbstractTransactionContextFactory<?> txContextFactory;
private final TransactionType type;
private TransactionState state = TransactionState.OPEN;
- private volatile OperationCompleter operationCompleter;
- private volatile Semaphore operationLimiter;
@VisibleForTesting
public TransactionProxy(final AbstractTransactionContextFactory<?> txContextFactory, final TransactionType type) {
- super(txContextFactory.nextIdentifier(), false);
+ super(txContextFactory.nextIdentifier(), txContextFactory.getActorContext().getDatastoreContext()
+ .isTransactionDebugContextEnabled());
this.txContextFactory = txContextFactory;
this.type = Preconditions.checkNotNull(type);
LOG.debug("Tx {} exists {}", getIdentifier(), path);
- throttleOperation();
-
final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
TransactionContextWrapper contextAdapter = getContextAdapter(path);
contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
if (YangInstanceIdentifier.EMPTY.equals(path)) {
return readAllData();
} else {
- throttleOperation();
-
return singleShardRead(shardNameFromIdentifier(path), path);
}
}
LOG.debug("Tx {} delete {}", getIdentifier(), path);
- throttleOperation();
-
TransactionContextWrapper contextAdapter = getContextAdapter(path);
contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
LOG.debug("Tx {} merge {}", getIdentifier(), path);
- throttleOperation();
-
TransactionContextWrapper contextAdapter = getContextAdapter(path);
contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
LOG.debug("Tx {} write {}", getIdentifier(), path);
- throttleOperation();
-
TransactionContextWrapper contextAdapter = getContextAdapter(path);
contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
}
txContextFactory.onTransactionReady(getIdentifier(), ret.getCohortFutures());
- return ret;
+
+ final Throwable debugContext = getDebugContext();
+ return debugContext == null ? ret : new DebugThreePhaseCommitCohort(getIdentifier(), ret, debugContext);
}
private AbstractThreePhaseCommitCohort<?> createSingleCommitCohort(final String shardName,
final TransactionContextWrapper contextAdapter) {
- throttleOperation();
LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName);
private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort(
final Set<Entry<String, TransactionContextWrapper>> txContextAdapterEntries) {
- throttleOperation();
final List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txContextAdapterEntries.size());
for (Entry<String, TransactionContextWrapper> e : txContextAdapterEntries) {
LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());
- TransactionContextWrapper contextAdapter = e.getValue();
- final TransactionContext transactionContext = contextAdapter.getTransactionContext();
- Future<ActorSelection> future;
- if (transactionContext != null) {
- // avoid the creation of a promise and a TransactionOperation
- future = transactionContext.readyTransaction();
- } else {
- final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
- contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
- @Override
- public void invoke(TransactionContext transactionContext) {
- promise.completeWith(transactionContext.readyTransaction());
- }
- });
-
- future = promise.future();
- }
-
- cohortFutures.add(future);
+ cohortFutures.add(e.getValue().readyTransaction());
}
return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohortFutures, getIdentifier().toString());
ActorContext getActorContext() {
return txContextFactory.getActorContext();
}
-
- OperationCompleter getCompleter() {
- OperationCompleter ret = operationCompleter;
- if (ret == null) {
- final Semaphore s = getLimiter();
- ret = new OperationCompleter(s);
- operationCompleter = ret;
- }
-
- return ret;
- }
-
- Semaphore getLimiter() {
- Semaphore ret = operationLimiter;
- if (ret == null) {
- // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
- ret = new Semaphore(getActorContext().getTransactionOutstandingOperationLimit());
- operationLimiter = ret;
- }
- return ret;
- }
-
- void throttleOperation() {
- throttleOperation(1);
- }
-
- private void throttleOperation(int acquirePermits) {
- try {
- if (!getLimiter().tryAcquire(acquirePermits,
- getActorContext().getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
- LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
- }
- } catch (InterruptedException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier(), e);
- } else {
- LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
- }
- }
- }
}