From: Robert Varga Date: Sat, 7 Feb 2015 21:55:46 +0000 (+0100) Subject: Initial message bus implementation X-Git-Tag: release/lithium~540^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=22c6645e0793c65ba1f0c1004d79cf83e770d765 Initial message bus implementation This patch follows up on the API definition and builds a simple implementation. Change-Id: Ic128538a02d71a40ea44efe53f3c4b4503c068ac Signed-off-by: Robert Gallas Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/mdsal-artifacts/pom.xml b/opendaylight/md-sal/mdsal-artifacts/pom.xml index f88e09cecb..420f888cf1 100644 --- a/opendaylight/md-sal/mdsal-artifacts/pom.xml +++ b/opendaylight/md-sal/mdsal-artifacts/pom.xml @@ -309,6 +309,18 @@ xml + + + org.opendaylight.controller + message-bus-api + ${project.version} + + + org.opendaylight.controller + message-bus-impl + ${project.version} + + diff --git a/opendaylight/md-sal/messagebus-impl/pom.xml b/opendaylight/md-sal/messagebus-impl/pom.xml new file mode 100644 index 0000000000..8e088ba578 --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/pom.xml @@ -0,0 +1,116 @@ + + + 4.0.0 + + + org.opendaylight.controller + sal-parent + 1.2.0-SNAPSHOT + + + message-bus-impl + ${project.artifactId} + + bundle + + + + org.opendaylight.controller + ietf-netconf-notifications + 0.3.0-SNAPSHOT + + + org.opendaylight.controller + sal-binding-api + + + org.opendaylight.controller + sal-core-api + + + org.opendaylight.controller + sal-common-util + + + org.opendaylight.yangtools + yang-data-impl + + + org.opendaylight.controller + config-api + + + org.opendaylight.controller + message-bus-api + + + org.opendaylight.controller + sal-binding-config + + + + + + + org.opendaylight.yangtools + yang-maven-plugin + + + + generate-sources + + + + + + org.opendaylight.yangtools.maven.sal.api.gen.plugin.CodeGeneratorImpl + + + ${project.build.directory}/generated-sources/sal + + + + + org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator + + ${project.build.directory}/generated-sources/config + + + urn:opendaylight:params:xml:ns:yang:controller==org.opendaylight.controller.config.yang + + + + + org.opendaylight.yangtools.yang.unified.doc.generator.maven.DocumentationGeneratorImpl + target/site/models + + + true + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 1.8 + + + add-source + generate-sources + + add-source + + + + ${project.build.directory}/generated-sources/config + + + + + + + + diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModule.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModule.java new file mode 100644 index 0000000000..1c2b78a85b --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModule.java @@ -0,0 +1,75 @@ +/** + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.config.yang.messagebus.app.impl; + +import org.opendaylight.controller.config.api.DependencyResolver; +import org.opendaylight.controller.config.api.ModuleIdentifier; +import org.opendaylight.controller.mdsal.InitializationContext; +import org.opendaylight.controller.mdsal.Providers; +import org.osgi.framework.BundleContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class MessageBusAppImplModule extends org.opendaylight.controller.config.yang.messagebus.app.impl.AbstractMessageBusAppImplModule { + private static final Logger LOGGER = LoggerFactory.getLogger(MessageBusAppImplModule.class); + + private BundleContext bundleContext; + + public BundleContext getBundleContext() { + return bundleContext; + } + + public void setBundleContext(BundleContext bundleContext) { + this.bundleContext = bundleContext; + } + + public MessageBusAppImplModule( ModuleIdentifier identifier, DependencyResolver dependencyResolver) { + super(identifier, dependencyResolver); + } + + public MessageBusAppImplModule( ModuleIdentifier identifier, + DependencyResolver dependencyResolver, + MessageBusAppImplModule oldModule, + java.lang.AutoCloseable oldInstance) { + super(identifier, dependencyResolver, oldModule, oldInstance); + } + + @Override + protected void customValidation() {} + + @Override + public java.lang.AutoCloseable createInstance() { + List namespaceMapping = getNamespaceToStream(); + InitializationContext ic = new InitializationContext(namespaceMapping); + + final Providers.BindingAware bap = new Providers.BindingAware(ic); + final Providers.BindingIndependent bip = new Providers.BindingIndependent(ic); + + getBindingBrokerDependency().registerProvider(bap, getBundleContext()); + getDomBrokerDependency().registerProvider(bip); + + AutoCloseable closer = new AutoCloseable() { + @Override public void close() { + closeProvider(bap); + closeProvider(bip); + } + }; + + return closer; + } + + private void closeProvider(AutoCloseable closable) { + try { + closable.close(); + } catch (Exception e) { + LOGGER.error("Exception while closing: {}\n Exception: {}", closable, e); + } + } +} diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleFactory.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleFactory.java new file mode 100644 index 0000000000..8bee2d1d12 --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleFactory.java @@ -0,0 +1,51 @@ +/* +* Generated file +* +* Generated from: yang module name: message-bus-app-impl yang module local name: messagebus-app-impl +* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator +* Generated at: Tue Feb 03 09:03:11 CET 2015 +* +* Do not modify this file unless it is present under src/main directory +*/ +package org.opendaylight.controller.config.yang.messagebus.app.impl; + +import org.opendaylight.controller.config.api.DependencyResolver; +import org.opendaylight.controller.config.api.DynamicMBeanWithInstance; +import org.opendaylight.controller.config.spi.Module; +import org.osgi.framework.BundleContext; + +public class MessageBusAppImplModuleFactory extends org.opendaylight.controller.config.yang.messagebus.app.impl.AbstractMessageBusAppImplModuleFactory { + @Override + public Module createModule(String instanceName, + DependencyResolver dependencyResolver, + BundleContext bundleContext) { + + MessageBusAppImplModule module = + (MessageBusAppImplModule) super.createModule(instanceName, + dependencyResolver, + bundleContext); + + module.setBundleContext(bundleContext); + + return module; + } + + @Override + public Module createModule(String instanceName, + DependencyResolver dependencyResolver, + DynamicMBeanWithInstance old, + BundleContext bundleContext) + throws Exception { + + MessageBusAppImplModule module = + (MessageBusAppImplModule) super.createModule(instanceName, + dependencyResolver, + old, + bundleContext); + + module.setBundleContext(bundleContext); + + return module; + } + +} diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/DataStore.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/DataStore.java new file mode 100644 index 0000000000..a881fac850 --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/DataStore.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.mdsal; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; +import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; +import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; + +public class DataStore { + private static final FutureCallback DEFAULT_CALLBACK = + new FutureCallback() { + public void onSuccess(Void result) { + // TODO: Implement default behaviour + } + + public void onFailure(Throwable t) { + // TODO: Implement default behaviour + }; + }; + + private final MdSAL mdSAL; + + public DataStore(MdSAL mdSAL) { + this.mdSAL = mdSAL; + } + + public ListenerRegistration registerDataChangeListener(LogicalDatastoreType store, + InstanceIdentifier path, + DataChangeListener listener, + AsyncDataBroker.DataChangeScope triggeringScope) { + return mdSAL.getDataBroker().registerDataChangeListener(store, path, listener, triggeringScope); + } + + public void asyncPUT(LogicalDatastoreType datastoreType, + InstanceIdentifier path, + T data) { + asyncPUT(datastoreType, path, data, DEFAULT_CALLBACK); + } + + public void asyncPUT(LogicalDatastoreType datastoreType, + InstanceIdentifier path, + T data, + FutureCallback callback) { + WriteTransaction tx = mdSAL.getDataBroker().newWriteOnlyTransaction(); + tx.put(datastoreType, path, data, true); + execPut(tx, callback); + } + + public T read(LogicalDatastoreType datastoreType, + InstanceIdentifier path) { + + ReadOnlyTransaction tx = mdSAL.getDataBroker().newReadOnlyTransaction(); + T result = null; + + try { + result = tx.read(datastoreType, path).get().get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return result; + } + + private static void execPut(WriteTransaction tx, FutureCallback callback) { + Futures.addCallback(tx.submit(), callback); + } +} diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/InitializationContext.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/InitializationContext.java new file mode 100644 index 0000000000..c73fb2ad83 --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/InitializationContext.java @@ -0,0 +1,61 @@ +/** + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.mdsal; + +import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream; +import org.opendaylight.controller.messagebus.app.impl.EventAggregator; +import org.opendaylight.controller.messagebus.app.impl.EventSourceManager; +import org.opendaylight.controller.messagebus.app.impl.EventSourceTopology; +import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; +import org.opendaylight.controller.sal.core.api.Broker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class InitializationContext { + private static final Logger LOGGER = LoggerFactory.getLogger(InitializationContext.class); + + private final MdSAL mdSal; + private final DataStore dataStore; + private final EventSourceTopology eventSourceTopology; + private final EventSourceManager eventSourceManager; + private final EventAggregator eventAggregator; + + public InitializationContext(List namespaceMapping) { + this.mdSal = new MdSAL(); + this.dataStore = new DataStore(mdSal); + this.eventSourceTopology = new EventSourceTopology(dataStore); + this.eventSourceManager = new EventSourceManager(dataStore, mdSal, eventSourceTopology, namespaceMapping); + this.eventAggregator = new EventAggregator(mdSal, eventSourceTopology); + } + + public synchronized void set(BindingAwareBroker.ProviderContext session) { + mdSal.setBindingAwareContext(session); + + if (mdSal.isReady()) { + initialize(); + } + } + + public synchronized void set(Broker.ProviderSession session) { + mdSal.setBindingIndependentContext(session); + + if (mdSal.isReady()) { + initialize(); + } + } + + private void initialize() { + eventSourceTopology.mdsalReady(); + eventSourceManager.mdsalReady(); + eventAggregator.mdsalReady(); + + LOGGER.info("InitializationContext started."); + } +} diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/MdSAL.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/MdSAL.java new file mode 100644 index 0000000000..03b220a5fd --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/MdSAL.java @@ -0,0 +1,188 @@ +/** + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.mdsal; + +import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; +import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; +import org.opendaylight.controller.sal.binding.api.BindingAwareService; +import org.opendaylight.controller.sal.binding.api.mount.MountInstance; +import org.opendaylight.controller.sal.binding.api.mount.MountProviderService; +import org.opendaylight.controller.sal.core.api.Broker; +import org.opendaylight.controller.sal.core.api.BrokerService; +import org.opendaylight.controller.sal.core.api.notify.NotificationListener; +import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService; +import org.opendaylight.controller.sal.core.api.notify.NotificationService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.binding.RpcService; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MdSAL { + private static final Logger LOGGER = LoggerFactory.getLogger(MdSAL.class); + + private BindingAwareBroker.ProviderContext bindingAwareContext; + private Broker.ProviderSession bindingIndependentContext; + + // ----------------------------- + // ----- FRAMEWORK METHODS ----- + // ----------------------------- + public void setBindingAwareContext(BindingAwareBroker.ProviderContext bindingAwareContext) { + this.bindingAwareContext = bindingAwareContext; + } + + public void setBindingIndependentContext(Broker.ProviderSession bindingIndependentContext) { + this.bindingIndependentContext = bindingIndependentContext; + } + + //TODO: We should hide brokers and expose functionalities instead + public DataBroker getDataBroker() { + return getBaSalService(DataBroker.class); + } + + public synchronized boolean isReady() { + return (bindingAwareContext != null && bindingIndependentContext != null); + } + + // ----------------------- + // ----- API METHODS ----- + // ----------------------- + // TODO: Factor out API methods to interface + // method does not return registration object. Rather will hold references internally and manipulate using node id and API + public void addRpcImplementation(Class serviceInterface, + T implementation) + throws IllegalStateException { + bindingAwareContext.addRpcImplementation(serviceInterface, implementation); + } + + // method does not return registration object. Rather will hold references internally and manipulate using node id and API + public void addRpcImplementation(Node node, + Class serviceInterface, + T implementation) + throws IllegalStateException { + BindingAwareBroker.RoutedRpcRegistration registration + = addRoutedRpcImplementation(serviceInterface, implementation); + + NodeRef nodeRef = createNodeRef(node.getId()); + registration.registerPath(NodeContext.class, nodeRef.getValue()); + } + + public ListenerRegistration addNotificationListener(String nodeId, + QName notification, + NotificationListener listener) { + YangInstanceIdentifier yii = inventoryNodeBIIdentifier(nodeId); + + NotificationService notificationService = + getBiSalService(DOMMountPointService.class) + .getMountPoint(yii) + .get() + .getService(NotificationPublishService.class) + .get(); + + ListenerRegistration registration = + notificationService.addNotificationListener(notification, listener); + + LOGGER.info("Notification listener registered for {}, at node {}", notification, nodeId); + + return registration; + } + + public ListenerRegistration addNotificationListener(QName notification, + NotificationListener listener) { + NotificationService notificationService = + getBiSalService(NotificationPublishService.class); + + ListenerRegistration registration = + notificationService.addNotificationListener(notification, listener); + + LOGGER.info("Notification listener registered for {}.", notification); + + return registration; + } + + public T getRpcService(Class serviceInterface) { + return bindingAwareContext.getRpcService(serviceInterface); + } + + public T getRpcService(String nodeId, Class serviceInterface) { + MountProviderService mountProviderService = getBaSalService(MountProviderService.class); + + InstanceIdentifier key = InstanceIdentifier.create(Nodes.class) + .child(Node.class, + new NodeKey(new NodeId(nodeId))); + + MountInstance mountPoint = mountProviderService.getMountPoint(key); + return mountPoint.getRpcService(serviceInterface); + } + + public void publishNotification(CompositeNode notification) { + getBiSalService(NotificationPublishService.class).publish(notification); + } + + public SchemaContext getSchemaContext(String nodeId) { + YangInstanceIdentifier yii = inventoryNodeBIIdentifier(nodeId); + + SchemaContext schemaContext = + getBiSalService(DOMMountPointService.class) + .getMountPoint(yii) + .get().getSchemaContext(); + + return schemaContext; + } + + // --------------------------- + // ----- UTILITY METHODS ----- + // --------------------------- + private T getBaSalService(Class service) { + return bindingAwareContext.getSALService(service); + } + + private T getBiSalService(Class service) { + return bindingIndependentContext.getService(service); + } + + private static final String NODE_ID_NAME = "id"; + + public static YangInstanceIdentifier inventoryNodeBIIdentifier(String nodeId) { + return YangInstanceIdentifier.builder() + .node(Nodes.QNAME) + .nodeWithKey(Node.QNAME, + QName.create(Node.QNAME.getNamespace(), + Node.QNAME.getRevision(), + NODE_ID_NAME), + nodeId) + .build(); + } + + private BindingAwareBroker.RoutedRpcRegistration addRoutedRpcImplementation(Class serviceInterface, + T implementation) + throws IllegalStateException { + return bindingAwareContext.addRoutedRpcImplementation(serviceInterface, implementation); + } + + public static NodeRef createNodeRef(NodeId nodeId) { + NodeKey nodeKey = new NodeKey(nodeId); + InstanceIdentifier path = InstanceIdentifier + .builder(Nodes.class) + .child(Node.class, nodeKey) + .build(); + return new NodeRef(path); + } +} diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/Providers.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/Providers.java new file mode 100644 index 0000000000..a28e588d43 --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/Providers.java @@ -0,0 +1,57 @@ +/** + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.mdsal; + +import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; +import org.opendaylight.controller.sal.binding.api.BindingAwareProvider; +import org.opendaylight.controller.sal.core.api.AbstractProvider; +import org.opendaylight.controller.sal.core.api.Broker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Providers { + private static final Logger LOGGER = LoggerFactory.getLogger(Providers.class); + + public static class BindingAware implements BindingAwareProvider, AutoCloseable { + private final InitializationContext initializationContext; + + public BindingAware(InitializationContext ic) { + this.initializationContext = ic; + } + + @Override + public void onSessionInitiated(BindingAwareBroker.ProviderContext session) { + initializationContext.set(session); + + LOGGER.info("BindingAwareBroker.ProviderContext initialized"); + } + + @Override + public void close() throws Exception {} + } + + public static class BindingIndependent extends AbstractProvider implements AutoCloseable { + private final InitializationContext initializationContext; + + public BindingIndependent(InitializationContext ic) { + this.initializationContext = ic; + } + + @Override + public void onSessionInitiated(Broker.ProviderSession session) { + initializationContext.set(session); + + LOGGER.info("Broker.ProviderSession initialized"); + } + + @Override + public void close() throws Exception {} + } + +} diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventAggregator.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventAggregator.java new file mode 100644 index 0000000000..4b77bf2a4c --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventAggregator.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.messagebus.app.impl; + +import java.util.List; +import java.util.concurrent.Future; +import org.opendaylight.controller.mdsal.MdSAL; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutput; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// TODO: implement topic created notification +public class EventAggregator implements EventAggregatorService { + private static final Logger LOGGER = LoggerFactory.getLogger(EventAggregator.class); + + private final MdSAL mdSAL; + private final EventSourceTopology eventSourceTopology; + + public EventAggregator(final MdSAL mdSAL, final EventSourceTopology eventSourceTopology) { + this.mdSAL = mdSAL; + this.eventSourceTopology = eventSourceTopology; + } + + public void mdsalReady() { + mdSAL.addRpcImplementation(EventAggregatorService.class, this); + } + + @Override + public Future> createTopic(final CreateTopicInput input) { + LOGGER.info("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}", + input.getNotificationPattern(), + input.getNodeIdPattern()); + + Topic topic = new Topic(new NotificationPattern(input.getNotificationPattern()), input.getNodeIdPattern().getValue(), mdSAL); + + //# Make sure we capture all nodes from now on + eventSourceTopology.registerDataChangeListener(topic); + + //# Notify existing nodes + //# Code reader note: Context of Node type is NetworkTopology + List nodes = eventSourceTopology.snapshot(); + for (Node node : nodes) { + NodeId nodeIdToNotify = node.getAugmentation(Node1.class).getEventSourceNode(); + topic.notifyNode(nodeIdToNotify); + } + + CreateTopicOutput cto = new CreateTopicOutputBuilder() + .setTopicId(topic.getTopicId()) + .build(); + + return Util.resultFor(cto); + } + + @Override + public Future> destroyTopic(final DestroyTopicInput input) { + // 1. UNREGISTER DATA CHANGE LISTENER -> ? + // 2. CLOSE TOPIC + return null; + } +} diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceManager.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceManager.java new file mode 100644 index 0000000000..a84eddd458 --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceManager.java @@ -0,0 +1,126 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.messagebus.app.impl; + +import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream; +import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.mdsal.DataStore; +import org.opendaylight.controller.mdsal.MdSAL; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public final class EventSourceManager implements DataChangeListener { + private static final Logger LOGGER = LoggerFactory.getLogger(EventSourceManager.class); + private static final InstanceIdentifier INVENTORY_PATH = InstanceIdentifier.create(Nodes.class) + .child(Node.class); + private final DataStore dataStore; + private final MdSAL mdSal; + private final EventSourceTopology eventSourceTopology; + private final Map streamMap; + + public EventSourceManager(DataStore dataStore, + MdSAL mdSal, + EventSourceTopology eventSourceTopology, + List namespaceMapping) { + this.dataStore = dataStore; + this.mdSal = mdSal; + this.eventSourceTopology = eventSourceTopology; + this.streamMap = namespaceToStreamMapping(namespaceMapping); + } + + private Map namespaceToStreamMapping(List namespaceMapping) { + Map streamMap = new HashMap<>(namespaceMapping.size()); + + for (NamespaceToStream nToS : namespaceMapping) { + streamMap.put(nToS.getUrnPrefix(), nToS.getStreamName()); + } + + return streamMap; + } + + public void mdsalReady() { + dataStore.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, + INVENTORY_PATH, + this, + DataBroker.DataChangeScope.SUBTREE); + + LOGGER.info("EventSourceManager initialized."); + } + + @Override + public void onDataChanged(AsyncDataChangeEvent, DataObject> event) { + //FIXME: Prevent creating new event source on subsequent changes in inventory, like disconnect. + LOGGER.debug("[DataChangeEvent, DataObject>: {}]", event); + + Node node = Util.getAffectedNode(event); + // we listen on node tree, therefore we should rather throw IllegalStateException when node is null + if ( node == null ) { + LOGGER.debug("OnDataChanged Event. Node is null."); + return; + } + if ( isNetconfNode(node) == false ) { + LOGGER.debug("OnDataChanged Event. Not a Netconf node."); + return; + } + if ( isEventSource(node) == false ) { + LOGGER.debug("OnDataChanged Event. Node an EventSource node."); + return; + } + + NetconfEventSource netconfEventSource = new NetconfEventSource(mdSal, + node.getKey().getId().getValue(), + streamMap); + mdSal.addRpcImplementation(node, EventSourceService.class, netconfEventSource); + + InstanceIdentifier nodeInstanceIdentifier = + InstanceIdentifier.create(Nodes.class) + .child(Node.class, node.getKey()) + .augmentation(NetconfNode.class); + + dataStore.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, + nodeInstanceIdentifier, + netconfEventSource, + DataBroker.DataChangeScope.SUBTREE); + + eventSourceTopology.insert(node); + } + + private boolean isNetconfNode(Node node) { + return node.getAugmentation(NetconfNode.class) != null ; + } + + public boolean isEventSource(Node node) { + NetconfNode netconfNode = node.getAugmentation(NetconfNode.class); + + return isEventSource(netconfNode); + } + + private boolean isEventSource(NetconfNode node) { + for (String capability : node.getInitialCapability()) { + if(capability.startsWith("urn:ietf:params:xml:ns:netconf:notification")) { + return true; + } + } + + return false; + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java new file mode 100644 index 0000000000..c0700971dd --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.messagebus.app.impl; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.mdsal.DataStore; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1Builder; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1Builder; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSource; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSourceBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class EventSourceTopology { + private static final Logger LOGGER = LoggerFactory.getLogger(EventSourceTopology.class); + + private static final String topologyId = "EVENT-SOURCE-TOPOLOGY" ; + private static final TopologyKey topologyKey = new TopologyKey(new TopologyId(topologyId)); + private static final LogicalDatastoreType datastoreType = LogicalDatastoreType.OPERATIONAL; + + private static final InstanceIdentifier topologyInstanceIdentifier = + InstanceIdentifier.create(NetworkTopology.class) + .child(Topology.class, topologyKey); + + private static final InstanceIdentifier topologyTypeInstanceIdentifier = + topologyInstanceIdentifier + .child(TopologyTypes.class) + .augmentation(TopologyTypes1.class); + + private static final InstanceIdentifier eventSourceTopologyPath = + InstanceIdentifier.create(NetworkTopology.class) + .child(Topology.class) + .child(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang + .network.topology.rev131021.network.topology.topology.Node.class); + + private final Map> registrations = + new ConcurrentHashMap<>(); + + private final DataStore dataStore; + + public EventSourceTopology(DataStore dataStore) { + this.dataStore = dataStore; + } + + public void mdsalReady() { + TopologyEventSource topologySource = new TopologyEventSourceBuilder().build(); + TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build(); + + dataStore.asyncPUT(datastoreType, topologyTypeInstanceIdentifier, topologyTypeAugment); + } + + public void insert(Node node) { + String nodeId = node.getKey().getId().getValue(); + NodeKey nodeKey = new NodeKey(new NodeId(nodeId)); + InstanceIdentifier topologyNodeAugment + = topologyInstanceIdentifier + .child(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang + .network.topology.rev131021.network.topology.topology.Node.class, nodeKey) + .augmentation(Node1.class); + + Node1 nodeAgument = new Node1Builder().setEventSourceNode(node.getId()).build(); + dataStore.asyncPUT(datastoreType, topologyNodeAugment, nodeAgument); + } + + // TODO: Should we expose this functioanlity over RPC? + public List snapshot() { + Topology topology = dataStore.read(datastoreType, topologyInstanceIdentifier); + return topology.getNode(); + } + + public void registerDataChangeListener(DataChangeListener listener) { + ListenerRegistration listenerRegistration = dataStore.registerDataChangeListener(datastoreType, + eventSourceTopologyPath, + listener, + DataBroker.DataChangeScope.SUBTREE); + + registrations.put(listener, listenerRegistration); + } +} diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSource.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSource.java new file mode 100644 index 0000000000..9c0697f3fb --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSource.java @@ -0,0 +1,190 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.messagebus.app.impl; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Future; +import java.util.regex.Pattern; +import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; +import org.opendaylight.controller.mdsal.MdSAL; +import org.opendaylight.controller.sal.core.api.notify.NotificationListener; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode; +import org.opendaylight.yangtools.yang.model.api.NotificationDefinition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NetconfEventSource implements EventSourceService, NotificationListener, DataChangeListener { + private static final Logger LOGGER = LoggerFactory.getLogger(NetconfEventSource.class); + + private final MdSAL mdSal; + private final String nodeId; + + private final List activeStreams = new ArrayList<>(); + + private final Map urnPrefixToStreamMap; + + public NetconfEventSource(final MdSAL mdSal, final String nodeId, final Map streamMap) { + Preconditions.checkNotNull(mdSal); + Preconditions.checkNotNull(nodeId); + + this.mdSal = mdSal; + this.nodeId = nodeId; + this.urnPrefixToStreamMap = streamMap; + + LOGGER.info("NetconfEventSource [{}] created.", nodeId); + } + + @Override + public Future> joinTopic(final JoinTopicInput input) { + final NotificationPattern notificationPattern = input.getNotificationPattern(); + + // FIXME: default language should already be regex + final String regex = Util.wildcardToRegex(notificationPattern.getValue()); + + final Pattern pattern = Pattern.compile(regex); + List matchingNotifications = Util.expandQname(availableNotifications(), pattern); + registerNotificationListener(matchingNotifications); + return null; + } + + private List availableNotifications() { + // FIXME: use SchemaContextListener to get changes asynchronously + Set availableNotifications = mdSal.getSchemaContext(nodeId).getNotifications(); + List qNs = new ArrayList<>(availableNotifications.size()); + for (NotificationDefinition nd : availableNotifications) { + qNs.add(nd.getQName()); + } + + return qNs; + } + + private void registerNotificationListener(final List notificationsToSubscribe) { + for (QName qName : notificationsToSubscribe) { + startSubscription(qName); + // FIXME: do not lose this registration + final ListenerRegistration reg = mdSal.addNotificationListener(nodeId, qName, this); + } + } + + private synchronized void startSubscription(final QName qName) { + String streamName = resolveStream(qName); + + if (streamIsActive(streamName) == false) { + LOGGER.info("Stream {} is not active on node {}. Will subscribe.", streamName, nodeId); + startSubscription(streamName); + } + } + + private synchronized void resubscribeToActiveStreams() { + for (String streamName : activeStreams) { + startSubscription(streamName); + } + } + + private synchronized void startSubscription(final String streamName) { + CreateSubscriptionInput subscriptionInput = getSubscriptionInput(streamName); + mdSal.getRpcService(nodeId, NotificationsService.class).createSubscription(subscriptionInput); + activeStreams.add(streamName); + } + + private static CreateSubscriptionInput getSubscriptionInput(final String streamName) { + CreateSubscriptionInputBuilder csib = new CreateSubscriptionInputBuilder(); + csib.setStream(new StreamNameType(streamName)); + return csib.build(); + } + + private String resolveStream(final QName qName) { + String streamName = null; + + for (Map.Entry entry : urnPrefixToStreamMap.entrySet()) { + String nameSpace = qName.getNamespace().toString(); + String urnPrefix = entry.getKey(); + if( nameSpace.startsWith(urnPrefix) ) { + streamName = entry.getValue(); + break; + } + } + + return streamName; + } + + private boolean streamIsActive(final String streamName) { + return activeStreams.contains(streamName); + } + + // PASS + @Override public Set getSupportedNotifications() { + return null; + } + + @Override + public void onNotification(final CompositeNode notification) { + LOGGER.info("NetconfEventSource {} received notification {}. Will publish to MD-SAL.", nodeId, notification); + ImmutableCompositeNode payload = ImmutableCompositeNode.builder() + .setQName(QName.create(TopicNotification.QNAME, "payload")) + .add(notification).toInstance(); + ImmutableCompositeNode icn = ImmutableCompositeNode.builder() + .setQName(TopicNotification.QNAME) + .add(payload) + .addLeaf("event-source", nodeId) + .toInstance(); + + mdSal.publishNotification(icn); + } + + @Override + public void onDataChanged(final AsyncDataChangeEvent, DataObject> change) { + boolean wasConnected = false; + boolean nowConnected = false; + + for (Map.Entry, DataObject> changeEntry : change.getOriginalData().entrySet()) { + if ( isNetconfNode(changeEntry) ) { + NetconfNode nn = (NetconfNode)changeEntry.getValue(); + wasConnected = nn.isConnected(); + } + } + + for (Map.Entry, DataObject> changeEntry : change.getUpdatedData().entrySet()) { + if ( isNetconfNode(changeEntry) ) { + NetconfNode nn = (NetconfNode)changeEntry.getValue(); + nowConnected = nn.isConnected(); + } + } + + if (wasConnected == false && nowConnected == true) { + resubscribeToActiveStreams(); + } + } + + private static boolean isNetconfNode(final Map.Entry, DataObject> changeEntry ) { + return NetconfNode.class.equals(changeEntry.getKey().getTargetType()); + } + +} diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Topic.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Topic.java new file mode 100644 index 0000000000..aebde0c043 --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Topic.java @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.messagebus.app.impl; + +import com.google.common.base.Preconditions; +import java.util.regex.Pattern; +import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; +import org.opendaylight.controller.mdsal.MdSAL; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.slf4j.LoggerFactory; + +public class Topic implements DataChangeListener { + private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(Topic.class); + private final NotificationPattern notificationPattern; + private final Pattern nodeIdPattern; + private final TopicId topicId; + private final MdSAL mdSal; + + public Topic(final NotificationPattern notificationPattern, final String nodeIdPattern, final MdSAL mdSal) { + this.notificationPattern = Preconditions.checkNotNull(notificationPattern); + + // FIXME: regex should be the language of nodeIdPattern + final String regex = Util.wildcardToRegex(nodeIdPattern); + this.nodeIdPattern = Pattern.compile(regex); + this.mdSal = Preconditions.checkNotNull(mdSal); + + // FIXME: We need to perform some salting in order to make + // the topic IDs less predictable. + this.topicId = new TopicId(Util.md5String(notificationPattern + nodeIdPattern)); + } + + public TopicId getTopicId() { + return topicId; + } + + @Override + public void onDataChanged(final AsyncDataChangeEvent, DataObject> event) { + // TODO: affected must return topologyNode !!! + final Node node = Util.getAffectedNode(event); + if (nodeIdPattern.matcher(node.getId().getValue()).matches()) { + notifyNode(node.getId()); + } else { + LOG.debug("Skipping node {}", node.getId()); + } + } + + public void notifyNode(final NodeId nodeId) { + JoinTopicInput jti = getJoinTopicInputArgument(nodeId); + EventSourceService ess = mdSal.getRpcService(EventSourceService.class); + + if (ess == null) { + throw new IllegalStateException("EventSourceService is not registered."); + } + + ess.joinTopic(jti); + } + + private JoinTopicInput getJoinTopicInputArgument(final NodeId nodeId) { + NodeRef nodeRef = MdSAL.createNodeRef(nodeId); + JoinTopicInput jti = + new JoinTopicInputBuilder() + .setNode(nodeRef.getValue()) + .setTopicId(topicId) + .setNotificationPattern(notificationPattern) + .build(); + return jti; + } +} diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Util.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Util.java new file mode 100644 index 0000000000..9927d85c3e --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Util.java @@ -0,0 +1,137 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.messagebus.app.impl; + +import com.google.common.util.concurrent.Futures; +import java.math.BigInteger; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.regex.Pattern; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; +import org.opendaylight.controller.sal.common.util.Rpcs; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcResult; + +public final class Util { + private static final MessageDigest messageDigestTemplate = getDigestInstance(); + + private static MessageDigest getDigestInstance() { + try { + return MessageDigest.getInstance("MD5"); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException("Unable to get MD5 instance"); + } + } + + public static String md5String(final String inputString) { + + try { + MessageDigest md = (MessageDigest)messageDigestTemplate.clone(); + md.update(inputString.getBytes("UTF-8"), 0, inputString.length()); + return new BigInteger(1, md.digest()).toString(16); + } catch (Exception e) { + throw new RuntimeException("Unable to get MD5 instance"); + } + } + + public static Future> resultFor(final T output) { + RpcResult result = Rpcs.getRpcResult(true, output, Collections.emptyList()); + return Futures.immediateFuture(result); + } + + /** + * Extracts affected node from data change event. + * @param event + * @return + */ + public static Node getAffectedNode(final AsyncDataChangeEvent, DataObject> event) { + // TODO: expect listener method to be called even when change impact node + // TODO: test with change.getCreatedData() + for (Map.Entry, DataObject> changeEntry : event.getUpdatedData().entrySet()) { + if (isNode(changeEntry)) { + return (Node) changeEntry.getValue(); + } + } + + return null; + } + + private static boolean isNode(final Map.Entry, DataObject> changeEntry ) { + return Node.class.equals(changeEntry.getKey().getTargetType()); + } + + /** + * Method filters qnames based on wildcard strings + * + * @param availableQnames + * @param patterh matching pattern + * @return list of filtered qnames + */ + public static List expandQname(final List availableQnames, final Pattern pattern) { + List matchingQnames = new ArrayList<>(); + + for (QName qname : availableQnames) { + String namespace = qname.getNamespace().toString(); + if (pattern.matcher(namespace).matches()) { + matchingQnames.add(qname); + } + } + + return matchingQnames; + } + + /** + * CREDIT to http://www.rgagnon.com/javadetails/java-0515.html + * @param wildcard + * @return + */ + static String wildcardToRegex(final String wildcard){ + StringBuffer s = new StringBuffer(wildcard.length()); + s.append('^'); + for (char c : wildcard.toCharArray()) { + switch(c) { + case '*': + s.append(".*"); + break; + case '?': + s.append('.'); + break; + // escape special regexp-characters + case '(': + case ')': + case '[': + case ']': + case '$': + case '^': + case '.': + case '{': + case '}': + case '|': + case '\\': + s.append("\\"); + s.append(c); + break; + default: + s.append(c); + break; + } + } + s.append('$'); + return s.toString(); + } +} diff --git a/opendaylight/md-sal/messagebus-impl/src/main/yang/messagebus-app-impl.yang b/opendaylight/md-sal/messagebus-impl/src/main/yang/messagebus-app-impl.yang new file mode 100644 index 0000000000..bed6b1085a --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/main/yang/messagebus-app-impl.yang @@ -0,0 +1,64 @@ +module messagebus-app-impl { + yang-version 1; + namespace "urn:opendaylight:params:xml:ns:yang:controller:messagebus:app:impl"; + prefix "binding-impl"; + + import config { prefix config; revision-date 2013-04-05; } + import opendaylight-md-sal-binding {prefix sal;} + import opendaylight-md-sal-dom {prefix dom;} + + + description + "Service definition for Message Bus application implementation."; + + revision "2015-02-03" { + description "Second revision. Message Bus opensourcing"; + } + + identity messagebus-app-impl { + base config:module-type; + config:java-name-prefix MessageBusAppImpl; + } + + augment "/config:modules/config:module/config:configuration" { + case messagebus-app-impl { + when "/config:modules/config:module/config:type = 'messagebus-app-impl'"; + + container binding-broker { + uses config:service-ref { + refine type { + mandatory true; + config:required-identity sal:binding-broker-osgi-registry; + } + } + } + + container dom-broker { + uses config:service-ref { + refine type { + mandatory true; + config:required-identity dom:dom-broker-osgi-registry; + } + } + } + + list namespace-to-stream { + key urn-prefix; + + leaf urn-prefix { + type string; + } + + leaf stream-name { + type string; + } + } + } + } + + augment "/config:modules/config:module/config:state" { + case messagebus-app-impl { + when "/config:modules/config:module/config:type = 'messagebus-app-impl'"; + } + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/pom.xml b/opendaylight/md-sal/pom.xml index 3151380647..bdeb8a65a8 100644 --- a/opendaylight/md-sal/pom.xml +++ b/opendaylight/md-sal/pom.xml @@ -93,6 +93,7 @@ messagebus-api + messagebus-impl