From: Tony Tkacik Date: Mon, 28 Jul 2014 17:45:38 +0000 (+0000) Subject: Merge "Introduction of XSQL" X-Git-Tag: release/helium~417 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=ff2f98614e20366d532439b73d9a51470210ae61;hp=800c476b2210456c33e4950cf345144cba02a4cd Merge "Introduction of XSQL" --- diff --git a/opendaylight/commons/opendaylight/pom.xml b/opendaylight/commons/opendaylight/pom.xml index 163c64ed52..d6c65a4272 100644 --- a/opendaylight/commons/opendaylight/pom.xml +++ b/opendaylight/commons/opendaylight/pom.xml @@ -1235,6 +1235,11 @@ sal-rest-connector ${mdsal.version} + + org.opendaylight.controller + sal-rest-connector-config + ${mdsal.version} + org.opendaylight.controller sal-rest-docgen diff --git a/opendaylight/distribution/opendaylight/pom.xml b/opendaylight/distribution/opendaylight/pom.xml index d238ee75cc..3131bd5edd 100644 --- a/opendaylight/distribution/opendaylight/pom.xml +++ b/opendaylight/distribution/opendaylight/pom.xml @@ -732,6 +732,27 @@ + + org.apache.maven.plugins + maven-dependency-plugin + 2.8 + + + unpack-provided-configs + + unpack-dependencies + + generate-resources + + ${project.build.directory}/configuration + sal-rest-connector-config + **\/*.xml + true + false + + + + maven-assembly-plugin 2.3 @@ -1009,6 +1030,10 @@ org.opendaylight.controller sal-rest-connector + + org.opendaylight.controller + sal-rest-connector-config + org.opendaylight.controller sal-rest-docgen @@ -1285,6 +1310,20 @@ package + + unpack-provided-configs + + unpack-dependencies + + generate-resources + + ${project.build.directory}/generated-resources/opendaylight/configuration + sal-rest-connector-config + **\/*.xml + true + false + + diff --git a/opendaylight/distribution/opendaylight/src/assemble/bin.xml b/opendaylight/distribution/opendaylight/src/assemble/bin.xml index 0ff4c9a83c..e5fc98a863 100644 --- a/opendaylight/distribution/opendaylight/src/assemble/bin.xml +++ b/opendaylight/distribution/opendaylight/src/assemble/bin.xml @@ -75,6 +75,13 @@ opendaylight/ + + ${project.build.directory}/configuration/initial + /opendaylight/configuration/initial + + **/META-INF/** + + diff --git a/opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/01-md-sal.xml b/opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/01-md-sal.xml index 38df344f85..9534094fa8 100644 --- a/opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/01-md-sal.xml +++ b/opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/01-md-sal.xml @@ -72,7 +72,17 @@ --> - + + prefix:inmemory-operational-datastore-provider operational-store-service diff --git a/opendaylight/md-sal/pom.xml b/opendaylight/md-sal/pom.xml index 1cfd5a6639..2cddbbf354 100644 --- a/opendaylight/md-sal/pom.xml +++ b/opendaylight/md-sal/pom.xml @@ -43,6 +43,7 @@ sal-connector-api sal-rest-connector + sal-rest-connector-config sal-netconf-connector inventory-manager diff --git a/opendaylight/md-sal/sal-remoterpc-connector/pom.xml b/opendaylight/md-sal/sal-remoterpc-connector/pom.xml new file mode 100644 index 0000000000..6ee301d827 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/pom.xml @@ -0,0 +1,177 @@ + + + 4.0.0 + + org.opendaylight.controller + sal-parent + 1.1-SNAPSHOT + + sal-remoterpc-connector + bundle + + + + com.google.guava + guava + + + + com.typesafe.akka + akka-actor_${scala.version} + + + + com.typesafe.akka + akka-cluster_${scala.version} + + + + com.typesafe.akka + akka-remote_${scala.version} + + + + com.typesafe.akka + akka-testkit_${scala.version} + + + + + + org.opendaylight.controller + sal-connector-api + + + + org.opendaylight.controller + sal-common-util + + + + org.opendaylight.controller + sal-core-api + + + org.opendaylight.controller + netconf-util + + + + + + org.opendaylight.yangtools + yang-data-api + + + + org.opendaylight.yangtools + yang-data-impl + + + + org.opendaylight.yangtools + yang-common + + + + + org.osgi + org.osgi.core + + + + org.slf4j + slf4j-api + + + + org.scala-lang + scala-library + + + + + junit + junit + test + + + org.mockito + mockito-all + test + + + + org.slf4j + slf4j-simple + ${slf4j.version} + test + + + + + + + + + org.apache.felix + maven-bundle-plugin + true + + + ${project.groupId}.${project.artifactId} + + + !org.jboss.*;!com.jcraft.*;* + + !sal*; + !*config-api*; + !*testkit*; + *protobuf*; + akka*; + *scala*; + *config*; + *netty*; + *uncommons*; + + true + + + + + org.opendaylight.yangtools + yang-maven-plugin + + + config + + generate-sources + + + + + org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator + ${jmxGeneratorPath} + + urn:opendaylight:params:xml:ns:yang:controller==org.opendaylight.controller.config.yang + + + + org.opendaylight.yangtools.maven.sal.api.gen.plugin.CodeGeneratorImpl + ${salGeneratorPath} + + + true + + + + + + + + scm:git:ssh://git.opendaylight.org:29418/controller.git + scm:git:ssh://git.opendaylight.org:29418/controller.git + HEAD + https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL:Architecture:Clustering + + diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/config/yang/config/remote_rpc_connector/RemoteRPCBrokerModule.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/config/yang/config/remote_rpc_connector/RemoteRPCBrokerModule.java new file mode 100644 index 0000000000..8315bbeeb3 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/config/yang/config/remote_rpc_connector/RemoteRPCBrokerModule.java @@ -0,0 +1,31 @@ +package org.opendaylight.controller.config.yang.config.remote_rpc_connector; + +import org.opendaylight.controller.remote.rpc.RemoteRpcProviderFactory; +import org.opendaylight.controller.sal.core.api.Broker; +import org.osgi.framework.BundleContext; + +public class RemoteRPCBrokerModule extends org.opendaylight.controller.config.yang.config.remote_rpc_connector.AbstractRemoteRPCBrokerModule { + private BundleContext bundleContext; + public RemoteRPCBrokerModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) { + super(identifier, dependencyResolver); + } + + public RemoteRPCBrokerModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, org.opendaylight.controller.config.yang.config.remote_rpc_connector.RemoteRPCBrokerModule oldModule, java.lang.AutoCloseable oldInstance) { + super(identifier, dependencyResolver, oldModule, oldInstance); + } + + @Override + public void customValidation() { + // add custom validation form module attributes here. + } + + @Override + public java.lang.AutoCloseable createInstance() { + Broker broker = getDomBrokerDependency(); + return RemoteRpcProviderFactory.createInstance(broker, bundleContext); + } + + public void setBundleContext(final BundleContext bundleContext) { + this.bundleContext = bundleContext; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/config/yang/config/remote_rpc_connector/RemoteRPCBrokerModuleFactory.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/config/yang/config/remote_rpc_connector/RemoteRPCBrokerModuleFactory.java new file mode 100644 index 0000000000..e1ba46ae15 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/config/yang/config/remote_rpc_connector/RemoteRPCBrokerModuleFactory.java @@ -0,0 +1,35 @@ +/* +* Generated file +* +* Generated from: yang module name: remote-rpc-connector yang module local name: remote-rpc-connector +* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator +* Generated at: Mon Jul 07 17:02:25 PDT 2014 +* +* Do not modify this file unless it is present under src/main directory +*/ +package org.opendaylight.controller.config.yang.config.remote_rpc_connector; + +import org.opendaylight.controller.config.api.DependencyResolver; +import org.opendaylight.controller.config.api.DynamicMBeanWithInstance; +import org.opendaylight.controller.config.spi.Module; +import org.osgi.framework.BundleContext; + +public class RemoteRPCBrokerModuleFactory extends org.opendaylight.controller.config.yang.config.remote_rpc_connector.AbstractRemoteRPCBrokerModuleFactory { + + @Override + public Module createModule(String instanceName, DependencyResolver dependencyResolver, BundleContext bundleContext) { + RemoteRPCBrokerModule module = (RemoteRPCBrokerModule)super.createModule(instanceName,dependencyResolver,bundleContext); + module.setBundleContext(bundleContext); + return module; + } + + @Override + public Module createModule(String instanceName, DependencyResolver dependencyResolver, + DynamicMBeanWithInstance old, BundleContext bundleContext) throws Exception { + RemoteRPCBrokerModule module = (RemoteRPCBrokerModule)super.createModule(instanceName, dependencyResolver, + old, bundleContext); + module.setBundleContext(bundleContext); + return module; + } + +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/AbstractUntypedActor.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/AbstractUntypedActor.java new file mode 100644 index 0000000000..66593ae2a5 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/AbstractUntypedActor.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc; + +import akka.actor.UntypedActor; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import org.opendaylight.controller.remote.rpc.messages.Monitor; + +public abstract class AbstractUntypedActor extends UntypedActor { + protected final LoggingAdapter LOG = + Logging.getLogger(getContext().system(), this); + + + public AbstractUntypedActor(){ + LOG.debug("Actor created {}", getSelf()); + getContext(). + system(). + actorSelection("user/termination-monitor"). + tell(new Monitor(getSelf()), getSelf()); + } + + @Override public void onReceive(Object message) throws Exception { + LOG.debug("Received message {}", message); + handleReceive(message); + LOG.debug("Done handling message {}", message); + } + + protected abstract void handleReceive(Object message) throws Exception; +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorSystemFactory.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorSystemFactory.java new file mode 100644 index 0000000000..4a6124a3bc --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorSystemFactory.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc; + +import akka.actor.ActorSystem; +import akka.actor.Props; +import com.google.common.base.Function; +import com.typesafe.config.ConfigFactory; + +import javax.annotation.Nullable; + +public class ActorSystemFactory { + private static final ActorSystem actorSystem = (new Function(){ + + @Nullable @Override public ActorSystem apply(@Nullable Void aVoid) { + ActorSystem system = + ActorSystem.create("opendaylight-rpc", ConfigFactory + .load().getConfig("odl-cluster")); + system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor"); + return system; + } + }).apply(null); + + public static final ActorSystem getInstance(){ + return actorSystem; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorUtil.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorUtil.java new file mode 100644 index 0000000000..13324f996e --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorUtil.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + * + */ +package org.opendaylight.controller.remote.rpc; + +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.util.Timeout; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.TimeUnit; + +import static akka.pattern.Patterns.ask; + +public class ActorUtil { + public static final FiniteDuration LOCAL_ASK_DURATION = Duration.create(2, TimeUnit.SECONDS); + public static final FiniteDuration REMOTE_ASK_DURATION = Duration.create(15, TimeUnit.SECONDS); + public static final FiniteDuration ASK_DURATION = Duration.create(17, TimeUnit.SECONDS); + public static final FiniteDuration LOCAL_AWAIT_DURATION = Duration.create(2, TimeUnit.SECONDS); + public static final FiniteDuration REMOTE_AWAIT_DURATION = Duration.create(15, TimeUnit.SECONDS); + public static final FiniteDuration AWAIT_DURATION = Duration.create(17, TimeUnit.SECONDS); + + /** + * Executes an operation on a local actor and wait for it's response + * @param actor + * @param message + * @param askDuration + * @param awaitDuration + * @return The response of the operation + */ + public static Object executeLocalOperation(ActorRef actor, Object message, + FiniteDuration askDuration, FiniteDuration awaitDuration) throws Exception{ + Future future = + ask(actor, message, new Timeout(askDuration)); + + return Await.result(future, awaitDuration); + } + + /** + * Execute an operation on a remote actor and wait for it's response + * @param actor + * @param message + * @param askDuration + * @param awaitDuration + * @return + */ + public static Object executeRemoteOperation(ActorSelection actor, Object message, + FiniteDuration askDuration, FiniteDuration awaitDuration) throws Exception{ + Future future = + ask(actor, message, new Timeout(askDuration)); + return Await.result(future, awaitDuration); + } + +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java new file mode 100644 index 0000000000..43aa5b7e85 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java @@ -0,0 +1,77 @@ +package org.opendaylight.controller.remote.rpc; + +import akka.actor.ActorRef; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.opendaylight.controller.remote.rpc.messages.ErrorResponse; +import org.opendaylight.controller.remote.rpc.messages.InvokeRoutedRpc; +import org.opendaylight.controller.remote.rpc.messages.InvokeRpc; +import org.opendaylight.controller.remote.rpc.messages.RpcResponse; +import org.opendaylight.controller.sal.common.util.RpcErrors; +import org.opendaylight.controller.sal.common.util.Rpcs; +import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation; +import org.opendaylight.controller.sal.core.api.RpcImplementation; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Set; + +public class RemoteRpcImplementation implements RpcImplementation, + RoutedRpcDefaultImplementation { + private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcImplementation.class); + private ActorRef rpcBroker; + private SchemaContext schemaContext; + + public RemoteRpcImplementation(ActorRef rpcBroker, SchemaContext schemaContext) { + this.rpcBroker = rpcBroker; + this.schemaContext = schemaContext; + } + + @Override + public ListenableFuture> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) { + InvokeRoutedRpc rpcMsg = new InvokeRoutedRpc(rpc, identifier, input); + + return executeMsg(rpcMsg); + } + + @Override + public Set getSupportedRpcs() { + // TODO : check if we need to get this from routing registry + return Collections.emptySet(); + } + + @Override + public ListenableFuture> invokeRpc(QName rpc, CompositeNode input) { + InvokeRpc rpcMsg = new InvokeRpc(rpc, input); + return executeMsg(rpcMsg); + } + + private ListenableFuture> executeMsg(Object rpcMsg) { + CompositeNode result = null; + Collection errors = errors = new ArrayList<>(); + try { + Object response = ActorUtil.executeLocalOperation(rpcBroker, rpcMsg, ActorUtil.ASK_DURATION, ActorUtil.AWAIT_DURATION); + if(response instanceof RpcResponse) { + RpcResponse rpcResponse = (RpcResponse) response; + result = XmlUtils.xmlToCompositeNode(rpcResponse.getResultCompositeNode()); + } else if(response instanceof ErrorResponse) { + ErrorResponse errorResponse = (ErrorResponse) response; + Exception e = errorResponse.getException(); + errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause())); + } + } catch (Exception e) { + LOG.error("Error occurred while invoking RPC actor {}", e.toString()); + errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause())); + } + return Futures.immediateFuture(Rpcs.getRpcResult(true, result, errors)); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java new file mode 100644 index 0000000000..1bb7ea4514 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc; + + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import org.opendaylight.controller.remote.rpc.registry.ClusterWrapper; +import org.opendaylight.controller.remote.rpc.registry.ClusterWrapperImpl; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; +import org.opendaylight.controller.sal.core.api.Broker; +import org.opendaylight.controller.sal.core.api.Provider; +import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry; +import org.opendaylight.controller.sal.core.api.model.SchemaService; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Set; + +/** + * This is the base class which initialize all the actors, listeners and + * default RPc implementation so remote invocation of rpcs. + */ +public class RemoteRpcProvider implements AutoCloseable, Provider{ + + private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcProvider.class); + + private final ActorSystem actorSystem; + private ActorRef rpcBroker; + private ActorRef rpcRegistry; + private final RpcProvisionRegistry rpcProvisionRegistry; + private Broker.ProviderSession brokerSession; + private RpcListener rpcListener; + private RoutedRpcListener routeChangeListener; + private RemoteRpcImplementation rpcImplementation; + public RemoteRpcProvider(ActorSystem actorSystem, RpcProvisionRegistry rpcProvisionRegistry) { + this.actorSystem = actorSystem; + this.rpcProvisionRegistry = rpcProvisionRegistry; + } + + @Override + public void close() throws Exception { + this.actorSystem.shutdown(); + unregisterSupportedRpcs(); + unregisterSupportedRoutedRpcs(); + } + + @Override + public void onSessionInitiated(Broker.ProviderSession session) { + this.brokerSession = session; + start(); + } + + @Override + public Collection getProviderFunctionality() { + return null; + } + + private void start() { + LOG.debug("Starting all rpc listeners."); + // Create actor to handle and sync routing table in cluster + ClusterWrapper clusterWrapper = new ClusterWrapperImpl(actorSystem); + rpcRegistry = actorSystem.actorOf(RpcRegistry.props(clusterWrapper), "rpc-registry"); + + // Create actor to invoke and execute rpc + SchemaService schemaService = brokerSession.getService(SchemaService.class); + SchemaContext schemaContext = schemaService.getGlobalContext(); + rpcBroker = actorSystem.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), "rpc-broker"); + String rpcBrokerPath = clusterWrapper.getAddress().toString() + "/user/rpc-broker"; + rpcListener = new RpcListener(rpcRegistry, rpcBrokerPath); + routeChangeListener = new RoutedRpcListener(rpcRegistry, rpcBrokerPath); + rpcImplementation = new RemoteRpcImplementation(rpcBroker, schemaContext); + brokerSession.addRpcRegistrationListener(rpcListener); + rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener); + rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation); + announceSupportedRpcs(); + announceSupportedRoutedRpcs(); + + } + + /** + * Add all the locally registered RPCs in the clustered routing table + */ + private void announceSupportedRpcs(){ + LOG.debug("Adding all supported rpcs to routing table"); + Set currentlySupported = brokerSession.getSupportedRpcs(); + for (QName rpc : currentlySupported) { + rpcListener.onRpcImplementationAdded(rpc); + } + } + + /** + * Add all the locally registered Routed RPCs in the clustered routing table + */ + private void announceSupportedRoutedRpcs(){ + + //TODO: announce all routed RPCs as well + + } + + /** + * Un-Register all the supported RPCs from clustered routing table + */ + private void unregisterSupportedRpcs(){ + LOG.debug("removing all supported rpcs to routing table"); + Set currentlySupported = brokerSession.getSupportedRpcs(); + for (QName rpc : currentlySupported) { + rpcListener.onRpcImplementationRemoved(rpc); + } + } + + /** + * Un-Register all the locally supported Routed RPCs from clustered routing table + */ + private void unregisterSupportedRoutedRpcs(){ + + //TODO: remove all routed RPCs as well + + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java new file mode 100644 index 0000000000..61dc8183df --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc; + + +import org.opendaylight.controller.sal.core.api.Broker; +import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry; +import org.osgi.framework.BundleContext; + +public class RemoteRpcProviderFactory { + public static RemoteRpcProvider createInstance(final Broker broker, final BundleContext bundleContext){ + RemoteRpcProvider rpcProvider = + new RemoteRpcProvider(ActorSystemFactory.getInstance(), (RpcProvisionRegistry) broker); + broker.registerProvider(rpcProvider, bundleContext); + return rpcProvider; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RouteIdentifierImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RouteIdentifierImpl.java new file mode 100644 index 0000000000..ea72238fbc --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RouteIdentifierImpl.java @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc; + +import org.opendaylight.controller.sal.connector.api.RpcRouter; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; + +import java.io.Serializable; + +public class RouteIdentifierImpl implements RpcRouter.RouteIdentifier,Serializable { + private static final long serialVersionUID = 1L; + + private QName context; + private QName type; + private InstanceIdentifier route; + + public RouteIdentifierImpl(QName context, QName type, InstanceIdentifier route) { + this.context = context; + this.type = type; + this.route = route; + } + + @Override + public QName getContext() { + return this.context; + } + + @Override + public QName getType() { + return this.type; + } + + @Override + public InstanceIdentifier getRoute() { + return this.route; + } + + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + RouteIdentifierImpl that = (RouteIdentifierImpl) o; + + if (context == null){ + if (that.getContext() != null) return false; + }else + if (!context.equals(that.context)) return false; + + if (route == null){ + if (that.getRoute() != null) return false; + }else + if (!route.equals(that.route)) return false; + + if (type == null){ + if (that.getType() != null) return false; + }else + if (!type.equals(that.type)) return false; + + return true; + } + + @Override + public int hashCode() { + int prime = 31; + int result = 0; + result = prime * result + (context == null ? 0:context.hashCode()); + result = prime * result + (type == null ? 0:type.hashCode()); + result = prime * result + (route == null ? 0:route.hashCode()); + return result; + } + + @Override + public String toString() { + return "RouteIdentifierImpl{" + + "context=" + context + + ", type=" + type + + ", route=" + route + + '}'; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RoutedRpcListener.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RoutedRpcListener.java new file mode 100644 index 0000000000..a0df3629fa --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RoutedRpcListener.java @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc; + + +import akka.actor.ActorRef; +import org.opendaylight.controller.md.sal.common.api.routing.RouteChange; +import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener; +import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc; +import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc; +import org.opendaylight.controller.sal.connector.api.RpcRouter; +import org.opendaylight.controller.sal.core.api.RpcRoutingContext; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class RoutedRpcListener implements RouteChangeListener{ + private static final Logger LOG = LoggerFactory.getLogger(RoutedRpcListener.class); + private final ActorRef rpcRegistry; + private final String actorPath; + + public RoutedRpcListener(ActorRef rpcRegistry, String actorPath) { + this.rpcRegistry = rpcRegistry; + this.actorPath = actorPath; + } + + @Override + public void onRouteChange(RouteChange routeChange) { + Map> announcements = routeChange.getAnnouncements(); + announce(getRouteIdentifiers(announcements)); + + Map> removals = routeChange.getRemovals(); + remove(getRouteIdentifiers(removals)); + } + + /** + * + * @param announcements + */ + private void announce(Set> announcements) { + LOG.debug("Announcing [{}]", announcements); + AddRoutedRpc addRpcMsg = new AddRoutedRpc(announcements, actorPath); + try { + ActorUtil.executeLocalOperation(rpcRegistry, addRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION); + } catch (Exception e) { + // Just logging it because Akka API throws this exception + LOG.error(e.toString()); + } + } + + /** + * + * @param removals + */ + private void remove(Set> removals){ + LOG.debug("Removing [{}]", removals); + RemoveRoutedRpc removeRpcMsg = new RemoveRoutedRpc(removals, actorPath); + try { + ActorUtil.executeLocalOperation(rpcRegistry, removeRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION); + } catch (Exception e) { + // Just logging it because Akka API throws this exception + LOG.error(e.toString()); + } + } + + /** + * + * @param changes + * @return + */ + private Set> getRouteIdentifiers(Map> changes) { + RouteIdentifierImpl routeId = null; + Set> routeIdSet = new HashSet<>(); + + for (RpcRoutingContext context : changes.keySet()){ + for (InstanceIdentifier instanceId : changes.get(context)){ + routeId = new RouteIdentifierImpl(null, context.getRpc(), instanceId); + routeIdSet.add(routeId); + } + } + return routeIdSet; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java new file mode 100644 index 0000000000..3354fc3da2 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.japi.Creator; +import org.opendaylight.controller.remote.rpc.messages.ErrorResponse; +import org.opendaylight.controller.remote.rpc.messages.ExecuteRpc; +import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc; +import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply; +import org.opendaylight.controller.remote.rpc.messages.GetRpc; +import org.opendaylight.controller.remote.rpc.messages.GetRpcReply; +import org.opendaylight.controller.remote.rpc.messages.InvokeRoutedRpc; +import org.opendaylight.controller.remote.rpc.messages.InvokeRpc; +import org.opendaylight.controller.remote.rpc.messages.RpcResponse; +import org.opendaylight.controller.sal.core.api.Broker; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Future; + +/** + * Actor to initiate execution of remote RPC on other nodes of the cluster. + */ + +public class RpcBroker extends AbstractUntypedActor { + + private static final Logger LOG = LoggerFactory.getLogger(RpcBroker.class); + private final Broker.ProviderSession brokerSession; + private final ActorRef rpcRegistry; + private final SchemaContext schemaContext; + + private RpcBroker(Broker.ProviderSession brokerSession, ActorRef rpcRegistry, SchemaContext schemaContext){ + this.brokerSession = brokerSession; + this.rpcRegistry = rpcRegistry; + this.schemaContext = schemaContext; + } + + public static Props props(final Broker.ProviderSession brokerSession, final ActorRef rpcRegistry, final SchemaContext schemaContext){ + return Props.create(new Creator(){ + + @Override + public RpcBroker create() throws Exception { + return new RpcBroker(brokerSession, rpcRegistry, schemaContext); + } + }); + } + @Override + protected void handleReceive(Object message) throws Exception { + if(message instanceof InvokeRoutedRpc) { + invokeRemoteRoutedRpc((InvokeRoutedRpc) message); + } else if(message instanceof InvokeRpc) { + invokeRemoteRpc((InvokeRpc) message); + } else if(message instanceof ExecuteRpc) { + executeRpc((ExecuteRpc) message); + } + } + + private void invokeRemoteRoutedRpc(InvokeRoutedRpc msg) { + // Look up the remote actor to execute rpc + LOG.debug("Looking up the remote actor for route {}", msg); + try { + RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, msg.getRpc(), msg.getIdentifier()); + GetRoutedRpc routedRpcMsg = new GetRoutedRpc(routeId); + GetRoutedRpcReply rpcReply = (GetRoutedRpcReply)ActorUtil.executeLocalOperation(rpcRegistry, routedRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION); + + String remoteActorPath = rpcReply.getRoutePath(); + if(remoteActorPath == null) { + LOG.debug("No remote actor found for rpc execution."); + + getSender().tell(new ErrorResponse( + new IllegalStateException("No remote actor found for rpc execution.")), self()); + } else { + + ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(), schemaContext), msg.getRpc()); + + Object operationRes = ActorUtil.executeRemoteOperation(this.context().actorSelection(remoteActorPath), + executeMsg, ActorUtil.REMOTE_ASK_DURATION, ActorUtil.REMOTE_AWAIT_DURATION); + + getSender().tell(operationRes, self()); + } + } catch (Exception e) { + LOG.error(e.toString()); + getSender().tell(new ErrorResponse(e), self()); + } + } + + private void invokeRemoteRpc(InvokeRpc msg) { + // Look up the remote actor to execute rpc + LOG.debug("Looking up the remote actor for route {}", msg); + try { + RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, msg.getRpc(), null); + GetRpc rpcMsg = new GetRpc(routeId); + GetRpcReply rpcReply = (GetRpcReply)ActorUtil.executeLocalOperation(rpcRegistry, rpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION); + String remoteActorPath = rpcReply.getRoutePath(); + + if(remoteActorPath == null) { + LOG.debug("No remote actor found for rpc execution."); + + getSender().tell(new ErrorResponse( + new IllegalStateException("No remote actor found for rpc execution.")), self()); + } else { + ExecuteRpc executeMsg = new ExecuteRpc(XmlUtils.inputCompositeNodeToXml(msg.getInput(), schemaContext), msg.getRpc()); + Object operationRes = ActorUtil.executeRemoteOperation(this.context().actorSelection(remoteActorPath), + executeMsg, ActorUtil.REMOTE_ASK_DURATION, ActorUtil.REMOTE_AWAIT_DURATION); + + getSender().tell(operationRes, self()); + } + } catch (Exception e) { + LOG.error(e.toString()); + getSender().tell(new ErrorResponse(e), self()); + } + } + + private void executeRpc(ExecuteRpc msg) { + LOG.debug("Executing rpc for rpc {}", msg.getRpc()); + try { + Future> rpc = brokerSession.rpc(msg.getRpc(), XmlUtils.inputXmlToCompositeNode(msg.getRpc(), msg.getInputCompositeNode(), schemaContext)); + RpcResult rpcResult = rpc != null ? rpc.get():null; + + CompositeNode result = rpcResult != null ? rpcResult.getResult() : null; + getSender().tell(new RpcResponse(XmlUtils.outputCompositeNodeToXml(result, schemaContext)), self()); + } catch (Exception e) { + LOG.error(e.toString()); + getSender().tell(new ErrorResponse(e), self()); + } + } + +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java new file mode 100644 index 0000000000..ae760fadc4 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc; + + +import akka.actor.ActorRef; +import org.opendaylight.controller.remote.rpc.messages.AddRpc; +import org.opendaylight.controller.remote.rpc.messages.RemoveRpc; +import org.opendaylight.controller.sal.core.api.RpcRegistrationListener; +import org.opendaylight.yangtools.yang.common.QName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RpcListener implements RpcRegistrationListener{ + + private static final Logger LOG = LoggerFactory.getLogger(RpcListener.class); + private final ActorRef rpcRegistry; + private final String actorPath; + + public RpcListener(ActorRef rpcRegistry, String actorPath) { + this.rpcRegistry = rpcRegistry; + this.actorPath = actorPath; + } + + @Override + public void onRpcImplementationAdded(QName rpc) { + LOG.debug("Adding registration for [{}]", rpc); + RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, rpc, null); + AddRpc addRpcMsg = new AddRpc(routeId, actorPath); + try { + ActorUtil.executeLocalOperation(rpcRegistry, addRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION); + LOG.debug("Route added [{}-{}]", routeId, this.actorPath); + } catch (Exception e) { + // Just logging it because Akka API throws this exception + LOG.error(e.toString()); + } + + } + + @Override + public void onRpcImplementationRemoved(QName rpc) { + LOG.debug("Removing registration for [{}]", rpc); + RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, rpc, null); + RemoveRpc removeRpcMsg = new RemoveRpc(routeId); + try { + ActorUtil.executeLocalOperation(rpcRegistry, removeRpcMsg, ActorUtil.LOCAL_ASK_DURATION, ActorUtil.LOCAL_AWAIT_DURATION); + } catch (Exception e) { + // Just logging it because Akka API throws this exception + LOG.error(e.toString()); + } + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/TerminationMonitor.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/TerminationMonitor.java new file mode 100644 index 0000000000..a90f1e1ed2 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/TerminationMonitor.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc; + +import akka.actor.Terminated; +import akka.actor.UntypedActor; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import org.opendaylight.controller.remote.rpc.messages.Monitor; + +public class TerminationMonitor extends UntypedActor{ + protected final LoggingAdapter LOG = + Logging.getLogger(getContext().system(), this); + + public TerminationMonitor(){ + LOG.info("Created TerminationMonitor"); + } + + @Override public void onReceive(Object message) throws Exception { + if(message instanceof Terminated){ + Terminated terminated = (Terminated) message; + LOG.debug("Actor terminated : {}", terminated.actor()); + }else if(message instanceof Monitor){ + Monitor monitor = (Monitor) message; + getContext().watch(monitor.getActorRef()); + } + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/XmlUtils.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/XmlUtils.java new file mode 100644 index 0000000000..5fb2bb8772 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/XmlUtils.java @@ -0,0 +1,145 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc; + +import com.google.common.base.Optional; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.api.Node; +import org.opendaylight.yangtools.yang.data.api.SimpleNode; +import org.opendaylight.yangtools.yang.data.impl.XmlTreeBuilder; +import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils; +import org.opendaylight.yangtools.yang.model.api.DataSchemaNode; +import org.opendaylight.yangtools.yang.model.api.RpcDefinition; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.controller.netconf.util.xml.XmlUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Document; +import org.xml.sax.SAXException; + +import javax.activation.UnsupportedDataTypeException; +import javax.xml.stream.XMLStreamException; +import javax.xml.transform.OutputKeys; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerException; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.StringWriter; +import java.util.Set; + +public class XmlUtils { + + private static final Logger LOG = LoggerFactory.getLogger(XmlUtils.class); + + public static String inputCompositeNodeToXml(CompositeNode cNode, SchemaContext schemaContext){ + if (cNode == null) return new String(); + + //Document domTree = NodeUtils.buildShadowDomTree(cNode); + Document domTree = null; + try { + Set rpcs = schemaContext.getOperations(); + for(RpcDefinition rpc : rpcs) { + if(rpc.getQName().equals(cNode.getNodeType())){ + domTree = XmlDocumentUtils.toDocument(cNode, rpc.getInput(), XmlDocumentUtils.defaultValueCodecProvider()); + break; + } + } + + } catch (UnsupportedDataTypeException e) { + LOG.error("Error during translation of CompositeNode to Document", e); + } + return domTransformer(domTree); + } + + public static String outputCompositeNodeToXml(CompositeNode cNode, SchemaContext schemaContext){ + if (cNode == null) return new String(); + + //Document domTree = NodeUtils.buildShadowDomTree(cNode); + Document domTree = null; + try { + Set rpcs = schemaContext.getOperations(); + for(RpcDefinition rpc : rpcs) { + if(rpc.getQName().equals(cNode.getNodeType())){ + domTree = XmlDocumentUtils.toDocument(cNode, rpc.getInput(), XmlDocumentUtils.defaultValueCodecProvider()); + break; + } + } + + } catch (UnsupportedDataTypeException e) { + LOG.error("Error during translation of CompositeNode to Document", e); + } + return domTransformer(domTree); + } + + private static String domTransformer(Document domTree) { + StringWriter writer = new StringWriter(); + try { + TransformerFactory tf = TransformerFactory.newInstance(); + Transformer transformer = tf.newTransformer(); + transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes"); + transformer.transform(new DOMSource(domTree), new StreamResult(writer)); + } catch (TransformerException e) { + + LOG.error("Error during translation of Document to OutputStream", e); + } + LOG.debug("compositeNodeToXml " + writer.toString()); + + return writer.toString(); + } + + public static CompositeNode xmlToCompositeNode(String xml){ + if (xml==null || xml.length()==0) return null; + + Node dataTree; + try { + dataTree = XmlTreeBuilder.buildDataTree(new ByteArrayInputStream(xml.getBytes())); + } catch (XMLStreamException e) { + LOG.error("Error during building data tree from XML", e); + return null; + } + if (dataTree == null) { + LOG.error("data tree is null"); + return null; + } + if (dataTree instanceof SimpleNode) { + LOG.error("RPC XML was resolved as SimpleNode"); + return null; + } + return (CompositeNode) dataTree; + } + + public static CompositeNode inputXmlToCompositeNode(QName rpc, String xml, SchemaContext schemaContext){ + if (xml==null || xml.length()==0) return null; + + Node dataTree = null; + try { + + Document doc = XmlUtil.readXmlToDocument(xml); + LOG.debug("xmlToCompositeNode Document is " + xml ); + Set rpcs = schemaContext.getOperations(); + for(RpcDefinition rpcDef : rpcs) { + if(rpcDef.getQName().equals(rpc)){ + dataTree = XmlDocumentUtils.toDomNode(doc.getDocumentElement(), Optional.of(rpcDef.getInput()), Optional.of(XmlDocumentUtils.defaultValueCodecProvider())); + break; + } + } + } catch (SAXException e) { + LOG.error("Error during building data tree from XML", e); + } catch (IOException e) { + LOG.error("Error during building data tree from XML", e); + } + + LOG.debug("xmlToCompositeNode " + dataTree.toString()); + return (CompositeNode) dataTree; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AddRoutedRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AddRoutedRpc.java new file mode 100644 index 0000000000..25773bb8a6 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AddRoutedRpc.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc.messages; + +import org.opendaylight.controller.sal.connector.api.RpcRouter; + +import java.io.Serializable; +import java.util.Set; + +public class AddRoutedRpc implements Serializable { + + Set> announcements; + String actorPath; + + public AddRoutedRpc(Set> announcements, String actorPath) { + this.announcements = announcements; + this.actorPath = actorPath; + } + + public Set> getAnnouncements() { + return announcements; + } + + public String getActorPath() { + return actorPath; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AddRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AddRpc.java new file mode 100644 index 0000000000..eac973137e --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AddRpc.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc.messages; + +import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl; + +import java.io.Serializable; + +public class AddRpc implements Serializable { + + RouteIdentifierImpl routeId; + String actorPath; + + public AddRpc(RouteIdentifierImpl routeId, String actorPath) { + this.routeId = routeId; + this.actorPath = actorPath; + } + + public RouteIdentifierImpl getRouteId() { + return routeId; + } + + public String getActorPath() { + return actorPath; + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ErrorResponse.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ErrorResponse.java new file mode 100644 index 0000000000..ef3f528112 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ErrorResponse.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.messages; + +import java.io.Serializable; + +public class ErrorResponse implements Serializable { + + Exception exception; + + public ErrorResponse(Exception e) { + this.exception = e; + } + + public Exception getException() { + return exception; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java new file mode 100644 index 0000000000..030d81ac7e --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.messages; + + +import org.opendaylight.yangtools.yang.common.QName; + +import java.io.Serializable; + +public class ExecuteRpc implements Serializable { + + private String inputCompositeNode; + private QName rpc; + + public ExecuteRpc(String inputCompositeNode, QName rpc) { + this.inputCompositeNode = inputCompositeNode; + this.rpc = rpc; + } + + public String getInputCompositeNode() { + return inputCompositeNode; + } + + public QName getRpc() { + return rpc; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRoutedRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRoutedRpc.java new file mode 100644 index 0000000000..b1fa410e8d --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRoutedRpc.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc.messages; + + +import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl; + +import java.io.Serializable; + +public class GetRoutedRpc implements Serializable { + + RouteIdentifierImpl routeId; + + public GetRoutedRpc(RouteIdentifierImpl routeId) { + this.routeId = routeId; + } + + public RouteIdentifierImpl getRouteId() { + return routeId; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRoutedRpcReply.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRoutedRpcReply.java new file mode 100644 index 0000000000..0e1563340b --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRoutedRpcReply.java @@ -0,0 +1,24 @@ +package org.opendaylight.controller.remote.rpc.messages; + +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +import java.io.Serializable; + +public class GetRoutedRpcReply implements Serializable { + + private String routePath; + + public GetRoutedRpcReply(String routePath) { + this.routePath = routePath; + } + + public String getRoutePath() { + return routePath; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRpc.java new file mode 100644 index 0000000000..c55627961a --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRpc.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.messages; + +import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl; + +import java.io.Serializable; + +public class GetRpc implements Serializable { + + RouteIdentifierImpl routeId; + + public GetRpc(RouteIdentifierImpl routeId) { + this.routeId = routeId; + } + + public RouteIdentifierImpl getRouteId() { + return routeId; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRpcReply.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRpcReply.java new file mode 100644 index 0000000000..3309b989c5 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRpcReply.java @@ -0,0 +1,24 @@ +package org.opendaylight.controller.remote.rpc.messages; + +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +import java.io.Serializable; + +public class GetRpcReply implements Serializable { + + private String routePath; + + public GetRpcReply(String routePath) { + this.routePath = routePath; + } + + public String getRoutePath() { + return routePath; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRoutedRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRoutedRpc.java new file mode 100644 index 0000000000..00ef980bb1 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRoutedRpc.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.messages; + +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; + +import java.io.Serializable; + +public class InvokeRoutedRpc implements Serializable { + + private QName rpc; + private InstanceIdentifier identifier; + private CompositeNode input; + + public InvokeRoutedRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) { + this.rpc = rpc; + this.identifier = identifier; + this.input = input; + } + + public InvokeRoutedRpc(QName rpc, CompositeNode input) { + this.rpc = rpc; + this.input = input; + } + + public QName getRpc() { + return rpc; + } + + public InstanceIdentifier getIdentifier() { + return identifier; + } + + public CompositeNode getInput() { + return input; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java new file mode 100644 index 0000000000..1f4eab0971 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.messages; + +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; + +import java.io.Serializable; + +public class InvokeRpc implements Serializable { + + private QName rpc; + private CompositeNode input; + + public InvokeRpc(QName rpc, CompositeNode input) { + this.rpc = rpc; + this.input = input; + } + + public QName getRpc() { + return rpc; + } + + public CompositeNode getInput() { + return input; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/Monitor.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/Monitor.java new file mode 100644 index 0000000000..43a31ef7f2 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/Monitor.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc.messages; + +import akka.actor.ActorRef; + +public class Monitor { + private final ActorRef actorRef; + + public Monitor(ActorRef actorRef){ + + this.actorRef = actorRef; + } + + public ActorRef getActorRef() { + return actorRef; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RemoveRoutedRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RemoveRoutedRpc.java new file mode 100644 index 0000000000..a3aa9d12fa --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RemoveRoutedRpc.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc.messages; + +import org.opendaylight.controller.sal.connector.api.RpcRouter; + +import java.io.Serializable; +import java.util.Set; + +public class RemoveRoutedRpc implements Serializable { + + Set> announcements; + String actorPath; + + public RemoveRoutedRpc(Set> announcements, String actorPath) { + this.announcements = announcements; + this.actorPath = actorPath; + } + + public Set> getAnnouncements() { + return announcements; + } + + public String getActorPath() { + return actorPath; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RemoveRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RemoveRpc.java new file mode 100644 index 0000000000..0bfd78aaae --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RemoveRpc.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc.messages; + +import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl; + +import java.io.Serializable; + +public class RemoveRpc implements Serializable { + + RouteIdentifierImpl routeId; + + public RemoveRpc(RouteIdentifierImpl routeId) { + this.routeId = routeId; + } + + public RouteIdentifierImpl getRouteId() { + return routeId; + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RoutingTableData.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RoutingTableData.java new file mode 100644 index 0000000000..132fdba054 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RoutingTableData.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc.messages; + +import org.opendaylight.controller.sal.connector.api.RpcRouter; + +import java.io.Serializable; +import java.util.LinkedHashSet; +import java.util.Map; + +public class RoutingTableData implements Serializable { + private Map, String> rpcMap; + private Map, LinkedHashSet> routedRpcMap; + + public RoutingTableData(Map, String> rpcMap, + Map, LinkedHashSet> routedRpcMap) { + this.rpcMap = rpcMap; + this.routedRpcMap = routedRpcMap; + } + + public Map, String> getRpcMap() { + return rpcMap; + } + + public Map, LinkedHashSet> getRoutedRpcMap() { + return routedRpcMap; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RpcResponse.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RpcResponse.java new file mode 100644 index 0000000000..cbfecb1918 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RpcResponse.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.messages; + + + +import java.io.Serializable; + +public class RpcResponse implements Serializable { + private String resultCompositeNode; + + public RpcResponse(String resultCompositeNode) { + this.resultCompositeNode = resultCompositeNode; + } + + public String getResultCompositeNode() { + return resultCompositeNode; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ClusterWrapper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ClusterWrapper.java new file mode 100644 index 0000000000..4ddc2bee85 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ClusterWrapper.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc.registry; + + +import akka.actor.Address; +import akka.cluster.ClusterEvent; + +public interface ClusterWrapper { + + ClusterEvent.CurrentClusterState getState(); + + Address getAddress(); +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ClusterWrapperImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ClusterWrapperImpl.java new file mode 100644 index 0000000000..89603a134c --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/ClusterWrapperImpl.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc.registry; + + +import akka.actor.ActorSystem; +import akka.actor.Address; +import akka.cluster.Cluster; +import akka.cluster.ClusterEvent; + + +public class ClusterWrapperImpl implements ClusterWrapper{ + + private Cluster cluster; + + public ClusterWrapperImpl(ActorSystem actorSystem) { + cluster = Cluster.get(actorSystem); + } + + @Override + public ClusterEvent.CurrentClusterState getState() { + return cluster.state(); + } + + @Override + public Address getAddress() { + return cluster.selfAddress(); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java new file mode 100644 index 0000000000..5e19653a22 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc.registry; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class RoutingTable { + + private final Logger LOG = LoggerFactory.getLogger(RoutingTable.class); + + private ConcurrentMap globalRpcMap = new ConcurrentHashMap<>(); + private ConcurrentMap> routedRpcMap = new ConcurrentHashMap<>(); + + public ConcurrentMap getGlobalRpcMap() { + return globalRpcMap; + } + + public ConcurrentMap> getRoutedRpcMap() { + return routedRpcMap; + } + + public R getGlobalRoute(final I routeId) { + Preconditions.checkNotNull(routeId, "getGlobalRoute: routeId cannot be null!"); + return globalRpcMap.get(routeId); + } + + public void addGlobalRoute(final I routeId, final R route) { + Preconditions.checkNotNull(routeId, "addGlobalRoute: routeId cannot be null!"); + Preconditions.checkNotNull(route, "addGlobalRoute: route cannot be null!"); + LOG.debug("addGlobalRoute: adding a new route with id[{}] and value [{}]", routeId, route); + if(globalRpcMap.putIfAbsent(routeId, route) != null) { + LOG.debug("A route already exist for route id [{}] ", routeId); + } + } + + public void removeGlobalRoute(final I routeId) { + Preconditions.checkNotNull(routeId, "removeGlobalRoute: routeId cannot be null!"); + LOG.debug("removeGlobalRoute: removing a new route with id [{}]", routeId); + globalRpcMap.remove(routeId); + } + + public Set getRoutedRpc(final I routeId) { + Preconditions.checkNotNull(routeId, "getRoutes: routeId cannot be null!"); + Set routes = routedRpcMap.get(routeId); + + if (routes == null) { + return Collections.emptySet(); + } + + return ImmutableSet.copyOf(routes); + } + + public R getLastAddedRoutedRpc(final I routeId) { + + Set routes = getRoutedRpc(routeId); + + if (routes.isEmpty()) { + return null; + } + + R route = null; + Iterator iter = routes.iterator(); + while (iter.hasNext()) { + route = iter.next(); + } + + return route; + } + + public void addRoutedRpc(final I routeId, final R route) { + Preconditions.checkNotNull(routeId, "addRoute: routeId cannot be null"); + Preconditions.checkNotNull(route, "addRoute: route cannot be null"); + LOG.debug("addRoute: adding a route with k/v [{}/{}]", routeId, route); + threadSafeAdd(routeId, route); + } + + public void addRoutedRpcs(final Set routeIds, final R route) { + Preconditions.checkNotNull(routeIds, "addRoutes: routeIds must not be null"); + for (I routeId : routeIds){ + addRoutedRpc(routeId, route); + } + } + + public void removeRoute(final I routeId, final R route) { + Preconditions.checkNotNull(routeId, "removeRoute: routeId cannot be null!"); + Preconditions.checkNotNull(route, "removeRoute: route cannot be null!"); + + LinkedHashSet routes = routedRpcMap.get(routeId); + if (routes == null) { + return; + } + LOG.debug("removeRoute: removing a new route with k/v [{}/{}]", routeId, route); + threadSafeRemove(routeId, route); + } + + public void removeRoutes(final Set routeIds, final R route) { + Preconditions.checkNotNull(routeIds, "removeRoutes: routeIds must not be null"); + for (I routeId : routeIds){ + removeRoute(routeId, route); + } + } + + /** + * This method guarantees that no 2 thread over write each other's changes. + * Just so that we dont end up in infinite loop, it tries for 100 times then throw + */ + private void threadSafeAdd(final I routeId, final R route) { + + for (int i=0;i<100;i++){ + + LinkedHashSet updatedRoutes = new LinkedHashSet<>(); + updatedRoutes.add(route); + LinkedHashSet oldRoutes = routedRpcMap.putIfAbsent(routeId, updatedRoutes); + if (oldRoutes == null) { + return; + } + + updatedRoutes = new LinkedHashSet<>(oldRoutes); + updatedRoutes.add(route); + + if (routedRpcMap.replace(routeId, oldRoutes, updatedRoutes)) { + return; + } + } + //the method did not already return means it failed to add route in 100 attempts + throw new IllegalStateException("Failed to add route [" + routeId + "]"); + } + + /** + * This method guarantees that no 2 thread over write each other's changes. + * Just so that we dont end up in infinite loop, it tries for 100 times then throw + */ + private void threadSafeRemove(final I routeId, final R route) { + LinkedHashSet updatedRoutes = null; + for (int i=0;i<100;i++){ + LinkedHashSet oldRoutes = routedRpcMap.get(routeId); + + // if route to be deleted is the only entry in the set then remove routeId from the cache + if ((oldRoutes.size() == 1) && oldRoutes.contains(route)){ + routedRpcMap.remove(routeId); + return; + } + + // if there are multiple routes for this routeId, remove the route to be deleted only from the set. + updatedRoutes = new LinkedHashSet<>(oldRoutes); + updatedRoutes.remove(route); + if (routedRpcMap.replace(routeId, oldRoutes, updatedRoutes)) { + return; + } + + } + //the method did not already return means it failed to remove route in 100 attempts + throw new IllegalStateException("Failed to remove route [" + routeId + "]"); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java new file mode 100644 index 0000000000..7cb505aa98 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java @@ -0,0 +1,202 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.registry; + +import akka.actor.ActorSelection; +import akka.actor.Address; +import akka.actor.Props; +import akka.cluster.ClusterEvent; +import akka.cluster.Member; +import akka.japi.Creator; +import org.opendaylight.controller.remote.rpc.AbstractUntypedActor; +import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc; +import org.opendaylight.controller.remote.rpc.messages.AddRpc; +import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc; +import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply; +import org.opendaylight.controller.remote.rpc.messages.GetRpc; +import org.opendaylight.controller.remote.rpc.messages.GetRpcReply; +import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc; +import org.opendaylight.controller.remote.rpc.messages.RemoveRpc; +import org.opendaylight.controller.remote.rpc.messages.RoutingTableData; +import org.opendaylight.controller.sal.connector.api.RpcRouter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.JavaConversions; + +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * This Actor maintains the routing table state and sync it with other nodes in the cluster. + * + * A scheduler runs after an interval of time, which pick a random member from the cluster + * and send the current state of routing table to the member. + * + * when a message of routing table data is received, it gets merged with the local routing table + * to keep the latest data. + */ + +public class RpcRegistry extends AbstractUntypedActor { + + private static final Logger LOG = LoggerFactory.getLogger(RpcRegistry.class); + private RoutingTable, String> routingTable; + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + private final ClusterWrapper clusterWrapper; + private final ScheduledFuture syncScheduler; + + private RpcRegistry(ClusterWrapper clusterWrapper){ + this.routingTable = new RoutingTable<>(); + this.clusterWrapper = clusterWrapper; + this.syncScheduler = scheduler.scheduleAtFixedRate(new SendRoutingTable(), 10, 10, TimeUnit.SECONDS); + } + + public static Props props(final ClusterWrapper clusterWrapper){ + return Props.create(new Creator(){ + + @Override + public RpcRegistry create() throws Exception { + return new RpcRegistry(clusterWrapper); + } + }); + } + + @Override + protected void handleReceive(Object message) throws Exception { + LOG.debug("Received message {}", message); + if(message instanceof RoutingTableData) { + syncRoutingTable((RoutingTableData) message); + } else if(message instanceof GetRoutedRpc) { + getRoutedRpc((GetRoutedRpc) message); + } else if(message instanceof GetRpc) { + getRpc((GetRpc) message); + } else if(message instanceof AddRpc) { + addRpc((AddRpc) message); + } else if(message instanceof RemoveRpc) { + removeRpc((RemoveRpc) message); + } else if(message instanceof AddRoutedRpc) { + addRoutedRpc((AddRoutedRpc) message); + } else if(message instanceof RemoveRoutedRpc) { + removeRoutedRpc((RemoveRoutedRpc) message); + } + } + + private void getRoutedRpc(GetRoutedRpc rpcMsg){ + LOG.debug("Get latest routed Rpc location from routing table {}", rpcMsg); + String remoteActorPath = routingTable.getLastAddedRoutedRpc(rpcMsg.getRouteId()); + GetRoutedRpcReply routedRpcReply = new GetRoutedRpcReply(remoteActorPath); + + getSender().tell(routedRpcReply, self()); + } + + private void getRpc(GetRpc rpcMsg) { + LOG.debug("Get global Rpc location from routing table {}", rpcMsg); + String remoteActorPath = routingTable.getGlobalRoute(rpcMsg.getRouteId()); + GetRpcReply rpcReply = new GetRpcReply(remoteActorPath); + + getSender().tell(rpcReply, self()); + } + + private void addRpc(AddRpc rpcMsg) { + LOG.debug("Add Rpc to routing table {}", rpcMsg); + routingTable.addGlobalRoute(rpcMsg.getRouteId(), rpcMsg.getActorPath()); + + getSender().tell("Success", self()); + } + + private void removeRpc(RemoveRpc rpcMsg) { + LOG.debug("Removing Rpc to routing table {}", rpcMsg); + routingTable.removeGlobalRoute(rpcMsg.getRouteId()); + + getSender().tell("Success", self()); + } + + private void addRoutedRpc(AddRoutedRpc rpcMsg) { + routingTable.addRoutedRpcs(rpcMsg.getAnnouncements(), rpcMsg.getActorPath()); + getSender().tell("Success", self()); + } + + private void removeRoutedRpc(RemoveRoutedRpc rpcMsg) { + routingTable.removeRoutes(rpcMsg.getAnnouncements(), rpcMsg.getActorPath()); + getSender().tell("Success", self()); + } + + private void syncRoutingTable(RoutingTableData routingTableData) { + LOG.debug("Syncing routing table {}", routingTableData); + + Map, String> newRpcMap = routingTableData.getRpcMap(); + Set> routeIds = newRpcMap.keySet(); + for(RpcRouter.RouteIdentifier routeId : routeIds) { + routingTable.addGlobalRoute(routeId, newRpcMap.get(routeId)); + } + + Map, LinkedHashSet> newRoutedRpcMap = + routingTableData.getRoutedRpcMap(); + routeIds = newRoutedRpcMap.keySet(); + + for(RpcRouter.RouteIdentifier routeId : routeIds) { + Set routeAddresses = newRoutedRpcMap.get(routeId); + for(String routeAddress : routeAddresses) { + routingTable.addRoutedRpc(routeId, routeAddress); + } + } + } + + private ActorSelection getRandomRegistryActor() { + ClusterEvent.CurrentClusterState clusterState = clusterWrapper.getState(); + ActorSelection actor = null; + Set members = JavaConversions.asJavaSet(clusterState.members()); + int memberSize = members.size(); + // Don't select yourself + if(memberSize > 1) { + Address currentNodeAddress = clusterWrapper.getAddress(); + int index = new Random().nextInt(memberSize); + int i = 0; + // keeping previous member, in case when random index member is same as current actor + // and current actor member is last in set + Member previousMember = null; + for(Member member : members){ + if(i == index-1) { + previousMember = member; + } + if(i == index) { + if(!currentNodeAddress.equals(member.address())) { + actor = this.context().actorSelection(member.address() + "/user/rpc-registry"); + break; + } else if(index < memberSize-1){ // pick the next element in the set + index++; + } + } + i++; + } + if(actor == null && previousMember != null) { + actor = this.context().actorSelection(previousMember.address() + "/user/rpc-registry"); + } + } + return actor; + } + + private class SendRoutingTable implements Runnable { + + @Override + public void run() { + RoutingTableData routingTableData = + new RoutingTableData(routingTable.getGlobalRpcMap(), routingTable.getRoutedRpcMap()); + LOG.debug("Sending routing table for sync {}", routingTableData); + ActorSelection actor = getRandomRegistryActor(); + if(actor != null) { + actor.tell(routingTableData, self()); + } + } + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/resources/application.conf b/opendaylight/md-sal/sal-remoterpc-connector/src/main/resources/application.conf new file mode 100644 index 0000000000..9585e9ffb6 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/resources/application.conf @@ -0,0 +1,21 @@ +odl-cluster{ + akka { + actor { + provider = "akka.cluster.ClusterActorRefProvider" + + } + remote { + log-remote-lifecycle-events = off + netty.tcp { + hostname = "192.168.141.142" + port = 2551 + } + } + + cluster { + seed-nodes = ["akka.tcp://opendaylight-rpc@192.168.141.141:2551"] + + auto-down-unreachable-after = 10s + } + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/yang/remote-rpc-connector.yang b/opendaylight/md-sal/sal-remoterpc-connector/src/main/yang/remote-rpc-connector.yang new file mode 100644 index 0000000000..08db5c0043 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/yang/remote-rpc-connector.yang @@ -0,0 +1,40 @@ +module remote-rpc-connector { + yang-version 1; + namespace "urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector"; + prefix "remote-rpc-connector"; + + import config { prefix config; revision-date 2013-04-05; } + import opendaylight-md-sal-dom {prefix dom;} + + description + "This module contains the base YANG definitions for + the remote routed rpc"; + + revision "2014-07-07" { + description + "Initial revision"; + } + + // This is the definition of the service implementation as a module identity. + identity remote-rpc-connector { + base config:module-type; + // Specifies the prefix for generated java classes. + config:java-name-prefix RemoteRPCBroker; + } + + augment "/config:modules/config:module/config:configuration" { + case remote-rpc-connector { + when "/config:modules/config:module/config:type = 'remote-rpc-connector'"; + + container dom-broker { + uses config:service-ref { + refine type { + mandatory true; + config:required-identity dom:dom-broker-osgi-registry; + } + } + } + } + } + +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java new file mode 100644 index 0000000000..595d8331eb --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java @@ -0,0 +1,213 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc; + + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import com.google.common.util.concurrent.Futures; +import junit.framework.Assert; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; +import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc; +import org.opendaylight.controller.remote.rpc.messages.AddRpc; +import org.opendaylight.controller.remote.rpc.messages.ErrorResponse; +import org.opendaylight.controller.remote.rpc.messages.InvokeRoutedRpc; +import org.opendaylight.controller.remote.rpc.messages.InvokeRpc; +import org.opendaylight.controller.remote.rpc.messages.RpcResponse; +import org.opendaylight.controller.remote.rpc.registry.ClusterWrapper; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; +import org.opendaylight.controller.sal.common.util.Rpcs; +import org.opendaylight.controller.sal.connector.api.RpcRouter; +import org.opendaylight.controller.sal.core.api.Broker; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.api.ModifyAction; +import org.opendaylight.yangtools.yang.data.api.Node; +import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.Future; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RpcBrokerTest { + + static ActorSystem system; + + + @BeforeClass + public static void setup() { + system = ActorSystem.create(); + } + + @AfterClass + public static void teardown() { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + + @Test + public void testInvokeRpcError() throws URISyntaxException { + new JavaTestKit(system) {{ + ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class))); + Broker.ProviderSession brokerSession = Mockito.mock(Broker.ProviderSession.class); + SchemaContext schemaContext = mock(SchemaContext.class); + ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext)); + QName rpc = new QName(new URI("actor1"), "actor1"); + InvokeRpc invokeMsg = new InvokeRpc(rpc, null); + rpcBroker.tell(invokeMsg, getRef()); + + Boolean getMsg = new ExpectMsg("ErrorResponse") { + protected Boolean match(Object in) { + if (in instanceof ErrorResponse) { + ErrorResponse reply = (ErrorResponse)in; + return "No remote actor found for rpc execution.".equals(reply.getException().getMessage()); + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + Assert.assertTrue(getMsg); + }}; + } + + /** + * This test method invokes and executes the remote rpc + */ + + @Test + public void testInvokeRpc() throws URISyntaxException { + new JavaTestKit(system) {{ + ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(mock(ClusterWrapper.class))); + Broker.ProviderSession brokerSession = mock(Broker.ProviderSession.class); + SchemaContext schemaContext = mock(SchemaContext.class); + ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext)); + ActorRef rpcBrokerRemote = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), "actor1"); + // Add RPC in table + QName rpc = new QName(new URI("actor1"), "actor1"); + RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, rpc, null); + final String route = rpcBrokerRemote.path().toString(); + AddRpc rpcMsg = new AddRpc(routeId, route); + rpcRegistry.tell(rpcMsg, getRef()); + expectMsgEquals(duration("2 second"), "Success"); + + // invoke rpc + CompositeNode input = new ImmutableCompositeNode(QName.create("ns", "2013-12-09", "child1"), new ArrayList>(), ModifyAction.REPLACE); + CompositeNode invokeRpcResult = mock(CompositeNode.class); + Collection errors = new ArrayList<>(); + RpcResult result = Rpcs.getRpcResult(true, invokeRpcResult, errors); + Future> rpcResult = Futures.immediateFuture(result); + when(brokerSession.rpc(rpc, input)).thenReturn(rpcResult); + InvokeRpc invokeMsg = new InvokeRpc(rpc, input); + rpcBroker.tell(invokeMsg, getRef()); + + //verify response msg + Boolean getMsg = new ExpectMsg("RpcResponse") { + protected Boolean match(Object in) { + if (in instanceof RpcResponse) { + return true; + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + Assert.assertTrue(getMsg); + }}; + } + + @Test + public void testInvokeRoutedRpcError() throws URISyntaxException { + new JavaTestKit(system) {{ + ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class))); + Broker.ProviderSession brokerSession = Mockito.mock(Broker.ProviderSession.class); + SchemaContext schemaContext = mock(SchemaContext.class); + ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext)); + QName rpc = new QName(new URI("actor1"), "actor1"); + InvokeRoutedRpc invokeMsg = new InvokeRoutedRpc(rpc, null); + rpcBroker.tell(invokeMsg, getRef()); + + Boolean getMsg = new ExpectMsg("ErrorResponse") { + protected Boolean match(Object in) { + if (in instanceof ErrorResponse) { + ErrorResponse reply = (ErrorResponse)in; + return "No remote actor found for rpc execution.".equals(reply.getException().getMessage()); + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + Assert.assertTrue(getMsg); + }}; + } + + /** + * This test method invokes and executes the remote routed rpc + */ + + @Test + public void testInvokeRoutedRpc() throws URISyntaxException { + new JavaTestKit(system) {{ + ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(mock(ClusterWrapper.class))); + Broker.ProviderSession brokerSession = mock(Broker.ProviderSession.class); + SchemaContext schemaContext = mock(SchemaContext.class); + ActorRef rpcBroker = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext)); + ActorRef rpcBrokerRemote = system.actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), "actor2"); + // Add Routed RPC in table + QName rpc = new QName(new URI("actor2"), "actor2"); + RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, rpc, null); + final String route = rpcBrokerRemote.path().toString(); + Set> routeIds = new HashSet<>(); + routeIds.add(routeId); + + AddRoutedRpc rpcMsg = new AddRoutedRpc(routeIds, route); + rpcRegistry.tell(rpcMsg, getRef()); + expectMsgEquals(duration("2 second"), "Success"); + + // invoke rpc + CompositeNode input = new ImmutableCompositeNode(QName.create("ns", "2013-12-09", "child1"), new ArrayList>(), ModifyAction.REPLACE); + CompositeNode invokeRpcResult = mock(CompositeNode.class); + Collection errors = new ArrayList<>(); + RpcResult result = Rpcs.getRpcResult(true, invokeRpcResult, errors); + Future> rpcResult = Futures.immediateFuture(result); + when(brokerSession.rpc(rpc, input)).thenReturn(rpcResult); + InvokeRoutedRpc invokeMsg = new InvokeRoutedRpc(rpc, input); + rpcBroker.tell(invokeMsg, getRef()); + + //verify response msg + Boolean getMsg = new ExpectMsg("RpcResponse") { + protected Boolean match(Object in) { + if (in instanceof RpcResponse) { + return true; + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + Assert.assertTrue(getMsg); + }}; + } + +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableTest.java new file mode 100644 index 0000000000..a57402a793 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableTest.java @@ -0,0 +1,209 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc.registry; + +import junit.framework.Assert; +import org.junit.Test; +import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl; +import org.opendaylight.controller.sal.connector.api.RpcRouter; +import org.opendaylight.yangtools.yang.common.QName; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashSet; +import java.util.Set; + +public class RoutingTableTest { + + private RoutingTable, String> routingTable = + new RoutingTable<>(); + + @Test + public void addGlobalRouteNullRouteIdTest() { + try { + routingTable.addGlobalRoute(null, null); + + Assert.fail("Null pointer exception was not thrown."); + } catch (Exception e) { + Assert.assertEquals(NullPointerException.class.getName(), e.getClass().getName()); + Assert.assertEquals("addGlobalRoute: routeId cannot be null!", e.getMessage()); + } + } + + @Test + public void addGlobalRouteNullRouteTest() { + try { + RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, null, null); + routingTable.addGlobalRoute(routeId, null); + + Assert.fail("Null pointer exception was not thrown."); + } catch (Exception e) { + Assert.assertEquals(NullPointerException.class.getName(), e.getClass().getName()); + Assert.assertEquals("addGlobalRoute: route cannot be null!", e.getMessage()); + } + } + + @Test + public void getGlobalRouteNullTest() { + try { + routingTable.getGlobalRoute(null); + + Assert.fail("Null pointer exception was not thrown."); + } catch (Exception e) { + Assert.assertEquals(NullPointerException.class.getName(), e.getClass().getName()); + Assert.assertEquals("getGlobalRoute: routeId cannot be null!", e.getMessage()); + } + } + + @Test + public void getGlobalRouteTest() throws URISyntaxException { + QName type = new QName(new URI("actor1"), "actor1"); + RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null); + String route = "actor1"; + + routingTable.addGlobalRoute(routeId, route); + + String returnedRoute = routingTable.getGlobalRoute(routeId); + + Assert.assertEquals(route, returnedRoute); + + } + + @Test + public void removeGlobalRouteTest() throws URISyntaxException { + QName type = new QName(new URI("actorRemove"), "actorRemove"); + RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null); + String route = "actorRemove"; + + routingTable.addGlobalRoute(routeId, route); + + String returnedRoute = routingTable.getGlobalRoute(routeId); + + Assert.assertEquals(route, returnedRoute); + + routingTable.removeGlobalRoute(routeId); + + String deletedRoute = routingTable.getGlobalRoute(routeId); + + Assert.assertNull(deletedRoute); + } + + @Test + public void addRoutedRpcNullRouteIdTest() { + try { + routingTable.addRoutedRpc(null, null); + + Assert.fail("Null pointer exception was not thrown."); + } catch (Exception e) { + Assert.assertEquals(NullPointerException.class.getName(), e.getClass().getName()); + Assert.assertEquals("addRoute: routeId cannot be null", e.getMessage()); + } + } + + @Test + public void addRoutedRpcNullRouteTest() { + try { + RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, null, null); + + routingTable.addRoutedRpc(routeId, null); + + Assert.fail("Null pointer exception was not thrown."); + } catch (Exception e) { + Assert.assertEquals(NullPointerException.class.getName(), e.getClass().getName()); + Assert.assertEquals("addRoute: route cannot be null", e.getMessage()); + } + } + + @Test + public void getRoutedRpcNullTest() { + try { + routingTable.getRoutedRpc(null); + + Assert.fail("Null pointer exception was not thrown."); + } catch (Exception e) { + Assert.assertEquals(NullPointerException.class.getName(), e.getClass().getName()); + Assert.assertEquals("getRoutes: routeId cannot be null!", e.getMessage()); + } + } + + @Test + public void getRoutedRpcTest() throws URISyntaxException { + QName type = new QName(new URI("actor1"), "actor1"); + RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null); + String route = "actor1"; + + routingTable.addRoutedRpc(routeId, route); + + Set routes = routingTable.getRoutedRpc(routeId); + + Assert.assertEquals(1, routes.size()); + Assert.assertTrue(routes.contains(route)); + + } + + @Test + public void getLastRoutedRpcTest() throws URISyntaxException { + QName type = new QName(new URI("first1"), "first1"); + RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null); + String route = "first1"; + + routingTable.addRoutedRpc(routeId, route); + + String route2 = "second1"; + routingTable.addRoutedRpc(routeId, route2); + + String latest = routingTable.getLastAddedRoutedRpc(routeId); + Assert.assertEquals(route2, latest); + + } + + @Test + public void removeRoutedRpcTest() throws URISyntaxException { + QName type = new QName(new URI("remove"), "remove"); + RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null); + String route = "remove"; + routingTable.addRoutedRpc(routeId, route); + + String latest = routingTable.getLastAddedRoutedRpc(routeId); + Assert.assertEquals(route, latest); + + routingTable.removeRoute(routeId, route); + String removed = routingTable.getLastAddedRoutedRpc(routeId); + Assert.assertNull(removed); + } + + @Test + public void removeRoutedRpcsTest() throws URISyntaxException { + QName type = new QName(new URI("remove1"), "remove1"); + RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null); + + QName type2 = new QName(new URI("remove2"), "remove2"); + RouteIdentifierImpl routeId2 = new RouteIdentifierImpl(null, type2, null); + + Set> routeIds = new HashSet<>(); + routeIds.add(routeId); + routeIds.add(routeId2); + String route = "remove1"; + + routingTable.addRoutedRpcs(routeIds, route); + String latest1 = routingTable.getLastAddedRoutedRpc(routeId); + Assert.assertEquals(route, latest1); + + String latest2 = routingTable.getLastAddedRoutedRpc(routeId2); + Assert.assertEquals(route, latest2); + + routingTable.removeRoutes(routeIds, route); + String removed1 = routingTable.getLastAddedRoutedRpc(routeId); + Assert.assertNull(removed1); + + String removed2 = routingTable.getLastAddedRoutedRpc(routeId2); + Assert.assertNull(removed2); + } + +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java new file mode 100644 index 0000000000..d011d331a6 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java @@ -0,0 +1,159 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.remote.rpc.registry; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import junit.framework.Assert; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; +import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl; +import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc; +import org.opendaylight.controller.remote.rpc.messages.AddRpc; +import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc; +import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply; +import org.opendaylight.controller.remote.rpc.messages.GetRpc; +import org.opendaylight.controller.remote.rpc.messages.GetRpcReply; +import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc; +import org.opendaylight.controller.remote.rpc.messages.RemoveRpc; +import org.opendaylight.controller.sal.connector.api.RpcRouter; +import org.opendaylight.yangtools.yang.common.QName; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashSet; +import java.util.Set; + +public class RpcRegistryTest { + + static ActorSystem system; + + + @BeforeClass + public static void setup() { + system = ActorSystem.create(); + } + + @AfterClass + public static void teardown() { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + + /** + This test add, read and remove an entry in global rpc + */ + @Test + public void testGlobalRpc() throws URISyntaxException { + new JavaTestKit(system) {{ + ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class))); + QName type = new QName(new URI("actor1"), "actor1"); + RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null); + final String route = "actor1"; + + AddRpc rpcMsg = new AddRpc(routeId, route); + rpcRegistry.tell(rpcMsg, getRef()); + expectMsgEquals(duration("2 second"), "Success"); + + GetRpc getRpc = new GetRpc(routeId); + rpcRegistry.tell(getRpc, getRef()); + + Boolean getMsg = new ExpectMsg("GetRpcReply") { + protected Boolean match(Object in) { + if (in instanceof GetRpcReply) { + GetRpcReply reply = (GetRpcReply)in; + return route.equals(reply.getRoutePath()); + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + Assert.assertTrue(getMsg); + + RemoveRpc removeMsg = new RemoveRpc(routeId); + rpcRegistry.tell(removeMsg, getRef()); + expectMsgEquals(duration("2 second"), "Success"); + + rpcRegistry.tell(getRpc, getRef()); + + Boolean getNullMsg = new ExpectMsg("GetRpcReply") { + protected Boolean match(Object in) { + if (in instanceof GetRpcReply) { + GetRpcReply reply = (GetRpcReply)in; + return reply.getRoutePath() == null; + } else { + throw noMatch(); + } + } + }.get(); + Assert.assertTrue(getNullMsg); + }}; + + } + + /** + This test add, read and remove an entry in routed rpc + */ + @Test + public void testRoutedRpc() throws URISyntaxException { + new JavaTestKit(system) {{ + ActorRef rpcRegistry = system.actorOf(RpcRegistry.props(Mockito.mock(ClusterWrapper.class))); + QName type = new QName(new URI("actor1"), "actor1"); + RouteIdentifierImpl routeId = new RouteIdentifierImpl(null, type, null); + final String route = "actor1"; + + Set> routeIds = new HashSet<>(); + routeIds.add(routeId); + + AddRoutedRpc rpcMsg = new AddRoutedRpc(routeIds, route); + rpcRegistry.tell(rpcMsg, getRef()); + expectMsgEquals(duration("2 second"), "Success"); + + GetRoutedRpc getRpc = new GetRoutedRpc(routeId); + rpcRegistry.tell(getRpc, getRef()); + + Boolean getMsg = new ExpectMsg("GetRoutedRpcReply") { + protected Boolean match(Object in) { + if (in instanceof GetRoutedRpcReply) { + GetRoutedRpcReply reply = (GetRoutedRpcReply)in; + return route.equals(reply.getRoutePath()); + } else { + throw noMatch(); + } + } + }.get(); // this extracts the received message + + Assert.assertTrue(getMsg); + + RemoveRoutedRpc removeMsg = new RemoveRoutedRpc(routeIds, route); + rpcRegistry.tell(removeMsg, getRef()); + expectMsgEquals(duration("2 second"), "Success"); + + rpcRegistry.tell(getRpc, getRef()); + + Boolean getNullMsg = new ExpectMsg("GetRoutedRpcReply") { + protected Boolean match(Object in) { + if (in instanceof GetRoutedRpcReply) { + GetRoutedRpcReply reply = (GetRoutedRpcReply)in; + return reply.getRoutePath() == null; + } else { + throw noMatch(); + } + } + }.get(); + Assert.assertTrue(getNullMsg); + }}; + + } + +} diff --git a/opendaylight/md-sal/sal-rest-connector-config/pom.xml b/opendaylight/md-sal/sal-rest-connector-config/pom.xml new file mode 100644 index 0000000000..6d050cf425 --- /dev/null +++ b/opendaylight/md-sal/sal-rest-connector-config/pom.xml @@ -0,0 +1,20 @@ + + + + + 4.0.0 + + org.opendaylight.controller + sal-parent + 1.1-SNAPSHOT + + sal-rest-connector-config + Configuration files for sal-rest-connector + jar + diff --git a/opendaylight/md-sal/sal-rest-connector-config/src/main/resources/initial/10-rest-connector.xml b/opendaylight/md-sal/sal-rest-connector-config/src/main/resources/initial/10-rest-connector.xml new file mode 100644 index 0000000000..2fdc8c7d1e --- /dev/null +++ b/opendaylight/md-sal/sal-rest-connector-config/src/main/resources/initial/10-rest-connector.xml @@ -0,0 +1,38 @@ + + + + + + + + + rest:rest-connector-impl + rest-connector-default-impl + 8181 + + dom:dom-broker-osgi-registry + dom-broker + + + + + + + rest:rest-connector + + rest-connector-default + + /modules/module[type='rest-connector-impl'][name='rest-connector-default-impl'] + + + + + + + diff --git a/opendaylight/md-sal/sal-rest-connector/pom.xml b/opendaylight/md-sal/sal-rest-connector/pom.xml index b760263967..09fb5b3677 100644 --- a/opendaylight/md-sal/sal-rest-connector/pom.xml +++ b/opendaylight/md-sal/sal-rest-connector/pom.xml @@ -18,6 +18,14 @@ ${project.groupId} sal-core-api + + ${project.groupId} + sal-binding-config + + + ${project.groupId} + config-api + com.google.code.gson gson @@ -58,6 +66,10 @@ org.opendaylight.yangtools.model ietf-yang-types-20130715 + + org.opendaylight.yangtools.model + ietf-inet-types + org.slf4j slf4j-api @@ -89,6 +101,10 @@ mockito-all test + + org.opendaylight.controller + sal-core-spi + @@ -102,14 +118,43 @@ MD SAL Restconf Connector org.opendaylight.controller.sal.rest.*, org.opendaylight.controller.sal.restconf.rpc.*, - org.opendaylight.controller.sal.restconf.impl, + org.opendaylight.controller.sal.restconf.impl, + org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.rest.connector.rev140724.*, + *, com.sun.jersey.spi.container.servlet - org.opendaylight.controller.sal.rest.impl.RestconfProvider /restconf + + org.opendaylight.yangtools + yang-maven-plugin + + + config + + generate-sources + + + + + org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator + ${jmxGeneratorPath} + + urn:opendaylight:params:xml:ns:yang:controller==org.opendaylight.controller.config.yang + + + + org.opendaylight.yangtools.maven.sal.api.gen.plugin.CodeGeneratorImpl + ${salGeneratorPath} + + + true + + + + diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/config/yang/md/sal/rest/connector/RestConnectorModule.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/config/yang/md/sal/rest/connector/RestConnectorModule.java new file mode 100644 index 0000000000..582c657868 --- /dev/null +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/config/yang/md/sal/rest/connector/RestConnectorModule.java @@ -0,0 +1,30 @@ +package org.opendaylight.controller.config.yang.md.sal.rest.connector; + +import org.opendaylight.controller.sal.rest.impl.RestconfProviderImpl; + +public class RestConnectorModule extends org.opendaylight.controller.config.yang.md.sal.rest.connector.AbstractRestConnectorModule { + + public RestConnectorModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) { + super(identifier, dependencyResolver); + } + + public RestConnectorModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, org.opendaylight.controller.config.yang.md.sal.rest.connector.RestConnectorModule oldModule, java.lang.AutoCloseable oldInstance) { + super(identifier, dependencyResolver, oldModule, oldInstance); + } + + @Override + public void customValidation() { + // add custom validation form module attributes here. + } + + @Override + public java.lang.AutoCloseable createInstance() { + // Create an instance of our provider + RestconfProviderImpl instance = new RestconfProviderImpl(); + // Set its port + instance.setWebsocketPort(getWebsocketPort()); + // Register it with the Broker + getDomBrokerDependency().registerProvider(instance); + return instance; + } +} diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/config/yang/md/sal/rest/connector/RestConnectorModuleFactory.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/config/yang/md/sal/rest/connector/RestConnectorModuleFactory.java new file mode 100644 index 0000000000..957b08f6ae --- /dev/null +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/config/yang/md/sal/rest/connector/RestConnectorModuleFactory.java @@ -0,0 +1,15 @@ +/* +* Generated file +* +* Generated from: yang module name: opendaylight-rest-connector yang module local name: rest-connector-impl +* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator +* Generated at: Fri Jul 25 04:33:31 CDT 2014 +* +* Do not modify this file unless it is present under src/main directory +*/ +package org.opendaylight.controller.config.yang.md.sal.rest.connector; + + +public class RestConnectorModuleFactory extends org.opendaylight.controller.config.yang.md.sal.rest.connector.AbstractRestConnectorModuleFactory { + +} diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/api/RestConnector.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/api/RestConnector.java new file mode 100644 index 0000000000..fe45a0202d --- /dev/null +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/api/RestConnector.java @@ -0,0 +1,9 @@ +package org.opendaylight.controller.sal.rest.api; + +/* + * This is a simple dummy interface to allow us to create instances of RestconfProvider + * via the config subsystem. + */ +public interface RestConnector { + +} diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/RestconfProvider.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/RestconfProviderImpl.java similarity index 54% rename from opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/RestconfProvider.java rename to opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/RestconfProviderImpl.java index 1f1d0eb831..adb176a65d 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/RestconfProvider.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/RestconfProviderImpl.java @@ -9,30 +9,30 @@ package org.opendaylight.controller.sal.rest.impl; import java.util.Collection; import java.util.Collections; -import org.opendaylight.controller.sal.core.api.Broker; + import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; import org.opendaylight.controller.sal.core.api.Provider; import org.opendaylight.controller.sal.core.api.data.DataBrokerService; import org.opendaylight.controller.sal.core.api.model.SchemaService; import org.opendaylight.controller.sal.core.api.mount.MountService; +import org.opendaylight.controller.sal.rest.api.RestConnector; import org.opendaylight.controller.sal.restconf.impl.BrokerFacade; import org.opendaylight.controller.sal.restconf.impl.ControllerContext; import org.opendaylight.controller.sal.streams.websockets.WebSocketServer; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.PortNumber; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.model.api.SchemaServiceListener; -import org.osgi.framework.BundleActivator; -import org.osgi.framework.BundleContext; -import org.osgi.framework.ServiceReference; -import org.osgi.util.tracker.ServiceTracker; -import org.osgi.util.tracker.ServiceTrackerCustomizer; -public class RestconfProvider implements BundleActivator, Provider, ServiceTrackerCustomizer { +public class RestconfProviderImpl implements Provider, AutoCloseable, RestConnector { public final static String NOT_INITALIZED_MSG = "Restconf is not initialized yet. Please try again later"; private ListenerRegistration listenerRegistration; - private ServiceTracker brokerServiceTrancker; - private BundleContext bundleContext; + private PortNumber port; + public void setWebsocketPort(PortNumber port) { + this.port = port; + } + private Thread webSocketServerThread; @Override @@ -46,32 +46,10 @@ public class RestconfProvider implements BundleActivator, Provider, ServiceTrack listenerRegistration = schemaService.registerSchemaServiceListener(ControllerContext.getInstance()); ControllerContext.getInstance().setSchemas(schemaService.getGlobalContext()); ControllerContext.getInstance().setMountService(session.getService(MountService.class)); - } - @Override - public void start(BundleContext context) throws Exception { - String websocketPortStr = context.getProperty(WebSocketServer.WEBSOCKET_SERVER_CONFIG_PROPERTY); - int websocketPort = (websocketPortStr != null && !"".equals(websocketPortStr)) ? Integer - .parseInt(websocketPortStr) : WebSocketServer.DEFAULT_PORT; - bundleContext = context; - webSocketServerThread = new Thread(WebSocketServer.createInstance(websocketPort)); - webSocketServerThread.setName("Web socket server"); + webSocketServerThread = new Thread(WebSocketServer.createInstance(port.getValue().intValue())); + webSocketServerThread.setName("Web socket server on port " + port); webSocketServerThread.start(); - brokerServiceTrancker = new ServiceTracker<>(context, Broker.class, this); - brokerServiceTrancker.open(); - } - - @Override - public void stop(BundleContext context) { - if (listenerRegistration != null) { - try { - listenerRegistration.close(); - } catch (Exception e) { - e.printStackTrace(); - } - } - webSocketServerThread.interrupt(); - brokerServiceTrancker.close(); } @Override @@ -80,22 +58,10 @@ public class RestconfProvider implements BundleActivator, Provider, ServiceTrack } @Override - public Broker addingService(ServiceReference reference) { - Broker broker = bundleContext.getService(reference); - broker.registerProvider(this, bundleContext); - return broker; - } - - @Override - public void modifiedService(ServiceReference reference, Broker service) { - // NOOP - } - - @Override - public void removedService(ServiceReference reference, Broker service) { - bundleContext.ungetService(reference); - BrokerFacade.getInstance().setContext(null); - BrokerFacade.getInstance().setDataService(null); - ControllerContext.getInstance().setSchemas(null); + public void close() { + if (listenerRegistration != null) { + listenerRegistration.close(); + } + webSocketServerThread.interrupt(); } } diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/ControllerContext.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/ControllerContext.java index 0a67c84d8b..8f9913d30d 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/ControllerContext.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/ControllerContext.java @@ -9,13 +9,11 @@ package org.opendaylight.controller.sal.restconf.impl; import com.google.common.base.Function; import com.google.common.base.Objects; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.base.Splitter; import com.google.common.base.Strings; import com.google.common.collect.BiMap; -import com.google.common.collect.FluentIterable; import com.google.common.collect.HashBiMap; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; @@ -28,7 +26,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -824,34 +821,8 @@ public class ControllerContext implements SchemaContextListener { private QName toQName(final String name) { final String module = toModuleName(name); final String node = toNodeName(name); - Set modules = globalSchema.getModules(); - - final Comparator comparator = new Comparator() { - @Override - public int compare(final Module o1, final Module o2) { - return o1.getRevision().compareTo(o2.getRevision()); - } - }; - - List sorted = new ArrayList(modules); - Collections. sort(new ArrayList(modules), comparator); - - final Function transform = new Function() { - @Override - public QName apply(final Module m) { - return QName.create(m.getNamespace(), m.getRevision(), m.getName()); - } - }; - - final Predicate findFirst = new Predicate() { - @Override - public boolean apply(final QName qn) { - return Objects.equal(module, qn.getLocalName()); - } - }; - - Optional namespace = FluentIterable.from(sorted).transform(transform).firstMatch(findFirst); - return namespace.isPresent() ? QName.create(namespace.get(), node) : null; + final Module m = globalSchema.findModuleByName(module, null); + return m == null ? null : QName.create(m.getQNameModule(), node); } private static boolean isListOrContainer(final DataSchemaNode node) { diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/yang/opendaylight-rest-connector.yang b/opendaylight/md-sal/sal-rest-connector/src/main/yang/opendaylight-rest-connector.yang new file mode 100644 index 0000000000..a8fc8ff4d5 --- /dev/null +++ b/opendaylight/md-sal/sal-rest-connector/src/main/yang/opendaylight-rest-connector.yang @@ -0,0 +1,47 @@ +module opendaylight-rest-connector { + yang-version 1; + namespace "urn:opendaylight:params:xml:ns:yang:controller:md:sal:rest:connector"; + prefix "md-sal-rest-connector"; + + import config { prefix config; revision-date 2013-04-05; } + import opendaylight-md-sal-dom {prefix dom; revision-date 2013-10-28;} + import opendaylight-md-sal-binding {prefix sal; revision-date 2013-10-28;} + import ietf-inet-types {prefix inet; revision-date 2010-09-24;} + + description + "Service definition for Rest Connector"; + + revision "2014-07-24" { + description + "Initial revision"; + } + + identity rest-connector { + base "config:service-type"; + config:java-class "org.opendaylight.controller.sal.rest.api.RestConnector"; + } + + identity rest-connector-impl { + base config:module-type; + config:provided-service rest-connector; + config:java-name-prefix RestConnector; + } + + augment "/config:modules/config:module/config:configuration" { + case rest-connector-impl { + when "/config:modules/config:module/config:type = 'rest-connector-impl'"; + leaf websocket-port { + mandatory true; + type inet:port-number; + } + container dom-broker { + uses config:service-ref { + refine type { + mandatory true; + config:required-identity dom:dom-broker-osgi-registry; + } + } + } + } + } +} \ No newline at end of file