Bug 1608: Make ActorContext ASK_DURATION configurable
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
index e12a9663d1fca8a969c25166f38986ea19eab51a..b87dc4f608b191c21d5fa34b8bf4a9744864f96a 100644 (file)
@@ -23,6 +23,8 @@ import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,15 +47,17 @@ public class ActorContext {
     private static final Logger
         LOG = LoggerFactory.getLogger(ActorContext.class);
 
-    public static final FiniteDuration ASK_DURATION =
-        Duration.create(5, TimeUnit.SECONDS);
-    public static final Duration AWAIT_DURATION =
-        Duration.create(5, TimeUnit.SECONDS);
+    private static final FiniteDuration DEFAULT_OPER_DURATION = Duration.create(5, TimeUnit.SECONDS);
+
+    public static final String MAILBOX = "bounded-mailbox";
 
     private final ActorSystem actorSystem;
     private final ActorRef shardManager;
     private final ClusterWrapper clusterWrapper;
     private final Configuration configuration;
+    private volatile SchemaContext schemaContext;
+    private FiniteDuration operationDuration = DEFAULT_OPER_DURATION;
+    private Timeout operationTimeout = new Timeout(operationDuration);
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
         ClusterWrapper clusterWrapper,
@@ -80,6 +84,22 @@ public class ActorContext {
         return actorSystem.actorSelection(actorPath);
     }
 
+    public void setSchemaContext(SchemaContext schemaContext) {
+        this.schemaContext = schemaContext;
+
+        if(shardManager != null) {
+            shardManager.tell(new UpdateSchemaContext(schemaContext), null);
+        }
+    }
+
+    public void setOperationTimeout(int timeoutInSeconds) {
+        operationDuration = Duration.create(timeoutInSeconds, TimeUnit.SECONDS);
+        operationTimeout = new Timeout(operationDuration);
+    }
+
+    public SchemaContext getSchemaContext() {
+        return schemaContext;
+    }
 
     /**
      * Finds the primary for a given shard
@@ -101,7 +121,7 @@ public class ActorContext {
      */
     public ActorRef findLocalShard(String shardName) {
         Object result = executeLocalOperation(shardManager,
-            new FindLocalShard(shardName), ASK_DURATION);
+            new FindLocalShard(shardName));
 
         if (result instanceof LocalShardFound) {
             LocalShardFound found = (LocalShardFound) result;
@@ -117,7 +137,7 @@ public class ActorContext {
 
     public String findPrimaryPath(String shardName) {
         Object result = executeLocalOperation(shardManager,
-            new FindPrimary(shardName).toSerializable(), ASK_DURATION);
+            new FindPrimary(shardName).toSerializable());
 
         if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
             PrimaryFound found = PrimaryFound.fromSerializable(result);
@@ -135,16 +155,13 @@ public class ActorContext {
      *
      * @param actor
      * @param message
-     * @param duration
      * @return The response of the operation
      */
-    public Object executeLocalOperation(ActorRef actor, Object message,
-        FiniteDuration duration) {
-        Future<Object> future =
-            ask(actor, message, new Timeout(duration));
+    public Object executeLocalOperation(ActorRef actor, Object message) {
+        Future<Object> future = ask(actor, message, operationTimeout);
 
         try {
-            return Await.result(future, AWAIT_DURATION);
+            return Await.result(future, operationDuration);
         } catch (Exception e) {
             throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
         }
@@ -155,21 +172,19 @@ public class ActorContext {
      *
      * @param actor
      * @param message
-     * @param duration
      * @return
      */
-    public Object executeRemoteOperation(ActorSelection actor, Object message,
-        FiniteDuration duration) {
+    public Object executeRemoteOperation(ActorSelection actor, Object message) {
 
         LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
 
-        Future<Object> future =
-            ask(actor, message, new Timeout(duration));
+        Future<Object> future = ask(actor, message, operationTimeout);
 
         try {
-            return Await.result(future, AWAIT_DURATION);
+            return Await.result(future, operationDuration);
         } catch (Exception e) {
-            throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
+            throw new TimeoutException("Sending message " + message.getClass().toString() +
+                    " to actor " + actor.toString() + " failed" , e);
         }
     }
 
@@ -178,15 +193,13 @@ public class ActorContext {
      *
      * @param actor the ActorSelection
      * @param message the message to send
-     * @param duration the maximum amount of time to send he message
      * @return a Future containing the eventual result
      */
-    public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message,
-            FiniteDuration duration) {
+    public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message) {
 
         LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
 
-        return ask(actor, message, new Timeout(duration));
+        return ask(actor, message, operationTimeout);
     }
 
     /**
@@ -209,16 +222,14 @@ public class ActorContext {
      *
      * @param shardName
      * @param message
-     * @param duration
      * @return
      * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException         if the message to the remote shard times out
      * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
      */
-    public Object executeShardOperation(String shardName, Object message,
-        FiniteDuration duration) {
+    public Object executeShardOperation(String shardName, Object message) {
         ActorSelection primary = findPrimary(shardName);
 
-        return executeRemoteOperation(primary, message, duration);
+        return executeRemoteOperation(primary, message);
     }
 
     /**
@@ -230,19 +241,17 @@ public class ActorContext {
      *
      * @param shardName the name of the shard on which the operation needs to be executed
      * @param message the message that needs to be sent to the shard
-     * @param duration the time duration in which this operation should complete
      * @return the message that was returned by the local actor on which the
      *         the operation was executed. If a local shard was not found then
      *         null is returned
      * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
      *         if the operation does not complete in a specified time duration
      */
-    public Object executeLocalShardOperation(String shardName, Object message,
-        FiniteDuration duration) {
+    public Object executeLocalShardOperation(String shardName, Object message) {
         ActorRef local = findLocalShard(shardName);
 
         if(local != null) {
-            return executeLocalOperation(local, message, duration);
+            return executeLocalOperation(local, message);
         }
 
         return null;