</schema-service>
</module>
-->
-
+ <!-- Cluster RPC -->
+ <!-- Enable the following module if you want to use remote rpc connector
+ <module>
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector">prefix:remote-rpc-connector</type>
+ <name>remote-rpc-connector</name>
+ <dom-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector">
+ <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-broker-osgi-registry</type>
+ <name>dom-broker</name>
+ </dom-broker>
+ </module>
+ -->
<module>
<type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:inmemory-datastore-provider">prefix:inmemory-operational-datastore-provider</type>
<name>operational-store-service</name>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-parent</artifactId>
+ <version>1.1-SNAPSHOT</version>
+ </parent>
+ <artifactId>sal-remoterpc-connector</artifactId>
+ <packaging>bundle</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-actor_${scala.version}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-cluster_${scala.version}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-remote_${scala.version}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-testkit_${scala.version}</artifactId>
+ </dependency>
+
+ <!-- SAL Dependencies -->
+
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-connector-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-common-util</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-core-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>netconf-util</artifactId>
+ </dependency>
+
+ <!-- Yang tools-->
+
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-data-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-data-impl</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-common</artifactId>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </dependency>
+
+ <!-- Test Dependencies -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <version>${slf4j.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <extensions>true</extensions>
+ <configuration>
+ <instructions>
+ <Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
+ <Export-package></Export-package>
+ <Private-Package></Private-Package>
+ <Import-Package>!org.jboss.*;!com.jcraft.*;*</Import-Package>
+ <Embed-Dependency>
+ !sal*;
+ !*config-api*;
+ !*testkit*;
+ *protobuf*;
+ akka*;
+ *scala*;
+ *config*;
+ *netty*;
+ *uncommons*;
+ </Embed-Dependency>
+ <Embed-Transitive>true</Embed-Transitive>
+ </instructions>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>config</id>
+ <goals>
+ <goal>generate-sources</goal>
+ </goals>
+ <configuration>
+ <codeGenerators>
+ <generator>
+ <codeGeneratorClass>org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator</codeGeneratorClass>
+ <outputBaseDir>${jmxGeneratorPath}</outputBaseDir>
+ <additionalConfiguration>
+ <namespaceToPackage1>urn:opendaylight:params:xml:ns:yang:controller==org.opendaylight.controller.config.yang</namespaceToPackage1>
+ </additionalConfiguration>
+ </generator>
+ <generator>
+ <codeGeneratorClass>org.opendaylight.yangtools.maven.sal.api.gen.plugin.CodeGeneratorImpl</codeGeneratorClass>
+ <outputBaseDir>${salGeneratorPath}</outputBaseDir>
+ </generator>
+ </codeGenerators>
+ <inspectDependencies>true</inspectDependencies>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <scm>
+ <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
+ <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+ <tag>HEAD</tag>
+ <url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL:Architecture:Clustering</url>
+ </scm>
+</project>
--- /dev/null
+package org.opendaylight.controller.config.yang.config.remote_rpc_connector;
+
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderFactory;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.osgi.framework.BundleContext;
+
+public class RemoteRPCBrokerModule extends org.opendaylight.controller.config.yang.config.remote_rpc_connector.AbstractRemoteRPCBrokerModule {
+ private BundleContext bundleContext;
+ public RemoteRPCBrokerModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
+ super(identifier, dependencyResolver);
+ }
+
+ public RemoteRPCBrokerModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, org.opendaylight.controller.config.yang.config.remote_rpc_connector.RemoteRPCBrokerModule oldModule, java.lang.AutoCloseable oldInstance) {
+ super(identifier, dependencyResolver, oldModule, oldInstance);
+ }
+
+ @Override
+ public void customValidation() {
+ // add custom validation form module attributes here.
+ }
+
+ @Override
+ public java.lang.AutoCloseable createInstance() {
+ Broker broker = getDomBrokerDependency();
+ return RemoteRpcProviderFactory.createInstance(broker, bundleContext);
+ }
+
+ public void setBundleContext(final BundleContext bundleContext) {
+ this.bundleContext = bundleContext;
+ }
+}
--- /dev/null
+/*
+* Generated file
+*
+* Generated from: yang module name: remote-rpc-connector yang module local name: remote-rpc-connector
+* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
+* Generated at: Mon Jul 07 17:02:25 PDT 2014
+*
+* Do not modify this file unless it is present under src/main directory
+*/
+package org.opendaylight.controller.config.yang.config.remote_rpc_connector;
+
+import org.opendaylight.controller.config.api.DependencyResolver;
+import org.opendaylight.controller.config.api.DynamicMBeanWithInstance;
+import org.opendaylight.controller.config.spi.Module;
+import org.osgi.framework.BundleContext;
+
+public class RemoteRPCBrokerModuleFactory extends org.opendaylight.controller.config.yang.config.remote_rpc_connector.AbstractRemoteRPCBrokerModuleFactory {
+
+ @Override
+ public Module createModule(String instanceName, DependencyResolver dependencyResolver, BundleContext bundleContext) {
+ RemoteRPCBrokerModule module = (RemoteRPCBrokerModule)super.createModule(instanceName,dependencyResolver,bundleContext);
+ module.setBundleContext(bundleContext);
+ return module;
+ }
+
+ @Override
+ public Module createModule(String instanceName, DependencyResolver dependencyResolver,
+ DynamicMBeanWithInstance old, BundleContext bundleContext) throws Exception {
+ RemoteRPCBrokerModule module = (RemoteRPCBrokerModule)super.createModule(instanceName, dependencyResolver,
+ old, bundleContext);
+ module.setBundleContext(bundleContext);
+ return module;
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+import akka.actor.UntypedActor;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import org.opendaylight.controller.remote.rpc.messages.Monitor;
+
+public abstract class AbstractUntypedActor extends UntypedActor {
+ protected final LoggingAdapter LOG =
+ Logging.getLogger(getContext().system(), this);
+
+
+ public AbstractUntypedActor(){
+ LOG.debug("Actor created {}", getSelf());
+ getContext().
+ system().
+ actorSelection("user/termination-monitor").
+ tell(new Monitor(getSelf()), getSelf());
+ }
+
+ @Override public void onReceive(Object message) throws Exception {
+ LOG.debug("Received message {}", message);
+ handleReceive(message);
+ LOG.debug("Done handling message {}", message);
+ }
+
+ protected abstract void handleReceive(Object message) throws Exception;
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import com.google.common.base.Function;
+import com.typesafe.config.ConfigFactory;
+
+import javax.annotation.Nullable;
+
+public class ActorSystemFactory {
+ private static final ActorSystem actorSystem = (new Function<Void, ActorSystem>(){
+
+ @Nullable @Override public ActorSystem apply(@Nullable Void aVoid) {
+ ActorSystem system =
+ ActorSystem.create("opendaylight-rpc", ConfigFactory
+ .load().getConfig("odl-cluster"));
+ system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
+ return system;
+ }
+ }).apply(null);
+
+ public static final ActorSystem getInstance(){
+ return actorSystem;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.util.Timeout;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+import static akka.pattern.Patterns.ask;
+
+public class ActorUtil {
+ public static final FiniteDuration LOCAL_ASK_DURATION = Duration.create(2, TimeUnit.SECONDS);
+ public static final FiniteDuration REMOTE_ASK_DURATION = Duration.create(15, TimeUnit.SECONDS);
+ public static final FiniteDuration ASK_DURATION = Duration.create(17, TimeUnit.SECONDS);
+ public static final FiniteDuration LOCAL_AWAIT_DURATION = Duration.create(2, TimeUnit.SECONDS);
+ public static final FiniteDuration REMOTE_AWAIT_DURATION = Duration.create(15, TimeUnit.SECONDS);
+ public static final FiniteDuration AWAIT_DURATION = Duration.create(17, TimeUnit.SECONDS);
+
+ /**
+ * Executes an operation on a local actor and wait for it's response
+ * @param actor
+ * @param message
+ * @param askDuration
+ * @param awaitDuration
+ * @return The response of the operation
+ */
+ public static Object executeLocalOperation(ActorRef actor, Object message,
+ FiniteDuration askDuration, FiniteDuration awaitDuration) throws Exception{
+ Future<Object> future =
+ ask(actor, message, new Timeout(askDuration));
+
+ return Await.result(future, awaitDuration);
+ }
+
+ /**
+ * Execute an operation on a remote actor and wait for it's response
+ * @param actor
+ * @param message
+ * @param askDuration
+ * @param awaitDuration
+ * @return
+ */
+ public static Object executeRemoteOperation(ActorSelection actor, Object message,
+ FiniteDuration askDuration, FiniteDuration awaitDuration) throws Exception{
+ Future<Object> future =
+ ask(actor, message, new Timeout(askDuration));
+ return Await.result(future, awaitDuration);
+ }
+
+}
--- /dev/null
+package org.opendaylight.controller.remote.rpc;
+
+import akka.actor.ActorRef;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.remote.rpc.messages.ErrorResponse;
+import org.opendaylight.controller.remote.rpc.messages.InvokeRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
+import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
+import org.opendaylight.controller.sal.common.util.RpcErrors;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+
+public class RemoteRpcImplementation implements RpcImplementation,
+ RoutedRpcDefaultImplementation {
+ private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcImplementation.class);
+ private ActorRef rpcBroker;
+ private SchemaContext schemaContext;
+
+ public RemoteRpcImplementation(ActorRef rpcBroker, SchemaContext schemaContext) {
+ this.rpcBroker = rpcBroker;
+ this.schemaContext = schemaContext;
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
+ InvokeRoutedRpc rpcMsg = new InvokeRoutedRpc(rpc, identifier, input);
+
+ return executeMsg(rpcMsg);
+ }
+
+ @Override
+ public Set<QName> getSupportedRpcs() {
+ // TODO : check if we need to get this from routing registry
+ return Collections.emptySet();
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
+ InvokeRpc rpcMsg = new InvokeRpc(rpc, input);
+ return executeMsg(rpcMsg);
+ }
+
+ private ListenableFuture<RpcResult<CompositeNode>> executeMsg(Object rpcMsg) {
+ CompositeNode result = null;
+ Collection<RpcError> errors = errors = new ArrayList<>();
+ try {
+ Object response = ActorUtil.executeLocalOperation(rpcBroker, rpcMsg, ActorUtil.ASK_DURATION, ActorUtil.AWAIT_DURATION);
+ if(response instanceof RpcResponse) {
+ RpcResponse rpcResponse = (RpcResponse) response;
+ result = XmlUtils.xmlToCompositeNode(rpcResponse.getResultCompositeNode());
+ } else if(response instanceof ErrorResponse) {
+ ErrorResponse errorResponse = (ErrorResponse) response;
+ Exception e = errorResponse.getException();
+ errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause()));
+ }
+ } catch (Exception e) {
+ LOG.error("Error occurred while invoking RPC actor {}", e.toString());
+ errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause()));
+ }
+ return Futures.immediateFuture(Rpcs.getRpcResult(true, result, errors));
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.opendaylight.controller.remote.rpc.registry.ClusterWrapper;
+import org.opendaylight.controller.remote.rpc.registry.ClusterWrapperImpl;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.sal.core.api.Provider;
+import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
+import org.opendaylight.controller.sal.core.api.model.SchemaService;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * This is the base class which initialize all the actors, listeners and
+ * default RPc implementation so remote invocation of rpcs.
+ */
+public class RemoteRpcProvider implements AutoCloseable, Provider{
+
+ private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcProvider.class);
+
+ private final ActorSystem actorSystem;
+ private ActorRef rpcBroker;
+ private ActorRef rpcRegistry;
+ private final RpcProvisionRegistry rpcProvisionRegistry;
+ private Broker.ProviderSession brokerSession;
+ private RpcListener rpcListener;
+ private RoutedRpcListener routeChangeListener;
+ private RemoteRpcImplementation rpcImplementation;
+ public RemoteRpcProvider(ActorSystem actorSystem, RpcProvisionRegistry rpcProvisionRegistry) {
+ this.actorSystem = actorSystem;
+ this.rpcProvisionRegistry = rpcProvisionRegistry;
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.actorSystem.shutdown();
+ unregisterSupportedRpcs();
+ unregisterSupportedRoutedRpcs();
+ }
+
+ @Override
+ public void onSessionInitiated(Broker.ProviderSession session) {
+ this.brokerSession = session;
+ start();
+ }
+
+ @Override
+ public Collection<ProviderFunctionality> getProviderFunctionality() {
+ return null;
+ }
+
+ private void start() {
+ LOG.debug("Starting all rpc listeners.");
+ // Create actor to handle and sync routing table in cluster
+ ClusterWrapper clusterWrapper = new ClusterWrapperImpl(actorSystem);
+ rpcRegistry = actorSystem.actorOf(RpcRegistry.props(clusterWrapper), "rpc-registry");
+
+ // Create actor to invoke and execute rpc
+ SchemaService schemaService = brokerSession.getService(SchemaService.class);
+ SchemaContext schemaContext = schemaService.getGlobalContext();
+ rpcBroker = actorSystem.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), "rpc-broker");
+ String rpcBrokerPath = clusterWrapper.getAddress().toString() + "/user/rpc-broker";
+ rpcListener = new RpcListener(rpcRegistry, rpcBrokerPath);
+ routeChangeListener = new RoutedRpcListener(rpcRegistry, rpcBrokerPath);
+ rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext);
+ brokerSession.addRpcRegistrationListener(rpcListener);
+ rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
+ rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation);
+ announceSupportedRpcs();
+ announceSupportedRoutedRpcs();
+
+ }
+
+ /**
+ * Add all the locally registered RPCs in the clustered routing table
+ */
+ private void announceSupportedRpcs(){
+ LOG.debug("Adding all supported rpcs to routing table");
+ Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
+ for (QName rpc : currentlySupported) {
+ rpcListener.onRpcImplementationAdded(rpc);
+ }
+ }
+
+ /**
+ * Add all the locally registered Routed RPCs in the clustered routing table
+ */
+ private void announceSupportedRoutedRpcs(){
+
+ //TODO: announce all routed RPCs as well
+
+ }
+
+ /**
+ * Un-Register all the supported RPCs from clustered routing table
+ */
+ private void unregisterSupportedRpcs(){
+ LOG.debug("removing all supported rpcs to routing table");
+ Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
+ for (QName rpc : currentlySupported) {
+ rpcListener.onRpcImplementationRemoved(rpc);
+ }
+ }
+
+ /**
+ * Un-Register all the locally supported Routed RPCs from clustered routing table
+ */
+ private void unregisterSupportedRoutedRpcs(){
+
+ //TODO: remove all routed RPCs as well
+
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
+import org.osgi.framework.BundleContext;
+
+public class RemoteRpcProviderFactory {
+ public static RemoteRpcProvider createInstance(final Broker broker, final BundleContext bundleContext){
+ RemoteRpcProvider rpcProvider =
+ new RemoteRpcProvider(ActorSystemFactory.getInstance(), (RpcProvisionRegistry) broker);
+ broker.registerProvider(rpcProvider, bundleContext);
+ return rpcProvider;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc;
+
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+import java.io.Serializable;
+
+public class RouteIdentifierImpl implements RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>,Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private QName context;
+ private QName type;
+ private InstanceIdentifier route;
+
+ public RouteIdentifierImpl(QName context, QName type, InstanceIdentifier route) {
+ this.context = context;
+ this.type = type;
+ this.route = route;
+ }
+
+ @Override
+ public QName getContext() {
+ return this.context;
+ }
+
+ @Override
+ public QName getType() {
+ return this.type;
+ }
+
+ @Override
+ public InstanceIdentifier getRoute() {
+ return this.route;
+ }
+
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ RouteIdentifierImpl that = (RouteIdentifierImpl) o;
+
+ if (context == null){
+ if (that.getContext() != null) return false;
+ }else
+ if (!context.equals(that.context)) return false;
+
+ if (route == null){
+ if (that.getRoute() != null) return false;
+ }else
+ if (!route.equals(that.route)) return false;
+
+ if (type == null){
+ if (that.getType() != null) return false;
+ }else
+ if (!type.equals(that.type)) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int prime = 31;
+ int result = 0;
+ result = prime * result + (context == null ? 0:context.hashCode());
+ result = prime * result + (type == null ? 0:type.hashCode());
+ result = prime * result + (route == null ? 0:route.hashCode());
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "RouteIdentifierImpl{" +
+ "context=" + context +
+ ", type=" + type +
+ ", route=" + route +
+ '}';
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
+import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class RoutedRpcListener implements RouteChangeListener<RpcRoutingContext, InstanceIdentifier>{
+ private static final Logger LOG = LoggerFactory.getLogger(RoutedRpcListener.class);
+ private final ActorRef rpcRegistry;
+ private final String actorPath;
+
+ public RoutedRpcListener(ActorRef rpcRegistry, String actorPath) {
+ this.rpcRegistry = rpcRegistry;
+ this.actorPath = actorPath;
+ }
+
+ @Override
+ public void onRouteChange(RouteChange<RpcRoutingContext, InstanceIdentifier> routeChange) {
+ Map<RpcRoutingContext, Set<InstanceIdentifier>> announcements = routeChange.getAnnouncements();
+ announce(getRouteIdentifiers(announcements));
+
+ Map<RpcRoutingContext, Set<InstanceIdentifier>> removals = routeChange.getRemovals();
+ remove(getRouteIdentifiers(removals));
+ }
+
+ /**
+ *
+ * @param announcements
+ */
+ private void announce(Set<RpcRouter.RouteIdentifier<?, ?, ?>> announcements) {
+ LOG.debug("Announcing [{}]", announcements);
+ AddRoutedRpc addRpcMsg = new AddRoutedRpc(announcements, actorPath);
+ try {
+ ActorUtil.executeLocalOperation(rpcRegistry, addRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
+ } catch (Exception e) {
+ // Just logging it because Akka API throws this exception
+ LOG.error(e.toString());
+ }
+ }
+
+ /**
+ *
+ * @param removals
+ */
+ private void remove(Set<RpcRouter.RouteIdentifier<?, ?, ?>> removals){
+ LOG.debug("Removing [{}]", removals);
+ RemoveRoutedRpc removeRpcMsg = new RemoveRoutedRpc(removals, actorPath);
+ try {
+ ActorUtil.executeLocalOperation(rpcRegistry, removeRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
+ } catch (Exception e) {
+ // Just logging it because Akka API throws this exception
+ LOG.error(e.toString());
+ }
+ }
+
+ /**
+ *
+ * @param changes
+ * @return
+ */
+ private Set<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers(Map<RpcRoutingContext, Set<InstanceIdentifier>> changes) {
+ RouteIdentifierImpl routeId = null;
+ Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdSet = new HashSet<>();
+
+ for (RpcRoutingContext context : changes.keySet()){
+ for (InstanceIdentifier instanceId : changes.get(context)){
+ routeId = new RouteIdentifierImpl(null, context.getRpc(), instanceId);
+ routeIdSet.add(routeId);
+ }
+ }
+ return routeIdSet;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.japi.Creator;
+import org.opendaylight.controller.remote.rpc.messages.ErrorResponse;
+import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply;
+import org.opendaylight.controller.remote.rpc.messages.GetRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRpcReply;
+import org.opendaylight.controller.remote.rpc.messages.InvokeRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
+import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Future;
+
+/**
+ * Actor to initiate execution of remote RPC on other nodes of the cluster.
+ */
+
+public class RpcBroker extends AbstractUntypedActor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RpcBroker.class);
+ private final Broker.ProviderSession brokerSession;
+ private final ActorRef rpcRegistry;
+ private final SchemaContext schemaContext;
+
+ private RpcBroker(Broker.ProviderSession brokerSession, ActorRef rpcRegistry, SchemaContext schemaContext){
+ this.brokerSession = brokerSession;
+ this.rpcRegistry = rpcRegistry;
+ this.schemaContext = schemaContext;
+ }
+
+ public static Props props(final Broker.ProviderSession brokerSession, final ActorRef rpcRegistry, final SchemaContext schemaContext){
+ return Props.create(new Creator<RpcBroker>(){
+
+ @Override
+ public RpcBroker create() throws Exception {
+ return new RpcBroker(brokerSession, rpcRegistry, schemaContext);
+ }
+ });
+ }
+ @Override
+ protected void handleReceive(Object message) throws Exception {
+ if(message instanceof InvokeRoutedRpc) {
+ invokeRemoteRoutedRpc((InvokeRoutedRpc) message);
+ } else if(message instanceof InvokeRpc) {
+ invokeRemoteRpc((InvokeRpc) message);
+ } else if(message instanceof ExecuteRpc) {
+ executeRpc((ExecuteRpc) message);
+ }
+ }
+
+ private void invokeRemoteRoutedRpc(InvokeRoutedRpc msg) {
+ // Look up the remote actor to execute rpc
+ LOG.debug("Looking up the remote actor for route {}", msg);
+ try {
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, msg.getRpc(), msg.getIdentifier());
+ GetRoutedRpc routedRpcMsg = new GetRoutedRpc(routeId);
+ GetRoutedRpcReply rpcReply = (GetRoutedRpcReply)ActorUtil.executeLocalOperation(rpcRegistry, routedRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
+
+ String remoteActorPath = rpcReply.getRoutePath();
+ if(remoteActorPath == null) {
+ LOG.debug("No remote actor found for rpc execution.");
+
+ getSender().tell(new ErrorResponse(
+ new IllegalStateException("No remote actor found for rpc execution.")), self());
+ } else {
+
+ ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(), schemaContext), msg.getRpc());
+
+ Object operationRes = ActorUtil.executeRemoteOperation(this.context().actorSelection(remoteActorPath),
+ executeMsg, ActorUtil.REMOTE_ASK_DURATION, ActorUtil.REMOTE_AWAIT_DURATION);
+
+ getSender().tell(operationRes, self());
+ }
+ } catch (Exception e) {
+ LOG.error(e.toString());
+ getSender().tell(new ErrorResponse(e), self());
+ }
+ }
+
+ private void invokeRemoteRpc(InvokeRpc msg) {
+ // Look up the remote actor to execute rpc
+ LOG.debug("Looking up the remote actor for route {}", msg);
+ try {
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, msg.getRpc(), null);
+ GetRpc rpcMsg = new GetRpc(routeId);
+ GetRpcReply rpcReply = (GetRpcReply)ActorUtil.executeLocalOperation(rpcRegistry, rpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
+ String remoteActorPath = rpcReply.getRoutePath();
+
+ if(remoteActorPath == null) {
+ LOG.debug("No remote actor found for rpc execution.");
+
+ getSender().tell(new ErrorResponse(
+ new IllegalStateException("No remote actor found for rpc execution.")), self());
+ } else {
+ ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(), schemaContext), msg.getRpc());
+ Object operationRes = ActorUtil.executeRemoteOperation(this.context().actorSelection(remoteActorPath),
+ executeMsg, ActorUtil.REMOTE_ASK_DURATION, ActorUtil.REMOTE_AWAIT_DURATION);
+
+ getSender().tell(operationRes, self());
+ }
+ } catch (Exception e) {
+ LOG.error(e.toString());
+ getSender().tell(new ErrorResponse(e), self());
+ }
+ }
+
+ private void executeRpc(ExecuteRpc msg) {
+ LOG.debug("Executing rpc for rpc {}", msg.getRpc());
+ try {
+ Future<RpcResult<CompositeNode>> rpc = brokerSession.rpc(msg.getRpc(), XmlUtils.inputXmlToCompositeNode(msg.getRpc(), msg.getInputCompositeNode(), schemaContext));
+ RpcResult<CompositeNode> rpcResult = rpc != null ? rpc.get():null;
+
+ CompositeNode result = rpcResult != null ? rpcResult.getResult() : null;
+ getSender().tell(new RpcResponse(XmlUtils.outputCompositeNodeToXml(result, schemaContext)), self());
+ } catch (Exception e) {
+ LOG.error(e.toString());
+ getSender().tell(new ErrorResponse(e), self());
+ }
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.remote.rpc.messages.AddRpc;
+import org.opendaylight.controller.remote.rpc.messages.RemoveRpc;
+import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RpcListener implements RpcRegistrationListener{
+
+ private static final Logger LOG = LoggerFactory.getLogger(RpcListener.class);
+ private final ActorRef rpcRegistry;
+ private final String actorPath;
+
+ public RpcListener(ActorRef rpcRegistry, String actorPath) {
+ this.rpcRegistry = rpcRegistry;
+ this.actorPath = actorPath;
+ }
+
+ @Override
+ public void onRpcImplementationAdded(QName rpc) {
+ LOG.debug("Adding registration for [{}]", rpc);
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, rpc, null);
+ AddRpc addRpcMsg = new AddRpc(routeId, actorPath);
+ try {
+ ActorUtil.executeLocalOperation(rpcRegistry, addRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
+ LOG.debug("Route added [{}-{}]", routeId, this.actorPath);
+ } catch (Exception e) {
+ // Just logging it because Akka API throws this exception
+ LOG.error(e.toString());
+ }
+
+ }
+
+ @Override
+ public void onRpcImplementationRemoved(QName rpc) {
+ LOG.debug("Removing registration for [{}]", rpc);
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, rpc, null);
+ RemoveRpc removeRpcMsg = new RemoveRpc(routeId);
+ try {
+ ActorUtil.executeLocalOperation(rpcRegistry, removeRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION);
+ } catch (Exception e) {
+ // Just logging it because Akka API throws this exception
+ LOG.error(e.toString());
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+import akka.actor.Terminated;
+import akka.actor.UntypedActor;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import org.opendaylight.controller.remote.rpc.messages.Monitor;
+
+public class TerminationMonitor extends UntypedActor{
+ protected final LoggingAdapter LOG =
+ Logging.getLogger(getContext().system(), this);
+
+ public TerminationMonitor(){
+ LOG.info("Created TerminationMonitor");
+ }
+
+ @Override public void onReceive(Object message) throws Exception {
+ if(message instanceof Terminated){
+ Terminated terminated = (Terminated) message;
+ LOG.debug("Actor terminated : {}", terminated.actor());
+ }else if(message instanceof Monitor){
+ Monitor monitor = (Monitor) message;
+ getContext().watch(monitor.getActorRef());
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+import com.google.common.base.Optional;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.api.SimpleNode;
+import org.opendaylight.yangtools.yang.data.impl.XmlTreeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
+import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
+import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.xml.sax.SAXException;
+
+import javax.activation.UnsupportedDataTypeException;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Set;
+
+public class XmlUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(XmlUtils.class);
+
+ public static String inputCompositeNodeToXml(CompositeNode cNode, SchemaContext schemaContext){
+ if (cNode == null) return new String();
+
+ //Document domTree = NodeUtils.buildShadowDomTree(cNode);
+ Document domTree = null;
+ try {
+ Set<RpcDefinition> rpcs = schemaContext.getOperations();
+ for(RpcDefinition rpc : rpcs) {
+ if(rpc.getQName().equals(cNode.getNodeType())){
+ domTree = XmlDocumentUtils.toDocument(cNode, rpc.getInput(), XmlDocumentUtils.defaultValueCodecProvider());
+ break;
+ }
+ }
+
+ } catch (UnsupportedDataTypeException e) {
+ LOG.error("Error during translation of CompositeNode to Document", e);
+ }
+ return domTransformer(domTree);
+ }
+
+ public static String outputCompositeNodeToXml(CompositeNode cNode, SchemaContext schemaContext){
+ if (cNode == null) return new String();
+
+ //Document domTree = NodeUtils.buildShadowDomTree(cNode);
+ Document domTree = null;
+ try {
+ Set<RpcDefinition> rpcs = schemaContext.getOperations();
+ for(RpcDefinition rpc : rpcs) {
+ if(rpc.getQName().equals(cNode.getNodeType())){
+ domTree = XmlDocumentUtils.toDocument(cNode, rpc.getInput(), XmlDocumentUtils.defaultValueCodecProvider());
+ break;
+ }
+ }
+
+ } catch (UnsupportedDataTypeException e) {
+ LOG.error("Error during translation of CompositeNode to Document", e);
+ }
+ return domTransformer(domTree);
+ }
+
+ private static String domTransformer(Document domTree) {
+ StringWriter writer = new StringWriter();
+ try {
+ TransformerFactory tf = TransformerFactory.newInstance();
+ Transformer transformer = tf.newTransformer();
+ transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
+ transformer.transform(new DOMSource(domTree), new StreamResult(writer));
+ } catch (TransformerException e) {
+
+ LOG.error("Error during translation of Document to OutputStream", e);
+ }
+ LOG.debug("compositeNodeToXml " + writer.toString());
+
+ return writer.toString();
+ }
+
+ public static CompositeNode xmlToCompositeNode(String xml){
+ if (xml==null || xml.length()==0) return null;
+
+ Node<?> dataTree;
+ try {
+ dataTree = XmlTreeBuilder.buildDataTree(new ByteArrayInputStream(xml.getBytes()));
+ } catch (XMLStreamException e) {
+ LOG.error("Error during building data tree from XML", e);
+ return null;
+ }
+ if (dataTree == null) {
+ LOG.error("data tree is null");
+ return null;
+ }
+ if (dataTree instanceof SimpleNode) {
+ LOG.error("RPC XML was resolved as SimpleNode");
+ return null;
+ }
+ return (CompositeNode) dataTree;
+ }
+
+ public static CompositeNode inputXmlToCompositeNode(QName rpc, String xml, SchemaContext schemaContext){
+ if (xml==null || xml.length()==0) return null;
+
+ Node<?> dataTree = null;
+ try {
+
+ Document doc = XmlUtil.readXmlToDocument(xml);
+ LOG.debug("xmlToCompositeNode Document is " + xml );
+ Set<RpcDefinition> rpcs = schemaContext.getOperations();
+ for(RpcDefinition rpcDef : rpcs) {
+ if(rpcDef.getQName().equals(rpc)){
+ dataTree = XmlDocumentUtils.toDomNode(doc.getDocumentElement(), Optional.<DataSchemaNode>of(rpcDef.getInput()), Optional.of(XmlDocumentUtils.defaultValueCodecProvider()));
+ break;
+ }
+ }
+ } catch (SAXException e) {
+ LOG.error("Error during building data tree from XML", e);
+ } catch (IOException e) {
+ LOG.error("Error during building data tree from XML", e);
+ }
+
+ LOG.debug("xmlToCompositeNode " + dataTree.toString());
+ return (CompositeNode) dataTree;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.messages;
+
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+
+import java.io.Serializable;
+import java.util.Set;
+
+public class AddRoutedRpc implements Serializable {
+
+ Set<RpcRouter.RouteIdentifier<?, ?, ?>> announcements;
+ String actorPath;
+
+ public AddRoutedRpc(Set<RpcRouter.RouteIdentifier<?, ?, ?>> announcements, String actorPath) {
+ this.announcements = announcements;
+ this.actorPath = actorPath;
+ }
+
+ public Set<RpcRouter.RouteIdentifier<?, ?, ?>> getAnnouncements() {
+ return announcements;
+ }
+
+ public String getActorPath() {
+ return actorPath;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.messages;
+
+import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
+
+import java.io.Serializable;
+
+public class AddRpc implements Serializable {
+
+ RouteIdentifierImpl routeId;
+ String actorPath;
+
+ public AddRpc(RouteIdentifierImpl routeId, String actorPath) {
+ this.routeId = routeId;
+ this.actorPath = actorPath;
+ }
+
+ public RouteIdentifierImpl getRouteId() {
+ return routeId;
+ }
+
+ public String getActorPath() {
+ return actorPath;
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.messages;
+
+import java.io.Serializable;
+
+public class ErrorResponse implements Serializable {
+
+ Exception exception;
+
+ public ErrorResponse(Exception e) {
+ this.exception = e;
+ }
+
+ public Exception getException() {
+ return exception;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.messages;
+
+
+import org.opendaylight.yangtools.yang.common.QName;
+
+import java.io.Serializable;
+
+public class ExecuteRpc implements Serializable {
+
+ private String inputCompositeNode;
+ private QName rpc;
+
+ public ExecuteRpc(String inputCompositeNode, QName rpc) {
+ this.inputCompositeNode = inputCompositeNode;
+ this.rpc = rpc;
+ }
+
+ public String getInputCompositeNode() {
+ return inputCompositeNode;
+ }
+
+ public QName getRpc() {
+ return rpc;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.messages;
+
+
+import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
+
+import java.io.Serializable;
+
+public class GetRoutedRpc implements Serializable {
+
+ RouteIdentifierImpl routeId;
+
+ public GetRoutedRpc(RouteIdentifierImpl routeId) {
+ this.routeId = routeId;
+ }
+
+ public RouteIdentifierImpl getRouteId() {
+ return routeId;
+ }
+}
--- /dev/null
+package org.opendaylight.controller.remote.rpc.messages;
+
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+import java.io.Serializable;
+
+public class GetRoutedRpcReply implements Serializable {
+
+ private String routePath;
+
+ public GetRoutedRpcReply(String routePath) {
+ this.routePath = routePath;
+ }
+
+ public String getRoutePath() {
+ return routePath;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.messages;
+
+import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
+
+import java.io.Serializable;
+
+public class GetRpc implements Serializable {
+
+ RouteIdentifierImpl routeId;
+
+ public GetRpc(RouteIdentifierImpl routeId) {
+ this.routeId = routeId;
+ }
+
+ public RouteIdentifierImpl getRouteId() {
+ return routeId;
+ }
+}
--- /dev/null
+package org.opendaylight.controller.remote.rpc.messages;
+
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+import java.io.Serializable;
+
+public class GetRpcReply implements Serializable {
+
+ private String routePath;
+
+ public GetRpcReply(String routePath) {
+ this.routePath = routePath;
+ }
+
+ public String getRoutePath() {
+ return routePath;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.messages;
+
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+import java.io.Serializable;
+
+public class InvokeRoutedRpc implements Serializable {
+
+ private QName rpc;
+ private InstanceIdentifier identifier;
+ private CompositeNode input;
+
+ public InvokeRoutedRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
+ this.rpc = rpc;
+ this.identifier = identifier;
+ this.input = input;
+ }
+
+ public InvokeRoutedRpc(QName rpc, CompositeNode input) {
+ this.rpc = rpc;
+ this.input = input;
+ }
+
+ public QName getRpc() {
+ return rpc;
+ }
+
+ public InstanceIdentifier getIdentifier() {
+ return identifier;
+ }
+
+ public CompositeNode getInput() {
+ return input;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.messages;
+
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+
+import java.io.Serializable;
+
+public class InvokeRpc implements Serializable {
+
+ private QName rpc;
+ private CompositeNode input;
+
+ public InvokeRpc(QName rpc, CompositeNode input) {
+ this.rpc = rpc;
+ this.input = input;
+ }
+
+ public QName getRpc() {
+ return rpc;
+ }
+
+ public CompositeNode getInput() {
+ return input;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.messages;
+
+import akka.actor.ActorRef;
+
+public class Monitor {
+ private final ActorRef actorRef;
+
+ public Monitor(ActorRef actorRef){
+
+ this.actorRef = actorRef;
+ }
+
+ public ActorRef getActorRef() {
+ return actorRef;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.messages;
+
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+
+import java.io.Serializable;
+import java.util.Set;
+
+public class RemoveRoutedRpc implements Serializable {
+
+ Set<RpcRouter.RouteIdentifier<?, ?, ?>> announcements;
+ String actorPath;
+
+ public RemoveRoutedRpc(Set<RpcRouter.RouteIdentifier<?, ?, ?>> announcements, String actorPath) {
+ this.announcements = announcements;
+ this.actorPath = actorPath;
+ }
+
+ public Set<RpcRouter.RouteIdentifier<?, ?, ?>> getAnnouncements() {
+ return announcements;
+ }
+
+ public String getActorPath() {
+ return actorPath;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.messages;
+
+import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
+
+import java.io.Serializable;
+
+public class RemoveRpc implements Serializable {
+
+ RouteIdentifierImpl routeId;
+
+ public RemoveRpc(RouteIdentifierImpl routeId) {
+ this.routeId = routeId;
+ }
+
+ public RouteIdentifierImpl getRouteId() {
+ return routeId;
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.messages;
+
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+
+import java.io.Serializable;
+import java.util.LinkedHashSet;
+import java.util.Map;
+
+public class RoutingTableData implements Serializable {
+ private Map<RpcRouter.RouteIdentifier<?, ?, ?>, String> rpcMap;
+ private Map<RpcRouter.RouteIdentifier<?, ?, ?>, LinkedHashSet<String>> routedRpcMap;
+
+ public RoutingTableData(Map<RpcRouter.RouteIdentifier<?, ?, ?>, String> rpcMap,
+ Map<RpcRouter.RouteIdentifier<?, ?, ?>, LinkedHashSet<String>> routedRpcMap) {
+ this.rpcMap = rpcMap;
+ this.routedRpcMap = routedRpcMap;
+ }
+
+ public Map<RpcRouter.RouteIdentifier<?, ?, ?>, String> getRpcMap() {
+ return rpcMap;
+ }
+
+ public Map<RpcRouter.RouteIdentifier<?, ?, ?>, LinkedHashSet<String>> getRoutedRpcMap() {
+ return routedRpcMap;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.messages;
+
+
+
+import java.io.Serializable;
+
+public class RpcResponse implements Serializable {
+ private String resultCompositeNode;
+
+ public RpcResponse(String resultCompositeNode) {
+ this.resultCompositeNode = resultCompositeNode;
+ }
+
+ public String getResultCompositeNode() {
+ return resultCompositeNode;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.registry;
+
+
+import akka.actor.Address;
+import akka.cluster.ClusterEvent;
+
+public interface ClusterWrapper {
+
+ ClusterEvent.CurrentClusterState getState();
+
+ Address getAddress();
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.registry;
+
+
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent;
+
+
+public class ClusterWrapperImpl implements ClusterWrapper{
+
+ private Cluster cluster;
+
+ public ClusterWrapperImpl(ActorSystem actorSystem) {
+ cluster = Cluster.get(actorSystem);
+ }
+
+ @Override
+ public ClusterEvent.CurrentClusterState getState() {
+ return cluster.state();
+ }
+
+ @Override
+ public Address getAddress() {
+ return cluster.selfAddress();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.registry;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class RoutingTable<I, R> {
+
+ private final Logger LOG = LoggerFactory.getLogger(RoutingTable.class);
+
+ private ConcurrentMap<I,R> globalRpcMap = new ConcurrentHashMap<>();
+ private ConcurrentMap<I, LinkedHashSet<R>> routedRpcMap = new ConcurrentHashMap<>();
+
+ public ConcurrentMap<I, R> getGlobalRpcMap() {
+ return globalRpcMap;
+ }
+
+ public ConcurrentMap<I, LinkedHashSet<R>> getRoutedRpcMap() {
+ return routedRpcMap;
+ }
+
+ public R getGlobalRoute(final I routeId) {
+ Preconditions.checkNotNull(routeId, "getGlobalRoute: routeId cannot be null!");
+ return globalRpcMap.get(routeId);
+ }
+
+ public void addGlobalRoute(final I routeId, final R route) {
+ Preconditions.checkNotNull(routeId, "addGlobalRoute: routeId cannot be null!");
+ Preconditions.checkNotNull(route, "addGlobalRoute: route cannot be null!");
+ LOG.debug("addGlobalRoute: adding a new route with id[{}] and value [{}]", routeId, route);
+ if(globalRpcMap.putIfAbsent(routeId, route) != null) {
+ LOG.debug("A route already exist for route id [{}] ", routeId);
+ }
+ }
+
+ public void removeGlobalRoute(final I routeId) {
+ Preconditions.checkNotNull(routeId, "removeGlobalRoute: routeId cannot be null!");
+ LOG.debug("removeGlobalRoute: removing a new route with id [{}]", routeId);
+ globalRpcMap.remove(routeId);
+ }
+
+ public Set<R> getRoutedRpc(final I routeId) {
+ Preconditions.checkNotNull(routeId, "getRoutes: routeId cannot be null!");
+ Set<R> routes = routedRpcMap.get(routeId);
+
+ if (routes == null) {
+ return Collections.emptySet();
+ }
+
+ return ImmutableSet.copyOf(routes);
+ }
+
+ public R getLastAddedRoutedRpc(final I routeId) {
+
+ Set<R> routes = getRoutedRpc(routeId);
+
+ if (routes.isEmpty()) {
+ return null;
+ }
+
+ R route = null;
+ Iterator<R> iter = routes.iterator();
+ while (iter.hasNext()) {
+ route = iter.next();
+ }
+
+ return route;
+ }
+
+ public void addRoutedRpc(final I routeId, final R route) {
+ Preconditions.checkNotNull(routeId, "addRoute: routeId cannot be null");
+ Preconditions.checkNotNull(route, "addRoute: route cannot be null");
+ LOG.debug("addRoute: adding a route with k/v [{}/{}]", routeId, route);
+ threadSafeAdd(routeId, route);
+ }
+
+ public void addRoutedRpcs(final Set<I> routeIds, final R route) {
+ Preconditions.checkNotNull(routeIds, "addRoutes: routeIds must not be null");
+ for (I routeId : routeIds){
+ addRoutedRpc(routeId, route);
+ }
+ }
+
+ public void removeRoute(final I routeId, final R route) {
+ Preconditions.checkNotNull(routeId, "removeRoute: routeId cannot be null!");
+ Preconditions.checkNotNull(route, "removeRoute: route cannot be null!");
+
+ LinkedHashSet<R> routes = routedRpcMap.get(routeId);
+ if (routes == null) {
+ return;
+ }
+ LOG.debug("removeRoute: removing a new route with k/v [{}/{}]", routeId, route);
+ threadSafeRemove(routeId, route);
+ }
+
+ public void removeRoutes(final Set<I> routeIds, final R route) {
+ Preconditions.checkNotNull(routeIds, "removeRoutes: routeIds must not be null");
+ for (I routeId : routeIds){
+ removeRoute(routeId, route);
+ }
+ }
+
+ /**
+ * This method guarantees that no 2 thread over write each other's changes.
+ * Just so that we dont end up in infinite loop, it tries for 100 times then throw
+ */
+ private void threadSafeAdd(final I routeId, final R route) {
+
+ for (int i=0;i<100;i++){
+
+ LinkedHashSet<R> updatedRoutes = new LinkedHashSet<>();
+ updatedRoutes.add(route);
+ LinkedHashSet<R> oldRoutes = routedRpcMap.putIfAbsent(routeId, updatedRoutes);
+ if (oldRoutes == null) {
+ return;
+ }
+
+ updatedRoutes = new LinkedHashSet<>(oldRoutes);
+ updatedRoutes.add(route);
+
+ if (routedRpcMap.replace(routeId, oldRoutes, updatedRoutes)) {
+ return;
+ }
+ }
+ //the method did not already return means it failed to add route in 100 attempts
+ throw new IllegalStateException("Failed to add route [" + routeId + "]");
+ }
+
+ /**
+ * This method guarantees that no 2 thread over write each other's changes.
+ * Just so that we dont end up in infinite loop, it tries for 100 times then throw
+ */
+ private void threadSafeRemove(final I routeId, final R route) {
+ LinkedHashSet<R> updatedRoutes = null;
+ for (int i=0;i<100;i++){
+ LinkedHashSet<R> oldRoutes = routedRpcMap.get(routeId);
+
+ // if route to be deleted is the only entry in the set then remove routeId from the cache
+ if ((oldRoutes.size() == 1) && oldRoutes.contains(route)){
+ routedRpcMap.remove(routeId);
+ return;
+ }
+
+ // if there are multiple routes for this routeId, remove the route to be deleted only from the set.
+ updatedRoutes = new LinkedHashSet<>(oldRoutes);
+ updatedRoutes.remove(route);
+ if (routedRpcMap.replace(routeId, oldRoutes, updatedRoutes)) {
+ return;
+ }
+
+ }
+ //the method did not already return means it failed to remove route in 100 attempts
+ throw new IllegalStateException("Failed to remove route [" + routeId + "]");
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry;
+
+import akka.actor.ActorSelection;
+import akka.actor.Address;
+import akka.actor.Props;
+import akka.cluster.ClusterEvent;
+import akka.cluster.Member;
+import akka.japi.Creator;
+import org.opendaylight.controller.remote.rpc.AbstractUntypedActor;
+import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.AddRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply;
+import org.opendaylight.controller.remote.rpc.messages.GetRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRpcReply;
+import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.RemoveRpc;
+import org.opendaylight.controller.remote.rpc.messages.RoutingTableData;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConversions;
+
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This Actor maintains the routing table state and sync it with other nodes in the cluster.
+ *
+ * A scheduler runs after an interval of time, which pick a random member from the cluster
+ * and send the current state of routing table to the member.
+ *
+ * when a message of routing table data is received, it gets merged with the local routing table
+ * to keep the latest data.
+ */
+
+public class RpcRegistry extends AbstractUntypedActor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RpcRegistry.class);
+ private RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String> routingTable;
+ private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+ private final ClusterWrapper clusterWrapper;
+ private final ScheduledFuture<?> syncScheduler;
+
+ private RpcRegistry(ClusterWrapper clusterWrapper){
+ this.routingTable = new RoutingTable<>();
+ this.clusterWrapper = clusterWrapper;
+ this.syncScheduler = scheduler.scheduleAtFixedRate(new SendRoutingTable(), 10, 10, TimeUnit.SECONDS);
+ }
+
+ public static Props props(final ClusterWrapper clusterWrapper){
+ return Props.create(new Creator<RpcRegistry>(){
+
+ @Override
+ public RpcRegistry create() throws Exception {
+ return new RpcRegistry(clusterWrapper);
+ }
+ });
+ }
+
+ @Override
+ protected void handleReceive(Object message) throws Exception {
+ LOG.debug("Received message {}", message);
+ if(message instanceof RoutingTableData) {
+ syncRoutingTable((RoutingTableData) message);
+ } else if(message instanceof GetRoutedRpc) {
+ getRoutedRpc((GetRoutedRpc) message);
+ } else if(message instanceof GetRpc) {
+ getRpc((GetRpc) message);
+ } else if(message instanceof AddRpc) {
+ addRpc((AddRpc) message);
+ } else if(message instanceof RemoveRpc) {
+ removeRpc((RemoveRpc) message);
+ } else if(message instanceof AddRoutedRpc) {
+ addRoutedRpc((AddRoutedRpc) message);
+ } else if(message instanceof RemoveRoutedRpc) {
+ removeRoutedRpc((RemoveRoutedRpc) message);
+ }
+ }
+
+ private void getRoutedRpc(GetRoutedRpc rpcMsg){
+ LOG.debug("Get latest routed Rpc location from routing table {}", rpcMsg);
+ String remoteActorPath = routingTable.getLastAddedRoutedRpc(rpcMsg.getRouteId());
+ GetRoutedRpcReply routedRpcReply = new GetRoutedRpcReply(remoteActorPath);
+
+ getSender().tell(routedRpcReply, self());
+ }
+
+ private void getRpc(GetRpc rpcMsg) {
+ LOG.debug("Get global Rpc location from routing table {}", rpcMsg);
+ String remoteActorPath = routingTable.getGlobalRoute(rpcMsg.getRouteId());
+ GetRpcReply rpcReply = new GetRpcReply(remoteActorPath);
+
+ getSender().tell(rpcReply, self());
+ }
+
+ private void addRpc(AddRpc rpcMsg) {
+ LOG.debug("Add Rpc to routing table {}", rpcMsg);
+ routingTable.addGlobalRoute(rpcMsg.getRouteId(), rpcMsg.getActorPath());
+
+ getSender().tell("Success", self());
+ }
+
+ private void removeRpc(RemoveRpc rpcMsg) {
+ LOG.debug("Removing Rpc to routing table {}", rpcMsg);
+ routingTable.removeGlobalRoute(rpcMsg.getRouteId());
+
+ getSender().tell("Success", self());
+ }
+
+ private void addRoutedRpc(AddRoutedRpc rpcMsg) {
+ routingTable.addRoutedRpcs(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
+ getSender().tell("Success", self());
+ }
+
+ private void removeRoutedRpc(RemoveRoutedRpc rpcMsg) {
+ routingTable.removeRoutes(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
+ getSender().tell("Success", self());
+ }
+
+ private void syncRoutingTable(RoutingTableData routingTableData) {
+ LOG.debug("Syncing routing table {}", routingTableData);
+
+ Map<RpcRouter.RouteIdentifier<?, ?, ?>, String> newRpcMap = routingTableData.getRpcMap();
+ Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = newRpcMap.keySet();
+ for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
+ routingTable.addGlobalRoute(routeId, newRpcMap.get(routeId));
+ }
+
+ Map<RpcRouter.RouteIdentifier<?, ?, ?>, LinkedHashSet<String>> newRoutedRpcMap =
+ routingTableData.getRoutedRpcMap();
+ routeIds = newRoutedRpcMap.keySet();
+
+ for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
+ Set<String> routeAddresses = newRoutedRpcMap.get(routeId);
+ for(String routeAddress : routeAddresses) {
+ routingTable.addRoutedRpc(routeId, routeAddress);
+ }
+ }
+ }
+
+ private ActorSelection getRandomRegistryActor() {
+ ClusterEvent.CurrentClusterState clusterState = clusterWrapper.getState();
+ ActorSelection actor = null;
+ Set<Member> members = JavaConversions.asJavaSet(clusterState.members());
+ int memberSize = members.size();
+ // Don't select yourself
+ if(memberSize > 1) {
+ Address currentNodeAddress = clusterWrapper.getAddress();
+ int index = new Random().nextInt(memberSize);
+ int i = 0;
+ // keeping previous member, in case when random index member is same as current actor
+ // and current actor member is last in set
+ Member previousMember = null;
+ for(Member member : members){
+ if(i == index-1) {
+ previousMember = member;
+ }
+ if(i == index) {
+ if(!currentNodeAddress.equals(member.address())) {
+ actor = this.context().actorSelection(member.address() + "/user/rpc-registry");
+ break;
+ } else if(index < memberSize-1){ // pick the next element in the set
+ index++;
+ }
+ }
+ i++;
+ }
+ if(actor == null && previousMember != null) {
+ actor = this.context().actorSelection(previousMember.address() + "/user/rpc-registry");
+ }
+ }
+ return actor;
+ }
+
+ private class SendRoutingTable implements Runnable {
+
+ @Override
+ public void run() {
+ RoutingTableData routingTableData =
+ new RoutingTableData(routingTable.getGlobalRpcMap(), routingTable.getRoutedRpcMap());
+ LOG.debug("Sending routing table for sync {}", routingTableData);
+ ActorSelection actor = getRandomRegistryActor();
+ if(actor != null) {
+ actor.tell(routingTableData, self());
+ }
+ }
+ }
+}
--- /dev/null
+odl-cluster{
+ akka {
+ actor {
+ provider = "akka.cluster.ClusterActorRefProvider"
+
+ }
+ remote {
+ log-remote-lifecycle-events = off
+ netty.tcp {
+ hostname = "192.168.141.142"
+ port = 2551
+ }
+ }
+
+ cluster {
+ seed-nodes = ["akka.tcp://opendaylight-rpc@192.168.141.141:2551"]
+
+ auto-down-unreachable-after = 10s
+ }
+ }
+}
\ No newline at end of file
--- /dev/null
+module remote-rpc-connector {
+ yang-version 1;
+ namespace "urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector";
+ prefix "remote-rpc-connector";
+
+ import config { prefix config; revision-date 2013-04-05; }
+ import opendaylight-md-sal-dom {prefix dom;}
+
+ description
+ "This module contains the base YANG definitions for
+ the remote routed rpc";
+
+ revision "2014-07-07" {
+ description
+ "Initial revision";
+ }
+
+ // This is the definition of the service implementation as a module identity.
+ identity remote-rpc-connector {
+ base config:module-type;
+ // Specifies the prefix for generated java classes.
+ config:java-name-prefix RemoteRPCBroker;
+ }
+
+ augment "/config:modules/config:module/config:configuration" {
+ case remote-rpc-connector {
+ when "/config:modules/config:module/config:type = 'remote-rpc-connector'";
+
+ container dom-broker {
+ uses config:service-ref {
+ refine type {
+ mandatory true;
+ config:required-identity dom:dom-broker-osgi-registry;
+ }
+ }
+ }
+ }
+ }
+
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc;
+
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import com.google.common.util.concurrent.Futures;
+import junit.framework.Assert;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.AddRpc;
+import org.opendaylight.controller.remote.rpc.messages.ErrorResponse;
+import org.opendaylight.controller.remote.rpc.messages.InvokeRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.InvokeRpc;
+import org.opendaylight.controller.remote.rpc.messages.RpcResponse;
+import org.opendaylight.controller.remote.rpc.registry.ClusterWrapper;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.ModifyAction;
+import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class RpcBrokerTest {
+
+ static ActorSystem system;
+
+
+ @BeforeClass
+ public static void setup() {
+ system = ActorSystem.create();
+ }
+
+ @AfterClass
+ public static void teardown() {
+ JavaTestKit.shutdownActorSystem(system);
+ system = null;
+ }
+
+ @Test
+ public void testInvokeRpcError() throws URISyntaxException {
+ new JavaTestKit(system) {{
+ ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class)));
+ Broker.ProviderSession brokerSession = Mockito.mock(Broker.ProviderSession.class);
+ SchemaContext schemaContext = mock(SchemaContext.class);
+ ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext));
+ QName rpc = new QName(new URI("actor1"), "actor1");
+ InvokeRpc invokeMsg = new InvokeRpc(rpc, null);
+ rpcBroker.tell(invokeMsg, getRef());
+
+ Boolean getMsg = new ExpectMsg<Boolean>("ErrorResponse") {
+ protected Boolean match(Object in) {
+ if (in instanceof ErrorResponse) {
+ ErrorResponse reply = (ErrorResponse)in;
+ return "No remote actor found for rpc execution.".equals(reply.getException().getMessage());
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ Assert.assertTrue(getMsg);
+ }};
+ }
+
+ /**
+ * This test method invokes and executes the remote rpc
+ */
+
+ @Test
+ public void testInvokeRpc() throws URISyntaxException {
+ new JavaTestKit(system) {{
+ ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(mock(ClusterWrapper.class)));
+ Broker.ProviderSession brokerSession = mock(Broker.ProviderSession.class);
+ SchemaContext schemaContext = mock(SchemaContext.class);
+ ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext));
+ ActorRef rpcBrokerRemote = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), "actor1");
+ // Add RPC in table
+ QName rpc = new QName(new URI("actor1"), "actor1");
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, rpc, null);
+ final String route = rpcBrokerRemote.path().toString();
+ AddRpc rpcMsg = new AddRpc(routeId, route);
+ rpcRegistry.tell(rpcMsg, getRef());
+ expectMsgEquals(duration("2 second"), "Success");
+
+ // invoke rpc
+ CompositeNode input = new ImmutableCompositeNode(QName.create("ns", "2013-12-09", "child1"), new ArrayList<Node<?>>(), ModifyAction.REPLACE);
+ CompositeNode invokeRpcResult = mock(CompositeNode.class);
+ Collection<RpcError> errors = new ArrayList<>();
+ RpcResult<CompositeNode> result = Rpcs.getRpcResult(true, invokeRpcResult, errors);
+ Future<RpcResult<CompositeNode>> rpcResult = Futures.immediateFuture(result);
+ when(brokerSession.rpc(rpc, input)).thenReturn(rpcResult);
+ InvokeRpc invokeMsg = new InvokeRpc(rpc, input);
+ rpcBroker.tell(invokeMsg, getRef());
+
+ //verify response msg
+ Boolean getMsg = new ExpectMsg<Boolean>("RpcResponse") {
+ protected Boolean match(Object in) {
+ if (in instanceof RpcResponse) {
+ return true;
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ Assert.assertTrue(getMsg);
+ }};
+ }
+
+ @Test
+ public void testInvokeRoutedRpcError() throws URISyntaxException {
+ new JavaTestKit(system) {{
+ ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class)));
+ Broker.ProviderSession brokerSession = Mockito.mock(Broker.ProviderSession.class);
+ SchemaContext schemaContext = mock(SchemaContext.class);
+ ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext));
+ QName rpc = new QName(new URI("actor1"), "actor1");
+ InvokeRoutedRpc invokeMsg = new InvokeRoutedRpc(rpc, null);
+ rpcBroker.tell(invokeMsg, getRef());
+
+ Boolean getMsg = new ExpectMsg<Boolean>("ErrorResponse") {
+ protected Boolean match(Object in) {
+ if (in instanceof ErrorResponse) {
+ ErrorResponse reply = (ErrorResponse)in;
+ return "No remote actor found for rpc execution.".equals(reply.getException().getMessage());
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ Assert.assertTrue(getMsg);
+ }};
+ }
+
+ /**
+ * This test method invokes and executes the remote routed rpc
+ */
+
+ @Test
+ public void testInvokeRoutedRpc() throws URISyntaxException {
+ new JavaTestKit(system) {{
+ ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(mock(ClusterWrapper.class)));
+ Broker.ProviderSession brokerSession = mock(Broker.ProviderSession.class);
+ SchemaContext schemaContext = mock(SchemaContext.class);
+ ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext));
+ ActorRef rpcBrokerRemote = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), "actor2");
+ // Add Routed RPC in table
+ QName rpc = new QName(new URI("actor2"), "actor2");
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, rpc, null);
+ final String route = rpcBrokerRemote.path().toString();
+ Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new HashSet<>();
+ routeIds.add(routeId);
+
+ AddRoutedRpc rpcMsg = new AddRoutedRpc(routeIds, route);
+ rpcRegistry.tell(rpcMsg, getRef());
+ expectMsgEquals(duration("2 second"), "Success");
+
+ // invoke rpc
+ CompositeNode input = new ImmutableCompositeNode(QName.create("ns", "2013-12-09", "child1"), new ArrayList<Node<?>>(), ModifyAction.REPLACE);
+ CompositeNode invokeRpcResult = mock(CompositeNode.class);
+ Collection<RpcError> errors = new ArrayList<>();
+ RpcResult<CompositeNode> result = Rpcs.getRpcResult(true, invokeRpcResult, errors);
+ Future<RpcResult<CompositeNode>> rpcResult = Futures.immediateFuture(result);
+ when(brokerSession.rpc(rpc, input)).thenReturn(rpcResult);
+ InvokeRoutedRpc invokeMsg = new InvokeRoutedRpc(rpc, input);
+ rpcBroker.tell(invokeMsg, getRef());
+
+ //verify response msg
+ Boolean getMsg = new ExpectMsg<Boolean>("RpcResponse") {
+ protected Boolean match(Object in) {
+ if (in instanceof RpcResponse) {
+ return true;
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ Assert.assertTrue(getMsg);
+ }};
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.registry;
+
+import junit.framework.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.yangtools.yang.common.QName;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashSet;
+import java.util.Set;
+
+public class RoutingTableTest {
+
+ private RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String> routingTable =
+ new RoutingTable<>();
+
+ @Test
+ public void addGlobalRouteNullRouteIdTest() {
+ try {
+ routingTable.addGlobalRoute(null, null);
+
+ Assert.fail("Null pointer exception was not thrown.");
+ } catch (Exception e) {
+ Assert.assertEquals(NullPointerException.class.getName(), e.getClass().getName());
+ Assert.assertEquals("addGlobalRoute: routeId cannot be null!", e.getMessage());
+ }
+ }
+
+ @Test
+ public void addGlobalRouteNullRouteTest() {
+ try {
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, null, null);
+ routingTable.addGlobalRoute(routeId, null);
+
+ Assert.fail("Null pointer exception was not thrown.");
+ } catch (Exception e) {
+ Assert.assertEquals(NullPointerException.class.getName(), e.getClass().getName());
+ Assert.assertEquals("addGlobalRoute: route cannot be null!", e.getMessage());
+ }
+ }
+
+ @Test
+ public void getGlobalRouteNullTest() {
+ try {
+ routingTable.getGlobalRoute(null);
+
+ Assert.fail("Null pointer exception was not thrown.");
+ } catch (Exception e) {
+ Assert.assertEquals(NullPointerException.class.getName(), e.getClass().getName());
+ Assert.assertEquals("getGlobalRoute: routeId cannot be null!", e.getMessage());
+ }
+ }
+
+ @Test
+ public void getGlobalRouteTest() throws URISyntaxException {
+ QName type = new QName(new URI("actor1"), "actor1");
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null);
+ String route = "actor1";
+
+ routingTable.addGlobalRoute(routeId, route);
+
+ String returnedRoute = routingTable.getGlobalRoute(routeId);
+
+ Assert.assertEquals(route, returnedRoute);
+
+ }
+
+ @Test
+ public void removeGlobalRouteTest() throws URISyntaxException {
+ QName type = new QName(new URI("actorRemove"), "actorRemove");
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null);
+ String route = "actorRemove";
+
+ routingTable.addGlobalRoute(routeId, route);
+
+ String returnedRoute = routingTable.getGlobalRoute(routeId);
+
+ Assert.assertEquals(route, returnedRoute);
+
+ routingTable.removeGlobalRoute(routeId);
+
+ String deletedRoute = routingTable.getGlobalRoute(routeId);
+
+ Assert.assertNull(deletedRoute);
+ }
+
+ @Test
+ public void addRoutedRpcNullRouteIdTest() {
+ try {
+ routingTable.addRoutedRpc(null, null);
+
+ Assert.fail("Null pointer exception was not thrown.");
+ } catch (Exception e) {
+ Assert.assertEquals(NullPointerException.class.getName(), e.getClass().getName());
+ Assert.assertEquals("addRoute: routeId cannot be null", e.getMessage());
+ }
+ }
+
+ @Test
+ public void addRoutedRpcNullRouteTest() {
+ try {
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, null, null);
+
+ routingTable.addRoutedRpc(routeId, null);
+
+ Assert.fail("Null pointer exception was not thrown.");
+ } catch (Exception e) {
+ Assert.assertEquals(NullPointerException.class.getName(), e.getClass().getName());
+ Assert.assertEquals("addRoute: route cannot be null", e.getMessage());
+ }
+ }
+
+ @Test
+ public void getRoutedRpcNullTest() {
+ try {
+ routingTable.getRoutedRpc(null);
+
+ Assert.fail("Null pointer exception was not thrown.");
+ } catch (Exception e) {
+ Assert.assertEquals(NullPointerException.class.getName(), e.getClass().getName());
+ Assert.assertEquals("getRoutes: routeId cannot be null!", e.getMessage());
+ }
+ }
+
+ @Test
+ public void getRoutedRpcTest() throws URISyntaxException {
+ QName type = new QName(new URI("actor1"), "actor1");
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null);
+ String route = "actor1";
+
+ routingTable.addRoutedRpc(routeId, route);
+
+ Set<String> routes = routingTable.getRoutedRpc(routeId);
+
+ Assert.assertEquals(1, routes.size());
+ Assert.assertTrue(routes.contains(route));
+
+ }
+
+ @Test
+ public void getLastRoutedRpcTest() throws URISyntaxException {
+ QName type = new QName(new URI("first1"), "first1");
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null);
+ String route = "first1";
+
+ routingTable.addRoutedRpc(routeId, route);
+
+ String route2 = "second1";
+ routingTable.addRoutedRpc(routeId, route2);
+
+ String latest = routingTable.getLastAddedRoutedRpc(routeId);
+ Assert.assertEquals(route2, latest);
+
+ }
+
+ @Test
+ public void removeRoutedRpcTest() throws URISyntaxException {
+ QName type = new QName(new URI("remove"), "remove");
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null);
+ String route = "remove";
+ routingTable.addRoutedRpc(routeId, route);
+
+ String latest = routingTable.getLastAddedRoutedRpc(routeId);
+ Assert.assertEquals(route, latest);
+
+ routingTable.removeRoute(routeId, route);
+ String removed = routingTable.getLastAddedRoutedRpc(routeId);
+ Assert.assertNull(removed);
+ }
+
+ @Test
+ public void removeRoutedRpcsTest() throws URISyntaxException {
+ QName type = new QName(new URI("remove1"), "remove1");
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null);
+
+ QName type2 = new QName(new URI("remove2"), "remove2");
+ RouteIdentifierImpl routeId2 = new RouteIdentifierImpl(null, type2, null);
+
+ Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new HashSet<>();
+ routeIds.add(routeId);
+ routeIds.add(routeId2);
+ String route = "remove1";
+
+ routingTable.addRoutedRpcs(routeIds, route);
+ String latest1 = routingTable.getLastAddedRoutedRpc(routeId);
+ Assert.assertEquals(route, latest1);
+
+ String latest2 = routingTable.getLastAddedRoutedRpc(routeId2);
+ Assert.assertEquals(route, latest2);
+
+ routingTable.removeRoutes(routeIds, route);
+ String removed1 = routingTable.getLastAddedRoutedRpc(routeId);
+ Assert.assertNull(removed1);
+
+ String removed2 = routingTable.getLastAddedRoutedRpc(routeId2);
+ Assert.assertNull(removed2);
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.remote.rpc.registry;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import junit.framework.Assert;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
+import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.AddRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply;
+import org.opendaylight.controller.remote.rpc.messages.GetRpc;
+import org.opendaylight.controller.remote.rpc.messages.GetRpcReply;
+import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc;
+import org.opendaylight.controller.remote.rpc.messages.RemoveRpc;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.yangtools.yang.common.QName;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashSet;
+import java.util.Set;
+
+public class RpcRegistryTest {
+
+ static ActorSystem system;
+
+
+ @BeforeClass
+ public static void setup() {
+ system = ActorSystem.create();
+ }
+
+ @AfterClass
+ public static void teardown() {
+ JavaTestKit.shutdownActorSystem(system);
+ system = null;
+ }
+
+ /**
+ This test add, read and remove an entry in global rpc
+ */
+ @Test
+ public void testGlobalRpc() throws URISyntaxException {
+ new JavaTestKit(system) {{
+ ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class)));
+ QName type = new QName(new URI("actor1"), "actor1");
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null);
+ final String route = "actor1";
+
+ AddRpc rpcMsg = new AddRpc(routeId, route);
+ rpcRegistry.tell(rpcMsg, getRef());
+ expectMsgEquals(duration("2 second"), "Success");
+
+ GetRpc getRpc = new GetRpc(routeId);
+ rpcRegistry.tell(getRpc, getRef());
+
+ Boolean getMsg = new ExpectMsg<Boolean>("GetRpcReply") {
+ protected Boolean match(Object in) {
+ if (in instanceof GetRpcReply) {
+ GetRpcReply reply = (GetRpcReply)in;
+ return route.equals(reply.getRoutePath());
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ Assert.assertTrue(getMsg);
+
+ RemoveRpc removeMsg = new RemoveRpc(routeId);
+ rpcRegistry.tell(removeMsg, getRef());
+ expectMsgEquals(duration("2 second"), "Success");
+
+ rpcRegistry.tell(getRpc, getRef());
+
+ Boolean getNullMsg = new ExpectMsg<Boolean>("GetRpcReply") {
+ protected Boolean match(Object in) {
+ if (in instanceof GetRpcReply) {
+ GetRpcReply reply = (GetRpcReply)in;
+ return reply.getRoutePath() == null;
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+ Assert.assertTrue(getNullMsg);
+ }};
+
+ }
+
+ /**
+ This test add, read and remove an entry in routed rpc
+ */
+ @Test
+ public void testRoutedRpc() throws URISyntaxException {
+ new JavaTestKit(system) {{
+ ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class)));
+ QName type = new QName(new URI("actor1"), "actor1");
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null);
+ final String route = "actor1";
+
+ Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new HashSet<>();
+ routeIds.add(routeId);
+
+ AddRoutedRpc rpcMsg = new AddRoutedRpc(routeIds, route);
+ rpcRegistry.tell(rpcMsg, getRef());
+ expectMsgEquals(duration("2 second"), "Success");
+
+ GetRoutedRpc getRpc = new GetRoutedRpc(routeId);
+ rpcRegistry.tell(getRpc, getRef());
+
+ Boolean getMsg = new ExpectMsg<Boolean>("GetRoutedRpcReply") {
+ protected Boolean match(Object in) {
+ if (in instanceof GetRoutedRpcReply) {
+ GetRoutedRpcReply reply = (GetRoutedRpcReply)in;
+ return route.equals(reply.getRoutePath());
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ Assert.assertTrue(getMsg);
+
+ RemoveRoutedRpc removeMsg = new RemoveRoutedRpc(routeIds, route);
+ rpcRegistry.tell(removeMsg, getRef());
+ expectMsgEquals(duration("2 second"), "Success");
+
+ rpcRegistry.tell(getRpc, getRef());
+
+ Boolean getNullMsg = new ExpectMsg<Boolean>("GetRoutedRpcReply") {
+ protected Boolean match(Object in) {
+ if (in instanceof GetRoutedRpcReply) {
+ GetRoutedRpcReply reply = (GetRoutedRpcReply)in;
+ return reply.getRoutePath() == null;
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+ Assert.assertTrue(getNullMsg);
+ }};
+
+ }
+
+}