This patch straightens out where exactly limiting is done.
A TransactionProxy creates a TransactionContext for every shard on
which a transaction needs to be done. There are 3 types of TransactionContexts.
NoOpTransactionContext, LocalTransactionContext and RemoteTransactionContext.
When a operation is done on TransactionProxy it does not know which of these
TransactionContexts it should create so it first createas a TransactionContextWrapper.
All operations on TransactionProxy are then queued up in the TransactionContextWrapper
till we determine which TransactionContext to create. This patch creates an
OperationLimiter per TransactionContextWrapper. Everytime an operation is enqueued
we acquire a permit.
When the TransactionContext is finally created we do different things depending on
the TransactionContext. For NoOp and Local TransactionContexts we completely ignore
the limiter - that is for these TransactionContexts there is no limiting done. For
RemoteTransactionContext we do limiting. RemoteTransactionContext does not acquire
Operation permits till it is made visible by the TransactionContextWrapper - this is
signaled be the setting of the handOffComplete flag in AbstractTransactionContext.
After that RemoteTransactionContext takes over the business of acquiring permits.
OperationLimiter which also serves as the Operation completion handler is the only
component that releases the permits.
Another thing which this patch addresses is which configuration option we use for
operation limiting. We now use ShardBatchedModificationCount instead of the mailbox
limit from akka.conf. This removes the possibility of mis-configuration where
making ShardedBatchedModificationCount higher than mailbox limit could cause
unexpected blocking.
Change-Id: I571ba5278630e5166be6bcb3ff8e1c527c5e3343
Signed-off-by: Moiz Raja <moraja@cisco.com>
# The number of transaction modification operations (put, merge, delete) to batch before sending to the
# shard transaction actor. Batching improves performance as less modifications messages are sent to the
# actor and thus lessens the chance that the transaction actor's mailbox queue could get full.
-#shard-batched-modification-count=100
+#shard-batched-modification-count=1000
# The maximum amount of time for akka operations (remote or local) to complete before failing.
#operation-timeout-in-seconds=5
*/
package org.opendaylight.controller.cluster.datastore;
-import com.google.common.base.Preconditions;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.slf4j.Logger;
abstract class AbstractTransactionContext implements TransactionContext {
private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContext.class);
- private final OperationLimiter limiter;
+ private final TransactionIdentifier transactionIdentifier;
private long modificationCount = 0;
- private boolean handoffComplete;
+ private boolean handOffComplete;
- protected AbstractTransactionContext(final OperationLimiter limiter) {
- this.limiter = Preconditions.checkNotNull(limiter);
+ protected AbstractTransactionContext(TransactionIdentifier transactionIdentifier) {
+ this.transactionIdentifier = transactionIdentifier;
}
/**
* @return Transaction identifier.
*/
@Nonnull protected final TransactionIdentifier getIdentifier() {
- return limiter.getIdentifier();
- }
-
- /**
- * Return the operation limiter associated with this context.
- * @return Operation limiter.
- */
- @Nonnull protected final OperationLimiter getLimiter() {
- return limiter;
- }
-
- /**
- * Indicate whether all operations have been handed off by the {@link TransactionContextWrapper}.
- *
- * @return True if this context is responsible for throttling.
- */
- protected final boolean isOperationHandoffComplete() {
- return handoffComplete;
- }
-
- /**
- * Acquire operation from the limiter if the handoff has completed. If
- * the handoff is still ongoing, this method does nothing.
- */
- protected final void acquireOperation() {
- if (handoffComplete) {
- limiter.acquire();
- }
- }
-
- /**
- * Acquire operation from the limiter if the handoff has NOT completed. If
- * the handoff has completed, this method does nothing.
- */
- protected final void releaseOperation() {
- if (!handoffComplete) {
- limiter.release();
- }
+ return transactionIdentifier;
}
protected final void incrementModificationCount() {
}
@Override
- public final void operationHandoffComplete() {
- handoffComplete = true;
+ public final void operationHandOffComplete() {
+ handOffComplete = true;
+ }
+
+ protected boolean isOperationHandOffComplete(){
+ return handOffComplete;
+ }
+
+ @Override
+ public boolean usesOperationLimiting() {
+ return false;
}
}
LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
transactionContextAdapter.executePriorTransactionOperations(new NoOpTransactionContext(failure,
- parent.getLimiter()));
+ parent.getIdentifier()));
}
final TransactionContextWrapper newTransactionAdapter(final TransactionProxy parent, final String shardName) {
- final TransactionContextWrapper transactionContextAdapter = new TransactionContextWrapper(parent.getLimiter());
+ final TransactionContextWrapper transactionContextAdapter =
+ new TransactionContextWrapper(parent.getIdentifier(), actorContext);
Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName);
if(findPrimaryFuture.isCompleted()) {
*/
protected abstract <T> void onTransactionReady(@Nonnull TransactionIdentifier transaction, @Nonnull Collection<Future<T>> cohortFutures);
- private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory, final TransactionProxy parent) {
+ private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory,
+ final TransactionProxy parent) {
+
switch(parent.getType()) {
case READ_ONLY:
final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
- return new LocalTransactionContext(readOnly, parent.getLimiter()) {
+ return new LocalTransactionContext(readOnly, parent.getIdentifier()) {
@Override
protected DOMStoreWriteTransaction getWriteDelegate() {
throw new UnsupportedOperationException();
};
case READ_WRITE:
final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
- return new LocalTransactionContext(readWrite, parent.getLimiter()) {
+ return new LocalTransactionContext(readWrite, parent.getIdentifier()) {
@Override
protected DOMStoreWriteTransaction getWriteDelegate() {
return readWrite;
};
case WRITE_ONLY:
final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
- return new LocalTransactionContext(writeOnly, parent.getLimiter()) {
+ return new LocalTransactionContext(writeOnly, parent.getIdentifier()) {
@Override
protected DOMStoreWriteTransaction getWriteDelegate() {
return writeOnly;
public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
- public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 100;
+ public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS = TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
private static Set<String> globalDatastoreTypes = Sets.newConcurrentHashSet();
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
abstract class LocalTransactionContext extends AbstractTransactionContext {
private final DOMStoreTransaction txDelegate;
- LocalTransactionContext(DOMStoreTransaction txDelegate, OperationLimiter limiter) {
- super(limiter);
+ LocalTransactionContext(DOMStoreTransaction txDelegate, TransactionIdentifier identifier) {
+ super(identifier);
this.txDelegate = Preconditions.checkNotNull(txDelegate);
}
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
incrementModificationCount();
getWriteDelegate().write(path, data);
- releaseOperation();
}
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
incrementModificationCount();
getWriteDelegate().merge(path, data);
- releaseOperation();
}
@Override
public void deleteData(YangInstanceIdentifier path) {
incrementModificationCount();
getWriteDelegate().delete(path);
- releaseOperation();
}
@Override
@Override
public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
proxyFuture.set(result);
- releaseOperation();
}
@Override
public void onFailure(final Throwable t) {
proxyFuture.setException(t);
- releaseOperation();
}
});
}
@Override
public void onSuccess(final Boolean result) {
proxyFuture.set(result);
- releaseOperation();
}
@Override
public void onFailure(final Throwable t) {
proxyFuture.setException(t);
- releaseOperation();
}
});
}
private LocalThreePhaseCommitCohort ready() {
logModificationCount();
- acquireOperation();
return (LocalThreePhaseCommitCohort) getWriteDelegate().ready();
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
- private <T extends Future> T completeOperation(final ActorContext actorContext, final T operationFuture) {
- operationFuture.onComplete(getLimiter(), actorContext.getClientDispatcher());
- return operationFuture;
- }
-
@Override
public Future<ActorSelection> readyTransaction() {
final LocalThreePhaseCommitCohort cohort = ready();
- return completeOperation(cohort.getActorContext(), cohort.initiateCoordinatedCommit());
+ return cohort.initiateCoordinatedCommit();
}
@Override
public Future<Object> directCommit() {
final LocalThreePhaseCommitCohort cohort = ready();
- return completeOperation(cohort.getActorContext(), cohort.initiateDirectCommit());
+ return cohort.initiateDirectCommit();
}
@Override
@Override
public void closeTransaction() {
txDelegate.close();
- releaseOperation();
}
}
import com.google.common.base.Optional;
import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
private final Throwable failure;
- public NoOpTransactionContext(Throwable failure, OperationLimiter limiter) {
- super(limiter);
+ public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier) {
+ super(identifier);
this.failure = failure;
}
@Override
public Future<Object> directCommit() {
LOG.debug("Tx {} directCommit called, failure: {}", getIdentifier(), failure);
- releaseOperation();
return akka.dispatch.Futures.failed(failure);
}
@Override
public Future<ActorSelection> readyTransaction() {
LOG.debug("Tx {} readyTransaction called, failure: {}", getIdentifier(), failure);
- releaseOperation();
return akka.dispatch.Futures.failed(failure);
}
@Override
public void deleteData(YangInstanceIdentifier path) {
LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path);
- releaseOperation();
}
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path);
- releaseOperation();
}
@Override
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path);
- releaseOperation();
}
@Override
public void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
LOG.debug("Tx {} readData called path = {}", getIdentifier(), path);
- releaseOperation();
final Throwable t;
if (failure instanceof NoShardLeaderException) {
@Override
public void dataExists(YangInstanceIdentifier path, SettableFuture<Boolean> proxyFuture) {
LOG.debug("Tx {} dataExists called path = {}", getIdentifier(), path);
- releaseOperation();
proxyFuture.setException(new ReadFailedException("Error checking exists for path " + path, failure));
}
}
private final TransactionIdentifier identifier;
private final long acquireTimeout;
private final Semaphore semaphore;
+ private final int maxPermits;
OperationLimiter(final TransactionIdentifier identifier, final int maxPermits, final int acquireTimeoutSeconds) {
this.identifier = Preconditions.checkNotNull(identifier);
this.acquireTimeout = TimeUnit.SECONDS.toNanos(acquireTimeoutSeconds);
Preconditions.checkArgument(maxPermits >= 0);
+ this.maxPermits = maxPermits;
this.semaphore = new Semaphore(maxPermits);
}
acquire(1);
}
- private void acquire(final int acquirePermits) {
+ void acquire(final int acquirePermits) {
try {
if (!semaphore.tryAcquire(acquirePermits, acquireTimeout, TimeUnit.NANOSECONDS)) {
LOG.warn("Failed to acquire operation permit for transaction {}", identifier);
}
}
- void release() {
- this.semaphore.release();
- }
-
@Override
public void onComplete(final Throwable throwable, final Object message) {
if (message instanceof BatchedModificationsReply) {
}
@VisibleForTesting
- Semaphore getSemaphore() {
- return semaphore;
+ int availablePermits(){
+ return semaphore.availablePermits();
+ }
+
+ /**
+ * Release all the permits
+ */
+ public void releaseAll() {
+ this.semaphore.release(maxPermits-availablePermits());
}
}
import akka.actor.ActorSelection;
import akka.dispatch.OnComplete;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.SettableFuture;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
private final ActorSelection actor;
private final boolean isTxActorLocal;
private final short remoteTransactionVersion;
+ private final OperationLimiter limiter;
private BatchedModifications batchedModifications;
private int totalBatchedModificationsSent;
- protected RemoteTransactionContext(ActorSelection actor,
+ protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor,
ActorContext actorContext, boolean isTxActorLocal,
short remoteTransactionVersion, OperationLimiter limiter) {
- super(limiter);
+ super(identifier);
+ this.limiter = Preconditions.checkNotNull(limiter);
this.actor = actor;
this.actorContext = actorContext;
this.isTxActorLocal = isTxActorLocal;
}
private Future<Object> completeOperation(Future<Object> operationFuture){
- operationFuture.onComplete(getLimiter(), actorContext.getClientDispatcher());
+ operationFuture.onComplete(limiter, actorContext.getClientDispatcher());
return operationFuture;
}
future.onComplete(onComplete, actorContext.getClientDispatcher());
}
+
+ /**
+ * Acquire operation from the limiter if the hand-off has completed. If
+ * the hand-off is still ongoing, this method does nothing.
+ */
+ private final void acquireOperation() {
+ if (isOperationHandOffComplete()) {
+ limiter.acquire();
+ }
+ }
+
+ @Override
+ public boolean usesOperationLimiting() {
+ return true;
+ }
}
}
private OperationLimiter getOperationLimiter() {
- return parent.getLimiter();
+ return transactionContextAdapter.getLimiter();
}
private TransactionIdentifier getIdentifier() {
if(failure != null) {
LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
- localTransactionContext = new NoOpTransactionContext(failure, getOperationLimiter());
+ localTransactionContext = new NoOpTransactionContext(failure, getIdentifier());
} else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
localTransactionContext = createValidTransactionContext(
CreateTransactionReply.fromSerializable(response));
IllegalArgumentException exception = new IllegalArgumentException(String.format(
"Invalid reply type %s for CreateTransaction", response.getClass()));
- localTransactionContext = new NoOpTransactionContext(exception, getOperationLimiter());
+ localTransactionContext = new NoOpTransactionContext(exception, getIdentifier());
}
transactionContextAdapter.executePriorTransactionOperations(localTransactionContext);
final TransactionContext ret;
if (remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
- ret = new PreLithiumTransactionContextImpl(transactionPath, transactionActor,
- getActorContext(), isTxActorLocal, remoteTransactionVersion, parent.getLimiter());
+ ret = new PreLithiumTransactionContextImpl(transactionContextAdapter.getIdentifier(), transactionPath, transactionActor,
+ getActorContext(), isTxActorLocal, remoteTransactionVersion, transactionContextAdapter.getLimiter());
} else {
- ret = new RemoteTransactionContext(transactionActor, getActorContext(),
- isTxActorLocal, remoteTransactionVersion, parent.getLimiter());
+ ret = new RemoteTransactionContext(transactionContextAdapter.getIdentifier(), transactionActor, getActorContext(),
+ isTxActorLocal, remoteTransactionVersion, transactionContextAdapter.getLimiter());
}
if(parent.getType() == TransactionType.READ_ONLY) {
* Implementations can rely on the wrapper calling this operation in a synchronized
* block, so they do not need to ensure visibility of this state transition themselves.
*/
- void operationHandoffComplete();
+ void operationHandOffComplete();
+
+ /**
+ * A TransactionContext that uses Operation limiting should return true else false
+ * @return
+ */
+ boolean usesOperationLimiting();
}
import java.util.List;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
@GuardedBy("queuedTxOperations")
private final List<TransactionOperation> queuedTxOperations = Lists.newArrayList();
+ private final TransactionIdentifier identifier;
+
/**
* The resulting TransactionContext.
*/
private final OperationLimiter limiter;
- TransactionContextWrapper(final OperationLimiter limiter) {
- this.limiter = Preconditions.checkNotNull(limiter);
+ TransactionContextWrapper(TransactionIdentifier identifier, final ActorContext actorContext) {
+ this.identifier = Preconditions.checkNotNull(identifier);
+ this.limiter = new OperationLimiter(identifier,
+ actorContext.getDatastoreContext().getShardBatchedModificationCount() + 1, // 1 extra permit for the ready operation
+ actorContext.getDatastoreContext().getOperationTimeoutInSeconds());
}
TransactionContext getTransactionContext() {
}
TransactionIdentifier getIdentifier() {
- return limiter.getIdentifier();
+ return identifier;
}
/**
if (queuedTxOperations.isEmpty()) {
// We're done invoking the TransactionOperations so we can now publish the
// TransactionContext.
- localTransactionContext.operationHandoffComplete();
+ localTransactionContext.operationHandOffComplete();
+ if(!localTransactionContext.usesOperationLimiting()){
+ limiter.releaseAll();
+ }
transactionContext = localTransactionContext;
break;
}
return promise.future();
}
+
+ public OperationLimiter getLimiter() {
+ return limiter;
+ }
+
+
}
private final Map<String, TransactionContextWrapper> txContextAdapters = new HashMap<>();
private final AbstractTransactionContextFactory<?> txContextFactory;
- private final OperationLimiter limiter;
private final TransactionType type;
private TransactionState state = TransactionState.OPEN;
this.txContextFactory = txContextFactory;
this.type = Preconditions.checkNotNull(type);
- // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
- this.limiter = new OperationLimiter(getIdentifier(),
- getActorContext().getTransactionOutstandingOperationLimit(),
- getActorContext().getDatastoreContext().getOperationTimeoutInSeconds());
-
LOG.debug("New {} Tx - {}", type, getIdentifier());
}
ActorContext getActorContext() {
return txContextFactory.getActorContext();
}
-
- OperationLimiter getLimiter() {
- return limiter;
- }
}
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.OperationLimiter;
import org.opendaylight.controller.cluster.datastore.RemoteTransactionContext;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
private final String transactionPath;
- public PreLithiumTransactionContextImpl(String transactionPath, ActorSelection actor,
+ public PreLithiumTransactionContextImpl(TransactionIdentifier identifier, String transactionPath, ActorSelection actor,
ActorContext actorContext, boolean isTxActorLocal,
short remoteTransactionVersion, OperationLimiter limiter) {
- super(actor, actorContext, isTxActorLocal, remoteTransactionVersion, limiter);
+ super(identifier, actor, actorContext, isTxActorLocal, remoteTransactionVersion, limiter);
this.transactionPath = transactionPath;
}
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
private Timeout operationTimeout;
private final String selfAddressHostPort;
private TransactionRateLimiter txRateLimiter;
- private final int transactionOutstandingOperationLimit;
private Timeout transactionCommitOperationTimeout;
private Timeout shardInitializationTimeout;
private final Dispatchers dispatchers;
selfAddressHostPort = null;
}
- transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
}
private void setCachedProperties() {
return builder.toString();
}
- /**
- * Get the maximum number of operations that are to be permitted within a transaction before the transaction
- * should begin throttling the operations
- *
- * Parking reading this configuration here because we need to get to the actor system settings
- *
- * @return
- */
- public int getTransactionOutstandingOperationLimit(){
- return transactionOutstandingOperationLimit;
- }
-
/**
* This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
* us to create a timer for pretty much anything.
}
leaf shard-batched-modification-count {
- default 100;
+ default 1000;
type non-zero-uint32-type;
description "The number of transaction modification operations (put, merge, delete) to
batch before sending to the shard transaction actor. Batching improves
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
public String findShard(YangInstanceIdentifier path) {
return "junk";
}
+ }).put(
+ "cars", new ShardStrategy() {
+ @Override
+ public String findShard(YangInstanceIdentifier path) {
+ return "cars";
+ }
}).build();
}
@Override
public Optional<String> getModuleNameFromNameSpace(String nameSpace) {
- return TestModel.JUNK_QNAME.getNamespace().toASCIIString().equals(nameSpace) ?
- Optional.of("junk") : Optional.<String>absent();
+ if(TestModel.JUNK_QNAME.getNamespace().toASCIIString().equals(nameSpace)) {
+ return Optional.of("junk");
+ } else if(CarsModel.BASE_QNAME.getNamespace().toASCIIString().equals(nameSpace)){
+ return Optional.of("cars");
+ }
+ return Optional.<String>absent();
}
};
doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext();
- doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
mockComponentFactory = TransactionContextFactory.create(mockActorContext);
doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
- doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
-
return actorRef;
}
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
- localTransactionContext = new LocalTransactionContext(readWriteTransaction, limiter) {
+ localTransactionContext = new LocalTransactionContext(readWriteTransaction, limiter.getIdentifier()) {
@Override
protected DOMStoreWriteTransaction getWriteDelegate() {
return readWriteTransaction;
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode);
- verify(limiter).release();
verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
}
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode);
- verify(limiter).release();
verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
}
public void testDelete() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
localTransactionContext.deleteData(yangInstanceIdentifier);
- verify(limiter).release();
verify(readWriteTransaction).delete(yangInstanceIdentifier);
}
NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction).read(yangInstanceIdentifier);
localTransactionContext.readData(yangInstanceIdentifier, SettableFuture.<Optional<NormalizedNode<?,?>>>create());
- verify(limiter).release();
verify(readWriteTransaction).read(yangInstanceIdentifier);
}
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
doReturn(Futures.immediateCheckedFuture(true)).when(readWriteTransaction).exists(yangInstanceIdentifier);
localTransactionContext.dataExists(yangInstanceIdentifier, SettableFuture.<Boolean> create());
- verify(limiter).release();
verify(readWriteTransaction).exists(yangInstanceIdentifier);
}
doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit();
doReturn(mockCohort).when(readWriteTransaction).ready();
localTransactionContext.readyTransaction();
- verify(limiter).onComplete(null, null);
verify(readWriteTransaction).ready();
}
package org.opendaylight.controller.cluster.datastore;
import static org.junit.Assert.assertEquals;
-import java.util.concurrent.Semaphore;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
public void testOnComplete() throws Exception {
int permits = 10;
OperationLimiter limiter = new OperationLimiter(new TransactionIdentifier("foo", 1), permits, 1);
- Semaphore semaphore = limiter.getSemaphore();
- semaphore.acquire(permits);
+ limiter.acquire(permits);
int availablePermits = 0;
limiter.onComplete(null, DataExistsReply.create(true));
- assertEquals("availablePermits", ++availablePermits, semaphore.availablePermits());
+ assertEquals("availablePermits", ++availablePermits, limiter.availablePermits());
limiter.onComplete(null, DataExistsReply.create(true));
- assertEquals("availablePermits", ++availablePermits, semaphore.availablePermits());
+ assertEquals("availablePermits", ++availablePermits, limiter.availablePermits());
limiter.onComplete(null, new IllegalArgumentException());
- assertEquals("availablePermits", ++availablePermits, semaphore.availablePermits());
+ assertEquals("availablePermits", ++availablePermits, limiter.availablePermits());
limiter.onComplete(null, new BatchedModificationsReply(4));
availablePermits += 4;
- assertEquals("availablePermits", availablePermits, semaphore.availablePermits());
+ assertEquals("availablePermits", availablePermits, limiter.availablePermits());
}
+
}
--- /dev/null
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+
+public class TransactionContextWrapperTest {
+
+ @Mock
+ TransactionIdentifier identifier;
+
+ @Mock
+ ActorContext actorContext;
+
+ @Mock
+ TransactionContext transactionContext;
+
+ TransactionContextWrapper transactionContextWrapper;
+
+ @Before
+ public void setUp(){
+ MockitoAnnotations.initMocks(this);
+ doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext();
+ transactionContextWrapper = new TransactionContextWrapper(identifier, actorContext);
+ }
+
+ @Test
+ public void testExecutePriorTransactionOperations(){
+ for(int i=0;i<100;i++) {
+ transactionContextWrapper.maybeExecuteTransactionOperation(mock(TransactionOperation.class));
+ }
+ assertEquals(901, transactionContextWrapper.getLimiter().availablePermits());
+
+ transactionContextWrapper.executePriorTransactionOperations(transactionContext);
+
+ assertEquals(1001, transactionContextWrapper.getLimiter().availablePermits());
+ }
+}
\ No newline at end of file
throttleOperation(operation, 1, true);
}
+ private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
+ throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()));
+ }
+
private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef){
return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
Optional.<DataTree>absent());
}
- private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
+ private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound, long expectedCompletionTime){
ActorSystem actorSystem = getSystem();
ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
- doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
+ // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy
+ // we now allow one extra permit to be allowed for ready
+ doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2).
+ shardBatchedModificationCount(outstandingOpsLimit-1).build()).when(mockActorContext).getDatastoreContext();
doReturn(actorSystem.actorSelection(shardActorRef.path())).
when(mockActorContext).actorSelection(shardActorRef.path().toString());
if(shardFound) {
doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+ doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
+ when(mockActorContext).findPrimaryShardAsync(eq("cars"));
+
} else {
doReturn(Futures.failed(new Exception("not found")))
.when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
long end = System.nanoTime();
- long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
- expected, (end-start)), (end - start) > expected);
+ expectedCompletionTime, (end-start)),
+ ((end - start) > expectedCompletionTime) && ((end - start) < expectedCompletionTime*2));
}
ActorSystem actorSystem = getSystem();
ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
- doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
-
doReturn(actorSystem.actorSelection(shardActorRef.path())).
when(mockActorContext).actorSelection(shardActorRef.path().toString());
ActorSystem actorSystem = getSystem();
ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
- doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
-
doReturn(actorSystem.actorSelection(shardActorRef.path())).
when(mockActorContext).actorSelection(shardActorRef.path().toString());
@Test
public void testWriteCompletionForLocalShard(){
- dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperationLocal(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testWriteThrottlingWhenShardFound(){
- dataStoreContextBuilder.shardBatchedModificationCount(1);
throttleOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testWriteThrottlingWhenShardNotFound(){
// Confirm that there is no throttling when the Shard is not found
- dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testWriteCompletion(){
- dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testMergeThrottlingWhenShardFound(){
- dataStoreContextBuilder.shardBatchedModificationCount(1);
throttleOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testMergeThrottlingWhenShardNotFound(){
- dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testMergeCompletion(){
- dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testMergeCompletionForLocalShard(){
- dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperationLocal(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testDeleteCompletionForLocalShard(){
- dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperationLocal(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testDeleteCompletion(){
- dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testReadyThrottlingWithTwoTransactionContexts(){
-
throttleOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
- transactionProxy.write(TestModel.TEST_PATH, carsNode);
+ // Trying to write to Cars will cause another transaction context to get created
+ transactionProxy.write(CarsModel.BASE_PATH, carsNode);
+ // Now ready should block for both transaction contexts
transactionProxy.ready();
}
- }, 2, true);
+ }, 1, true, TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()) * 2);
}
private void testModificationOperationBatching(TransactionType type) throws Exception {
doReturn(memberName).when(mockActorContext).getCurrentMemberName();
- doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
-
doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);