package org.opendaylight.controller.cluster.datastore;
import static com.google.common.base.Preconditions.checkState;
-import static java.util.Objects.requireNonNull;
import akka.actor.ActorSelection;
import akka.dispatch.Futures;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.SortedSet;
-import java.util.concurrent.TimeUnit;
import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.slf4j.Logger;
import scala.concurrent.Promise;
/**
- * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target
- * TransactionContext instance are cached until the TransactionContext instance becomes available at which
- * time they are executed.
+ * Delayed implementation of TransactionContextWrapper. Operations destined for the target
+ * TransactionContext instance are cached until the TransactionContext instance becomes
+ * available at which time they are executed.
*
* @author Thomas Pantelis
*/
-class TransactionContextWrapper {
- private static final Logger LOG = LoggerFactory.getLogger(TransactionContextWrapper.class);
+final class DelayedTransactionContextWrapper extends AbstractTransactionContextWrapper {
+ private static final Logger LOG = LoggerFactory.getLogger(DelayedTransactionContextWrapper.class);
/**
* The list of transaction operations to execute once the TransactionContext becomes available.
*/
@GuardedBy("queuedTxOperations")
private final List<Entry<TransactionOperation, Boolean>> queuedTxOperations = new ArrayList<>();
- private final TransactionIdentifier identifier;
- private final OperationLimiter limiter;
- private final String shardName;
/**
* The resulting TransactionContext.
@GuardedBy("queuedTxOperations")
private boolean pendingEnqueue;
- TransactionContextWrapper(final TransactionIdentifier identifier, final ActorUtils actorUtils,
- final String shardName) {
- this.identifier = requireNonNull(identifier);
- this.limiter = new OperationLimiter(identifier,
- // 1 extra permit for the ready operation
- actorUtils.getDatastoreContext().getShardBatchedModificationCount() + 1,
- TimeUnit.MILLISECONDS.toSeconds(actorUtils.getDatastoreContext().getOperationTimeoutInMillis()));
- this.shardName = requireNonNull(shardName);
+ DelayedTransactionContextWrapper(@NonNull final TransactionIdentifier identifier,
+ @NonNull final ActorUtils actorUtils, @NonNull final String shardName) {
+ super(identifier, actorUtils, shardName);
}
+ @Override
TransactionContext getTransactionContext() {
return transactionContext;
}
- TransactionIdentifier getIdentifier() {
- return identifier;
+ @Override
+ void maybeExecuteTransactionOperation(final TransactionOperation op) {
+ final TransactionContext localContext = transactionContext;
+ if (localContext != null) {
+ op.invoke(localContext, null);
+ } else {
+ // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
+ // callback to be executed after the Tx is created.
+ enqueueTransactionOperation(op);
+ }
+ }
+
+ @Override
+ Future<ActorSelection> readyTransaction(final Optional<SortedSet<String>> participatingShardNames) {
+ // avoid the creation of a promise and a TransactionOperation
+ final TransactionContext localContext = transactionContext;
+ if (localContext != null) {
+ return localContext.readyTransaction(null, participatingShardNames);
+ }
+
+ final Promise<ActorSelection> promise = Futures.promise();
+ enqueueTransactionOperation(new TransactionOperation() {
+ @Override
+ public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
+ promise.completeWith(newTransactionContext.readyTransaction(havePermit, participatingShardNames));
+ }
+ });
+
+ return promise.future();
}
/**
synchronized (queuedTxOperations) {
contextOnEntry = transactionContext;
if (contextOnEntry == null) {
- checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected", identifier);
+ checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected", getIdentifier());
pendingEnqueue = true;
}
}
TransactionContext finishHandoff = null;
try {
// Acquire the permit,
- final boolean havePermit = limiter.acquire();
+ final boolean havePermit = getLimiter().acquire();
if (!havePermit) {
- LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", identifier,
- shardName);
+ LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", getIdentifier(),
+ getShardName());
}
// Ready to enqueue, take the lock again and append the operation
synchronized (queuedTxOperations) {
- LOG.debug("Tx {} Queuing TransactionOperation", identifier);
+ LOG.debug("Tx {} Queuing TransactionOperation", getIdentifier());
queuedTxOperations.add(new SimpleImmutableEntry<>(operation, havePermit));
pendingEnqueue = false;
cleanupEnqueue = false;
}
}
- void maybeExecuteTransactionOperation(final TransactionOperation op) {
- final TransactionContext localContext = transactionContext;
- if (localContext != null) {
- op.invoke(localContext, null);
- } else {
- // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
- // callback to be executed after the Tx is created.
- enqueueTransactionOperation(op);
- }
- }
-
void executePriorTransactionOperations(final TransactionContext localTransactionContext) {
while (true) {
// Access to queuedTxOperations and transactionContext must be protected and atomic
if (permit.booleanValue() && !localTransactionContext.usesOperationLimiting()) {
// If the context is not using limiting we need to release operations as we are queueing them, so
// user threads are not charged for them.
- limiter.release();
+ getLimiter().release();
}
oper.getKey().invoke(localTransactionContext, permit);
}
}
}
- Future<ActorSelection> readyTransaction(Optional<SortedSet<String>> participatingShardNames) {
- // avoid the creation of a promise and a TransactionOperation
- final TransactionContext localContext = transactionContext;
- if (localContext != null) {
- return localContext.readyTransaction(null, participatingShardNames);
- }
-
- final Promise<ActorSelection> promise = Futures.promise();
- enqueueTransactionOperation(new TransactionOperation() {
- @Override
- public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
- promise.completeWith(newTransactionContext.readyTransaction(havePermit, participatingShardNames));
- }
- });
-
- return promise.future();
- }
-
- OperationLimiter getLimiter() {
- return limiter;
- }
}