Merge "Optimize PathUtils.toYangInstanceIdentifier()"
authorTom Pantelis <tpanteli@brocade.com>
Tue, 27 Jan 2015 05:00:58 +0000 (05:00 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 27 Jan 2015 05:00:58 +0000 (05:00 +0000)
opendaylight/archetypes/opendaylight-startup/src/main/resources/archetype-resources/__artifactId__-features/pom.xml
opendaylight/config/config-parent/pom.xml
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java

index 880e2dc..1ee28b8 100644 (file)
@@ -39,6 +39,30 @@ and is available at http://www.eclipse.org/legal/epl-v10.html INTERNAL
     </dependencies>
   </dependencyManagement>
   <dependencies>
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>features-yangtools</artifactId>
+      <classifier>features</classifier>
+      <version>${yangtools.version}</version>
+      <type>xml</type>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>features-mdsal</artifactId>
+      <classifier>features</classifier>
+      <version>${mdsal.version}</version>
+      <type>xml</type>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>features-restconf</artifactId>
+      <classifier>features</classifier>
+      <version>${mdsal.version}</version>
+      <type>xml</type>
+      <scope>runtime</scope>
+    </dependency>
     <dependency>
       <groupId>${symbol_dollar}{groupId}</groupId>
       <artifactId>${artifactId}-impl</artifactId>
index 10c1824..af39b63 100644 (file)
@@ -24,6 +24,7 @@ and is available at http://www.eclipse.org/legal/epl-v10.html
   <properties>
     <config.version>0.3.0-SNAPSHOT</config.version>
     <mdsal.version>1.2.0-SNAPSHOT</mdsal.version>
+    <yangtools.version>0.7.0-SNAPSHOT</yangtools.version>
     <jmxGeneratorPath>src/main/yang-gen-config</jmxGeneratorPath>
     <config.file>src/main/config/default-config.xml</config.file>
   </properties>
@@ -45,10 +46,21 @@ and is available at http://www.eclipse.org/legal/epl-v10.html
         <type>pom</type>
         <scope>import</scope>
       </dependency>
+      <dependency>
+        <groupId>org.opendaylight.yangtools</groupId>
+        <artifactId>yangtools-artifacts</artifactId>
+        <version>${yangtools.version}</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 
   <dependencies>
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>yang-common</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>config-api</artifactId>
index f34e88f..9f48ef9 100644 (file)
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -185,6 +186,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     private final String transactionChainId;
     private final SchemaContext schemaContext;
     private boolean inReadyState;
+    private final Semaphore operationLimiter;
+    private final OperationCompleter operationCompleter;
 
     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
         this(actorContext, transactionType, "");
@@ -221,6 +224,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             phantomReferenceCache.put(cleanup, cleanup);
         }
 
+        // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
+        this.operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
+        this.operationCompleter = new OperationCompleter(operationLimiter);
+
         LOG.debug("Created txn {} of type {} on chain {}", identifier, transactionType, transactionChainId);
     }
 
@@ -257,6 +264,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         LOG.debug("Tx {} read {}", identifier, path);
 
+        throttleOperation();
+
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
         return txFutureCallback.enqueueReadOperation(new ReadOperation<Optional<NormalizedNode<?, ?>>>() {
             @Override
@@ -275,6 +284,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         LOG.debug("Tx {} exists {}", identifier, path);
 
+        throttleOperation();
+
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
         return txFutureCallback.enqueueReadOperation(new ReadOperation<Boolean>() {
             @Override
@@ -292,6 +303,25 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 "Transaction is sealed - further modifications are not allowed");
     }
 
+    private void throttleOperation() {
+        throttleOperation(1);
+    }
+
+    private void throttleOperation(int acquirePermits) {
+        try {
+            if(!operationLimiter.tryAcquire(acquirePermits, actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
+                LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
+            }
+        } catch (InterruptedException e) {
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e);
+            } else {
+                LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
+            }
+        }
+    }
+
+
     @Override
     public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
 
@@ -299,6 +329,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         LOG.debug("Tx {} write {}", identifier, path);
 
+        throttleOperation();
+
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
         txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
             @Override
@@ -315,6 +347,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         LOG.debug("Tx {} merge {}", identifier, path);
 
+        throttleOperation();
+
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
         txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
             @Override
@@ -331,6 +365,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         LOG.debug("Tx {} delete {}", identifier, path);
 
+        throttleOperation();
+
         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
         txFutureCallback.enqueueModifyOperation(new TransactionOperation() {
             @Override
@@ -345,6 +381,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         checkModificationState();
 
+        throttleOperation(txFutureCallbackMap.size());
+
         inReadyState = true;
 
         LOG.debug("Tx {} Readying {} transactions for commit", identifier,
@@ -668,7 +706,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                     LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
                             failure.getMessage());
 
-                    localTransactionContext = new NoOpTransactionContext(failure, identifier);
+                    localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter);
                 } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
                     localTransactionContext = createValidTransactionContext(
                             CreateTransactionReply.fromSerializable(response));
@@ -676,7 +714,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                     IllegalArgumentException exception = new IllegalArgumentException(String.format(
                         "Invalid reply type %s for CreateTransaction", response.getClass()));
 
-                    localTransactionContext = new NoOpTransactionContext(exception, identifier);
+                    localTransactionContext = new NoOpTransactionContext(exception, identifier, operationLimiter);
                 }
 
                 for(TransactionOperation oper: txOperationsOnComplete) {
@@ -713,7 +751,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
 
             return new TransactionContextImpl(transactionPath, transactionActor, identifier,
-                actorContext, schemaContext, isTxActorLocal, reply.getVersion());
+                actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
         }
     }
 
@@ -755,35 +793,42 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
 
         private final ActorContext actorContext;
-        private final SchemaContext schemaContext;
         private final String transactionPath;
         private final ActorSelection actor;
         private final boolean isTxActorLocal;
         private final short remoteTransactionVersion;
+        private final OperationCompleter operationCompleter;
+
 
         private TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
                 ActorContext actorContext, SchemaContext schemaContext,
-                boolean isTxActorLocal, short remoteTransactionVersion) {
+                boolean isTxActorLocal, short remoteTransactionVersion, OperationCompleter operationCompleter) {
             super(identifier);
             this.transactionPath = transactionPath;
             this.actor = actor;
             this.actorContext = actorContext;
-            this.schemaContext = schemaContext;
             this.isTxActorLocal = isTxActorLocal;
             this.remoteTransactionVersion = remoteTransactionVersion;
+            this.operationCompleter = operationCompleter;
         }
 
+        private Future<Object> completeOperation(Future<Object> operationFuture){
+            operationFuture.onComplete(this.operationCompleter, actorContext.getActorSystem().dispatcher());
+            return operationFuture;
+        }
+
+
         private ActorSelection getActor() {
             return actor;
         }
 
         private Future<Object> executeOperationAsync(SerializableMessage msg) {
-            return actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable());
+            return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg : msg.toSerializable()));
         }
 
         private Future<Object> executeOperationAsync(VersionedSerializableMessage msg) {
-            return actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg :
-                msg.toSerializable(remoteTransactionVersion));
+            return completeOperation(actorContext.executeOperationAsync(getActor(), isTxActorLocal ? msg :
+                    msg.toSerializable(remoteTransactionVersion)));
         }
 
         @Override
@@ -1057,10 +1102,12 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
 
         private final Throwable failure;
+        private final Semaphore operationLimiter;
 
-        public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier){
+        public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier, Semaphore operationLimiter){
             super(identifier);
             this.failure = failure;
+            this.operationLimiter = operationLimiter;
         }
 
         @Override
@@ -1071,28 +1118,33 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         @Override
         public Future<ActorSelection> readyTransaction() {
             LOG.debug("Tx {} readyTransaction called", identifier);
+            operationLimiter.release();
             return akka.dispatch.Futures.failed(failure);
         }
 
         @Override
         public void deleteData(YangInstanceIdentifier path) {
             LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+            operationLimiter.release();
         }
 
         @Override
         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
             LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+            operationLimiter.release();
         }
 
         @Override
         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
             LOG.debug("Tx {} writeData called path = {}", identifier, path);
+            operationLimiter.release();
         }
 
         @Override
         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
                 YangInstanceIdentifier path) {
             LOG.debug("Tx {} readData called path = {}", identifier, path);
+            operationLimiter.release();
             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
                     "Error reading data for path " + path, failure));
         }
@@ -1101,8 +1153,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         public CheckedFuture<Boolean, ReadFailedException> dataExists(
                 YangInstanceIdentifier path) {
             LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+            operationLimiter.release();
             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
                     "Error checking exists for path " + path, failure));
         }
     }
+
+    private static class OperationCompleter extends OnComplete<Object> {
+        private final Semaphore operationLimiter;
+        OperationCompleter(Semaphore operationLimiter){
+            this.operationLimiter = operationLimiter;
+        }
+
+        @Override
+        public void onComplete(Throwable throwable, Object o){
+            this.operationLimiter.release();
+        }
+    }
 }
index f217d05..c9fdf38 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.datastore.utils;
 
+import static akka.pattern.Patterns.ask;
 import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
@@ -21,6 +22,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
@@ -45,8 +47,6 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
-import static akka.pattern.Patterns.ask;
-
 /**
  * The ActorContext class contains utility methods which could be used by
  * non-actors (like DistributedDataStore) to work with actors a little more
@@ -84,6 +84,7 @@ public class ActorContext {
     private final FiniteDuration operationDuration;
     private final Timeout operationTimeout;
     private final String selfAddressHostPort;
+    private final int transactionOutstandingOperationLimit;
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
             ClusterWrapper clusterWrapper, Configuration configuration) {
@@ -110,6 +111,8 @@ public class ActorContext {
         } else {
             selfAddressHostPort = null;
         }
+
+        transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
     }
 
     public DatastoreContext getDatastoreContext() {
@@ -431,4 +434,16 @@ public class ActorContext {
 
         return builder.toString();
     }
+
+    /**
+     * Get the maximum number of operations that are to be permitted within a transaction before the transaction
+     * should begin throttling the operations
+     *
+     * Parking reading this configuration here because we need to get to the actor system settings
+     *
+     * @return
+     */
+    public int getTransactionOutstandingOperationLimit(){
+        return transactionOutstandingOperationLimit;
+    }
 }
index ce0547c..dd37371 100644 (file)
@@ -11,7 +11,6 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -19,19 +18,21 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-public class TransactionChainProxyTest {
-    ActorContext actorContext = mock(ActorContext.class);
+public class TransactionChainProxyTest extends AbstractActorTest{
+    ActorContext actorContext = null;
     SchemaContext schemaContext = mock(SchemaContext.class);
 
     @Before
     public void setUp() {
-        doReturn(schemaContext).when(actorContext).getSchemaContext();
+        actorContext = new MockActorContext(getSystem());
+        actorContext.setSchemaContext(schemaContext);
     }
 
     @SuppressWarnings("resource")
index 5e53b29..79edd19 100644 (file)
@@ -10,6 +10,7 @@ import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
@@ -30,6 +31,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -59,6 +61,7 @@ import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
@@ -118,7 +121,7 @@ public class TransactionProxyTest {
 
         schemaContext = TestModel.createTestContext();
 
-        DatastoreContext dataStoreContext = DatastoreContext.newBuilder().build();
+        DatastoreContext dataStoreContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).build();
 
         doReturn(getSystem()).when(mockActorContext).getActorSystem();
         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
@@ -126,6 +129,7 @@ public class TransactionProxyTest {
         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
         doReturn(dataStoreContext).when(mockActorContext).getDatastoreContext();
+        doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
 
         ShardStrategyFactory.setConfiguration(configuration);
     }
@@ -358,6 +362,10 @@ public class TransactionProxyTest {
         return mergeSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
     }
 
+    private Future<Object> incompleteFuture(){
+        return mock(Future.class);
+    }
+
     private Future<MergeDataReply> mergeDataReply() {
         return Futures.successful(new MergeDataReply());
     }
@@ -395,6 +403,10 @@ public class TransactionProxyTest {
                 executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
                         eqCreateTransaction(memberName, type));
 
+        doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
+
+        doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
         return actorRef;
     }
 
@@ -1222,4 +1234,425 @@ public class TransactionProxyTest {
 
         verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path()));
     }
+
+    private static interface TransactionProxyOperation {
+        void run(TransactionProxy transactionProxy);
+    }
+
+    private void throttleOperation(TransactionProxyOperation operation) {
+        throttleOperation(operation, 1, true);
+    }
+
+    private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
+        ActorSystem actorSystem = getSystem();
+        ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+
+        doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
+        doReturn(actorSystem.actorSelection(shardActorRef.path())).
+                when(mockActorContext).actorSelection(shardActorRef.path().toString());
+
+        if(shardFound) {
+            doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
+                    when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+        } else {
+            doReturn(Futures.failed(new Exception("not found")))
+                    .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+        }
+
+        String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
+        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
+                .setTransactionId("txn-1")
+                .setTransactionActorPath(actorPath)
+                .build();
+
+        doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
+                executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+                        eqCreateTransaction(memberName, READ_WRITE));
+
+        doReturn(true).when(mockActorContext).isPathLocal(actorPath);
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+        long start = System.currentTimeMillis();
+
+        operation.run(transactionProxy);
+
+        long end = System.currentTimeMillis();
+
+        Assert.assertTrue(String.format("took less time than expected %s was %s",
+                mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000,
+                (end-start)), (end - start) > mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000);
+
+    }
+
+    private void completeOperation(TransactionProxyOperation operation){
+        completeOperation(operation, true);
+    }
+
+    private void completeOperation(TransactionProxyOperation operation, boolean shardFound){
+        ActorSystem actorSystem = getSystem();
+        ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+
+        doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
+        doReturn(actorSystem.actorSelection(shardActorRef.path())).
+                when(mockActorContext).actorSelection(shardActorRef.path().toString());
+
+        if(shardFound) {
+            doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
+                    when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+        } else {
+            doReturn(Futures.failed(new Exception("not found")))
+                    .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+        }
+
+        String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
+        CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
+                .setTransactionId("txn-1")
+                .setTransactionActorPath(actorPath)
+                .build();
+
+        doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
+                executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+                        eqCreateTransaction(memberName, READ_WRITE));
+
+        doReturn(true).when(mockActorContext).isPathLocal(actorPath);
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+        long start = System.currentTimeMillis();
+
+        operation.run(transactionProxy);
+
+        long end = System.currentTimeMillis();
+
+        Assert.assertTrue(String.format("took more time than expected %s was %s",
+                mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000,
+                (end-start)), (end - start) <= mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000);
+    }
+
+    public void testWriteThrottling(boolean shardFound){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+            }
+        }, 1, shardFound);
+    }
+
+    @Test
+    public void testWriteThrottlingWhenShardFound(){
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+            }
+        });
+
+    }
+
+    @Test
+    public void testWriteThrottlingWhenShardNotFound(){
+        // Confirm that there is no throttling when the Shard is not found
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+            }
+        }, false);
+
+    }
+
+
+    @Test
+    public void testWriteCompletion(){
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqSerializedWriteData(nodeToWrite));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+            }
+        });
+
+    }
+
+    @Test
+    public void testMergeThrottlingWhenShardFound(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqMergeData(nodeToMerge));
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+            }
+        });
+    }
+
+    @Test
+    public void testMergeThrottlingWhenShardNotFound(){
+
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqMergeData(nodeToMerge));
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+            }
+        }, false);
+    }
+
+    @Test
+    public void testMergeCompletion(){
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqMergeData(nodeToMerge));
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+
+                transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
+            }
+        });
+
+    }
+
+    @Test
+    public void testDeleteThrottlingWhenShardFound(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDeleteData());
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+            }
+        });
+    }
+
+
+    @Test
+    public void testDeleteThrottlingWhenShardNotFound(){
+
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDeleteData());
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+            }
+        }, false);
+    }
+
+    @Test
+    public void testDeleteCompletion(){
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDeleteData());
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+
+                transactionProxy.delete(TestModel.TEST_PATH);
+            }
+        });
+
+    }
+
+    @Test
+    public void testReadThrottlingWhenShardFound(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqReadData());
+
+                transactionProxy.read(TestModel.TEST_PATH);
+
+                transactionProxy.read(TestModel.TEST_PATH);
+            }
+        });
+    }
+
+    @Test
+    public void testReadThrottlingWhenShardNotFound(){
+
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqReadData());
+
+                transactionProxy.read(TestModel.TEST_PATH);
+
+                transactionProxy.read(TestModel.TEST_PATH);
+            }
+        }, false);
+    }
+
+
+    @Test
+    public void testReadCompletion(){
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqReadData());
+
+                transactionProxy.read(TestModel.TEST_PATH);
+
+                transactionProxy.read(TestModel.TEST_PATH);
+            }
+        });
+
+    }
+
+    @Test
+    public void testExistsThrottlingWhenShardFound(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDataExists());
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+            }
+        });
+    }
+
+    @Test
+    public void testExistsThrottlingWhenShardNotFound(){
+
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDataExists());
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+            }
+        }, false);
+    }
+
+
+    @Test
+    public void testExistsCompletion(){
+        completeOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqDataExists());
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+
+                transactionProxy.exists(TestModel.TEST_PATH);
+            }
+        });
+
+    }
+
+    @Test
+    public void testReadyThrottling(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), any(ReadyTransaction.class));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.ready();
+            }
+        });
+    }
+
+    @Test
+    public void testReadyThrottlingWithTwoTransactionContexts(){
+
+        throttleOperation(new TransactionProxyOperation() {
+            @Override
+            public void run(TransactionProxy transactionProxy) {
+                NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+                NormalizedNode<?, ?> carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
+
+                doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), eqWriteData(carsNode));
+
+                doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
+                        any(ActorSelection.class), any(ReadyTransaction.class));
+
+                transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+                transactionProxy.write(TestModel.TEST_PATH, carsNode);
+
+                transactionProxy.ready();
+            }
+        }, 2, true);
+    }
 }
index fcb0324..e4ab969 100644 (file)
@@ -1,8 +1,10 @@
 package org.opendaylight.controller.cluster.datastore.utils;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
-import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
@@ -21,10 +23,6 @@ import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
 public class ActorContextTest extends AbstractActorTest{
 
     private static class MockShardManager extends UntypedActor {
@@ -224,7 +222,7 @@ public class ActorContextTest extends AbstractActorTest{
     @Test
     public void testResolvePathForRemoteActor() {
         ActorContext actorContext =
-                new ActorContext(mock(ActorSystem.class), mock(ActorRef.class), mock(
+                new ActorContext(getSystem(), mock(ActorRef.class), mock(
                         ClusterWrapper.class),
                         mock(Configuration.class));
 
index e571e3a..67fa096 100644 (file)
@@ -20,6 +20,7 @@ public class TestModel {
 
   public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13",
           "test");
+
   public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME, "outer-list");
   public static final QName INNER_LIST_QNAME = QName.create(TEST_QNAME, "inner-list");
   public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice");