Bug 1608: Make ActorContext ASK_DURATION configurable 90/10390/8
authortpantelis <tpanteli@brocade.com>
Sun, 24 Aug 2014 06:03:53 +0000 (02:03 -0400)
committerMoiz Raja <moraja@cisco.com>
Thu, 4 Sep 2014 15:22:32 +0000 (15:22 +0000)
Made the duration timeout for akka ask operations configurable via the
config system.

Also removed the explicit passing of the duration timeout to
ActorContext#exeucte* methods since all calls previously passed the
static ActorContext.ASK_DURATION.

Change-Id: Ic54e65d8d444110294048348472d1d8d56f0fd44
Signed-off-by: tpantelis <tpanteli@brocade.com>
16 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreProperties.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/TestUtils.java

index 202ced9a26512047f57ee8ad2b0ced7e61366771..0a137e07df43a1bb7ca2fb3e854d7f63adfd46a3 100644 (file)
@@ -72,6 +72,8 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
                     actorSystem, actorSystem.actorOf(
                         ShardManager.props(type, cluster, configuration, datastoreContext).
                             withMailbox(ActorContext.MAILBOX), shardManagerId ), cluster, configuration);
+
+        actorContext.setOperationTimeout(dataStoreProperties.getOperationTimeoutInSeconds());
     }
 
     public DistributedDataStore(ActorContext actorContext) {
@@ -98,8 +100,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
 
         Object result = actorContext.executeLocalShardOperation(shardName,
-            new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
-            ActorContext.ASK_DURATION);
+            new RegisterChangeListener(path, dataChangeListenerActor.path(), scope));
 
         if (result != null) {
             RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
index eb6a5361381942306a9be7869d1048801f7c9a0f..df3245ffb225d9d3b0baf704e81e499ebdced314 100644 (file)
@@ -18,21 +18,24 @@ public class DistributedDataStoreProperties {
     private final int maxShardDataChangeExecutorQueueSize;
     private final int maxShardDataChangeExecutorPoolSize;
     private final int shardTransactionIdleTimeoutInMinutes;
+    private final int operationTimeoutInSeconds;
 
     public DistributedDataStoreProperties() {
         maxShardDataChangeListenerQueueSize = 1000;
         maxShardDataChangeExecutorQueueSize = 1000;
         maxShardDataChangeExecutorPoolSize = 20;
         shardTransactionIdleTimeoutInMinutes = 10;
+        operationTimeoutInSeconds = 5;
     }
 
     public DistributedDataStoreProperties(int maxShardDataChangeListenerQueueSize,
             int maxShardDataChangeExecutorQueueSize, int maxShardDataChangeExecutorPoolSize,
-            int shardTransactionIdleTimeoutInMinutes) {
+            int shardTransactionIdleTimeoutInMinutes, int operationTimeoutInSeconds) {
         this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize;
         this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize;
         this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
         this.shardTransactionIdleTimeoutInMinutes = shardTransactionIdleTimeoutInMinutes;
+        this.operationTimeoutInSeconds = operationTimeoutInSeconds;
     }
 
     public int getMaxShardDataChangeListenerQueueSize() {
@@ -50,4 +53,8 @@ public class DistributedDataStoreProperties {
     public int getShardTransactionIdleTimeoutInMinutes() {
         return shardTransactionIdleTimeoutInMinutes;
     }
+
+    public int getOperationTimeoutInSeconds() {
+        return operationTimeoutInSeconds;
+    }
 }
index c557118b1e8f92234d597ce6372cda2674f6a6bd..a5be69531d73ede89f7bba5978a85d2e045d8989 100644 (file)
@@ -151,8 +151,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
 
             ActorSelection cohort = actorContext.actorSelection(actorPath);
 
-            futureList.add(actorContext.executeRemoteOperationAsync(cohort, message,
-                    ActorContext.ASK_DURATION));
+            futureList.add(actorContext.executeRemoteOperationAsync(cohort, message));
         }
 
         return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
index fc1a3aad74ca95ca0545f319303c50a50cd994df..a8b20c030e1dd34f274ce2e02b56669739824af3 100644 (file)
@@ -353,8 +353,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         try {
             Object response = actorContext.executeShardOperation(shardName,
-                new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(),
-                ActorContext.ASK_DURATION);
+                new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable());
             if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
                 CreateTransactionReply reply =
                     CreateTransactionReply.fromSerializable(response);
@@ -472,7 +471,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             // Send the ReadyTransaction message to the Tx actor.
 
             final Future<Object> replyFuture = actorContext.executeRemoteOperationAsync(getActor(),
-                    new ReadyTransaction().toSerializable(), ActorContext.ASK_DURATION);
+                    new ReadyTransaction().toSerializable());
 
             // Combine all the previously recorded put/merge/delete operation reply Futures and the
             // ReadyTransactionReply Future into one Future. If any one fails then the combined
@@ -532,23 +531,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         public void deleteData(YangInstanceIdentifier path) {
             LOG.debug("Tx {} deleteData called path = {}", identifier, path);
             recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
-                    new DeleteData(path).toSerializable(), ActorContext.ASK_DURATION ));
+                    new DeleteData(path).toSerializable() ));
         }
 
         @Override
         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
             LOG.debug("Tx {} mergeData called path = {}", identifier, path);
             recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
-                    new MergeData(path, data, schemaContext).toSerializable(),
-                    ActorContext.ASK_DURATION));
+                    new MergeData(path, data, schemaContext).toSerializable()));
         }
 
         @Override
         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
             LOG.debug("Tx {} writeData called path = {}", identifier, path);
             recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
-                    new WriteData(path, data, schemaContext).toSerializable(),
-                    ActorContext.ASK_DURATION));
+                    new WriteData(path, data, schemaContext).toSerializable()));
         }
 
         @Override
@@ -634,7 +631,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             };
 
             Future<Object> readFuture = actorContext.executeRemoteOperationAsync(getActor(),
-                    new ReadData(path).toSerializable(), ActorContext.ASK_DURATION);
+                    new ReadData(path).toSerializable());
             readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
         }
 
@@ -715,7 +712,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             };
 
             Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
-                    new DataExists(path).toSerializable(), ActorContext.ASK_DURATION);
+                    new DataExists(path).toSerializable());
             future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
         }
     }
index 818a8ca8b390ec2fcb33f2cad91ad597fa7ca758..b87dc4f608b191c21d5fa34b8bf4a9744864f96a 100644 (file)
@@ -47,10 +47,7 @@ public class ActorContext {
     private static final Logger
         LOG = LoggerFactory.getLogger(ActorContext.class);
 
-    public static final FiniteDuration ASK_DURATION =
-        Duration.create(5, TimeUnit.SECONDS);
-    public static final Duration AWAIT_DURATION =
-        Duration.create(5, TimeUnit.SECONDS);
+    private static final FiniteDuration DEFAULT_OPER_DURATION = Duration.create(5, TimeUnit.SECONDS);
 
     public static final String MAILBOX = "bounded-mailbox";
 
@@ -59,6 +56,8 @@ public class ActorContext {
     private final ClusterWrapper clusterWrapper;
     private final Configuration configuration;
     private volatile SchemaContext schemaContext;
+    private FiniteDuration operationDuration = DEFAULT_OPER_DURATION;
+    private Timeout operationTimeout = new Timeout(operationDuration);
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
         ClusterWrapper clusterWrapper,
@@ -93,6 +92,11 @@ public class ActorContext {
         }
     }
 
+    public void setOperationTimeout(int timeoutInSeconds) {
+        operationDuration = Duration.create(timeoutInSeconds, TimeUnit.SECONDS);
+        operationTimeout = new Timeout(operationDuration);
+    }
+
     public SchemaContext getSchemaContext() {
         return schemaContext;
     }
@@ -117,7 +121,7 @@ public class ActorContext {
      */
     public ActorRef findLocalShard(String shardName) {
         Object result = executeLocalOperation(shardManager,
-            new FindLocalShard(shardName), ASK_DURATION);
+            new FindLocalShard(shardName));
 
         if (result instanceof LocalShardFound) {
             LocalShardFound found = (LocalShardFound) result;
@@ -133,7 +137,7 @@ public class ActorContext {
 
     public String findPrimaryPath(String shardName) {
         Object result = executeLocalOperation(shardManager,
-            new FindPrimary(shardName).toSerializable(), ASK_DURATION);
+            new FindPrimary(shardName).toSerializable());
 
         if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
             PrimaryFound found = PrimaryFound.fromSerializable(result);
@@ -151,16 +155,13 @@ public class ActorContext {
      *
      * @param actor
      * @param message
-     * @param duration
      * @return The response of the operation
      */
-    public Object executeLocalOperation(ActorRef actor, Object message,
-        FiniteDuration duration) {
-        Future<Object> future =
-            ask(actor, message, new Timeout(duration));
+    public Object executeLocalOperation(ActorRef actor, Object message) {
+        Future<Object> future = ask(actor, message, operationTimeout);
 
         try {
-            return Await.result(future, AWAIT_DURATION);
+            return Await.result(future, operationDuration);
         } catch (Exception e) {
             throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
         }
@@ -171,21 +172,19 @@ public class ActorContext {
      *
      * @param actor
      * @param message
-     * @param duration
      * @return
      */
-    public Object executeRemoteOperation(ActorSelection actor, Object message,
-        FiniteDuration duration) {
+    public Object executeRemoteOperation(ActorSelection actor, Object message) {
 
         LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
 
-        Future<Object> future =
-            ask(actor, message, new Timeout(duration));
+        Future<Object> future = ask(actor, message, operationTimeout);
 
         try {
-            return Await.result(future, AWAIT_DURATION);
+            return Await.result(future, operationDuration);
         } catch (Exception e) {
-            throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
+            throw new TimeoutException("Sending message " + message.getClass().toString() +
+                    " to actor " + actor.toString() + " failed" , e);
         }
     }
 
@@ -194,15 +193,13 @@ public class ActorContext {
      *
      * @param actor the ActorSelection
      * @param message the message to send
-     * @param duration the maximum amount of time to send he message
      * @return a Future containing the eventual result
      */
-    public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message,
-            FiniteDuration duration) {
+    public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message) {
 
         LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
 
-        return ask(actor, message, new Timeout(duration));
+        return ask(actor, message, operationTimeout);
     }
 
     /**
@@ -225,16 +222,14 @@ public class ActorContext {
      *
      * @param shardName
      * @param message
-     * @param duration
      * @return
      * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException         if the message to the remote shard times out
      * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
      */
-    public Object executeShardOperation(String shardName, Object message,
-        FiniteDuration duration) {
+    public Object executeShardOperation(String shardName, Object message) {
         ActorSelection primary = findPrimary(shardName);
 
-        return executeRemoteOperation(primary, message, duration);
+        return executeRemoteOperation(primary, message);
     }
 
     /**
@@ -246,19 +241,17 @@ public class ActorContext {
      *
      * @param shardName the name of the shard on which the operation needs to be executed
      * @param message the message that needs to be sent to the shard
-     * @param duration the time duration in which this operation should complete
      * @return the message that was returned by the local actor on which the
      *         the operation was executed. If a local shard was not found then
      *         null is returned
      * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
      *         if the operation does not complete in a specified time duration
      */
-    public Object executeLocalShardOperation(String shardName, Object message,
-        FiniteDuration duration) {
+    public Object executeLocalShardOperation(String shardName, Object message) {
         ActorRef local = findLocalShard(shardName);
 
         if(local != null) {
-            return executeLocalOperation(local, message, duration);
+            return executeLocalOperation(local, message);
         }
 
         return null;
index f5a0d3783ab011728ab0688db60b404a9c53719e..c26be148ee45c3ccfae16359dba45ce3ea57baca 100644 (file)
@@ -33,9 +33,11 @@ public class DistributedConfigDataStoreProviderModule extends
         }
 
         return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
-                new DistributedDataStoreProperties(props.getMaxShardDataChangeExecutorPoolSize(),
-                        props.getMaxShardDataChangeExecutorQueueSize(),
-                        props.getMaxShardDataChangeListenerQueueSize(),
-                        props.getShardTransactionIdleTimeoutInMinutes()));
+                new DistributedDataStoreProperties(
+                        props.getMaxShardDataChangeExecutorPoolSize().getValue(),
+                        props.getMaxShardDataChangeExecutorQueueSize().getValue(),
+                        props.getMaxShardDataChangeListenerQueueSize().getValue(),
+                        props.getShardTransactionIdleTimeoutInMinutes().getValue(),
+                        props.getOperationTimeoutInSeconds().getValue()));
     }
 }
index 443334d11f65378871da7a6f2ae97097d4c2eb87..a88d09457ae938c78d87b80c5370574e311eeac4 100644 (file)
@@ -34,10 +34,12 @@ public class DistributedOperationalDataStoreProviderModule extends
 
         return DistributedDataStoreFactory.createInstance("operational",
                 getOperationalSchemaServiceDependency(),
-                new DistributedDataStoreProperties(props.getMaxShardDataChangeExecutorPoolSize(),
-                        props.getMaxShardDataChangeExecutorQueueSize(),
-                        props.getMaxShardDataChangeListenerQueueSize(),
-                        props.getShardTransactionIdleTimeoutInMinutes()));
+                new DistributedDataStoreProperties(
+                        props.getMaxShardDataChangeExecutorPoolSize().getValue(),
+                        props.getMaxShardDataChangeExecutorQueueSize().getValue(),
+                        props.getMaxShardDataChangeListenerQueueSize().getValue(),
+                        props.getShardTransactionIdleTimeoutInMinutes().getValue(),
+                        props.getOperationTimeoutInSeconds().getValue()));
     }
 
 }
index a9a8a1ad986c65835018f80773707561473934b8..d50be2ca0ef8fdc8123e5b63a62887034bed0bb1 100644 (file)
@@ -36,30 +36,48 @@ module distributed-datastore-provider {
                 config:java-name-prefix DistributedOperationalDataStoreProvider;
      }
 
+    typedef non-zero-uint16-type {
+        type uint16 {
+            range "1..max";
+        }
+    }
+    
+    typedef operation-timeout-type {
+        type uint16 {
+            range "5..max";
+        }
+    }
+    
     grouping data-store-properties {
         leaf max-shard-data-change-executor-queue-size {
             default 1000;
-            type uint16;
+            type non-zero-uint16-type;
             description "The maximum queue size for each shard's data store data change notification executor.";
          }
 
          leaf max-shard-data-change-executor-pool-size {
             default 20;
-            type uint16;
+            type non-zero-uint16-type;
             description "The maximum thread pool size for each shard's data store data change notification executor.";
          }
 
          leaf max-shard-data-change-listener-queue-size {
             default 1000;
-            type uint16;
+            type non-zero-uint16-type;
             description "The maximum queue size for each shard's data store data change listeners.";
          }
          
          leaf shard-transaction-idle-timeout-in-minutes {
             default 10;
-            type uint16;
+            type non-zero-uint16-type;
             description "The maximum amount of time a shard transaction can be idle without receiving any messages before it self-destructs.";
          }
+         
+         leaf operation-timeout-in-seconds {
+            default 5;
+            type operation-timeout-type;
+            description "The maximum amount of time for akka operations (remote or local) to complete before failing.";
+         }
     }
     
     // Augments the 'configuration' choice node under modules/module.
index e653c3d3717351182a6a57cf570c4a6bf6449500..2ed11cfbda21eff65b3cb6223b671731c891ab67 100644 (file)
@@ -82,8 +82,7 @@ public class DataChangeListenerProxyTest extends AbstractActorTest {
         ActorContext
             testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), new MockConfiguration());
         Object messages = testContext
-            .executeLocalOperation(actorRef, "messages",
-                ActorContext.ASK_DURATION);
+            .executeLocalOperation(actorRef, "messages");
 
         Assert.assertNotNull(messages);
 
index c99a7e8c8c4908133bac2a9710e3549f02e2410c..3d0aaa0082e55e8b73976af4637977dd9111c97d 100644 (file)
@@ -62,8 +62,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{
         ActorContext
             testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)),new MockClusterWrapper(), new MockConfiguration());
         Object messages = testContext
-            .executeLocalOperation(actorRef, "messages",
-                ActorContext.ASK_DURATION);
+            .executeLocalOperation(actorRef, "messages");
 
         Assert.assertNotNull(messages);
 
index e10570cd158bdfc4a1d98132788cd89f47cddc15..e39b9abd65a711e333f9c0315b25de2e3457266a 100644 (file)
@@ -13,6 +13,7 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.TestActorRef;
+import akka.util.Timeout;
 
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -185,17 +186,17 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
 
             ).build();
 
+        Timeout askTimeout = new Timeout(ASK_RESULT_DURATION);
+
         //This is done so that Modification list is updated which is used during commit
-        Future future =
-            akka.pattern.Patterns.ask(shardTransaction, writeData, 3000);
+        Future<Object> future = akka.pattern.Patterns.ask(shardTransaction, writeData, askTimeout);
 
         //ready transaction creates the cohort so that we get into the
         //block where in commmit is done
         ShardTransactionMessages.ReadyTransaction readyTransaction =
             ShardTransactionMessages.ReadyTransaction.newBuilder().build();
 
-        future =
-            akka.pattern.Patterns.ask(shardTransaction, readyTransaction, 3000);
+        future = akka.pattern.Patterns.ask(shardTransaction, readyTransaction, askTimeout);
 
         //but when the message is sent it will have the MockCommit object
         //so that we can simulate throwing of exception
@@ -216,10 +217,7 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
         when(mockModification.toSerializable()).thenReturn(
             PersistentMessages.CompositeModification.newBuilder().build());
 
-        future =
-            akka.pattern.Patterns.ask(subject,
-                mockForwardCommitTransaction
-                , 3000);
+        future = akka.pattern.Patterns.ask(subject, mockForwardCommitTransaction, askTimeout);
         Await.result(future, ASK_RESULT_DURATION);
     }
 
index adb12b298e99260b6f33a6c309f3c6eb16ca78bb..1cd0f85fa1917057f77144f23009a7edb65c0150 100644 (file)
@@ -35,7 +35,6 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 
 import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.util.List;
 import java.util.concurrent.ExecutionException;
@@ -93,12 +92,12 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         }
 
         stubber.when(actorContext).executeRemoteOperationAsync(any(ActorSelection.class),
-                isA(requestType), any(FiniteDuration.class));
+                isA(requestType));
     }
 
     private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
         verify(actorContext, times(nCohorts)).executeRemoteOperationAsync(
-                any(ActorSelection.class), isA(requestType), any(FiniteDuration.class));
+                any(ActorSelection.class), isA(requestType));
     }
 
     private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
index f69ae88ec873ed044ab40c36a63e6f0b68b9db67..e5392e025158704f44d152306aaa727b64d460e8 100644 (file)
@@ -56,8 +56,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -216,10 +214,6 @@ public class TransactionProxyTest extends AbstractActorTest {
         return getSystem().actorSelection(actorRef.path());
     }
 
-    private FiniteDuration anyDuration() {
-        return any(FiniteDuration.class);
-    }
-
     private CreateTransactionReply createTransactionReply(ActorRef actorRef){
         return CreateTransactionReply.newBuilder()
             .setTransactionActorPath(actorRef.path().toString())
@@ -232,7 +226,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 when(mockActorContext).actorSelection(actorRef.path().toString());
         doReturn(createTransactionReply(actorRef)).when(mockActorContext).
                 executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD),
-                        eqCreateTransaction(memberName, type), anyDuration());
+                        eqCreateTransaction(memberName, type));
         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(
                 anyString(), eq(actorRef.path().toString()));
         doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString());
@@ -259,7 +253,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 READ_ONLY);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+                eq(actorSelection(actorRef)), eqReadData());
 
         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
                 TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
@@ -269,7 +263,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+                eq(actorSelection(actorRef)), eqReadData());
 
         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
 
@@ -283,7 +277,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
-                executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
+                executeRemoteOperationAsync(any(ActorSelection.class), any());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_ONLY);
@@ -296,7 +290,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
+                executeRemoteOperationAsync(any(ActorSelection.class), any());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_ONLY);
@@ -308,7 +302,7 @@ public class TransactionProxyTest extends AbstractActorTest {
             throws Throwable {
 
         doThrow(exToThrow).when(mockActorContext).executeShardOperation(
-                anyString(), any(), anyDuration());
+                anyString(), any());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_ONLY);
@@ -348,14 +342,13 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
-                        anyDuration());
+                executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
 
         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+                eq(actorSelection(actorRef)), eqReadData());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_WRITE);
@@ -368,7 +361,7 @@ public class TransactionProxyTest extends AbstractActorTest {
             propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
         } finally {
             verify(mockActorContext, times(0)).executeRemoteOperationAsync(
-                    eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+                    eq(actorSelection(actorRef)), eqReadData());
         }
     }
 
@@ -379,10 +372,10 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqWriteData(expectedNode), anyDuration());
+                eq(actorSelection(actorRef)), eqWriteData(expectedNode));
 
         doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+                eq(actorSelection(actorRef)), eqReadData());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_WRITE);
@@ -414,14 +407,14 @@ public class TransactionProxyTest extends AbstractActorTest {
                 READ_ONLY);
 
         doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+                eq(actorSelection(actorRef)), eqDataExists());
 
         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
 
         assertEquals("Exists response", false, exists);
 
         doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+                eq(actorSelection(actorRef)), eqDataExists());
 
         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
 
@@ -443,7 +436,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
-                executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
+                executeRemoteOperationAsync(any(ActorSelection.class), any());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_ONLY);
@@ -456,7 +449,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
+                executeRemoteOperationAsync(any(ActorSelection.class), any());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_ONLY);
@@ -471,14 +464,13 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
-                        anyDuration());
+                executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
 
         doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+                eq(actorSelection(actorRef)), eqDataExists());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_WRITE);
@@ -491,7 +483,7 @@ public class TransactionProxyTest extends AbstractActorTest {
             propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
         } finally {
             verify(mockActorContext, times(0)).executeRemoteOperationAsync(
-                    eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+                    eq(actorSelection(actorRef)), eqDataExists());
         }
     }
 
@@ -502,10 +494,10 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
         doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+                eq(actorSelection(actorRef)), eqDataExists());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_WRITE);
@@ -556,7 +548,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 WRITE_ONLY);
@@ -564,7 +556,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
         verify(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
                 WriteDataReply.SERIALIZABLE_CLASS);
@@ -599,7 +591,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 WRITE_ONLY);
@@ -607,7 +599,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
 
         verify(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
                 MergeDataReply.SERIALIZABLE_CLASS);
@@ -618,7 +610,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
 
         doReturn(deleteDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
+                eq(actorSelection(actorRef)), eqDeleteData());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 WRITE_ONLY);
@@ -626,7 +618,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         transactionProxy.delete(TestModel.TEST_PATH);
 
         verify(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
+                eq(actorSelection(actorRef)), eqDeleteData());
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
                 DeleteDataReply.SERIALIZABLE_CLASS);
@@ -665,13 +657,13 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+                eq(actorSelection(actorRef)), eqReadData());
 
         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
         doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_WRITE);
@@ -700,14 +692,13 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite),
-                        anyDuration());
+                executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
         doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 WRITE_ONLY);
@@ -736,11 +727,11 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
                 executeRemoteOperationAsync(eq(actorSelection(actorRef)),
-                        isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+                        isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 WRITE_ONLY);
@@ -763,7 +754,7 @@ public class TransactionProxyTest extends AbstractActorTest {
     public void testReadyWithInitialCreateTransactionFailure() throws Exception {
 
         doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
-                anyString(), any(), anyDuration());
+                anyString(), any());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 WRITE_ONLY);
@@ -793,11 +784,11 @@ public class TransactionProxyTest extends AbstractActorTest {
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+                eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
                 executeRemoteOperationAsync(eq(actorSelection(actorRef)),
-                        isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+                        isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 WRITE_ONLY);
@@ -830,7 +821,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
-                eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+                eq(actorSelection(actorRef)), eqReadData());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_WRITE);
index fda9ccdfdbbd44dda363c92b842d246e389466bb..5d8fb8393d6c4fd773a0a94b6fd156ac02fe1c14 100644 (file)
@@ -117,7 +117,7 @@ public class ActorContextTest extends AbstractActorTest{
                         new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
                             mock(Configuration.class));
 
-                    Object out = actorContext.executeLocalShardOperation("default", "hello", duration("1 seconds"));
+                    Object out = actorContext.executeLocalShardOperation("default", "hello");
 
                     assertEquals("hello", out);
 
@@ -144,7 +144,7 @@ public class ActorContextTest extends AbstractActorTest{
                         new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
                             mock(Configuration.class));
 
-                    Object out = actorContext.executeLocalShardOperation("default", "hello", duration("1 seconds"));
+                    Object out = actorContext.executeLocalShardOperation("default", "hello");
 
                     assertNull(out);
 
@@ -232,7 +232,7 @@ public class ActorContextTest extends AbstractActorTest{
 
                     ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
 
-                    Object out = actorContext.executeRemoteOperation(actor, "hello", duration("3 seconds"));
+                    Object out = actorContext.executeRemoteOperation(actor, "hello");
 
                     assertEquals("hello", out);
 
@@ -261,8 +261,7 @@ public class ActorContextTest extends AbstractActorTest{
 
                     ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
 
-                    Future<Object> future = actorContext.executeRemoteOperationAsync(actor, "hello",
-                            Duration.create(3, TimeUnit.SECONDS));
+                    Future<Object> future = actorContext.executeRemoteOperationAsync(actor, "hello");
 
                     try {
                         Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
index b19fd3a5290b2c8df92917403d7a501bcf79bfcf..8fa3a17f901541f79be1ec45a373ebe097ba69a1 100644 (file)
@@ -12,7 +12,6 @@ import static org.junit.Assert.assertNotNull;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
-import scala.concurrent.duration.FiniteDuration;
 
 public class MockActorContext extends ActorContext {
 
@@ -33,12 +32,12 @@ public class MockActorContext extends ActorContext {
 
 
     @Override public Object executeShardOperation(String shardName,
-        Object message, FiniteDuration duration) {
+        Object message) {
         return executeShardOperationResponse;
     }
 
     @Override public Object executeRemoteOperation(ActorSelection actor,
-        Object message, FiniteDuration duration) {
+        Object message) {
         return executeRemoteOperationResponse;
     }
 
@@ -76,13 +75,13 @@ public class MockActorContext extends ActorContext {
 
     @Override
     public Object executeLocalOperation(ActorRef actor,
-        Object message, FiniteDuration duration) {
+        Object message) {
         return this.executeLocalOperationResponse;
     }
 
     @Override
     public Object executeLocalShardOperation(String shardName,
-        Object message, FiniteDuration duration) {
+        Object message) {
         return this.executeLocalShardOperationResponse;
     }
 }
index 939096e7f306c7c6d3f0b131fa0880f6374fa274..4ddba2f1b9d773b3c4783d3e401fa7df43d32a0e 100644 (file)
@@ -21,8 +21,7 @@ public class TestUtils {
         ActorContext testContext = new ActorContext(actorSystem, actorSystem.actorOf(
             Props.create(DoNothingActor.class)), new MockClusterWrapper(), new MockConfiguration());
         Object messages = testContext
-            .executeLocalOperation(actorRef, "messages",
-                ActorContext.ASK_DURATION);
+            .executeLocalOperation(actorRef, "messages");
 
         Assert.assertNotNull(messages);