Initial message bus implementation 31/15031/8
authorRobert Varga <rovarga@cisco.com>
Sat, 7 Feb 2015 21:55:46 +0000 (22:55 +0100)
committerRobert Varga <rovarga@cisco.com>
Wed, 18 Feb 2015 23:58:37 +0000 (00:58 +0100)
This patch follows up on the API definition and builds a simple
implementation.

Change-Id: Ic128538a02d71a40ea44efe53f3c4b4503c068ac
Signed-off-by: Robert Gallas <rgallas@cisco.com>
Signed-off-by: Robert Varga <rovarga@cisco.com>
16 files changed:
opendaylight/md-sal/mdsal-artifacts/pom.xml
opendaylight/md-sal/messagebus-impl/pom.xml [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModule.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleFactory.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/DataStore.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/InitializationContext.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/MdSAL.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/Providers.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventAggregator.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceManager.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSource.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Topic.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Util.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/yang/messagebus-app-impl.yang [new file with mode: 0644]
opendaylight/md-sal/pom.xml

index f88e09cecba5b456e73b9ce25e6fc09efa74bc7c..420f888cf179740f309f9a6ace29912344ca0fe9 100644 (file)
                 <type>xml</type>
             </dependency>
 
+            <!-- MessageBus -->
+            <dependency>
+                <groupId>org.opendaylight.controller</groupId>
+                <artifactId>message-bus-api</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.opendaylight.controller</groupId>
+                <artifactId>message-bus-impl</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+
         </dependencies>
     </dependencyManagement>
 </project>
diff --git a/opendaylight/md-sal/messagebus-impl/pom.xml b/opendaylight/md-sal/messagebus-impl/pom.xml
new file mode 100644 (file)
index 0000000..8e088ba
--- /dev/null
@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="UTF-8"?>\r
+<project xmlns="http://maven.apache.org/POM/4.0.0"\r
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"\r
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">\r
+    <modelVersion>4.0.0</modelVersion>\r
+\r
+    <parent>\r
+        <groupId>org.opendaylight.controller</groupId>\r
+        <artifactId>sal-parent</artifactId>\r
+        <version>1.2.0-SNAPSHOT</version>\r
+    </parent>\r
+\r
+    <artifactId>message-bus-impl</artifactId>\r
+    <name>${project.artifactId}</name>\r
+\r
+    <packaging>bundle</packaging>\r
+\r
+    <dependencies>\r
+        <dependency>\r
+            <groupId>org.opendaylight.controller</groupId>\r
+            <artifactId>ietf-netconf-notifications</artifactId>\r
+            <version>0.3.0-SNAPSHOT</version>\r
+        </dependency>\r
+        <dependency>\r
+            <groupId>org.opendaylight.controller</groupId>\r
+            <artifactId>sal-binding-api</artifactId>\r
+        </dependency>\r
+        <dependency>\r
+            <groupId>org.opendaylight.controller</groupId>\r
+            <artifactId>sal-core-api</artifactId>\r
+        </dependency>\r
+        <dependency>\r
+            <groupId>org.opendaylight.controller</groupId>\r
+            <artifactId>sal-common-util</artifactId>\r
+        </dependency>\r
+        <dependency>\r
+            <groupId>org.opendaylight.yangtools</groupId>\r
+            <artifactId>yang-data-impl</artifactId>\r
+        </dependency>\r
+        <dependency>\r
+            <groupId>org.opendaylight.controller</groupId>\r
+            <artifactId>config-api</artifactId>\r
+        </dependency>\r
+        <dependency>\r
+            <groupId>org.opendaylight.controller</groupId>\r
+            <artifactId>message-bus-api</artifactId>\r
+        </dependency>\r
+        <dependency>\r
+            <groupId>org.opendaylight.controller</groupId>\r
+            <artifactId>sal-binding-config</artifactId>\r
+        </dependency>\r
+    </dependencies>\r
+\r
+    <build>\r
+        <plugins>\r
+            <plugin>\r
+                <groupId>org.opendaylight.yangtools</groupId>\r
+                <artifactId>yang-maven-plugin</artifactId>\r
+                <executions>\r
+                    <execution>\r
+                        <goals>\r
+                            <goal>generate-sources</goal>\r
+                        </goals>\r
+                        <configuration>\r
+                            <codeGenerators>\r
+                                <generator>\r
+                                    <codeGeneratorClass>\r
+                                        org.opendaylight.yangtools.maven.sal.api.gen.plugin.CodeGeneratorImpl\r
+                                    </codeGeneratorClass>\r
+                                    <outputBaseDir>\r
+                                        ${project.build.directory}/generated-sources/sal\r
+                                    </outputBaseDir>\r
+                                </generator>\r
+                                <generator>\r
+                                    <codeGeneratorClass>\r
+                                        org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator\r
+                                    </codeGeneratorClass>\r
+                                    <outputBaseDir>${project.build.directory}/generated-sources/config</outputBaseDir>\r
+                                    <additionalConfiguration>\r
+                                        <namespaceToPackage1>\r
+                                            urn:opendaylight:params:xml:ns:yang:controller==org.opendaylight.controller.config.yang\r
+                                        </namespaceToPackage1>\r
+                                    </additionalConfiguration>\r
+                                </generator>\r
+                                <generator>\r
+                                    <codeGeneratorClass>org.opendaylight.yangtools.yang.unified.doc.generator.maven.DocumentationGeneratorImpl</codeGeneratorClass>\r
+                                    <outputBaseDir>target/site/models</outputBaseDir>\r
+                                </generator>\r
+                            </codeGenerators>\r
+                            <inspectDependencies>true</inspectDependencies>\r
+                        </configuration>\r
+                    </execution>\r
+                </executions>\r
+            </plugin>\r
+            <plugin>\r
+                <groupId>org.codehaus.mojo</groupId>\r
+                <artifactId>build-helper-maven-plugin</artifactId>\r
+                <version>1.8</version>\r
+                <executions>\r
+                    <execution>\r
+                        <id>add-source</id>\r
+                        <phase>generate-sources</phase>\r
+                        <goals>\r
+                            <goal>add-source</goal>\r
+                        </goals>\r
+                        <configuration>\r
+                            <sources>\r
+                                <source>${project.build.directory}/generated-sources/config</source>\r
+                            </sources>\r
+                        </configuration>\r
+                    </execution>\r
+                </executions>\r
+            </plugin>\r
+        </plugins>\r
+    </build>\r
+</project>\r
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModule.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModule.java
new file mode 100644 (file)
index 0000000..1c2b78a
--- /dev/null
@@ -0,0 +1,75 @@
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.config.yang.messagebus.app.impl;
+
+import org.opendaylight.controller.config.api.DependencyResolver;
+import org.opendaylight.controller.config.api.ModuleIdentifier;
+import org.opendaylight.controller.mdsal.InitializationContext;
+import org.opendaylight.controller.mdsal.Providers;
+import org.osgi.framework.BundleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class MessageBusAppImplModule extends org.opendaylight.controller.config.yang.messagebus.app.impl.AbstractMessageBusAppImplModule {
+    private static final Logger LOGGER = LoggerFactory.getLogger(MessageBusAppImplModule.class);
+
+    private BundleContext bundleContext;
+
+    public BundleContext getBundleContext() {
+        return bundleContext;
+    }
+
+    public void setBundleContext(BundleContext bundleContext) {
+        this.bundleContext = bundleContext;
+    }
+
+    public MessageBusAppImplModule( ModuleIdentifier identifier, DependencyResolver dependencyResolver) {
+        super(identifier, dependencyResolver);
+    }
+
+    public MessageBusAppImplModule( ModuleIdentifier identifier,
+                                    DependencyResolver dependencyResolver,
+                                    MessageBusAppImplModule oldModule,
+                                    java.lang.AutoCloseable oldInstance) {
+        super(identifier, dependencyResolver, oldModule, oldInstance);
+    }
+
+    @Override
+    protected void customValidation() {}
+
+    @Override
+    public java.lang.AutoCloseable createInstance() {
+        List<NamespaceToStream> namespaceMapping = getNamespaceToStream();
+        InitializationContext ic = new InitializationContext(namespaceMapping);
+
+        final Providers.BindingAware bap = new Providers.BindingAware(ic);
+        final Providers.BindingIndependent bip = new Providers.BindingIndependent(ic);
+
+        getBindingBrokerDependency().registerProvider(bap, getBundleContext());
+        getDomBrokerDependency().registerProvider(bip);
+
+        AutoCloseable closer = new AutoCloseable() {
+            @Override public void close()  {
+                closeProvider(bap);
+                closeProvider(bip);
+            }
+        };
+
+        return closer;
+    }
+
+    private void closeProvider(AutoCloseable closable) {
+        try {
+            closable.close();
+        } catch (Exception e) {
+            LOGGER.error("Exception while closing: {}\n Exception: {}", closable, e);
+        }
+    }
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleFactory.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleFactory.java
new file mode 100644 (file)
index 0000000..8bee2d1
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+* Generated file
+*
+* Generated from: yang module name: message-bus-app-impl yang module local name: messagebus-app-impl
+* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
+* Generated at: Tue Feb 03 09:03:11 CET 2015
+*
+* Do not modify this file unless it is present under src/main directory
+*/
+package org.opendaylight.controller.config.yang.messagebus.app.impl;
+
+import org.opendaylight.controller.config.api.DependencyResolver;
+import org.opendaylight.controller.config.api.DynamicMBeanWithInstance;
+import org.opendaylight.controller.config.spi.Module;
+import org.osgi.framework.BundleContext;
+
+public class MessageBusAppImplModuleFactory extends org.opendaylight.controller.config.yang.messagebus.app.impl.AbstractMessageBusAppImplModuleFactory {
+    @Override
+    public Module createModule(String instanceName,
+                               DependencyResolver dependencyResolver,
+                               BundleContext bundleContext) {
+
+        MessageBusAppImplModule module =
+                (MessageBusAppImplModule) super.createModule(instanceName,
+                        dependencyResolver,
+                        bundleContext);
+
+        module.setBundleContext(bundleContext);
+
+        return module;
+    }
+
+    @Override
+    public Module createModule(String instanceName,
+                               DependencyResolver dependencyResolver,
+                               DynamicMBeanWithInstance old,
+                               BundleContext bundleContext)
+            throws Exception {
+
+        MessageBusAppImplModule module =
+                (MessageBusAppImplModule) super.createModule(instanceName,
+                        dependencyResolver,
+                        old,
+                        bundleContext);
+
+        module.setBundleContext(bundleContext);
+
+        return module;
+    }
+
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/DataStore.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/DataStore.java
new file mode 100644 (file)
index 0000000..a881fac
--- /dev/null
@@ -0,0 +1,80 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.mdsal;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+public class DataStore {
+    private static final FutureCallback<Void> DEFAULT_CALLBACK =
+            new FutureCallback<Void>() {
+                public void onSuccess(Void result) {
+                    // TODO: Implement default behaviour
+                }
+
+                public void onFailure(Throwable t) {
+                    // TODO: Implement default behaviour
+                };
+            };
+
+    private final MdSAL mdSAL;
+
+    public DataStore(MdSAL mdSAL) {
+        this.mdSAL = mdSAL;
+    }
+
+    public ListenerRegistration<DataChangeListener> registerDataChangeListener(LogicalDatastoreType store,
+                                                                               InstanceIdentifier<?> path,
+                                                                               DataChangeListener listener,
+                                                                               AsyncDataBroker.DataChangeScope triggeringScope) {
+        return mdSAL.getDataBroker().registerDataChangeListener(store, path, listener, triggeringScope);
+    }
+
+    public <T extends DataObject> void asyncPUT(LogicalDatastoreType datastoreType,
+                                                InstanceIdentifier<T> path,
+                                                T data) {
+        asyncPUT(datastoreType, path, data, DEFAULT_CALLBACK);
+    }
+
+    public <T extends DataObject> void asyncPUT(LogicalDatastoreType datastoreType,
+                                                InstanceIdentifier<T> path,
+                                                T data,
+                                                FutureCallback<Void> callback) {
+        WriteTransaction tx = mdSAL.getDataBroker().newWriteOnlyTransaction();
+        tx.put(datastoreType, path, data, true);
+        execPut(tx, callback);
+    }
+
+    public <T extends DataObject> T read(LogicalDatastoreType datastoreType,
+                                         InstanceIdentifier<T> path) {
+
+        ReadOnlyTransaction tx = mdSAL.getDataBroker().newReadOnlyTransaction();
+        T result = null;
+
+        try {
+            result = tx.read(datastoreType, path).get().get();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        return result;
+    }
+
+    private static void execPut(WriteTransaction tx, FutureCallback<Void> callback) {
+        Futures.addCallback(tx.submit(), callback);
+    }
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/InitializationContext.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/InitializationContext.java
new file mode 100644 (file)
index 0000000..c73fb2a
--- /dev/null
@@ -0,0 +1,61 @@
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.mdsal;
+
+import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream;
+import org.opendaylight.controller.messagebus.app.impl.EventAggregator;
+import org.opendaylight.controller.messagebus.app.impl.EventSourceManager;
+import org.opendaylight.controller.messagebus.app.impl.EventSourceTopology;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class InitializationContext {
+    private static final Logger LOGGER = LoggerFactory.getLogger(InitializationContext.class);
+
+    private final MdSAL mdSal;
+    private final DataStore dataStore;
+    private final EventSourceTopology eventSourceTopology;
+    private final EventSourceManager eventSourceManager;
+    private final EventAggregator eventAggregator;
+
+    public InitializationContext(List<NamespaceToStream> namespaceMapping) {
+        this.mdSal = new MdSAL();
+        this.dataStore = new DataStore(mdSal);
+        this.eventSourceTopology = new EventSourceTopology(dataStore);
+        this.eventSourceManager = new EventSourceManager(dataStore, mdSal, eventSourceTopology, namespaceMapping);
+        this.eventAggregator = new EventAggregator(mdSal, eventSourceTopology);
+    }
+
+    public synchronized void set(BindingAwareBroker.ProviderContext session) {
+        mdSal.setBindingAwareContext(session);
+
+        if (mdSal.isReady()) {
+            initialize();
+        }
+    }
+
+    public synchronized void set(Broker.ProviderSession session) {
+        mdSal.setBindingIndependentContext(session);
+
+        if (mdSal.isReady()) {
+            initialize();
+        }
+    }
+
+    private void initialize() {
+        eventSourceTopology.mdsalReady();
+        eventSourceManager.mdsalReady();
+        eventAggregator.mdsalReady();
+
+        LOGGER.info("InitializationContext started.");
+    }
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/MdSAL.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/MdSAL.java
new file mode 100644 (file)
index 0000000..03b220a
--- /dev/null
@@ -0,0 +1,188 @@
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.mdsal;
+
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.binding.api.BindingAwareService;
+import org.opendaylight.controller.sal.binding.api.mount.MountInstance;
+import org.opendaylight.controller.sal.binding.api.mount.MountProviderService;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.sal.core.api.BrokerService;
+import org.opendaylight.controller.sal.core.api.notify.NotificationListener;
+import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService;
+import org.opendaylight.controller.sal.core.api.notify.NotificationService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.RpcService;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MdSAL {
+    private static final Logger LOGGER = LoggerFactory.getLogger(MdSAL.class);
+
+    private BindingAwareBroker.ProviderContext bindingAwareContext;
+    private Broker.ProviderSession bindingIndependentContext;
+
+    // -----------------------------
+    // ----- FRAMEWORK METHODS -----
+    // -----------------------------
+    public void setBindingAwareContext(BindingAwareBroker.ProviderContext bindingAwareContext) {
+        this.bindingAwareContext = bindingAwareContext;
+    }
+
+    public void setBindingIndependentContext(Broker.ProviderSession bindingIndependentContext) {
+        this.bindingIndependentContext = bindingIndependentContext;
+    }
+
+    //TODO: We should hide brokers and expose functionalities instead
+    public DataBroker getDataBroker() {
+        return getBaSalService(DataBroker.class);
+    }
+
+    public synchronized boolean isReady() {
+        return (bindingAwareContext != null && bindingIndependentContext != null);
+    }
+
+    // -----------------------
+    // ----- API METHODS -----
+    // -----------------------
+    // TODO: Factor out API methods to interface
+    // method does not return registration object. Rather will hold references internally and manipulate using node id and API
+    public <T extends RpcService> void addRpcImplementation(Class<T> serviceInterface,
+                                                            T implementation)
+            throws IllegalStateException {
+        bindingAwareContext.addRpcImplementation(serviceInterface, implementation);
+    }
+
+    // method does not return registration object. Rather will hold references internally and manipulate using node id and API
+    public <T extends RpcService> void addRpcImplementation(Node node,
+                                                            Class<T> serviceInterface,
+                                                            T implementation)
+            throws IllegalStateException {
+        BindingAwareBroker.RoutedRpcRegistration<T> registration
+                = addRoutedRpcImplementation(serviceInterface, implementation);
+
+        NodeRef nodeRef = createNodeRef(node.getId());
+        registration.registerPath(NodeContext.class, nodeRef.getValue());
+    }
+
+    public ListenerRegistration<NotificationListener> addNotificationListener(String nodeId,
+                                                                              QName notification,
+                                                                              NotificationListener listener) {
+        YangInstanceIdentifier yii = inventoryNodeBIIdentifier(nodeId);
+
+        NotificationService notificationService =
+                getBiSalService(DOMMountPointService.class)
+                        .getMountPoint(yii)
+                        .get()
+                        .getService(NotificationPublishService.class)
+                        .get();
+
+        ListenerRegistration<NotificationListener> registration =
+                notificationService.addNotificationListener(notification, listener);
+
+        LOGGER.info("Notification listener registered for {}, at node {}", notification, nodeId);
+
+        return registration;
+    }
+
+    public ListenerRegistration<NotificationListener> addNotificationListener(QName notification,
+                                                                              NotificationListener listener) {
+        NotificationService notificationService =
+                getBiSalService(NotificationPublishService.class);
+
+        ListenerRegistration<NotificationListener> registration =
+                notificationService.addNotificationListener(notification, listener);
+
+        LOGGER.info("Notification listener registered for {}.", notification);
+
+        return registration;
+    }
+
+    public <T extends RpcService> T getRpcService(Class<T> serviceInterface) {
+        return bindingAwareContext.getRpcService(serviceInterface);
+    }
+
+    public <T extends RpcService> T getRpcService(String nodeId, Class<T> serviceInterface) {
+        MountProviderService mountProviderService = getBaSalService(MountProviderService.class);
+
+        InstanceIdentifier<Node> key = InstanceIdentifier.create(Nodes.class)
+                                                         .child(Node.class,
+                                                                 new NodeKey(new NodeId(nodeId)));
+
+        MountInstance mountPoint = mountProviderService.getMountPoint(key);
+        return mountPoint.getRpcService(serviceInterface);
+    }
+
+    public void publishNotification(CompositeNode notification) {
+        getBiSalService(NotificationPublishService.class).publish(notification);
+    }
+
+    public SchemaContext getSchemaContext(String nodeId) {
+        YangInstanceIdentifier yii = inventoryNodeBIIdentifier(nodeId);
+
+        SchemaContext schemaContext =
+                getBiSalService(DOMMountPointService.class)
+                        .getMountPoint(yii)
+                        .get().getSchemaContext();
+
+        return schemaContext;
+    }
+
+    // ---------------------------
+    // ----- UTILITY METHODS -----
+    // ---------------------------
+    private <T extends BindingAwareService> T getBaSalService(Class<T> service) {
+        return bindingAwareContext.getSALService(service);
+    }
+
+    private <T extends BrokerService> T getBiSalService(Class<T> service) {
+        return bindingIndependentContext.getService(service);
+    }
+
+    private static final String NODE_ID_NAME = "id";
+
+    public static YangInstanceIdentifier inventoryNodeBIIdentifier(String nodeId) {
+        return YangInstanceIdentifier.builder()
+                .node(Nodes.QNAME)
+                .nodeWithKey(Node.QNAME,
+                             QName.create(Node.QNAME.getNamespace(),
+                                          Node.QNAME.getRevision(),
+                                          NODE_ID_NAME),
+                             nodeId)
+                .build();
+    }
+
+    private <T extends RpcService> BindingAwareBroker.RoutedRpcRegistration<T> addRoutedRpcImplementation(Class<T> serviceInterface,
+                                                                                                          T implementation)
+            throws IllegalStateException {
+        return bindingAwareContext.addRoutedRpcImplementation(serviceInterface, implementation);
+    }
+
+    public static NodeRef createNodeRef(NodeId nodeId) {
+        NodeKey nodeKey = new NodeKey(nodeId);
+        InstanceIdentifier<Node> path = InstanceIdentifier
+                .builder(Nodes.class)
+                .child(Node.class, nodeKey)
+                .build();
+        return new NodeRef(path);
+    }
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/Providers.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/Providers.java
new file mode 100644 (file)
index 0000000..a28e588
--- /dev/null
@@ -0,0 +1,57 @@
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.mdsal;
+
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
+import org.opendaylight.controller.sal.core.api.AbstractProvider;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Providers {
+    private static final Logger LOGGER = LoggerFactory.getLogger(Providers.class);
+
+    public static class BindingAware implements BindingAwareProvider, AutoCloseable {
+        private final InitializationContext initializationContext;
+
+        public BindingAware(InitializationContext ic) {
+            this.initializationContext = ic;
+        }
+
+        @Override
+        public void onSessionInitiated(BindingAwareBroker.ProviderContext session) {
+            initializationContext.set(session);
+
+            LOGGER.info("BindingAwareBroker.ProviderContext initialized");
+        }
+
+        @Override
+        public void close() throws Exception {}
+    }
+
+    public static class BindingIndependent extends AbstractProvider implements AutoCloseable {
+        private final InitializationContext initializationContext;
+
+        public BindingIndependent(InitializationContext ic) {
+            this.initializationContext = ic;
+        }
+
+        @Override
+        public void onSessionInitiated(Broker.ProviderSession session) {
+            initializationContext.set(session);
+
+            LOGGER.info("Broker.ProviderSession initialized");
+        }
+
+        @Override
+        public void close() throws Exception {}
+    }
+
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventAggregator.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventAggregator.java
new file mode 100644 (file)
index 0000000..4b77bf2
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.messagebus.app.impl;
+
+import java.util.List;
+import java.util.concurrent.Future;
+import org.opendaylight.controller.mdsal.MdSAL;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO: implement topic created notification
+public class EventAggregator implements EventAggregatorService {
+    private static final Logger LOGGER = LoggerFactory.getLogger(EventAggregator.class);
+
+    private final MdSAL mdSAL;
+    private final EventSourceTopology eventSourceTopology;
+
+    public EventAggregator(final MdSAL mdSAL, final EventSourceTopology eventSourceTopology) {
+        this.mdSAL = mdSAL;
+        this.eventSourceTopology = eventSourceTopology;
+    }
+
+    public void mdsalReady() {
+        mdSAL.addRpcImplementation(EventAggregatorService.class, this);
+    }
+
+    @Override
+    public Future<RpcResult<CreateTopicOutput>> createTopic(final CreateTopicInput input) {
+        LOGGER.info("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}",
+                input.getNotificationPattern(),
+                input.getNodeIdPattern());
+
+        Topic topic = new Topic(new NotificationPattern(input.getNotificationPattern()), input.getNodeIdPattern().getValue(), mdSAL);
+
+        //# Make sure we capture all nodes from now on
+        eventSourceTopology.registerDataChangeListener(topic);
+
+        //# Notify existing nodes
+        //# Code reader note: Context of Node type is NetworkTopology
+        List<Node> nodes = eventSourceTopology.snapshot();
+        for (Node node : nodes) {
+            NodeId nodeIdToNotify = node.getAugmentation(Node1.class).getEventSourceNode();
+            topic.notifyNode(nodeIdToNotify);
+        }
+
+        CreateTopicOutput cto = new CreateTopicOutputBuilder()
+                .setTopicId(topic.getTopicId())
+                .build();
+
+        return Util.resultFor(cto);
+    }
+
+    @Override
+    public Future<RpcResult<Void>> destroyTopic(final DestroyTopicInput input) {
+        // 1. UNREGISTER DATA CHANGE LISTENER -> ?
+        // 2. CLOSE TOPIC
+        return null;
+    }
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceManager.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceManager.java
new file mode 100644 (file)
index 0000000..a84eddd
--- /dev/null
@@ -0,0 +1,126 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.messagebus.app.impl;
+
+import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.mdsal.DataStore;
+import org.opendaylight.controller.mdsal.MdSAL;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public final class EventSourceManager implements DataChangeListener {
+    private static final Logger LOGGER = LoggerFactory.getLogger(EventSourceManager.class);
+    private static final InstanceIdentifier<Node> INVENTORY_PATH = InstanceIdentifier.create(Nodes.class)
+                                                                                     .child(Node.class);
+    private final DataStore dataStore;
+    private final MdSAL mdSal;
+    private final EventSourceTopology eventSourceTopology;
+    private final Map<String, String> streamMap;
+
+    public EventSourceManager(DataStore dataStore,
+                              MdSAL mdSal,
+                              EventSourceTopology eventSourceTopology,
+                              List<NamespaceToStream> namespaceMapping) {
+        this.dataStore = dataStore;
+        this.mdSal = mdSal;
+        this.eventSourceTopology = eventSourceTopology;
+        this.streamMap = namespaceToStreamMapping(namespaceMapping);
+    }
+
+    private Map namespaceToStreamMapping(List<NamespaceToStream> namespaceMapping) {
+        Map<String, String> streamMap = new HashMap<>(namespaceMapping.size());
+
+        for (NamespaceToStream nToS  : namespaceMapping) {
+            streamMap.put(nToS.getUrnPrefix(), nToS.getStreamName());
+        }
+
+        return streamMap;
+    }
+
+    public void mdsalReady() {
+        dataStore.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
+                                             INVENTORY_PATH,
+                                             this,
+                                             DataBroker.DataChangeScope.SUBTREE);
+
+        LOGGER.info("EventSourceManager initialized.");
+    }
+
+    @Override
+    public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
+        //FIXME: Prevent creating new event source on subsequent changes in inventory, like disconnect.
+        LOGGER.debug("[DataChangeEvent<InstanceIdentifier<?>, DataObject>: {}]", event);
+
+        Node node = Util.getAffectedNode(event);
+        // we listen on node tree, therefore we should rather throw IllegalStateException when node is null
+        if ( node == null ) {
+            LOGGER.debug("OnDataChanged Event. Node is null.");
+            return;
+        }
+        if ( isNetconfNode(node) == false ) {
+            LOGGER.debug("OnDataChanged Event. Not a Netconf node.");
+            return;
+        }
+        if ( isEventSource(node) == false ) {
+            LOGGER.debug("OnDataChanged Event. Node an EventSource node.");
+            return;
+        }
+
+        NetconfEventSource netconfEventSource = new NetconfEventSource(mdSal,
+                                                                       node.getKey().getId().getValue(),
+                                                                       streamMap);
+        mdSal.addRpcImplementation(node, EventSourceService.class, netconfEventSource);
+
+        InstanceIdentifier<NetconfNode> nodeInstanceIdentifier =
+                InstanceIdentifier.create(Nodes.class)
+                        .child(Node.class, node.getKey())
+                        .augmentation(NetconfNode.class);
+
+        dataStore.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
+                nodeInstanceIdentifier,
+                netconfEventSource,
+                DataBroker.DataChangeScope.SUBTREE);
+
+        eventSourceTopology.insert(node);
+    }
+
+    private boolean isNetconfNode(Node node)  {
+        return node.getAugmentation(NetconfNode.class) != null ;
+    }
+
+    public boolean isEventSource(Node node) {
+        NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
+
+        return isEventSource(netconfNode);
+    }
+
+    private boolean isEventSource(NetconfNode node) {
+        for (String capability : node.getInitialCapability()) {
+            if(capability.startsWith("urn:ietf:params:xml:ns:netconf:notification")) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java
new file mode 100644 (file)
index 0000000..c070097
--- /dev/null
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.messagebus.app.impl;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.mdsal.DataStore;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1Builder;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1Builder;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSource;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSourceBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class EventSourceTopology {
+    private static final Logger LOGGER = LoggerFactory.getLogger(EventSourceTopology.class);
+
+    private static final String topologyId = "EVENT-SOURCE-TOPOLOGY" ;
+    private static final TopologyKey topologyKey = new TopologyKey(new TopologyId(topologyId));
+    private static final LogicalDatastoreType datastoreType = LogicalDatastoreType.OPERATIONAL;
+
+    private static final InstanceIdentifier<Topology> topologyInstanceIdentifier =
+            InstanceIdentifier.create(NetworkTopology.class)
+                    .child(Topology.class, topologyKey);
+
+    private static final InstanceIdentifier<TopologyTypes1> topologyTypeInstanceIdentifier =
+            topologyInstanceIdentifier
+                    .child(TopologyTypes.class)
+                    .augmentation(TopologyTypes1.class);
+
+    private static final InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang
+                                            .network.topology.rev131021.network.topology.topology.Node> eventSourceTopologyPath =
+            InstanceIdentifier.create(NetworkTopology.class)
+                    .child(Topology.class)
+                    .child(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang
+                            .network.topology.rev131021.network.topology.topology.Node.class);
+
+    private final Map<DataChangeListener, ListenerRegistration<DataChangeListener>> registrations =
+            new ConcurrentHashMap<>();
+
+    private final DataStore dataStore;
+
+    public EventSourceTopology(DataStore dataStore) {
+        this.dataStore = dataStore;
+    }
+
+    public void mdsalReady() {
+        TopologyEventSource topologySource = new TopologyEventSourceBuilder().build();
+        TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
+
+        dataStore.asyncPUT(datastoreType, topologyTypeInstanceIdentifier, topologyTypeAugment);
+    }
+
+    public void insert(Node node) {
+        String nodeId = node.getKey().getId().getValue();
+        NodeKey nodeKey = new NodeKey(new NodeId(nodeId));
+        InstanceIdentifier<Node1> topologyNodeAugment
+                = topologyInstanceIdentifier
+                .child(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang
+                        .network.topology.rev131021.network.topology.topology.Node.class, nodeKey)
+                .augmentation(Node1.class);
+
+        Node1 nodeAgument = new Node1Builder().setEventSourceNode(node.getId()).build();
+        dataStore.asyncPUT(datastoreType, topologyNodeAugment, nodeAgument);
+    }
+
+    // TODO: Should we expose this functioanlity over RPC?
+    public List<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang
+                .network.topology.rev131021.network.topology.topology.Node> snapshot() {
+        Topology topology = dataStore.read(datastoreType, topologyInstanceIdentifier);
+        return topology.getNode();
+    }
+
+    public void registerDataChangeListener(DataChangeListener listener) {
+        ListenerRegistration<DataChangeListener> listenerRegistration = dataStore.registerDataChangeListener(datastoreType,
+                eventSourceTopologyPath,
+                listener,
+                DataBroker.DataChangeScope.SUBTREE);
+
+        registrations.put(listener, listenerRegistration);
+    }
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSource.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSource.java
new file mode 100644 (file)
index 0000000..9c0697f
--- /dev/null
@@ -0,0 +1,190 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.messagebus.app.impl;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.regex.Pattern;
+import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.mdsal.MdSAL;
+import org.opendaylight.controller.sal.core.api.notify.NotificationListener;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
+import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NetconfEventSource implements EventSourceService, NotificationListener, DataChangeListener {
+    private static final Logger LOGGER = LoggerFactory.getLogger(NetconfEventSource.class);
+
+    private final MdSAL mdSal;
+    private final String nodeId;
+
+    private final List<String> activeStreams = new ArrayList<>();
+
+    private final Map<String, String> urnPrefixToStreamMap;
+
+    public NetconfEventSource(final MdSAL mdSal, final String nodeId, final Map<String, String> streamMap) {
+        Preconditions.checkNotNull(mdSal);
+        Preconditions.checkNotNull(nodeId);
+
+        this.mdSal = mdSal;
+        this.nodeId = nodeId;
+        this.urnPrefixToStreamMap = streamMap;
+
+        LOGGER.info("NetconfEventSource [{}] created.", nodeId);
+    }
+
+    @Override
+    public Future<RpcResult<JoinTopicOutput>> joinTopic(final JoinTopicInput input) {
+        final NotificationPattern notificationPattern = input.getNotificationPattern();
+
+        // FIXME: default language should already be regex
+        final String regex = Util.wildcardToRegex(notificationPattern.getValue());
+
+        final Pattern pattern = Pattern.compile(regex);
+        List<QName> matchingNotifications = Util.expandQname(availableNotifications(), pattern);
+        registerNotificationListener(matchingNotifications);
+        return null;
+    }
+
+    private List<QName> availableNotifications() {
+        // FIXME: use SchemaContextListener to get changes asynchronously
+        Set<NotificationDefinition> availableNotifications = mdSal.getSchemaContext(nodeId).getNotifications();
+        List<QName> qNs = new ArrayList<>(availableNotifications.size());
+        for (NotificationDefinition nd : availableNotifications) {
+            qNs.add(nd.getQName());
+        }
+
+        return qNs;
+    }
+
+    private void registerNotificationListener(final List<QName> notificationsToSubscribe) {
+        for (QName qName : notificationsToSubscribe) {
+            startSubscription(qName);
+            // FIXME: do not lose this registration
+            final ListenerRegistration<NotificationListener> reg = mdSal.addNotificationListener(nodeId, qName, this);
+        }
+    }
+
+    private synchronized void startSubscription(final QName qName) {
+        String streamName = resolveStream(qName);
+
+        if (streamIsActive(streamName) == false) {
+            LOGGER.info("Stream {} is not active on node {}. Will subscribe.", streamName, nodeId);
+            startSubscription(streamName);
+        }
+    }
+
+    private synchronized void resubscribeToActiveStreams() {
+        for (String streamName : activeStreams) {
+            startSubscription(streamName);
+        }
+    }
+
+    private synchronized void startSubscription(final String streamName) {
+        CreateSubscriptionInput subscriptionInput = getSubscriptionInput(streamName);
+        mdSal.getRpcService(nodeId, NotificationsService.class).createSubscription(subscriptionInput);
+        activeStreams.add(streamName);
+    }
+
+    private static CreateSubscriptionInput getSubscriptionInput(final String streamName) {
+        CreateSubscriptionInputBuilder csib = new CreateSubscriptionInputBuilder();
+        csib.setStream(new StreamNameType(streamName));
+        return csib.build();
+    }
+
+    private String resolveStream(final QName qName) {
+        String streamName = null;
+
+        for (Map.Entry<String, String> entry : urnPrefixToStreamMap.entrySet()) {
+            String nameSpace = qName.getNamespace().toString();
+            String urnPrefix = entry.getKey();
+            if( nameSpace.startsWith(urnPrefix) ) {
+                streamName = entry.getValue();
+                break;
+            }
+        }
+
+        return streamName;
+    }
+
+    private boolean streamIsActive(final String streamName) {
+        return activeStreams.contains(streamName);
+    }
+
+    // PASS
+    @Override public Set<QName> getSupportedNotifications() {
+        return null;
+    }
+
+    @Override
+    public void onNotification(final CompositeNode notification) {
+        LOGGER.info("NetconfEventSource {} received notification {}. Will publish to MD-SAL.", nodeId, notification);
+        ImmutableCompositeNode payload = ImmutableCompositeNode.builder()
+                .setQName(QName.create(TopicNotification.QNAME, "payload"))
+                .add(notification).toInstance();
+        ImmutableCompositeNode icn = ImmutableCompositeNode.builder()
+                .setQName(TopicNotification.QNAME)
+                .add(payload)
+                .addLeaf("event-source", nodeId)
+                .toInstance();
+
+        mdSal.publishNotification(icn);
+    }
+
+    @Override
+    public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
+        boolean wasConnected = false;
+        boolean nowConnected = false;
+
+        for (Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : change.getOriginalData().entrySet()) {
+            if ( isNetconfNode(changeEntry) ) {
+                NetconfNode nn = (NetconfNode)changeEntry.getValue();
+                wasConnected = nn.isConnected();
+            }
+        }
+
+        for (Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : change.getUpdatedData().entrySet()) {
+            if ( isNetconfNode(changeEntry) ) {
+                NetconfNode nn = (NetconfNode)changeEntry.getValue();
+                nowConnected = nn.isConnected();
+            }
+        }
+
+        if (wasConnected == false && nowConnected == true) {
+            resubscribeToActiveStreams();
+        }
+    }
+
+    private static boolean isNetconfNode(final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry )  {
+        return NetconfNode.class.equals(changeEntry.getKey().getTargetType());
+    }
+
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Topic.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Topic.java
new file mode 100644 (file)
index 0000000..aebde0c
--- /dev/null
@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.messagebus.app.impl;
+
+import com.google.common.base.Preconditions;
+import java.util.regex.Pattern;
+import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.mdsal.MdSAL;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.LoggerFactory;
+
+public class Topic implements DataChangeListener {
+    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(Topic.class);
+    private final NotificationPattern notificationPattern;
+    private final Pattern nodeIdPattern;
+    private final TopicId topicId;
+    private final MdSAL mdSal;
+
+    public Topic(final NotificationPattern notificationPattern, final String nodeIdPattern, final MdSAL mdSal) {
+        this.notificationPattern = Preconditions.checkNotNull(notificationPattern);
+
+        // FIXME: regex should be the language of nodeIdPattern
+        final String regex = Util.wildcardToRegex(nodeIdPattern);
+        this.nodeIdPattern = Pattern.compile(regex);
+        this.mdSal = Preconditions.checkNotNull(mdSal);
+
+        // FIXME: We need to perform some salting in order to make
+        //        the topic IDs less predictable.
+        this.topicId = new TopicId(Util.md5String(notificationPattern + nodeIdPattern));
+    }
+
+    public TopicId getTopicId() {
+        return topicId;
+    }
+
+    @Override
+    public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
+        // TODO: affected must return topologyNode !!!
+        final Node node = Util.getAffectedNode(event);
+        if (nodeIdPattern.matcher(node.getId().getValue()).matches()) {
+            notifyNode(node.getId());
+        } else {
+            LOG.debug("Skipping node {}", node.getId());
+        }
+    }
+
+    public void notifyNode(final NodeId nodeId) {
+        JoinTopicInput jti = getJoinTopicInputArgument(nodeId);
+        EventSourceService ess = mdSal.getRpcService(EventSourceService.class);
+
+        if (ess == null) {
+            throw new IllegalStateException("EventSourceService is not registered.");
+        }
+
+        ess.joinTopic(jti);
+    }
+
+    private JoinTopicInput getJoinTopicInputArgument(final NodeId nodeId) {
+        NodeRef nodeRef = MdSAL.createNodeRef(nodeId);
+        JoinTopicInput jti =
+                new JoinTopicInputBuilder()
+                        .setNode(nodeRef.getValue())
+                        .setTopicId(topicId)
+                        .setNotificationPattern(notificationPattern)
+                        .build();
+        return jti;
+    }
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Util.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Util.java
new file mode 100644 (file)
index 0000000..9927d85
--- /dev/null
@@ -0,0 +1,137 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.messagebus.app.impl;
+
+import com.google.common.util.concurrent.Futures;
+import java.math.BigInteger;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.regex.Pattern;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+public final class Util {
+    private static final MessageDigest messageDigestTemplate = getDigestInstance();
+
+    private static MessageDigest getDigestInstance() {
+        try {
+            return MessageDigest.getInstance("MD5");
+        } catch (NoSuchAlgorithmException e) {
+            throw new RuntimeException("Unable to get MD5 instance");
+        }
+    }
+
+    public static String md5String(final String inputString) {
+
+        try {
+            MessageDigest md = (MessageDigest)messageDigestTemplate.clone();
+            md.update(inputString.getBytes("UTF-8"), 0, inputString.length());
+            return new BigInteger(1, md.digest()).toString(16);
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to get MD5 instance");
+        }
+    }
+
+    public static <T> Future<RpcResult<T>> resultFor(final T output) {
+        RpcResult<T> result = Rpcs.getRpcResult(true, output, Collections.<RpcError>emptyList());
+        return Futures.immediateFuture(result);
+    }
+
+    /**
+     * Extracts affected node from data change event.
+     * @param event
+     * @return
+     */
+    public static Node getAffectedNode(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
+        // TODO: expect listener method to be called even when change impact node
+        // TODO: test with change.getCreatedData()
+        for (Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getUpdatedData().entrySet()) {
+            if (isNode(changeEntry)) {
+                return (Node) changeEntry.getValue();
+            }
+        }
+
+        return null;
+    }
+
+    private static boolean isNode(final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry )  {
+        return Node.class.equals(changeEntry.getKey().getTargetType());
+    }
+
+    /**
+     * Method filters qnames based on wildcard strings
+     *
+     * @param availableQnames
+     * @param patterh matching pattern
+     * @return list of filtered qnames
+     */
+    public static List<QName> expandQname(final List<QName> availableQnames, final Pattern pattern) {
+        List<QName> matchingQnames = new ArrayList<>();
+
+        for (QName qname : availableQnames) {
+            String namespace = qname.getNamespace().toString();
+            if (pattern.matcher(namespace).matches()) {
+                matchingQnames.add(qname);
+            }
+        }
+
+        return matchingQnames;
+    }
+
+    /**
+     * CREDIT to http://www.rgagnon.com/javadetails/java-0515.html
+     * @param wildcard
+     * @return
+     */
+    static String wildcardToRegex(final String wildcard){
+        StringBuffer s = new StringBuffer(wildcard.length());
+        s.append('^');
+        for (char c : wildcard.toCharArray()) {
+            switch(c) {
+                case '*':
+                    s.append(".*");
+                    break;
+                case '?':
+                    s.append('.');
+                    break;
+                // escape special regexp-characters
+                case '(':
+                case ')':
+                case '[':
+                case ']':
+                case '$':
+                case '^':
+                case '.':
+                case '{':
+                case '}':
+                case '|':
+                case '\\':
+                    s.append("\\");
+                    s.append(c);
+                    break;
+                default:
+                    s.append(c);
+                    break;
+            }
+        }
+        s.append('$');
+        return s.toString();
+    }
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/yang/messagebus-app-impl.yang b/opendaylight/md-sal/messagebus-impl/src/main/yang/messagebus-app-impl.yang
new file mode 100644 (file)
index 0000000..bed6b10
--- /dev/null
@@ -0,0 +1,64 @@
+module messagebus-app-impl {
+    yang-version 1;
+    namespace "urn:opendaylight:params:xml:ns:yang:controller:messagebus:app:impl";
+    prefix "binding-impl";
+
+    import config { prefix config; revision-date 2013-04-05; }
+    import opendaylight-md-sal-binding {prefix sal;}
+    import opendaylight-md-sal-dom {prefix dom;}
+
+
+    description
+        "Service definition for Message Bus application implementation.";
+    revision "2015-02-03" {
+        description "Second revision. Message Bus opensourcing";
+    }
+
+    identity messagebus-app-impl {
+        base config:module-type;
+        config:java-name-prefix MessageBusAppImpl;
+    }
+    
+    augment "/config:modules/config:module/config:configuration" {
+        case messagebus-app-impl {
+            when "/config:modules/config:module/config:type = 'messagebus-app-impl'";
+            
+            container binding-broker {
+                uses config:service-ref {
+                    refine type {
+                        mandatory true;
+                        config:required-identity sal:binding-broker-osgi-registry;
+                    }
+                }
+            }
+
+            container dom-broker {
+                uses config:service-ref {
+                    refine type {
+                        mandatory true;
+                        config:required-identity dom:dom-broker-osgi-registry;
+                    }
+                }
+            }
+
+            list namespace-to-stream {
+                key urn-prefix;
+
+                leaf urn-prefix {
+                    type string;
+                }
+
+                leaf stream-name {
+                    type string;
+                }
+            }
+        }
+    }
+    
+    augment "/config:modules/config:module/config:state" {
+        case messagebus-app-impl {
+            when "/config:modules/config:module/config:type = 'messagebus-app-impl'";
+        }
+    }
+}
\ No newline at end of file
index 3151380647e22113b5a72f1dc5a090b7ffde13a8..bdeb8a65a8c2aeac617d34286bae46404767df35 100644 (file)
@@ -93,6 +93,7 @@
 
     <!-- Message Bus -->
     <module>messagebus-api</module>
+    <module>messagebus-impl</module>
   </modules>
 
   <build>