Serialization/Deserialization and a host of other fixes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
index 2f1949ec6a8b12c701e87d696b8777dedece947f..c7ee7d8c2cca9ced8f17cf9f4fbc2eecd5cd3f45 100644 (file)
@@ -14,6 +14,10 @@ import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.util.Timeout;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
@@ -41,18 +45,24 @@ public class ActorContext {
     private static final Logger
         LOG = LoggerFactory.getLogger(ActorContext.class);
 
-    public static final FiniteDuration ASK_DURATION = Duration.create(5, TimeUnit.SECONDS);
-    public static final Duration AWAIT_DURATION = Duration.create(5, TimeUnit.SECONDS);
+    public static final FiniteDuration ASK_DURATION =
+        Duration.create(5, TimeUnit.SECONDS);
+    public static final Duration AWAIT_DURATION =
+        Duration.create(5, TimeUnit.SECONDS);
 
     private final ActorSystem actorSystem;
     private final ActorRef shardManager;
+    private final ClusterWrapper clusterWrapper;
     private final Configuration configuration;
 
     private SchemaContext schemaContext = null;
 
-    public ActorContext(ActorSystem actorSystem, ActorRef shardManager, Configuration configuration){
+    public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
+        ClusterWrapper clusterWrapper,
+        Configuration configuration) {
         this.actorSystem = actorSystem;
         this.shardManager = shardManager;
+        this.clusterWrapper = clusterWrapper;
         this.configuration = configuration;
     }
 
@@ -64,11 +74,11 @@ public class ActorContext {
         return shardManager;
     }
 
-    public ActorSelection actorSelection(String actorPath){
+    public ActorSelection actorSelection(String actorPath) {
         return actorSystem.actorSelection(actorPath);
     }
 
-    public ActorSelection actorSelection(ActorPath actorPath){
+    public ActorSelection actorSelection(ActorPath actorPath) {
         return actorSystem.actorSelection(actorPath);
     }
 
@@ -80,28 +90,35 @@ public class ActorContext {
      * @return
      */
     public ActorSelection findPrimary(String shardName) {
+        String path = findPrimaryPath(shardName);
+        return actorSystem.actorSelection(path);
+    }
+
+    public String findPrimaryPath(String shardName) {
         Object result = executeLocalOperation(shardManager,
-            new FindPrimary(shardName), ASK_DURATION);
+            new FindPrimary(shardName).toSerializable(), ASK_DURATION);
 
-        if(result instanceof PrimaryFound){
-            PrimaryFound found = (PrimaryFound) result;
+        if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
+            PrimaryFound found = PrimaryFound.fromSerializable(result);
 
-            LOG.error("Primary found {}", found.getPrimaryPath());
+            LOG.debug("Primary found {}", found.getPrimaryPath());
 
-            return actorSystem.actorSelection(found.getPrimaryPath());
+            return found.getPrimaryPath();
         }
         throw new PrimaryNotFoundException();
     }
 
+
     /**
      * Executes an operation on a local actor and wait for it's response
+     *
      * @param actor
      * @param message
      * @param duration
      * @return The response of the operation
      */
     public Object executeLocalOperation(ActorRef actor, Object message,
-        FiniteDuration duration){
+        FiniteDuration duration) {
         Future<Object> future =
             ask(actor, message, new Timeout(duration));
 
@@ -114,13 +131,17 @@ public class ActorContext {
 
     /**
      * Execute an operation on a remote actor and wait for it's response
+     *
      * @param actor
      * @param message
      * @param duration
      * @return
      */
     public Object executeRemoteOperation(ActorSelection actor, Object message,
-        FiniteDuration duration){
+        FiniteDuration duration) {
+
+        LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
+
         Future<Object> future =
             ask(actor, message, new Timeout(duration));
 
@@ -134,18 +155,19 @@ public class ActorContext {
     /**
      * Execute an operation on the primary for a given shard
      * <p>
-     *     This method first finds the primary for a given shard ,then sends
-     *     the message to the remote shard and waits for a response
+     * This method first finds the primary for a given shard ,then sends
+     * the message to the remote shard and waits for a response
      * </p>
+     *
      * @param shardName
      * @param message
      * @param duration
-     * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException if the message to the remote shard times out
-     * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
-     *
      * @return
+     * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException         if the message to the remote shard times out
+     * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
      */
-    public Object executeShardOperation(String shardName, Object message, FiniteDuration duration){
+    public Object executeShardOperation(String shardName, Object message,
+        FiniteDuration duration) {
         ActorSelection primary = findPrimary(shardName);
 
         return executeRemoteOperation(primary, message, duration);
@@ -155,4 +177,44 @@ public class ActorContext {
         shardManager.tell(PoisonPill.getInstance(), null);
         actorSystem.shutdown();
     }
+
+    public String getRemoteActorPath(final String shardName,
+        final String localPathOfRemoteActor) {
+        final String path = findPrimaryPath(shardName);
+
+        LoadingCache<String, String> graphs = CacheBuilder.newBuilder()
+            .expireAfterAccess(2, TimeUnit.SECONDS)
+            .build(
+                new CacheLoader<String, String>() {
+                    public String load(String key) {
+                        return resolvePath(path, localPathOfRemoteActor);
+                    }
+                }
+            );
+        return graphs.getUnchecked(localPathOfRemoteActor);
+    }
+
+    public String resolvePath(final String primaryPath,
+        final String localPathOfRemoteActor) {
+        StringBuilder builder = new StringBuilder();
+        String[] primaryPathElements = primaryPath.split("/");
+        builder.append(primaryPathElements[0]).append("//")
+            .append(primaryPathElements[1]).append(primaryPathElements[2]);
+        String[] remotePathElements = localPathOfRemoteActor.split("/");
+        for (int i = 3; i < remotePathElements.length; i++) {
+            builder.append("/").append(remotePathElements[i]);
+        }
+
+        return builder.toString();
+
+    }
+
+    public ActorPath actorFor(String path){
+        return actorSystem.actorFor(path).path();
+    }
+
+    public String getCurrentMemberName(){
+        return clusterWrapper.getCurrentMemberName();
+    }
+
 }