Updated remote rpc code after integration tests. Rpc execution is failing because... 68/8768/7
authorHarman Singh <harmasin@cisco.com>
Tue, 8 Jul 2014 00:11:22 +0000 (17:11 -0700)
committerHarman Singh <harmasin@cisco.com>
Tue, 22 Jul 2014 00:06:09 +0000 (17:06 -0700)
Change-Id: Ic51bba7088371b4428624da9744e0563abdc4a95
Signed-off-by: Harman Singh <harmasin@cisco.com>
40 files changed:
opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/01-md-sal.xml
opendaylight/md-sal/sal-remoterpc-connector/pom.xml [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/config/yang/config/remote_rpc_connector/RemoteRPCBrokerModule.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/config/yang/config/remote_rpc_connector/RemoteRPCBrokerModuleFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/AbstractUntypedActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorSystemFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorUtil.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RouteIdentifierImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RoutedRpcListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/TerminationMonitor.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/XmlUtils.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AddRoutedRpc.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AddRpc.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ErrorResponse.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRoutedRpc.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRoutedRpcReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRpc.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRpcReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRoutedRpc.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/Monitor.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RemoveRoutedRpc.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RemoveRpc.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RoutingTableData.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RpcResponse.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ClusterWrapper.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ClusterWrapperImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/resources/application.conf [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/yang/remote-rpc-connector.yang [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java [new file with mode: 0644]

index 38df344f858641d60d4913e0dc251efc6427d64d..9534094fa85fca88574225ad5c0e241f9927ca94 100644 (file)
                     </schema-service>
                 </module>
                 -->
-
+                <!-- Cluster RPC -->
+                <!-- Enable the following module if you want to use remote rpc connector
+                <module>
+                    <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector">prefix:remote-rpc-connector</type>
+                    <name>remote-rpc-connector</name>
+                    <dom-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector">
+                        <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-broker-osgi-registry</type>
+                        <name>dom-broker</name>
+                    </dom-broker>
+                </module>
+                -->
                 <module>
                     <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:inmemory-datastore-provider">prefix:inmemory-operational-datastore-provider</type>
                     <name>operational-store-service</name>
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/pom.xml b/opendaylight/md-sal/sal-remoterpc-connector/pom.xml
new file mode 100644 (file)
index 0000000..6ee301d
--- /dev/null
@@ -0,0 +1,177 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.opendaylight.controller</groupId>
+    <artifactId>sal-parent</artifactId>
+    <version>1.1-SNAPSHOT</version>
+  </parent>
+  <artifactId>sal-remoterpc-connector</artifactId>
+  <packaging>bundle</packaging>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-actor_${scala.version}</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-cluster_${scala.version}</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-remote_${scala.version}</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-testkit_${scala.version}</artifactId>
+    </dependency>
+
+    <!-- SAL Dependencies -->
+
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-connector-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-common-util</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-core-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>netconf-util</artifactId>
+    </dependency>
+
+    <!-- Yang tools-->
+
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>yang-data-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>yang-data-impl</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>yang-common</artifactId>
+    </dependency>
+
+
+    <dependency>
+      <groupId>org.osgi</groupId>
+      <artifactId>org.osgi.core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+    </dependency>
+
+    <!-- Test Dependencies -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-simple</artifactId>
+      <version>${slf4j.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+
+      <plugin>
+        <groupId>org.apache.felix</groupId>
+        <artifactId>maven-bundle-plugin</artifactId>
+        <extensions>true</extensions>
+        <configuration>
+          <instructions>
+            <Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
+            <Export-package></Export-package>
+            <Private-Package></Private-Package>
+            <Import-Package>!org.jboss.*;!com.jcraft.*;*</Import-Package>
+            <Embed-Dependency>
+                !sal*;
+                !*config-api*;
+                !*testkit*;
+                *protobuf*;
+                akka*;
+                *scala*;
+                *config*;
+                *netty*;
+                *uncommons*;
+            </Embed-Dependency>
+            <Embed-Transitive>true</Embed-Transitive>
+          </instructions>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.opendaylight.yangtools</groupId>
+        <artifactId>yang-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>config</id>
+            <goals>
+              <goal>generate-sources</goal>
+            </goals>
+            <configuration>
+              <codeGenerators>
+                <generator>
+                  <codeGeneratorClass>org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator</codeGeneratorClass>
+                  <outputBaseDir>${jmxGeneratorPath}</outputBaseDir>
+                  <additionalConfiguration>
+                    <namespaceToPackage1>urn:opendaylight:params:xml:ns:yang:controller==org.opendaylight.controller.config.yang</namespaceToPackage1>
+                  </additionalConfiguration>
+                </generator>
+                <generator>
+                  <codeGeneratorClass>org.opendaylight.yangtools.maven.sal.api.gen.plugin.CodeGeneratorImpl</codeGeneratorClass>
+                  <outputBaseDir>${salGeneratorPath}</outputBaseDir>
+                </generator>
+              </codeGenerators>
+              <inspectDependencies>true</inspectDependencies>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+  <scm>
+    <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
+    <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+    <tag>HEAD</tag>
+    <url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL:Architecture:Clustering</url>
+  </scm>
+</project>
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/config/yang/config/remote_rpc_connector/RemoteRPCBrokerModule.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/config/yang/config/remote_rpc_connector/RemoteRPCBrokerModule.java
new file mode 100644 (file)
index 0000000..8315bbe
--- /dev/null
@@ -0,0 +1,31 @@
+package org.opendaylight.controller.config.yang.config.remote_rpc_connector;
+
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderFactory;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.osgi.framework.BundleContext;
+
+public class RemoteRPCBrokerModule extends org.opendaylight.controller.config.yang.config.remote_rpc_connector.AbstractRemoteRPCBrokerModule {
+  private BundleContext bundleContext;
+  public RemoteRPCBrokerModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
+    super(identifier, dependencyResolver);
+  }
+
+  public RemoteRPCBrokerModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, org.opendaylight.controller.config.yang.config.remote_rpc_connector.RemoteRPCBrokerModule oldModule, java.lang.AutoCloseable oldInstance) {
+    super(identifier, dependencyResolver, oldModule, oldInstance);
+  }
+
+  @Override
+  public void customValidation() {
+     // add custom validation form module attributes here.
+  }
+
+  @Override
+  public java.lang.AutoCloseable createInstance() {
+    Broker broker = getDomBrokerDependency();
+    return RemoteRpcProviderFactory.createInstance(broker, bundleContext);
+  }
+
+  public void setBundleContext(final BundleContext bundleContext) {
+    this.bundleContext = bundleContext;
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/config/yang/config/remote_rpc_connector/RemoteRPCBrokerModuleFactory.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/config/yang/config/remote_rpc_connector/RemoteRPCBrokerModuleFactory.java
new file mode 100644 (file)
index 0000000..e1ba46a
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+* Generated file
+*
+* Generated from: yang module name: remote-rpc-connector yang module local name: remote-rpc-connector
+* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
+* Generated at: Mon Jul 07 17:02:25 PDT 2014
+*
+* Do not modify this file unless it is present under src/main directory
+*/
+package org.opendaylight.controller.config.yang.config.remote_rpc_connector;
+
+import org.opendaylight.controller.config.api.DependencyResolver;
+import org.opendaylight.controller.config.api.DynamicMBeanWithInstance;
+import org.opendaylight.controller.config.spi.Module;
+import org.osgi.framework.BundleContext;
+
+public class RemoteRPCBrokerModuleFactory extends org.opendaylight.controller.config.yang.config.remote_rpc_connector.AbstractRemoteRPCBrokerModuleFactory {
+
+  @Override
+  public Module createModule(String instanceName, DependencyResolver dependencyResolver, BundleContext bundleContext) {
+    RemoteRPCBrokerModule module = (RemoteRPCBrokerModule)super.createModule(instanceName,dependencyResolver,bundleContext);
+    module.setBundleContext(bundleContext);
+    return module;
+  }
+
+  @Override
+  public Module createModule(String instanceName, DependencyResolver dependencyResolver,
+                             DynamicMBeanWithInstance old, BundleContext bundleContext) throws Exception {
+    RemoteRPCBrokerModule module = (RemoteRPCBrokerModule)super.createModule(instanceName, dependencyResolver,
+        old, bundleContext);
+    module.setBundleContext(bundleContext);
+    return module;
+  }
+
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/AbstractUntypedActor.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/AbstractUntypedActor.java
new file mode 100644 (file)
index 0000000..66593ae
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+import akka.actor.UntypedActor;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import org.opendaylight.controller.remote.rpc.messages.Monitor;
+
+public abstract class AbstractUntypedActor extends UntypedActor {
+    protected final LoggingAdapter LOG =
+        Logging.getLogger(getContext().system(), this);
+
+
+    public AbstractUntypedActor(){
+        LOG.debug("Actor created {}", getSelf());
+        getContext().
+            system().
+            actorSelection("user/termination-monitor").
+            tell(new Monitor(getSelf()), getSelf());
+    }
+
+    @Override public void onReceive(Object message) throws Exception {
+        LOG.debug("Received message {}", message);
+        handleReceive(message);
+        LOG.debug("Done handling message {}", message);
+    }
+
+    protected abstract void handleReceive(Object message) throws Exception;
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorSystemFactory.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorSystemFactory.java
new file mode 100644 (file)
index 0000000..4a6124a
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import com.google.common.base.Function;
+import com.typesafe.config.ConfigFactory;
+
+import javax.annotation.Nullable;
+
+public class ActorSystemFactory {
+    private static final ActorSystem actorSystem = (new Function<Void, ActorSystem>(){
+
+        @Nullable @Override public ActorSystem apply(@Nullable Void aVoid) {
+                ActorSystem system =
+                    ActorSystem.create("opendaylight-rpc", ConfigFactory
+                        .load().getConfig("odl-cluster"));
+                system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
+                return system;
+        }
+    }).apply(null);
+
+    public static final ActorSystem getInstance(){
+        return actorSystem;
+    }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorUtil.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorUtil.java
new file mode 100644 (file)
index 0000000..13324f9
--- /dev/null
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.util.Timeout;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+import static akka.pattern.Patterns.ask;
+
+public class ActorUtil {
+  public static final FiniteDuration LOCAL_ASK_DURATION = Duration.create(2, TimeUnit.SECONDS);
+  public static final FiniteDuration REMOTE_ASK_DURATION = Duration.create(15, TimeUnit.SECONDS);
+  public static final FiniteDuration ASK_DURATION = Duration.create(17, TimeUnit.SECONDS);
+  public static final FiniteDuration LOCAL_AWAIT_DURATION = Duration.create(2, TimeUnit.SECONDS);
+  public static final FiniteDuration REMOTE_AWAIT_DURATION = Duration.create(15, TimeUnit.SECONDS);
+  public static final FiniteDuration AWAIT_DURATION = Duration.create(17, TimeUnit.SECONDS);
+
+  /**
+   * Executes an operation on a local actor and wait for it's response
+   * @param actor
+   * @param message
+   * @param askDuration
+   * @param awaitDuration
+   * @return The response of the operation
+   */
+  public static Object executeLocalOperation(ActorRef actor, Object message,
+                                      FiniteDuration askDuration, FiniteDuration awaitDuration) throws Exception{
+    Future<Object> future =
+        ask(actor, message, new Timeout(askDuration));
+
+      return Await.result(future, awaitDuration);
+  }
+
+  /**
+   * Execute an operation on a remote actor and wait for it's response
+   * @param actor
+   * @param message
+   * @param askDuration
+   * @param awaitDuration
+   * @return
+   */
+  public static Object executeRemoteOperation(ActorSelection actor, Object message,
+                                              FiniteDuration askDuration, FiniteDuration awaitDuration) throws Exception{
+    Future<Object> future =
+        ask(actor, message, new Timeout(askDuration));
+      return Await.result(future, awaitDuration);
+  }
+
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java
new file mode 100644 (file)
index 0000000..43aa5b7
--- /dev/null
@@ -0,0 +1,77 @@
+package org.opendaylight.controller.remote.rpc;
+
+import akka.actor.ActorRef;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.remote.rpc.messages.ErrorResponse;
+import org.opendaylight.controller.remote.rpc.messages.InvokeRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
+import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
+import org.opendaylight.controller.sal.common.util.RpcErrors;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+
+public class RemoteRpcImplementation implements RpcImplementation,
+    RoutedRpcDefaultImplementation {
+  private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcImplementation.class);
+  private ActorRef rpcBroker;
+  private SchemaContext schemaContext;
+
+  public RemoteRpcImplementation(ActorRef rpcBroker, SchemaContext schemaContext) {
+    this.rpcBroker = rpcBroker;
+    this.schemaContext = schemaContext;
+  }
+
+  @Override
+  public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
+    InvokeRoutedRpc rpcMsg = new InvokeRoutedRpc(rpc, identifier, input);
+
+    return executeMsg(rpcMsg);
+  }
+
+  @Override
+  public Set<QName> getSupportedRpcs() {
+    // TODO : check if we need to get this from routing registry
+    return Collections.emptySet();
+  }
+
+  @Override
+  public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
+    InvokeRpc rpcMsg = new InvokeRpc(rpc, input);
+    return executeMsg(rpcMsg);
+  }
+
+  private ListenableFuture<RpcResult<CompositeNode>> executeMsg(Object rpcMsg) {
+    CompositeNode result = null;
+    Collection<RpcError> errors = errors = new ArrayList<>();
+    try {
+      Object response = ActorUtil.executeLocalOperation(rpcBroker, rpcMsg, ActorUtil.ASK_DURATION, ActorUtil.AWAIT_DURATION);
+      if(response instanceof RpcResponse) {
+        RpcResponse rpcResponse = (RpcResponse) response;
+        result = XmlUtils.xmlToCompositeNode(rpcResponse.getResultCompositeNode());
+      } else if(response instanceof ErrorResponse) {
+        ErrorResponse errorResponse = (ErrorResponse) response;
+        Exception e = errorResponse.getException();
+        errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause()));
+      }
+    } catch (Exception e) {
+      LOG.error("Error occurred while invoking RPC actor {}", e.toString());
+      errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause()));
+    }
+    return Futures.immediateFuture(Rpcs.getRpcResult(true, result, errors));
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java
new file mode 100644 (file)
index 0000000..1bb7ea4
--- /dev/null
@@ -0,0 +1,129 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.opendaylight.controller.remote.rpc.registry.ClusterWrapper;
+import org.opendaylight.controller.remote.rpc.registry.ClusterWrapperImpl;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.sal.core.api.Provider;
+import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
+import org.opendaylight.controller.sal.core.api.model.SchemaService;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * This is the base class which initialize all the actors, listeners and
+ * default RPc implementation so remote invocation of rpcs.
+ */
+public class RemoteRpcProvider implements AutoCloseable, Provider{
+
+  private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcProvider.class);
+
+  private final ActorSystem actorSystem;
+  private ActorRef rpcBroker;
+  private ActorRef rpcRegistry;
+  private final RpcProvisionRegistry rpcProvisionRegistry;
+  private Broker.ProviderSession brokerSession;
+  private RpcListener rpcListener;
+  private RoutedRpcListener routeChangeListener;
+  private RemoteRpcImplementation rpcImplementation;
+  public RemoteRpcProvider(ActorSystem actorSystem, RpcProvisionRegistry rpcProvisionRegistry) {
+    this.actorSystem = actorSystem;
+    this.rpcProvisionRegistry = rpcProvisionRegistry;
+  }
+
+  @Override
+  public void close() throws Exception {
+    this.actorSystem.shutdown();
+    unregisterSupportedRpcs();
+    unregisterSupportedRoutedRpcs();
+  }
+
+  @Override
+  public void onSessionInitiated(Broker.ProviderSession session) {
+    this.brokerSession = session;
+    start();
+  }
+
+  @Override
+  public Collection<ProviderFunctionality> getProviderFunctionality() {
+    return null;
+  }
+
+  private void start() {
+    LOG.debug("Starting all rpc listeners.");
+    // Create actor to handle and sync routing table in cluster
+    ClusterWrapper clusterWrapper = new ClusterWrapperImpl(actorSystem);
+    rpcRegistry = actorSystem.actorOf(RpcRegistry.props(clusterWrapper), "rpc-registry");
+
+    // Create actor to invoke and execute rpc
+    SchemaService schemaService = brokerSession.getService(SchemaService.class);
+    SchemaContext schemaContext = schemaService.getGlobalContext();
+    rpcBroker = actorSystem.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), "rpc-broker");
+    String rpcBrokerPath = clusterWrapper.getAddress().toString() + "/user/rpc-broker";
+    rpcListener = new RpcListener(rpcRegistry, rpcBrokerPath);
+    routeChangeListener = new RoutedRpcListener(rpcRegistry, rpcBrokerPath);
+    rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext);
+    brokerSession.addRpcRegistrationListener(rpcListener);
+    rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
+    rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation);
+    announceSupportedRpcs();
+    announceSupportedRoutedRpcs();
+
+  }
+
+  /**
+   * Add all the locally registered RPCs in the clustered routing table
+   */
+  private void announceSupportedRpcs(){
+    LOG.debug("Adding all supported rpcs to routing table");
+    Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
+    for (QName rpc : currentlySupported) {
+      rpcListener.onRpcImplementationAdded(rpc);
+    }
+  }
+
+  /**
+   * Add all the locally registered Routed RPCs in the clustered routing table
+   */
+  private void announceSupportedRoutedRpcs(){
+
+    //TODO: announce all routed RPCs as well
+
+  }
+
+  /**
+   * Un-Register all the supported RPCs from clustered routing table
+   */
+  private void unregisterSupportedRpcs(){
+    LOG.debug("removing all supported rpcs to routing table");
+    Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
+    for (QName rpc : currentlySupported) {
+      rpcListener.onRpcImplementationRemoved(rpc);
+    }
+  }
+
+  /**
+   * Un-Register all the locally supported Routed RPCs from clustered routing table
+   */
+  private void unregisterSupportedRoutedRpcs(){
+
+    //TODO: remove all routed RPCs as well
+
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java
new file mode 100644 (file)
index 0000000..61dc818
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
+import org.osgi.framework.BundleContext;
+
+public class RemoteRpcProviderFactory {
+    public static RemoteRpcProvider createInstance(final Broker broker, final BundleContext bundleContext){
+      RemoteRpcProvider rpcProvider =
+          new RemoteRpcProvider(ActorSystemFactory.getInstance(), (RpcProvisionRegistry) broker);
+      broker.registerProvider(rpcProvider, bundleContext);
+      return rpcProvider;
+    }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RouteIdentifierImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RouteIdentifierImpl.java
new file mode 100644 (file)
index 0000000..ea72238
--- /dev/null
@@ -0,0 +1,88 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+import java.io.Serializable;
+
+public class RouteIdentifierImpl implements RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>,Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private QName context;
+  private QName type;
+  private InstanceIdentifier route;
+
+  public RouteIdentifierImpl(QName context, QName type, InstanceIdentifier route) {
+    this.context = context;
+    this.type = type;
+    this.route = route;
+  }
+
+  @Override
+  public QName getContext() {
+    return this.context;
+  }
+
+  @Override
+  public QName getType() {
+    return this.type;
+  }
+
+  @Override
+  public InstanceIdentifier getRoute() {
+    return this.route;
+  }
+
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    RouteIdentifierImpl that = (RouteIdentifierImpl) o;
+
+    if (context == null){
+      if (that.getContext() != null)  return false;
+    }else
+      if (!context.equals(that.context)) return false;
+
+    if (route == null){
+      if (that.getRoute() != null) return false;
+    }else
+      if (!route.equals(that.route)) return false;
+
+    if (type == null){
+      if (that.getType() != null) return false;
+    }else
+      if (!type.equals(that.type)) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int prime = 31;
+    int result = 0;
+    result = prime * result + (context == null ? 0:context.hashCode());
+    result = prime * result + (type    == null ? 0:type.hashCode());
+    result = prime * result + (route   == null ? 0:route.hashCode());
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "RouteIdentifierImpl{" +
+        "context=" + context +
+        ", type=" + type +
+        ", route=" + route +
+        '}';
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RoutedRpcListener.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RoutedRpcListener.java
new file mode 100644 (file)
index 0000000..a0df362
--- /dev/null
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
+import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class RoutedRpcListener implements RouteChangeListener<RpcRoutingContext, InstanceIdentifier>{
+  private static final Logger LOG = LoggerFactory.getLogger(RoutedRpcListener.class);
+  private final ActorRef rpcRegistry;
+  private final String actorPath;
+
+  public RoutedRpcListener(ActorRef rpcRegistry, String actorPath) {
+    this.rpcRegistry = rpcRegistry;
+    this.actorPath = actorPath;
+  }
+
+  @Override
+  public void onRouteChange(RouteChange<RpcRoutingContext, InstanceIdentifier> routeChange) {
+    Map<RpcRoutingContext, Set<InstanceIdentifier>> announcements = routeChange.getAnnouncements();
+    announce(getRouteIdentifiers(announcements));
+
+    Map<RpcRoutingContext, Set<InstanceIdentifier>> removals = routeChange.getRemovals();
+    remove(getRouteIdentifiers(removals));
+  }
+
+  /**
+   *
+   * @param announcements
+   */
+  private void announce(Set<RpcRouter.RouteIdentifier<?, ?, ?>> announcements) {
+    LOG.debug("Announcing [{}]", announcements);
+    AddRoutedRpc addRpcMsg = new AddRoutedRpc(announcements, actorPath);
+    try {
+      ActorUtil.executeLocalOperation(rpcRegistry, addRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
+    } catch (Exception e) {
+      // Just logging it because Akka API throws this exception
+      LOG.error(e.toString());
+    }
+  }
+
+  /**
+   *
+   * @param removals
+   */
+  private void remove(Set<RpcRouter.RouteIdentifier<?, ?, ?>> removals){
+    LOG.debug("Removing [{}]", removals);
+    RemoveRoutedRpc removeRpcMsg = new RemoveRoutedRpc(removals, actorPath);
+    try {
+      ActorUtil.executeLocalOperation(rpcRegistry, removeRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
+    } catch (Exception e) {
+      // Just logging it because Akka API throws this exception
+      LOG.error(e.toString());
+    }
+  }
+
+  /**
+   *
+   * @param changes
+   * @return
+   */
+  private Set<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers(Map<RpcRoutingContext, Set<InstanceIdentifier>> changes) {
+    RouteIdentifierImpl routeId = null;
+    Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdSet = new HashSet<>();
+
+    for (RpcRoutingContext context : changes.keySet()){
+      for (InstanceIdentifier instanceId : changes.get(context)){
+        routeId = new RouteIdentifierImpl(null, context.getRpc(), instanceId);
+        routeIdSet.add(routeId);
+      }
+    }
+    return routeIdSet;
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java
new file mode 100644 (file)
index 0000000..3354fc3
--- /dev/null
@@ -0,0 +1,139 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.japi.Creator;
+import org.opendaylight.controller.remote.rpc.messages.ErrorResponse;
+import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply;
+import org.opendaylight.controller.remote.rpc.messages.GetRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRpcReply;
+import org.opendaylight.controller.remote.rpc.messages.InvokeRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
+import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Future;
+
+/**
+ * Actor to initiate execution of remote RPC on other nodes of the cluster.
+ */
+
+public class RpcBroker extends AbstractUntypedActor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RpcBroker.class);
+  private final Broker.ProviderSession brokerSession;
+  private final ActorRef rpcRegistry;
+  private final SchemaContext schemaContext;
+
+  private RpcBroker(Broker.ProviderSession brokerSession, ActorRef rpcRegistry, SchemaContext schemaContext){
+    this.brokerSession = brokerSession;
+    this.rpcRegistry = rpcRegistry;
+    this.schemaContext = schemaContext;
+  }
+
+  public static Props props(final Broker.ProviderSession brokerSession, final ActorRef rpcRegistry, final SchemaContext schemaContext){
+    return Props.create(new Creator<RpcBroker>(){
+
+      @Override
+      public RpcBroker create() throws Exception {
+        return new RpcBroker(brokerSession, rpcRegistry, schemaContext);
+      }
+    });
+  }
+  @Override
+  protected void handleReceive(Object message) throws Exception {
+    if(message instanceof InvokeRoutedRpc) {
+      invokeRemoteRoutedRpc((InvokeRoutedRpc) message);
+    } else if(message instanceof InvokeRpc) {
+      invokeRemoteRpc((InvokeRpc) message);
+    } else if(message instanceof ExecuteRpc) {
+      executeRpc((ExecuteRpc) message);
+    }
+  }
+
+  private void invokeRemoteRoutedRpc(InvokeRoutedRpc msg) {
+    // Look up the remote actor to execute rpc
+    LOG.debug("Looking up the remote actor for route {}", msg);
+    try {
+      RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, msg.getRpc(), msg.getIdentifier());
+      GetRoutedRpc routedRpcMsg = new GetRoutedRpc(routeId);
+      GetRoutedRpcReply rpcReply = (GetRoutedRpcReply)ActorUtil.executeLocalOperation(rpcRegistry, routedRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
+
+      String remoteActorPath = rpcReply.getRoutePath();
+      if(remoteActorPath == null) {
+        LOG.debug("No remote actor found for rpc execution.");
+
+        getSender().tell(new ErrorResponse(
+          new IllegalStateException("No remote actor found for rpc execution.")), self());
+      } else {
+
+        ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(), schemaContext), msg.getRpc());
+
+        Object operationRes = ActorUtil.executeRemoteOperation(this.context().actorSelection(remoteActorPath),
+            executeMsg, ActorUtil.REMOTE_ASK_DURATION, ActorUtil.REMOTE_AWAIT_DURATION);
+
+        getSender().tell(operationRes, self());
+      }
+    } catch (Exception e) {
+        LOG.error(e.toString());
+        getSender().tell(new ErrorResponse(e), self());
+    }
+  }
+
+  private void invokeRemoteRpc(InvokeRpc msg) {
+    // Look up the remote actor to execute rpc
+    LOG.debug("Looking up the remote actor for route {}", msg);
+    try {
+      RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, msg.getRpc(), null);
+      GetRpc rpcMsg = new GetRpc(routeId);
+      GetRpcReply rpcReply = (GetRpcReply)ActorUtil.executeLocalOperation(rpcRegistry, rpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
+      String remoteActorPath = rpcReply.getRoutePath();
+
+      if(remoteActorPath == null) {
+        LOG.debug("No remote actor found for rpc execution.");
+
+        getSender().tell(new ErrorResponse(
+          new IllegalStateException("No remote actor found for rpc execution.")), self());
+      } else {
+        ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(), schemaContext), msg.getRpc());
+        Object operationRes = ActorUtil.executeRemoteOperation(this.context().actorSelection(remoteActorPath),
+            executeMsg, ActorUtil.REMOTE_ASK_DURATION, ActorUtil.REMOTE_AWAIT_DURATION);
+
+        getSender().tell(operationRes, self());
+      }
+    } catch (Exception e) {
+        LOG.error(e.toString());
+        getSender().tell(new ErrorResponse(e), self());
+    }
+  }
+
+  private void executeRpc(ExecuteRpc msg) {
+    LOG.debug("Executing rpc for rpc {}", msg.getRpc());
+    try {
+      Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc(msg.getRpc(), XmlUtils.inputXmlToCompositeNode(msg.getRpc(), msg.getInputCompositeNode(), schemaContext));
+      RpcResult<CompositeNode> rpcResult = rpc != null ? rpc.get():null;
+
+      CompositeNode result = rpcResult != null ? rpcResult.getResult() : null;
+      getSender().tell(new RpcResponse(XmlUtils.outputCompositeNodeToXml(result, schemaContext)), self());
+    } catch (Exception e) {
+      LOG.error(e.toString());
+      getSender().tell(new ErrorResponse(e), self());
+    }
+  }
+
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java
new file mode 100644 (file)
index 0000000..ae760fa
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.remote.rpc.messages.AddRpc;
+import org.opendaylight.controller.remote.rpc.messages.RemoveRpc;
+import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RpcListener implements RpcRegistrationListener{
+
+  private static final Logger LOG = LoggerFactory.getLogger(RpcListener.class);
+  private final ActorRef rpcRegistry;
+  private final String actorPath;
+
+  public RpcListener(ActorRef rpcRegistry, String actorPath) {
+    this.rpcRegistry = rpcRegistry;
+    this.actorPath = actorPath;
+  }
+
+  @Override
+  public void onRpcImplementationAdded(QName rpc) {
+    LOG.debug("Adding registration for [{}]", rpc);
+    RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, rpc, null);
+    AddRpc addRpcMsg = new AddRpc(routeId, actorPath);
+    try {
+      ActorUtil.executeLocalOperation(rpcRegistry, addRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
+      LOG.debug("Route added [{}-{}]", routeId, this.actorPath);
+    } catch (Exception e) {
+      // Just logging it because Akka API throws this exception
+      LOG.error(e.toString());
+    }
+
+  }
+
+  @Override
+  public void onRpcImplementationRemoved(QName rpc) {
+    LOG.debug("Removing registration for [{}]", rpc);
+    RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, rpc, null);
+    RemoveRpc removeRpcMsg = new RemoveRpc(routeId);
+    try {
+      ActorUtil.executeLocalOperation(rpcRegistry, removeRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
+    } catch (Exception e) {
+      // Just logging it because Akka API throws this exception
+      LOG.error(e.toString());
+    }
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/TerminationMonitor.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/TerminationMonitor.java
new file mode 100644 (file)
index 0000000..a90f1e1
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+import akka.actor.Terminated;
+import akka.actor.UntypedActor;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import org.opendaylight.controller.remote.rpc.messages.Monitor;
+
+public class TerminationMonitor extends UntypedActor{
+    protected final LoggingAdapter LOG =
+        Logging.getLogger(getContext().system(), this);
+
+    public TerminationMonitor(){
+        LOG.info("Created TerminationMonitor");
+    }
+
+    @Override public void onReceive(Object message) throws Exception {
+        if(message instanceof Terminated){
+            Terminated terminated = (Terminated) message;
+            LOG.debug("Actor terminated : {}", terminated.actor());
+        }else if(message instanceof Monitor){
+          Monitor monitor = (Monitor) message;
+          getContext().watch(monitor.getActorRef());
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/XmlUtils.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/XmlUtils.java
new file mode 100644 (file)
index 0000000..5fb2bb8
--- /dev/null
@@ -0,0 +1,145 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+import com.google.common.base.Optional;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.api.SimpleNode;
+import org.opendaylight.yangtools.yang.data.impl.XmlTreeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
+import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
+import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.xml.sax.SAXException;
+
+import javax.activation.UnsupportedDataTypeException;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Set;
+
+public class XmlUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(XmlUtils.class);
+
+  public static String inputCompositeNodeToXml(CompositeNode cNode, SchemaContext schemaContext){
+    if (cNode == null) return new String();
+
+    //Document domTree = NodeUtils.buildShadowDomTree(cNode);
+    Document domTree = null;
+    try {
+      Set<RpcDefinition> rpcs =  schemaContext.getOperations();
+      for(RpcDefinition rpc : rpcs) {
+        if(rpc.getQName().equals(cNode.getNodeType())){
+          domTree = XmlDocumentUtils.toDocument(cNode, rpc.getInput(), XmlDocumentUtils.defaultValueCodecProvider());
+          break;
+        }
+      }
+
+    } catch (UnsupportedDataTypeException e) {
+      LOG.error("Error during translation of CompositeNode to Document", e);
+    }
+    return domTransformer(domTree);
+  }
+
+  public static String outputCompositeNodeToXml(CompositeNode cNode, SchemaContext schemaContext){
+    if (cNode == null) return new String();
+
+    //Document domTree = NodeUtils.buildShadowDomTree(cNode);
+    Document domTree = null;
+    try {
+      Set<RpcDefinition> rpcs =  schemaContext.getOperations();
+      for(RpcDefinition rpc : rpcs) {
+        if(rpc.getQName().equals(cNode.getNodeType())){
+          domTree = XmlDocumentUtils.toDocument(cNode, rpc.getInput(), XmlDocumentUtils.defaultValueCodecProvider());
+          break;
+        }
+      }
+
+    } catch (UnsupportedDataTypeException e) {
+      LOG.error("Error during translation of CompositeNode to Document", e);
+    }
+    return domTransformer(domTree);
+  }
+
+  private static String domTransformer(Document domTree) {
+    StringWriter writer = new StringWriter();
+    try {
+      TransformerFactory tf = TransformerFactory.newInstance();
+      Transformer transformer = tf.newTransformer();
+      transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
+      transformer.transform(new DOMSource(domTree), new StreamResult(writer));
+    } catch (TransformerException e) {
+
+      LOG.error("Error during translation of Document to OutputStream", e);
+    }
+    LOG.debug("compositeNodeToXml " + writer.toString());
+
+    return writer.toString();
+  }
+
+  public static CompositeNode xmlToCompositeNode(String xml){
+    if (xml==null || xml.length()==0) return null;
+
+    Node<?> dataTree;
+    try {
+      dataTree = XmlTreeBuilder.buildDataTree(new ByteArrayInputStream(xml.getBytes()));
+    } catch (XMLStreamException e) {
+      LOG.error("Error during building data tree from XML", e);
+      return null;
+    }
+    if (dataTree == null) {
+      LOG.error("data tree is null");
+      return null;
+    }
+    if (dataTree instanceof SimpleNode) {
+      LOG.error("RPC XML was resolved as SimpleNode");
+      return null;
+    }
+    return (CompositeNode) dataTree;
+  }
+
+  public static CompositeNode inputXmlToCompositeNode(QName rpc, String xml,  SchemaContext schemaContext){
+    if (xml==null || xml.length()==0) return null;
+
+    Node<?> dataTree = null;
+    try {
+
+      Document doc = XmlUtil.readXmlToDocument(xml);
+      LOG.debug("xmlToCompositeNode Document is " + xml );
+      Set<RpcDefinition> rpcs =  schemaContext.getOperations();
+      for(RpcDefinition rpcDef : rpcs) {
+        if(rpcDef.getQName().equals(rpc)){
+          dataTree = XmlDocumentUtils.toDomNode(doc.getDocumentElement(), Optional.<DataSchemaNode>of(rpcDef.getInput()), Optional.of(XmlDocumentUtils.defaultValueCodecProvider()));
+          break;
+        }
+      }
+    } catch (SAXException e) {
+      LOG.error("Error during building data tree from XML", e);
+    } catch (IOException e) {
+      LOG.error("Error during building data tree from XML", e);
+    }
+
+    LOG.debug("xmlToCompositeNode " + dataTree.toString());
+    return (CompositeNode) dataTree;
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AddRoutedRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AddRoutedRpc.java
new file mode 100644 (file)
index 0000000..25773bb
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.messages;
+
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+
+import java.io.Serializable;
+import java.util.Set;
+
+public class AddRoutedRpc implements Serializable {
+
+  Set<RpcRouter.RouteIdentifier<?, ?, ?>> announcements;
+  String actorPath;
+
+  public AddRoutedRpc(Set<RpcRouter.RouteIdentifier<?, ?, ?>> announcements, String actorPath) {
+    this.announcements = announcements;
+    this.actorPath = actorPath;
+  }
+
+  public Set<RpcRouter.RouteIdentifier<?, ?, ?>> getAnnouncements() {
+    return announcements;
+  }
+
+  public String getActorPath() {
+    return actorPath;
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AddRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AddRpc.java
new file mode 100644 (file)
index 0000000..eac9731
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.messages;
+
+import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
+
+import java.io.Serializable;
+
+public class AddRpc implements Serializable {
+
+  RouteIdentifierImpl routeId;
+  String actorPath;
+
+  public AddRpc(RouteIdentifierImpl routeId, String actorPath) {
+    this.routeId = routeId;
+    this.actorPath = actorPath;
+  }
+
+  public RouteIdentifierImpl getRouteId() {
+    return routeId;
+  }
+
+  public String getActorPath() {
+    return actorPath;
+  }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ErrorResponse.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ErrorResponse.java
new file mode 100644 (file)
index 0000000..ef3f528
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.messages;
+
+import java.io.Serializable;
+
+public class ErrorResponse implements Serializable {
+
+  Exception exception;
+
+  public ErrorResponse(Exception e) {
+    this.exception = e;
+  }
+
+  public Exception getException() {
+    return exception;
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java
new file mode 100644 (file)
index 0000000..030d81a
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.messages;
+
+
+import org.opendaylight.yangtools.yang.common.QName;
+
+import java.io.Serializable;
+
+public class ExecuteRpc implements Serializable {
+
+  private String inputCompositeNode;
+  private QName rpc;
+
+  public ExecuteRpc(String inputCompositeNode, QName rpc) {
+    this.inputCompositeNode = inputCompositeNode;
+    this.rpc = rpc;
+  }
+
+  public String getInputCompositeNode() {
+    return inputCompositeNode;
+  }
+
+  public QName getRpc() {
+    return rpc;
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRoutedRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRoutedRpc.java
new file mode 100644 (file)
index 0000000..b1fa410
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.messages;
+
+
+import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
+
+import java.io.Serializable;
+
+public class GetRoutedRpc implements Serializable {
+
+  RouteIdentifierImpl routeId;
+
+  public GetRoutedRpc(RouteIdentifierImpl routeId) {
+    this.routeId = routeId;
+  }
+
+  public RouteIdentifierImpl getRouteId() {
+    return routeId;
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRoutedRpcReply.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRoutedRpcReply.java
new file mode 100644 (file)
index 0000000..0e15633
--- /dev/null
@@ -0,0 +1,24 @@
+package org.opendaylight.controller.remote.rpc.messages;
+
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+import java.io.Serializable;
+
+public class GetRoutedRpcReply implements Serializable {
+
+  private String routePath;
+
+  public GetRoutedRpcReply(String routePath) {
+    this.routePath = routePath;
+  }
+
+  public String getRoutePath() {
+    return routePath;
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRpc.java
new file mode 100644 (file)
index 0000000..c556279
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.messages;
+
+import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
+
+import java.io.Serializable;
+
+public class GetRpc implements Serializable {
+
+  RouteIdentifierImpl routeId;
+
+  public GetRpc(RouteIdentifierImpl routeId) {
+    this.routeId = routeId;
+  }
+
+  public RouteIdentifierImpl getRouteId() {
+    return routeId;
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRpcReply.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRpcReply.java
new file mode 100644 (file)
index 0000000..3309b98
--- /dev/null
@@ -0,0 +1,24 @@
+package org.opendaylight.controller.remote.rpc.messages;
+
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+import java.io.Serializable;
+
+public class GetRpcReply implements Serializable {
+
+  private String routePath;
+
+  public GetRpcReply(String routePath) {
+    this.routePath = routePath;
+  }
+
+  public String getRoutePath() {
+    return routePath;
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRoutedRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRoutedRpc.java
new file mode 100644 (file)
index 0000000..00ef980
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.messages;
+
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+import java.io.Serializable;
+
+public class InvokeRoutedRpc implements Serializable {
+
+  private QName rpc;
+  private InstanceIdentifier identifier;
+  private CompositeNode input;
+
+  public InvokeRoutedRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
+    this.rpc = rpc;
+    this.identifier = identifier;
+    this.input = input;
+  }
+
+  public InvokeRoutedRpc(QName rpc, CompositeNode input) {
+    this.rpc = rpc;
+    this.input = input;
+  }
+
+  public QName getRpc() {
+    return rpc;
+  }
+
+  public InstanceIdentifier getIdentifier() {
+    return identifier;
+  }
+
+  public CompositeNode getInput() {
+    return input;
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java
new file mode 100644 (file)
index 0000000..1f4eab0
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.messages;
+
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+
+import java.io.Serializable;
+
+public class InvokeRpc implements Serializable {
+
+  private QName rpc;
+  private CompositeNode input;
+
+  public InvokeRpc(QName rpc, CompositeNode input) {
+    this.rpc = rpc;
+    this.input = input;
+  }
+
+  public QName getRpc() {
+    return rpc;
+  }
+
+  public CompositeNode getInput() {
+    return input;
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/Monitor.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/Monitor.java
new file mode 100644 (file)
index 0000000..43a31ef
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.messages;
+
+import akka.actor.ActorRef;
+
+public class Monitor {
+    private final ActorRef actorRef;
+
+    public Monitor(ActorRef actorRef){
+
+        this.actorRef = actorRef;
+    }
+
+    public ActorRef getActorRef() {
+        return actorRef;
+    }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RemoveRoutedRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RemoveRoutedRpc.java
new file mode 100644 (file)
index 0000000..a3aa9d1
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.messages;
+
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+
+import java.io.Serializable;
+import java.util.Set;
+
+public class RemoveRoutedRpc implements Serializable {
+
+  Set<RpcRouter.RouteIdentifier<?, ?, ?>> announcements;
+  String actorPath;
+
+  public RemoveRoutedRpc(Set<RpcRouter.RouteIdentifier<?, ?, ?>> announcements, String actorPath) {
+    this.announcements = announcements;
+    this.actorPath = actorPath;
+  }
+
+  public Set<RpcRouter.RouteIdentifier<?, ?, ?>> getAnnouncements() {
+    return announcements;
+  }
+
+  public String getActorPath() {
+    return actorPath;
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RemoveRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RemoveRpc.java
new file mode 100644 (file)
index 0000000..0bfd78a
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.messages;
+
+import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
+
+import java.io.Serializable;
+
+public class RemoveRpc implements Serializable {
+
+  RouteIdentifierImpl routeId;
+
+  public RemoveRpc(RouteIdentifierImpl routeId) {
+    this.routeId = routeId;
+  }
+
+  public RouteIdentifierImpl getRouteId() {
+    return routeId;
+  }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RoutingTableData.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RoutingTableData.java
new file mode 100644 (file)
index 0000000..132fdba
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.messages;
+
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+
+import java.io.Serializable;
+import java.util.LinkedHashSet;
+import java.util.Map;
+
+public class RoutingTableData implements Serializable {
+  private Map<RpcRouter.RouteIdentifier<?, ?, ?>, String> rpcMap;
+  private Map<RpcRouter.RouteIdentifier<?, ?, ?>, LinkedHashSet<String>> routedRpcMap;
+
+  public RoutingTableData(Map<RpcRouter.RouteIdentifier<?, ?, ?>, String> rpcMap,
+                          Map<RpcRouter.RouteIdentifier<?, ?, ?>, LinkedHashSet<String>> routedRpcMap) {
+    this.rpcMap = rpcMap;
+    this.routedRpcMap = routedRpcMap;
+  }
+
+  public Map<RpcRouter.RouteIdentifier<?, ?, ?>, String> getRpcMap() {
+    return rpcMap;
+  }
+
+  public Map<RpcRouter.RouteIdentifier<?, ?, ?>, LinkedHashSet<String>> getRoutedRpcMap() {
+    return routedRpcMap;
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RpcResponse.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RpcResponse.java
new file mode 100644 (file)
index 0000000..cbfecb1
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.messages;
+
+
+
+import java.io.Serializable;
+
+public class RpcResponse implements Serializable {
+  private String resultCompositeNode;
+
+  public RpcResponse(String resultCompositeNode) {
+    this.resultCompositeNode = resultCompositeNode;
+  }
+
+  public String getResultCompositeNode() {
+    return resultCompositeNode;
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ClusterWrapper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ClusterWrapper.java
new file mode 100644 (file)
index 0000000..4ddc2be
--- /dev/null
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.registry;
+
+
+import akka.actor.Address;
+import akka.cluster.ClusterEvent;
+
+public interface ClusterWrapper {
+
+  ClusterEvent.CurrentClusterState getState();
+
+  Address getAddress();
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ClusterWrapperImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ClusterWrapperImpl.java
new file mode 100644 (file)
index 0000000..89603a1
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.registry;
+
+
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent;
+
+
+public class ClusterWrapperImpl  implements ClusterWrapper{
+
+  private Cluster cluster;
+
+  public ClusterWrapperImpl(ActorSystem actorSystem) {
+    cluster = Cluster.get(actorSystem);
+  }
+
+  @Override
+  public ClusterEvent.CurrentClusterState getState() {
+    return cluster.state();
+  }
+
+  @Override
+  public Address getAddress() {
+    return cluster.selfAddress();
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java
new file mode 100644 (file)
index 0000000..5e19653
--- /dev/null
@@ -0,0 +1,171 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.registry;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class RoutingTable<I, R> {
+
+  private final Logger LOG = LoggerFactory.getLogger(RoutingTable.class);
+
+  private ConcurrentMap<I,R> globalRpcMap = new ConcurrentHashMap<>();
+  private ConcurrentMap<I, LinkedHashSet<R>> routedRpcMap = new ConcurrentHashMap<>();
+
+  public ConcurrentMap<I, R> getGlobalRpcMap() {
+    return globalRpcMap;
+  }
+
+  public ConcurrentMap<I, LinkedHashSet<R>> getRoutedRpcMap() {
+    return routedRpcMap;
+  }
+
+  public R getGlobalRoute(final I routeId) {
+    Preconditions.checkNotNull(routeId, "getGlobalRoute: routeId cannot be null!");
+    return globalRpcMap.get(routeId);
+  }
+
+  public void addGlobalRoute(final I routeId, final R route) {
+    Preconditions.checkNotNull(routeId, "addGlobalRoute: routeId cannot be null!");
+    Preconditions.checkNotNull(route, "addGlobalRoute: route cannot be null!");
+    LOG.debug("addGlobalRoute: adding  a new route with id[{}] and value [{}]", routeId, route);
+    if(globalRpcMap.putIfAbsent(routeId, route) != null) {
+      LOG.debug("A route already exist for route id [{}] ", routeId);
+    }
+  }
+
+  public void removeGlobalRoute(final I routeId) {
+    Preconditions.checkNotNull(routeId, "removeGlobalRoute: routeId cannot be null!");
+    LOG.debug("removeGlobalRoute: removing  a new route with id [{}]", routeId);
+    globalRpcMap.remove(routeId);
+  }
+
+  public Set<R> getRoutedRpc(final I routeId) {
+    Preconditions.checkNotNull(routeId, "getRoutes: routeId cannot be null!");
+    Set<R> routes = routedRpcMap.get(routeId);
+
+    if (routes == null) {
+      return Collections.emptySet();
+    }
+
+    return ImmutableSet.copyOf(routes);
+  }
+
+  public R getLastAddedRoutedRpc(final I routeId) {
+
+    Set<R> routes = getRoutedRpc(routeId);
+
+    if (routes.isEmpty()) {
+      return null;
+    }
+
+    R route = null;
+    Iterator<R> iter = routes.iterator();
+    while (iter.hasNext()) {
+      route = iter.next();
+    }
+
+    return route;
+  }
+
+  public void addRoutedRpc(final I routeId, final R route)   {
+    Preconditions.checkNotNull(routeId, "addRoute: routeId cannot be null");
+    Preconditions.checkNotNull(route, "addRoute: route cannot be null");
+    LOG.debug("addRoute: adding a route with k/v [{}/{}]", routeId, route);
+    threadSafeAdd(routeId, route);
+  }
+
+  public void addRoutedRpcs(final Set<I> routeIds, final R route) {
+    Preconditions.checkNotNull(routeIds, "addRoutes: routeIds must not be null");
+    for (I routeId : routeIds){
+      addRoutedRpc(routeId, route);
+    }
+  }
+
+  public void removeRoute(final I routeId, final R route) {
+    Preconditions.checkNotNull(routeId, "removeRoute: routeId cannot be null!");
+    Preconditions.checkNotNull(route, "removeRoute: route cannot be null!");
+
+    LinkedHashSet<R> routes = routedRpcMap.get(routeId);
+    if (routes == null) {
+      return;
+    }
+    LOG.debug("removeRoute: removing  a new route with k/v [{}/{}]", routeId, route);
+    threadSafeRemove(routeId, route);
+  }
+
+  public void removeRoutes(final Set<I> routeIds, final R route) {
+    Preconditions.checkNotNull(routeIds, "removeRoutes: routeIds must not be null");
+    for (I routeId : routeIds){
+      removeRoute(routeId, route);
+    }
+  }
+
+  /**
+   * This method guarantees that no 2 thread over write each other's changes.
+   * Just so that we dont end up in infinite loop, it tries for 100 times then throw
+   */
+  private void threadSafeAdd(final I routeId, final R route) {
+
+    for (int i=0;i<100;i++){
+
+      LinkedHashSet<R> updatedRoutes = new LinkedHashSet<>();
+      updatedRoutes.add(route);
+      LinkedHashSet<R> oldRoutes = routedRpcMap.putIfAbsent(routeId, updatedRoutes);
+      if (oldRoutes == null) {
+        return;
+      }
+
+      updatedRoutes = new LinkedHashSet<>(oldRoutes);
+      updatedRoutes.add(route);
+
+      if (routedRpcMap.replace(routeId, oldRoutes, updatedRoutes)) {
+        return;
+      }
+    }
+    //the method did not already return means it failed to add route in 100 attempts
+    throw new IllegalStateException("Failed to add route [" + routeId + "]");
+  }
+
+  /**
+   * This method guarantees that no 2 thread over write each other's changes.
+   * Just so that we dont end up in infinite loop, it tries for 100 times then throw
+   */
+  private void threadSafeRemove(final I routeId, final R route) {
+    LinkedHashSet<R> updatedRoutes = null;
+    for (int i=0;i<100;i++){
+      LinkedHashSet<R> oldRoutes = routedRpcMap.get(routeId);
+
+      // if route to be deleted is the only entry in the set then remove routeId from the cache
+      if ((oldRoutes.size() == 1) && oldRoutes.contains(route)){
+        routedRpcMap.remove(routeId);
+        return;
+      }
+
+      // if there are multiple routes for this routeId, remove the route to be deleted only from the set.
+      updatedRoutes = new LinkedHashSet<>(oldRoutes);
+      updatedRoutes.remove(route);
+      if (routedRpcMap.replace(routeId, oldRoutes, updatedRoutes)) {
+        return;
+      }
+
+    }
+    //the method did not already return means it failed to remove route in 100 attempts
+    throw new IllegalStateException("Failed to remove route [" + routeId + "]");
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
new file mode 100644 (file)
index 0000000..7cb505a
--- /dev/null
@@ -0,0 +1,202 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry;
+
+import akka.actor.ActorSelection;
+import akka.actor.Address;
+import akka.actor.Props;
+import akka.cluster.ClusterEvent;
+import akka.cluster.Member;
+import akka.japi.Creator;
+import org.opendaylight.controller.remote.rpc.AbstractUntypedActor;
+import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.AddRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply;
+import org.opendaylight.controller.remote.rpc.messages.GetRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRpcReply;
+import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.RemoveRpc;
+import org.opendaylight.controller.remote.rpc.messages.RoutingTableData;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConversions;
+
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This Actor maintains the routing table state and sync it with other nodes in the cluster.
+ *
+ * A scheduler runs after an interval of time, which pick a random member from the cluster
+ * and send the current state of routing table to the member.
+ *
+ * when a message of routing table data is received, it gets merged with the local routing table
+ * to keep the latest data.
+ */
+
+public class RpcRegistry extends AbstractUntypedActor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RpcRegistry.class);
+  private RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String> routingTable;
+  private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+  private final ClusterWrapper clusterWrapper;
+  private final ScheduledFuture<?> syncScheduler;
+
+  private RpcRegistry(ClusterWrapper clusterWrapper){
+    this.routingTable = new RoutingTable<>();
+    this.clusterWrapper = clusterWrapper;
+    this.syncScheduler = scheduler.scheduleAtFixedRate(new SendRoutingTable(), 10, 10, TimeUnit.SECONDS);
+  }
+
+  public static Props props(final ClusterWrapper clusterWrapper){
+    return Props.create(new Creator<RpcRegistry>(){
+
+      @Override
+      public RpcRegistry create() throws Exception {
+        return new RpcRegistry(clusterWrapper);
+      }
+    });
+  }
+
+  @Override
+  protected void handleReceive(Object message) throws Exception {
+    LOG.debug("Received message {}", message);
+    if(message instanceof RoutingTableData) {
+      syncRoutingTable((RoutingTableData) message);
+    } else if(message instanceof GetRoutedRpc) {
+      getRoutedRpc((GetRoutedRpc) message);
+    } else if(message instanceof GetRpc) {
+      getRpc((GetRpc) message);
+    } else if(message instanceof AddRpc) {
+      addRpc((AddRpc) message);
+    } else if(message instanceof RemoveRpc) {
+      removeRpc((RemoveRpc) message);
+    } else if(message instanceof AddRoutedRpc) {
+      addRoutedRpc((AddRoutedRpc) message);
+    } else if(message instanceof RemoveRoutedRpc) {
+      removeRoutedRpc((RemoveRoutedRpc) message);
+    }
+  }
+
+  private void getRoutedRpc(GetRoutedRpc rpcMsg){
+    LOG.debug("Get latest routed Rpc location from routing table {}", rpcMsg);
+    String remoteActorPath = routingTable.getLastAddedRoutedRpc(rpcMsg.getRouteId());
+    GetRoutedRpcReply routedRpcReply = new GetRoutedRpcReply(remoteActorPath);
+
+    getSender().tell(routedRpcReply, self());
+  }
+
+  private void getRpc(GetRpc rpcMsg) {
+    LOG.debug("Get global Rpc location from routing table {}", rpcMsg);
+    String remoteActorPath = routingTable.getGlobalRoute(rpcMsg.getRouteId());
+    GetRpcReply rpcReply = new GetRpcReply(remoteActorPath);
+
+    getSender().tell(rpcReply, self());
+  }
+
+  private void addRpc(AddRpc rpcMsg) {
+    LOG.debug("Add Rpc to routing table {}", rpcMsg);
+    routingTable.addGlobalRoute(rpcMsg.getRouteId(), rpcMsg.getActorPath());
+
+    getSender().tell("Success", self());
+  }
+
+  private void removeRpc(RemoveRpc rpcMsg) {
+    LOG.debug("Removing Rpc to routing table {}", rpcMsg);
+    routingTable.removeGlobalRoute(rpcMsg.getRouteId());
+
+    getSender().tell("Success", self());
+  }
+
+  private void addRoutedRpc(AddRoutedRpc rpcMsg) {
+    routingTable.addRoutedRpcs(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
+    getSender().tell("Success", self());
+  }
+
+  private void removeRoutedRpc(RemoveRoutedRpc rpcMsg) {
+    routingTable.removeRoutes(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
+    getSender().tell("Success", self());
+  }
+
+  private void syncRoutingTable(RoutingTableData routingTableData) {
+    LOG.debug("Syncing routing table {}", routingTableData);
+
+    Map<RpcRouter.RouteIdentifier<?, ?, ?>, String> newRpcMap = routingTableData.getRpcMap();
+    Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = newRpcMap.keySet();
+    for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
+      routingTable.addGlobalRoute(routeId, newRpcMap.get(routeId));
+    }
+
+    Map<RpcRouter.RouteIdentifier<?, ?, ?>, LinkedHashSet<String>> newRoutedRpcMap =
+        routingTableData.getRoutedRpcMap();
+    routeIds = newRoutedRpcMap.keySet();
+
+    for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
+      Set<String> routeAddresses = newRoutedRpcMap.get(routeId);
+      for(String routeAddress : routeAddresses) {
+        routingTable.addRoutedRpc(routeId, routeAddress);
+      }
+    }
+  }
+
+  private ActorSelection getRandomRegistryActor() {
+    ClusterEvent.CurrentClusterState clusterState = clusterWrapper.getState();
+    ActorSelection actor = null;
+    Set<Member> members = JavaConversions.asJavaSet(clusterState.members());
+    int memberSize = members.size();
+    // Don't select yourself
+    if(memberSize > 1) {
+      Address currentNodeAddress = clusterWrapper.getAddress();
+      int index = new Random().nextInt(memberSize);
+      int i = 0;
+      // keeping previous member, in case when random index member is same as current actor
+      // and current actor member is last in set
+      Member previousMember = null;
+      for(Member member : members){
+        if(i == index-1) {
+          previousMember = member;
+        }
+        if(i == index) {
+          if(!currentNodeAddress.equals(member.address())) {
+            actor = this.context().actorSelection(member.address() + "/user/rpc-registry");
+            break;
+          } else if(index < memberSize-1){ // pick the next element in the set
+            index++;
+          }
+        }
+        i++;
+      }
+      if(actor == null && previousMember != null) {
+        actor = this.context().actorSelection(previousMember.address() + "/user/rpc-registry");
+      }
+    }
+    return actor;
+  }
+
+  private class SendRoutingTable implements Runnable {
+
+    @Override
+    public void run() {
+      RoutingTableData routingTableData =
+          new RoutingTableData(routingTable.getGlobalRpcMap(), routingTable.getRoutedRpcMap());
+      LOG.debug("Sending routing table for sync {}", routingTableData);
+      ActorSelection actor = getRandomRegistryActor();
+      if(actor != null) {
+        actor.tell(routingTableData, self());
+      }
+    }
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/resources/application.conf b/opendaylight/md-sal/sal-remoterpc-connector/src/main/resources/application.conf
new file mode 100644 (file)
index 0000000..9585e9f
--- /dev/null
@@ -0,0 +1,21 @@
+odl-cluster{
+  akka {
+    actor {
+      provider = "akka.cluster.ClusterActorRefProvider"
+
+    }
+    remote {
+      log-remote-lifecycle-events = off
+      netty.tcp {
+        hostname = "192.168.141.142"
+        port = 2551
+      }
+    }
+
+    cluster {
+      seed-nodes = ["akka.tcp://opendaylight-rpc@192.168.141.141:2551"]
+
+      auto-down-unreachable-after = 10s
+    }
+  }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/yang/remote-rpc-connector.yang b/opendaylight/md-sal/sal-remoterpc-connector/src/main/yang/remote-rpc-connector.yang
new file mode 100644 (file)
index 0000000..08db5c0
--- /dev/null
@@ -0,0 +1,40 @@
+module remote-rpc-connector {
+       yang-version 1;
+    namespace "urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector";
+    prefix "remote-rpc-connector";
+
+    import config { prefix config; revision-date 2013-04-05; }
+    import opendaylight-md-sal-dom {prefix dom;}
+    
+    description
+        "This module contains the base YANG definitions for
+                 the remote routed rpc";
+    revision "2014-07-07" {
+        description
+            "Initial revision";
+    }
+
+    // This is the definition of the service implementation as a module identity.
+    identity remote-rpc-connector {
+      base config:module-type;
+      // Specifies the prefix for generated java classes.
+      config:java-name-prefix RemoteRPCBroker;
+    }
+
+    augment "/config:modules/config:module/config:configuration" {
+        case remote-rpc-connector {
+            when "/config:modules/config:module/config:type = 'remote-rpc-connector'";
+            
+            container dom-broker {
+                uses config:service-ref {
+                    refine type {
+                        mandatory true;
+                        config:required-identity dom:dom-broker-osgi-registry;
+                    }
+                }
+            }
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java
new file mode 100644 (file)
index 0000000..595d833
--- /dev/null
@@ -0,0 +1,213 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import com.google.common.util.concurrent.Futures;
+import junit.framework.Assert;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.AddRpc;
+import org.opendaylight.controller.remote.rpc.messages.ErrorResponse;
+import org.opendaylight.controller.remote.rpc.messages.InvokeRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
+import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
+import org.opendaylight.controller.remote.rpc.registry.ClusterWrapper;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.ModifyAction;
+import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class RpcBrokerTest {
+
+  static ActorSystem system;
+
+
+  @BeforeClass
+  public static void setup() {
+    system = ActorSystem.create();
+  }
+
+  @AfterClass
+  public static void teardown() {
+    JavaTestKit.shutdownActorSystem(system);
+    system = null;
+  }
+
+  @Test
+  public void testInvokeRpcError() throws URISyntaxException {
+    new JavaTestKit(system) {{
+      ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class)));
+      Broker.ProviderSession brokerSession = Mockito.mock(Broker.ProviderSession.class);
+      SchemaContext schemaContext = mock(SchemaContext.class);
+      ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext));
+      QName rpc = new QName(new URI("actor1"), "actor1");
+      InvokeRpc invokeMsg = new InvokeRpc(rpc, null);
+      rpcBroker.tell(invokeMsg, getRef());
+
+      Boolean getMsg = new ExpectMsg<Boolean>("ErrorResponse") {
+        protected Boolean match(Object in) {
+          if (in instanceof ErrorResponse) {
+            ErrorResponse reply = (ErrorResponse)in;
+            return "No remote actor found for rpc execution.".equals(reply.getException().getMessage());
+          } else {
+            throw noMatch();
+          }
+        }
+      }.get(); // this extracts the received message
+
+      Assert.assertTrue(getMsg);
+    }};
+  }
+
+  /**
+   * This test method invokes and executes the remote rpc
+   */
+
+  @Test
+  public void testInvokeRpc() throws URISyntaxException {
+    new JavaTestKit(system) {{
+      ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(mock(ClusterWrapper.class)));
+      Broker.ProviderSession brokerSession = mock(Broker.ProviderSession.class);
+      SchemaContext schemaContext = mock(SchemaContext.class);
+      ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext));
+      ActorRef rpcBrokerRemote = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), "actor1");
+      // Add RPC in table
+      QName rpc = new QName(new URI("actor1"), "actor1");
+      RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, rpc, null);
+      final String route = rpcBrokerRemote.path().toString();
+      AddRpc rpcMsg = new AddRpc(routeId, route);
+      rpcRegistry.tell(rpcMsg, getRef());
+      expectMsgEquals(duration("2 second"), "Success");
+
+      // invoke rpc
+      CompositeNode input = new ImmutableCompositeNode(QName.create("ns", "2013-12-09", "child1"), new ArrayList<Node<?>>(), ModifyAction.REPLACE);
+      CompositeNode invokeRpcResult = mock(CompositeNode.class);
+      Collection<RpcError> errors = new ArrayList<>();
+      RpcResult<CompositeNode> result = Rpcs.getRpcResult(true, invokeRpcResult, errors);
+      Future<RpcResult<CompositeNode>> rpcResult = Futures.immediateFuture(result);
+      when(brokerSession.rpc(rpc, input)).thenReturn(rpcResult);
+      InvokeRpc invokeMsg = new InvokeRpc(rpc, input);
+      rpcBroker.tell(invokeMsg, getRef());
+
+      //verify response msg
+      Boolean getMsg = new ExpectMsg<Boolean>("RpcResponse") {
+        protected Boolean match(Object in) {
+          if (in instanceof RpcResponse) {
+            return true;
+          } else {
+            throw noMatch();
+          }
+        }
+      }.get(); // this extracts the received message
+
+      Assert.assertTrue(getMsg);
+    }};
+  }
+
+  @Test
+  public void testInvokeRoutedRpcError() throws URISyntaxException {
+    new JavaTestKit(system) {{
+      ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class)));
+      Broker.ProviderSession brokerSession = Mockito.mock(Broker.ProviderSession.class);
+      SchemaContext schemaContext = mock(SchemaContext.class);
+      ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext));
+      QName rpc = new QName(new URI("actor1"), "actor1");
+      InvokeRoutedRpc invokeMsg = new InvokeRoutedRpc(rpc, null);
+      rpcBroker.tell(invokeMsg, getRef());
+
+      Boolean getMsg = new ExpectMsg<Boolean>("ErrorResponse") {
+        protected Boolean match(Object in) {
+          if (in instanceof ErrorResponse) {
+            ErrorResponse reply = (ErrorResponse)in;
+            return "No remote actor found for rpc execution.".equals(reply.getException().getMessage());
+          } else {
+            throw noMatch();
+          }
+        }
+      }.get(); // this extracts the received message
+
+      Assert.assertTrue(getMsg);
+    }};
+  }
+
+  /**
+   * This test method invokes and executes the remote routed rpc
+   */
+
+  @Test
+  public void testInvokeRoutedRpc() throws URISyntaxException {
+    new JavaTestKit(system) {{
+      ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(mock(ClusterWrapper.class)));
+      Broker.ProviderSession brokerSession = mock(Broker.ProviderSession.class);
+      SchemaContext schemaContext = mock(SchemaContext.class);
+      ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext));
+      ActorRef rpcBrokerRemote = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), "actor2");
+      // Add Routed RPC in table
+      QName rpc = new QName(new URI("actor2"), "actor2");
+      RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, rpc, null);
+      final String route = rpcBrokerRemote.path().toString();
+      Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new HashSet<>();
+      routeIds.add(routeId);
+
+      AddRoutedRpc rpcMsg = new AddRoutedRpc(routeIds, route);
+      rpcRegistry.tell(rpcMsg, getRef());
+      expectMsgEquals(duration("2 second"), "Success");
+
+      // invoke rpc
+      CompositeNode input = new ImmutableCompositeNode(QName.create("ns", "2013-12-09", "child1"), new ArrayList<Node<?>>(), ModifyAction.REPLACE);
+      CompositeNode invokeRpcResult = mock(CompositeNode.class);
+      Collection<RpcError> errors = new ArrayList<>();
+      RpcResult<CompositeNode> result = Rpcs.getRpcResult(true, invokeRpcResult, errors);
+      Future<RpcResult<CompositeNode>> rpcResult = Futures.immediateFuture(result);
+      when(brokerSession.rpc(rpc, input)).thenReturn(rpcResult);
+      InvokeRoutedRpc invokeMsg = new InvokeRoutedRpc(rpc, input);
+      rpcBroker.tell(invokeMsg, getRef());
+
+      //verify response msg
+      Boolean getMsg = new ExpectMsg<Boolean>("RpcResponse") {
+        protected Boolean match(Object in) {
+          if (in instanceof RpcResponse) {
+            return true;
+          } else {
+            throw noMatch();
+          }
+        }
+      }.get(); // this extracts the received message
+
+      Assert.assertTrue(getMsg);
+    }};
+  }
+
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableTest.java
new file mode 100644 (file)
index 0000000..a57402a
--- /dev/null
@@ -0,0 +1,209 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.registry;
+
+import junit.framework.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.yangtools.yang.common.QName;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashSet;
+import java.util.Set;
+
+public class RoutingTableTest {
+
+  private RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String> routingTable =
+      new RoutingTable<>();
+
+  @Test
+  public void addGlobalRouteNullRouteIdTest() {
+    try {
+      routingTable.addGlobalRoute(null, null);
+
+      Assert.fail("Null pointer exception was not thrown.");
+    } catch (Exception e) {
+      Assert.assertEquals(NullPointerException.class.getName(), e.getClass().getName());
+      Assert.assertEquals("addGlobalRoute: routeId cannot be null!", e.getMessage());
+    }
+  }
+
+  @Test
+  public void addGlobalRouteNullRouteTest() {
+    try {
+      RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, null, null);
+      routingTable.addGlobalRoute(routeId, null);
+
+      Assert.fail("Null pointer exception was not thrown.");
+    } catch (Exception e) {
+      Assert.assertEquals(NullPointerException.class.getName(), e.getClass().getName());
+      Assert.assertEquals("addGlobalRoute: route cannot be null!", e.getMessage());
+    }
+  }
+
+  @Test
+  public void getGlobalRouteNullTest() {
+    try {
+      routingTable.getGlobalRoute(null);
+
+      Assert.fail("Null pointer exception was not thrown.");
+    } catch (Exception e) {
+      Assert.assertEquals(NullPointerException.class.getName(), e.getClass().getName());
+      Assert.assertEquals("getGlobalRoute: routeId cannot be null!", e.getMessage());
+    }
+  }
+
+  @Test
+  public void getGlobalRouteTest() throws URISyntaxException {
+    QName type = new QName(new URI("actor1"), "actor1");
+    RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null);
+    String route = "actor1";
+
+    routingTable.addGlobalRoute(routeId, route);
+
+    String returnedRoute = routingTable.getGlobalRoute(routeId);
+
+    Assert.assertEquals(route, returnedRoute);
+
+  }
+
+  @Test
+  public void removeGlobalRouteTest() throws URISyntaxException {
+    QName type = new QName(new URI("actorRemove"), "actorRemove");
+    RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null);
+    String route = "actorRemove";
+
+    routingTable.addGlobalRoute(routeId, route);
+
+    String returnedRoute = routingTable.getGlobalRoute(routeId);
+
+    Assert.assertEquals(route, returnedRoute);
+
+    routingTable.removeGlobalRoute(routeId);
+
+    String deletedRoute = routingTable.getGlobalRoute(routeId);
+
+    Assert.assertNull(deletedRoute);
+  }
+
+  @Test
+  public void addRoutedRpcNullRouteIdTest() {
+    try {
+      routingTable.addRoutedRpc(null, null);
+
+      Assert.fail("Null pointer exception was not thrown.");
+    } catch (Exception e) {
+      Assert.assertEquals(NullPointerException.class.getName(), e.getClass().getName());
+      Assert.assertEquals("addRoute: routeId cannot be null", e.getMessage());
+    }
+  }
+
+  @Test
+  public void addRoutedRpcNullRouteTest() {
+    try {
+      RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, null, null);
+
+      routingTable.addRoutedRpc(routeId, null);
+
+      Assert.fail("Null pointer exception was not thrown.");
+    } catch (Exception e) {
+      Assert.assertEquals(NullPointerException.class.getName(), e.getClass().getName());
+      Assert.assertEquals("addRoute: route cannot be null", e.getMessage());
+    }
+  }
+
+  @Test
+  public void getRoutedRpcNullTest() {
+    try {
+      routingTable.getRoutedRpc(null);
+
+      Assert.fail("Null pointer exception was not thrown.");
+    } catch (Exception e) {
+      Assert.assertEquals(NullPointerException.class.getName(), e.getClass().getName());
+      Assert.assertEquals("getRoutes: routeId cannot be null!", e.getMessage());
+    }
+  }
+
+  @Test
+  public void getRoutedRpcTest() throws URISyntaxException {
+    QName type = new QName(new URI("actor1"), "actor1");
+    RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null);
+    String route = "actor1";
+
+    routingTable.addRoutedRpc(routeId, route);
+
+    Set<String> routes = routingTable.getRoutedRpc(routeId);
+
+    Assert.assertEquals(1, routes.size());
+    Assert.assertTrue(routes.contains(route));
+
+  }
+
+  @Test
+  public void getLastRoutedRpcTest() throws URISyntaxException {
+    QName type = new QName(new URI("first1"), "first1");
+    RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null);
+    String route = "first1";
+
+    routingTable.addRoutedRpc(routeId, route);
+
+    String route2 = "second1";
+    routingTable.addRoutedRpc(routeId, route2);
+
+    String latest = routingTable.getLastAddedRoutedRpc(routeId);
+    Assert.assertEquals(route2, latest);
+
+  }
+
+  @Test
+  public void removeRoutedRpcTest() throws URISyntaxException {
+    QName type = new QName(new URI("remove"), "remove");
+    RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null);
+    String route = "remove";
+    routingTable.addRoutedRpc(routeId, route);
+
+    String latest = routingTable.getLastAddedRoutedRpc(routeId);
+    Assert.assertEquals(route, latest);
+
+    routingTable.removeRoute(routeId, route);
+    String removed = routingTable.getLastAddedRoutedRpc(routeId);
+    Assert.assertNull(removed);
+  }
+
+  @Test
+  public void removeRoutedRpcsTest() throws URISyntaxException {
+    QName type = new QName(new URI("remove1"), "remove1");
+    RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null);
+
+    QName type2 = new QName(new URI("remove2"), "remove2");
+    RouteIdentifierImpl routeId2 = new RouteIdentifierImpl(null, type2, null);
+
+    Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new HashSet<>();
+    routeIds.add(routeId);
+    routeIds.add(routeId2);
+    String route = "remove1";
+
+    routingTable.addRoutedRpcs(routeIds, route);
+    String latest1 = routingTable.getLastAddedRoutedRpc(routeId);
+    Assert.assertEquals(route, latest1);
+
+    String latest2 = routingTable.getLastAddedRoutedRpc(routeId2);
+    Assert.assertEquals(route, latest2);
+
+    routingTable.removeRoutes(routeIds, route);
+    String removed1 = routingTable.getLastAddedRoutedRpc(routeId);
+    Assert.assertNull(removed1);
+
+    String removed2 = routingTable.getLastAddedRoutedRpc(routeId2);
+    Assert.assertNull(removed2);
+  }
+
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java
new file mode 100644 (file)
index 0000000..d011d33
--- /dev/null
@@ -0,0 +1,159 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.registry;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import junit.framework.Assert;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
+import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.AddRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply;
+import org.opendaylight.controller.remote.rpc.messages.GetRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRpcReply;
+import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.RemoveRpc;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.yangtools.yang.common.QName;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashSet;
+import java.util.Set;
+
+public class RpcRegistryTest {
+
+  static ActorSystem system;
+
+
+  @BeforeClass
+  public static void setup() {
+    system = ActorSystem.create();
+  }
+
+  @AfterClass
+  public static void teardown() {
+    JavaTestKit.shutdownActorSystem(system);
+    system = null;
+  }
+
+  /**
+   This test add, read and remove an entry in global rpc
+   */
+  @Test
+  public void testGlobalRpc() throws URISyntaxException {
+    new JavaTestKit(system) {{
+      ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class)));
+      QName type = new QName(new URI("actor1"), "actor1");
+      RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null);
+      final String route = "actor1";
+
+      AddRpc rpcMsg = new AddRpc(routeId, route);
+      rpcRegistry.tell(rpcMsg, getRef());
+      expectMsgEquals(duration("2 second"), "Success");
+
+      GetRpc getRpc = new GetRpc(routeId);
+      rpcRegistry.tell(getRpc, getRef());
+
+      Boolean getMsg = new ExpectMsg<Boolean>("GetRpcReply") {
+        protected Boolean match(Object in) {
+          if (in instanceof GetRpcReply) {
+            GetRpcReply reply = (GetRpcReply)in;
+            return route.equals(reply.getRoutePath());
+          } else {
+            throw noMatch();
+          }
+        }
+      }.get(); // this extracts the received message
+
+      Assert.assertTrue(getMsg);
+
+      RemoveRpc removeMsg = new RemoveRpc(routeId);
+      rpcRegistry.tell(removeMsg, getRef());
+      expectMsgEquals(duration("2 second"), "Success");
+
+      rpcRegistry.tell(getRpc, getRef());
+
+      Boolean getNullMsg = new ExpectMsg<Boolean>("GetRpcReply") {
+        protected Boolean match(Object in) {
+          if (in instanceof GetRpcReply) {
+            GetRpcReply reply = (GetRpcReply)in;
+            return reply.getRoutePath() == null;
+          } else {
+            throw noMatch();
+          }
+        }
+      }.get();
+      Assert.assertTrue(getNullMsg);
+    }};
+
+  }
+
+  /**
+   This test add, read and remove an entry in routed rpc
+   */
+  @Test
+  public void testRoutedRpc() throws URISyntaxException {
+    new JavaTestKit(system) {{
+      ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class)));
+      QName type = new QName(new URI("actor1"), "actor1");
+      RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null);
+      final String route = "actor1";
+
+      Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new HashSet<>();
+      routeIds.add(routeId);
+
+      AddRoutedRpc rpcMsg = new AddRoutedRpc(routeIds, route);
+      rpcRegistry.tell(rpcMsg, getRef());
+      expectMsgEquals(duration("2 second"), "Success");
+
+      GetRoutedRpc getRpc = new GetRoutedRpc(routeId);
+      rpcRegistry.tell(getRpc, getRef());
+
+      Boolean getMsg = new ExpectMsg<Boolean>("GetRoutedRpcReply") {
+        protected Boolean match(Object in) {
+          if (in instanceof GetRoutedRpcReply) {
+            GetRoutedRpcReply reply = (GetRoutedRpcReply)in;
+            return route.equals(reply.getRoutePath());
+          } else {
+            throw noMatch();
+          }
+        }
+      }.get(); // this extracts the received message
+
+      Assert.assertTrue(getMsg);
+
+      RemoveRoutedRpc removeMsg = new RemoveRoutedRpc(routeIds, route);
+      rpcRegistry.tell(removeMsg, getRef());
+      expectMsgEquals(duration("2 second"), "Success");
+
+      rpcRegistry.tell(getRpc, getRef());
+
+      Boolean getNullMsg = new ExpectMsg<Boolean>("GetRoutedRpcReply") {
+        protected Boolean match(Object in) {
+          if (in instanceof GetRoutedRpcReply) {
+            GetRoutedRpcReply reply = (GetRoutedRpcReply)in;
+            return reply.getRoutePath() == null;
+          } else {
+            throw noMatch();
+          }
+        }
+      }.get();
+      Assert.assertTrue(getNullMsg);
+    }};
+
+  }
+
+}