From: Moiz Raja Date: Wed, 14 Jan 2015 23:21:35 +0000 (-0800) Subject: BUG 2518 : Throttle operations in a transaction X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=7b963a588f2438b0d8cfe95e384ba36e417d925e BUG 2518 : Throttle operations in a transaction In some use cases a single transaction may do a lot of operations in a short period of time. This happens for example when testing the bgp-plugin. To ensure that clients do not overwhelm the datastore this patch throttles operations on the transaction proxy side and prevents the client from sending more operations than can be handled by the ShardTransaction actor in a reasonable amount of time. The throttling serves as a back-pressure mechanism. Akka recommends that for back pressure we use a bounded mailbox with an adequate push timeout. We are doing this. However the akka's behavior on the timeout expiring is to send the throttled message to dead letters. So to properly do what akka expects us to do we would need to watch dead letters and use that as an indication that we need back pressure which I think is inadequate for our purpose and thus the Semaphore based throttling. Change-Id: Ib1a0f128ffde009a82b8cd67001203e0b959fdf5 Signed-off-by: Moiz Raja --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index f34e88fb27..9f48ef96cf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -185,6 +186,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private final String transactionChainId; private final SchemaContext schemaContext; private boolean inReadyState; + private final Semaphore operationLimiter; + private final OperationCompleter operationCompleter; public TransactionProxy(ActorContext actorContext, TransactionType transactionType) { this(actorContext, transactionType, ""); @@ -221,6 +224,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { phantomReferenceCache.put(cleanup, cleanup); } + // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem + this.operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit()); + this.operationCompleter = new OperationCompleter(operationLimiter); + LOG.debug("Created txn {} of type {} on chain {}", identifier, transactionType, transactionChainId); } @@ -257,6 +264,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { LOG.debug("Tx {} read {}", identifier, path); + throttleOperation(); + TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); return txFutureCallback.enqueueReadOperation(new ReadOperation>>() { @Override @@ -275,6 +284,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { LOG.debug("Tx {} exists {}", identifier, path); + throttleOperation(); + TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); return txFutureCallback.enqueueReadOperation(new ReadOperation() { @Override @@ -292,6 +303,25 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { "Transaction is sealed - further modifications are not allowed"); } + private void throttleOperation() { + throttleOperation(1); + } + + private void throttleOperation(int acquirePermits) { + try { + if(!operationLimiter.tryAcquire(acquirePermits, actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){ + LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier()); + } + } catch (InterruptedException e) { + if(LOG.isDebugEnabled()) { + LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e); + } else { + LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier()); + } + } + } + + @Override public void write(final YangInstanceIdentifier path, final NormalizedNode data) { @@ -299,6 +329,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { LOG.debug("Tx {} write {}", identifier, path); + throttleOperation(); + TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); txFutureCallback.enqueueModifyOperation(new TransactionOperation() { @Override @@ -315,6 +347,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { LOG.debug("Tx {} merge {}", identifier, path); + throttleOperation(); + TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); txFutureCallback.enqueueModifyOperation(new TransactionOperation() { @Override @@ -331,6 +365,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { LOG.debug("Tx {} delete {}", identifier, path); + throttleOperation(); + TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); txFutureCallback.enqueueModifyOperation(new TransactionOperation() { @Override @@ -345,6 +381,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { checkModificationState(); + throttleOperation(txFutureCallbackMap.size()); + inReadyState = true; LOG.debug("Tx {} Readying {} transactions for commit", identifier, @@ -668,7 +706,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier, failure.getMessage()); - localTransactionContext = new NoOpTransactionContext(failure, identifier); + localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter); } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { localTransactionContext = createValidTransactionContext( CreateTransactionReply.fromSerializable(response)); @@ -676,7 +714,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { IllegalArgumentException exception = new IllegalArgumentException(String.format( "Invalid reply type %s for CreateTransaction", response.getClass())); - localTransactionContext = new NoOpTransactionContext(exception, identifier); + localTransactionContext = new NoOpTransactionContext(exception, identifier, operationLimiter); } for(TransactionOperation oper: txOperationsOnComplete) { @@ -713,7 +751,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { boolean isTxActorLocal = actorContext.isPathLocal(transactionPath); return new TransactionContextImpl(transactionPath, transactionActor, identifier, - actorContext, schemaContext, isTxActorLocal, reply.getVersion()); + actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter); } } @@ -755,35 +793,42 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class); private final ActorContext actorContext; - private final SchemaContext schemaContext; private final String transactionPath; private final ActorSelection actor; private final boolean isTxActorLocal; private final short remoteTransactionVersion; + private final OperationCompleter operationCompleter; + private TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext, SchemaContext schemaContext, - boolean isTxActorLocal, short remoteTransactionVersion) { + boolean isTxActorLocal, short remoteTransactionVersion, OperationCompleter operationCompleter) { super(identifier); this.transactionPath = transactionPath; this.actor = actor; this.actorContext = actorContext; - this.schemaContext = schemaContext; this.isTxActorLocal = isTxActorLocal; this.remoteTransactionVersion = remoteTransactionVersion; + this.operationCompleter = operationCompleter; } + private Future completeOperation(Future operationFuture){ + operationFuture.onComplete(this.operationCompleter, actorContext.getActorSystem().dispatcher()); + return operationFuture; + } + + private ActorSelection getActor() { return actor; } private Future executeOperationAsync(SerializableMessage msg) { - return actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable()); + return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable())); } private Future executeOperationAsync(VersionedSerializableMessage msg) { - return actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : - msg.toSerializable(remoteTransactionVersion)); + return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : + msg.toSerializable(remoteTransactionVersion))); } @Override @@ -1057,10 +1102,12 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class); private final Throwable failure; + private final Semaphore operationLimiter; - public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier){ + public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier, Semaphore operationLimiter){ super(identifier); this.failure = failure; + this.operationLimiter = operationLimiter; } @Override @@ -1071,28 +1118,33 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public Future readyTransaction() { LOG.debug("Tx {} readyTransaction called", identifier); + operationLimiter.release(); return akka.dispatch.Futures.failed(failure); } @Override public void deleteData(YangInstanceIdentifier path) { LOG.debug("Tx {} deleteData called path = {}", identifier, path); + operationLimiter.release(); } @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { LOG.debug("Tx {} mergeData called path = {}", identifier, path); + operationLimiter.release(); } @Override public void writeData(YangInstanceIdentifier path, NormalizedNode data) { LOG.debug("Tx {} writeData called path = {}", identifier, path); + operationLimiter.release(); } @Override public CheckedFuture>, ReadFailedException> readData( YangInstanceIdentifier path) { LOG.debug("Tx {} readData called path = {}", identifier, path); + operationLimiter.release(); return Futures.immediateFailedCheckedFuture(new ReadFailedException( "Error reading data for path " + path, failure)); } @@ -1101,8 +1153,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public CheckedFuture dataExists( YangInstanceIdentifier path) { LOG.debug("Tx {} dataExists called path = {}", identifier, path); + operationLimiter.release(); return Futures.immediateFailedCheckedFuture(new ReadFailedException( "Error checking exists for path " + path, failure)); } } + + private static class OperationCompleter extends OnComplete { + private final Semaphore operationLimiter; + OperationCompleter(Semaphore operationLimiter){ + this.operationLimiter = operationLimiter; + } + + @Override + public void onComplete(Throwable throwable, Object o){ + this.operationLimiter.release(); + } + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index f217d05bb2..c9fdf38931 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.datastore.utils; +import static akka.pattern.Patterns.ask; import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.ActorSelection; @@ -21,6 +22,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.common.actor.CommonConfig; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; import org.opendaylight.controller.cluster.datastore.DatastoreContext; @@ -45,8 +47,6 @@ import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; -import static akka.pattern.Patterns.ask; - /** * The ActorContext class contains utility methods which could be used by * non-actors (like DistributedDataStore) to work with actors a little more @@ -84,6 +84,7 @@ public class ActorContext { private final FiniteDuration operationDuration; private final Timeout operationTimeout; private final String selfAddressHostPort; + private final int transactionOutstandingOperationLimit; public ActorContext(ActorSystem actorSystem, ActorRef shardManager, ClusterWrapper clusterWrapper, Configuration configuration) { @@ -110,6 +111,8 @@ public class ActorContext { } else { selfAddressHostPort = null; } + + transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity(); } public DatastoreContext getDatastoreContext() { @@ -431,4 +434,16 @@ public class ActorContext { return builder.toString(); } + + /** + * Get the maximum number of operations that are to be permitted within a transaction before the transaction + * should begin throttling the operations + * + * Parking reading this configuration here because we need to get to the actor system settings + * + * @return + */ + public int getTransactionOutstandingOperationLimit(){ + return transactionOutstandingOperationLimit; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java index ce0547c388..dd37371a45 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java @@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.datastore; import static org.mockito.Matchers.anyObject; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -19,19 +18,21 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.MockActorContext; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.model.api.SchemaContext; -public class TransactionChainProxyTest { - ActorContext actorContext = mock(ActorContext.class); +public class TransactionChainProxyTest extends AbstractActorTest{ + ActorContext actorContext = null; SchemaContext schemaContext = mock(SchemaContext.class); @Before public void setUp() { - doReturn(schemaContext).when(actorContext).getSchemaContext(); + actorContext = new MockActorContext(getSystem()); + actorContext.setSchemaContext(schemaContext); } @SuppressWarnings("resource") diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 5e53b29db1..79edd19bba 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -10,6 +10,7 @@ import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY; @@ -30,6 +31,7 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.TimeUnit; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -59,6 +61,7 @@ import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; +import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; @@ -118,7 +121,7 @@ public class TransactionProxyTest { schemaContext = TestModel.createTestContext(); - DatastoreContext dataStoreContext = DatastoreContext.newBuilder().build(); + DatastoreContext dataStoreContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).build(); doReturn(getSystem()).when(mockActorContext).getActorSystem(); doReturn(memberName).when(mockActorContext).getCurrentMemberName(); @@ -126,6 +129,7 @@ public class TransactionProxyTest { doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); doReturn(dataStoreContext).when(mockActorContext).getDatastoreContext(); + doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit(); ShardStrategyFactory.setConfiguration(configuration); } @@ -358,6 +362,10 @@ public class TransactionProxyTest { return mergeSerializedDataReply(DataStoreVersions.CURRENT_VERSION); } + private Future incompleteFuture(){ + return mock(Future.class); + } + private Future mergeDataReply() { return Futures.successful(new MergeDataReply()); } @@ -395,6 +403,10 @@ public class TransactionProxyTest { executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())), eqCreateTransaction(memberName, type)); + doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString()); + + doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit(); + return actorRef; } @@ -1222,4 +1234,425 @@ public class TransactionProxyTest { verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path())); } + + private static interface TransactionProxyOperation { + void run(TransactionProxy transactionProxy); + } + + private void throttleOperation(TransactionProxyOperation operation) { + throttleOperation(operation, 1, true); + } + + private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){ + ActorSystem actorSystem = getSystem(); + ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); + + doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit(); + + doReturn(actorSystem.actorSelection(shardActorRef.path())). + when(mockActorContext).actorSelection(shardActorRef.path().toString()); + + if(shardFound) { + doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))). + when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); + } else { + doReturn(Futures.failed(new Exception("not found"))) + .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); + } + + String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor"; + CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder() + .setTransactionId("txn-1") + .setTransactionActorPath(actorPath) + .build(); + + doReturn(Futures.successful(createTransactionReply)).when(mockActorContext). + executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), + eqCreateTransaction(memberName, READ_WRITE)); + + doReturn(true).when(mockActorContext).isPathLocal(actorPath); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + + long start = System.currentTimeMillis(); + + operation.run(transactionProxy); + + long end = System.currentTimeMillis(); + + Assert.assertTrue(String.format("took less time than expected %s was %s", + mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000, + (end-start)), (end - start) > mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000); + + } + + private void completeOperation(TransactionProxyOperation operation){ + completeOperation(operation, true); + } + + private void completeOperation(TransactionProxyOperation operation, boolean shardFound){ + ActorSystem actorSystem = getSystem(); + ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); + + doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit(); + + doReturn(actorSystem.actorSelection(shardActorRef.path())). + when(mockActorContext).actorSelection(shardActorRef.path().toString()); + + if(shardFound) { + doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))). + when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); + } else { + doReturn(Futures.failed(new Exception("not found"))) + .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); + } + + String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor"; + CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder() + .setTransactionId("txn-1") + .setTransactionActorPath(actorPath) + .build(); + + doReturn(Futures.successful(createTransactionReply)).when(mockActorContext). + executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), + eqCreateTransaction(memberName, READ_WRITE)); + + doReturn(true).when(mockActorContext).isPathLocal(actorPath); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + + long start = System.currentTimeMillis(); + + operation.run(transactionProxy); + + long end = System.currentTimeMillis(); + + Assert.assertTrue(String.format("took more time than expected %s was %s", + mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000, + (end-start)), (end - start) <= mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000); + } + + public void testWriteThrottling(boolean shardFound){ + + throttleOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqWriteData(nodeToWrite)); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + } + }, 1, shardFound); + } + + @Test + public void testWriteThrottlingWhenShardFound(){ + throttleOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqWriteData(nodeToWrite)); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + } + }); + + } + + @Test + public void testWriteThrottlingWhenShardNotFound(){ + // Confirm that there is no throttling when the Shard is not found + completeOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqWriteData(nodeToWrite)); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + } + }, false); + + } + + + @Test + public void testWriteCompletion(){ + completeOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqSerializedWriteData(nodeToWrite)); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + } + }); + + } + + @Test + public void testMergeThrottlingWhenShardFound(){ + + throttleOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + NormalizedNode nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqMergeData(nodeToMerge)); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge); + } + }); + } + + @Test + public void testMergeThrottlingWhenShardNotFound(){ + + completeOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + NormalizedNode nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqMergeData(nodeToMerge)); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge); + } + }, false); + } + + @Test + public void testMergeCompletion(){ + completeOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + NormalizedNode nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqMergeData(nodeToMerge)); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge); + } + }); + + } + + @Test + public void testDeleteThrottlingWhenShardFound(){ + + throttleOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqDeleteData()); + + transactionProxy.delete(TestModel.TEST_PATH); + + transactionProxy.delete(TestModel.TEST_PATH); + } + }); + } + + + @Test + public void testDeleteThrottlingWhenShardNotFound(){ + + completeOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqDeleteData()); + + transactionProxy.delete(TestModel.TEST_PATH); + + transactionProxy.delete(TestModel.TEST_PATH); + } + }, false); + } + + @Test + public void testDeleteCompletion(){ + completeOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqDeleteData()); + + transactionProxy.delete(TestModel.TEST_PATH); + + transactionProxy.delete(TestModel.TEST_PATH); + } + }); + + } + + @Test + public void testReadThrottlingWhenShardFound(){ + + throttleOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqReadData()); + + transactionProxy.read(TestModel.TEST_PATH); + + transactionProxy.read(TestModel.TEST_PATH); + } + }); + } + + @Test + public void testReadThrottlingWhenShardNotFound(){ + + completeOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqReadData()); + + transactionProxy.read(TestModel.TEST_PATH); + + transactionProxy.read(TestModel.TEST_PATH); + } + }, false); + } + + + @Test + public void testReadCompletion(){ + completeOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqReadData()); + + transactionProxy.read(TestModel.TEST_PATH); + + transactionProxy.read(TestModel.TEST_PATH); + } + }); + + } + + @Test + public void testExistsThrottlingWhenShardFound(){ + + throttleOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqDataExists()); + + transactionProxy.exists(TestModel.TEST_PATH); + + transactionProxy.exists(TestModel.TEST_PATH); + } + }); + } + + @Test + public void testExistsThrottlingWhenShardNotFound(){ + + completeOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqDataExists()); + + transactionProxy.exists(TestModel.TEST_PATH); + + transactionProxy.exists(TestModel.TEST_PATH); + } + }, false); + } + + + @Test + public void testExistsCompletion(){ + completeOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqDataExists()); + + transactionProxy.exists(TestModel.TEST_PATH); + + transactionProxy.exists(TestModel.TEST_PATH); + } + }); + + } + + @Test + public void testReadyThrottling(){ + + throttleOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqWriteData(nodeToWrite)); + + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), any(ReadyTransaction.class)); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.ready(); + } + }); + } + + @Test + public void testReadyThrottlingWithTwoTransactionContexts(){ + + throttleOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + NormalizedNode carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME); + + doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqWriteData(nodeToWrite)); + + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqWriteData(carsNode)); + + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), any(ReadyTransaction.class)); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.write(TestModel.TEST_PATH, carsNode); + + transactionProxy.ready(); + } + }, 2, true); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index fcb0324bea..e4ab969f5c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -1,8 +1,10 @@ package org.opendaylight.controller.cluster.datastore.utils; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; import akka.actor.ActorRef; import akka.actor.ActorSelection; -import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.Props; import akka.actor.UntypedActor; @@ -21,10 +23,6 @@ import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; - public class ActorContextTest extends AbstractActorTest{ private static class MockShardManager extends UntypedActor { @@ -224,7 +222,7 @@ public class ActorContextTest extends AbstractActorTest{ @Test public void testResolvePathForRemoteActor() { ActorContext actorContext = - new ActorContext(mock(ActorSystem.class), mock(ActorRef.class), mock( + new ActorContext(getSystem(), mock(ActorRef.class), mock( ClusterWrapper.class), mock(Configuration.class)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java index e571e3a715..67fa0960cb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java @@ -20,6 +20,7 @@ public class TestModel { public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13", "test"); + public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME, "outer-list"); public static final QName INNER_LIST_QNAME = QName.create(TEST_QNAME, "inner-list"); public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice");