The limiter tracks the number of operations invoked on the shard leader,
which does not correspond to the number of operations executed on the
frontend.
The appropriate entity to decide what is throttled how is the
TransactionContext, which unfortunately may not exist. We will solve
this problem by making TransactionContextWrapper perform pessimistic
limiting as long as the context does not exist. Once the context is
materialized, the outstanding operation queue is handed off to it and
the context becomes the entity managing the limits.
This patch has the side-effect that committing a transaction requires
number of permits equal to the number of shards it touches. It also
ensures that readAll() is properly throttled.
Change-Id: If91816d806bbb3895592e1f42b0b8e389443d0f7
Signed-off-by: Robert Varga <rovarga@cisco.com>
(cherry picked from commit
9e7a9b3725ad25f9adf85f0ad796b7cf748795a4)
12 files changed:
*/
package org.opendaylight.controller.cluster.datastore;
*/
package org.opendaylight.controller.cluster.datastore;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
abstract class AbstractTransactionContext implements TransactionContext {
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
abstract class AbstractTransactionContext implements TransactionContext {
private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContext.class);
private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContext.class);
+ private final OperationLimiter limiter;
private long modificationCount = 0;
private long modificationCount = 0;
+ private boolean handoffComplete;
+
+ protected AbstractTransactionContext(final OperationLimiter limiter) {
+ this.limiter = Preconditions.checkNotNull(limiter);
+ }
- private final TransactionIdentifier identifier;
+ /**
+ * Get the transaction identifier associated with this context.
+ *
+ * @return Transaction identifier.
+ */
+ @Nonnull protected final TransactionIdentifier getIdentifier() {
+ return limiter.getIdentifier();
+ }
- protected AbstractTransactionContext(TransactionIdentifier identifier) {
- this.identifier = identifier;
+ /**
+ * Return the operation limiter associated with this context.
+ * @return Operation limiter.
+ */
+ @Nonnull protected final OperationLimiter getLimiter() {
+ return limiter;
- protected final TransactionIdentifier getIdentifier() {
- return identifier;
+ /**
+ * Indicate whether all operations have been handed off by the {@link TransactionContextWrapper}.
+ *
+ * @return True if this context is responsible for throttling.
+ */
+ protected final boolean isOperationHandoffComplete() {
+ return handoffComplete;
- protected void incrementModificationCount(){
+ /**
+ * Acquire operation from the limiter if the handoff has completed. If
+ * the handoff is still ongoing, this method does nothing.
+ */
+ protected final void acquireOperation() {
+ if (handoffComplete) {
+ limiter.acquire();
+ }
+ }
+
+ /**
+ * Acquire operation from the limiter if the handoff has NOT completed. If
+ * the handoff has completed, this method does nothing.
+ */
+ protected final void releaseOperation() {
+ if (!handoffComplete) {
+ limiter.release();
+ }
+ }
+
+ protected final void incrementModificationCount() {
- protected void logModificationCount(){
- LOG.debug("Total modifications on Tx {} = [ {} ]", identifier, modificationCount);
+ protected final void logModificationCount() {
+ LOG.debug("Total modifications on Tx {} = [ {} ]", getIdentifier(), modificationCount);
+ }
+
+ @Override
+ public final void operationHandoffComplete() {
+ handoffComplete = true;
-}
\ No newline at end of file
LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
transactionContextAdapter.executePriorTransactionOperations(new NoOpTransactionContext(failure,
LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
transactionContextAdapter.executePriorTransactionOperations(new NoOpTransactionContext(failure,
- parent.getIdentifier(), parent.getLimiter()));
}
final TransactionContextWrapper newTransactionAdapter(final TransactionProxy parent, final String shardName) {
}
final TransactionContextWrapper newTransactionAdapter(final TransactionProxy parent, final String shardName) {
- final TransactionContextWrapper transactionContextAdapter = new TransactionContextWrapper(parent.getIdentifier());
+ final TransactionContextWrapper transactionContextAdapter = new TransactionContextWrapper(parent.getLimiter());
Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName);
if(findPrimaryFuture.isCompleted()) {
Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName);
if(findPrimaryFuture.isCompleted()) {
switch(parent.getType()) {
case READ_ONLY:
final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
switch(parent.getType()) {
case READ_ONLY:
final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
- return new LocalTransactionContext(parent.getIdentifier(), readOnly, parent.getLimiter()) {
+ return new LocalTransactionContext(readOnly, parent.getLimiter()) {
@Override
protected DOMStoreWriteTransaction getWriteDelegate() {
throw new UnsupportedOperationException();
@Override
protected DOMStoreWriteTransaction getWriteDelegate() {
throw new UnsupportedOperationException();
};
case READ_WRITE:
final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
};
case READ_WRITE:
final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
- return new LocalTransactionContext(parent.getIdentifier(), readWrite, parent.getLimiter()) {
+ return new LocalTransactionContext(readWrite, parent.getLimiter()) {
@Override
protected DOMStoreWriteTransaction getWriteDelegate() {
return readWrite;
@Override
protected DOMStoreWriteTransaction getWriteDelegate() {
return readWrite;
};
case WRITE_ONLY:
final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
};
case WRITE_ONLY:
final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
- return new LocalTransactionContext(parent.getIdentifier(), writeOnly, parent.getLimiter()) {
+ return new LocalTransactionContext(writeOnly, parent.getLimiter()) {
@Override
protected DOMStoreWriteTransaction getWriteDelegate() {
return writeOnly;
@Override
protected DOMStoreWriteTransaction getWriteDelegate() {
return writeOnly;
import akka.dispatch.OnComplete;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import akka.dispatch.OnComplete;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
+import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
return actorContext.executeOperationAsync(leader, message, actorContext.getTransactionCommitOperationTimeout());
}
return actorContext.executeOperationAsync(leader, message, actorContext.getTransactionCommitOperationTimeout());
}
+ /**
+ * Return the {@link ActorContext} associated with this object.
+ *
+ * @return An actor context instance.
+ */
+ @Nonnull ActorContext getActorContext() {
+ return actorContext;
+ }
+
Future<ActorSelection> initiateCoordinatedCommit() {
final Future<Object> messageFuture = initiateCommit(false);
final Future<ActorSelection> ret = TransactionReadyReplyMapper.transform(messageFuture, actorContext,
Future<ActorSelection> initiateCoordinatedCommit() {
final Future<Object> messageFuture = initiateCommit(false);
final Future<ActorSelection> ret = TransactionReadyReplyMapper.transform(messageFuture, actorContext,
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
* @author Thomas Pantelis
*/
abstract class LocalTransactionContext extends AbstractTransactionContext {
* @author Thomas Pantelis
*/
abstract class LocalTransactionContext extends AbstractTransactionContext {
private final DOMStoreTransaction txDelegate;
private final DOMStoreTransaction txDelegate;
- private final OperationLimiter limiter;
- LocalTransactionContext(TransactionIdentifier identifier, DOMStoreTransaction txDelegate, OperationLimiter limiter) {
- super(identifier);
+ LocalTransactionContext(DOMStoreTransaction txDelegate, OperationLimiter limiter) {
+ super(limiter);
this.txDelegate = Preconditions.checkNotNull(txDelegate);
this.txDelegate = Preconditions.checkNotNull(txDelegate);
- this.limiter = Preconditions.checkNotNull(limiter);
}
protected abstract DOMStoreWriteTransaction getWriteDelegate();
}
protected abstract DOMStoreWriteTransaction getWriteDelegate();
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
incrementModificationCount();
getWriteDelegate().write(path, data);
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
incrementModificationCount();
getWriteDelegate().write(path, data);
}
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
incrementModificationCount();
getWriteDelegate().merge(path, data);
}
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
incrementModificationCount();
getWriteDelegate().merge(path, data);
}
@Override
public void deleteData(YangInstanceIdentifier path) {
incrementModificationCount();
getWriteDelegate().delete(path);
}
@Override
public void deleteData(YangInstanceIdentifier path) {
incrementModificationCount();
getWriteDelegate().delete(path);
}
@Override
public void readData(YangInstanceIdentifier path, final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
Futures.addCallback(getReadDelegate().read(path), new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
@Override
}
@Override
public void readData(YangInstanceIdentifier path, final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
Futures.addCallback(getReadDelegate().read(path), new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
@Override
- public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
+ public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
- public void onFailure(Throwable t) {
+ public void onFailure(final Throwable t) {
proxyFuture.setException(t);
proxyFuture.setException(t);
public void dataExists(YangInstanceIdentifier path, final SettableFuture<Boolean> proxyFuture) {
Futures.addCallback(getReadDelegate().exists(path), new FutureCallback<Boolean>() {
@Override
public void dataExists(YangInstanceIdentifier path, final SettableFuture<Boolean> proxyFuture) {
Futures.addCallback(getReadDelegate().exists(path), new FutureCallback<Boolean>() {
@Override
- public void onSuccess(Boolean result) {
+ public void onSuccess(final Boolean result) {
- public void onFailure(Throwable t) {
+ public void onFailure(final Throwable t) {
proxyFuture.setException(t);
proxyFuture.setException(t);
}
});
}
private LocalThreePhaseCommitCohort ready() {
logModificationCount();
}
});
}
private LocalThreePhaseCommitCohort ready() {
logModificationCount();
- LocalThreePhaseCommitCohort ready = (LocalThreePhaseCommitCohort) getWriteDelegate().ready();
- limiter.release();
- return ready;
+ acquireOperation();
+ return (LocalThreePhaseCommitCohort) getWriteDelegate().ready();
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private <T extends Future> T completeOperation(final ActorContext actorContext, final T operationFuture) {
+ operationFuture.onComplete(getLimiter(), actorContext.getClientDispatcher());
+ return operationFuture;
}
@Override
public Future<ActorSelection> readyTransaction() {
}
@Override
public Future<ActorSelection> readyTransaction() {
- return ready().initiateCoordinatedCommit();
+ final LocalThreePhaseCommitCohort cohort = ready();
+ return completeOperation(cohort.getActorContext(), cohort.initiateCoordinatedCommit());
}
@Override
public Future<Object> directCommit() {
}
@Override
public Future<Object> directCommit() {
- return ready().initiateDirectCommit();
+ final LocalThreePhaseCommitCohort cohort = ready();
+ return completeOperation(cohort.getActorContext(), cohort.initiateDirectCommit());
@Override
public void closeTransaction() {
txDelegate.close();
@Override
public void closeTransaction() {
txDelegate.close();
import com.google.common.base.Optional;
import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
private static final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
private final Throwable failure;
private static final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
private final Throwable failure;
- private final OperationLimiter operationLimiter;
- public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier, OperationLimiter operationLimiter) {
- super(identifier);
+ public NoOpTransactionContext(Throwable failure, OperationLimiter limiter) {
+ super(limiter);
- this.operationLimiter = operationLimiter;
@Override
public Future<Object> directCommit() {
LOG.debug("Tx {} directCommit called, failure: {}", getIdentifier(), failure);
@Override
public Future<Object> directCommit() {
LOG.debug("Tx {} directCommit called, failure: {}", getIdentifier(), failure);
- operationLimiter.release();
return akka.dispatch.Futures.failed(failure);
}
@Override
public Future<ActorSelection> readyTransaction() {
LOG.debug("Tx {} readyTransaction called, failure: {}", getIdentifier(), failure);
return akka.dispatch.Futures.failed(failure);
}
@Override
public Future<ActorSelection> readyTransaction() {
LOG.debug("Tx {} readyTransaction called, failure: {}", getIdentifier(), failure);
- operationLimiter.release();
return akka.dispatch.Futures.failed(failure);
}
@Override
public void deleteData(YangInstanceIdentifier path) {
LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path);
return akka.dispatch.Futures.failed(failure);
}
@Override
public void deleteData(YangInstanceIdentifier path) {
LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path);
- operationLimiter.release();
}
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path);
}
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path);
- operationLimiter.release();
}
@Override
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path);
}
@Override
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path);
- operationLimiter.release();
}
@Override
public void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
LOG.debug("Tx {} readData called path = {}", getIdentifier(), path);
}
@Override
public void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
LOG.debug("Tx {} readData called path = {}", getIdentifier(), path);
- operationLimiter.release();
- Throwable t;
- if(failure instanceof NoShardLeaderException) {
+ releaseOperation();
+
+ final Throwable t;
+ if (failure instanceof NoShardLeaderException) {
t = new DataStoreUnavailableException(failure.getMessage(), failure);
} else {
t = failure;
t = new DataStoreUnavailableException(failure.getMessage(), failure);
} else {
t = failure;
@Override
public void dataExists(YangInstanceIdentifier path, SettableFuture<Boolean> proxyFuture) {
LOG.debug("Tx {} dataExists called path = {}", getIdentifier(), path);
@Override
public void dataExists(YangInstanceIdentifier path, SettableFuture<Boolean> proxyFuture) {
LOG.debug("Tx {} dataExists called path = {}", getIdentifier(), path);
- operationLimiter.release();
proxyFuture.setException(new ReadFailedException("Error checking exists for path " + path, failure));
}
}
proxyFuture.setException(new ReadFailedException("Error checking exists for path " + path, failure));
}
}
import akka.dispatch.OnComplete;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.SettableFuture;
import akka.dispatch.OnComplete;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.SettableFuture;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
private final boolean isTxActorLocal;
private final short remoteTransactionVersion;
private final boolean isTxActorLocal;
private final short remoteTransactionVersion;
- private final OperationLimiter operationCompleter;
private BatchedModifications batchedModifications;
private int totalBatchedModificationsSent;
private BatchedModifications batchedModifications;
private int totalBatchedModificationsSent;
- protected RemoteTransactionContext(ActorSelection actor, TransactionIdentifier identifier,
+ protected RemoteTransactionContext(ActorSelection actor,
ActorContext actorContext, boolean isTxActorLocal,
short remoteTransactionVersion, OperationLimiter limiter) {
ActorContext actorContext, boolean isTxActorLocal,
short remoteTransactionVersion, OperationLimiter limiter) {
this.actor = actor;
this.actorContext = actorContext;
this.isTxActorLocal = isTxActorLocal;
this.remoteTransactionVersion = remoteTransactionVersion;
this.actor = actor;
this.actorContext = actorContext;
this.isTxActorLocal = isTxActorLocal;
this.remoteTransactionVersion = remoteTransactionVersion;
- this.operationCompleter = limiter;
}
private Future<Object> completeOperation(Future<Object> operationFuture){
}
private Future<Object> completeOperation(Future<Object> operationFuture){
- operationFuture.onComplete(this.operationCompleter, actorContext.getClientDispatcher());
+ operationFuture.onComplete(getLimiter(), actorContext.getClientDispatcher());
return operationFuture;
}
return operationFuture;
}
private ActorSelection getActor() {
return actor;
}
private ActorSelection getActor() {
return actor;
}
public void deleteData(YangInstanceIdentifier path) {
LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path);
public void deleteData(YangInstanceIdentifier path) {
LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path);
batchModification(new DeleteModification(path));
}
batchModification(new DeleteModification(path));
}
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path);
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path);
batchModification(new MergeModification(path, data));
}
batchModification(new MergeModification(path, data));
}
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path);
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path);
batchModification(new WriteModification(path, data));
}
batchModification(new WriteModification(path, data));
}
// Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
// public API contract.
// Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
// public API contract.
sendBatchedModifications();
OnComplete<Object> onComplete = new OnComplete<Object>() {
sendBatchedModifications();
OnComplete<Object> onComplete = new OnComplete<Object>() {
// Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
// public API contract.
// Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
// public API contract.
sendBatchedModifications();
OnComplete<Object> onComplete = new OnComplete<Object>() {
sendBatchedModifications();
OnComplete<Object> onComplete = new OnComplete<Object>() {
if(failure != null) {
LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
if(failure != null) {
LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
- localTransactionContext = new NoOpTransactionContext(failure, getIdentifier(), getOperationLimiter());
+ localTransactionContext = new NoOpTransactionContext(failure, getOperationLimiter());
} else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
localTransactionContext = createValidTransactionContext(
CreateTransactionReply.fromSerializable(response));
} else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
localTransactionContext = createValidTransactionContext(
CreateTransactionReply.fromSerializable(response));
IllegalArgumentException exception = new IllegalArgumentException(String.format(
"Invalid reply type %s for CreateTransaction", response.getClass()));
IllegalArgumentException exception = new IllegalArgumentException(String.format(
"Invalid reply type %s for CreateTransaction", response.getClass()));
- localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), getOperationLimiter());
+ localTransactionContext = new NoOpTransactionContext(exception, getOperationLimiter());
}
transactionContextAdapter.executePriorTransactionOperations(localTransactionContext);
}
transactionContextAdapter.executePriorTransactionOperations(localTransactionContext);
final TransactionContext ret;
if (remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
final TransactionContext ret;
if (remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
- ret = new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
+ ret = new PreLithiumTransactionContextImpl(transactionPath, transactionActor,
getActorContext(), isTxActorLocal, remoteTransactionVersion, parent.getLimiter());
} else {
getActorContext(), isTxActorLocal, remoteTransactionVersion, parent.getLimiter());
} else {
- ret = new RemoteTransactionContext(transactionActor, getIdentifier(), getActorContext(),
+ ret = new RemoteTransactionContext(transactionActor, getActorContext(),
isTxActorLocal, remoteTransactionVersion, parent.getLimiter());
}
isTxActorLocal, remoteTransactionVersion, parent.getLimiter());
}
boolean supportsDirectCommit();
Future<Object> directCommit();
boolean supportsDirectCommit();
Future<Object> directCommit();
+
+ /**
+ * Invoked by {@link TransactionContextWrapper} when it has finished handing
+ * off operations to this context. From this point on, the context is responsible
+ * for throttling operations.
+ *
+ * Implementations can rely on the wrapper calling this operation in a synchronized
+ * block, so they do not need to ensure visibility of this state transition themselves.
+ */
+ void operationHandoffComplete();
*/
package org.opendaylight.controller.cluster.datastore;
*/
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorSelection;
+import akka.dispatch.Futures;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
/**
* A helper class that wraps an eventual TransactionContext instance. Operations destined for the target
/**
* A helper class that wraps an eventual TransactionContext instance. Operations destined for the target
*/
private volatile TransactionContext transactionContext;
*/
private volatile TransactionContext transactionContext;
- private final TransactionIdentifier identifier;
+ private final OperationLimiter limiter;
- TransactionContextWrapper(final TransactionIdentifier identifier) {
- this.identifier = identifier;
+ TransactionContextWrapper(final OperationLimiter limiter) {
+ this.limiter = Preconditions.checkNotNull(limiter);
}
TransactionContext getTransactionContext() {
}
TransactionContext getTransactionContext() {
}
TransactionIdentifier getIdentifier() {
}
TransactionIdentifier getIdentifier() {
+ return limiter.getIdentifier();
if (invokeOperation) {
operation.invoke(transactionContext);
if (invokeOperation) {
operation.invoke(transactionContext);
+ } else {
+ limiter.acquire();
// queued (eg a put operation from a client read Future callback that is notified
// synchronously).
Collection<TransactionOperation> operationsBatch = null;
// queued (eg a put operation from a client read Future callback that is notified
// synchronously).
Collection<TransactionOperation> operationsBatch = null;
- synchronized(queuedTxOperations) {
- if(queuedTxOperations.isEmpty()) {
+ synchronized (queuedTxOperations) {
+ if (queuedTxOperations.isEmpty()) {
// We're done invoking the TransactionOperations so we can now publish the
// TransactionContext.
// We're done invoking the TransactionOperations so we can now publish the
// TransactionContext.
+ localTransactionContext.operationHandoffComplete();
transactionContext = localTransactionContext;
break;
}
transactionContext = localTransactionContext;
break;
}
// Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
// A slight down-side is that we need to re-acquire the lock below but this should
// be negligible.
// Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
// A slight down-side is that we need to re-acquire the lock below but this should
// be negligible.
- for(TransactionOperation oper: operationsBatch) {
+ for (TransactionOperation oper : operationsBatch) {
oper.invoke(localTransactionContext);
}
}
}
oper.invoke(localTransactionContext);
}
}
}
+
+ Future<ActorSelection> readyTransaction() {
+ // avoid the creation of a promise and a TransactionOperation
+ if (transactionContext != null) {
+ return transactionContext.readyTransaction();
+ }
+
+ final Promise<ActorSelection> promise = Futures.promise();
+ enqueueTransactionOperation(new TransactionOperation() {
+ @Override
+ public void invoke(TransactionContext transactionContext) {
+ promise.completeWith(transactionContext.readyTransaction());
+ }
+ });
+
+ return promise.future();
+ }
LOG.debug("Tx {} exists {}", getIdentifier(), path);
LOG.debug("Tx {} exists {}", getIdentifier(), path);
final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
TransactionContextWrapper contextAdapter = getContextAdapter(path);
contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
TransactionContextWrapper contextAdapter = getContextAdapter(path);
contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
if (YangInstanceIdentifier.EMPTY.equals(path)) {
return readAllData();
} else {
if (YangInstanceIdentifier.EMPTY.equals(path)) {
return readAllData();
} else {
return singleShardRead(shardNameFromIdentifier(path), path);
}
}
return singleShardRead(shardNameFromIdentifier(path), path);
}
}
LOG.debug("Tx {} delete {}", getIdentifier(), path);
LOG.debug("Tx {} delete {}", getIdentifier(), path);
TransactionContextWrapper contextAdapter = getContextAdapter(path);
contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
TransactionContextWrapper contextAdapter = getContextAdapter(path);
contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
LOG.debug("Tx {} merge {}", getIdentifier(), path);
LOG.debug("Tx {} merge {}", getIdentifier(), path);
TransactionContextWrapper contextAdapter = getContextAdapter(path);
contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
TransactionContextWrapper contextAdapter = getContextAdapter(path);
contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
LOG.debug("Tx {} write {}", getIdentifier(), path);
LOG.debug("Tx {} write {}", getIdentifier(), path);
TransactionContextWrapper contextAdapter = getContextAdapter(path);
contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
TransactionContextWrapper contextAdapter = getContextAdapter(path);
contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
private AbstractThreePhaseCommitCohort<?> createSingleCommitCohort(final String shardName,
final TransactionContextWrapper contextAdapter) {
private AbstractThreePhaseCommitCohort<?> createSingleCommitCohort(final String shardName,
final TransactionContextWrapper contextAdapter) {
LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName);
LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName);
private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort(
final Set<Entry<String, TransactionContextWrapper>> txContextAdapterEntries) {
private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort(
final Set<Entry<String, TransactionContextWrapper>> txContextAdapterEntries) {
final List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txContextAdapterEntries.size());
for (Entry<String, TransactionContextWrapper> e : txContextAdapterEntries) {
LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());
final List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txContextAdapterEntries.size());
for (Entry<String, TransactionContextWrapper> e : txContextAdapterEntries) {
LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());
- TransactionContextWrapper contextAdapter = e.getValue();
- final TransactionContext transactionContext = contextAdapter.getTransactionContext();
- Future<ActorSelection> future;
- if (transactionContext != null) {
- // avoid the creation of a promise and a TransactionOperation
- future = transactionContext.readyTransaction();
- } else {
- final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
- contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
- @Override
- public void invoke(TransactionContext transactionContext) {
- promise.completeWith(transactionContext.readyTransaction());
- }
- });
-
- future = promise.future();
- }
-
- cohortFutures.add(future);
+ cohortFutures.add(e.getValue().readyTransaction());
}
return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohortFutures, getIdentifier().toString());
}
return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohortFutures, getIdentifier().toString());
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.OperationLimiter;
import org.opendaylight.controller.cluster.datastore.RemoteTransactionContext;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.OperationLimiter;
import org.opendaylight.controller.cluster.datastore.RemoteTransactionContext;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
private final String transactionPath;
private final String transactionPath;
- public PreLithiumTransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
+ public PreLithiumTransactionContextImpl(String transactionPath, ActorSelection actor,
ActorContext actorContext, boolean isTxActorLocal,
short remoteTransactionVersion, OperationLimiter limiter) {
ActorContext actorContext, boolean isTxActorLocal,
short remoteTransactionVersion, OperationLimiter limiter) {
- super(actor, identifier, actorContext, isTxActorLocal, remoteTransactionVersion, limiter);
+ super(actor, actorContext, isTxActorLocal, remoteTransactionVersion, limiter);
this.transactionPath = transactionPath;
}
this.transactionPath = transactionPath;
}
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import akka.dispatch.ExecutionContexts;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.Futures;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import com.google.common.util.concurrent.SettableFuture;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
LocalTransactionContext localTransactionContext;
@Before
LocalTransactionContext localTransactionContext;
@Before
MockitoAnnotations.initMocks(this);
MockitoAnnotations.initMocks(this);
- localTransactionContext = new LocalTransactionContext(identifier, readWriteTransaction, limiter) {
+ localTransactionContext = new LocalTransactionContext(readWriteTransaction, limiter) {
@Override
protected DOMStoreWriteTransaction getWriteDelegate() {
return readWriteTransaction;
@Override
protected DOMStoreWriteTransaction getWriteDelegate() {
return readWriteTransaction;
- public void testWrite(){
+ public void testWrite() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode);
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode);
- public void testMerge(){
+ public void testMerge() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode);
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode);
- public void testDelete(){
+ public void testDelete() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
localTransactionContext.deleteData(yangInstanceIdentifier);
verify(limiter).release();
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
localTransactionContext.deleteData(yangInstanceIdentifier);
verify(limiter).release();
- public void testRead(){
+ public void testRead() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction).read(yangInstanceIdentifier);
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction).read(yangInstanceIdentifier);
- public void testExists(){
+ public void testExists() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
doReturn(Futures.immediateCheckedFuture(true)).when(readWriteTransaction).exists(yangInstanceIdentifier);
localTransactionContext.dataExists(yangInstanceIdentifier, SettableFuture.<Boolean> create());
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
doReturn(Futures.immediateCheckedFuture(true)).when(readWriteTransaction).exists(yangInstanceIdentifier);
localTransactionContext.dataExists(yangInstanceIdentifier, SettableFuture.<Boolean> create());
- public void testReady(){
- doReturn(mock(LocalThreePhaseCommitCohort.class)).when(readWriteTransaction).ready();
+ public void testReady() {
+ final LocalThreePhaseCommitCohort mockCohort = mock(LocalThreePhaseCommitCohort.class);
+ final ActorContext mockContext = mock(ActorContext.class);
+ doReturn(mockContext).when(mockCohort).getActorContext();
+ doReturn(ExecutionContexts.fromExecutor(MoreExecutors.directExecutor())).when(mockContext).getClientDispatcher();
+ doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit();
+ doReturn(mockCohort).when(readWriteTransaction).ready();
localTransactionContext.readyTransaction();
localTransactionContext.readyTransaction();
- verify(limiter).release();
+ verify(limiter).onComplete(null, null);
verify(readWriteTransaction).ready();
}
verify(readWriteTransaction).ready();
}