Changed key actors to use bounded mailbox
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
index c7ee7d8c2cca9ced8f17cf9f4fbc2eecd5cd3f45..818a8ca8b390ec2fcb33f2cad91ad597fa7ca758 100644 (file)
@@ -14,18 +14,20 @@ import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.util.Timeout;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
+
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
@@ -50,12 +52,13 @@ public class ActorContext {
     public static final Duration AWAIT_DURATION =
         Duration.create(5, TimeUnit.SECONDS);
 
+    public static final String MAILBOX = "bounded-mailbox";
+
     private final ActorSystem actorSystem;
     private final ActorRef shardManager;
     private final ClusterWrapper clusterWrapper;
     private final Configuration configuration;
-
-    private SchemaContext schemaContext = null;
+    private volatile SchemaContext schemaContext;
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
         ClusterWrapper clusterWrapper,
@@ -82,6 +85,17 @@ public class ActorContext {
         return actorSystem.actorSelection(actorPath);
     }
 
+    public void setSchemaContext(SchemaContext schemaContext) {
+        this.schemaContext = schemaContext;
+
+        if(shardManager != null) {
+            shardManager.tell(new UpdateSchemaContext(schemaContext), null);
+        }
+    }
+
+    public SchemaContext getSchemaContext() {
+        return schemaContext;
+    }
 
     /**
      * Finds the primary for a given shard
@@ -94,6 +108,29 @@ public class ActorContext {
         return actorSystem.actorSelection(path);
     }
 
+    /**
+     * Finds a local shard given it's shard name and return it's ActorRef
+     *
+     * @param shardName the name of the local shard that needs to be found
+     * @return a reference to a local shard actor which represents the shard
+     *         specified by the shardName
+     */
+    public ActorRef findLocalShard(String shardName) {
+        Object result = executeLocalOperation(shardManager,
+            new FindLocalShard(shardName), ASK_DURATION);
+
+        if (result instanceof LocalShardFound) {
+            LocalShardFound found = (LocalShardFound) result;
+
+            LOG.debug("Local shard found {}", found.getPath());
+
+            return found.getPath();
+        }
+
+        return null;
+    }
+
+
     public String findPrimaryPath(String shardName) {
         Object result = executeLocalOperation(shardManager,
             new FindPrimary(shardName).toSerializable(), ASK_DURATION);
@@ -105,7 +142,7 @@ public class ActorContext {
 
             return found.getPrimaryPath();
         }
-        throw new PrimaryNotFoundException();
+        throw new PrimaryNotFoundException("Could not find primary for shardName " + shardName);
     }
 
 
@@ -125,7 +162,7 @@ public class ActorContext {
         try {
             return Await.result(future, AWAIT_DURATION);
         } catch (Exception e) {
-            throw new TimeoutException(e);
+            throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
         }
     }
 
@@ -148,10 +185,37 @@ public class ActorContext {
         try {
             return Await.result(future, AWAIT_DURATION);
         } catch (Exception e) {
-            throw new TimeoutException(e);
+            throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
         }
     }
 
+    /**
+     * Execute an operation on a remote actor asynchronously.
+     *
+     * @param actor the ActorSelection
+     * @param message the message to send
+     * @param duration the maximum amount of time to send he message
+     * @return a Future containing the eventual result
+     */
+    public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message,
+            FiniteDuration duration) {
+
+        LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
+
+        return ask(actor, message, new Timeout(duration));
+    }
+
+    /**
+     * Sends an operation to be executed by a remote actor asynchronously without waiting for a
+     * reply (essentially set and forget).
+     *
+     * @param actor the ActorSelection
+     * @param message the message to send
+     */
+    public void sendRemoteOperationAsync(ActorSelection actor, Object message) {
+        actor.tell(message, ActorRef.noSender());
+    }
+
     /**
      * Execute an operation on the primary for a given shard
      * <p>
@@ -173,27 +237,48 @@ public class ActorContext {
         return executeRemoteOperation(primary, message, duration);
     }
 
-    public void shutdown() {
-        shardManager.tell(PoisonPill.getInstance(), null);
-        actorSystem.shutdown();
+    /**
+     * Execute an operation on the the local shard only
+     * <p>
+     *     This method first finds the address of the local shard if any. It then
+     *     executes the operation on it.
+     * </p>
+     *
+     * @param shardName the name of the shard on which the operation needs to be executed
+     * @param message the message that needs to be sent to the shard
+     * @param duration the time duration in which this operation should complete
+     * @return the message that was returned by the local actor on which the
+     *         the operation was executed. If a local shard was not found then
+     *         null is returned
+     * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
+     *         if the operation does not complete in a specified time duration
+     */
+    public Object executeLocalShardOperation(String shardName, Object message,
+        FiniteDuration duration) {
+        ActorRef local = findLocalShard(shardName);
+
+        if(local != null) {
+            return executeLocalOperation(local, message, duration);
+        }
+
+        return null;
     }
 
-    public String getRemoteActorPath(final String shardName,
-        final String localPathOfRemoteActor) {
-        final String path = findPrimaryPath(shardName);
 
-        LoadingCache<String, String> graphs = CacheBuilder.newBuilder()
-            .expireAfterAccess(2, TimeUnit.SECONDS)
-            .build(
-                new CacheLoader<String, String>() {
-                    public String load(String key) {
-                        return resolvePath(path, localPathOfRemoteActor);
-                    }
-                }
-            );
-        return graphs.getUnchecked(localPathOfRemoteActor);
+    public void shutdown() {
+        shardManager.tell(PoisonPill.getInstance(), null);
+        actorSystem.shutdown();
     }
 
+    /**
+     * @deprecated Need to stop using this method. There are ways to send a
+     * remote ActorRef as a string which should be used instead of this hack
+     *
+     * @param primaryPath
+     * @param localPathOfRemoteActor
+     * @return
+     */
+    @Deprecated
     public String resolvePath(final String primaryPath,
         final String localPathOfRemoteActor) {
         StringBuilder builder = new StringBuilder();