Merge "BUG-2195 Enable features test for netconf connector"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
index d8af09c86b6ec01479f038104fdad350170f4991..0a1e80b0cbaea069f3a75cb558bad130d7562dce 100644 (file)
@@ -13,17 +13,21 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
+import akka.dispatch.Mapper;
 import akka.util.Timeout;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -33,9 +37,7 @@ import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
-
 import java.util.concurrent.TimeUnit;
-
 import static akka.pattern.Patterns.ask;
 
 /**
@@ -117,14 +119,14 @@ public class ActorContext {
     }
 
     /**
-     * Finds a local shard given it's shard name and return it's ActorRef
+     * Finds a local shard given its shard name and return it's ActorRef
      *
      * @param shardName the name of the local shard that needs to be found
      * @return a reference to a local shard actor which represents the shard
      *         specified by the shardName
      */
     public Optional<ActorRef> findLocalShard(String shardName) {
-        Object result = executeOperation(shardManager, new FindLocalShard(shardName));
+        Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
 
         if (result instanceof LocalShardFound) {
             LocalShardFound found = (LocalShardFound) result;
@@ -135,9 +137,40 @@ public class ActorContext {
         return Optional.absent();
     }
 
+    /**
+     * Finds a local shard async given its shard name and return a Future from which to obtain the
+     * ActorRef.
+     *
+     * @param shardName the name of the local shard that needs to be found
+     */
+    public Future<ActorRef> findLocalShardAsync( final String shardName, Timeout timeout) {
+        Future<Object> future = executeOperationAsync(shardManager,
+                new FindLocalShard(shardName, true), timeout);
+
+        return future.map(new Mapper<Object, ActorRef>() {
+            @Override
+            public ActorRef checkedApply(Object response) throws Throwable {
+                if(response instanceof LocalShardFound) {
+                    LocalShardFound found = (LocalShardFound)response;
+                    LOG.debug("Local shard found {}", found.getPath());
+                    return found.getPath();
+                } else if(response instanceof ActorNotInitialized) {
+                    throw new NotInitializedException(
+                            String.format("Found local shard for %s but it's not initialized yet.",
+                                    shardName));
+                } else if(response instanceof LocalShardNotFound) {
+                    throw new LocalShardNotFoundException(
+                            String.format("Local shard for %s does not exist.", shardName));
+                }
+
+                throw new UnknownMessageException(String.format(
+                        "FindLocalShard returned unkown response: %s", response));
+            }
+        }, getActorSystem().dispatcher());
+    }
 
     private String findPrimaryPathOrNull(String shardName) {
-        Object result = executeOperation(shardManager, new FindPrimary(shardName).toSerializable());
+        Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable());
 
         if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
             PrimaryFound found = PrimaryFound.fromSerializable(result);
@@ -237,6 +270,10 @@ public class ActorContext {
         actorSystem.shutdown();
     }
 
+    public ClusterWrapper getClusterWrapper() {
+        return clusterWrapper;
+    }
+
     public String getCurrentMemberName(){
         return clusterWrapper.getCurrentMemberName();
     }
@@ -262,4 +299,30 @@ public class ActorContext {
     public FiniteDuration getOperationDuration() {
         return operationDuration;
     }
+
+    public boolean isLocalPath(String path) {
+        String selfAddress = clusterWrapper.getSelfAddress();
+        if (path == null || selfAddress == null) {
+            return false;
+        }
+
+        int atIndex1 = path.indexOf("@");
+        int atIndex2 = selfAddress.indexOf("@");
+
+        if (atIndex1 == -1 || atIndex2 == -1) {
+            return false;
+        }
+
+        int slashIndex1 = path.indexOf("/", atIndex1);
+        int slashIndex2 = selfAddress.indexOf("/", atIndex2);
+
+        if (slashIndex1 == -1 || slashIndex2 == -1) {
+            return false;
+        }
+
+        String hostPort1 = path.substring(atIndex1, slashIndex1);
+        String hostPort2 = selfAddress.substring(atIndex2, slashIndex2);
+
+        return hostPort1.equals(hostPort2);
+    }
 }