private final Map<String, TransactionContextWrapper> txContextAdapters = new HashMap<>();
private final AbstractTransactionContextFactory<?> txContextFactory;
- private final OperationLimiter limiter;
private final TransactionType type;
private TransactionState state = TransactionState.OPEN;
this.txContextFactory = txContextFactory;
this.type = Preconditions.checkNotNull(type);
- // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
- this.limiter = new OperationLimiter(getIdentifier(),
- getActorContext().getTransactionOutstandingOperationLimit(),
- getActorContext().getDatastoreContext().getOperationTimeoutInSeconds());
-
LOG.debug("New {} Tx - {}", type, getIdentifier());
}
LOG.debug("Tx {} exists {}", getIdentifier(), path);
- limiter.acquire();
-
final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
TransactionContextWrapper contextAdapter = getContextAdapter(path);
contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
if (YangInstanceIdentifier.EMPTY.equals(path)) {
return readAllData();
} else {
- limiter.acquire();
-
return singleShardRead(shardNameFromIdentifier(path), path);
}
}
LOG.debug("Tx {} delete {}", getIdentifier(), path);
- limiter.acquire();
-
TransactionContextWrapper contextAdapter = getContextAdapter(path);
contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
LOG.debug("Tx {} merge {}", getIdentifier(), path);
- limiter.acquire();
-
TransactionContextWrapper contextAdapter = getContextAdapter(path);
contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
LOG.debug("Tx {} write {}", getIdentifier(), path);
- limiter.acquire();
-
TransactionContextWrapper contextAdapter = getContextAdapter(path);
contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
private AbstractThreePhaseCommitCohort<?> createSingleCommitCohort(final String shardName,
final TransactionContextWrapper contextAdapter) {
- limiter.acquire();
LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName);
private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort(
final Set<Entry<String, TransactionContextWrapper>> txContextAdapterEntries) {
- limiter.acquire();
final List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txContextAdapterEntries.size());
for (Entry<String, TransactionContextWrapper> e : txContextAdapterEntries) {
LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());
- TransactionContextWrapper contextAdapter = e.getValue();
- final TransactionContext transactionContext = contextAdapter.getTransactionContext();
- Future<ActorSelection> future;
- if (transactionContext != null) {
- // avoid the creation of a promise and a TransactionOperation
- future = transactionContext.readyTransaction();
- } else {
- final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
- contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
- @Override
- public void invoke(TransactionContext transactionContext) {
- promise.completeWith(transactionContext.readyTransaction());
- }
- });
-
- future = promise.future();
- }
-
- cohortFutures.add(future);
+ cohortFutures.add(e.getValue().readyTransaction());
}
return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohortFutures, getIdentifier().toString());
ActorContext getActorContext() {
return txContextFactory.getActorContext();
}
-
- OperationLimiter getLimiter() {
- return limiter;
- }
}