From: Tom Pantelis Date: Tue, 27 Jan 2015 05:00:58 +0000 (+0000) Subject: Merge "Optimize PathUtils.toYangInstanceIdentifier()" X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=77f32f904a0338742dd5357b07e2d4ed465eb394;hp=cc2af31ea079c744a0cc76c562417051214914cb Merge "Optimize PathUtils.toYangInstanceIdentifier()" --- diff --git a/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/__artifactId__-features/pom.xml b/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/__artifactId__-features/pom.xml index 880e2dc9d5..1ee28b8d5a 100644 --- a/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/__artifactId__-features/pom.xml +++ b/opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/__artifactId__-features/pom.xml @@ -39,6 +39,30 @@ and is available at http://www.eclipse.org/legal/epl-v10.html INTERNAL + + org.opendaylight.yangtools + features-yangtools + features + ${yangtools.version} + xml + runtime + + + org.opendaylight.controller + features-mdsal + features + ${mdsal.version} + xml + runtime + + + org.opendaylight.controller + features-restconf + features + ${mdsal.version} + xml + runtime + ${symbol_dollar}{groupId} ${artifactId}-impl diff --git a/opendaylight/config/config-parent/pom.xml b/opendaylight/config/config-parent/pom.xml index 10c1824ebe..af39b63447 100644 --- a/opendaylight/config/config-parent/pom.xml +++ b/opendaylight/config/config-parent/pom.xml @@ -24,6 +24,7 @@ and is available at http://www.eclipse.org/legal/epl-v10.html 0.3.0-SNAPSHOT 1.2.0-SNAPSHOT + 0.7.0-SNAPSHOT src/main/yang-gen-config src/main/config/default-config.xml @@ -45,10 +46,21 @@ and is available at http://www.eclipse.org/legal/epl-v10.html pom import + + org.opendaylight.yangtools + yangtools-artifacts + ${yangtools.version} + pom + import + + + org.opendaylight.yangtools + yang-common + org.opendaylight.controller config-api diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index f34e88fb27..9f48ef96cf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -185,6 +186,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private final String transactionChainId; private final SchemaContext schemaContext; private boolean inReadyState; + private final Semaphore operationLimiter; + private final OperationCompleter operationCompleter; public TransactionProxy(ActorContext actorContext, TransactionType transactionType) { this(actorContext, transactionType, ""); @@ -221,6 +224,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { phantomReferenceCache.put(cleanup, cleanup); } + // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem + this.operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit()); + this.operationCompleter = new OperationCompleter(operationLimiter); + LOG.debug("Created txn {} of type {} on chain {}", identifier, transactionType, transactionChainId); } @@ -257,6 +264,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { LOG.debug("Tx {} read {}", identifier, path); + throttleOperation(); + TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); return txFutureCallback.enqueueReadOperation(new ReadOperation>>() { @Override @@ -275,6 +284,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { LOG.debug("Tx {} exists {}", identifier, path); + throttleOperation(); + TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); return txFutureCallback.enqueueReadOperation(new ReadOperation() { @Override @@ -292,6 +303,25 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { "Transaction is sealed - further modifications are not allowed"); } + private void throttleOperation() { + throttleOperation(1); + } + + private void throttleOperation(int acquirePermits) { + try { + if(!operationLimiter.tryAcquire(acquirePermits, actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){ + LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier()); + } + } catch (InterruptedException e) { + if(LOG.isDebugEnabled()) { + LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e); + } else { + LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier()); + } + } + } + + @Override public void write(final YangInstanceIdentifier path, final NormalizedNode data) { @@ -299,6 +329,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { LOG.debug("Tx {} write {}", identifier, path); + throttleOperation(); + TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); txFutureCallback.enqueueModifyOperation(new TransactionOperation() { @Override @@ -315,6 +347,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { LOG.debug("Tx {} merge {}", identifier, path); + throttleOperation(); + TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); txFutureCallback.enqueueModifyOperation(new TransactionOperation() { @Override @@ -331,6 +365,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { LOG.debug("Tx {} delete {}", identifier, path); + throttleOperation(); + TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path); txFutureCallback.enqueueModifyOperation(new TransactionOperation() { @Override @@ -345,6 +381,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { checkModificationState(); + throttleOperation(txFutureCallbackMap.size()); + inReadyState = true; LOG.debug("Tx {} Readying {} transactions for commit", identifier, @@ -668,7 +706,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier, failure.getMessage()); - localTransactionContext = new NoOpTransactionContext(failure, identifier); + localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter); } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { localTransactionContext = createValidTransactionContext( CreateTransactionReply.fromSerializable(response)); @@ -676,7 +714,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { IllegalArgumentException exception = new IllegalArgumentException(String.format( "Invalid reply type %s for CreateTransaction", response.getClass())); - localTransactionContext = new NoOpTransactionContext(exception, identifier); + localTransactionContext = new NoOpTransactionContext(exception, identifier, operationLimiter); } for(TransactionOperation oper: txOperationsOnComplete) { @@ -713,7 +751,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { boolean isTxActorLocal = actorContext.isPathLocal(transactionPath); return new TransactionContextImpl(transactionPath, transactionActor, identifier, - actorContext, schemaContext, isTxActorLocal, reply.getVersion()); + actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter); } } @@ -755,35 +793,42 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class); private final ActorContext actorContext; - private final SchemaContext schemaContext; private final String transactionPath; private final ActorSelection actor; private final boolean isTxActorLocal; private final short remoteTransactionVersion; + private final OperationCompleter operationCompleter; + private TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext, SchemaContext schemaContext, - boolean isTxActorLocal, short remoteTransactionVersion) { + boolean isTxActorLocal, short remoteTransactionVersion, OperationCompleter operationCompleter) { super(identifier); this.transactionPath = transactionPath; this.actor = actor; this.actorContext = actorContext; - this.schemaContext = schemaContext; this.isTxActorLocal = isTxActorLocal; this.remoteTransactionVersion = remoteTransactionVersion; + this.operationCompleter = operationCompleter; } + private Future completeOperation(Future operationFuture){ + operationFuture.onComplete(this.operationCompleter, actorContext.getActorSystem().dispatcher()); + return operationFuture; + } + + private ActorSelection getActor() { return actor; } private Future executeOperationAsync(SerializableMessage msg) { - return actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable()); + return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable())); } private Future executeOperationAsync(VersionedSerializableMessage msg) { - return actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : - msg.toSerializable(remoteTransactionVersion)); + return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : + msg.toSerializable(remoteTransactionVersion))); } @Override @@ -1057,10 +1102,12 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class); private final Throwable failure; + private final Semaphore operationLimiter; - public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier){ + public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier, Semaphore operationLimiter){ super(identifier); this.failure = failure; + this.operationLimiter = operationLimiter; } @Override @@ -1071,28 +1118,33 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { @Override public Future readyTransaction() { LOG.debug("Tx {} readyTransaction called", identifier); + operationLimiter.release(); return akka.dispatch.Futures.failed(failure); } @Override public void deleteData(YangInstanceIdentifier path) { LOG.debug("Tx {} deleteData called path = {}", identifier, path); + operationLimiter.release(); } @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { LOG.debug("Tx {} mergeData called path = {}", identifier, path); + operationLimiter.release(); } @Override public void writeData(YangInstanceIdentifier path, NormalizedNode data) { LOG.debug("Tx {} writeData called path = {}", identifier, path); + operationLimiter.release(); } @Override public CheckedFuture>, ReadFailedException> readData( YangInstanceIdentifier path) { LOG.debug("Tx {} readData called path = {}", identifier, path); + operationLimiter.release(); return Futures.immediateFailedCheckedFuture(new ReadFailedException( "Error reading data for path " + path, failure)); } @@ -1101,8 +1153,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { public CheckedFuture dataExists( YangInstanceIdentifier path) { LOG.debug("Tx {} dataExists called path = {}", identifier, path); + operationLimiter.release(); return Futures.immediateFailedCheckedFuture(new ReadFailedException( "Error checking exists for path " + path, failure)); } } + + private static class OperationCompleter extends OnComplete { + private final Semaphore operationLimiter; + OperationCompleter(Semaphore operationLimiter){ + this.operationLimiter = operationLimiter; + } + + @Override + public void onComplete(Throwable throwable, Object o){ + this.operationLimiter.release(); + } + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index f217d05bb2..c9fdf38931 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.datastore.utils; +import static akka.pattern.Patterns.ask; import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.ActorSelection; @@ -21,6 +22,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.common.actor.CommonConfig; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; import org.opendaylight.controller.cluster.datastore.DatastoreContext; @@ -45,8 +47,6 @@ import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; -import static akka.pattern.Patterns.ask; - /** * The ActorContext class contains utility methods which could be used by * non-actors (like DistributedDataStore) to work with actors a little more @@ -84,6 +84,7 @@ public class ActorContext { private final FiniteDuration operationDuration; private final Timeout operationTimeout; private final String selfAddressHostPort; + private final int transactionOutstandingOperationLimit; public ActorContext(ActorSystem actorSystem, ActorRef shardManager, ClusterWrapper clusterWrapper, Configuration configuration) { @@ -110,6 +111,8 @@ public class ActorContext { } else { selfAddressHostPort = null; } + + transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity(); } public DatastoreContext getDatastoreContext() { @@ -431,4 +434,16 @@ public class ActorContext { return builder.toString(); } + + /** + * Get the maximum number of operations that are to be permitted within a transaction before the transaction + * should begin throttling the operations + * + * Parking reading this configuration here because we need to get to the actor system settings + * + * @return + */ + public int getTransactionOutstandingOperationLimit(){ + return transactionOutstandingOperationLimit; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java index ce0547c388..dd37371a45 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java @@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.datastore; import static org.mockito.Matchers.anyObject; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -19,19 +18,21 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.MockActorContext; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.model.api.SchemaContext; -public class TransactionChainProxyTest { - ActorContext actorContext = mock(ActorContext.class); +public class TransactionChainProxyTest extends AbstractActorTest{ + ActorContext actorContext = null; SchemaContext schemaContext = mock(SchemaContext.class); @Before public void setUp() { - doReturn(schemaContext).when(actorContext).getSchemaContext(); + actorContext = new MockActorContext(getSystem()); + actorContext.setSchemaContext(schemaContext); } @SuppressWarnings("resource") diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 5e53b29db1..79edd19bba 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -10,6 +10,7 @@ import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY; @@ -30,6 +31,7 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.TimeUnit; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -59,6 +61,7 @@ import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; +import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; @@ -118,7 +121,7 @@ public class TransactionProxyTest { schemaContext = TestModel.createTestContext(); - DatastoreContext dataStoreContext = DatastoreContext.newBuilder().build(); + DatastoreContext dataStoreContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).build(); doReturn(getSystem()).when(mockActorContext).getActorSystem(); doReturn(memberName).when(mockActorContext).getCurrentMemberName(); @@ -126,6 +129,7 @@ public class TransactionProxyTest { doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); doReturn(dataStoreContext).when(mockActorContext).getDatastoreContext(); + doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit(); ShardStrategyFactory.setConfiguration(configuration); } @@ -358,6 +362,10 @@ public class TransactionProxyTest { return mergeSerializedDataReply(DataStoreVersions.CURRENT_VERSION); } + private Future incompleteFuture(){ + return mock(Future.class); + } + private Future mergeDataReply() { return Futures.successful(new MergeDataReply()); } @@ -395,6 +403,10 @@ public class TransactionProxyTest { executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())), eqCreateTransaction(memberName, type)); + doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString()); + + doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit(); + return actorRef; } @@ -1222,4 +1234,425 @@ public class TransactionProxyTest { verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path())); } + + private static interface TransactionProxyOperation { + void run(TransactionProxy transactionProxy); + } + + private void throttleOperation(TransactionProxyOperation operation) { + throttleOperation(operation, 1, true); + } + + private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){ + ActorSystem actorSystem = getSystem(); + ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); + + doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit(); + + doReturn(actorSystem.actorSelection(shardActorRef.path())). + when(mockActorContext).actorSelection(shardActorRef.path().toString()); + + if(shardFound) { + doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))). + when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); + } else { + doReturn(Futures.failed(new Exception("not found"))) + .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); + } + + String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor"; + CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder() + .setTransactionId("txn-1") + .setTransactionActorPath(actorPath) + .build(); + + doReturn(Futures.successful(createTransactionReply)).when(mockActorContext). + executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), + eqCreateTransaction(memberName, READ_WRITE)); + + doReturn(true).when(mockActorContext).isPathLocal(actorPath); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + + long start = System.currentTimeMillis(); + + operation.run(transactionProxy); + + long end = System.currentTimeMillis(); + + Assert.assertTrue(String.format("took less time than expected %s was %s", + mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000, + (end-start)), (end - start) > mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000); + + } + + private void completeOperation(TransactionProxyOperation operation){ + completeOperation(operation, true); + } + + private void completeOperation(TransactionProxyOperation operation, boolean shardFound){ + ActorSystem actorSystem = getSystem(); + ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); + + doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit(); + + doReturn(actorSystem.actorSelection(shardActorRef.path())). + when(mockActorContext).actorSelection(shardActorRef.path().toString()); + + if(shardFound) { + doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))). + when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); + } else { + doReturn(Futures.failed(new Exception("not found"))) + .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); + } + + String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor"; + CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder() + .setTransactionId("txn-1") + .setTransactionActorPath(actorPath) + .build(); + + doReturn(Futures.successful(createTransactionReply)).when(mockActorContext). + executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), + eqCreateTransaction(memberName, READ_WRITE)); + + doReturn(true).when(mockActorContext).isPathLocal(actorPath); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + + long start = System.currentTimeMillis(); + + operation.run(transactionProxy); + + long end = System.currentTimeMillis(); + + Assert.assertTrue(String.format("took more time than expected %s was %s", + mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000, + (end-start)), (end - start) <= mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000); + } + + public void testWriteThrottling(boolean shardFound){ + + throttleOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqWriteData(nodeToWrite)); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + } + }, 1, shardFound); + } + + @Test + public void testWriteThrottlingWhenShardFound(){ + throttleOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqWriteData(nodeToWrite)); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + } + }); + + } + + @Test + public void testWriteThrottlingWhenShardNotFound(){ + // Confirm that there is no throttling when the Shard is not found + completeOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqWriteData(nodeToWrite)); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + } + }, false); + + } + + + @Test + public void testWriteCompletion(){ + completeOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqSerializedWriteData(nodeToWrite)); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + } + }); + + } + + @Test + public void testMergeThrottlingWhenShardFound(){ + + throttleOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + NormalizedNode nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqMergeData(nodeToMerge)); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge); + } + }); + } + + @Test + public void testMergeThrottlingWhenShardNotFound(){ + + completeOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + NormalizedNode nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqMergeData(nodeToMerge)); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge); + } + }, false); + } + + @Test + public void testMergeCompletion(){ + completeOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + NormalizedNode nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqMergeData(nodeToMerge)); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge); + } + }); + + } + + @Test + public void testDeleteThrottlingWhenShardFound(){ + + throttleOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqDeleteData()); + + transactionProxy.delete(TestModel.TEST_PATH); + + transactionProxy.delete(TestModel.TEST_PATH); + } + }); + } + + + @Test + public void testDeleteThrottlingWhenShardNotFound(){ + + completeOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqDeleteData()); + + transactionProxy.delete(TestModel.TEST_PATH); + + transactionProxy.delete(TestModel.TEST_PATH); + } + }, false); + } + + @Test + public void testDeleteCompletion(){ + completeOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqDeleteData()); + + transactionProxy.delete(TestModel.TEST_PATH); + + transactionProxy.delete(TestModel.TEST_PATH); + } + }); + + } + + @Test + public void testReadThrottlingWhenShardFound(){ + + throttleOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqReadData()); + + transactionProxy.read(TestModel.TEST_PATH); + + transactionProxy.read(TestModel.TEST_PATH); + } + }); + } + + @Test + public void testReadThrottlingWhenShardNotFound(){ + + completeOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqReadData()); + + transactionProxy.read(TestModel.TEST_PATH); + + transactionProxy.read(TestModel.TEST_PATH); + } + }, false); + } + + + @Test + public void testReadCompletion(){ + completeOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqReadData()); + + transactionProxy.read(TestModel.TEST_PATH); + + transactionProxy.read(TestModel.TEST_PATH); + } + }); + + } + + @Test + public void testExistsThrottlingWhenShardFound(){ + + throttleOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqDataExists()); + + transactionProxy.exists(TestModel.TEST_PATH); + + transactionProxy.exists(TestModel.TEST_PATH); + } + }); + } + + @Test + public void testExistsThrottlingWhenShardNotFound(){ + + completeOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqDataExists()); + + transactionProxy.exists(TestModel.TEST_PATH); + + transactionProxy.exists(TestModel.TEST_PATH); + } + }, false); + } + + + @Test + public void testExistsCompletion(){ + completeOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqDataExists()); + + transactionProxy.exists(TestModel.TEST_PATH); + + transactionProxy.exists(TestModel.TEST_PATH); + } + }); + + } + + @Test + public void testReadyThrottling(){ + + throttleOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqWriteData(nodeToWrite)); + + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), any(ReadyTransaction.class)); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.ready(); + } + }); + } + + @Test + public void testReadyThrottlingWithTwoTransactionContexts(){ + + throttleOperation(new TransactionProxyOperation() { + @Override + public void run(TransactionProxy transactionProxy) { + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + NormalizedNode carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME); + + doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqWriteData(nodeToWrite)); + + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqWriteData(carsNode)); + + doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), any(ReadyTransaction.class)); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.write(TestModel.TEST_PATH, carsNode); + + transactionProxy.ready(); + } + }, 2, true); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index fcb0324bea..e4ab969f5c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -1,8 +1,10 @@ package org.opendaylight.controller.cluster.datastore.utils; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; import akka.actor.ActorRef; import akka.actor.ActorSelection; -import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.Props; import akka.actor.UntypedActor; @@ -21,10 +23,6 @@ import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; - public class ActorContextTest extends AbstractActorTest{ private static class MockShardManager extends UntypedActor { @@ -224,7 +222,7 @@ public class ActorContextTest extends AbstractActorTest{ @Test public void testResolvePathForRemoteActor() { ActorContext actorContext = - new ActorContext(mock(ActorSystem.class), mock(ActorRef.class), mock( + new ActorContext(getSystem(), mock(ActorRef.class), mock( ClusterWrapper.class), mock(Configuration.class)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java index e571e3a715..67fa0960cb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java @@ -20,6 +20,7 @@ public class TestModel { public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13", "test"); + public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME, "outer-list"); public static final QName INNER_LIST_QNAME = QName.create(TEST_QNAME, "inner-list"); public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice");