From: Tony Tkacik Date: Thu, 5 Dec 2013 20:51:46 +0000 (+0100) Subject: Implementation for enabling remote rpc calls between 2 instances of md-sal X-Git-Tag: jenkins-controller-bulk-release-prepare-only-2-1~226^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=97a4294baa44125d53a7ee2a3646f8a1c8da73e6 Implementation for enabling remote rpc calls between 2 instances of md-sal - This provides implementation for enabling remote rpc calls between 2 instances of md-sal. The current implementation enables remote execution of globally unique services in the cluster. For details, please refer to this wiki page (https://wiki.opendaylight.org/view/Zeromq_connector). This wiki page is a draft. - Added relativePath in pom so that parent pom can be found. - Removed dependency to sal-infinispan-routingtable - Exported "impl" as well from zeromq-routingtable. Fixed dependencies in RouterTest. - Removed oss.sonatype release repo from md-sal pom. ODL nexus repo mirrors it. - Updated server code to handle exception - Server code now uses WB pattern instead of listerner pattern. - Fixed pom so that parent can be resolved - Rebased due to changed in unmerged dependency - Added state machine to RpcSocket. - Added unit tests to RpcSocketTest and SocketManagerTest. - Added CompositeNode methods to ExampleConsumer & XML files for creation of CompositeNodes - Added CompositeNode testcases to RouterTest - Translated scala code to java - Added code to convert CompositeNode to xml and back to help - with serialization. - Added more unit and integration tests. This is squash for: https://git.opendaylight.org/gerrit/2882 https://git.opendaylight.org/gerrit/3022 https://git.opendaylight.org/gerrit/3028 https://git.opendaylight.org/gerrit/3159 Change-Id: I44739fd8ad61043c2e786875bb7787e3fa68e435 Signed-off-by: Abhishek Kumar Signed-off-by: Tony Tkacik Signed-off-by: Alex Fan --- diff --git a/opendaylight/distribution/opendaylight/pom.xml b/opendaylight/distribution/opendaylight/pom.xml index adc0c0973f..4c0b81f7d7 100644 --- a/opendaylight/distribution/opendaylight/pom.xml +++ b/opendaylight/distribution/opendaylight/pom.xml @@ -441,6 +441,23 @@ org.opendaylight.controller.thirdparty ganymed + + org.opendaylight.controller + sal-remoterpc-connector + 1.0-SNAPSHOT + + + org.opendaylight.controller + + zeromq-routingtable.implementation + + 0.4.1-SNAPSHOT + + + org.zeromq + jeromq + 0.3.1 + diff --git a/opendaylight/md-sal/pom.xml b/opendaylight/md-sal/pom.xml index 0e78598c11..b34621d02d 100644 --- a/opendaylight/md-sal/pom.xml +++ b/opendaylight/md-sal/pom.xml @@ -12,7 +12,7 @@ - + sal-common sal-common-api sal-common-impl @@ -42,45 +42,47 @@ sal-connector-api sal-rest-connector sal-netconf-connector - + + zeromq-routingtable/implementation + sal-remoterpc-connector/implementation clustered-data-store/implementation inventory-manager statistics-manager forwardingrules-manager - + compatibility - zeromq-routingtable/implementation - sal-zeromq-connector - integrationtests - - false - + integrationtests + + false + sal-binding-it - zeromq-routingtable/integrationtest clustered-data-store/integrationtest - test + + + - IDE - - - m2e.version - - - - - target-ide - + IDE + + + m2e.version + + + + + target-ide + @@ -103,9 +105,13 @@ 14.0.1 5.0.0 4.8.1 + 1.5.1 + 1.9.5 2.4.3 2.5 0.5.3.201107060350 + 0.5.1-SNAPSHOT + jacoco reuseReports @@ -119,28 +125,28 @@ - opendaylight-mirror - opendaylight-mirror - ${nexusproxy}/groups/public/ - - false - - - true - never - + opendaylight-mirror + opendaylight-mirror + ${nexusproxy}/groups/public/ + + false + + + true + never + - opendaylight-snapshot - opendaylight-snapshot - ${nexusproxy}/repositories/opendaylight.snapshot/ - - true - - - false - + opendaylight-snapshot + opendaylight-snapshot + ${nexusproxy}/repositories/opendaylight.snapshot/ + + true + + + false + @@ -148,28 +154,28 @@ - opendaylight-mirror - opendaylight-mirror - ${nexusproxy}/groups/public/ - - false - - - true - never - + opendaylight-mirror + opendaylight-mirror + ${nexusproxy}/groups/public/ + + false + + + true + never + - opendaylight-snapshot - opendaylight-snapshot - ${nexusproxy}/repositories/opendaylight.snapshot/ - - true - - - false - + opendaylight-snapshot + opendaylight-snapshot + ${nexusproxy}/repositories/opendaylight.snapshot/ + + true + + + false + @@ -216,6 +222,11 @@ yang-data-api ${yang.version} + + org.opendaylight.yangtools + yang-data-impl + ${yang.version} + org.opendaylight.yangtools yang-model-api @@ -232,6 +243,17 @@ sal-connector-api ${project.version} + + org.opendaylight.controller + sal + ${sal.version} + + + org.osgi + org.osgi.compendium + + + @@ -249,7 +271,11 @@ org.eclipse.xtend.lib ${xtend.version} - + + org.osgi + org.osgi.core + ${osgi.core.version} + junit @@ -260,7 +286,25 @@ org.mockito mockito-all - 1.9.5 + ${mockito.version} + test + + + org.powermock + powermock-module-junit4 + ${powermock.version} + test + + + org.powermock + powermock-api-mockito + ${powermock.version} + test + + + org.powermock + powermock-core + ${powermock.version} test @@ -278,15 +322,9 @@ maven-bundle-plugin ${bundle.plugin.version} true - + ${project.groupId}.${project.artifactId} @@ -328,7 +366,8 @@ jacoco-maven-plugin ${jacoco.version} - + org.eclipse.m2e lifecycle-mapping @@ -346,7 +385,7 @@ - + @@ -360,7 +399,7 @@ - + @@ -373,7 +412,7 @@ - + diff --git a/opendaylight/md-sal/sal-common-util/src/main/java/org/opendaylight/controller/sal/common/util/Rpcs.java b/opendaylight/md-sal/sal-common-util/src/main/java/org/opendaylight/controller/sal/common/util/Rpcs.java index e46b566522..54e1a065f4 100644 --- a/opendaylight/md-sal/sal-common-util/src/main/java/org/opendaylight/controller/sal/common/util/Rpcs.java +++ b/opendaylight/md-sal/sal-common-util/src/main/java/org/opendaylight/controller/sal/common/util/Rpcs.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.sal.common.util; +import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -20,7 +21,7 @@ public class Rpcs { return ret; } - private static class RpcResultTO implements RpcResult { + private static class RpcResultTO implements RpcResult, Serializable { private final Collection errors; private final T result; diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/AbstractConsumer.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/AbstractConsumer.java index 1fb73bc9a9..99a38ca43a 100644 --- a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/AbstractConsumer.java +++ b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/AbstractConsumer.java @@ -13,17 +13,24 @@ import java.util.Collections; import org.osgi.framework.BundleActivator; import org.osgi.framework.BundleContext; import org.osgi.framework.ServiceReference; +import org.osgi.util.tracker.ServiceTracker; +import org.osgi.util.tracker.ServiceTrackerCustomizer; -public abstract class AbstractConsumer implements Consumer, BundleActivator { +public abstract class AbstractConsumer implements Consumer, BundleActivator,ServiceTrackerCustomizer { + + + + + private BundleContext context; + private ServiceTracker tracker; + private Broker broker; - Broker broker; - ServiceReference brokerRef; @Override public final void start(BundleContext context) throws Exception { + this.context = context; this.startImpl(context); - brokerRef = context.getServiceReference(Broker.class); - broker = context.getService(brokerRef); - broker.registerConsumer(this,context); + tracker = new ServiceTracker<>(context, Broker.class, this); + tracker.open(); } @@ -32,9 +39,7 @@ public abstract class AbstractConsumer implements Consumer, BundleActivator { public final void stop(BundleContext context) throws Exception { stopImpl(context); broker = null; - if(brokerRef != null) { - context.ungetService(brokerRef); - } + tracker.close(); } protected void startImpl(BundleContext context) { @@ -49,4 +54,25 @@ public abstract class AbstractConsumer implements Consumer, BundleActivator { return Collections.emptySet(); } + + @Override + public Broker addingService(ServiceReference reference) { + if(broker == null) { + broker = context.getService(reference); + broker.registerConsumer(this, context); + return broker; + } + + return null; + } + + @Override + public void modifiedService(ServiceReference reference, Broker service) { + // NOOP + } + + @Override + public void removedService(ServiceReference reference, Broker service) { + stopImpl(context); + } } diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/AbstractProvider.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/AbstractProvider.java index 621ef92132..1cb1a2bc85 100644 --- a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/AbstractProvider.java +++ b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/AbstractProvider.java @@ -10,16 +10,20 @@ package org.opendaylight.controller.sal.core.api; import java.util.Collection; import java.util.Collections; +import javax.naming.Context; + import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; import org.osgi.framework.BundleActivator; import org.osgi.framework.BundleContext; import org.osgi.framework.ServiceReference; +import org.osgi.util.tracker.ServiceTracker; +import org.osgi.util.tracker.ServiceTrackerCustomizer; -public abstract class AbstractProvider implements BundleActivator, Provider { +public abstract class AbstractProvider implements BundleActivator, Provider,ServiceTrackerCustomizer { - private ServiceReference brokerRef; private Broker broker; - + private BundleContext context; + private ServiceTracker tracker; @Override public Collection getProviderFunctionality() { return Collections.emptySet(); @@ -27,12 +31,10 @@ public abstract class AbstractProvider implements BundleActivator, Provider { @Override public final void start(BundleContext context) throws Exception { - brokerRef = context.getServiceReference(Broker.class); - broker = context.getService(brokerRef); - + this.context = context; this.startImpl(context); - - broker.registerProvider(this,context); + tracker = new ServiceTracker<>(context, Broker.class, this); + tracker.open(); } protected void startImpl(BundleContext context) { @@ -44,7 +46,31 @@ public abstract class AbstractProvider implements BundleActivator, Provider { @Override public final void stop(BundleContext context) throws Exception { + broker = null; + tracker.close(); + tracker = null; stopImpl(context); } + @Override + public Broker addingService(ServiceReference reference) { + if(broker == null) { + broker = context.getService(reference); + broker.registerProvider(this, context); + return broker; + } + + return null; + } + + @Override + public void modifiedService(ServiceReference reference, Broker service) { + // NOOP + } + + @Override + public void removedService(ServiceReference reference, Broker service) { + stopImpl(context); + } + } diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/BrokerConfigActivator.xtend b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/BrokerConfigActivator.xtend index 482cfa959f..dc116ca979 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/BrokerConfigActivator.xtend +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/BrokerConfigActivator.xtend @@ -14,6 +14,7 @@ import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier import org.opendaylight.controller.sal.core.api.data.DataStore import org.opendaylight.controller.sal.dom.broker.impl.SchemaAwareDataStoreAdapter import org.opendaylight.controller.sal.core.api.model.SchemaServiceListener +import org.opendaylight.controller.sal.dom.broker.impl.RpcRouterImpl class BrokerConfigActivator implements AutoCloseable { @@ -37,7 +38,7 @@ class BrokerConfigActivator implements AutoCloseable { val emptyProperties = new Hashtable(); broker.setBundleContext(context); - + broker.setRouter(new RpcRouterImpl("Rpc router")) schemaService = new SchemaServiceImpl(); schemaService.setContext(context); schemaService.setParser(new YangParserImpl()); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/pom.xml b/opendaylight/md-sal/sal-remoterpc-connector/implementation/pom.xml new file mode 100644 index 0000000000..b8e0938a5f --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/pom.xml @@ -0,0 +1,199 @@ + + + 4.0.0 + + org.opendaylight.controller + sal-parent + ../.. + 1.0-SNAPSHOT + + + sal-remoterpc-connector + bundle + + + 0.3.1 + 1.9.8 + 1.0.1 + + + + + + ${project.groupId} + sal-core-api + ${project.version} + + + ${project.groupId} + sal-connector-api + ${project.version} + + + ${project.groupId} + sal-common-util + ${project.version} + + + org.opendaylight.controller + zeromq-routingtable.implementation + + 0.4.1-SNAPSHOT + + + + + org.opendaylight.controller + sal + + + + + org.opendaylight.yangtools + yang-common + + + org.opendaylight.yangtools + yang-data-api + + + org.opendaylight.yangtools + yang-data-impl + + + org.opendaylight.yangtools + yang-common + + + + + org.osgi + org.osgi.core + + + org.zeromq + jeromq + ${zeromq.version} + + + com.google.guava + guava + + + org.slf4j + slf4j-api + + + org.codehaus.jackson + jackson-core-asl + ${jackson.version} + + + org.codehaus.jackson + jackson-mapper-asl + ${jackson.version} + + + stax + stax-api + ${stax.version} + + + + + junit + junit + + + org.mockito + mockito-all + + + org.powermock + powermock-module-junit4 + + + org.powermock + powermock-api-mockito + + + org.powermock + powermock-core + + + + + + + + + org.apache.felix + maven-bundle-plugin + ${bundle.plugin.version} + true + + + + *, + !org.codehaus.enunciate.jaxrs + + + org.opendaylight.controller.config.yang.md.sal.remote.rpc, + org.opendaylight.controller.sal.connector.remoterpc, + org.opendaylight.controller.sal.connector.remoterpc.* + + ${project.groupId}.${project.artifactId} + + + + + + + org.opendaylight.yangtools + yang-maven-plugin + 0.5.9-SNAPSHOT + + + + generate-sources + + + + + + org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator + + ${project.build.directory}/generated-sources/config + + + urn:opendaylight:params:xml:ns:yang:controller==org.opendaylight.controller.config.yang + + + + + org.opendaylight.yangtools.yang.unified.doc.generator.maven.DocumentationGeneratorImpl + target/site/models + + + true + + + + + + org.opendaylight.controller + yang-jmx-generator-plugin + 0.2.3-SNAPSHOT + + + org.opendaylight.yangtools + maven-sal-api-gen-plugin + 0.6.0-SNAPSHOT + jar + + + + + + diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/config/yang/md/sal/remote/rpc/ZeroMQServerModule.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/config/yang/md/sal/remote/rpc/ZeroMQServerModule.java new file mode 100644 index 0000000000..606f282bd7 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/config/yang/md/sal/remote/rpc/ZeroMQServerModule.java @@ -0,0 +1,66 @@ +/** +* Generated file + +* Generated from: yang module name: odl-sal-dom-rpc-remote-cfg yang module local name: remote-zeromq-rpc-server +* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator +* Generated at: Thu Dec 05 14:25:21 CET 2013 +* +* Do not modify this file unless it is present under src/main directory +*/ +package org.opendaylight.controller.config.yang.md.sal.remote.rpc; + +import org.opendaylight.controller.sal.connector.remoterpc.Client; +import org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcProvider; +import org.opendaylight.controller.sal.connector.remoterpc.RoutingTableProvider; +import org.opendaylight.controller.sal.connector.remoterpc.ServerImpl; +import org.opendaylight.controller.sal.core.api.Broker; +import org.osgi.framework.BundleContext; + +/** +* +*/ +public final class ZeroMQServerModule extends org.opendaylight.controller.config.yang.md.sal.remote.rpc.AbstractZeroMQServerModule + { + + private static final Integer ZEROMQ_ROUTER_PORT = 5554; + private BundleContext bundleContext; + + public ZeroMQServerModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) { + super(identifier, dependencyResolver); + } + + public ZeroMQServerModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, + ZeroMQServerModule oldModule, java.lang.AutoCloseable oldInstance) { + + super(identifier, dependencyResolver, oldModule, oldInstance); + } + + @Override + protected void customValidation(){ + // Add custom validation for module attributes here. + } + + @Override + public java.lang.AutoCloseable createInstance() { + + Broker broker = getDomBrokerDependency(); + RoutingTableProvider provider = new RoutingTableProvider(bundleContext); + + + final int port = getPort() != null ? getPort() : ZEROMQ_ROUTER_PORT; + + ServerImpl serverImpl = new ServerImpl(port); + + Client clientImpl = new Client(); + RemoteRpcProvider facade = new RemoteRpcProvider(serverImpl, clientImpl); + + facade.setRoutingTableProvider(provider ); + + broker.registerProvider(facade, bundleContext); + return facade; + } + + public void setBundleContext(BundleContext bundleContext) { + this.bundleContext = bundleContext; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/config/yang/md/sal/remote/rpc/ZeroMQServerModuleFactory.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/config/yang/md/sal/remote/rpc/ZeroMQServerModuleFactory.java new file mode 100644 index 0000000000..3cc3ac0f68 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/config/yang/md/sal/remote/rpc/ZeroMQServerModuleFactory.java @@ -0,0 +1,37 @@ +/** +* Generated file + +* Generated from: yang module name: odl-sal-dom-rpc-remote-cfg yang module local name: remote-zeromq-rpc-server +* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator +* Generated at: Thu Dec 05 14:25:21 CET 2013 +* +* Do not modify this file unless it is present under src/main directory +*/ +package org.opendaylight.controller.config.yang.md.sal.remote.rpc; + +import org.opendaylight.controller.config.api.DependencyResolver; +import org.opendaylight.controller.config.api.DynamicMBeanWithInstance; +import org.opendaylight.controller.config.spi.Module; +import org.osgi.framework.BundleContext; + +/** +* +*/ +public class ZeroMQServerModuleFactory extends org.opendaylight.controller.config.yang.md.sal.remote.rpc.AbstractZeroMQServerModuleFactory +{ + + @Override + public Module createModule(String instanceName, DependencyResolver dependencyResolver, BundleContext bundleContext) { + ZeroMQServerModule module = (ZeroMQServerModule) super.createModule(instanceName, dependencyResolver, bundleContext); + module.setBundleContext(bundleContext); + return module; + } + + @Override + public Module createModule(String instanceName, DependencyResolver dependencyResolver, + DynamicMBeanWithInstance old, BundleContext bundleContext) throws Exception { + ZeroMQServerModule module = (ZeroMQServerModule) super.createModule(instanceName, dependencyResolver, old,bundleContext); + module.setBundleContext(bundleContext); + return module; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Client.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Client.java new file mode 100644 index 0000000000..ef3162359c --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Client.java @@ -0,0 +1,188 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.sal.connector.remoterpc; + +import com.google.common.base.Optional; + +import org.opendaylight.controller.sal.common.util.RpcErrors; +import org.opendaylight.controller.sal.common.util.Rpcs; +import org.opendaylight.controller.sal.connector.api.RpcRouter; +import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable; +import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; +import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper; +import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl; +import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils; +import org.opendaylight.controller.sal.core.api.RpcImplementation; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.zeromq.ZMQ; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.*; + +import static com.google.common.base.Preconditions.*; + +/** + * An implementation of {@link RpcImplementation} that makes remote RPC calls + */ +public class Client implements RemoteRpcClient { + + private final Logger _logger = LoggerFactory.getLogger(Client.class); + + private final LinkedBlockingQueue requestQueue = new LinkedBlockingQueue(100); + + private final ExecutorService pool = Executors.newSingleThreadExecutor(); + private final long TIMEOUT = 5000; // in ms + + private RoutingTableProvider routingTableProvider; + + public RoutingTableProvider getRoutingTableProvider() { + return routingTableProvider; + } + + public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) { + this.routingTableProvider = routingTableProvider; + } + + public LinkedBlockingQueue getRequestQueue() { + return requestQueue; + } + + @Override + public Set getSupportedRpcs() { + // TODO: Find the entries from routing table + return Collections.emptySet(); + } + + public void start() { + pool.execute(new Sender(this)); + + } + + public void stop() { + + _logger.debug("Client stopping..."); + Context.getInstance().getZmqContext().term(); + _logger.debug("ZMQ context terminated"); + + pool.shutdown(); // intiate shutdown + try { + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + pool.shutdownNow(); + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) + _logger.error("Client thread pool did not shut down"); + } + } catch (InterruptedException e) { + // (Re-)Cancel if current thread also interrupted + pool.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + _logger.debug("Client stopped"); + } + + @Override + public RpcResult invokeRpc(QName rpc, CompositeNode input) { + + RouteIdentifierImpl routeId = new RouteIdentifierImpl(); + routeId.setType(rpc); + + String address = lookupRemoteAddress(routeId); + + Message request = new Message.MessageBuilder().type(Message.MessageType.REQUEST) + .sender(Context.getInstance().getLocalUri()).recipient(address).route(routeId) + .payload(XmlUtils.compositeNodeToXml(input)).build(); + + List errors = new ArrayList(); + + try (SocketPair pair = new SocketPair()) { + + MessageWrapper messageWrapper = new MessageWrapper(request, pair.getSender()); + process(messageWrapper); + Message response = parseMessage(pair.getReceiver()); + + CompositeNode payload = XmlUtils.xmlToCompositeNode((String) response.getPayload()); + + return Rpcs.getRpcResult(true, payload, errors); + + } catch (Exception e) { + collectErrors(e, errors); + return Rpcs.getRpcResult(false, null, errors); + } + + } + + public void process(MessageWrapper msg) throws TimeoutException, InterruptedException { + _logger.debug("Processing message [{}]", msg); + + boolean success = requestQueue.offer(msg, TIMEOUT, TimeUnit.MILLISECONDS); + if (!success) + throw new TimeoutException("Queue is full"); + } + + /** + * Block on socket for reply + * + * @param receiver + * @return + */ + private Message parseMessage(ZMQ.Socket receiver) throws IOException, ClassNotFoundException { + return (Message) Message.deserialize(receiver.recv()); + } + + /** + * Find address for the given route identifier in routing table + * + * @param routeId + * route identifier + * @return remote network address + */ + private String lookupRemoteAddress(RpcRouter.RouteIdentifier routeId) { + checkNotNull(routeId, "route must not be null"); + + Optional> routingTable = routingTableProvider.getRoutingTable(); + checkNotNull(routingTable.isPresent(), "Routing table is null"); + + Set addresses = routingTable.get().getRoutes(routeId.toString()); + checkNotNull(addresses, "Address not found for route [%s]", routeId); + checkState(addresses.size() == 1, "Multiple remote addresses found for route [%s], \nonly 1 expected", routeId); // its + // a + // global + // service. + + String address = addresses.iterator().next(); + checkNotNull(address, "Address not found for route [%s]", routeId); + + return address; + } + + private void collectErrors(Exception e, List errors) { + if (e == null) + return; + if (errors == null) + errors = new ArrayList(); + + errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause())); + for (Throwable t : e.getSuppressed()) { + errors.add(RpcErrors.getRpcError(null, null, null, null, t.getMessage(), null, t)); + } + } + + @Override + public void close() throws Exception { + stop(); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Context.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Context.java new file mode 100644 index 0000000000..f0bf12cbc0 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Context.java @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.sal.connector.remoterpc; + +import org.zeromq.ZMQ; + +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.Enumeration; + +/** + * Provides a ZeroMQ Context object + */ +public class Context { + private ZMQ.Context zmqContext = ZMQ.context(1); + private String uri; + + private static Context _instance = new Context(); + + private Context() {} + + public static Context getInstance(){ + return _instance; + } + + public ZMQ.Context getZmqContext(){ + return this.zmqContext; + } + + public String getLocalUri(){ + uri = (uri != null) ? uri + : new StringBuilder("tcp://").append(getIpAddress()).append(":") + .append(getRpcPort()).toString(); + + return uri; + } + + public String getRpcPort(){ + String rpcPort = (System.getProperty("rpc.port") != null) + ? System.getProperty("rpc.port") + : "5554"; + + return rpcPort; + } + + private String getIpAddress(){ + String ipAddress = (System.getProperty("local.ip") != null) + ? System.getProperty("local.ip") + : findIpAddress(); + + return ipAddress; + } + + /** + * Finds IPv4 address of the local VM + * TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which + * address will be returned. Read IP from a property file or enhance the code to make it deterministic. + * Should we use IP or hostname? + * + * @return + */ + private String findIpAddress() { + String hostAddress = null; + Enumeration e = null; + try { + e = NetworkInterface.getNetworkInterfaces(); + } catch (SocketException e1) { + e1.printStackTrace(); + } + while (e.hasMoreElements()) { + + NetworkInterface n = (NetworkInterface) e.nextElement(); + + Enumeration ee = n.getInetAddresses(); + while (ee.hasMoreElements()) { + InetAddress i = (InetAddress) ee.nextElement(); + if ((i instanceof Inet4Address) && (i.isSiteLocalAddress())) + hostAddress = i.getHostAddress(); + } + } + return hostAddress; + + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcClient.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcClient.java new file mode 100644 index 0000000000..6bd123b7e4 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcClient.java @@ -0,0 +1,13 @@ +package org.opendaylight.controller.sal.connector.remoterpc; + +import org.opendaylight.controller.sal.core.api.RpcImplementation; + +public interface RemoteRpcClient extends RpcImplementation,AutoCloseable{ + + + void setRoutingTableProvider(RoutingTableProvider provider); + + void stop(); + + void start(); +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcProvider.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcProvider.java new file mode 100644 index 0000000000..3c2e3b0872 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcProvider.java @@ -0,0 +1,95 @@ +package org.opendaylight.controller.sal.connector.remoterpc; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.ExecutorService; + +import org.opendaylight.controller.sal.connector.remoterpc.Client; +import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable; +import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; +import org.opendaylight.controller.sal.core.api.Provider; +import org.opendaylight.controller.sal.core.api.Provider.ProviderFunctionality; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; + +public class RemoteRpcProvider implements + RemoteRpcServer, + RemoteRpcClient, + Provider { + + private final ServerImpl server; + private final Client client; + private RoutingTableProvider provider; + + @Override + public void setRoutingTableProvider(RoutingTableProvider provider) { + this.provider = provider; + server.setRoutingTableProvider(provider); + client.setRoutingTableProvider(provider); + } + + @Override + public RpcResult invokeRpc(QName rpc, CompositeNode input) { + return client.invokeRpc(rpc, input); + } + + @Override + public Set getSupportedRpcs() { + return client.getSupportedRpcs(); + } + + + public RemoteRpcProvider(ServerImpl server, Client client) { + this.server = server; + this.client = client; + } + + public void setBrokerSession(ProviderSession session) { + server.setBrokerSession(session); + } + public void setServerPool(ExecutorService serverPool) { + server.setServerPool(serverPool); + } + public void start() { + client.setRoutingTableProvider(provider); + server.setRoutingTableProvider(provider); + server.start(); + client.start(); + } + public void onRouteUpdated(String key, Set values) { + server.onRouteUpdated(key, values); + } + public void onRouteDeleted(String key) { + server.onRouteDeleted(key); + } + + + @Override + public Collection getProviderFunctionality() { + // TODO Auto-generated method stub + return null; + } + + + @Override + public void onSessionInitiated(ProviderSession session) { + server.setBrokerSession(session); + start(); + } + + + public void close() throws Exception { + server.close(); + client.close(); + } + + + + + @Override + public void stop() { + server.stop(); + client.stop(); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcServer.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcServer.java new file mode 100644 index 0000000000..932600fcb1 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcServer.java @@ -0,0 +1,6 @@ +package org.opendaylight.controller.sal.connector.remoterpc; + + +public interface RemoteRpcServer extends AutoCloseable { + +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RoutingTableProvider.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RoutingTableProvider.java new file mode 100644 index 0000000000..cfdf98638d --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RoutingTableProvider.java @@ -0,0 +1,32 @@ +package org.opendaylight.controller.sal.connector.remoterpc; + +import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable; +import org.osgi.framework.BundleContext; +import org.osgi.util.tracker.ServiceTracker; + +import com.google.common.base.Optional; + +public class RoutingTableProvider implements AutoCloseable { + + @SuppressWarnings("rawtypes") + final ServiceTracker tracker; + + + public RoutingTableProvider(BundleContext ctx) { + @SuppressWarnings("rawtypes") + ServiceTracker rawTracker = new ServiceTracker<>(ctx, RoutingTable.class, null); + tracker = rawTracker; + tracker.open(); + } + + public Optional> getRoutingTable() { + @SuppressWarnings("unchecked") + RoutingTable tracked = tracker.getService(); + return Optional.fromNullable(tracked); + } + + @Override + public void close() throws Exception { + tracker.close(); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RpcSocket.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RpcSocket.java new file mode 100644 index 0000000000..7e8590ab9b --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RpcSocket.java @@ -0,0 +1,221 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.sal.connector.remoterpc; + +import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; +import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.zeromq.ZMQ; + +import java.io.IOException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * A class encapsulating {@link ZMQ.Socket} of type {@link ZMQ.REQ}. + * It adds following capabilities: + *
  • Retry logic - Tries 3 times before giving up + *
  • Request times out after {@link TIMEOUT} property + *
  • The limitation of {@link ZMQ.REQ}/{@link ZMQ.REP} pair is that no 2 requests can be sent before + * the response for the 1st request is received. To overcome that, this socket queues all messages until + * the previous request has been responded. + */ +public class RpcSocket { + + // Constants + public static final int TIMEOUT = 2000; + public static final int QUEUE_SIZE = 10; + public static final int NUM_RETRIES = 3; + private static final Logger log = LoggerFactory.getLogger(RpcSocket.class); + + private ZMQ.Socket socket; + private ZMQ.Poller poller; + private String address; + private SocketState state; + private long sendTime; + private int retriesLeft; + private LinkedBlockingQueue inQueue; + + + public RpcSocket(String address, ZMQ.Poller poller) { + this.socket = null; + this.state = new IdleSocketState(); + this.sendTime = -1; + this.retriesLeft = NUM_RETRIES; + this.inQueue = new LinkedBlockingQueue(QUEUE_SIZE); + this.address = address; + this.poller = poller; + createSocket(); + } + + public ZMQ.Socket getSocket() { + return socket; + } + + public String getAddress() { + return address; + } + + public int getRetriesLeft() { + return retriesLeft; + } + + public void setRetriesLeft(int retriesLeft) { + this.retriesLeft = retriesLeft; + } + + public SocketState getState() { + return state; + } + + public void setState(SocketState state) { + this.state = state; + } + + public int getQueueSize() { + return inQueue.size(); + } + + public MessageWrapper removeCurrentRequest() { + return inQueue.poll(); + } + + public boolean hasTimedOut() { + return (System.currentTimeMillis() - sendTime > RpcSocket.TIMEOUT); + } + + public void send(MessageWrapper request) throws TimeoutException { + try { + boolean success = inQueue.offer(request, TIMEOUT, TimeUnit.MILLISECONDS); + if (!success) { + throw new TimeoutException("send :: Queue is full"); + } + process(); + } + catch (InterruptedException e) { + log.error("send : Thread interrupted while attempting to add request to inQueue", e); + } + } + + public MessageWrapper receive() { + Message response = parseMessage(); + MessageWrapper messageWrapper = inQueue.poll(); //remove the message from queue + MessageWrapper responseMessageWrapper = new MessageWrapper(response, messageWrapper.getReceiveSocket()); + + state = new IdleSocketState(); + retriesLeft = NUM_RETRIES; + return responseMessageWrapper; + } + + public void process() { + if (getQueueSize() > 0) //process if there's message in the queue + state.process(this); + } + + // Called by IdleSocketState & BusySocketState + public void sendMessage() { + //Get the message from queue without removing it. For retries + MessageWrapper messageWrapper = inQueue.peek(); + if (messageWrapper != null) { + Message message = messageWrapper.getMessage(); + try { + socket.send(Message.serialize(message)); + } + catch (IOException e) { + log.debug("Message send failed [{}]", message); + log.debug("Exception [{}]", e); + } + sendTime = System.currentTimeMillis(); + } + } + + public Message parseMessage() { + Message parsedMessage = null; + byte[] bytes = socket.recv(); + log.debug("Received bytes:[{}]", bytes.length); + try { + parsedMessage = (Message)Message.deserialize(bytes); + } + catch (IOException|ClassNotFoundException e) { + log.debug("parseMessage : Deserializing received bytes failed", e); + } + + return parsedMessage; + } + + public void recycleSocket() { + close(); + } + + public void close() { + socket.setLinger(10); + socket.close(); + } + + private void createSocket() { + socket = Context.getInstance().getZmqContext().socket(ZMQ.REQ); + socket.connect(address); + poller.register(socket, ZMQ.Poller.POLLIN); + state = new IdleSocketState(); + } + + + /** + * Represents the state of a {@link org.opendaylight.controller.sal.connector.remoterpc.RpcSocket} + */ + public static interface SocketState { + + /* The processing actions to be performed in this state + */ + public void process(RpcSocket socket); + } + + /** + * Represents the idle state of a {@link org.opendaylight.controller.sal.connector.remoterpc.RpcSocket} + */ + public static class IdleSocketState implements SocketState { + + @Override + public void process(RpcSocket socket) { + socket.sendMessage(); + socket.setState(new BusySocketState()); + socket.setRetriesLeft(socket.getRetriesLeft()-1); + } + } + + /** + * Represents the busy state of a {@link org.opendaylight.controller.sal.connector.remoterpc.RpcSocket} + */ + public static class BusySocketState implements SocketState { + + private static Logger log = LoggerFactory.getLogger(BusySocketState.class); + + @Override + public void process(RpcSocket socket) { + if (socket.hasTimedOut()) { + if (socket.getRetriesLeft() > 0) { + log.debug("process : Request timed out, retrying now..."); + socket.sendMessage(); + socket.setRetriesLeft(socket.getRetriesLeft() - 1); + } + else { + // No more retries for current request, so stop processing the current request + MessageWrapper message = socket.removeCurrentRequest(); + if (message != null) { + log.error("Unable to process rpc request [{}]", message); + socket.setState(new IdleSocketState()); + socket.setRetriesLeft(NUM_RETRIES); + } + } + } + // Else no timeout, so allow processing to continue + } + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Sender.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Sender.java new file mode 100644 index 0000000000..f53d5adc1c --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Sender.java @@ -0,0 +1,218 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.sal.connector.remoterpc; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; +import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.zeromq.ZMQ; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import static com.google.common.base.Preconditions.*; + +/** + * Main server thread for sending requests. + */ +public class Sender implements Runnable{ + + private final static Logger _logger = LoggerFactory.getLogger(Sender.class); + private final Client client; + + + + + public Sender(Client client) { + super(); + this.client = client; + } + +@Override + public void run() { + _logger.info("Starting..."); + + try (SocketManager socketManager = new SocketManager()){ + while (!Thread.currentThread().isInterrupted()) { + + //read incoming messages from blocking queue + MessageWrapper request = pollForRequest(); + + if (request != null) { + processRequest(socketManager, request); + } + + flushSockets(socketManager); + pollForResponse(socketManager); + processResponse(socketManager); + + } + } catch(Exception t){ + _logger.error("Exception: [{}]", t); + _logger.error("Stopping..."); + } + } + + private void processResponse(SocketManager socketManager) { + for (int i = 0; i < socketManager.getPoller().getSize(); i++) { + // If any sockets get a response, process it + if (socketManager.getPoller().pollin(i)) { + Optional socket = socketManager.getManagedSocketFor( + socketManager.getPoller().getItem(i).getSocket()); + + checkState(socket.isPresent(), "Managed socket not found"); + + MessageWrapper response = socket.get().receive(); + _logger.debug("Received rpc response [{}]", response.getMessage()); + + //TODO: handle exception and introduce timeout on receiver side + try { + response.getReceiveSocket().send(Message.serialize(response.getMessage())); + } catch (IOException e) { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + } + } + + private void processRequest(SocketManager socketManager, MessageWrapper request) throws TimeoutException { + + if ((request.getMessage() == null) || + (request.getMessage().getRecipient() == null)) { + //invalid message. log and drop + _logger.error("Invalid request [{}]", request); + return; + } + + RpcSocket socket = + socketManager.getManagedSocket(request.getMessage().getRecipient()); + + socket.send(request); + } + + private void flushSockets(SocketManager socketManager){ + for (RpcSocket socket : socketManager.getManagedSockets()){ + socket.process(); + } + } + + private MessageWrapper pollForRequest(){ + return client.getRequestQueue().poll(); + } + + private void pollForResponse(SocketManager socketManager){ + try{ + socketManager.getPoller().poll(10); //poll every 10ms + }catch (Throwable t) { /*Ignore and continue*/ } + } +} + + +/* +SCALA + +package org.opendaylight.controller.sal.connector.remoterpc + + import org.slf4j.{LoggerFactory, Logger} + import scala.collection.JavaConverters._ + import scala.Some + import org.opendaylight.controller.sal.connector.remoterpc.dto.{MessageWrapper, Message} +*/ +/** + * Main server thread for sending requests. This does not maintain any state. If the + * thread dies, it will be restarted + */ +/*class Sender extends Runnable { + private val _logger: Logger = LoggerFactory.getLogger(Sender.this.getClass()) + + override def run = { + _logger.info("Sender starting...") + val socketManager = new SocketManager() + + try { + while (!Thread.currentThread().isInterrupted) { + //read incoming messages from blocking queue + val request: MessageWrapper = Client.requestQueue.poll() + + if (request != null) { + if ((request.message != null) && + (request.message.getRecipient != null)) { + + val socket = socketManager.getManagedSocket(request.message.getRecipient) + socket.send(request) + } else { + //invalid message. log and drop + _logger.error("Invalid request [{}]", request) + } + } + + socketManager.getManagedSockets().asScala.map(s => s.process) + + // Poll all sockets for responses every 1 sec + poll(socketManager) + + // If any sockets get a response, process it + for (i <- 0 until socketManager.poller.getSize) { + if (socketManager.poller.pollin(i)) { + val socket = socketManager.getManagedSocketFor(socketManager.poller.getItem(i).getSocket) + + socket match { + case None => //{ + _logger.error("Could not find a managed socket for zmq socket") + throw new IllegalStateException("Could not find a managed socket for zmq socket") + //} + case Some(s) => { + val response = s.receive() + _logger.debug("Received rpc response [{}]", response.message) + response.receiveSocket.send(Message.serialize(response.message)) + } + } + } + } + + } + } catch{ + case e:Exception => { + _logger.debug("Sender stopping due to exception") + e.printStackTrace() + } + } finally { + socketManager.stop + } + } + + def poll(socketManager:SocketManager) = { + try{ + socketManager.poller.poll(10) + }catch{ + case t:Throwable => //ignore and continue + } + } +} + + +// def newThread(r: Runnable): Thread = { +// val t = new RequestHandler() +// t.setUncaughtExceptionHandler(new RequestProcessorExceptionHandler) +// t +// } + + + +/** + * Restarts the request processing server in the event of unforeseen exceptions + */ +//private class RequestProcessorExceptionHandler extends UncaughtExceptionHandler { +// def uncaughtException(t: Thread, e: Throwable) = { +// _logger.error("Exception caught during request processing [{}]", e) +// _logger.info("Restarting request processor server...") +// RequestProcessor.start() +// } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerImpl.java new file mode 100644 index 0000000000..83b93858cf --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerImpl.java @@ -0,0 +1,285 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.sal.connector.remoterpc; + +import com.google.common.base.Optional; + +import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener; +import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable; +import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException; +import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException; +import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; +import org.opendaylight.controller.sal.connector.remoterpc.dto.Message.MessageType; +import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl; +import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils; +import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; +import org.opendaylight.controller.sal.core.api.RpcImplementation; +import org.opendaylight.controller.sal.core.api.RpcRegistrationListener; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.zeromq.ZMQ; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +/** + * ZeroMq based implementation of RpcRouter TODO: 1. Make rpc request handling + * async and non-blocking. Note zmq socket is not thread safe 2. Read properties + * from config file using existing(?) ODL properties framework + */ +public class ServerImpl implements RemoteRpcServer, RouteChangeListener { + + private Logger _logger = LoggerFactory.getLogger(ServerImpl.class); + + private ExecutorService serverPool; + + // private RoutingTable routingTable; + private RoutingTableProvider routingTable; + private Set remoteServices; + private ProviderSession brokerSession; + private ZMQ.Context context; + private ZMQ.Socket replySocket; + + private final RpcListener listener = new RpcListener(); + + private final String localUri = Context.getInstance().getLocalUri(); + + private final int rpcPort; + + private RpcImplementation client; + + public RpcImplementation getClient() { + return client; + } + + public void setClient(RpcImplementation client) { + this.client = client; + } + + // Prevent instantiation + public ServerImpl(int rpcPort) { + this.rpcPort = rpcPort; + } + + public void setBrokerSession(ProviderSession session) { + this.brokerSession = session; + } + + public ExecutorService getServerPool() { + return serverPool; + } + + public void setServerPool(ExecutorService serverPool) { + this.serverPool = serverPool; + } + + public void start() { + context = ZMQ.context(1); + serverPool = Executors.newSingleThreadExecutor(); + remoteServices = new HashSet(); + + // Start listening rpc requests + serverPool.execute(receive()); + + brokerSession.addRpcRegistrationListener(listener); + // routingTable.registerRouteChangeListener(routeChangeListener); + + Set currentlySupported = brokerSession.getSupportedRpcs(); + for (QName rpc : currentlySupported) { + listener.onRpcImplementationAdded(rpc); + } + + _logger.debug("RPC Server started [{}]", localUri); + } + + public void stop() { + // TODO: un-subscribe + + // if (context != null) + // context.term(); + // + // _logger.debug("ZMQ Context is terminated."); + + if (serverPool != null) + serverPool.shutdown(); + + _logger.debug("Thread pool is closed."); + } + + private Runnable receive() { + return new Runnable() { + public void run() { + + // Bind to RPC reply socket + replySocket = context.socket(ZMQ.REP); + replySocket.bind("tcp://*:" + Context.getInstance().getRpcPort()); + + // Poller enables listening on multiple sockets using a single + // thread + ZMQ.Poller poller = new ZMQ.Poller(1); + poller.register(replySocket, ZMQ.Poller.POLLIN); + try { + // TODO: Add code to restart the thread after exception + while (!Thread.currentThread().isInterrupted()) { + + poller.poll(); + + if (poller.pollin(0)) { + handleRpcCall(); + } + } + } catch (Exception e) { + // log and continue + _logger.error("Unhandled exception [{}]", e); + } finally { + poller.unregister(replySocket); + replySocket.close(); + } + + } + }; + } + + /** + * @throws InterruptedException + * @throws ExecutionException + */ + private void handleRpcCall() { + + Message request = parseMessage(replySocket); + + _logger.debug("Received rpc request [{}]", request); + + // Call broker to process the message then reply + Future> rpc = null; + RpcResult result = null; + try { + rpc = brokerSession.rpc((QName) request.getRoute().getType(), + XmlUtils.xmlToCompositeNode((String) request.getPayload())); + + result = (rpc != null) ? rpc.get() : null; + + } catch (Exception e) { + _logger.debug("Broker threw [{}]", e); + } + + CompositeNode payload = (result != null) ? result.getResult() : null; + + Message response = new Message.MessageBuilder().type(MessageType.RESPONSE).sender(localUri) + .route(request.getRoute()).payload(XmlUtils.compositeNodeToXml(payload)).build(); + + _logger.debug("Sending rpc response [{}]", response); + + try { + replySocket.send(Message.serialize(response)); + } catch (Exception e) { + _logger.debug("rpc response send failed for message [{}]", response); + _logger.debug("{}", e); + } + + } + + /** + * @param socket + * @return + */ + private Message parseMessage(ZMQ.Socket socket) { + + Message msg = null; + try { + byte[] bytes = socket.recv(); + _logger.debug("Received bytes:[{}]", bytes.length); + msg = (Message) Message.deserialize(bytes); + } catch (Throwable t) { + t.printStackTrace(); + } + return msg; + } + + @Override + public void onRouteUpdated(String key, Set values) { + RouteIdentifierImpl rId = new RouteIdentifierImpl(); + try { + _logger.debug("Updating key/value {}-{}", key, values); + brokerSession.addRpcImplementation((QName) rId.fromString(key).getType(), client); + + } catch (Exception e) { + _logger.info("Route update failed {}", e); + } + } + + @Override + public void onRouteDeleted(String key) { + // TODO: Broker session needs to be updated to support this + throw new UnsupportedOperationException(); + } + + /** + * Listener for rpc registrations + */ + private class RpcListener implements RpcRegistrationListener { + + + + @Override + public void onRpcImplementationAdded(QName name) { + + // if the service name exists in the set, this notice + // has bounced back from the broker. It should be ignored + if (remoteServices.contains(name)) + return; + + _logger.debug("Adding registration for [{}]", name); + RouteIdentifierImpl routeId = new RouteIdentifierImpl(); + routeId.setType(name); + + try { + routingTable.getRoutingTable().get().addGlobalRoute(routeId.toString(), localUri); + _logger.debug("Route added [{}-{}]", name, localUri); + } catch (RoutingTableException | SystemException e) { + // TODO: This can be thrown when route already exists in the + // table. Broker + // needs to handle this. + _logger.error("Unhandled exception while adding global route to routing table [{}]", e); + + } + } + + @Override + public void onRpcImplementationRemoved(QName name) { + + _logger.debug("Removing registration for [{}]", name); + RouteIdentifierImpl routeId = new RouteIdentifierImpl(); + routeId.setType(name); + + try { + routingTable.getRoutingTable().get().removeGlobalRoute(routeId.toString()); + } catch (RoutingTableException | SystemException e) { + _logger.error("Route delete failed {}", e); + } + } + } + + @Override + public void close() throws Exception { + stop(); + } + + public void setRoutingTableProvider(RoutingTableProvider provider) { + this.routingTable = provider; + } + +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/SocketManager.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/SocketManager.java new file mode 100644 index 0000000000..588a299626 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/SocketManager.java @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.sal.connector.remoterpc; + +import com.google.common.base.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.zeromq.ZMQ; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * Manages creation of {@link RpcSocket} and their registration with {@link ZMQ.Poller} + */ +public class SocketManager implements AutoCloseable{ + private static final Logger log = LoggerFactory.getLogger(SocketManager.class); + + /* + * RpcSockets mapped by network address its connected to + */ + private ConcurrentHashMap managedSockets = new ConcurrentHashMap(); + + private ZMQ.Poller _poller = new ZMQ.Poller(2); //randomly selected size. Poller grows automatically + + /** + * Returns a {@link RpcSocket} for the given address + * @param address network address with port eg: 10.199.199.20:5554 + * @return + */ + public RpcSocket getManagedSocket(String address) throws IllegalArgumentException { + //Precondition + if (!address.matches("(tcp://)(.*)(:)(\\d*)")) { + throw new IllegalArgumentException("Address must of format 'tcp://:' but is " + address); + } + + if (!managedSockets.containsKey(address)) { + log.debug("{} Creating new socket for {}", Thread.currentThread().getName()); + RpcSocket socket = new RpcSocket(address, _poller); + managedSockets.put(address, socket); + } + + return managedSockets.get(address); + } + + /** + * Returns a {@link RpcSocket} for the given {@link ZMQ.Socket} + * @param socket + * @return + */ + public Optional getManagedSocketFor(ZMQ.Socket socket) { + for (RpcSocket rpcSocket : managedSockets.values()) { + if (rpcSocket.getSocket().equals(socket)) { + return Optional.of(rpcSocket); + } + } + return Optional.absent(); + } + + /** + * Return a collection of all managed sockets + * @return + */ + public Collection getManagedSockets() { + return managedSockets.values(); + } + + /** + * Returns the {@link ZMQ.Poller} + * @return + */ + public ZMQ.Poller getPoller() { + return _poller; + } + + /** + * This should be called when stopping the server to close all the sockets + * @return + */ + @Override + public void close() throws Exception { + log.debug("Stopping..."); + for (RpcSocket socket : managedSockets.values()) { + socket.close(); + } + managedSockets.clear(); + log.debug("Stopped"); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/SocketPair.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/SocketPair.java new file mode 100644 index 0000000000..67b3a830e3 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/SocketPair.java @@ -0,0 +1,41 @@ +package org.opendaylight.controller.sal.connector.remoterpc; + +import org.zeromq.ZMQ; + +import java.util.UUID; + +/** + * + */ +public class SocketPair implements AutoCloseable{ + private ZMQ.Socket sender; + private ZMQ.Socket receiver; + + private static final String INPROC_PREFIX = "inproc://"; + + public SocketPair(){ + String address = new StringBuilder(INPROC_PREFIX) + .append(UUID.randomUUID()) + .toString(); + + receiver = Context.getInstance().getZmqContext().socket(ZMQ.PAIR); + receiver.bind(address); + + sender = Context.getInstance().getZmqContext().socket(ZMQ.PAIR); + sender.connect(address); + } + + public ZMQ.Socket getSender(){ + return this.sender; + } + + public ZMQ.Socket getReceiver(){ + return this.receiver; + } + + @Override + public void close() throws Exception { + sender.close(); + receiver.close(); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/CompositeNodeImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/CompositeNodeImpl.java new file mode 100644 index 0000000000..073601a1c0 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/CompositeNodeImpl.java @@ -0,0 +1,153 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.sal.connector.remoterpc.dto; + +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.*; + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class CompositeNodeImpl implements CompositeNode, Serializable { + + private QName key; + private List> children; + + @Override + public List> getChildren() { + return children; + } + + @Override + public List getCompositesByName(QName children) { + throw new UnsupportedOperationException(); + } + + @Override + public List getCompositesByName(String children) { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public List> getSimpleNodesByName(QName children) { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public List> getSimpleNodesByName(String children) { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public CompositeNode getFirstCompositeByName(QName container) { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public SimpleNode getFirstSimpleByName(QName leaf) { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public MutableCompositeNode asMutable() { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public QName getKey() { + return key; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public List> setValue(List> value) { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public int size() { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public boolean isEmpty() { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public boolean containsKey(Object key) { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public boolean containsValue(Object value) { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public List> get(Object key) { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public List> put(QName key, List> value) { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public List> remove(Object key) { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public void putAll(Map>> m) { + //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public void clear() { + //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public Set keySet() { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public Collection>> values() { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public Set>>> entrySet() { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public QName getNodeType() { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public CompositeNode getParent() { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public List> getValue() { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public ModifyAction getModificationAction() { + return null; //To change body of implemented methods use File | Settings | File Templates. + } +} diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/Message.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/Message.java similarity index 88% rename from opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/Message.java rename to opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/Message.java index c2c037aee6..95fe99c81c 100644 --- a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/Message.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/Message.java @@ -6,10 +6,8 @@ * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq; +package org.opendaylight.controller.sal.connector.remoterpc.dto; - -import org.codehaus.jackson.map.ObjectMapper; import org.opendaylight.controller.sal.connector.api.RpcRouter; import java.io.*; @@ -20,7 +18,8 @@ public class Message implements Serializable { ANNOUNCE((byte) 0), //TODO: Remove announce, add rpc registration and deregistration HEARTBEAT((byte) 1), REQUEST((byte) 2), - RESPONSE((byte) 3); + RESPONSE((byte) 3), + ERROR((byte)4); private final byte type; @@ -35,6 +34,7 @@ public class Message implements Serializable { private MessageType type; private String sender; + private String recipient; private RpcRouter.RouteIdentifier route; private Object payload; @@ -70,11 +70,19 @@ public class Message implements Serializable { this.payload = payload; } + public String getRecipient() { + return recipient; + } + + public void setRecipient(String recipient) { + this.recipient = recipient; + } @Override public String toString() { return "Message{" + "type=" + type + ", sender='" + sender + '\'' + + ", recipient='" + recipient + '\'' + ", route=" + route + ", payload=" + payload + '}'; @@ -108,17 +116,6 @@ public class Message implements Serializable { return o.readObject(); } - public static byte[] toJsonBytes(Message m) throws IOException { - ObjectMapper o = new ObjectMapper(); - return o.writeValueAsBytes(m); - } - - public static Message fromJsonBytes(byte [] bytes) throws IOException { - - ObjectMapper o = new ObjectMapper(); - return o.readValue(bytes, Message.class); - } - public static class Response extends Message implements RpcRouter.RpcReply { private ResponseCode code; // response code @@ -163,6 +160,11 @@ public class Message implements Serializable { return this; } + public MessageBuilder recipient(String recipient){ + message.setRecipient(recipient); + return this; + } + public MessageBuilder route(RpcRouter.RouteIdentifier route){ message.setRoute(route); return this; diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/MessageWrapper.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/MessageWrapper.java new file mode 100644 index 0000000000..8d2198c365 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/MessageWrapper.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.sal.connector.remoterpc.dto; + +import org.zeromq.ZMQ; + +/** + * A class encapsulating {@link Message} and the {@link ZMQ.Socket} over which it is transmitted + */ +public class MessageWrapper { + + private Message _message; + private ZMQ.Socket _receiveSocket; + + public MessageWrapper(Message message, ZMQ.Socket receiveSocket) { + this._message = message; + this._receiveSocket = receiveSocket; + } + + public Message getMessage() { + return _message; + } + + public ZMQ.Socket getReceiveSocket() { + return _receiveSocket; + } +} diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/RouteIdentifierImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/RouteIdentifierImpl.java similarity index 54% rename from opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/RouteIdentifierImpl.java rename to opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/RouteIdentifierImpl.java index ca404f2ca9..6c5e5fbf11 100644 --- a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/RouteIdentifierImpl.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/RouteIdentifierImpl.java @@ -5,19 +5,21 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq; +package org.opendaylight.controller.sal.connector.remoterpc.dto; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.map.ObjectMapper; import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; import java.io.Serializable; +import java.net.URI; -/** - * User: abhishk2 - */ public class RouteIdentifierImpl implements RpcRouter.RouteIdentifier,Serializable { + transient ObjectMapper mapper = new ObjectMapper(); + private QName context; private QName type; private InstanceIdentifier route; @@ -51,10 +53,35 @@ public class RouteIdentifierImpl implements RpcRouter.RouteIdentifier dataTree; + try { + dataTree = XmlTreeBuilder.buildDataTree(new ByteArrayInputStream(xml.getBytes())); + } catch (XMLStreamException e) { + _logger.error("Error during building data tree from XML", e); + return null; + } + if (dataTree == null) { + _logger.error("data tree is null"); + return null; + } + if (dataTree instanceof SimpleNode) { + _logger.error("RPC XML was resolved as SimpleNode"); + return null; + } + return (CompositeNode) dataTree; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/scala/org/opendaylight/controller/sal/connector/remoterpc/Client.scala b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/scala/org/opendaylight/controller/sal/connector/remoterpc/Client.scala new file mode 100644 index 0000000000..63b68089d3 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/scala/org/opendaylight/controller/sal/connector/remoterpc/Client.scala @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.sal.connector.remoterpc + +import org.opendaylight.yangtools.yang.data.api.CompositeNode +import org.opendaylight.yangtools.yang.common.{RpcError, RpcResult, QName} +import org.opendaylight.controller.sal.core.api.RpcImplementation +import java.util +import java.util.{UUID, Collections} +import org.zeromq.ZMQ +import org.opendaylight.controller.sal.common.util.{RpcErrors, Rpcs} +import org.slf4j.LoggerFactory +import org.opendaylight.controller.sal.connector.remoterpc.dto.{MessageWrapper, RouteIdentifierImpl, Message} +import Message.MessageType +import java.util.concurrent._ +import java.lang.InterruptedException + + +/** + * An implementation of {@link RpcImplementation} that makes + * remote RPC calls + */ +class Client extends RemoteRpcClient { + + private val _logger = LoggerFactory.getLogger(this.getClass); + + val requestQueue = new LinkedBlockingQueue[MessageWrapper](100) + val pool: ExecutorService = Executors.newSingleThreadExecutor() + private val TIMEOUT = 5000 //in ms + var routingTableProvider: RoutingTableProvider = null + + + def getInstance = this + + + def setRoutingTableProvider(provider : RoutingTableProvider) = { + routingTableProvider = provider; + } + + def getSupportedRpcs: util.Set[QName] = { + Collections.emptySet() + } + + def invokeRpc(rpc: QName, input: CompositeNode): RpcResult[CompositeNode] = { + + val routeId = new RouteIdentifierImpl() + routeId.setType(rpc) + + //lookup address for the rpc request + val routingTable = routingTableProvider.getRoutingTable() + require( routingTable != null, "Routing table not found. Exiting" ) + + val addresses:util.Set[String] = routingTable.getRoutes(routeId.toString) + require(addresses != null, "Address not found for rpc " + rpc); + require(addresses.size() == 1) //its a global service. + + val address = addresses.iterator().next() + require(address != null, "Address is null") + + //create in-process "pair" socket and pass it to sender thread + //Sender replies on this when result is available + val inProcAddress = "inproc://" + UUID.randomUUID() + val receiver = Context.zmqContext.socket(ZMQ.PAIR) + receiver.bind(inProcAddress); + + val sender = Context.zmqContext.socket(ZMQ.PAIR) + sender.connect(inProcAddress) + + val requestMessage = new Message.MessageBuilder() + .`type`(MessageType.REQUEST) + //.sender("tcp://localhost:8081") + .recipient(address) + .route(routeId) + .payload(input) + .build() + + _logger.debug("Queuing up request and expecting response on [{}]", inProcAddress) + + val messageWrapper = new MessageWrapper(requestMessage, sender) + val errors = new util.ArrayList[RpcError] + + try { + process(messageWrapper) + val response = parseMessage(receiver) + + return Rpcs.getRpcResult( + true, response.getPayload.asInstanceOf[CompositeNode], Collections.emptySet()) + + } catch { + case e: Exception => { + errors.add(RpcErrors.getRpcError(null,null,null,null,e.getMessage,null,e.getCause)) + return Rpcs.getRpcResult(false, null, errors) + } + } finally { + receiver.close(); + sender.close(); + } + + } + + /** + * Block on socket for reply + * @param receiver + * @return + */ + private def parseMessage(receiver:ZMQ.Socket): Message = { + val bytes = receiver.recv() + return Message.deserialize(bytes).asInstanceOf[Message] + } + + def start() = { + pool.execute(new Sender) + } + + def process(msg: MessageWrapper) = { + _logger.debug("Processing message [{}]", msg) + val success = requestQueue.offer(msg, TIMEOUT, TimeUnit.MILLISECONDS) + + if (!success) throw new TimeoutException("Queue is full"); + + } + + def stop() = { + pool.shutdown() //intiate shutdown + _logger.debug("Client stopping...") + // Context.zmqContext.term(); + // _logger.debug("ZMQ context terminated") + + try { + + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + pool.shutdownNow(); + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) + _logger.error("Client thread pool did not shut down"); + } + } catch { + case ie:InterruptedException => + // (Re-)Cancel if current thread also interrupted + pool.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + _logger.debug("Client stopped") + } + + def close() = { + stop(); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/yang/odl-sal-dom-rpc-remote-cfg.yang b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/yang/odl-sal-dom-rpc-remote-cfg.yang new file mode 100644 index 0000000000..d20efe50c1 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/yang/odl-sal-dom-rpc-remote-cfg.yang @@ -0,0 +1,52 @@ +module odl-sal-dom-rpc-remote-cfg { + yang-version 1; + namespace "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote:rpc"; + prefix "rpc-cluster"; + + import config { prefix config; revision-date 2013-04-05; } + import opendaylight-md-sal-dom {prefix dom;} + + description + "Service definition for Binding Aware MD-SAL."; + + revision "2013-10-28" { + description + "Initial revision"; + } + + identity remote-rpc-server { + base config:service-type; + config:java-class "org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcServer"; + } + + identity remote-rpc-client { + base config:service-type; + config:java-class "org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcClient"; + } + + identity remote-zeromq-rpc-server { + base config:module-type; + config:provided-service remote-rpc-server; + config:provided-service remote-rpc-client; + config:java-name-prefix ZeroMQServer; + } + + augment "/config:modules/config:module/config:configuration" { + case remote-zeromq-rpc-server { + when "/config:modules/config:module/config:type = 'remote-zeromq-rpc-server'"; + + container dom-broker { + uses config:service-ref { + refine type { + mandatory true; + config:required-identity dom:dom-broker-osgi-registry; + } + } + } + + leaf port { + type uint16; + } + } + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientTest.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientTest.java new file mode 100644 index 0000000000..2e77537756 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientTest.java @@ -0,0 +1,56 @@ +package org.opendaylight.controller.sal.connector.remoterpc; + +import junit.framework.Assert; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper; +import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; + +import java.util.concurrent.TimeoutException; + +public class ClientTest { + + Client client; + + @Before + public void setup(){ + client = new Client(); + client.getRequestQueue().clear(); + } + + @Test + public void testStop() throws Exception { + + } + + @Test + public void testPool() throws Exception { + + } + + @Test + public void process_AddAMessage_ShouldAddToQueue() throws Exception { + client.process(getEmptyMessageWrapper()); + Assert.assertEquals(1, client.getRequestQueue().size()); + } + + /** + * Queue size is 100. Adding 101 message should time out in 2 sec + * if server does not process it + * @throws Exception + */ + @Test(expected = TimeoutException.class) + public void process_Add101Message_ShouldThrow() throws Exception { + for (int i=0;i<101;i++){ + client.process(getEmptyMessageWrapper()); + } + } + + @Test + public void testStart() throws Exception { + } + + private MessageWrapper getEmptyMessageWrapper(){ + return new MessageWrapper(new Message(), null); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/RouteIdentifierImplTest.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/RouteIdentifierImplTest.java new file mode 100644 index 0000000000..550d9ef125 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/RouteIdentifierImplTest.java @@ -0,0 +1,50 @@ +package org.opendaylight.controller.sal.connector.remoterpc; + +import org.codehaus.jackson.JsonParseException; +import org.junit.Assert; +import org.junit.Test; +import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl; +import org.opendaylight.yangtools.yang.common.QName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; + +public class RouteIdentifierImplTest { + + Logger _logger = LoggerFactory.getLogger(RouteIdentifierImplTest.class); + + private final URI namespace = URI.create("http://cisco.com/example"); + private final QName QNAME = new QName(namespace, "heartbeat"); + + @Test + public void testToString() throws Exception { + RouteIdentifierImpl rId = new RouteIdentifierImpl(); + rId.setType(QNAME); + + _logger.debug(rId.toString()); + + Assert.assertTrue(true); + + } + + @Test + public void testFromString() throws Exception { + RouteIdentifierImpl rId = new RouteIdentifierImpl(); + rId.setType(QNAME); + + _logger.debug("route: " + rId.fromString(rId.toString())); + + Assert.assertTrue(true); + } + + @Test(expected = JsonParseException.class) + public void testFromInvalidString() throws Exception { + String invalidInput = "aklhdgadfa;;;;;;;]]]]=]ag" ; + RouteIdentifierImpl rId = new RouteIdentifierImpl(); + rId.fromString(invalidInput); + + _logger.debug("" + rId); + Assert.assertTrue(true); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/RpcSocketTest.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/RpcSocketTest.java new file mode 100644 index 0000000000..e23a3ca5ef --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/RpcSocketTest.java @@ -0,0 +1,196 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.sal.connector.remoterpc; + +import junit.framework.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; +import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.zeromq.ZMQ; + +import java.util.concurrent.TimeoutException; + +import static org.mockito.Mockito.doNothing; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(RpcSocket.class) +public class RpcSocketTest { + RpcSocket rpcSocket = new RpcSocket("tcp://localhost:5554", new ZMQ.Poller(1)); + RpcSocket spy = PowerMockito.spy(rpcSocket); + + @Test + public void testCreateSocket() throws Exception { + Assert.assertEquals("tcp://localhost:5554", spy.getAddress()); + Assert.assertEquals(ZMQ.REQ, spy.getSocket().getType()); + } + + @Test(expected = TimeoutException.class) + public void send_WhenQueueGetsFull_ShouldThrow() throws Exception { + + doNothing().when(spy).process(); + + //10 is queue size + for (int i=0;i<10;i++){ + spy.send(getEmptyMessageWrapper()); + } + + //sending 11th message should throw + spy.send(getEmptyMessageWrapper()); + } + + @Test + public void testHasTimedOut() throws Exception { + spy.send(getEmptyMessageWrapper()); + Assert.assertFalse(spy.hasTimedOut()); + Thread.sleep(1000); + Assert.assertFalse(spy.hasTimedOut()); + Thread.sleep(1000); + Assert.assertTrue(spy.hasTimedOut()); + } + + @Test + public void testProcess() throws Exception { + PowerMockito.doNothing().when(spy, "sendMessage"); + spy.send(getEmptyMessageWrapper()); + + //Next message should get queued + spy.send(getEmptyMessageWrapper()); + + //queue size should be 2 + Assert.assertEquals(2, spy.getQueueSize()); + + + spy.process(); + //sleep for 2 secs (timeout) + //message send would be retried + Thread.sleep(2000); + spy.process(); + Thread.sleep(2000); + spy.process(); + Thread.sleep(2000); + spy.process(); //retry fails, next message will get picked up + Assert.assertEquals(1, spy.getQueueSize()); + } + + @Test + public void testProcessStateTransitions() throws Exception { + PowerMockito.doNothing().when(spy, "sendMessage"); + Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState); + spy.send(getEmptyMessageWrapper()); + Assert.assertEquals(1, spy.getQueueSize()); + Thread.sleep(200); + Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState); + Thread.sleep(1800); + + //1st timeout, 2nd try + spy.process(); + Thread.sleep(200); + Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState); + Thread.sleep(1800); + + //2nd timeout, 3rd try + spy.process(); + Thread.sleep(200); + Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState); + Thread.sleep(1800); + + //3rd timeout, no more tries => remove + spy.process(); + Thread.sleep(200); + Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState); + Assert.assertEquals(0, spy.getQueueSize()); + } + + @Test + public void testParseMessage() throws Exception { + // Write an integration test for parseMessage + } + + @Test + public void testRecycleSocket() throws Exception { + // This will need to be updated in the future...for now, recycleSocket() calls close() + Assert.assertTrue(spy.getSocket().base().check_tag()); + spy.close(); + Assert.assertEquals(10, spy.getSocket().getLinger()); + Assert.assertFalse(spy.getSocket().base().check_tag()); + } + + @Test + public void testClose() throws Exception { + Assert.assertTrue(spy.getSocket().base().check_tag()); + spy.close(); + Assert.assertEquals(10, spy.getSocket().getLinger()); + Assert.assertFalse(spy.getSocket().base().check_tag()); + } + + @Test + public void testReceive() throws Exception { + PowerMockito.doReturn(null).when(spy, "parseMessage"); + PowerMockito.doNothing().when(spy, "process"); + spy.send(getEmptyMessageWrapper()); + + //There should be 1 message waiting in the queue + Assert.assertEquals(1, spy.getQueueSize()); + + spy.receive(); + //This should complete message processing + //The message should be removed from the queue + Assert.assertEquals(0, spy.getQueueSize()); + Assert.assertEquals(RpcSocket.NUM_RETRIES, spy.getRetriesLeft()); + + } + + @Test + public void testReceiveStateTransitions() throws Exception { + PowerMockito.doReturn(null).when(spy, "parseMessage"); + Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState); + spy.send(getEmptyMessageWrapper()); + + //There should be 1 message waiting in the queue + Assert.assertEquals(1, spy.getQueueSize()); + Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState); + + spy.receive(); + //This should complete message processing + //The message should be removed from the queue + Assert.assertEquals(0, spy.getQueueSize()); + Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState); + } + + private MessageWrapper getEmptyMessageWrapper(){ + return new MessageWrapper(new Message(), null); + } + + @Test + public void testProcessReceiveSequence() throws Exception { + PowerMockito.doNothing().when(spy, "sendMessage"); + PowerMockito.doReturn(null).when(spy, "parseMessage"); + Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState); + spy.send(getEmptyMessageWrapper()); + spy.send(getEmptyMessageWrapper()); + Assert.assertEquals(2, spy.getQueueSize()); + Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState); + + + Thread.sleep(2000); + Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState); + spy.receive(); + Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState); + Assert.assertEquals(1, spy.getQueueSize()); + + spy.process(); + Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState); + spy.receive(); + Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState); + Assert.assertEquals(0, spy.getQueueSize()); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/SerilizationTest.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/SerilizationTest.java new file mode 100644 index 0000000000..36a4acddca --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/SerilizationTest.java @@ -0,0 +1,79 @@ +package org.opendaylight.controller.sal.connector.remoterpc; + +import org.junit.Test; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.api.Node; +import org.opendaylight.yangtools.yang.data.api.SimpleNode; +import org.opendaylight.yangtools.yang.data.impl.NodeUtils; +import org.opendaylight.yangtools.yang.data.impl.XmlTreeBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Document; + +import javax.xml.stream.XMLStreamException; +import javax.xml.transform.OutputKeys; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerException; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; +import java.io.FileNotFoundException; +import java.io.InputStream; +import java.io.StringWriter; + +public class SerilizationTest { + + private static final Logger _logger = LoggerFactory.getLogger(SerilizationTest.class); + + public void fromXml() { + } + + @Test + public void toXml() throws FileNotFoundException { + + InputStream xmlStream = SerilizationTest.class.getResourceAsStream("/FourSimpleChildren.xml"); + StringWriter writer = new StringWriter(); + + CompositeNode data = loadCompositeNode(xmlStream); + Document domTree = NodeUtils.buildShadowDomTree(data); + try { + TransformerFactory tf = TransformerFactory.newInstance(); + Transformer transformer = tf.newTransformer(); + transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes"); + //transformer.setOutputProperty(OutputKeys.METHOD, "xml"); + //transformer.setOutputProperty(OutputKeys.INDENT, "yes"); + //transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8"); + //transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4"); + transformer.transform(new DOMSource(domTree), new StreamResult(writer)); + } catch (TransformerException e) { + _logger.error("Error during translation of Document to OutputStream", e); + } + + _logger.info("Parsed xml [{}]", writer.toString()); + } + + //Note to self: Stolen from TestUtils + ///Users/alefan/odl/controller4/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/TestUtils.java + // Figure out how to include TestUtils through pom ...was getting errors + private CompositeNode loadCompositeNode(InputStream xmlInputStream) throws FileNotFoundException { + if (xmlInputStream == null) { + throw new IllegalArgumentException(); + } + Node dataTree; + try { + dataTree = XmlTreeBuilder.buildDataTree(xmlInputStream); + } catch (XMLStreamException e) { + _logger.error("Error during building data tree from XML", e); + return null; + } + if (dataTree == null) { + _logger.error("data tree is null"); + return null; + } + if (dataTree instanceof SimpleNode) { + _logger.error("RPC XML was resolved as SimpleNode"); + return null; + } + return (CompositeNode) dataTree; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/SocketManagerTest.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/SocketManagerTest.java new file mode 100644 index 0000000000..130b30d5f5 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/SocketManagerTest.java @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.sal.connector.remoterpc; + +import com.google.common.base.Optional; +import junit.framework.Assert; +import org.junit.After; +import org.junit.Before; +import org.zeromq.ZMQ; +import org.opendaylight.controller.sal.connector.remoterpc.SocketManager; +import org.opendaylight.controller.sal.connector.remoterpc.RpcSocket; +import org.opendaylight.controller.sal.connector.remoterpc.Context; +import org.junit.Test; + +public class SocketManagerTest { + + SocketManager manager; + + @Before + public void setup(){ + manager = new SocketManager(); + } + + @After + public void tearDown() throws Exception { + manager.close(); + } + + @Test + public void getManagedSockets_When2NewAdded_ShouldContain2() throws Exception { + + //Prepare data + manager.getManagedSocket("tcp://localhost:5554"); + manager.getManagedSocket("tcp://localhost:5555"); + + Assert.assertTrue( 2 == manager.getManagedSockets().size()); + } + + @Test + public void getManagedSockets_When2NewAddedAnd1Existing_ShouldContain2() throws Exception { + + //Prepare data + manager.getManagedSocket("tcp://localhost:5554"); + manager.getManagedSocket("tcp://localhost:5555"); + manager.getManagedSocket("tcp://localhost:5554"); //ask for the first one + + Assert.assertTrue( 2 == manager.getManagedSockets().size()); + } + + @Test + public void getManagedSocket_WhenPassedAValidAddress_ShouldReturnARpcSocket() throws Exception { + String testAddress = "tcp://localhost:5554"; + RpcSocket rpcSocket = manager.getManagedSocket(testAddress); + Assert.assertEquals(testAddress, rpcSocket.getAddress()); + } + + @Test(expected = IllegalArgumentException.class) + public void getManagedSocket_WhenPassedInvalidHostAddress_ShouldThrow() throws Exception { + String testAddress = "tcp://nonexistenthost:5554"; + RpcSocket rpcSocket = manager.getManagedSocket(testAddress); + } + + @Test(expected = IllegalArgumentException.class) + public void getManagedSocket_WhenPassedInvalidAddress_ShouldThrow() throws Exception { + String testAddress = "xxx"; + RpcSocket rpcSocket = manager.getManagedSocket(testAddress); + } + + @Test + public void getManagedSocket_WhenPassedAValidZmqSocket_ShouldReturnARpcSocket() throws Exception { + //Prepare data + String firstAddress = "tcp://localhost:5554"; + RpcSocket firstRpcSocket = manager.getManagedSocket(firstAddress); + ZMQ.Socket firstZmqSocket = firstRpcSocket.getSocket(); + + String secondAddress = "tcp://localhost:5555"; + RpcSocket secondRpcSocket = manager.getManagedSocket(secondAddress); + ZMQ.Socket secondZmqSocket = secondRpcSocket.getSocket(); + + Assert.assertEquals(firstRpcSocket, manager.getManagedSocketFor(firstZmqSocket).get()); + Assert.assertEquals(secondRpcSocket, manager.getManagedSocketFor(secondZmqSocket).get()); + } + + @Test + public void getManagedSocket_WhenPassedNonManagedZmqSocket_ShouldReturnNone() throws Exception { + ZMQ.Socket nonManagedSocket = Context.getInstance().getZmqContext().socket(ZMQ.REQ); + nonManagedSocket.connect("tcp://localhost:5000"); + + //Prepare data + String firstAddress = "tcp://localhost:5554"; + RpcSocket firstRpcSocket = manager.getManagedSocket(firstAddress); + ZMQ.Socket firstZmqSocket = firstRpcSocket.getSocket(); + + Assert.assertSame(Optional.absent(), manager.getManagedSocketFor(nonManagedSocket) ); + Assert.assertSame(Optional.absent(), manager.getManagedSocketFor(null) ); + } + + @Test + public void stop_WhenCalled_ShouldEmptyManagedSockets() throws Exception { + manager.getManagedSocket("tcp://localhost:5554"); + manager.getManagedSocket("tcp://localhost:5555"); + Assert.assertTrue( 2 == manager.getManagedSockets().size()); + + manager.close(); + Assert.assertTrue( 0 == manager.getManagedSockets().size()); + } + + @Test + public void poller_WhenCalled_ShouldReturnAnInstanceOfPoller() throws Exception { + Assert.assertTrue (manager.getPoller() instanceof ZMQ.Poller); + } + +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/resources/FourSimpleChildren.xml b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/resources/FourSimpleChildren.xml new file mode 100644 index 0000000000..5ac991b120 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/resources/FourSimpleChildren.xml @@ -0,0 +1,6 @@ + + eth0 + ethernetCsmacd + false + some interface + diff --git a/opendaylight/md-sal/test/zeromq-test-consumer/pom.xml b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/pom.xml similarity index 87% rename from opendaylight/md-sal/test/zeromq-test-consumer/pom.xml rename to opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/pom.xml index 7c6bc21b46..fa7b73be0e 100644 --- a/opendaylight/md-sal/test/zeromq-test-consumer/pom.xml +++ b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/pom.xml @@ -2,11 +2,11 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - sal-test-parent + sal-remoterpc-connector-test-parent org.opendaylight.controller.tests 1.0-SNAPSHOT - zeromq-test-consumer + sal-remoterpc-connector-test-consumer bundle scm:git:ssh://git.opendaylight.org:29418/controller.git @@ -23,11 +23,6 @@ org.opendaylight.controller.sample.zeromq.consumer.ExampleConsumer - - org.opendaylight.controller.sal.core.api, - org.opendaylight.yangtools.yang.common;version="[0.5,1)", - org.opendaylight.yangtools.yang.data.api, - @@ -75,6 +70,11 @@ org.opendaylight.yangtools yang-data-api + + org.opendaylight.yangtools + yang-data-impl + 0.5.9-SNAPSHOT + org.opendaylight.controller diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/java/org/opendaylight/controller/sample/zeromq/consumer/ExampleConsumer.java b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/java/org/opendaylight/controller/sample/zeromq/consumer/ExampleConsumer.java new file mode 100644 index 0000000000..87078ea712 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/java/org/opendaylight/controller/sample/zeromq/consumer/ExampleConsumer.java @@ -0,0 +1,122 @@ +package org.opendaylight.controller.sample.zeromq.consumer; + +import java.io.FileNotFoundException; +import java.io.InputStream; +import java.net.URI; +import java.util.Hashtable; +import java.util.concurrent.*; + +import org.opendaylight.controller.sal.core.api.AbstractConsumer; +import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.api.Node; +import org.opendaylight.yangtools.yang.data.api.SimpleNode; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceRegistration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.opendaylight.yangtools.yang.data.impl.XmlTreeBuilder; +import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl; + +import javax.xml.stream.XMLStreamException; + +public class ExampleConsumer extends AbstractConsumer { + + private final URI namespace = URI.create("http://cisco.com/example"); + private final QName QNAME = new QName(namespace, "heartbeat"); + + private ConsumerSession session; + + private ServiceRegistration thisReg; + private Logger _logger = LoggerFactory.getLogger(ExampleConsumer.class); + + @Override + public void onSessionInitiated(ConsumerSession session) { + this.session = session; + } + + public RpcResult invokeRpc(QName qname, CompositeNode input) { + _logger.info("Invoking RPC:[{}] with Input:[{}]", qname.getLocalName(), input); + RpcResult result = null; + Future> future = ExampleConsumer.this.session.rpc(qname, input); + try { + result = future.get(); + } catch (Exception e) { + e.printStackTrace(); + } + _logger.info("Returning Result:[{}]", result); + return result; + } + + @Override + protected void startImpl(BundleContext context){ + thisReg = context.registerService(ExampleConsumer.class, this, new Hashtable()); + } + @Override + protected void stopImpl(BundleContext context) { + super.stopImpl(context); + thisReg.unregister(); + } + + public CompositeNode getValidCompositeNodeWithOneSimpleChild() throws FileNotFoundException { + InputStream xmlStream = ExampleConsumer.class.getResourceAsStream("/OneSimpleChild.xml"); + return loadCompositeNode(xmlStream); + } + + public CompositeNode getValidCompositeNodeWithTwoSimpleChildren() throws FileNotFoundException { + InputStream xmlStream = ExampleConsumer.class.getResourceAsStream("/TwoSimpleChildren.xml"); + return loadCompositeNode(xmlStream); + } + + public CompositeNode getValidCompositeNodeWithFourSimpleChildren() throws FileNotFoundException { + InputStream xmlStream = ExampleConsumer.class.getResourceAsStream("/FourSimpleChildren.xml"); + return loadCompositeNode(xmlStream); + } + + public CompositeNode getValidCompositeNodeWithOneSimpleOneCompositeChild() throws FileNotFoundException { + InputStream xmlStream = ExampleConsumer.class.getResourceAsStream("/OneSimpleOneCompositeChild.xml"); + return loadCompositeNode(xmlStream); + } + + public CompositeNode getValidCompositeNodeWithTwoCompositeChildren() throws FileNotFoundException { + InputStream xmlStream = ExampleConsumer.class.getResourceAsStream("/TwoCompositeChildren.xml"); + return loadCompositeNode(xmlStream); + } + + public CompositeNode getInvalidCompositeNodeSimpleChild() throws FileNotFoundException { + InputStream xmlStream = ExampleConsumer.class.getResourceAsStream("/InvalidSimpleChild.xml"); + return loadCompositeNode(xmlStream); + } + + public CompositeNode getInvalidCompositeNodeCompositeChild() throws FileNotFoundException { + InputStream xmlStream = ExampleConsumer.class.getResourceAsStream("/InvalidCompositeChild.xml"); + return loadCompositeNode(xmlStream); + } + + //Note to self: Stolen from TestUtils + ///Users/alefan/odl/controller4/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/TestUtils.java + // Figure out how to include TestUtils through pom ...was getting errors + private CompositeNode loadCompositeNode(InputStream xmlInputStream) throws FileNotFoundException { + if (xmlInputStream == null) { + throw new IllegalArgumentException(); + } + Node dataTree; + try { + dataTree = XmlTreeBuilder.buildDataTree(xmlInputStream); + } catch (XMLStreamException e) { + _logger.error("Error during building data tree from XML", e); + return null; + } + if (dataTree == null) { + _logger.error("data tree is null"); + return null; + } + if (dataTree instanceof SimpleNode) { + _logger.error("RPC XML was resolved as SimpleNode"); + return null; + } + return (CompositeNode) dataTree; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/FourSimpleChildren.xml b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/FourSimpleChildren.xml new file mode 100644 index 0000000000..5ac991b120 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/FourSimpleChildren.xml @@ -0,0 +1,6 @@ + + eth0 + ethernetCsmacd + false + some interface + diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/InvalidCompositeChild.xml b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/InvalidCompositeChild.xml new file mode 100644 index 0000000000..3979d02ccf --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/InvalidCompositeChild.xml @@ -0,0 +1,14 @@ + + + eth1 + ethernet + false + some interface + + + error + ethernet + true + some interface + + diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/InvalidSimpleChild.xml b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/InvalidSimpleChild.xml new file mode 100644 index 0000000000..6082d72a71 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/InvalidSimpleChild.xml @@ -0,0 +1,3 @@ + + error + diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/OneSimpleChild.xml b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/OneSimpleChild.xml new file mode 100644 index 0000000000..f431b0453d --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/OneSimpleChild.xml @@ -0,0 +1,3 @@ + + eth0 + diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/OneSimpleOneCompositeChild.xml b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/OneSimpleOneCompositeChild.xml new file mode 100644 index 0000000000..bca7682ee7 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/OneSimpleOneCompositeChild.xml @@ -0,0 +1,9 @@ + + eth0 + + eth1 + ethernetCsmacd + false + some interface + + diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/TwoCompositeChildren.xml b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/TwoCompositeChildren.xml new file mode 100644 index 0000000000..c49407e4c0 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/TwoCompositeChildren.xml @@ -0,0 +1,14 @@ + + + eth1 + ethernet + false + some interface + + + eth2 + ethernet + true + some interface + + diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/TwoSimpleChildren.xml b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/TwoSimpleChildren.xml new file mode 100644 index 0000000000..5f4729c99d --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/consumer-service/src/main/resources/TwoSimpleChildren.xml @@ -0,0 +1,4 @@ + + eth0 + ethernetCsmacd + \ No newline at end of file diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/pom.xml b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/pom.xml new file mode 100644 index 0000000000..5bfbcba5f8 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/pom.xml @@ -0,0 +1,26 @@ + + 4.0.0 + + org.opendaylight.controller + sal-parent + 1.0-SNAPSHOT + ../.. + + pom + org.opendaylight.controller.tests + sal-remoterpc-connector-test-parent + + scm:git:ssh://git.opendaylight.org:29418/controller.git + scm:git:ssh://git.opendaylight.org:29418/controller.git + https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL + + + + consumer-service + provider-service + test-it + test-nb + + + diff --git a/opendaylight/md-sal/test/zeromq-test-provider/pom.xml b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/provider-service/pom.xml similarity index 89% rename from opendaylight/md-sal/test/zeromq-test-provider/pom.xml rename to opendaylight/md-sal/sal-remoterpc-connector/integrationtest/provider-service/pom.xml index 10e15aa917..a13a5aeba0 100644 --- a/opendaylight/md-sal/test/zeromq-test-provider/pom.xml +++ b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/provider-service/pom.xml @@ -2,11 +2,11 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - sal-test-parent + sal-remoterpc-connector-test-parent org.opendaylight.controller.tests 1.0-SNAPSHOT - zeromq-test-provider + sal-remoterpc-connector-test-provider bundle scm:git:ssh://git.opendaylight.org:29418/controller.git @@ -70,7 +70,10 @@ org.opendaylight.yangtools yang-data-api - + + org.opendaylight.yangtools + yang-data-impl + org.opendaylight.controller sal-common-util @@ -78,7 +81,7 @@ org.opendaylight.controller - sal-zeromq-connector + sal-remoterpc-connector 1.0-SNAPSHOT diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/provider-service/src/main/java/org/opendaylight/controller/sample/zeromq/provider/ExampleProvider.java b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/provider-service/src/main/java/org/opendaylight/controller/sample/zeromq/provider/ExampleProvider.java new file mode 100644 index 0000000000..6c294dddcc --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/provider-service/src/main/java/org/opendaylight/controller/sample/zeromq/provider/ExampleProvider.java @@ -0,0 +1,120 @@ +package org.opendaylight.controller.sample.zeromq.provider; + +import org.opendaylight.controller.sal.common.util.RpcErrors; +import org.opendaylight.controller.sal.common.util.Rpcs; +import org.opendaylight.controller.sal.connector.remoterpc.dto.CompositeNodeImpl; +import org.opendaylight.controller.sal.core.api.AbstractProvider; +import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; +import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration; +import org.opendaylight.controller.sal.core.api.RpcImplementation; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.api.Node; +import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl; +import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceRegistration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; +import java.util.*; + +public class ExampleProvider extends AbstractProvider implements RpcImplementation { + + private final URI namespace = URI.create("http://cisco.com/example"); + private final QName QNAME = new QName(namespace, "heartbeat"); + private RpcRegistration reg; + + private ServiceRegistration thisReg; + + private ProviderSession session; + private Logger _logger = LoggerFactory.getLogger(ExampleProvider.class); + + @Override + public void onSessionInitiated(ProviderSession session) { + this.session = session; + } + + @Override + public Set getSupportedRpcs() { + Set supportedRpcs = new HashSet(); + supportedRpcs.add(QNAME); + return supportedRpcs; + } + + @Override + public RpcResult invokeRpc(final QName rpc, CompositeNode input) { + boolean success = false; + CompositeNode output = null; + Collection errors = new ArrayList<>(); + + // Only handle supported RPC calls + if (getSupportedRpcs().contains(rpc)) { + if (input == null) { + errors.add(RpcErrors.getRpcError("app", "tag", "info", RpcError.ErrorSeverity.WARNING, "message:null input", RpcError.ErrorType.RPC, null)); + } + else { + if (isErroneousInput(input)) { + errors.add(RpcErrors.getRpcError("app", "tag", "info", RpcError.ErrorSeverity.ERROR, "message:error", RpcError.ErrorType.RPC, null)); + } + else { + success = true; + output = addSuccessNode(input); + } + } + } + return Rpcs.getRpcResult(success, output, errors); + } + + // Examines input -- dives into CompositeNodes and finds any value equal to "error" + private boolean isErroneousInput(CompositeNode input) { + for (Node n : input.getChildren()) { + if (n instanceof CompositeNode) { + if (isErroneousInput((CompositeNode)n)) { + return true; + } + } + else { //SimpleNode + if ((input.getChildren().get(0).getValue()).equals("error")) { + return true; + } + } + } + return false; + } + + // Adds a child SimpleNode containing the value "success" to the input CompositeNode + private CompositeNode addSuccessNode(CompositeNode input) { + List> list = new ArrayList>(input.getChildren()); + SimpleNodeTOImpl simpleNode = new SimpleNodeTOImpl(QNAME, input, "success"); + list.add(simpleNode); + return new CompositeNodeTOImpl(QNAME, null, list); + } + + @Override + protected void startImpl(BundleContext context) { + thisReg = context.registerService(ExampleProvider.class, this, new Hashtable()); + } + + @Override + protected void stopImpl(BundleContext context) { + if (reg != null) { + try { + reg.close(); + thisReg.unregister(); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + } + + public void announce(QName name) { + _logger.debug("Announcing [{}]\n\n\n", name); + reg = this.session.addRpcImplementation(name, this); + } + +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-it/pom.xml b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-it/pom.xml new file mode 100644 index 0000000000..4305a283e2 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-it/pom.xml @@ -0,0 +1,536 @@ + + 4.0.0 + + sal-remoterpc-connector-test-parent + org.opendaylight.controller.tests + 1.0-SNAPSHOT + + sal-remoterpc-connector-test-it + + scm:git:ssh://git.opendaylight.org:29418/controller.git + scm:git:ssh://git.opendaylight.org:29418/controller.git + https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL + + + + 3.0.0 + 1.5.0 + 0.2.3-SNAPSHOT + 0.2.3-SNAPSHOT + + + + + + commons-codec + commons-codec + 1.7 + + + + + + + + + org.ops4j.pax.exam + maven-paxexam-plugin + 1.2.4 + + + generate-config + + generate-depends-file + + + + + + + + + + org.eclipse.m2e + lifecycle-mapping + 1.0.0 + + + + + + + org.ops4j.pax.exam + + + maven-paxexam-plugin + + + [1.2.4,) + + + + generate-depends-file + + + + + + + + + + + + + + + + + + org.opendaylight.yangtools.thirdparty + xtend-lib-osgi + 2.4.3 + + + org.opendaylight.controller.tests + sal-remoterpc-connector-test-provider + 1.0-SNAPSHOT + + + org.opendaylight.controller.tests + sal-remoterpc-connector-test-consumer + 1.0-SNAPSHOT + + + org.opendaylight.controller + sal-broker-impl + 1.0-SNAPSHOT + + + org.ops4j.pax.exam + pax-exam-container-native + ${exam.version} + test + + + org.ops4j.pax.exam + pax-exam-junit4 + ${exam.version} + test + + + org.ops4j.pax.exam + pax-exam-link-mvn + ${exam.version} + test + + + org.ops4j.pax.url + pax-url-aether + 1.5.2 + test + + + equinoxSDK381 + org.eclipse.osgi + 3.8.1.v20120830-144521 + test + + + org.slf4j + log4j-over-slf4j + 1.7.2 + + + ch.qos.logback + logback-core + 1.0.9 + + + ch.qos.logback + logback-classic + 1.0.9 + + + org.opendaylight.controller + sal-binding-api + 1.0-SNAPSHOT + + + org.opendaylight.controller + sal-common-util + 1.0-SNAPSHOT + + + org.opendaylight.controller + sal-core-api + 1.0-SNAPSHOT + + + org.opendaylight.controller + sal-remoterpc-connector + 1.0-SNAPSHOT + + + + org.opendaylight.controller + containermanager + 0.5.1-SNAPSHOT + + + org.osgi + org.osgi.compendium + + + commons-io + commons-io + + + + + + org.opendaylight.yangtools + yang-binding + + + org.opendaylight.yangtools + yang-common + + + org.opendaylight.yangtools + yang-data-api + + + + org.opendaylight.yangtools + yang-parser-impl + 0.5.9-SNAPSHOT + + + org.opendaylight.controller + sal-common-util + 1.0-SNAPSHOT + + + org.opendaylight.yangtools.thirdparty + antlr4-runtime-osgi-nohead + 4.0 + + + + + org.opendaylight.controller + zeromq-routingtable.implementation + 0.4.1-SNAPSHOT + + + org.opendaylight.controller + clustering.services + 0.4.1-SNAPSHOT + + + org.opendaylight.controller + sal + 0.5.1-SNAPSHOT + + + org.osgi + org.osgi.compendium + + + + + org.opendaylight.controller + sal.implementation + 0.4.0-SNAPSHOT + + + commons-io + commons-io + + + + + org.opendaylight.controller + containermanager + 0.5.0-SNAPSHOT + + + org.osgi + org.osgi.compendium + + + commons-io + commons-io + + + + + org.opendaylight.controller + containermanager.it.implementation + 0.5.0-SNAPSHOT + + + commons-io + commons-io + + + + + org.opendaylight.controller + clustering.stub + 0.4.0-SNAPSHOT + + + commons-io + commons-io + + + + + + org.apache.felix + org.apache.felix.dependencymanager.shell + 3.0.1 + + + org.osgi + org.osgi.compendium + + + + + eclipselink + javax.resource + 1.5.0.v200906010428 + + + com.google.guava + guava + + + org.opendaylight.controller + sal + 0.5.1-SNAPSHOT + + + org.opendaylight.controller + ietf-netconf-monitoring + ${netconf.version} + + + org.opendaylight.yangtools + yang-binding + + + org.opendaylight.yangtools.model + yang-ext + 2013.09.07.1 + + + org.opendaylight.yangtools.model + opendaylight-l2-types + 2013.08.27.1 + + + org.opendaylight.controller + sal-binding-it + 1.0-SNAPSHOT + + + org.opendaylight.controller + sal-binding-config + 1.0-SNAPSHOT + + + org.opendaylight.controller + sal-binding-broker-impl + 1.0-SNAPSHOT + + + org.opendaylight.controller + sal-broker-impl + 1.0-SNAPSHOT + + + + org.opendaylight.controller.model + model-inventory + 1.0-SNAPSHOT + + + org.opendaylight.yangtools + yang-common + + + org.opendaylight.controller + sal-connector-api + 1.0-SNAPSHOT + + + org.opendaylight.controller + sal-common-util + 1.0-SNAPSHOT + + + + org.opendaylight.controller + clustering.services + 0.4.1-SNAPSHOT + + + + equinoxSDK381 + org.eclipse.osgi + 3.8.1.v20120830-144521 + + + + org.codehaus.jackson + jackson-mapper-asl + 1.9.2 + + + org.codehaus.jackson + jackson-core-asl + 1.9.2 + + + org.zeromq + jeromq + 0.3.1 + + + org.opendaylight.yangtools.thirdparty + xtend-lib-osgi + 2.4.3 + test + + + org.opendaylight.controller + sal-binding-broker-impl + 1.0-SNAPSHOT + provided + + + org.ops4j.pax.exam + pax-exam-container-native + ${exam.version} + test + + + org.ops4j.pax.exam + pax-exam-junit4 + ${exam.version} + test + + + org.opendaylight.controller + config-netconf-connector + ${netconf.version} + test + + + org.opendaylight.controller + yang-store-impl + ${config.version} + + + org.opendaylight.controller + logback-config + ${config.version} + + + org.opendaylight.controller + config-persister-impl + ${config.version} + + + org.opendaylight.controller + config-persister-file-adapter + ${config.version} + + + org.opendaylight.controller + netconf-impl + ${netconf.version} + + + org.opendaylight.controller + netconf-client + ${netconf.version} + + + org.ops4j.pax.exam + pax-exam + ${exam.version} + + compile + + + org.ops4j.pax.exam + pax-exam-link-mvn + ${exam.version} + test + + + equinoxSDK381 + org.eclipse.osgi + 3.8.1.v20120830-144521 + test + + + org.slf4j + log4j-over-slf4j + 1.7.2 + + + ch.qos.logback + logback-core + 1.0.9 + + + ch.qos.logback + logback-classic + 1.0.9 + + + org.mockito + mockito-all + test + + + org.opendaylight.controller.model + model-flow-service + 1.0-SNAPSHOT + provided + + + org.opendaylight.controller + config-manager + 0.2.3-SNAPSHOT + + + commons-io + commons-io + + + + + org.opendaylight.controller.model + model-flow-management + 1.0-SNAPSHOT + provided + + + org.opendaylight.yangtools.thirdparty + antlr4-runtime-osgi-nohead + 4.0 + + + diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/RouterTest.java b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/RouterTest.java new file mode 100644 index 0000000000..62c094d7a6 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/RouterTest.java @@ -0,0 +1,456 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.sample.zeromq.test.it; + +import junit.framework.Assert; + +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.opendaylight.controller.sal.connector.remoterpc.Client; +import org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcClient; +import org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcServer; +import org.opendaylight.controller.sal.connector.remoterpc.ServerImpl; +import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; +import org.opendaylight.controller.sal.core.api.Broker; +import org.opendaylight.controller.sample.zeromq.provider.ExampleProvider; +import org.opendaylight.controller.sample.zeromq.consumer.ExampleConsumer; +import org.opendaylight.controller.test.sal.binding.it.TestHelper; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.ops4j.pax.exam.Configuration; +import org.ops4j.pax.exam.Option; +import org.ops4j.pax.exam.junit.PaxExam; +import org.ops4j.pax.exam.util.Filter; +import org.osgi.framework.Bundle; +import org.osgi.framework.BundleException; +import org.osgi.framework.ServiceReference; +import org.osgi.framework.ServiceRegistration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.zeromq.ZMQ; + +import javax.inject.Inject; + +import java.io.IOException; +import java.net.URI; +import java.util.Hashtable; + +import static org.opendaylight.controller.test.sal.binding.it.TestHelper.baseModelBundles; +import static org.opendaylight.controller.test.sal.binding.it.TestHelper.bindingAwareSalBundles; +import static org.opendaylight.controller.test.sal.binding.it.TestHelper.configMinumumBundles; +import static org.opendaylight.controller.test.sal.binding.it.TestHelper.mdSalCoreBundles; +import static org.ops4j.pax.exam.CoreOptions.*; + +@RunWith(PaxExam.class) +public class RouterTest { + + private Logger _logger = LoggerFactory.getLogger(RouterTest.class); + + public static final String ODL = "org.opendaylight.controller"; + public static final String YANG = "org.opendaylight.yangtools"; + public static final String SAMPLE = "org.opendaylight.controller.tests"; + private final URI namespace = URI.create("http://cisco.com/example"); + private final QName QNAME = new QName(namespace, "heartbeat"); + + + @Inject + org.osgi.framework.BundleContext ctx; + + @Inject + @Filter(timeout=60*1000) + Broker broker; + + private ZMQ.Context zmqCtx = ZMQ.context(1); + //private Server router; + //private ExampleProvider provider; + + //@Test + public void testInvokeRpc() throws Exception{ + //Thread.sleep(1000); + //Send announcement + ServiceReference providerRef = ctx.getServiceReference(ExampleProvider.class); + Assert.assertNotNull(providerRef); + + ExampleProvider provider = (ExampleProvider)ctx.getService(providerRef); + Assert.assertNotNull(provider); + + ServiceReference consumerRef = ctx.getServiceReference(ExampleConsumer.class); + Assert.assertNotNull(consumerRef); + ExampleConsumer consumer = (ExampleConsumer)ctx.getService(consumerRef); + Assert.assertNotNull(consumer); + + + _logger.debug("Provider sends announcement [{}]", "heartbeat"); + provider.announce(QNAME); + ServiceReference routerRef = ctx.getServiceReference(Client.class); + Client router = (Client) ctx.getService(routerRef); + _logger.debug("Found router[{}]", router); + _logger.debug("Invoking RPC [{}]", QNAME); + for (int i = 0; i < 3; i++) { + RpcResult result = router.invokeRpc(QNAME, consumer.getValidCompositeNodeWithOneSimpleChild()); + _logger.debug("{}-> Result is: Successful:[{}], Payload:[{}], Errors: [{}]", i, result.isSuccessful(), result.getResult(), result.getErrors()); + Assert.assertNotNull(result); + } + } + + @Test + public void testInvokeRpcWithValidSimpleNode() throws Exception{ + //Thread.sleep(1500); + + ServiceReference providerRef = ctx.getServiceReference(ExampleProvider.class); + Assert.assertNotNull(providerRef); + ExampleProvider provider = (ExampleProvider)ctx.getService(providerRef); + Assert.assertNotNull(provider); + ServiceReference consumerRef = ctx.getServiceReference(ExampleConsumer.class); + Assert.assertNotNull(consumerRef); + ExampleConsumer consumer = (ExampleConsumer)ctx.getService(consumerRef); + Assert.assertNotNull(consumer); + + // Provider sends announcement + _logger.debug("Provider sends announcement [{}]", "heartbeat"); + provider.announce(QNAME); + // Consumer invokes RPC + _logger.debug("Invoking RPC [{}]", QNAME); + CompositeNode input = consumer.getValidCompositeNodeWithOneSimpleChild(); + for (int i = 0; i < 3; i++) { + RpcResult result = consumer.invokeRpc(QNAME, input); + Assert.assertNotNull(result); + _logger.debug("{}-> Result is: Successful:[{}], Payload:[{}], Errors: [{}]", i, result.isSuccessful(), result.getResult(), result.getErrors()); + Assert.assertTrue(result.isSuccessful()); + Assert.assertNotNull(result.getResult()); + Assert.assertEquals(0, result.getErrors().size()); + Assert.assertEquals(input.getChildren().size()+1, result.getResult().getChildren().size()); + } + } + + @Test + public void testInvokeRpcWithValidSimpleNodes() throws Exception{ + //Thread.sleep(1500); + + ServiceReference providerRef = ctx.getServiceReference(ExampleProvider.class); + Assert.assertNotNull(providerRef); + ExampleProvider provider = (ExampleProvider)ctx.getService(providerRef); + Assert.assertNotNull(provider); + ServiceReference consumerRef = ctx.getServiceReference(ExampleConsumer.class); + Assert.assertNotNull(consumerRef); + ExampleConsumer consumer = (ExampleConsumer)ctx.getService(consumerRef); + Assert.assertNotNull(consumer); + + // Provider sends announcement + _logger.debug("Provider sends announcement [{}]", "heartbeat"); + provider.announce(QNAME); + // Consumer invokes RPC + _logger.debug("Invoking RPC [{}]", QNAME); + CompositeNode input = consumer.getValidCompositeNodeWithFourSimpleChildren(); + for (int i = 0; i < 3; i++) { + RpcResult result = consumer.invokeRpc(QNAME, input); + Assert.assertNotNull(result); + _logger.debug("{}-> Result is: Successful:[{}], Payload:[{}], Errors: [{}]", i, result.isSuccessful(), result.getResult(), result.getErrors()); + Assert.assertTrue(result.isSuccessful()); + Assert.assertNotNull(result.getResult()); + Assert.assertEquals(0, result.getErrors().size()); + Assert.assertEquals(input.getChildren().size()+1, result.getResult().getChildren().size()); + } + } + + @Test + public void testInvokeRpcWithValidCompositeNode() throws Exception{ + //Thread.sleep(1500); + + ServiceReference providerRef = ctx.getServiceReference(ExampleProvider.class); + Assert.assertNotNull(providerRef); + ExampleProvider provider = (ExampleProvider)ctx.getService(providerRef); + Assert.assertNotNull(provider); + ServiceReference consumerRef = ctx.getServiceReference(ExampleConsumer.class); + Assert.assertNotNull(consumerRef); + ExampleConsumer consumer = (ExampleConsumer)ctx.getService(consumerRef); + Assert.assertNotNull(consumer); + + // Provider sends announcement + _logger.debug("Provider sends announcement [{}]", "heartbeat"); + provider.announce(QNAME); + // Consumer invokes RPC + _logger.debug("Invoking RPC [{}]", QNAME); + CompositeNode input = consumer.getValidCompositeNodeWithTwoCompositeChildren(); + for (int i = 0; i < 3; i++) { + RpcResult result = consumer.invokeRpc(QNAME, input); + Assert.assertNotNull(result); + _logger.debug("{}-> Result is: Successful:[{}], Payload:[{}], Errors: [{}]", i, result.isSuccessful(), result.getResult(), result.getErrors()); + Assert.assertTrue(result.isSuccessful()); + Assert.assertNotNull(result.getResult()); + Assert.assertEquals(0, result.getErrors().size()); + Assert.assertEquals(input.getChildren().size()+1, result.getResult().getChildren().size()); + } + } + + @Test + public void testInvokeRpcWithNullInput() throws Exception{ + //Thread.sleep(1500); + + ServiceReference providerRef = ctx.getServiceReference(ExampleProvider.class); + Assert.assertNotNull(providerRef); + ExampleProvider provider = (ExampleProvider)ctx.getService(providerRef); + Assert.assertNotNull(provider); + ServiceReference consumerRef = ctx.getServiceReference(ExampleConsumer.class); + Assert.assertNotNull(consumerRef); + ExampleConsumer consumer = (ExampleConsumer)ctx.getService(consumerRef); + Assert.assertNotNull(consumer); + + // Provider sends announcement + _logger.debug("Provider sends announcement [{}]", QNAME.getLocalName()); + provider.announce(QNAME); + // Consumer invokes RPC + _logger.debug("Invoking RPC [{}]", QNAME); + for (int i = 0; i < 3; i++) { + RpcResult result = consumer.invokeRpc(QNAME, null); + Assert.assertNotNull(result); + _logger.debug("{}-> Result is: Successful:[{}], Payload:[{}], Errors: [{}]", i, result.isSuccessful(), result.getResult(), result.getErrors()); + Assert.assertFalse(result.isSuccessful()); + Assert.assertNull(result.getResult()); + Assert.assertEquals(1, result.getErrors().size()); + Assert.assertEquals(RpcError.ErrorSeverity.WARNING, ((RpcError)result.getErrors().toArray()[0]).getSeverity()); + } + } + + @Test + public void testInvokeRpcWithInvalidSimpleNode() throws Exception{ + //Thread.sleep(1500); + + ServiceReference providerRef = ctx.getServiceReference(ExampleProvider.class); + Assert.assertNotNull(providerRef); + ExampleProvider provider = (ExampleProvider)ctx.getService(providerRef); + Assert.assertNotNull(provider); + ServiceReference consumerRef = ctx.getServiceReference(ExampleConsumer.class); + Assert.assertNotNull(consumerRef); + ExampleConsumer consumer = (ExampleConsumer)ctx.getService(consumerRef); + Assert.assertNotNull(consumer); + + // Provider sends announcement + _logger.debug("Provider sends announcement [{}]", QNAME.getLocalName()); + provider.announce(QNAME); + // Consumer invokes RPC + _logger.debug("Invoking RPC [{}]", QNAME); + CompositeNode input = consumer.getInvalidCompositeNodeSimpleChild(); + for (int i = 0; i < 3; i++) { + RpcResult result = consumer.invokeRpc(QNAME, input); + Assert.assertNotNull(result); + _logger.debug("{}-> Result is: Successful:[{}], Payload:[{}], Errors: [{}]", i, result.isSuccessful(), result.getResult(), result.getErrors()); + Assert.assertFalse(result.isSuccessful()); + Assert.assertNull(result.getResult()); + Assert.assertEquals(1, result.getErrors().size()); + Assert.assertEquals(RpcError.ErrorSeverity.ERROR, ((RpcError)result.getErrors().toArray()[0]).getSeverity()); + } + } + + @Test + public void testInvokeRpcWithInvalidCompositeNode() throws Exception{ + //Thread.sleep(1500); + + ServiceReference providerRef = ctx.getServiceReference(ExampleProvider.class); + Assert.assertNotNull(providerRef); + ExampleProvider provider = (ExampleProvider)ctx.getService(providerRef); + Assert.assertNotNull(provider); + ServiceReference consumerRef = ctx.getServiceReference(ExampleConsumer.class); + Assert.assertNotNull(consumerRef); + ExampleConsumer consumer = (ExampleConsumer)ctx.getService(consumerRef); + Assert.assertNotNull(consumer); + + // Provider sends announcement + _logger.debug("Provider sends announcement [{}]", QNAME.getLocalName()); + provider.announce(QNAME); + // Consumer invokes RPC + _logger.debug("Invoking RPC [{}]", QNAME); + CompositeNode input = consumer.getInvalidCompositeNodeCompositeChild(); + for (int i = 0; i < 3; i++) { + RpcResult result = consumer.invokeRpc(QNAME, input); + Assert.assertNotNull(result); + _logger.debug("{}-> Result is: Successful:[{}], Payload:[{}], Errors: [{}]", i, result.isSuccessful(), result.getResult(), result.getErrors()); + Assert.assertFalse(result.isSuccessful()); + Assert.assertNull(result.getResult()); + Assert.assertEquals(1, result.getErrors().size()); + Assert.assertEquals(RpcError.ErrorSeverity.ERROR, ((RpcError)result.getErrors().toArray()[0]).getSeverity()); + } + } + + //@Test + // This method is UNTESTED -- need to get around the bundling issues before I know if this even work +// public void testInvokeRpcWithValidCompositeNode() throws Exception{ +// Thread.sleep(10000); +// //Send announcement +// ServiceReference providerRef = ctx.getServiceReference(ExampleProvider.class); +// Assert.assertNotNull(providerRef); +// +// ExampleProvider provider = (ExampleProvider)ctx.getService(providerRef); +// Assert.assertNotNull(provider); +// +// ServiceReference consumerRef = ctx.getServiceReference(ExampleConsumer.class); +// Assert.assertNotNull(consumerRef); +// +// ExampleConsumer consumer = (ExampleConsumer)ctx.getService(consumerRef); +// Assert.assertNotNull(consumer); +// +// _logger.debug("Provider sends announcement [{}]", "heartbeat"); +// provider.announce(QNAME); +// ServiceReference routerRef = ctx.getServiceReference(Client.class); +// Client router = (Client) ctx.getService(routerRef); +// _logger.debug("Found router[{}]", router); +// _logger.debug("Invoking RPC [{}]", QNAME); +// for (int i = 0; i < 3; i++) { +// RpcResult result = router.getInstance().invokeRpc(QNAME, consumer.getValidCompositeNodeWithOneSimpleChild()); +// _logger.debug("{}-> Result is: Successful:[{}], Payload:[{}], Errors: [{}]", i, result.isSuccessful(), result.getResult(), result.getErrors()); +// Assert.assertNotNull(result); +// } +// } + + private Message send(Message msg) throws IOException { + ZMQ.Socket reqSocket = zmqCtx.socket(ZMQ.REQ); + reqSocket.connect("tcp://localhost:5555"); + reqSocket.send(Message.serialize(msg)); + Message response = parseMessage(reqSocket); + + return response; + } + + /** + * @param socket + * @return + */ + private Message parseMessage(ZMQ.Socket socket) { + + Message msg = null; + try { + byte[] bytes = socket.recv(); + _logger.debug("Received bytes:[{}]", bytes.length); + msg = (Message) Message.deserialize(bytes); + } catch (Throwable t) { + t.printStackTrace(); + } + return msg; + } + + + private void printState(){ + Bundle[] b = ctx.getBundles(); + _logger.debug("\n\nNumber of bundles [{}]\n\n]", b.length); + for (int i=0;i + + + prefix:schema-service-singleton + yang-schema-service + + + prefix:hash-map-data-store + hash-map-data-store + + + prefix:dom-broker-impl + dom-broker + + dom:dom-data-store + ref_hash-map-data-store + + + + prefix:binding-broker-impl + binding-broker-impl + + binding:binding-notification-service + ref_binding-notification-broker + + + binding:binding-data-broker + ref_binding-data-broker + + + + prefix:runtime-generated-mapping + runtime-mapping-singleton + + + prefix:binding-notification-broker + binding-notification-broker + + + prefix:binding-data-broker + binding-data-broker + + dom:dom-broker-osgi-registry + ref_dom-broker + + + binding:binding-dom-mapping-service + ref_runtime-mapping-singleton + + + + prefix:remote-zeromq-rpc-server + remoter + 5666 + + prefix:dom-broker-osgi-registry + ref_dom-broker + + + + + + dom:schema-service + + ref_yang-schema-service + /config/modules/module[name='schema-service-singleton']/instance[name='yang-schema-service'] + + + + binding:binding-notification-service + + ref_binding-notification-broker + /config/modules/module[name='binding-notification-broker']/instance[name='binding-notification-broker'] + + + + dom:dom-data-store + + ref_hash-map-data-store + /config/modules/module[name='hash-map-data-store']/instance[name='hash-map-data-store'] + + + + binding:binding-broker-osgi-registry + + ref_binding-broker-impl + /config/modules/module[name='binding-broker-impl']/instance[name='binding-broker-impl'] + + + + binding-impl:binding-dom-mapping-service + + ref_runtime-mapping-singleton + /config/modules/module[name='runtime-generated-mapping']/instance[name='runtime-mapping-singleton'] + + + + dom:dom-broker-osgi-registry + + ref_dom-broker + /config/modules/module[name='dom-broker-impl']/instance[name='dom-broker'] + + + + binding:binding-data-broker + + ref_binding-data-broker + /config/modules/module[name='binding-data-broker']/instance[name='binding-data-broker'] + + + + + + +//END OF SNAPSHOT +urn:opendaylight:params:xml:ns:yang:controller:config?module=config&revision=2013-04-05 +urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl?module=opendaylight-sal-binding-broker-impl&revision=2013-10-28 +urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding?module=opendaylight-md-sal-binding&revision=2013-10-28 +urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:impl?module=opendaylight-sal-dom-broker-impl&revision=2013-10-28 +urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom?module=opendaylight-md-sal-dom&revision=2013-10-28 +urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote:rpc?module=odl-sal-dom-rpc-remote-cfg&revision=2013-10-28 +//END OF CONFIG diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-it/src/test/resources/logback.xml b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-it/src/test/resources/logback.xml new file mode 100644 index 0000000000..1d17796373 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-it/src/test/resources/logback.xml @@ -0,0 +1,16 @@ + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-nb/pom.xml b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-nb/pom.xml new file mode 100644 index 0000000000..dd7e36cfb4 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-nb/pom.xml @@ -0,0 +1,109 @@ + + + 4.0.0 + + sal-remoterpc-connector-test-parent + org.opendaylight.controller.tests + 1.0-SNAPSHOT + + + sal-remoterpc-connector-test-nb + bundle + + + + + org.apache.felix + maven-bundle-plugin + ${bundle.plugin.version} + true + + + + + + com.sun.jersey.spi.container.servlet, + org.codehaus.jackson.annotate, + javax.ws.rs, + javax.ws.rs.core, + javax.xml.bind, + javax.xml.bind.annotation, + org.slf4j, + org.apache.catalina.filters, + org.codehaus.jackson.jaxrs, + org.opendaylight.controller.sample.zeromq.provider, + org.opendaylight.controller.sample.zeromq.consumer, + org.opendaylight.controller.sal.utils, + org.opendaylight.yangtools.yang.common, + org.opendaylight.controller.sal.connector.api, + org.opendaylight.controller.sal.connector.remoterpc.api;version="[0.4,1)", + org.opendaylight.controller.sal.connector.remoterpc.impl;version="[0.4,1)", + org.opendaylight.controller.sal.connector.remoterpc.dto, + org.opendaylight.controller.sal.connector.remoterpc.util, + org.osgi.framework, + com.google.common.base, + org.opendaylight.yangtools.yang.data.api, + !org.codehaus.enunciate.jaxrs + + + /controller/nb/v2/zmqnb + ,${classes;ANNOTATION;javax.ws.rs.Path} + + ${project.basedir}/src/main/resources/META-INF + + + + + + + org.opendaylight.controller + containermanager + 0.5.1-SNAPSHOT + + + org.opendaylight.controller + commons.northbound + 0.4.1-SNAPSHOT + + + org.opendaylight.controller + sal + 0.5.1-SNAPSHOT + + + org.opendaylight.controller.tests + sal-remoterpc-connector-test-provider + 1.0-SNAPSHOT + + + org.opendaylight.controller.tests + sal-remoterpc-connector-test-consumer + 1.0-SNAPSHOT + + + org.opendaylight.controller + sal-remoterpc-connector + 1.0-SNAPSHOT + + + org.osgi + org.osgi.core + 5.0.0 + + + junit + junit + + + org.opendaylight.controller + zeromq-routingtable.implementation + 0.4.1-SNAPSHOT + + + com.google.guava + guava + + + + diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-nb/src/main/java/org/opendaylight/controller/tests/zmqrouter/rest/Router.java b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-nb/src/main/java/org/opendaylight/controller/tests/zmqrouter/rest/Router.java new file mode 100644 index 0000000000..6c9ec4e788 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-nb/src/main/java/org/opendaylight/controller/tests/zmqrouter/rest/Router.java @@ -0,0 +1,246 @@ +package org.opendaylight.controller.tests.zmqrouter.rest; + +import org.opendaylight.controller.sal.connector.api.RpcRouter; +import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable; +import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException; +import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException; +import org.opendaylight.controller.sal.connector.remoterpc.dto.CompositeNodeImpl; +import org.opendaylight.controller.sal.connector.remoterpc.impl.RoutingTableImpl; +import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils; +import org.opendaylight.controller.sample.zeromq.consumer.ExampleConsumer; +import org.opendaylight.controller.sample.zeromq.provider.ExampleProvider; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; +import org.osgi.framework.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import java.io.Serializable; +import java.net.URI; +import java.util.Set; + +@Path("router") +public class Router { + private Logger _logger = LoggerFactory.getLogger(Router.class); + private final URI namespace = URI.create("http://cisco.com/example"); + private final QName QNAME = new QName(namespace, "heartbeat"); + + + @GET + @Path("/hello") + @Produces(MediaType.TEXT_PLAIN) + public String hello() { + return "Hello"; + } + + @GET + @Path("/announce") + @Produces(MediaType.TEXT_PLAIN) + public String announce() { + _logger.info("Announce request received"); + + BundleContext ctx = getBundleContext(); + ServiceReference providerRef = ctx.getServiceReference(ExampleProvider.class); + if (providerRef == null) { + _logger.debug("Could not get provider reference"); + return "Could not get provider reference"; + } + + ExampleProvider provider = (ExampleProvider) ctx.getService(providerRef); + if (provider == null) { + _logger.info("Could not get provider service"); + return "Could not get provider service"; + } + + provider.announce(QNAME); + return "Announcement sent "; + + } + + @GET + @Path("/rpc") + @Produces(MediaType.TEXT_PLAIN) + public String invokeRpc() throws Exception { + _logger.info("Invoking RPC"); + + ExampleConsumer consumer = getConsumer(); + RpcResult result = consumer.invokeRpc(QNAME, new CompositeNodeImpl()); + _logger.info("Result [{}]", result.isSuccessful()); + + return stringify(result); + } + + @GET + @Path("/rpc-success") + @Produces(MediaType.TEXT_PLAIN) + public String invokeRpcSuccess() throws Exception { + ExampleConsumer consumer = getConsumer(); + RpcResult result = consumer.invokeRpc(QNAME, consumer.getValidCompositeNodeWithFourSimpleChildren()); //TODO: Change this + _logger.info("Result [{}]", result.isSuccessful()); + + return stringify(result); + } + + @GET + @Path("/rpc-failure") + @Produces(MediaType.TEXT_PLAIN) + public String invokeRpcFailure() throws Exception { + ExampleConsumer consumer = getConsumer(); + //RpcResult result = consumer.invokeRpc(QNAME, consumer.getInvalidCompositeNodeCompositeChild()); //TODO: Change this + RpcResult result = consumer.invokeRpc(QNAME, null); //TODO: Change this + _logger.info("Result [{}]", result.isSuccessful()); + + return stringify(result); + } + + @GET + @Path("/routingtable") + @Produces(MediaType.TEXT_PLAIN) + public String invokeRoutingTable() { + _logger.info("Invoking adding an entry in routing table"); + + BundleContext ctx = getBundleContext(); + ServiceReference routingTableServiceReference = ctx.getServiceReference(RoutingTable.class); + if (routingTableServiceReference == null) { + _logger.debug("Could not get routing table impl reference"); + return "Could not get routingtable referen "; + } + RoutingTable routingTable = (RoutingTableImpl) ctx.getService(routingTableServiceReference); + if (routingTable == null) { + _logger.info("Could not get routing table service"); + return "Could not get routing table service"; + } + + + RoutingIdentifierImpl rii = new RoutingIdentifierImpl(); + try { + routingTable.addGlobalRoute(rii.toString(), "172.27.12.1:5000"); + } catch (RoutingTableException e) { + _logger.error("error in adding routing identifier" + e.getMessage()); + + } catch (SystemException e) { + _logger.error("error in adding routing identifier" + e.getMessage()); + } + + Set routes = routingTable.getRoutes(rii.toString()); + + StringBuilder stringBuilder = new StringBuilder(); + for (String route : routes) { + stringBuilder.append(route); + } + + _logger.info("Result [{}] routes added for route" + rii + stringBuilder.toString()); + + return stringBuilder.toString(); + } + + @GET + @Path("/routingtabledelete") + @Produces(MediaType.TEXT_PLAIN) + public String invokeDeleteRoutingTable() { + _logger.info("Invoking adding an entry in routing table"); + + BundleContext ctx = getBundleContext(); + ServiceReference routingTableServiceReference = ctx.getServiceReference(RoutingTable.class); + if (routingTableServiceReference == null) { + _logger.debug("Could not get routing table impl reference"); + return "Could not get routingtable referen "; + } + RoutingTable routingTable = (RoutingTableImpl) ctx.getService(routingTableServiceReference); + if (routingTable == null) { + _logger.info("Could not get routing table service"); + return "Could not get routing table service"; + } + + + RoutingIdentifierImpl rii = new RoutingIdentifierImpl(); + try { + routingTable.removeGlobalRoute(rii.toString()); + } catch (RoutingTableException e) { + _logger.error("error in adding routing identifier" + e.getMessage()); + + } catch (SystemException e) { + _logger.error("error in adding routing identifier" + e.getMessage()); + } + + Set routes = routingTable.getRoutes(rii.toString()); + + StringBuilder stringBuilder = new StringBuilder(); + if (routes != null) { + for (String route : routes) { + stringBuilder.append(route); + } + } else { + stringBuilder.append(" successfully"); + } + + _logger.info("Result [{}] routes removed for route" + rii + stringBuilder.toString()); + + return stringBuilder.toString(); + } + + private String stringify(RpcResult result) { + CompositeNode node = result.getResult(); + StringBuilder builder = new StringBuilder("result:").append(XmlUtils.compositeNodeToXml(node)).append("\n") + .append("error:").append(result.getErrors()).append("\n"); + + return builder.toString(); + } + + private BundleContext getBundleContext() { + ClassLoader tlcl = Thread.currentThread().getContextClassLoader(); + Bundle bundle = null; + + if (tlcl instanceof BundleReference) { + bundle = ((BundleReference) tlcl).getBundle(); + } else { + _logger.info("Unable to determine the bundle context based on " + + "thread context classloader."); + bundle = FrameworkUtil.getBundle(this.getClass()); + } + return (bundle == null ? null : bundle.getBundleContext()); + } + + private ExampleConsumer getConsumer() { + BundleContext ctx = getBundleContext(); + ServiceReference consumerRef = ctx.getServiceReference(ExampleConsumer.class); + if (consumerRef == null) { + _logger.debug("Could not get consumer reference"); + throw new NullPointerException("Could not get consumer reference"); + } + ExampleConsumer consumer = (ExampleConsumer) ctx.getService(consumerRef); + if (consumer == null) { + _logger.info("Could not get consumer service"); + throw new NullPointerException("Could not get consumer service"); + } + return consumer; + } + + class RoutingIdentifierImpl implements RpcRouter.RouteIdentifier, Serializable { + + private final URI namespace = URI.create("http://cisco.com/example"); + private final QName QNAME = new QName(namespace, "global"); + private final QName instance = new QName(URI.create("127.0.0.1"), "local"); + + @Override + public QName getContext() { + return QNAME; + } + + @Override + public QName getType() { + return QNAME; + } + + @Override + public org.opendaylight.yangtools.yang.data.api.InstanceIdentifier getRoute() { + return InstanceIdentifier.of(instance); + } + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-nb/src/main/resources/WEB-INF/web.xml b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-nb/src/main/resources/WEB-INF/web.xml new file mode 100644 index 0000000000..5bd21398fb --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-nb/src/main/resources/WEB-INF/web.xml @@ -0,0 +1,58 @@ + + + + JAXRSZmq + com.sun.jersey.spi.container.servlet.ServletContainer + + javax.ws.rs.Application + org.opendaylight.controller.northbound.commons.NorthboundApplication + + 1 + + + + JAXRSZmq + /* + + + + + + + NB api + /* + POST + GET + PUT + PATCH + DELETE + HEAD + + + System-Admin + Network-Admin + Network-Operator + Container-User + + + + + System-Admin + + + Network-Admin + + + Network-Operator + + + Container-User + + + + BASIC + opendaylight + + diff --git a/opendaylight/md-sal/sal-zeromq-connector/pom.xml b/opendaylight/md-sal/sal-zeromq-connector/pom.xml deleted file mode 100644 index 7859908768..0000000000 --- a/opendaylight/md-sal/sal-zeromq-connector/pom.xml +++ /dev/null @@ -1,149 +0,0 @@ - - - 4.0.0 - - org.opendaylight.controller - sal-parent - 1.0-SNAPSHOT - - - sal-zeromq-connector - bundle - - - 2.10.3 - - - - - - org.apache.felix - maven-bundle-plugin - true - - - - org.opendaylight.controller.sal.connector.api, - org.opendaylight.controller.sal.core.api, - org.opendaylight.yangtools.concepts;version="[0.1,1)", - org.opendaylight.yangtools.yang.common;version="[0.5,1)", - org.opendaylight.yangtools.yang.data.api;version="[0.5,1)", - org.zeromq;version="[0.3,1)" - - org.opendaylight.controller.sal.connector.remoterpc.router.zeromq.Activator - - - - - - net.alchim31.maven - scala-maven-plugin - 3.1.6 - - incremental - - -target:jvm-1.7 - - - -source1.7 - -target1.7 - - - - - scala-compile - - compile - - - - scala-test-compile - - testCompile - - - - - - - maven-compiler-plugin - - - default-compile - none - - - default-testCompile - none - - - - - - - - - org.scala-lang - scala-library - ${scala.version} - - - - org.opendaylight.controller - containermanager - 0.5.1-SNAPSHOT - - - org.opendaylight.controller - commons.northbound - 0.4.1-SNAPSHOT - - - org.opendaylight.controller - sal - 0.5.1-SNAPSHOT - - - org.opendaylight.yangtools - yang-binding - - - org.opendaylight.yangtools - yang-common - - - org.opendaylight.controller - sal-connector-api - - - org.opendaylight.controller - sal-common-util - 1.0-SNAPSHOT - - - - junit - junit - - - org.jeromq - jeromq - 0.3.0-SNAPSHOT - - - - - - sonatype-nexus-snapshots - https://oss.sonatype.org/content/repositories/snapshots - - false - - - true - - - - - diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RouteChange.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RouteChange.java deleted file mode 100644 index ba90f3705f..0000000000 --- a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RouteChange.java +++ /dev/null @@ -1,17 +0,0 @@ -/* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.sal.connector.remoterpc.api; - -import java.util.Map; -import java.util.Set; - -public interface RouteChange { - - Map> getRemovals(); - Map> getAnnouncements(); -} diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/Activator.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/Activator.java deleted file mode 100644 index 5b927a56b1..0000000000 --- a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/Activator.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq; - -import org.opendaylight.controller.sal.core.api.AbstractProvider; -import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; -import org.osgi.framework.BundleContext; - -public class Activator extends AbstractProvider { - - ZeroMqRpcRouter router; - - @Override - public void onSessionInitiated(ProviderSession session) { - router = ZeroMqRpcRouter.getInstance(); - router.setBrokerSession(session); - router.start(); - } - - @Override - protected void stopImpl(BundleContext context) { - router.stop(); - } - -} diff --git a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/ZeroMqRpcRouter.java b/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/ZeroMqRpcRouter.java deleted file mode 100644 index af94804322..0000000000 --- a/opendaylight/md-sal/sal-zeromq-connector/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/router/zeromq/ZeroMqRpcRouter.java +++ /dev/null @@ -1,450 +0,0 @@ -/* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.sal.connector.remoterpc.router.zeromq; - -import java.io.IOException; -import java.net.Inet4Address; -import java.net.InetAddress; -import java.net.NetworkInterface; -import java.net.SocketException; -import java.util.Collection; -import java.util.Collections; -import java.util.Enumeration; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -import org.opendaylight.controller.sal.connector.api.RpcRouter; -import org.opendaylight.controller.sal.connector.remoterpc.router.zeromq.Message.MessageType; -import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; -import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration; -import org.opendaylight.controller.sal.core.api.RpcImplementation; -import org.opendaylight.controller.sal.core.api.RpcRegistrationListener; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.common.RpcError; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.data.api.CompositeNode; -import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.zeromq.ZMQ; - -/** - * ZeroMq based implementation of RpcRouter - * TODO: - * 1. Make it multi VM aware - * 2. Make rpc request handling async and non-blocking. Note zmq socket is not thread safe - * 3. sendRpc() should use connection pooling - * 4. Read properties from config file using existing(?) ODL properties framework - */ -public class ZeroMqRpcRouter implements RpcRouter { - - private ExecutorService serverPool; - private static ExecutorService handlersPool; - - private Map, String> routingTable; - - private ProviderSession brokerSession; - - private ZMQ.Context context; - private ZMQ.Socket publisher; - private ZMQ.Socket subscriber; - private ZMQ.Socket replySocket; - - private static ZeroMqRpcRouter _instance = new ZeroMqRpcRouter(); - - private final RpcFacade facade = new RpcFacade(); - private final RpcListener listener = new RpcListener(); - - private final String localIp = getLocalIpAddress(); - - private String pubPort = System.getProperty("pub.port");// port on which announcements are sent - private String subPort = System.getProperty("sub.port");// other controller's pub port - private String pubIp = System.getProperty("pub.ip"); // other controller's ip - private String rpcPort = System.getProperty("rpc.port");// port on which RPC messages are received - - private Logger _logger = LoggerFactory.getLogger(ZeroMqRpcRouter.class); - - //Prevent instantiation - private ZeroMqRpcRouter() { - } - - public static ZeroMqRpcRouter getInstance() { - return _instance; - } - - public void start() { - context = ZMQ.context(2); - publisher = context.socket(ZMQ.PUB); - int ret = publisher.bind("tcp://*:" + pubPort); - // serverPool = Executors.newSingleThreadExecutor(); - serverPool = Executors.newCachedThreadPool(); - handlersPool = Executors.newCachedThreadPool(); - routingTable = new ConcurrentHashMap, String>(); - - // Start listening for announce and rpc messages - serverPool.execute(receive()); - - brokerSession.addRpcRegistrationListener(listener); - - Set currentlySupported = brokerSession.getSupportedRpcs(); - for (QName rpc : currentlySupported) { - listener.onRpcImplementationAdded(rpc); - } - - } - - public void stop() { - if (handlersPool != null) - handlersPool.shutdown(); - if (serverPool != null) - serverPool.shutdown(); - if (publisher != null) { - publisher.setLinger(0); - publisher.close(); - } - if (replySocket != null) { - replySocket.setLinger(0); - replySocket.close(); - } - if (subscriber != null) { - subscriber.setLinger(0); - subscriber.close(); - } - if (context != null) - context.term(); - - } - - private Runnable receive() { - return new Runnable() { - public void run() { - try { - // Bind to RPC reply socket - replySocket = context.socket(ZMQ.REP); - replySocket.bind("tcp://*:" + rpcPort); - - // Bind to publishing controller - subscriber = context.socket(ZMQ.SUB); - String pubAddress = "tcp://" + pubIp + ":" + subPort; - subscriber.connect(pubAddress); - _logger.debug("{} Subscribing at[{}]", Thread.currentThread().getName(), pubAddress); - - //subscribe for announcements - //TODO: Message type would be changed. Update this - subscriber.subscribe(Message.serialize(Message.MessageType.ANNOUNCE)); - - // Poller enables listening on multiple sockets using a single thread - ZMQ.Poller poller = new ZMQ.Poller(2); - poller.register(replySocket, ZMQ.Poller.POLLIN); - poller.register(subscriber, ZMQ.Poller.POLLIN); - - //TODO: Add code to restart the thread after exception - while (!Thread.currentThread().isInterrupted()) { - - poller.poll(); - - if (poller.pollin(0)) { - handleRpcCall(); - } - if (poller.pollin(1)) { - handleAnnouncement(); - } - } - } catch (Exception e) { - e.printStackTrace(); - } - replySocket.setLinger(0); - replySocket.close(); - subscriber.setLinger(0); - subscriber.close(); - } - }; - } - - /** - * @throws IOException - * @throws ClassNotFoundException - */ - private void handleAnnouncement() throws IOException, ClassNotFoundException { - - _logger.info("Announcement received"); - Message.MessageType topic = (MessageType) Message.deserialize(subscriber.recv()); - - if (subscriber.hasReceiveMore()) { - try { - Message m = (Message) Message.deserialize(subscriber.recv()); - _logger.debug("Announcement message [{}]", m); - - // TODO: check on msg type or topic. Both - // should be same. Need to normalize. - if (Message.MessageType.ANNOUNCE == m.getType()) - updateRoutingTable(m); - } catch (IOException | ClassNotFoundException e) { - e.printStackTrace(); - } - } - - } - - /** - * @throws InterruptedException - * @throws ExecutionException - */ - private void handleRpcCall() throws InterruptedException, ExecutionException { - try { - Message request = parseMessage(replySocket); - - _logger.debug("Received rpc request [{}]", request); - - // Call broker to process the message then reply - Future> rpc = brokerSession.rpc( - (QName) request.getRoute().getType(), (CompositeNode) request.getPayload()); - - RpcResult result = rpc.get(); - - Message response = new Message.MessageBuilder() - .type(MessageType.RESPONSE) - .sender(localIp + ":" + rpcPort) - .route(request.getRoute()) - //.payload(result) TODO: enable and test - .build(); - - replySocket.send(Message.serialize(response)); - - _logger.debug("Sent rpc response [{}]", response); - - } catch (IOException ex) { - //TODO: handle exception and send error codes to caller - ex.printStackTrace(); - } - } - - - @Override - public Future> sendRpc( - final RpcRequest input) { - - return handlersPool.submit(new Callable>() { - - @Override - public RpcReply call() { - ZMQ.Socket requestSocket = context.socket(ZMQ.REQ); - - // TODO pick the ip and port from routing table based on routing identifier - requestSocket.connect("tcp://" + pubIp + ":5554"); - - Message requestMessage = new Message.MessageBuilder() - .type(MessageType.REQUEST) - .sender(localIp + ":" + rpcPort) - .route(input.getRoutingInformation()) - .payload(input.getPayload()) - .build(); - - _logger.debug("Sending rpc request [{}]", requestMessage); - - RpcReply reply = null; - - try { - - requestSocket.send(Message.serialize(requestMessage)); - final Message response = parseMessage(requestSocket); - - _logger.debug("Received response [{}]", response); - - reply = new RpcReply() { - - @Override - public Object getPayload() { - return response.getPayload(); - } - }; - } catch (IOException ex) { - // TODO: Pass exception back to the caller - ex.printStackTrace(); - } - - return reply; - } - }); - } - - /** - * TODO: Remove this implementation and use RoutingTable implementation to send announcements - * Publishes a notice to other controllers in the cluster - * - * @param notice - */ - public void publish(final Message notice) { - Runnable task = new Runnable() { - public void run() { - - try { - - publisher.sendMore(Message.serialize(Message.MessageType.ANNOUNCE)); - publisher.send(Message.serialize(notice)); - _logger.debug("Announcement sent [{}]", notice); - } catch (IOException ex) { - _logger.error("Error in sending announcement [{}]", notice); - ex.printStackTrace(); - } - } - }; - handlersPool.execute(task); - } - - /** - * Finds IPv4 address of the local VM - * TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which - * address will be returned. Read IP from a property file or enhance the code to make it deterministic. - * Should we use IP or hostname? - * - * @return - */ - private String getLocalIpAddress() { - String hostAddress = null; - Enumeration e = null; - try { - e = NetworkInterface.getNetworkInterfaces(); - } catch (SocketException e1) { - e1.printStackTrace(); - } - while (e.hasMoreElements()) { - - NetworkInterface n = (NetworkInterface) e.nextElement(); - - Enumeration ee = n.getInetAddresses(); - while (ee.hasMoreElements()) { - InetAddress i = (InetAddress) ee.nextElement(); - if ((i instanceof Inet4Address) && (i.isSiteLocalAddress())) - hostAddress = i.getHostAddress(); - } - } - return hostAddress; - - } - - /** - * TODO: Change to use external routing table implementation - * - * @param msg - */ - private void updateRoutingTable(Message msg) { - routingTable.put(msg.getRoute(), msg.getSender()); - RpcRouter.RouteIdentifier route = msg.getRoute(); - - // Currently only registers rpc implementation. - // TODO: do registration for instance based routing - QName rpcType = route.getType(); - RpcRegistration registration = brokerSession.addRpcImplementation(rpcType, facade); - _logger.debug("Routing table updated"); - } - - /** - * @param socket - * @return - */ - private Message parseMessage(ZMQ.Socket socket) { - - Message msg = null; - try { - byte[] bytes = socket.recv(); - _logger.debug("Received bytes:[{}]", bytes.length); - msg = (Message) Message.deserialize(bytes); - } catch (Throwable t) { - t.printStackTrace(); - } - return msg; - } - - private class RpcFacade implements RpcImplementation { - - @Override - public Set getSupportedRpcs() { - return Collections.emptySet(); - } - - @Override - public RpcResult invokeRpc(QName rpc, CompositeNode input) { - - RouteIdentifierImpl routeId = new RouteIdentifierImpl(); - routeId.setType(rpc); - - RpcRequestImpl request = new RpcRequestImpl(); - request.setRouteIdentifier(routeId); - request.setPayload(input); - - final Future> ret = sendRpc(request); - - //TODO: Review result handling - RpcResult result = new RpcResult() { - @Override - public boolean isSuccessful() { - try { - ret.get(); - } catch (InterruptedException | ExecutionException e) { - e.printStackTrace(); - return false; - } - return true; - } - - @Override - public CompositeNode getResult() { - return null; - } - - @Override - public Collection getErrors() { - return Collections.EMPTY_LIST; - } - }; - return result; - } - } - - /** - * Listener for rpc registrations - */ - private class RpcListener implements RpcRegistrationListener { - - @Override - public void onRpcImplementationAdded(QName name) { - - _logger.debug("Announcing registration for [{}]", name); - RouteIdentifierImpl routeId = new RouteIdentifierImpl(); - routeId.setType(name); - - //TODO: Make notice immutable and change message type - Message notice = new Message.MessageBuilder() - .type(MessageType.ANNOUNCE) - .sender("tcp://" + localIp + ":" + rpcPort) - .route(routeId) - .build(); - - publish(notice); - } - - @Override - public void onRpcImplementationRemoved(QName name) { - // TODO: send a rpc-deregistrtation notice - - } - } - - public void setBrokerSession(ProviderSession session) { - this.brokerSession = session; - - } - -} diff --git a/opendaylight/md-sal/test/pom.xml b/opendaylight/md-sal/test/pom.xml deleted file mode 100644 index f9e500ea2b..0000000000 --- a/opendaylight/md-sal/test/pom.xml +++ /dev/null @@ -1,24 +0,0 @@ - - 4.0.0 - - sal-parent - 1.0-SNAPSHOT - org.opendaylight.controller - - pom - org.opendaylight.controller.tests - sal-test-parent - - scm:git:ssh://git.opendaylight.org:29418/controller.git - scm:git:ssh://git.opendaylight.org:29418/controller.git - https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL - - - - zeromq-test-consumer - zeromq-test-it - zeromq-test-provider - - - diff --git a/opendaylight/md-sal/test/zeromq-test-consumer/src/main/java/org/opendaylight/controller/sample/zeromq/consumer/ExampleConsumer.java b/opendaylight/md-sal/test/zeromq-test-consumer/src/main/java/org/opendaylight/controller/sample/zeromq/consumer/ExampleConsumer.java deleted file mode 100644 index a56a7dedff..0000000000 --- a/opendaylight/md-sal/test/zeromq-test-consumer/src/main/java/org/opendaylight/controller/sample/zeromq/consumer/ExampleConsumer.java +++ /dev/null @@ -1,51 +0,0 @@ -package org.opendaylight.controller.sample.zeromq.consumer; - -import java.net.URI; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import org.opendaylight.controller.sal.core.api.AbstractConsumer; -import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.data.api.CompositeNode; -import org.osgi.framework.BundleContext; - -public class ExampleConsumer extends AbstractConsumer { - - private final URI namespace = URI.create("http://cisco.com/example"); - private final QName QNAME = new QName(namespace,"heartbeat"); - - ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); - private ConsumerSession session; - - - @Override - public void onSessionInitiated(ConsumerSession session) { - this.session = session; - executor.scheduleAtFixedRate(new Runnable() { - - @Override - public void run() { - int count = 0; - try { - Future> future = ExampleConsumer.this.session.rpc(QNAME, null); - RpcResult result = future.get(); - System.out.println("Result received. Status is :" + result.isSuccessful()); - } catch (Exception e) { - e.printStackTrace(); - } - - } - }, 0, 10, TimeUnit.SECONDS); - } - - @Override - protected void stopImpl(BundleContext context) { - // TODO Auto-generated method stub - super.stopImpl(context); - executor.shutdown(); - } -} diff --git a/opendaylight/md-sal/test/zeromq-test-it/pom.xml b/opendaylight/md-sal/test/zeromq-test-it/pom.xml deleted file mode 100644 index 56945d1d34..0000000000 --- a/opendaylight/md-sal/test/zeromq-test-it/pom.xml +++ /dev/null @@ -1,184 +0,0 @@ - - 4.0.0 - - sal-test-parent - org.opendaylight.controller.tests - 1.0-SNAPSHOT - - zeromq-test-it - - scm:git:ssh://git.opendaylight.org:29418/controller.git - scm:git:ssh://git.opendaylight.org:29418/controller.git - https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL - - - - 3.0.0 - 1.5.0 - - - - - - org.ops4j.pax.exam - maven-paxexam-plugin - 1.2.4 - - - generate-config - - generate-depends-file - - - - - - - - - - org.eclipse.m2e - lifecycle-mapping - 1.0.0 - - - - - - - org.ops4j.pax.exam - - - maven-paxexam-plugin - - - [1.2.4,) - - - - generate-depends-file - - - - - - - - - - - - - - - - - - org.opendaylight.yangtools.thirdparty - xtend-lib-osgi - 2.4.3 - - - org.opendaylight.controller.tests - zeromq-test-provider - 1.0-SNAPSHOT - - - org.opendaylight.controller.tests - zeromq-test-consumer - 1.0-SNAPSHOT - - - org.opendaylight.controller - sal-broker-impl - 1.0-SNAPSHOT - - - org.ops4j.pax.exam - pax-exam-container-native - ${exam.version} - test - - - org.ops4j.pax.exam - pax-exam-junit4 - ${exam.version} - test - - - org.ops4j.pax.exam - pax-exam-link-mvn - ${exam.version} - test - - - equinoxSDK381 - org.eclipse.osgi - 3.8.1.v20120830-144521 - test - - - org.slf4j - log4j-over-slf4j - 1.7.2 - - - ch.qos.logback - logback-core - 1.0.9 - - - ch.qos.logback - logback-classic - 1.0.9 - - - org.opendaylight.controller - sal-binding-api - 1.0-SNAPSHOT - - - org.opendaylight.controller - sal-common-util - 1.0-SNAPSHOT - - - org.opendaylight.controller - sal-core-api - 1.0-SNAPSHOT - - - - - org.opendaylight.controller - containermanager - 0.5.1-SNAPSHOT - - - - org.opendaylight.controller - sal - 0.5.1-SNAPSHOT - - - org.opendaylight.yangtools - yang-binding - - - org.opendaylight.yangtools - yang-common - - - org.opendaylight.yangtools - yang-data-api - - - - org.opendaylight.controller - sal-common-util - 1.0-SNAPSHOT - - - diff --git a/opendaylight/md-sal/test/zeromq-test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/ServiceConsumerController.java b/opendaylight/md-sal/test/zeromq-test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/ServiceConsumerController.java deleted file mode 100644 index c17b143d70..0000000000 --- a/opendaylight/md-sal/test/zeromq-test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/ServiceConsumerController.java +++ /dev/null @@ -1,75 +0,0 @@ -package org.opendaylight.controller.sample.zeromq.test.it; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.ops4j.pax.exam.Configuration; -import org.ops4j.pax.exam.Option; -import org.ops4j.pax.exam.junit.PaxExam; -import org.osgi.framework.BundleContext; - -import javax.inject.Inject; - -import static org.junit.Assert.assertTrue; -import static org.ops4j.pax.exam.CoreOptions.*; - -@RunWith(PaxExam.class) -public class ServiceConsumerController { - - public static final String ODL = "org.opendaylight.controller"; - public static final String YANG = "org.opendaylight.yangtools"; - public static final String SAMPLE = "org.opendaylight.controller.samples"; - - @Test - public void properInitialized() throws Exception { - - Thread.sleep(30000); // Waiting for services to get wired. - assertTrue(true); - //assertTrue(consumer.createToast(WhiteBread.class, 5)); - - } - -// @Inject -// BindingAwareBroker broker; - -// @Inject -// ToastConsumer consumer; - - @Inject - BundleContext ctx; - - @Configuration - public Option[] config() { - return options(systemProperty("osgi.console").value("2401"), - systemProperty("pub.port").value("5557"), - systemProperty("sub.port").value("5556"), - systemProperty("rpc.port").value("5555"), - systemProperty("pub.ip").value("localhost"), - mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(), // - mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(), // - mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(), // - mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(), // - - //mavenBundle(ODL, "sal-binding-broker-impl").versionAsInProject().update(), // - mavenBundle(ODL, "sal-common").versionAsInProject(), // - mavenBundle(ODL, "sal-common-api").versionAsInProject(),// - mavenBundle(ODL, "sal-common-impl").versionAsInProject(), // - mavenBundle(ODL, "sal-common-util").versionAsInProject(), // - mavenBundle(ODL, "sal-core-api").versionAsInProject().update(), // - mavenBundle(ODL, "sal-broker-impl").versionAsInProject(), // - mavenBundle(ODL, "sal-core-spi").versionAsInProject().update(), // - mavenBundle(ODL, "sal-connector-api").versionAsInProject(), // - mavenBundle(SAMPLE, "zeromq-test-consumer").versionAsInProject(), // - mavenBundle(ODL, "sal-zeromq-connector").versionAsInProject(), // - mavenBundle(YANG, "concepts").versionAsInProject(), - mavenBundle(YANG, "yang-binding").versionAsInProject(), // - mavenBundle(YANG, "yang-common").versionAsInProject(), // - mavenBundle(YANG, "yang-data-api").versionAsInProject(), // - mavenBundle(YANG, "yang-model-api").versionAsInProject(), // - mavenBundle(YANG+".thirdparty", "xtend-lib-osgi").versionAsInProject(), // - mavenBundle("com.google.guava", "guava").versionAsInProject(), // - mavenBundle("org.jeromq", "jeromq").versionAsInProject(), - junitBundles() - ); - } - -} diff --git a/opendaylight/md-sal/test/zeromq-test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/ServiceProviderController.java b/opendaylight/md-sal/test/zeromq-test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/ServiceProviderController.java deleted file mode 100644 index 2d28b0b5b5..0000000000 --- a/opendaylight/md-sal/test/zeromq-test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/ServiceProviderController.java +++ /dev/null @@ -1,86 +0,0 @@ -package org.opendaylight.controller.sample.zeromq.test.it; - -import static org.junit.Assert.*; -import static org.ops4j.pax.exam.CoreOptions.junitBundles; -import static org.ops4j.pax.exam.CoreOptions.mavenBundle; -import static org.ops4j.pax.exam.CoreOptions.options; -import static org.ops4j.pax.exam.CoreOptions.systemPackages; -import static org.ops4j.pax.exam.CoreOptions.systemProperty; -import static org.ops4j.pax.exam.CoreOptions.maven; - -import java.util.Collection; - -import javax.inject.Inject; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; -import org.ops4j.pax.exam.Configuration; -import org.ops4j.pax.exam.CoreOptions; -import org.ops4j.pax.exam.Option; -import org.ops4j.pax.exam.junit.PaxExam; -import org.osgi.framework.BundleContext; -import org.osgi.framework.InvalidSyntaxException; -import org.osgi.framework.ServiceReference; - -@RunWith(PaxExam.class) -public class ServiceProviderController { - - public static final String ODL = "org.opendaylight.controller"; - public static final String YANG = "org.opendaylight.yangtools"; - public static final String SAMPLE = "org.opendaylight.controller.samples"; - - @Test - public void properInitialized() throws Exception { - - Thread.sleep(30000); // Waiting for services to get wired. - assertTrue(true); - //assertTrue(consumer.createToast(WhiteBread.class, 5)); - - } - -// @Inject -// BindingAwareBroker broker; - -// @Inject -// ToastConsumer consumer; - - @Inject - BundleContext ctx; - - @Configuration - public Option[] config() { - return options(systemProperty("osgi.console").value("2401"), - systemProperty("pub.port").value("5556"), - systemProperty("sub.port").value("5557"), - systemProperty("rpc.port").value("5554"), - systemProperty("pub.ip").value("localhost"), - mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(), // - mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(), // - mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(), // - mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(), // - - //mavenBundle(ODL, "sal-binding-broker-impl").versionAsInProject().update(), // - mavenBundle(ODL, "sal-common").versionAsInProject(), // - mavenBundle(ODL, "sal-common-api").versionAsInProject(),// - mavenBundle(ODL, "sal-common-impl").versionAsInProject(), // - mavenBundle(ODL, "sal-common-util").versionAsInProject(), // - mavenBundle(ODL, "sal-core-api").versionAsInProject().update(), // - mavenBundle(ODL, "sal-broker-impl").versionAsInProject(), // - mavenBundle(ODL, "sal-core-spi").versionAsInProject().update(), // - mavenBundle(ODL, "sal-connector-api").versionAsInProject(), // - mavenBundle(SAMPLE, "zeromq-test-provider").versionAsInProject(), // - mavenBundle(ODL, "sal-zeromq-connector").versionAsInProject(), // - mavenBundle(YANG, "concepts").versionAsInProject(), - mavenBundle(YANG, "yang-binding").versionAsInProject(), // - mavenBundle(YANG, "yang-common").versionAsInProject(), // - mavenBundle(YANG, "yang-data-api").versionAsInProject(), // - mavenBundle(YANG, "yang-model-api").versionAsInProject(), // - mavenBundle(YANG+".thirdparty", "xtend-lib-osgi").versionAsInProject(), // - mavenBundle("com.google.guava", "guava").versionAsInProject(), // - mavenBundle("org.jeromq", "jeromq").versionAsInProject(), - junitBundles() - ); - } - -} diff --git a/opendaylight/md-sal/test/zeromq-test-provider/src/main/java/org/opendaylight/controller/sample/zeromq/provider/ExampleProvider.java b/opendaylight/md-sal/test/zeromq-test-provider/src/main/java/org/opendaylight/controller/sample/zeromq/provider/ExampleProvider.java deleted file mode 100644 index ec7d7a8285..0000000000 --- a/opendaylight/md-sal/test/zeromq-test-provider/src/main/java/org/opendaylight/controller/sample/zeromq/provider/ExampleProvider.java +++ /dev/null @@ -1,67 +0,0 @@ -package org.opendaylight.controller.sample.zeromq.provider; - -import java.net.URI; -import java.util.Collection; -import java.util.Collections; -import java.util.Set; - -import org.opendaylight.controller.sal.common.util.Rpcs; -import org.opendaylight.controller.sal.core.api.AbstractProvider; -import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; -import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration; -import org.opendaylight.controller.sal.core.api.RpcImplementation; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.common.RpcError; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.data.api.CompositeNode; -import org.osgi.framework.BundleContext; - -public class ExampleProvider extends AbstractProvider implements RpcImplementation { - - private final URI namespace = URI.create("http://cisco.com/example"); - private final QName QNAME = new QName(namespace,"heartbeat"); - private RpcRegistration reg; - - - @Override - public void onSessionInitiated(ProviderSession session) { - //Adding heartbeat 10 times just to make sure subscriber get it - for (int i=0;i<10;i++){ - System.out.println("ExampleProvider: Adding " + QNAME + " " + i); - reg = session.addRpcImplementation(QNAME, this); - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - } - - @Override - public Set getSupportedRpcs() { - return Collections.singleton(QNAME); - } - - @Override - public RpcResult invokeRpc(QName rpc, CompositeNode input) { - if(QNAME.equals(rpc)) { - RpcResult output = Rpcs.getRpcResult(true, null, Collections.emptySet()); - return output; - } - RpcResult output = Rpcs.getRpcResult(false, null, Collections.emptySet()); - return output; - } - - @Override - protected void stopImpl(BundleContext context) { - if(reg != null) { - try { - reg.close(); - } catch (Exception e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - } - -} diff --git a/opendaylight/md-sal/zeromq-routingtable/implementation/pom.xml b/opendaylight/md-sal/zeromq-routingtable/implementation/pom.xml index 37c973e864..2926786849 100644 --- a/opendaylight/md-sal/zeromq-routingtable/implementation/pom.xml +++ b/opendaylight/md-sal/zeromq-routingtable/implementation/pom.xml @@ -1,6 +1,6 @@ + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 org.opendaylight.controller @@ -28,7 +28,10 @@ true - + + org.opendaylight.controller.sal.connector.remoterpc.api, + org.opendaylight.controller.sal.connector.remoterpc.impl + javax.xml.bind.annotation, org.opendaylight.controller.sal.core, @@ -67,7 +70,12 @@ org.opendaylight.controller sal - 0.5.1-SNAPSHOT + + + org.osgi + org.osgi.compendium + + org.opendaylight.controller