import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
private final Map<String, TransactionContextWrapper> txContextAdapters = new HashMap<>();
private final AbstractTransactionContextFactory<?> txContextFactory;
+ private final OperationLimiter limiter;
private final TransactionType type;
private TransactionState state = TransactionState.OPEN;
- private volatile OperationCompleter operationCompleter;
- private volatile Semaphore operationLimiter;
@VisibleForTesting
public TransactionProxy(final AbstractTransactionContextFactory<?> txContextFactory, final TransactionType type) {
this.txContextFactory = txContextFactory;
this.type = Preconditions.checkNotNull(type);
+ // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
+ this.limiter = new OperationLimiter(getIdentifier(),
+ getActorContext().getTransactionOutstandingOperationLimit(),
+ getActorContext().getDatastoreContext().getOperationTimeoutInSeconds());
+
LOG.debug("New {} Tx - {}", type, getIdentifier());
}
LOG.debug("Tx {} exists {}", getIdentifier(), path);
- throttleOperation();
+ limiter.acquire();
final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
TransactionContextWrapper contextAdapter = getContextAdapter(path);
if (YangInstanceIdentifier.EMPTY.equals(path)) {
return readAllData();
} else {
- throttleOperation();
+ limiter.acquire();
return singleShardRead(shardNameFromIdentifier(path), path);
}
LOG.debug("Tx {} delete {}", getIdentifier(), path);
- throttleOperation();
+ limiter.acquire();
TransactionContextWrapper contextAdapter = getContextAdapter(path);
contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
LOG.debug("Tx {} merge {}", getIdentifier(), path);
- throttleOperation();
+ limiter.acquire();
TransactionContextWrapper contextAdapter = getContextAdapter(path);
contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
LOG.debug("Tx {} write {}", getIdentifier(), path);
- throttleOperation();
+ limiter.acquire();
TransactionContextWrapper contextAdapter = getContextAdapter(path);
contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
private AbstractThreePhaseCommitCohort<?> createSingleCommitCohort(final String shardName,
final TransactionContextWrapper contextAdapter) {
- throttleOperation();
+ limiter.acquire();
LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName);
private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort(
final Set<Entry<String, TransactionContextWrapper>> txContextAdapterEntries) {
- throttleOperation();
+ limiter.acquire();
final List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txContextAdapterEntries.size());
for (Entry<String, TransactionContextWrapper> e : txContextAdapterEntries) {
LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());
return txContextFactory.getActorContext();
}
- OperationCompleter getCompleter() {
- OperationCompleter ret = operationCompleter;
- if (ret == null) {
- final Semaphore s = getLimiter();
- ret = new OperationCompleter(s);
- operationCompleter = ret;
- }
-
- return ret;
- }
-
- Semaphore getLimiter() {
- Semaphore ret = operationLimiter;
- if (ret == null) {
- // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
- ret = new Semaphore(getActorContext().getTransactionOutstandingOperationLimit());
- operationLimiter = ret;
- }
- return ret;
- }
-
- void throttleOperation() {
- throttleOperation(1);
- }
-
- private void throttleOperation(int acquirePermits) {
- try {
- if (!getLimiter().tryAcquire(acquirePermits,
- getActorContext().getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
- LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
- }
- } catch (InterruptedException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier(), e);
- } else {
- LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
- }
- }
+ OperationLimiter getLimiter() {
+ return limiter;
}
}