# The number of transaction modification operations (put, merge, delete) to batch before sending to the
# shard transaction actor. Batching improves performance because fewer modification messages are sent to the
# actor, which lessens the chance that the transaction actor's mailbox queue fills up.
-#shard-batched-modification-count=100
+#shard-batched-modification-count=1000
# The maximum amount of time for akka operations (remote or local) to complete before failing.
#operation-timeout-in-seconds=5
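// Illustrative sketch (not part of this change): the same two settings can also be supplied
// programmatically through the DatastoreContext builder that the tests in this patch use; the
// values shown simply mirror the new defaults.
DatastoreContext datastoreContext = DatastoreContext.newBuilder()
        .shardBatchedModificationCount(1000)
        .operationTimeoutInSeconds(5)
        .build();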
*/
package org.opendaylight.controller.cluster.datastore;
-import com.google.common.base.Preconditions;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.slf4j.Logger;
abstract class AbstractTransactionContext implements TransactionContext {
private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContext.class);
- private final OperationLimiter limiter;
+ private final TransactionIdentifier transactionIdentifier;
private long modificationCount = 0;
- private boolean handoffComplete;
+ private boolean handOffComplete;
- protected AbstractTransactionContext(final OperationLimiter limiter) {
- this.limiter = Preconditions.checkNotNull(limiter);
+ protected AbstractTransactionContext(TransactionIdentifier transactionIdentifier) {
+ this.transactionIdentifier = transactionIdentifier;
}
/**
* @return Transaction identifier.
*/
@Nonnull protected final TransactionIdentifier getIdentifier() {
- return limiter.getIdentifier();
- }
-
- /**
- * Return the operation limiter associated with this context.
- * @return Operation limiter.
- */
- @Nonnull protected final OperationLimiter getLimiter() {
- return limiter;
- }
-
- /**
- * Indicate whether all operations have been handed off by the {@link TransactionContextWrapper}.
- *
- * @return True if this context is responsible for throttling.
- */
- protected final boolean isOperationHandoffComplete() {
- return handoffComplete;
- }
-
- /**
- * Acquire operation from the limiter if the handoff has completed. If
- * the handoff is still ongoing, this method does nothing.
- */
- protected final void acquireOperation() {
- if (handoffComplete) {
- limiter.acquire();
- }
- }
-
- /**
- * Acquire operation from the limiter if the handoff has NOT completed. If
- * the handoff has completed, this method does nothing.
- */
- protected final void releaseOperation() {
- if (!handoffComplete) {
- limiter.release();
- }
+ return transactionIdentifier;
}
protected final void incrementModificationCount() {
}
@Override
- public final void operationHandoffComplete() {
- handoffComplete = true;
+ public final void operationHandOffComplete() {
+ handOffComplete = true;
+ }
+
+ protected boolean isOperationHandOffComplete() {
+ return handOffComplete;
+ }
+
+ @Override
+ public boolean usesOperationLimiting() {
+ return false;
}
}
LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
transactionContextAdapter.executePriorTransactionOperations(new NoOpTransactionContext(failure,
- parent.getLimiter()));
+ parent.getIdentifier()));
}
final TransactionContextWrapper newTransactionAdapter(final TransactionProxy parent, final String shardName) {
- final TransactionContextWrapper transactionContextAdapter = new TransactionContextWrapper(parent.getLimiter());
+ final TransactionContextWrapper transactionContextAdapter =
+ new TransactionContextWrapper(parent.getIdentifier(), actorContext);
Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName);
if(findPrimaryFuture.isCompleted()) {
*/
protected abstract <T> void onTransactionReady(@Nonnull TransactionIdentifier transaction, @Nonnull Collection<Future<T>> cohortFutures);
- private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory, final TransactionProxy parent) {
+ private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory,
+ final TransactionProxy parent) {
+
switch(parent.getType()) {
case READ_ONLY:
final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
- return new LocalTransactionContext(readOnly, parent.getLimiter()) {
+ return new LocalTransactionContext(readOnly, parent.getIdentifier()) {
@Override
protected DOMStoreWriteTransaction getWriteDelegate() {
throw new UnsupportedOperationException();
};
case READ_WRITE:
final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
- return new LocalTransactionContext(readWrite, parent.getLimiter()) {
+ return new LocalTransactionContext(readWrite, parent.getIdentifier()) {
@Override
protected DOMStoreWriteTransaction getWriteDelegate() {
return readWrite;
};
case WRITE_ONLY:
final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
- return new LocalTransactionContext(writeOnly, parent.getLimiter()) {
+ return new LocalTransactionContext(writeOnly, parent.getIdentifier()) {
@Override
protected DOMStoreWriteTransaction getWriteDelegate() {
return writeOnly;
public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
- public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 100;
+ public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS = TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
private static Set<String> globalDatastoreTypes = Sets.newConcurrentHashSet();
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
abstract class LocalTransactionContext extends AbstractTransactionContext {
private final DOMStoreTransaction txDelegate;
- LocalTransactionContext(DOMStoreTransaction txDelegate, OperationLimiter limiter) {
- super(limiter);
+ LocalTransactionContext(DOMStoreTransaction txDelegate, TransactionIdentifier identifier) {
+ super(identifier);
this.txDelegate = Preconditions.checkNotNull(txDelegate);
}
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
incrementModificationCount();
getWriteDelegate().write(path, data);
- releaseOperation();
}
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
incrementModificationCount();
getWriteDelegate().merge(path, data);
- releaseOperation();
}
@Override
public void deleteData(YangInstanceIdentifier path) {
incrementModificationCount();
getWriteDelegate().delete(path);
- releaseOperation();
}
@Override
@Override
public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
proxyFuture.set(result);
- releaseOperation();
}
@Override
public void onFailure(final Throwable t) {
proxyFuture.setException(t);
- releaseOperation();
}
});
}
@Override
public void onSuccess(final Boolean result) {
proxyFuture.set(result);
- releaseOperation();
}
@Override
public void onFailure(final Throwable t) {
proxyFuture.setException(t);
- releaseOperation();
}
});
}
private LocalThreePhaseCommitCohort ready() {
logModificationCount();
- acquireOperation();
return (LocalThreePhaseCommitCohort) getWriteDelegate().ready();
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
- private <T extends Future> T completeOperation(final ActorContext actorContext, final T operationFuture) {
- operationFuture.onComplete(getLimiter(), actorContext.getClientDispatcher());
- return operationFuture;
- }
-
@Override
public Future<ActorSelection> readyTransaction() {
final LocalThreePhaseCommitCohort cohort = ready();
- return completeOperation(cohort.getActorContext(), cohort.initiateCoordinatedCommit());
+ return cohort.initiateCoordinatedCommit();
}
@Override
public Future<Object> directCommit() {
final LocalThreePhaseCommitCohort cohort = ready();
- return completeOperation(cohort.getActorContext(), cohort.initiateDirectCommit());
+ return cohort.initiateDirectCommit();
}
@Override
@Override
public void closeTransaction() {
txDelegate.close();
- releaseOperation();
}
}
import com.google.common.base.Optional;
import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
private final Throwable failure;
- public NoOpTransactionContext(Throwable failure, OperationLimiter limiter) {
- super(limiter);
+ public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier) {
+ super(identifier);
this.failure = failure;
}
@Override
public Future<Object> directCommit() {
LOG.debug("Tx {} directCommit called, failure: {}", getIdentifier(), failure);
- releaseOperation();
return akka.dispatch.Futures.failed(failure);
}
@Override
public Future<ActorSelection> readyTransaction() {
LOG.debug("Tx {} readyTransaction called, failure: {}", getIdentifier(), failure);
- releaseOperation();
return akka.dispatch.Futures.failed(failure);
}
@Override
public void deleteData(YangInstanceIdentifier path) {
LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path);
- releaseOperation();
}
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path);
- releaseOperation();
}
@Override
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path);
- releaseOperation();
}
@Override
public void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
LOG.debug("Tx {} readData called path = {}", getIdentifier(), path);
- releaseOperation();
final Throwable t;
if (failure instanceof NoShardLeaderException) {
@Override
public void dataExists(YangInstanceIdentifier path, SettableFuture<Boolean> proxyFuture) {
LOG.debug("Tx {} dataExists called path = {}", getIdentifier(), path);
- releaseOperation();
proxyFuture.setException(new ReadFailedException("Error checking exists for path " + path, failure));
}
}
private final TransactionIdentifier identifier;
private final long acquireTimeout;
private final Semaphore semaphore;
+ private final int maxPermits;
OperationLimiter(final TransactionIdentifier identifier, final int maxPermits, final int acquireTimeoutSeconds) {
this.identifier = Preconditions.checkNotNull(identifier);
this.acquireTimeout = TimeUnit.SECONDS.toNanos(acquireTimeoutSeconds);
Preconditions.checkArgument(maxPermits >= 0);
+ this.maxPermits = maxPermits;
this.semaphore = new Semaphore(maxPermits);
}
acquire(1);
}
- private void acquire(final int acquirePermits) {
+ void acquire(final int acquirePermits) {
try {
if (!semaphore.tryAcquire(acquirePermits, acquireTimeout, TimeUnit.NANOSECONDS)) {
LOG.warn("Failed to acquire operation permit for transaction {}", identifier);
}
}
- void release() {
- this.semaphore.release();
- }
-
@Override
public void onComplete(final Throwable throwable, final Object message) {
if (message instanceof BatchedModificationsReply) {
}
@VisibleForTesting
- Semaphore getSemaphore() {
- return semaphore;
+ int availablePermits() {
+ return semaphore.availablePermits();
+ }
+
+ /**
+ * Release all currently acquired permits.
+ */
+ public void releaseAll() {
+ this.semaphore.release(maxPermits - availablePermits());
}
}
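// Usage sketch for the reworked OperationLimiter API (illustrative only, written as if from the
// same package like the tests; the permit count matches the wrapper's default of 1000 batched
// modifications plus 1 for ready, and the identifier values are arbitrary):
OperationLimiter limiter = new OperationLimiter(new TransactionIdentifier("foo", 1), 1001, 5);
limiter.acquire(3);                        // three outstanding operations
assert limiter.availablePermits() == 998;
limiter.releaseAll();                      // hands back everything that was acquired
assert limiter.availablePermits() == 1001;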
import akka.actor.ActorSelection;
import akka.dispatch.OnComplete;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.SettableFuture;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
private final ActorSelection actor;
private final boolean isTxActorLocal;
private final short remoteTransactionVersion;
+ private final OperationLimiter limiter;
private BatchedModifications batchedModifications;
private int totalBatchedModificationsSent;
- protected RemoteTransactionContext(ActorSelection actor,
+ protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor,
ActorContext actorContext, boolean isTxActorLocal,
short remoteTransactionVersion, OperationLimiter limiter) {
- super(limiter);
+ super(identifier);
+ this.limiter = Preconditions.checkNotNull(limiter);
this.actor = actor;
this.actorContext = actorContext;
this.isTxActorLocal = isTxActorLocal;
}
private Future<Object> completeOperation(Future<Object> operationFuture){
- operationFuture.onComplete(getLimiter(), actorContext.getClientDispatcher());
+ operationFuture.onComplete(limiter, actorContext.getClientDispatcher());
return operationFuture;
}
future.onComplete(onComplete, actorContext.getClientDispatcher());
}
+
+ /**
+ * Acquire a permit from the operation limiter if the hand-off has completed. If
+ * the hand-off is still ongoing, this method does nothing.
+ */
+ private void acquireOperation() {
+ if (isOperationHandOffComplete()) {
+ limiter.acquire();
+ }
+ }
+
+ @Override
+ public boolean usesOperationLimiting() {
+ return true;
+ }
}
}
private OperationLimiter getOperationLimiter() {
- return parent.getLimiter();
+ return transactionContextAdapter.getLimiter();
}
private TransactionIdentifier getIdentifier() {
if(failure != null) {
LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
- localTransactionContext = new NoOpTransactionContext(failure, getOperationLimiter());
+ localTransactionContext = new NoOpTransactionContext(failure, getIdentifier());
} else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
localTransactionContext = createValidTransactionContext(
CreateTransactionReply.fromSerializable(response));
IllegalArgumentException exception = new IllegalArgumentException(String.format(
"Invalid reply type %s for CreateTransaction", response.getClass()));
- localTransactionContext = new NoOpTransactionContext(exception, getOperationLimiter());
+ localTransactionContext = new NoOpTransactionContext(exception, getIdentifier());
}
transactionContextAdapter.executePriorTransactionOperations(localTransactionContext);
final TransactionContext ret;
if (remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
- ret = new PreLithiumTransactionContextImpl(transactionPath, transactionActor,
- getActorContext(), isTxActorLocal, remoteTransactionVersion, parent.getLimiter());
+ ret = new PreLithiumTransactionContextImpl(transactionContextAdapter.getIdentifier(), transactionPath, transactionActor,
+ getActorContext(), isTxActorLocal, remoteTransactionVersion, transactionContextAdapter.getLimiter());
} else {
- ret = new RemoteTransactionContext(transactionActor, getActorContext(),
- isTxActorLocal, remoteTransactionVersion, parent.getLimiter());
+ ret = new RemoteTransactionContext(transactionContextAdapter.getIdentifier(), transactionActor, getActorContext(),
+ isTxActorLocal, remoteTransactionVersion, transactionContextAdapter.getLimiter());
}
if(parent.getType() == TransactionType.READ_ONLY) {
* Implementations can rely on the wrapper calling this operation in a synchronized
* block, so they do not need to ensure visibility of this state transition themselves.
*/
- void operationHandoffComplete();
+ void operationHandOffComplete();
+
+ /**
+ * Indicates whether this TransactionContext throttles its operations through an OperationLimiter.
+ *
+ * @return true if operation limiting is used, false otherwise
+ */
+ boolean usesOperationLimiting();
}
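// How the new flag is wired up in this patch: RemoteTransactionContext overrides
// usesOperationLimiting() to return true, while AbstractTransactionContext's default of false
// covers LocalTransactionContext and NoOpTransactionContext. When the flag is false,
// TransactionContextWrapper simply hands back all permits once the hand-off completes:
//
//   localTransactionContext.operationHandOffComplete();
//   if (!localTransactionContext.usesOperationLimiting()) {
//       limiter.releaseAll();
//   }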
import java.util.List;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
@GuardedBy("queuedTxOperations")
private final List<TransactionOperation> queuedTxOperations = Lists.newArrayList();
+ private final TransactionIdentifier identifier;
+
/**
* The resulting TransactionContext.
*/
private final OperationLimiter limiter;
- TransactionContextWrapper(final OperationLimiter limiter) {
- this.limiter = Preconditions.checkNotNull(limiter);
+ TransactionContextWrapper(TransactionIdentifier identifier, final ActorContext actorContext) {
+ this.identifier = Preconditions.checkNotNull(identifier);
+ this.limiter = new OperationLimiter(identifier,
+ actorContext.getDatastoreContext().getShardBatchedModificationCount() + 1, // 1 extra permit for the ready operation
+ actorContext.getDatastoreContext().getOperationTimeoutInSeconds());
}
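// Worked example (editorial note): with the new default shard-batched-modification-count of 1000,
// the limiter above is sized at 1000 + 1 = 1001 permits, the extra permit being reserved for the
// ready operation; TransactionContextWrapperTest asserts this via availablePermits().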
TransactionContext getTransactionContext() {
}
TransactionIdentifier getIdentifier() {
- return limiter.getIdentifier();
+ return identifier;
}
/**
if (queuedTxOperations.isEmpty()) {
// We're done invoking the TransactionOperations so we can now publish the
// TransactionContext.
- localTransactionContext.operationHandoffComplete();
+ localTransactionContext.operationHandOffComplete();
+ if (!localTransactionContext.usesOperationLimiting()) {
+ limiter.releaseAll();
+ }
transactionContext = localTransactionContext;
break;
}
return promise.future();
}
+
+ public OperationLimiter getLimiter() {
+ return limiter;
+ }
+
}
private final Map<String, TransactionContextWrapper> txContextAdapters = new HashMap<>();
private final AbstractTransactionContextFactory<?> txContextFactory;
- private final OperationLimiter limiter;
private final TransactionType type;
private TransactionState state = TransactionState.OPEN;
this.txContextFactory = txContextFactory;
this.type = Preconditions.checkNotNull(type);
- // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
- this.limiter = new OperationLimiter(getIdentifier(),
- getActorContext().getTransactionOutstandingOperationLimit(),
- getActorContext().getDatastoreContext().getOperationTimeoutInSeconds());
-
LOG.debug("New {} Tx - {}", type, getIdentifier());
}
ActorContext getActorContext() {
return txContextFactory.getActorContext();
}
-
- OperationLimiter getLimiter() {
- return limiter;
- }
}
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.OperationLimiter;
import org.opendaylight.controller.cluster.datastore.RemoteTransactionContext;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
private final String transactionPath;
- public PreLithiumTransactionContextImpl(String transactionPath, ActorSelection actor,
+ public PreLithiumTransactionContextImpl(TransactionIdentifier identifier, String transactionPath, ActorSelection actor,
ActorContext actorContext, boolean isTxActorLocal,
short remoteTransactionVersion, OperationLimiter limiter) {
- super(actor, actorContext, isTxActorLocal, remoteTransactionVersion, limiter);
+ super(identifier, actor, actorContext, isTxActorLocal, remoteTransactionVersion, limiter);
this.transactionPath = transactionPath;
}
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
private Timeout operationTimeout;
private final String selfAddressHostPort;
private TransactionRateLimiter txRateLimiter;
- private final int transactionOutstandingOperationLimit;
private Timeout transactionCommitOperationTimeout;
private Timeout shardInitializationTimeout;
private final Dispatchers dispatchers;
selfAddressHostPort = null;
}
- transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
}
private void setCachedProperties() {
return builder.toString();
}
- /**
- * Get the maximum number of operations that are to be permitted within a transaction before the transaction
- * should begin throttling the operations
- *
- * Parking reading this configuration here because we need to get to the actor system settings
- *
- * @return
- */
- public int getTransactionOutstandingOperationLimit(){
- return transactionOutstandingOperationLimit;
- }
-
/**
* This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
* us to create a timer for pretty much anything.
}
leaf shard-batched-modification-count {
- default 100;
+ default 1000;
type non-zero-uint32-type;
description "The number of transaction modification operations (put, merge, delete) to
batch before sending to the shard transaction actor. Batching improves
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
public String findShard(YangInstanceIdentifier path) {
return "junk";
}
+ }).put(
+ "cars", new ShardStrategy() {
+ @Override
+ public String findShard(YangInstanceIdentifier path) {
+ return "cars";
+ }
}).build();
}
@Override
public Optional<String> getModuleNameFromNameSpace(String nameSpace) {
- return TestModel.JUNK_QNAME.getNamespace().toASCIIString().equals(nameSpace) ?
- Optional.of("junk") : Optional.<String>absent();
+ if (TestModel.JUNK_QNAME.getNamespace().toASCIIString().equals(nameSpace)) {
+ return Optional.of("junk");
+ } else if (CarsModel.BASE_QNAME.getNamespace().toASCIIString().equals(nameSpace)) {
+ return Optional.of("cars");
+ }
+ return Optional.<String>absent();
}
};
doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext();
- doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
mockComponentFactory = TransactionContextFactory.create(mockActorContext);
doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
- doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
-
return actorRef;
}
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
- localTransactionContext = new LocalTransactionContext(readWriteTransaction, limiter) {
+ localTransactionContext = new LocalTransactionContext(readWriteTransaction, limiter.getIdentifier()) {
@Override
protected DOMStoreWriteTransaction getWriteDelegate() {
return readWriteTransaction;
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode);
- verify(limiter).release();
verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
}
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode);
- verify(limiter).release();
verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
}
public void testDelete() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
localTransactionContext.deleteData(yangInstanceIdentifier);
- verify(limiter).release();
verify(readWriteTransaction).delete(yangInstanceIdentifier);
}
NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction).read(yangInstanceIdentifier);
localTransactionContext.readData(yangInstanceIdentifier, SettableFuture.<Optional<NormalizedNode<?,?>>>create());
- verify(limiter).release();
verify(readWriteTransaction).read(yangInstanceIdentifier);
}
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
doReturn(Futures.immediateCheckedFuture(true)).when(readWriteTransaction).exists(yangInstanceIdentifier);
localTransactionContext.dataExists(yangInstanceIdentifier, SettableFuture.<Boolean> create());
- verify(limiter).release();
verify(readWriteTransaction).exists(yangInstanceIdentifier);
}
doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit();
doReturn(mockCohort).when(readWriteTransaction).ready();
localTransactionContext.readyTransaction();
- verify(limiter).onComplete(null, null);
verify(readWriteTransaction).ready();
}
package org.opendaylight.controller.cluster.datastore;
import static org.junit.Assert.assertEquals;
-import java.util.concurrent.Semaphore;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
public void testOnComplete() throws Exception {
int permits = 10;
OperationLimiter limiter = new OperationLimiter(new TransactionIdentifier("foo", 1), permits, 1);
- Semaphore semaphore = limiter.getSemaphore();
- semaphore.acquire(permits);
+ limiter.acquire(permits);
int availablePermits = 0;
limiter.onComplete(null, DataExistsReply.create(true));
- assertEquals("availablePermits", ++availablePermits, semaphore.availablePermits());
+ assertEquals("availablePermits", ++availablePermits, limiter.availablePermits());
limiter.onComplete(null, DataExistsReply.create(true));
- assertEquals("availablePermits", ++availablePermits, semaphore.availablePermits());
+ assertEquals("availablePermits", ++availablePermits, limiter.availablePermits());
limiter.onComplete(null, new IllegalArgumentException());
- assertEquals("availablePermits", ++availablePermits, semaphore.availablePermits());
+ assertEquals("availablePermits", ++availablePermits, limiter.availablePermits());
limiter.onComplete(null, new BatchedModificationsReply(4));
availablePermits += 4;
- assertEquals("availablePermits", availablePermits, semaphore.availablePermits());
+ assertEquals("availablePermits", availablePermits, limiter.availablePermits());
}
+
}
--- /dev/null
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+
+public class TransactionContextWrapperTest {
+
+ @Mock
+ TransactionIdentifier identifier;
+
+ @Mock
+ ActorContext actorContext;
+
+ @Mock
+ TransactionContext transactionContext;
+
+ TransactionContextWrapper transactionContextWrapper;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext();
+ transactionContextWrapper = new TransactionContextWrapper(identifier, actorContext);
+ }
+
+ @Test
+ public void testExecutePriorTransactionOperations() {
+ for (int i = 0; i < 100; i++) {
+ transactionContextWrapper.maybeExecuteTransactionOperation(mock(TransactionOperation.class));
+ }
+ assertEquals(901, transactionContextWrapper.getLimiter().availablePermits());
+
+ transactionContextWrapper.executePriorTransactionOperations(transactionContext);
+
+ assertEquals(1001, transactionContextWrapper.getLimiter().availablePermits());
+ }
+}
\ No newline at end of file
throttleOperation(operation, 1, true);
}
+ private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound) {
+ throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()));
+ }
+
private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef){
return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
Optional.<DataTree>absent());
}
- private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
+ private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound, long expectedCompletionTime) {
ActorSystem actorSystem = getSystem();
ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
- doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
+ // Note that we set batchedModificationCount to one less than the permit count we need because the
+ // transaction's OperationLimiter is now created with one extra permit for the ready operation.
+ doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2).
+ shardBatchedModificationCount(outstandingOpsLimit-1).build()).when(mockActorContext).getDatastoreContext();
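// For example, outstandingOpsLimit = 2 results in shardBatchedModificationCount = 1, so the
// transaction's OperationLimiter is created with 1 + 1 = 2 permits.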
doReturn(actorSystem.actorSelection(shardActorRef.path())).
when(mockActorContext).actorSelection(shardActorRef.path().toString());
if(shardFound) {
doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+ doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).
+ when(mockActorContext).findPrimaryShardAsync(eq("cars"));
+
} else {
doReturn(Futures.failed(new Exception("not found")))
.when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
long end = System.nanoTime();
- long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
- expected, (end-start)), (end - start) > expected);
+ expectedCompletionTime, (end-start)),
+ ((end - start) > expectedCompletionTime) && ((end - start) < expectedCompletionTime*2));
}
ActorSystem actorSystem = getSystem();
ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
- doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
-
doReturn(actorSystem.actorSelection(shardActorRef.path())).
when(mockActorContext).actorSelection(shardActorRef.path().toString());
ActorSystem actorSystem = getSystem();
ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
- doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
-
doReturn(actorSystem.actorSelection(shardActorRef.path())).
when(mockActorContext).actorSelection(shardActorRef.path().toString());
@Test
public void testWriteCompletionForLocalShard(){
- dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperationLocal(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testWriteThrottlingWhenShardFound(){
- dataStoreContextBuilder.shardBatchedModificationCount(1);
throttleOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testWriteThrottlingWhenShardNotFound(){
// Confirm that there is no throttling when the Shard is not found
- dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testWriteCompletion(){
- dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testMergeThrottlingWhenShardFound(){
- dataStoreContextBuilder.shardBatchedModificationCount(1);
throttleOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testMergeThrottlingWhenShardNotFound(){
- dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testMergeCompletion(){
- dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testMergeCompletionForLocalShard(){
- dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperationLocal(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testDeleteCompletionForLocalShard(){
- dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperationLocal(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testDeleteCompletion(){
- dataStoreContextBuilder.shardBatchedModificationCount(1);
completeOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
@Test
public void testReadyThrottlingWithTwoTransactionContexts(){
-
throttleOperation(new TransactionProxyOperation() {
@Override
public void run(TransactionProxy transactionProxy) {
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
- transactionProxy.write(TestModel.TEST_PATH, carsNode);
+ // Trying to write to Cars will cause another transaction context to get created
+ transactionProxy.write(CarsModel.BASE_PATH, carsNode);
+ // Now ready should block for both transaction contexts
transactionProxy.ready();
}
- }, 2, true);
+ }, 1, true, TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()) * 2);
}
private void testModificationOperationBatching(TransactionType type) throws Exception {
doReturn(memberName).when(mockActorContext).getCurrentMemberName();
- doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
-
doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);