<version>${mdsal.version}</version>
</dependency>
+
<!-- toaster -->
<dependency>
<groupId>org.opendaylight.controller.samples</groupId>
</execution>
</executions>
</plugin>
- <plugin>
- <groupId>org.opendaylight.yangtools</groupId>
- <artifactId>yang-maven-plugin</artifactId>
- <version>${yangtools.version}</version>
- <dependencies>
- <dependency>
- <groupId>org.opendaylight.yangtools</groupId>
- <artifactId>maven-sal-api-gen-plugin</artifactId>
- <version>${yangtools.version}</version>
- <type>jar</type>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.yangtools</groupId>
- <artifactId>yang-binding</artifactId>
- <version>${yangtools.version}</version>
- <type>jar</type>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>sal-rest-docgen-maven</artifactId>
- <version>${mdsal.version}</version>
- <type>jar</type>
- </dependency>
- </dependencies>
- <executions>
- <execution>
- <goals>
- <goal>generate-sources</goal>
- </goals>
- <configuration>
- <yangFilesRootDir>src</yangFilesRootDir>
- <codeGenerators>
- <generator>
- <codeGeneratorClass>org.opendaylight.controller.sal.rest.doc.maven.StaticDocGenerator</codeGeneratorClass>
- <outputBaseDir>${project.build.directory}/generated-resources/swagger-api-documentation/explorer/static</outputBaseDir>
- </generator>
- </codeGenerators>
- <inspectDependencies>true</inspectDependencies>
- </configuration>
- </execution>
- </executions>
- </plugin>
</plugins>
</build>
<scm>
<bundle>mvn:org.opendaylight.controller.samples/clustering-it-provider/${project.version}</bundle>
<configfile finalname="${config.configfile.directory}/20-clustering-test-app.xml">mvn:org.opendaylight.controller.samples/clustering-it-config/${project.version}/xml/config</configfile>
</feature>
-
</features>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
</dependency>
+
+
+ <!-- message-bus -->
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>messagebus-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>messagebus-impl</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>messagebus-config</artifactId>
+ <version>${mdsal.version}</version>
+ <type>xml</type>
+ <classifier>config</classifier>
+ </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>netconf-connector-config</artifactId>
<bundle>mvn:org.opendaylight.controller/netconf-tcp/${netconf.version}</bundle>
</feature>
+ <feature name='odl-message-bus' version='${project.version}'>
+ <feature version='${project.version}'>odl-netconf-connector</feature>
+ <feature version='${project.version}'>odl-mdsal-broker</feature>
+ <bundle>mvn:org.opendaylight.controller/messagebus-api/${project.version}</bundle>
+ <bundle>mvn:org.opendaylight.controller/messagebus-impl/${project.version}</bundle>
+ <configfile finalname="${config.configfile.directory}/05-message-bus.xml">mvn:org.opendaylight.controller/messagebus-config/${project.version}/xml/config</configfile>
+ </feature>
<!-- Optional TODO: Remove TODO Comments -->
</features>
<!-- MessageBus -->
<dependency>
<groupId>org.opendaylight.controller</groupId>
- <artifactId>message-bus-api</artifactId>
+ <artifactId>messagebus-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
- <artifactId>message-bus-impl</artifactId>
+ <artifactId>messagebus-impl</artifactId>
<version>${project.version}</version>
</dependency>
<version>1.2.0-SNAPSHOT</version>
</parent>
- <artifactId>message-bus-api</artifactId>
+ <artifactId>messagebus-api</artifactId>
<name>${project.artifactId}</name>
<packaging>bundle</packaging>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>\r
+<!--\r
+Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.\r
+\r
+This program and the accompanying materials are made available under the\r
+terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
+and is available at http://www.eclipse.org/legal/epl-v10.html\r
+-->\r
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">\r
+\r
+ <modelVersion>4.0.0</modelVersion>\r
+\r
+ <parent>\r
+ <groupId>org.opendaylight.controller</groupId>\r
+ <artifactId>sal-parent</artifactId>\r
+ <version>1.2.0-SNAPSHOT</version>\r
+ </parent>\r
+\r
+ <artifactId>messagebus-config</artifactId>\r
+ <packaging>jar</packaging>\r
+ <description>Configuration files for message-bus</description>\r
+\r
+ <dependencies>\r
+ <dependency>\r
+ <groupId>org.opendaylight.controller</groupId>\r
+ <artifactId>config-api</artifactId>\r
+ </dependency>\r
+ <dependency>\r
+ <groupId>org.opendaylight.controller</groupId>\r
+ <artifactId>messagebus-api</artifactId>\r
+ <version>${project.version}</version>\r
+ </dependency>\r
+ <dependency>\r
+ <groupId>org.opendaylight.controller</groupId>\r
+ <artifactId>messagebus-impl</artifactId>\r
+ <version>${project.version}</version>\r
+ </dependency>\r
+ </dependencies>\r
+\r
+ <build>\r
+ <plugins>\r
+ <plugin>\r
+ <groupId>org.codehaus.mojo</groupId>\r
+ <artifactId>build-helper-maven-plugin</artifactId>\r
+ <executions>\r
+ <execution>\r
+ <id>attach-artifacts</id>\r
+ <goals>\r
+ <goal>attach-artifact</goal>\r
+ </goals>\r
+ <phase>package</phase>\r
+ <configuration>\r
+ <artifacts>\r
+ <artifact>\r
+ <file>${project.build.directory}/classes/initial/05-message-bus.xml</file>\r
+ <type>xml</type>\r
+ <classifier>config</classifier>\r
+ </artifact>\r
+ </artifacts>\r
+ </configuration>\r
+ </execution>\r
+ </executions>\r
+ </plugin>\r
+ </plugins>\r
+ </build>\r
+\r
+ <scm>\r
+ <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>\r
+ <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>\r
+ <tag>HEAD</tag>\r
+ <url>https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=summary</url>\r
+ </scm>\r
+</project>\r
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<snapshot>
+ <configuration>
+ <data xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+ <modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+ <module>
+ <name>messagebus-app</name>
+ <type xmlns:binding-impl="urn:opendaylight:params:xml:ns:yang:controller:messagebus:app:impl">binding-impl:messagebus-app-impl</type>
+ <binding-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:messagebus:app:impl">
+ <type xmlns:md-sal-binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">md-sal-binding:binding-broker-osgi-registry</type>
+ <name>binding-osgi-broker</name>
+ </binding-broker>
+ <dom-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:messagebus:app:impl">
+ <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-broker-osgi-registry</type>
+ <name>dom-broker</name>
+ </dom-broker>
+ <namespace-to-stream>
+ <urn-prefix>urn:ietf:params:xml:ns:yang:smiv2</urn-prefix>
+ <stream-name>SNMP</stream-name>
+ </namespace-to-stream>
+ <namespace-to-stream>
+ <urn-prefix>urn:ietf:params:xml:ns:yang:ietf-syslog-notification</urn-prefix>
+ <stream-name>SYSLOG</stream-name>
+ </namespace-to-stream>
+ </module>
+ </modules>
+ </data>
+ </configuration>
+ <required-capabilities>
+ <capability>urn:opendaylight:params:xml:ns:yang:controller:messagebus:app:impl?module=messagebus-app-impl&revision=2015-02-03</capability>
+ </required-capabilities>
+</snapshot>
<?xml version="1.0" encoding="UTF-8"?>\r
+<!--\r
+Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.\r
+\r
+This program and the accompanying materials are made available under the\r
+terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
+and is available at http://www.eclipse.org/legal/epl-v10.html\r
+-->\r
<project xmlns="http://maven.apache.org/POM/4.0.0"\r
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"\r
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">\r
<version>1.2.0-SNAPSHOT</version>\r
</parent>\r
\r
- <artifactId>message-bus-impl</artifactId>\r
+ <artifactId>messagebus-impl</artifactId>\r
<name>${project.artifactId}</name>\r
\r
<packaging>bundle</packaging>\r
<groupId>org.opendaylight.controller</groupId>\r
<artifactId>sal-binding-api</artifactId>\r
</dependency>\r
+ <dependency>\r
+ <groupId>org.opendaylight.controller</groupId>\r
+ <artifactId>sal-netconf-connector</artifactId>\r
+ </dependency>\r
<dependency>\r
<groupId>org.opendaylight.controller</groupId>\r
<artifactId>sal-core-api</artifactId>\r
</dependency>\r
<dependency>\r
<groupId>org.opendaylight.controller</groupId>\r
- <artifactId>message-bus-api</artifactId>\r
+ <artifactId>messagebus-api</artifactId>\r
+ <version>1.2.0-SNAPSHOT</version>\r
+ </dependency>\r
+ <dependency>\r
+ <groupId>org.opendaylight.controller</groupId>\r
+ <artifactId>sal-netconf-connector</artifactId>\r
</dependency>\r
<dependency>\r
<groupId>org.opendaylight.controller</groupId>\r
<artifactId>sal-binding-config</artifactId>\r
</dependency>\r
+\r
+ <!-- Testing Dependencies -->\r
+ <dependency>\r
+ <groupId>junit</groupId>\r
+ <artifactId>junit</artifactId>\r
+ <scope>test</scope>\r
+ </dependency>\r
+ <dependency>\r
+ <groupId>org.glassfish.jersey.test-framework.providers</groupId>\r
+ <artifactId>jersey-test-framework-provider-grizzly2</artifactId>\r
+ <scope>test</scope>\r
+ </dependency>\r
+ <dependency>\r
+ <groupId>org.mockito</groupId>\r
+ <artifactId>mockito-all</artifactId>\r
+ <scope>test</scope>\r
+ </dependency>\r
</dependencies>\r
\r
<build>\r
/**
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
*/
package org.opendaylight.controller.config.yang.messagebus.app.impl;
+import java.util.List;
import org.opendaylight.controller.config.api.DependencyResolver;
import org.opendaylight.controller.config.api.ModuleIdentifier;
-import org.opendaylight.controller.mdsal.InitializationContext;
-import org.opendaylight.controller.mdsal.Providers;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.MountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.messagebus.app.impl.EventSourceTopology;
+import org.opendaylight.controller.messagebus.app.impl.NetconfEventSourceManager;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
import org.osgi.framework.BundleContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
-
-public class MessageBusAppImplModule extends org.opendaylight.controller.config.yang.messagebus.app.impl.AbstractMessageBusAppImplModule {
+public class MessageBusAppImplModule extends
+ org.opendaylight.controller.config.yang.messagebus.app.impl.AbstractMessageBusAppImplModule {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageBusAppImplModule.class);
private BundleContext bundleContext;
return bundleContext;
}
- public void setBundleContext(BundleContext bundleContext) {
+ public void setBundleContext(final BundleContext bundleContext) {
this.bundleContext = bundleContext;
}
- public MessageBusAppImplModule( ModuleIdentifier identifier, DependencyResolver dependencyResolver) {
+ public MessageBusAppImplModule(final ModuleIdentifier identifier, final DependencyResolver dependencyResolver) {
super(identifier, dependencyResolver);
}
- public MessageBusAppImplModule( ModuleIdentifier identifier,
- DependencyResolver dependencyResolver,
- MessageBusAppImplModule oldModule,
- java.lang.AutoCloseable oldInstance) {
+ public MessageBusAppImplModule(final ModuleIdentifier identifier, final DependencyResolver dependencyResolver,
+ final MessageBusAppImplModule oldModule, final java.lang.AutoCloseable oldInstance) {
super(identifier, dependencyResolver, oldModule, oldInstance);
}
@Override
- protected void customValidation() {}
+ protected void customValidation() {
+ }
@Override
public java.lang.AutoCloseable createInstance() {
- List<NamespaceToStream> namespaceMapping = getNamespaceToStream();
- InitializationContext ic = new InitializationContext(namespaceMapping);
+ final List<NamespaceToStream> namespaceMapping = getNamespaceToStream();
+
+ final ProviderContext bindingCtx = getBindingBrokerDependency().registerProvider(new Providers.BindingAware());
+ final ProviderSession domCtx = getDomBrokerDependency().registerProvider(new Providers.BindingIndependent());
- final Providers.BindingAware bap = new Providers.BindingAware(ic);
- final Providers.BindingIndependent bip = new Providers.BindingIndependent(ic);
+ final DataBroker dataBroker = bindingCtx.getSALService(DataBroker.class);
+ final DOMNotificationPublishService domPublish = domCtx.getService(DOMNotificationPublishService.class);
+ final DOMMountPointService domMount = domCtx.getService(DOMMountPointService.class);
+ final MountPointService bindingMount = bindingCtx.getSALService(MountPointService.class);
+ final RpcProviderRegistry rpcRegistry = bindingCtx.getSALService(RpcProviderRegistry.class);
- getBindingBrokerDependency().registerProvider(bap, getBundleContext());
- getDomBrokerDependency().registerProvider(bip);
+ final EventSourceTopology eventSourceTopology = new EventSourceTopology(dataBroker, rpcRegistry);
+ final NetconfEventSourceManager eventSourceManager = new NetconfEventSourceManager(dataBroker, domPublish,
+ domMount, bindingMount, eventSourceTopology, getNamespaceToStream());
- AutoCloseable closer = new AutoCloseable() {
- @Override public void close() {
- closeProvider(bap);
- closeProvider(bip);
+ final AutoCloseable closer = new AutoCloseable() {
+ @Override
+ public void close() {
+ eventSourceTopology.close();
+ eventSourceManager.close();
}
};
return closer;
}
- private void closeProvider(AutoCloseable closable) {
+ private void closeProvider(final AutoCloseable closable) {
try {
closable.close();
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.error("Exception while closing: {}\n Exception: {}", closable, e);
}
}
/**
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.mdsal;
+package org.opendaylight.controller.config.yang.messagebus.app.impl;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
private static final Logger LOGGER = LoggerFactory.getLogger(Providers.class);
public static class BindingAware implements BindingAwareProvider, AutoCloseable {
- private final InitializationContext initializationContext;
- public BindingAware(InitializationContext ic) {
- this.initializationContext = ic;
- }
@Override
- public void onSessionInitiated(BindingAwareBroker.ProviderContext session) {
- initializationContext.set(session);
-
+ public void onSessionInitiated(final BindingAwareBroker.ProviderContext session) {
LOGGER.info("BindingAwareBroker.ProviderContext initialized");
}
}
public static class BindingIndependent extends AbstractProvider implements AutoCloseable {
- private final InitializationContext initializationContext;
-
- public BindingIndependent(InitializationContext ic) {
- this.initializationContext = ic;
- }
@Override
- public void onSessionInitiated(Broker.ProviderSession session) {
- initializationContext.set(session);
-
+ public void onSessionInitiated(final Broker.ProviderSession session) {
LOGGER.info("Broker.ProviderSession initialized");
}
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.mdsal;
-
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-
-public class DataStore {
- private static final FutureCallback<Void> DEFAULT_CALLBACK =
- new FutureCallback<Void>() {
- public void onSuccess(Void result) {
- // TODO: Implement default behaviour
- }
-
- public void onFailure(Throwable t) {
- // TODO: Implement default behaviour
- };
- };
-
- private final MdSAL mdSAL;
-
- public DataStore(MdSAL mdSAL) {
- this.mdSAL = mdSAL;
- }
-
- public ListenerRegistration<DataChangeListener> registerDataChangeListener(LogicalDatastoreType store,
- InstanceIdentifier<?> path,
- DataChangeListener listener,
- AsyncDataBroker.DataChangeScope triggeringScope) {
- return mdSAL.getDataBroker().registerDataChangeListener(store, path, listener, triggeringScope);
- }
-
- public <T extends DataObject> void asyncPUT(LogicalDatastoreType datastoreType,
- InstanceIdentifier<T> path,
- T data) {
- asyncPUT(datastoreType, path, data, DEFAULT_CALLBACK);
- }
-
- public <T extends DataObject> void asyncPUT(LogicalDatastoreType datastoreType,
- InstanceIdentifier<T> path,
- T data,
- FutureCallback<Void> callback) {
- WriteTransaction tx = mdSAL.getDataBroker().newWriteOnlyTransaction();
- tx.put(datastoreType, path, data, true);
- execPut(tx, callback);
- }
-
- public <T extends DataObject> T read(LogicalDatastoreType datastoreType,
- InstanceIdentifier<T> path) {
-
- ReadOnlyTransaction tx = mdSAL.getDataBroker().newReadOnlyTransaction();
- T result = null;
-
- try {
- result = tx.read(datastoreType, path).get().get();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- return result;
- }
-
- private static void execPut(WriteTransaction tx, FutureCallback<Void> callback) {
- Futures.addCallback(tx.submit(), callback);
- }
-}
+++ /dev/null
-/**
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.mdsal;
-
-import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream;
-import org.opendaylight.controller.messagebus.app.impl.EventAggregator;
-import org.opendaylight.controller.messagebus.app.impl.EventSourceManager;
-import org.opendaylight.controller.messagebus.app.impl.EventSourceTopology;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
-import org.opendaylight.controller.sal.core.api.Broker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-public class InitializationContext {
- private static final Logger LOGGER = LoggerFactory.getLogger(InitializationContext.class);
-
- private final MdSAL mdSal;
- private final DataStore dataStore;
- private final EventSourceTopology eventSourceTopology;
- private final EventSourceManager eventSourceManager;
- private final EventAggregator eventAggregator;
-
- public InitializationContext(List<NamespaceToStream> namespaceMapping) {
- this.mdSal = new MdSAL();
- this.dataStore = new DataStore(mdSal);
- this.eventSourceTopology = new EventSourceTopology(dataStore);
- this.eventSourceManager = new EventSourceManager(dataStore, mdSal, eventSourceTopology, namespaceMapping);
- this.eventAggregator = new EventAggregator(mdSal, eventSourceTopology);
- }
-
- public synchronized void set(BindingAwareBroker.ProviderContext session) {
- mdSal.setBindingAwareContext(session);
-
- if (mdSal.isReady()) {
- initialize();
- }
- }
-
- public synchronized void set(Broker.ProviderSession session) {
- mdSal.setBindingIndependentContext(session);
-
- if (mdSal.isReady()) {
- initialize();
- }
- }
-
- private void initialize() {
- eventSourceTopology.mdsalReady();
- eventSourceManager.mdsalReady();
- eventAggregator.mdsalReady();
-
- LOGGER.info("InitializationContext started.");
- }
-}
+++ /dev/null
-/**
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.mdsal;
-
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
-import org.opendaylight.controller.sal.binding.api.BindingAwareService;
-import org.opendaylight.controller.sal.binding.api.mount.MountInstance;
-import org.opendaylight.controller.sal.binding.api.mount.MountProviderService;
-import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.BrokerService;
-import org.opendaylight.controller.sal.core.api.notify.NotificationListener;
-import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService;
-import org.opendaylight.controller.sal.core.api.notify.NotificationService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.RpcService;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MdSAL {
- private static final Logger LOGGER = LoggerFactory.getLogger(MdSAL.class);
-
- private BindingAwareBroker.ProviderContext bindingAwareContext;
- private Broker.ProviderSession bindingIndependentContext;
-
- // -----------------------------
- // ----- FRAMEWORK METHODS -----
- // -----------------------------
- public void setBindingAwareContext(BindingAwareBroker.ProviderContext bindingAwareContext) {
- this.bindingAwareContext = bindingAwareContext;
- }
-
- public void setBindingIndependentContext(Broker.ProviderSession bindingIndependentContext) {
- this.bindingIndependentContext = bindingIndependentContext;
- }
-
- //TODO: We should hide brokers and expose functionalities instead
- public DataBroker getDataBroker() {
- return getBaSalService(DataBroker.class);
- }
-
- public synchronized boolean isReady() {
- return (bindingAwareContext != null && bindingIndependentContext != null);
- }
-
- // -----------------------
- // ----- API METHODS -----
- // -----------------------
- // TODO: Factor out API methods to interface
- // method does not return registration object. Rather will hold references internally and manipulate using node id and API
- public <T extends RpcService> void addRpcImplementation(Class<T> serviceInterface,
- T implementation)
- throws IllegalStateException {
- bindingAwareContext.addRpcImplementation(serviceInterface, implementation);
- }
-
- // method does not return registration object. Rather will hold references internally and manipulate using node id and API
- public <T extends RpcService> void addRpcImplementation(Node node,
- Class<T> serviceInterface,
- T implementation)
- throws IllegalStateException {
- BindingAwareBroker.RoutedRpcRegistration<T> registration
- = addRoutedRpcImplementation(serviceInterface, implementation);
-
- NodeRef nodeRef = createNodeRef(node.getId());
- registration.registerPath(NodeContext.class, nodeRef.getValue());
- }
-
- public ListenerRegistration<NotificationListener> addNotificationListener(String nodeId,
- QName notification,
- NotificationListener listener) {
- YangInstanceIdentifier yii = inventoryNodeBIIdentifier(nodeId);
-
- NotificationService notificationService =
- getBiSalService(DOMMountPointService.class)
- .getMountPoint(yii)
- .get()
- .getService(NotificationPublishService.class)
- .get();
-
- ListenerRegistration<NotificationListener> registration =
- notificationService.addNotificationListener(notification, listener);
-
- LOGGER.info("Notification listener registered for {}, at node {}", notification, nodeId);
-
- return registration;
- }
-
- public ListenerRegistration<NotificationListener> addNotificationListener(QName notification,
- NotificationListener listener) {
- NotificationService notificationService =
- getBiSalService(NotificationPublishService.class);
-
- ListenerRegistration<NotificationListener> registration =
- notificationService.addNotificationListener(notification, listener);
-
- LOGGER.info("Notification listener registered for {}.", notification);
-
- return registration;
- }
-
- public <T extends RpcService> T getRpcService(Class<T> serviceInterface) {
- return bindingAwareContext.getRpcService(serviceInterface);
- }
-
- public <T extends RpcService> T getRpcService(String nodeId, Class<T> serviceInterface) {
- MountProviderService mountProviderService = getBaSalService(MountProviderService.class);
-
- InstanceIdentifier<Node> key = InstanceIdentifier.create(Nodes.class)
- .child(Node.class,
- new NodeKey(new NodeId(nodeId)));
-
- MountInstance mountPoint = mountProviderService.getMountPoint(key);
- return mountPoint.getRpcService(serviceInterface);
- }
-
- public void publishNotification(CompositeNode notification) {
- getBiSalService(NotificationPublishService.class).publish(notification);
- }
-
- public SchemaContext getSchemaContext(String nodeId) {
- YangInstanceIdentifier yii = inventoryNodeBIIdentifier(nodeId);
-
- SchemaContext schemaContext =
- getBiSalService(DOMMountPointService.class)
- .getMountPoint(yii)
- .get().getSchemaContext();
-
- return schemaContext;
- }
-
- // ---------------------------
- // ----- UTILITY METHODS -----
- // ---------------------------
- private <T extends BindingAwareService> T getBaSalService(Class<T> service) {
- return bindingAwareContext.getSALService(service);
- }
-
- private <T extends BrokerService> T getBiSalService(Class<T> service) {
- return bindingIndependentContext.getService(service);
- }
-
- private static final String NODE_ID_NAME = "id";
-
- public static YangInstanceIdentifier inventoryNodeBIIdentifier(String nodeId) {
- return YangInstanceIdentifier.builder()
- .node(Nodes.QNAME)
- .nodeWithKey(Node.QNAME,
- QName.create(Node.QNAME.getNamespace(),
- Node.QNAME.getRevision(),
- NODE_ID_NAME),
- nodeId)
- .build();
- }
-
- private <T extends RpcService> BindingAwareBroker.RoutedRpcRegistration<T> addRoutedRpcImplementation(Class<T> serviceInterface,
- T implementation)
- throws IllegalStateException {
- return bindingAwareContext.addRoutedRpcImplementation(serviceInterface, implementation);
- }
-
- public static NodeRef createNodeRef(NodeId nodeId) {
- NodeKey nodeKey = new NodeKey(nodeId);
- InstanceIdentifier<Node> path = InstanceIdentifier
- .builder(Nodes.class)
- .child(Node.class, nodeKey)
- .build();
- return new NodeRef(path);
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.messagebus.app.impl;
-
-import java.util.List;
-import java.util.concurrent.Future;
-import org.opendaylight.controller.mdsal.MdSAL;
-import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput;
-import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutput;
-import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutputBuilder;
-import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput;
-import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
-import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
-import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-// TODO: implement topic created notification
-public class EventAggregator implements EventAggregatorService {
- private static final Logger LOGGER = LoggerFactory.getLogger(EventAggregator.class);
-
- private final MdSAL mdSAL;
- private final EventSourceTopology eventSourceTopology;
-
- public EventAggregator(final MdSAL mdSAL, final EventSourceTopology eventSourceTopology) {
- this.mdSAL = mdSAL;
- this.eventSourceTopology = eventSourceTopology;
- }
-
- public void mdsalReady() {
- mdSAL.addRpcImplementation(EventAggregatorService.class, this);
- }
-
- @Override
- public Future<RpcResult<CreateTopicOutput>> createTopic(final CreateTopicInput input) {
- LOGGER.info("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}",
- input.getNotificationPattern(),
- input.getNodeIdPattern());
-
- Topic topic = new Topic(new NotificationPattern(input.getNotificationPattern()), input.getNodeIdPattern().getValue(), mdSAL);
-
- //# Make sure we capture all nodes from now on
- eventSourceTopology.registerDataChangeListener(topic);
-
- //# Notify existing nodes
- //# Code reader note: Context of Node type is NetworkTopology
- List<Node> nodes = eventSourceTopology.snapshot();
- for (Node node : nodes) {
- NodeId nodeIdToNotify = node.getAugmentation(Node1.class).getEventSourceNode();
- topic.notifyNode(nodeIdToNotify);
- }
-
- CreateTopicOutput cto = new CreateTopicOutputBuilder()
- .setTopicId(topic.getTopicId())
- .build();
-
- return Util.resultFor(cto);
- }
-
- @Override
- public Future<RpcResult<Void>> destroyTopic(final DestroyTopicInput input) {
- // 1. UNREGISTER DATA CHANGE LISTENER -> ?
- // 2. CLOSE TOPIC
- return null;
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.messagebus.app.impl;
-
-import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.mdsal.DataStore;
-import org.opendaylight.controller.mdsal.MdSAL;
-import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public final class EventSourceManager implements DataChangeListener {
- private static final Logger LOGGER = LoggerFactory.getLogger(EventSourceManager.class);
- private static final InstanceIdentifier<Node> INVENTORY_PATH = InstanceIdentifier.create(Nodes.class)
- .child(Node.class);
- private final DataStore dataStore;
- private final MdSAL mdSal;
- private final EventSourceTopology eventSourceTopology;
- private final Map<String, String> streamMap;
-
- public EventSourceManager(DataStore dataStore,
- MdSAL mdSal,
- EventSourceTopology eventSourceTopology,
- List<NamespaceToStream> namespaceMapping) {
- this.dataStore = dataStore;
- this.mdSal = mdSal;
- this.eventSourceTopology = eventSourceTopology;
- this.streamMap = namespaceToStreamMapping(namespaceMapping);
- }
-
- private Map namespaceToStreamMapping(List<NamespaceToStream> namespaceMapping) {
- Map<String, String> streamMap = new HashMap<>(namespaceMapping.size());
-
- for (NamespaceToStream nToS : namespaceMapping) {
- streamMap.put(nToS.getUrnPrefix(), nToS.getStreamName());
- }
-
- return streamMap;
- }
-
- public void mdsalReady() {
- dataStore.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
- INVENTORY_PATH,
- this,
- DataBroker.DataChangeScope.SUBTREE);
-
- LOGGER.info("EventSourceManager initialized.");
- }
-
- @Override
- public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
- //FIXME: Prevent creating new event source on subsequent changes in inventory, like disconnect.
- LOGGER.debug("[DataChangeEvent<InstanceIdentifier<?>, DataObject>: {}]", event);
-
- Node node = Util.getAffectedNode(event);
- // we listen on node tree, therefore we should rather throw IllegalStateException when node is null
- if ( node == null ) {
- LOGGER.debug("OnDataChanged Event. Node is null.");
- return;
- }
- if ( isNetconfNode(node) == false ) {
- LOGGER.debug("OnDataChanged Event. Not a Netconf node.");
- return;
- }
- if ( isEventSource(node) == false ) {
- LOGGER.debug("OnDataChanged Event. Node an EventSource node.");
- return;
- }
-
- NetconfEventSource netconfEventSource = new NetconfEventSource(mdSal,
- node.getKey().getId().getValue(),
- streamMap);
- mdSal.addRpcImplementation(node, EventSourceService.class, netconfEventSource);
-
- InstanceIdentifier<NetconfNode> nodeInstanceIdentifier =
- InstanceIdentifier.create(Nodes.class)
- .child(Node.class, node.getKey())
- .augmentation(NetconfNode.class);
-
- dataStore.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
- nodeInstanceIdentifier,
- netconfEventSource,
- DataBroker.DataChangeScope.SUBTREE);
-
- eventSourceTopology.insert(node);
- }
-
- private boolean isNetconfNode(Node node) {
- return node.getAugmentation(NetconfNode.class) != null ;
- }
-
- public boolean isEventSource(Node node) {
- NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
-
- return isEventSource(netconfNode);
- }
-
- private boolean isEventSource(NetconfNode node) {
- for (String capability : node.getInitialCapability()) {
- if(capability.startsWith("urn:ietf:params:xml:ns:netconf:notification")) {
- return true;
- }
- }
-
- return false;
- }
-}
\ No newline at end of file
/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
package org.opendaylight.controller.messagebus.app.impl;
import com.google.common.base.Preconditions;
+import java.util.Map;
import java.util.regex.Pattern;
import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.mdsal.MdSAL;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.LoggerFactory;
-public class Topic implements DataChangeListener {
- private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(Topic.class);
+public class EventSourceTopic implements DataChangeListener {
+ private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(EventSourceTopic.class);
private final NotificationPattern notificationPattern;
+ private final EventSourceService sourceService;
private final Pattern nodeIdPattern;
private final TopicId topicId;
- private final MdSAL mdSal;
- public Topic(final NotificationPattern notificationPattern, final String nodeIdPattern, final MdSAL mdSal) {
+ public EventSourceTopic(final NotificationPattern notificationPattern, final String nodeIdPattern, final EventSourceService eventSource) {
this.notificationPattern = Preconditions.checkNotNull(notificationPattern);
+ this.sourceService = eventSource;
// FIXME: regex should be the language of nodeIdPattern
final String regex = Util.wildcardToRegex(nodeIdPattern);
this.nodeIdPattern = Pattern.compile(regex);
- this.mdSal = Preconditions.checkNotNull(mdSal);
+
// FIXME: We need to perform some salting in order to make
// the topic IDs less predictable.
@Override
public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
- // TODO: affected must return topologyNode !!!
- final Node node = Util.getAffectedNode(event);
- if (nodeIdPattern.matcher(node.getId().getValue()).matches()) {
- notifyNode(node.getId());
- } else {
- LOG.debug("Skipping node {}", node.getId());
+ for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getUpdatedData().entrySet()) {
+ if (changeEntry.getValue() instanceof Node) {
+ final Node node = (Node) changeEntry.getValue();
+ if (nodeIdPattern.matcher(node.getId().getValue()).matches()) {
+ notifyNode(changeEntry.getKey());
+ }
+ }
}
}
- public void notifyNode(final NodeId nodeId) {
- JoinTopicInput jti = getJoinTopicInputArgument(nodeId);
- EventSourceService ess = mdSal.getRpcService(EventSourceService.class);
- Preconditions.checkState(ess != null, "EventSourceService is not registered");
-
- ess.joinTopic(jti);
+ public void notifyNode(final InstanceIdentifier<?> nodeId) {
+ try {
+ sourceService.joinTopic(getJoinTopicInputArgument(nodeId));
+ } catch (final Exception e) {
+ LOG.error("Could not invoke join topic for node {}", nodeId);
+ }
}
- private JoinTopicInput getJoinTopicInputArgument(final NodeId nodeId) {
- NodeRef nodeRef = MdSAL.createNodeRef(nodeId);
- JoinTopicInput jti =
+ private JoinTopicInput getJoinTopicInputArgument(final InstanceIdentifier<?> path) {
+ final NodeRef nodeRef = new NodeRef(path);
+ final JoinTopicInput jti =
new JoinTopicInputBuilder()
.setNode(nodeRef.getValue())
.setTopicId(topicId)
.build();
return jti;
}
+
+
}
/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
package org.opendaylight.controller.messagebus.app.impl;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Futures;
+
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.mdsal.DataStore;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1Builder;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1Builder;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSource;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSourceBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.regex.Pattern;
-public class EventSourceTopology {
- private static final Logger LOGGER = LoggerFactory.getLogger(EventSourceTopology.class);
+public class EventSourceTopology implements EventAggregatorService, AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class);
- private static final String topologyId = "EVENT-SOURCE-TOPOLOGY" ;
- private static final TopologyKey topologyKey = new TopologyKey(new TopologyId(topologyId));
- private static final LogicalDatastoreType datastoreType = LogicalDatastoreType.OPERATIONAL;
+ private static final String TOPOLOGY_ID = "EVENT-SOURCE-TOPOLOGY" ;
+ private static final TopologyKey EVENT_SOURCE_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TOPOLOGY_ID));
+ private static final LogicalDatastoreType OPERATIONAL = LogicalDatastoreType.OPERATIONAL;
- private static final InstanceIdentifier<Topology> topologyInstanceIdentifier =
+ private static final InstanceIdentifier<Topology> EVENT_SOURCE_TOPOLOGY_PATH =
InstanceIdentifier.create(NetworkTopology.class)
- .child(Topology.class, topologyKey);
+ .child(Topology.class, EVENT_SOURCE_TOPOLOGY_KEY);
- private static final InstanceIdentifier<TopologyTypes1> topologyTypeInstanceIdentifier =
- topologyInstanceIdentifier
+ private static final InstanceIdentifier<TopologyTypes1> TOPOLOGY_TYPE_PATH =
+ EVENT_SOURCE_TOPOLOGY_PATH
.child(TopologyTypes.class)
.augmentation(TopologyTypes1.class);
- private static final InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang
- .network.topology.rev131021.network.topology.topology.Node> eventSourceTopologyPath =
- InstanceIdentifier.create(NetworkTopology.class)
- .child(Topology.class)
- .child(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang
- .network.topology.rev131021.network.topology.topology.Node.class);
-
private final Map<DataChangeListener, ListenerRegistration<DataChangeListener>> registrations =
new ConcurrentHashMap<>();
- private final DataStore dataStore;
+ private final DataBroker dataBroker;
+ private final RpcRegistration<EventAggregatorService> aggregatorRpcReg;
+ private final EventSourceService eventSourceService;
+ private final RpcProviderRegistry rpcRegistry;
+ private final ExecutorService executorService;
+
+ public EventSourceTopology(final DataBroker dataBroker, final RpcProviderRegistry rpcRegistry) {
+ this.dataBroker = dataBroker;
+ this.executorService = Executors.newCachedThreadPool();
+ this.rpcRegistry = rpcRegistry;
+ aggregatorRpcReg = rpcRegistry.addRpcImplementation(EventAggregatorService.class, this);
+ eventSourceService = rpcRegistry.getRpcService(EventSourceService.class);
+
+ final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build();
+ final TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
+ putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment);
+ }
+
+ private <T extends DataObject> void putData(final LogicalDatastoreType store,
+ final InstanceIdentifier<T> path, final T data) {
- public EventSourceTopology(DataStore dataStore) {
- this.dataStore = dataStore;
+ final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
+ tx.put(store, path, data, true);
+ tx.submit();
}
- public void mdsalReady() {
- TopologyEventSource topologySource = new TopologyEventSourceBuilder().build();
- TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
+ private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath, final Node node) {
+ final NodeKey nodeKey = node.getKey();
+ final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
+ final Node1 nodeAgument = new Node1Builder().setEventSourceNode(new NodeId(nodeKey.getNodeId().getValue())).build();
+ putData(OPERATIONAL, augmentPath, nodeAgument);
+ }
- dataStore.asyncPUT(datastoreType, topologyTypeInstanceIdentifier, topologyTypeAugment);
+ private void notifyExistingNodes(final Pattern nodeIdPatternRegex, final EventSourceTopic eventSourceTopic){
+ executorService.execute(new NotifyAllNodeExecutor(dataBroker, nodeIdPatternRegex, eventSourceTopic));
}
- public void insert(Node node) {
- String nodeId = node.getKey().getId().getValue();
- NodeKey nodeKey = new NodeKey(new NodeId(nodeId));
- InstanceIdentifier<Node1> topologyNodeAugment
- = topologyInstanceIdentifier
- .child(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang
- .network.topology.rev131021.network.topology.topology.Node.class, nodeKey)
- .augmentation(Node1.class);
-
- Node1 nodeAgument = new Node1Builder().setEventSourceNode(node.getId()).build();
- dataStore.asyncPUT(datastoreType, topologyNodeAugment, nodeAgument);
+ @Override
+ public Future<RpcResult<CreateTopicOutput>> createTopic(final CreateTopicInput input) {
+ LOG.info("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}",
+ input.getNotificationPattern(),
+ input.getNodeIdPattern());
+
+ final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern());
+ final String nodeIdPattern = input.getNodeIdPattern().getValue();
+ final Pattern nodeIdPatternRegex = Pattern.compile(Util.wildcardToRegex(nodeIdPattern));
+ final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, input.getNodeIdPattern().getValue(), eventSourceService);
+
+ registerTopic(eventSourceTopic);
+
+ notifyExistingNodes(nodeIdPatternRegex, eventSourceTopic);
+
+ final CreateTopicOutput cto = new CreateTopicOutputBuilder()
+ .setTopicId(eventSourceTopic.getTopicId())
+ .build();
+
+ return Util.resultFor(cto);
+ }
+
+ @Override
+ public Future<RpcResult<Void>> destroyTopic(final DestroyTopicInput input) {
+ return Futures.immediateFailedFuture(new UnsupportedOperationException("Not Implemented"));
}
- // TODO: Should we expose this functioanlity over RPC?
- public List<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang
- .network.topology.rev131021.network.topology.topology.Node> snapshot() {
- Topology topology = dataStore.read(datastoreType, topologyInstanceIdentifier);
- return topology.getNode();
+ @Override
+ public void close() {
+ aggregatorRpcReg.close();
}
- public void registerDataChangeListener(DataChangeListener listener) {
- ListenerRegistration<DataChangeListener> listenerRegistration = dataStore.registerDataChangeListener(datastoreType,
- eventSourceTopologyPath,
+ public void registerTopic(final EventSourceTopic listener) {
+ final ListenerRegistration<DataChangeListener> listenerRegistration = dataBroker.registerDataChangeListener(OPERATIONAL,
+ EVENT_SOURCE_TOPOLOGY_PATH,
listener,
DataBroker.DataChangeScope.SUBTREE);
registrations.put(listener, listenerRegistration);
}
+
+ public void register(final Node node, final NetconfEventSource netconfEventSource) {
+ final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey());
+ rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, netconfEventSource)
+ .registerPath(NodeContext.class, sourcePath);
+ insert(sourcePath,node);
+ // FIXME: Return registration object.
+ }
+
+ private class NotifyAllNodeExecutor implements Runnable {
+
+ private final EventSourceTopic topic;
+ private final DataBroker dataBroker;
+ private final Pattern nodeIdPatternRegex;
+
+ public NotifyAllNodeExecutor(final DataBroker dataBroker, final Pattern nodeIdPatternRegex, final EventSourceTopic topic) {
+ this.topic = topic;
+ this.dataBroker = dataBroker;
+ this.nodeIdPatternRegex = nodeIdPatternRegex;
+ }
+
+ @Override
+ public void run() {
+ //# Code reader note: Context of Node type is NetworkTopology
+ final List<Node> nodes = snapshot();
+ for (final Node node : nodes) {
+ if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) {
+ topic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
+ }
+ }
+ }
+
+ private List<Node> snapshot() {
+ try (ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();) {
+
+ final Optional<Topology> data = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH).checkedGet();
+
+ if(data.isPresent()) {
+ final List<Node> nodeList = data.get().getNode();
+ if(nodeList != null) {
+ return nodeList;
+ }
+ }
+ return Collections.emptyList();
+ } catch (final ReadFailedException e) {
+ LOG.error("Unable to retrieve node list.", e);
+ return Collections.emptyList();
+ }
+ }
+ }
}
/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
package org.opendaylight.controller.messagebus.app.impl;
-import com.google.common.base.Preconditions;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
+
+import javax.xml.stream.XMLStreamException;
+import javax.xml.transform.dom.DOMResult;
+import javax.xml.transform.dom.DOMSource;
+
import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.MountPoint;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.mdsal.MdSAL;
-import org.opendaylight.controller.sal.core.api.notify.NotificationListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.AnyXmlNode;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+
+public class NetconfEventSource implements EventSourceService, DOMNotificationListener, DataChangeListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSource.class);
+
+ private static final NodeIdentifier TOPIC_NOTIFICATION_ARG = new NodeIdentifier(TopicNotification.QNAME);
+ private static final NodeIdentifier EVENT_SOURCE_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "node-id"));
+ private static final NodeIdentifier PAYLOAD_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "payload"));
+
+ private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"stream"));
+ private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription"));
-public class NetconfEventSource implements EventSourceService, NotificationListener, DataChangeListener {
- private static final Logger LOGGER = LoggerFactory.getLogger(NetconfEventSource.class);
- private final MdSAL mdSal;
private final String nodeId;
- private final List<String> activeStreams = new ArrayList<>();
+
+ private final DOMMountPoint netconfMount;
+ private final DOMNotificationPublishService domPublish;
+ private final NotificationsService notificationRpcService;
+
+ private final Set<String> activeStreams = new ConcurrentSkipListSet<>();
private final Map<String, String> urnPrefixToStreamMap;
- public NetconfEventSource(final MdSAL mdSal, final String nodeId, final Map<String, String> streamMap) {
- Preconditions.checkNotNull(mdSal);
- Preconditions.checkNotNull(nodeId);
- this.mdSal = mdSal;
+ public NetconfEventSource(final String nodeId, final Map<String, String> streamMap, final DOMMountPoint netconfMount, final DOMNotificationPublishService publishService, final MountPoint bindingMount) {
+ this.netconfMount = netconfMount;
+ this.notificationRpcService = bindingMount.getService(RpcConsumerRegistry.class).get().getRpcService(NotificationsService.class);
this.nodeId = nodeId;
this.urnPrefixToStreamMap = streamMap;
-
- LOGGER.info("NetconfEventSource [{}] created.", nodeId);
+ this.domPublish = publishService;
+ LOG.info("NetconfEventSource [{}] created.", nodeId);
}
@Override
final String regex = Util.wildcardToRegex(notificationPattern.getValue());
final Pattern pattern = Pattern.compile(regex);
- List<QName> matchingNotifications = Util.expandQname(availableNotifications(), pattern);
+ final List<SchemaPath> matchingNotifications = Util.expandQname(availableNotifications(), pattern);
registerNotificationListener(matchingNotifications);
- return null;
+ final JoinTopicOutput output = new JoinTopicOutputBuilder().build();
+ return com.google.common.util.concurrent.Futures.immediateFuture(RpcResultBuilder.success(output).build());
}
- private List<QName> availableNotifications() {
+ private List<SchemaPath> availableNotifications() {
// FIXME: use SchemaContextListener to get changes asynchronously
- Set<NotificationDefinition> availableNotifications = mdSal.getSchemaContext(nodeId).getNotifications();
- List<QName> qNs = new ArrayList<>(availableNotifications.size());
- for (NotificationDefinition nd : availableNotifications) {
- qNs.add(nd.getQName());
+ final Set<NotificationDefinition> availableNotifications = netconfMount.getSchemaContext().getNotifications();
+ final List<SchemaPath> qNs = new ArrayList<>(availableNotifications.size());
+ for (final NotificationDefinition nd : availableNotifications) {
+ qNs.add(nd.getPath());
}
-
return qNs;
}
- private void registerNotificationListener(final List<QName> notificationsToSubscribe) {
- for (QName qName : notificationsToSubscribe) {
- startSubscription(qName);
- // FIXME: do not lose this registration
- final ListenerRegistration<NotificationListener> reg = mdSal.addNotificationListener(nodeId, qName, this);
+ private void registerNotificationListener(final List<SchemaPath> notificationsToSubscribe) {
+
+ final Optional<DOMNotificationService> notifyService = netconfMount.getService(DOMNotificationService.class);
+ if(notifyService.isPresent()) {
+ for (final SchemaPath qName : notificationsToSubscribe) {
+ startSubscription(qName);
+ }
+ // FIXME: Capture registration
+ notifyService.get().registerNotificationListener(this, notificationsToSubscribe);
}
}
- private synchronized void startSubscription(final QName qName) {
- String streamName = resolveStream(qName);
+ private void startSubscription(final SchemaPath path) {
+ final String streamName = resolveStream(path.getLastComponent());
if (streamIsActive(streamName) == false) {
- LOGGER.info("Stream {} is not active on node {}. Will subscribe.", streamName, nodeId);
+ LOG.info("Stream {} is not active on node {}. Will subscribe.", streamName, nodeId);
startSubscription(streamName);
}
}
- private synchronized void resubscribeToActiveStreams() {
- for (String streamName : activeStreams) {
+ private void resubscribeToActiveStreams() {
+ for (final String streamName : activeStreams) {
startSubscription(streamName);
}
}
private synchronized void startSubscription(final String streamName) {
- CreateSubscriptionInput subscriptionInput = getSubscriptionInput(streamName);
- mdSal.getRpcService(nodeId, NotificationsService.class).createSubscription(subscriptionInput);
+ final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
+ .withChild(ImmutableNodes.leafNode(STREAM_QNAME, streamName))
+ .build();
+ netconfMount.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input);
activeStreams.add(streamName);
}
- private static CreateSubscriptionInput getSubscriptionInput(final String streamName) {
- CreateSubscriptionInputBuilder csib = new CreateSubscriptionInputBuilder();
- csib.setStream(new StreamNameType(streamName));
- return csib.build();
- }
-
private String resolveStream(final QName qName) {
String streamName = null;
- for (Map.Entry<String, String> entry : urnPrefixToStreamMap.entrySet()) {
- String nameSpace = qName.getNamespace().toString();
- String urnPrefix = entry.getKey();
+ for (final Map.Entry<String, String> entry : urnPrefixToStreamMap.entrySet()) {
+ final String nameSpace = qName.getNamespace().toString();
+ final String urnPrefix = entry.getKey();
if( nameSpace.startsWith(urnPrefix) ) {
streamName = entry.getValue();
break;
return activeStreams.contains(streamName);
}
- // PASS
- @Override public Set<QName> getSupportedNotifications() {
- return null;
+ @Override
+ public void onNotification(final DOMNotification notification) {
+ final ContainerNode topicNotification = Builders.containerBuilder()
+ .withNodeIdentifier(TOPIC_NOTIFICATION_ARG)
+ .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, nodeId))
+ .withChild(encapsulate(notification))
+ .build();
+ try {
+ domPublish.putNotification(new TopicDOMNotification(topicNotification));
+ } catch (final InterruptedException e) {
+ throw Throwables.propagate(e);
+ }
}
- @Override
- public void onNotification(final CompositeNode notification) {
- LOGGER.info("NetconfEventSource {} received notification {}. Will publish to MD-SAL.", nodeId, notification);
- ImmutableCompositeNode payload = ImmutableCompositeNode.builder()
- .setQName(QName.create(TopicNotification.QNAME, "payload"))
- .add(notification).toInstance();
- ImmutableCompositeNode icn = ImmutableCompositeNode.builder()
- .setQName(TopicNotification.QNAME)
- .add(payload)
- .addLeaf("event-source", nodeId)
- .toInstance();
-
- mdSal.publishNotification(icn);
+ private AnyXmlNode encapsulate(final DOMNotification body) {
+ // FIXME: Introduce something like AnyXmlWithNormalizedNodeData in Yangtools
+ final Document doc = XmlUtil.newDocument();
+ final Optional<String> namespace = Optional.of(PAYLOAD_ARG.getNodeType().getNamespace().toString());
+ final Element element = XmlUtil.createElement(doc , "payload", namespace);
+
+
+ final DOMResult result = new DOMResult(element);
+
+ final SchemaContext context = netconfMount.getSchemaContext();
+ final SchemaPath schemaPath = body.getType();
+ try {
+ NetconfMessageTransformUtil.writeNormalizedNode(body.getBody(), result, schemaPath, context);
+ return Builders.anyXmlBuilder().withNodeIdentifier(PAYLOAD_ARG)
+ .withValue(new DOMSource(element))
+ .build();
+ } catch (IOException | XMLStreamException e) {
+ LOG.error("Unable to encapsulate notification.",e);
+ throw Throwables.propagate(e);
+ }
}
@Override
boolean wasConnected = false;
boolean nowConnected = false;
- for (Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : change.getOriginalData().entrySet()) {
+ for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : change.getOriginalData().entrySet()) {
if ( isNetconfNode(changeEntry) ) {
- NetconfNode nn = (NetconfNode)changeEntry.getValue();
+ final NetconfNode nn = (NetconfNode)changeEntry.getValue();
wasConnected = nn.isConnected();
}
}
- for (Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : change.getUpdatedData().entrySet()) {
+ for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : change.getUpdatedData().entrySet()) {
if ( isNetconfNode(changeEntry) ) {
- NetconfNode nn = (NetconfNode)changeEntry.getValue();
+ final NetconfNode nn = (NetconfNode)changeEntry.getValue();
nowConnected = nn.isConnected();
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.messagebus.app.impl;
+
+import com.google.common.base.Optional;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.MountPoint;
+import org.opendaylight.controller.md.sal.binding.api.MountPointService;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class NetconfEventSourceManager implements DataChangeListener, AutoCloseable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(NetconfEventSourceManager.class);
+ private static final TopologyKey NETCONF_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName()));
+ private static final InstanceIdentifier<Node> NETCONF_DEVICE_PATH = InstanceIdentifier.create(NetworkTopology.class)
+ .child(Topology.class, NETCONF_TOPOLOGY_KEY)
+ .child(Node.class);
+
+ private static final YangInstanceIdentifier NETCONF_DEVICE_DOM_PATH = YangInstanceIdentifier.builder()
+ .node(NetworkTopology.QNAME)
+ .node(Topology.QNAME)
+ .nodeWithKey(Topology.QNAME, QName.create(Topology.QNAME, "topology-id"),TopologyNetconf.QNAME.getLocalName())
+ .node(Node.QNAME)
+ .build();
+ private static final QName NODE_ID_QNAME = QName.create(Node.QNAME,"node-id");
+
+
+ private final EventSourceTopology eventSourceTopology;
+ private final Map<String, String> streamMap;
+
+ private final ConcurrentHashMap<InstanceIdentifier<?>, NetconfEventSource> netconfSources = new ConcurrentHashMap<>();
+ private final ListenerRegistration<DataChangeListener> listenerReg;
+ private final DOMNotificationPublishService publishService;
+ private final DOMMountPointService domMounts;
+ private final MountPointService bindingMounts;
+
+ public NetconfEventSourceManager(final DataBroker dataStore,
+ final DOMNotificationPublishService domPublish,
+ final DOMMountPointService domMount,
+ final MountPointService bindingMount,
+ final EventSourceTopology eventSourceTopology,
+ final List<NamespaceToStream> namespaceMapping) {
+
+ listenerReg = dataStore.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, NETCONF_DEVICE_PATH, this, DataChangeScope.SUBTREE);
+ this.eventSourceTopology = eventSourceTopology;
+ this.streamMap = namespaceToStreamMapping(namespaceMapping);
+ this.domMounts = domMount;
+ this.bindingMounts = bindingMount;
+ this.publishService = domPublish;
+ LOGGER.info("EventSourceManager initialized.");
+ }
+
+ private Map<String,String> namespaceToStreamMapping(final List<NamespaceToStream> namespaceMapping) {
+ final Map<String, String> streamMap = new HashMap<>(namespaceMapping.size());
+
+ for (final NamespaceToStream nToS : namespaceMapping) {
+ streamMap.put(nToS.getUrnPrefix(), nToS.getStreamName());
+ }
+
+ return streamMap;
+ }
+
+ @Override
+ public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
+ //FIXME: Prevent creating new event source on subsequent changes in inventory, like disconnect.
+ LOGGER.debug("[DataChangeEvent<InstanceIdentifier<?>, DataObject>: {}]", event);
+ for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getCreatedData().entrySet()) {
+ if (changeEntry.getValue() instanceof Node) {
+ nodeUpdated(changeEntry.getKey(),(Node) changeEntry.getValue());
+ }
+ }
+
+
+ for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getUpdatedData().entrySet()) {
+ if (changeEntry.getValue() instanceof Node) {
+ nodeUpdated(changeEntry.getKey(),(Node) changeEntry.getValue());
+ }
+ }
+
+
+ }
+
+ private void nodeUpdated(final InstanceIdentifier<?> key, final Node node) {
+
+ // we listen on node tree, therefore we should rather throw IllegalStateException when node is null
+ if ( node == null ) {
+ LOGGER.debug("OnDataChanged Event. Node is null.");
+ return;
+ }
+ if ( isNetconfNode(node) == false ) {
+ LOGGER.debug("OnDataChanged Event. Not a Netconf node.");
+ return;
+ }
+ if ( isEventSource(node) == false ) {
+ LOGGER.debug("OnDataChanged Event. Node an EventSource node.");
+ return;
+ }
+ if(node.getAugmentation(NetconfNode.class).getConnectionStatus() != ConnectionStatus.Connected ) {
+ return;
+ }
+
+ if(!netconfSources.containsKey(key)) {
+ createEventSource(key,node);
+ }
+ }
+
+ private void createEventSource(final InstanceIdentifier<?> key, final Node node) {
+ final Optional<DOMMountPoint> netconfMount = domMounts.getMountPoint(domMountPath(node.getNodeId()));
+ final Optional<MountPoint> bindingMount = bindingMounts.getMountPoint(key);
+
+ if(netconfMount.isPresent() && bindingMount.isPresent()) {
+ final String nodeId = node.getNodeId().getValue();
+ final NetconfEventSource netconfEventSource = new NetconfEventSource(nodeId, streamMap, netconfMount.get(), publishService, bindingMount.get());
+ eventSourceTopology.register(node,netconfEventSource);
+ netconfSources.putIfAbsent(key, netconfEventSource);
+ }
+ }
+
+ private YangInstanceIdentifier domMountPath(final NodeId nodeId) {
+ return YangInstanceIdentifier.builder(NETCONF_DEVICE_DOM_PATH).nodeWithKey(Node.QNAME, NODE_ID_QNAME, nodeId.getValue()).build();
+ }
+
+ private boolean isNetconfNode(final Node node) {
+ return node.getAugmentation(NetconfNode.class) != null ;
+ }
+
+ public boolean isEventSource(final Node node) {
+ final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
+
+ return isEventSource(netconfNode);
+ }
+
+ private boolean isEventSource(final NetconfNode node) {
+ for (final String capability : node.getAvailableCapabilities().getAvailableCapability()) {
+ if(capability.startsWith("(urn:ietf:params:xml:ns:netconf:notification")) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public void close() {
+ listenerReg.close();
+ }
+}
\ No newline at end of file
--- /dev/null
+
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.messagebus.app.impl;
+
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+public class TopicDOMNotification implements DOMNotification {
+
+ private static final SchemaPath TOPIC_NOTIFICATION_ID = SchemaPath.create(true, TopicNotification.QNAME);
+ private final ContainerNode body;
+
+ public TopicDOMNotification(final ContainerNode body) {
+ this.body = body;
+ }
+
+ @Override
+ public SchemaPath getType() {
+ return TOPIC_NOTIFICATION_ID;
+ }
+
+ @Override
+ public ContainerNode getBody() {
+ return body;
+ }
+
+ @Override
+ public String toString() {
+ return "TopicDOMNotification [body=" + body + "]";
+ }
+}
/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
package org.opendaylight.controller.messagebus.app.impl;
-import com.google.common.util.concurrent.Futures;
import java.math.BigInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.sal.common.util.Rpcs;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcError;
+
import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+import com.google.common.util.concurrent.Futures;
public final class Util {
private static final MessageDigest messageDigestTemplate = getDigestInstance();
private static MessageDigest getDigestInstance() {
try {
return MessageDigest.getInstance("MD5");
- } catch (NoSuchAlgorithmException e) {
+ } catch (final NoSuchAlgorithmException e) {
throw new RuntimeException("Unable to get MD5 instance");
}
}
- public static String md5String(final String inputString) {
+ static String md5String(final String inputString) {
try {
- MessageDigest md = (MessageDigest)messageDigestTemplate.clone();
+ final MessageDigest md = (MessageDigest)messageDigestTemplate.clone();
md.update(inputString.getBytes("UTF-8"), 0, inputString.length());
return new BigInteger(1, md.digest()).toString(16);
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new RuntimeException("Unable to get MD5 instance");
}
}
public static <T> Future<RpcResult<T>> resultFor(final T output) {
- RpcResult<T> result = Rpcs.getRpcResult(true, output, Collections.<RpcError>emptyList());
+ final RpcResult<T> result = RpcResultBuilder.success(output).build();
return Futures.immediateFuture(result);
}
- /**
- * Extracts affected node from data change event.
- * @param event
- * @return
- */
- public static Node getAffectedNode(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
- // TODO: expect listener method to be called even when change impact node
- // TODO: test with change.getCreatedData()
- for (Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getUpdatedData().entrySet()) {
- if (isNode(changeEntry)) {
- return (Node) changeEntry.getValue();
- }
- }
-
- return null;
- }
-
- private static boolean isNode(final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry ) {
- return Node.class.equals(changeEntry.getKey().getTargetType());
- }
-
/**
* Method filters qnames based on wildcard strings
*
- * @param availableQnames
+ * @param list
* @param patterh matching pattern
* @return list of filtered qnames
*/
- public static List<QName> expandQname(final List<QName> availableQnames, final Pattern pattern) {
- List<QName> matchingQnames = new ArrayList<>();
+ public static List<SchemaPath> expandQname(final List<SchemaPath> list, final Pattern pattern) {
+ final List<SchemaPath> matchingQnames = new ArrayList<>();
- for (QName qname : availableQnames) {
- String namespace = qname.getNamespace().toString();
+ for (final SchemaPath notification : list) {
+ final String namespace = notification.getLastComponent().getNamespace().toString();
if (pattern.matcher(namespace).matches()) {
- matchingQnames.add(qname);
+ matchingQnames.add(notification);
}
}
-
return matchingQnames;
}
* @return
*/
static String wildcardToRegex(final String wildcard){
- StringBuffer s = new StringBuffer(wildcard.length());
+ final StringBuffer s = new StringBuffer(wildcard.length());
s.append('^');
- for (char c : wildcard.toCharArray()) {
+ for (final char c : wildcard.toCharArray()) {
switch(c) {
case '*':
s.append(".*");
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.config.yang.messagebus.app.impl;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.config.api.DependencyResolver;
+import org.opendaylight.controller.config.api.DynamicMBeanWithInstance;
+import org.osgi.framework.BundleContext;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+public class MessageBusAppImplModuleFactoryTest {
+
+ DependencyResolver dependencyResolverMock;
+ BundleContext bundleContextMock;
+ MessageBusAppImplModuleFactory messageBusAppImplModuleFactory;
+ DynamicMBeanWithInstance dynamicMBeanWithInstanceMock;
+
+ @BeforeClass
+ public static void initTestClass() throws IllegalAccessException, InstantiationException {
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ dependencyResolverMock = mock(DependencyResolver.class);
+ bundleContextMock = mock(BundleContext.class);
+ dynamicMBeanWithInstanceMock = mock(DynamicMBeanWithInstance.class);
+ messageBusAppImplModuleFactory = new MessageBusAppImplModuleFactory();
+ }
+
+ @Test
+ public void createModuleTest() {
+ assertNotNull("Module has not been created correctly.", messageBusAppImplModuleFactory.createModule("instanceName1", dependencyResolverMock, bundleContextMock));
+ }
+
+ @Test
+ public void createModuleBTest() throws Exception{
+ MessageBusAppImplModule messageBusAppImplModuleMock = mock(MessageBusAppImplModule.class);
+ doReturn(messageBusAppImplModuleMock).when(dynamicMBeanWithInstanceMock).getModule();
+ assertNotNull("Module has not been created correctly.", messageBusAppImplModuleFactory.createModule("instanceName1", dependencyResolverMock, dynamicMBeanWithInstanceMock, bundleContextMock));
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.config.yang.messagebus.app.impl;
+
+import com.google.common.util.concurrent.CheckedFuture;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.config.api.DependencyResolver;
+import org.opendaylight.controller.config.api.ModuleIdentifier;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.messagebus.app.impl.EventSourceTopology;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.sal.core.api.Provider;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.osgi.framework.BundleContext;
+
+import javax.management.ObjectName;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.notNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.doNothing;
+
+public class MessageBusAppImplModuleTest {
+
+ MessageBusAppImplModule messageBusAppImplModule;
+ ModuleIdentifier moduleIdentifier;
+ DependencyResolver dependencyResolverMock;
+
+ @BeforeClass
+ public static void initTestClass() throws IllegalAccessException, InstantiationException {
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ moduleIdentifier = new ModuleIdentifier("factoryName1", "instanceName1");
+ dependencyResolverMock = mock(DependencyResolver.class);
+ messageBusAppImplModule = new MessageBusAppImplModule(moduleIdentifier, dependencyResolverMock);
+ }
+
+ @Test
+ public void constructorTest() {
+ assertNotNull("Instance has not been created correctly.", messageBusAppImplModule);
+ }
+
+ @Test
+ public void constructorBTest() {
+ MessageBusAppImplModule messageBusAppImplModuleOld = mock(MessageBusAppImplModule.class);
+ java.lang.AutoCloseable oldInstanceAutocloseableMock = mock(AutoCloseable.class);
+ MessageBusAppImplModule messageBusAppImplModule = new MessageBusAppImplModule(moduleIdentifier, dependencyResolverMock, messageBusAppImplModuleOld, oldInstanceAutocloseableMock);
+ assertNotNull("Instance has not been created correctly.", messageBusAppImplModule);
+ }
+
+ @Test
+ public void setGetBundleContextTest() {
+ BundleContext bundleContext = mock(BundleContext.class);
+ messageBusAppImplModule.setBundleContext(bundleContext);
+ assertEquals("Set and/or get method/s don't work correctly.", bundleContext, messageBusAppImplModule.getBundleContext());
+ }
+
+ @Test
+ public void createInstanceTest() {
+ createInstanceTestHelper();
+ messageBusAppImplModule.getInstance();
+ assertNotNull("AutoCloseable instance has not been created correctly.", messageBusAppImplModule.createInstance());
+ }
+
+ private void createInstanceTestHelper(){
+ NamespaceToStream namespaceToStream = mock(NamespaceToStream.class);
+ List<NamespaceToStream> listNamespaceToStreamMock = new ArrayList<>();
+ listNamespaceToStreamMock.add(namespaceToStream);
+ messageBusAppImplModule.setNamespaceToStream(listNamespaceToStreamMock);
+ ObjectName objectName = mock(ObjectName.class);
+ org.opendaylight.controller.sal.core.api.Broker domBrokerDependency = mock(Broker.class);
+ org.opendaylight.controller.sal.binding.api.BindingAwareBroker bindingBrokerDependency = mock(BindingAwareBroker.class);
+ when(dependencyResolverMock.resolveInstance((java.lang.Class) notNull(), (javax.management.ObjectName) notNull(), eq(AbstractMessageBusAppImplModule.domBrokerJmxAttribute))).thenReturn(domBrokerDependency);
+ when(dependencyResolverMock.resolveInstance((java.lang.Class) notNull(), (javax.management.ObjectName) notNull(), eq(AbstractMessageBusAppImplModule.bindingBrokerJmxAttribute))).thenReturn(bindingBrokerDependency);
+ messageBusAppImplModule.setBindingBroker(objectName);
+ messageBusAppImplModule.setDomBroker(objectName);
+ BindingAwareBroker.ProviderContext providerContextMock = mock(BindingAwareBroker.ProviderContext.class);
+ doReturn(providerContextMock).when(bindingBrokerDependency).registerProvider(any(BindingAwareProvider.class));
+ Broker.ProviderSession providerSessionMock = mock(Broker.ProviderSession.class);
+ doReturn(providerSessionMock).when(domBrokerDependency).registerProvider(any(Provider.class));
+
+ DataBroker dataBrokerMock = mock(DataBroker.class);
+ doReturn(dataBrokerMock).when(providerContextMock).getSALService(DataBroker.class);
+ RpcProviderRegistry rpcProviderRegistryMock = mock(RpcProviderRegistry.class);
+ doReturn(rpcProviderRegistryMock).when(providerContextMock).getSALService(RpcProviderRegistry.class);
+ BindingAwareBroker.RpcRegistration rpcRegistrationMock = mock(BindingAwareBroker.RpcRegistration.class);
+ doReturn(rpcRegistrationMock).when(rpcProviderRegistryMock).addRpcImplementation(eq(EventAggregatorService.class), any(EventSourceTopology.class));
+ EventSourceService eventSourceServiceMock = mock(EventSourceService.class);
+ doReturn(eventSourceServiceMock).when(rpcProviderRegistryMock).getRpcService(EventSourceService.class);
+
+ WriteTransaction writeTransactionMock = mock(WriteTransaction.class);
+ doReturn(writeTransactionMock).when(dataBrokerMock).newWriteOnlyTransaction();
+ doNothing().when(writeTransactionMock).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(DataObject.class));
+ CheckedFuture checkedFutureMock = mock(CheckedFuture.class);
+ doReturn(checkedFutureMock).when(writeTransactionMock).submit();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.app.impl;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.any;
+
+public class EventSourceTopicTest {
+
+ EventSourceTopic eventSourceTopic;
+ org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node dataObjectMock;
+ NodeId nodeIdMock;
+ EventSourceService eventSourceServiceMock;
+
+ @BeforeClass
+ public static void initTestClass() throws IllegalAccessException, InstantiationException {
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ NotificationPattern notificationPattern = new NotificationPattern("value1");
+ eventSourceServiceMock = mock(EventSourceService.class);
+ eventSourceTopic = new EventSourceTopic(notificationPattern, "nodeIdPattern1", eventSourceServiceMock);
+ }
+
+ @Test
+ public void createModuleTest() {
+ assertNotNull("Instance has not been created correctly.", eventSourceTopic);
+ }
+
+ @Test
+ public void getTopicIdTest() {
+ assertNotNull("Topic has not been created correctly.", eventSourceTopic.getTopicId());
+ }
+
+ @Test
+ public void onDataChangedTest() {
+ AsyncDataChangeEvent asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
+ onDataChangedTestHelper(asyncDataChangeEventMock);
+ eventSourceTopic.onDataChanged(asyncDataChangeEventMock);
+ verify(dataObjectMock, times(1)).getId();
+ verify(nodeIdMock, times(1)).getValue();
+ }
+
+ private void onDataChangedTestHelper(AsyncDataChangeEvent asyncDataChangeEventMock){
+ Map<InstanceIdentifier<?>, DataObject> map = new HashMap<>();
+ InstanceIdentifier instanceIdentifierMock = mock(InstanceIdentifier.class);
+ dataObjectMock = mock(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class);
+ map.put(instanceIdentifierMock, dataObjectMock);
+ doReturn(map).when(asyncDataChangeEventMock).getUpdatedData();
+
+ nodeIdMock = mock(NodeId.class);
+ doReturn(nodeIdMock).when(dataObjectMock).getId();
+ doReturn("0").when(nodeIdMock).getValue();
+ }
+
+ @Test
+ public void notifyNodeTest() {
+ InstanceIdentifier instanceIdentifierMock = mock(InstanceIdentifier.class);
+ eventSourceTopic.notifyNode(instanceIdentifierMock);
+ verify(eventSourceServiceMock, times(1)).joinTopic(any(JoinTopicInput.class));
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.app.impl;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.Pattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.eq;
+
+public class EventSourceTopologyTest {
+
+ EventSourceTopology eventSourceTopology;
+ DataBroker dataBrokerMock;
+ RpcProviderRegistry rpcProviderRegistryMock;
+ CreateTopicInput createTopicInputMock;
+ ListenerRegistration listenerRegistrationMock;
+ NodeKey nodeKey;
+
+ @BeforeClass
+ public static void initTestClass() throws IllegalAccessException, InstantiationException {
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ dataBrokerMock = mock(DataBroker.class);
+ rpcProviderRegistryMock = mock(RpcProviderRegistry.class);
+
+ }
+
+ @Test
+ public void constructorTest() {
+ constructorTestHelper();
+ eventSourceTopology = new EventSourceTopology(dataBrokerMock, rpcProviderRegistryMock);
+ assertNotNull("Instance has not been created correctly.", eventSourceTopology);
+ }
+
+ private void constructorTestHelper(){
+ WriteTransaction writeTransactionMock = mock(WriteTransaction.class);
+ doReturn(writeTransactionMock).when(dataBrokerMock).newWriteOnlyTransaction();
+ doNothing().when(writeTransactionMock).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(DataObject.class));
+ CheckedFuture checkedFutureMock = mock(CheckedFuture.class);
+ doReturn(checkedFutureMock).when(writeTransactionMock).submit();
+ }
+
+ @Test
+ public void createTopicTest() throws Exception{
+ createTopicTestHelper();
+ assertNotNull("Topic has not been created correctly.", eventSourceTopology.createTopic(createTopicInputMock));
+ }
+
+ private void createTopicTestHelper() throws Exception{
+ constructorTestHelper();
+ createTopicInputMock = mock(CreateTopicInput.class);
+ eventSourceTopology = new EventSourceTopology(dataBrokerMock, rpcProviderRegistryMock);
+
+ NotificationPattern notificationPattern = new NotificationPattern("value1");
+ doReturn(notificationPattern).when(createTopicInputMock).getNotificationPattern();
+ Pattern pattern = new Pattern("valuePattern1");
+ doReturn(pattern).when(createTopicInputMock).getNodeIdPattern();
+
+ listenerRegistrationMock = mock(ListenerRegistration.class);
+ doReturn(listenerRegistrationMock).when(dataBrokerMock).registerDataChangeListener(eq(LogicalDatastoreType.OPERATIONAL),
+ any(InstanceIdentifier.class),
+ any(EventSourceTopic.class),
+ eq(DataBroker.DataChangeScope.SUBTREE));
+
+ ReadOnlyTransaction readOnlyTransactionMock = mock(ReadOnlyTransaction.class);
+ doReturn(readOnlyTransactionMock).when(dataBrokerMock).newReadOnlyTransaction();
+
+ CheckedFuture checkedFutureMock = mock(CheckedFuture.class);
+ doReturn(checkedFutureMock).when(readOnlyTransactionMock).read(eq(LogicalDatastoreType.OPERATIONAL),
+ any(InstanceIdentifier.class));
+ Optional optionalMock = mock(Optional.class);
+ doReturn(optionalMock).when(checkedFutureMock).checkedGet();
+ doReturn(true).when(optionalMock).isPresent();
+
+ Topology topologyMock = mock(Topology.class);
+ doReturn(topologyMock).when(optionalMock).get();
+ Node nodeMock = mock(Node.class);
+ List<Node> nodeList = new ArrayList<>();
+ nodeList.add(nodeMock);
+ doReturn(nodeList).when(topologyMock).getNode();
+
+ NodeId nodeId = new NodeId("nodeIdValue1");
+ doReturn(nodeId).when(nodeMock).getNodeId();
+ }
+
+ @Test
+ public void destroyTopicTest() throws Exception{
+ createTopicTestHelper();
+ DestroyTopicInput destroyTopicInput = null;
+ assertNotNull("Instance has not been created correctly.", eventSourceTopology.destroyTopic(destroyTopicInput));
+ }
+
+ @Test
+ public void closeTest() throws Exception{
+ BindingAwareBroker.RpcRegistration rpcRegistrationMock = mock(BindingAwareBroker.RpcRegistration.class);
+ doReturn(rpcRegistrationMock).when(rpcProviderRegistryMock).addRpcImplementation(eq(EventAggregatorService.class), any(EventSourceTopology.class));
+ doNothing().when(rpcRegistrationMock).close();
+ createTopicTestHelper();
+ eventSourceTopology.createTopic(createTopicInputMock);
+ eventSourceTopology.close();
+ verify(rpcRegistrationMock, times(1)).close();
+ }
+
+ @Test
+ public void registerTest() throws Exception {
+ createTopicTestHelper();
+ Node nodeMock = mock(Node.class);
+ NetconfEventSource netconfEventSourceMock = mock(NetconfEventSource.class);
+
+ NodeId nodeId = new NodeId("nodeIdValue1");
+ nodeKey = new NodeKey(nodeId);
+ doReturn(nodeKey).when(nodeMock).getKey();
+
+ BindingAwareBroker.RoutedRpcRegistration routedRpcRegistrationMock = mock(BindingAwareBroker.RoutedRpcRegistration.class);
+ doReturn(routedRpcRegistrationMock).when(rpcProviderRegistryMock).addRoutedRpcImplementation(EventSourceService.class, netconfEventSourceMock);
+ eventSourceTopology.register(nodeMock, netconfEventSourceMock);
+ verify(routedRpcRegistrationMock, times(1)).registerPath(eq(NodeContext.class), any(KeyedInstanceIdentifier.class));
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.app.impl;
+
+import com.google.common.base.Optional;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream;
+import org.opendaylight.controller.md.sal.binding.api.BindingService;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.MountPoint;
+import org.opendaylight.controller.md.sal.binding.api.MountPointService;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.AvailableCapabilities;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.notNull;
+
+public class NetconfEventSourceManagerTest {
+
+ NetconfEventSourceManager netconfEventSourceManager;
+ ListenerRegistration listenerRegistrationMock;
+ DOMMountPointService domMountPointServiceMock;
+ MountPointService mountPointServiceMock;
+ EventSourceTopology eventSourceTopologyMock;
+ AsyncDataChangeEvent asyncDataChangeEventMock;
+
+ @BeforeClass
+ public static void initTestClass() throws IllegalAccessException, InstantiationException {
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ DataBroker dataBrokerMock = mock(DataBroker.class);
+ DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class);
+ domMountPointServiceMock = mock(DOMMountPointService.class);
+ mountPointServiceMock = mock(MountPointService.class);
+ eventSourceTopologyMock = mock(EventSourceTopology.class);
+ List<NamespaceToStream> namespaceToStreamList = new ArrayList<>();
+
+ listenerRegistrationMock = mock(ListenerRegistration.class);
+ doReturn(listenerRegistrationMock).when(dataBrokerMock).registerDataChangeListener(eq(LogicalDatastoreType.OPERATIONAL), any(InstanceIdentifier.class), any(NetconfEventSourceManager.class), eq(AsyncDataBroker.DataChangeScope.SUBTREE));
+
+ netconfEventSourceManager = new NetconfEventSourceManager(dataBrokerMock, domNotificationPublishServiceMock, domMountPointServiceMock,
+ mountPointServiceMock, eventSourceTopologyMock, namespaceToStreamList);
+ }
+
+ @Test
+ public void constructorTest() {
+ assertNotNull("Instance has not been created correctly.", netconfEventSourceManager);
+ }
+
+ @Test
+ public void onDataChangedTest() {
+ AsyncDataChangeEvent asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
+ Map<InstanceIdentifier, DataObject> map = new HashMap<>();
+ InstanceIdentifier instanceIdentifierMock = mock(InstanceIdentifier.class);
+ Node dataObjectMock = mock(Node.class);
+ map.put(instanceIdentifierMock, dataObjectMock);
+ doReturn(map).when(asyncDataChangeEventMock).getCreatedData();
+ doReturn(map).when(asyncDataChangeEventMock).getUpdatedData();
+ netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
+ verify(dataObjectMock, times(2)).getAugmentation(NetconfNode.class);
+ }
+
+ @Test
+ public void onDataChangedCreateEventSourceTest() {
+ onDataChangedCreateEventSourceTestHelper();
+ netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
+ verify(eventSourceTopologyMock, times(1)).register(any(Node.class), any(NetconfEventSource.class));
+ }
+
+ private void onDataChangedCreateEventSourceTestHelper(){
+ asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
+ Map<InstanceIdentifier, DataObject> map = new HashMap<>();
+ InstanceIdentifier instanceIdentifierMock = mock(InstanceIdentifier.class);
+ Node dataObjectMock = mock(Node.class);
+ map.put(instanceIdentifierMock, dataObjectMock);
+ doReturn(map).when(asyncDataChangeEventMock).getCreatedData();
+ doReturn(map).when(asyncDataChangeEventMock).getUpdatedData();
+
+ NetconfNode netconfNodeMock = mock(NetconfNode.class);
+ AvailableCapabilities availableCapabilitiesMock = mock(AvailableCapabilities.class);
+ doReturn(netconfNodeMock).when(dataObjectMock).getAugmentation(NetconfNode.class);
+ doReturn(availableCapabilitiesMock).when(netconfNodeMock).getAvailableCapabilities();
+ List<String> availableCapabilityList = new ArrayList<>();
+ availableCapabilityList.add("(urn:ietf:params:xml:ns:netconf:notification_availableCapabilityString1");
+ doReturn(availableCapabilityList).when(availableCapabilitiesMock).getAvailableCapability();
+
+ doReturn(NetconfNodeFields.ConnectionStatus.Connected).when(netconfNodeMock).getConnectionStatus();
+
+ Optional optionalMock = mock(Optional.class);
+ Optional optionalBindingMountMock = mock(Optional.class);
+ NodeId nodeId = new NodeId("nodeId1");
+ doReturn(nodeId).when(dataObjectMock).getNodeId();
+ doReturn(optionalMock).when(domMountPointServiceMock).getMountPoint((YangInstanceIdentifier)notNull());
+ doReturn(optionalBindingMountMock).when(mountPointServiceMock).getMountPoint(any(InstanceIdentifier.class));
+ doReturn(true).when(optionalMock).isPresent();
+ doReturn(true).when(optionalBindingMountMock).isPresent();
+
+ DOMMountPoint domMountPointMock = mock(DOMMountPoint.class);
+ MountPoint mountPointMock = mock(MountPoint.class);
+ doReturn(domMountPointMock).when(optionalMock).get();
+ doReturn(mountPointMock).when(optionalBindingMountMock).get();
+
+ RpcConsumerRegistry rpcConsumerRegistryMock = mock(RpcConsumerRegistry.class);
+ Optional<BindingService> onlyOptionalMock = (Optional<BindingService>) mock(Optional.class);
+ NotificationsService notificationsServiceMock = mock(NotificationsService.class);
+
+ doReturn(onlyOptionalMock).when(mountPointMock).getService(RpcConsumerRegistry.class);
+ doReturn(rpcConsumerRegistryMock).when(onlyOptionalMock).get();
+ doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class);
+ }
+
+ @Test
+ public void isEventSourceTest() {
+ Node nodeMock = mock(Node.class);
+ NetconfNode netconfNodeMock = mock(NetconfNode.class);
+ AvailableCapabilities availableCapabilitiesMock = mock(AvailableCapabilities.class);
+ doReturn(netconfNodeMock).when(nodeMock).getAugmentation(NetconfNode.class);
+ doReturn(availableCapabilitiesMock).when(netconfNodeMock).getAvailableCapabilities();
+ List<String> availableCapabilityList = new ArrayList<>();
+ availableCapabilityList.add("(urn:ietf:params:xml:ns:netconf:notification_availableCapabilityString1");
+ doReturn(availableCapabilityList).when(availableCapabilitiesMock).getAvailableCapability();
+ assertTrue("Method has not been run correctly.", netconfEventSourceManager.isEventSource(nodeMock));
+ }
+
+ @Test
+ public void isNotEventSourceTest() {
+ Node nodeMock = mock(Node.class);
+ NetconfNode netconfNodeMock = mock(NetconfNode.class);
+ AvailableCapabilities availableCapabilitiesMock = mock(AvailableCapabilities.class);
+ doReturn(netconfNodeMock).when(nodeMock).getAugmentation(NetconfNode.class);
+ doReturn(availableCapabilitiesMock).when(netconfNodeMock).getAvailableCapabilities();
+ List<String> availableCapabilityList = new ArrayList<>();
+ availableCapabilityList.add("availableCapabilityString1");
+ doReturn(availableCapabilityList).when(availableCapabilitiesMock).getAvailableCapability();
+ assertFalse("Method has not been run correctly.", netconfEventSourceManager.isEventSource(nodeMock));
+ }
+
+ @Test
+ public void closeTest() {
+ netconfEventSourceManager.close();
+ verify(listenerRegistrationMock, times(1)).close();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.app.impl;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.binding.api.BindingService;
+import org.opendaylight.controller.md.sal.binding.api.MountPoint;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
+import org.opendaylight.controller.md.sal.dom.api.DOMService;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+import org.opendaylight.yangtools.yang.common.QName;
+
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
+
+public class NetconfEventSourceTest {
+
+ NetconfEventSource netconfEventSource;
+ DOMMountPoint domMountPointMock;
+ JoinTopicInput joinTopicInputMock;
+
+ @BeforeClass
+ public static void initTestClass() throws IllegalAccessException, InstantiationException {
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ Map<String, String> streamMap = new HashMap<>();
+ streamMap.put("string1", "string2");
+ domMountPointMock = mock(DOMMountPoint.class);
+ DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class);
+ MountPoint mountPointMock = mock(MountPoint.class);
+
+ RpcConsumerRegistry rpcConsumerRegistryMock = mock(RpcConsumerRegistry.class);
+ Optional<BindingService> onlyOptionalMock = (Optional<BindingService>) mock(Optional.class);
+ NotificationsService notificationsServiceMock = mock(NotificationsService.class);
+
+ doReturn(onlyOptionalMock).when(mountPointMock).getService(RpcConsumerRegistry.class);
+ doReturn(rpcConsumerRegistryMock).when(onlyOptionalMock).get();
+ doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class);
+ netconfEventSource = new NetconfEventSource("nodeId1", streamMap, domMountPointMock, domNotificationPublishServiceMock, mountPointMock);
+ }
+
+ @Test
+ public void constructorTest() {
+ assertNotNull("Instance has not been created correctly.", netconfEventSource);
+ }
+
+ @Test
+ public void joinTopicTest() throws Exception{
+ joinTopicTestHelper();
+ assertNotNull("JoinTopic return value has not been created correctly.", netconfEventSource.joinTopic(joinTopicInputMock));
+ }
+
+ private void joinTopicTestHelper() throws Exception{
+ joinTopicInputMock = mock(JoinTopicInput.class);
+ NotificationPattern notificationPatternMock = mock(NotificationPattern.class);
+ doReturn(notificationPatternMock).when(joinTopicInputMock).getNotificationPattern();
+ doReturn("regexString1").when(notificationPatternMock).getValue();
+
+ SchemaContext schemaContextMock = mock(SchemaContext.class);
+ doReturn(schemaContextMock).when(domMountPointMock).getSchemaContext();
+ Set<NotificationDefinition> notificationDefinitionSet = new HashSet<>();
+ NotificationDefinition notificationDefinitionMock = mock(NotificationDefinition.class);
+ notificationDefinitionSet.add(notificationDefinitionMock);
+
+ URI uri = new URI("uriStr1");
+ QName qName = new QName(uri, "localName1");
+ org.opendaylight.yangtools.yang.model.api.SchemaPath schemaPath = SchemaPath.create(true, qName);
+ doReturn(notificationDefinitionSet).when(schemaContextMock).getNotifications();
+ doReturn(schemaPath).when(notificationDefinitionMock).getPath();
+
+ Optional<DOMNotificationService> domNotificationServiceOptionalMock = (Optional<DOMNotificationService>) mock(Optional.class);
+ doReturn(domNotificationServiceOptionalMock).when(domMountPointMock).getService(DOMNotificationService.class);
+ doReturn(true).when(domNotificationServiceOptionalMock).isPresent();
+
+ DOMNotificationService domNotificationServiceMock = mock(DOMNotificationService.class);
+ doReturn(domNotificationServiceMock).when(domNotificationServiceOptionalMock).get();
+ ListenerRegistration listenerRegistrationMock = mock(ListenerRegistration.class);
+ doReturn(listenerRegistrationMock).when(domNotificationServiceMock).registerNotificationListener(any(NetconfEventSource.class), any(List.class));
+ }
+
+ @Test (expected=NullPointerException.class)
+ public void onNotificationTest() {
+ DOMNotification domNotificationMock = mock(DOMNotification.class);
+ ContainerNode containerNodeMock = mock(ContainerNode.class);
+ SchemaContext schemaContextMock = mock(SchemaContext.class);
+ SchemaPath schemaPathMock = mock(SchemaPath.class);
+ doReturn(schemaContextMock).when(domMountPointMock).getSchemaContext();
+ doReturn(schemaPathMock).when(domNotificationMock).getType();
+ doReturn(containerNodeMock).when(domNotificationMock).getBody();
+ netconfEventSource.onNotification(domNotificationMock);
+ }
+
+ @Test
+ public void onDataChangedTest() {
+ InstanceIdentifier brmIdent = InstanceIdentifier.create(Nodes.class)
+ .child(Node.class, new NodeKey(new NodeId("brm"))).augmentation(NetconfNode.class);
+ AsyncDataChangeEvent asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
+ NetconfNode dataObjectMock = mock(NetconfNode.class);
+ Map<InstanceIdentifier, DataObject> dataChangeMap = new HashMap<>();
+ dataChangeMap.put(brmIdent, dataObjectMock);
+ doReturn(dataChangeMap).when(asyncDataChangeEventMock).getOriginalData();
+ doReturn(dataChangeMap).when(asyncDataChangeEventMock).getUpdatedData();
+
+ netconfEventSource.onDataChanged(asyncDataChangeEventMock);
+ verify(dataObjectMock, times(2)).isConnected();
+ }
+
+ @Test
+ public void onDataChangedResubscribeTest() throws Exception{
+ InstanceIdentifier brmIdent = InstanceIdentifier.create(Nodes.class)
+ .child(Node.class, new NodeKey(new NodeId("brm"))).augmentation(NetconfNode.class);
+ AsyncDataChangeEvent asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
+ NetconfNode dataObjectMock = mock(NetconfNode.class);
+ Map<InstanceIdentifier, DataObject> dataChangeMap = new HashMap<>();
+ dataChangeMap.put(brmIdent, dataObjectMock);
+ doReturn(dataChangeMap).when(asyncDataChangeEventMock).getUpdatedData();
+ doReturn(true).when(dataObjectMock).isConnected();
+
+ Set<String> localSet = getActiveStreams();
+ localSet.add("activeStream1");
+
+ Optional<DOMService> optionalMock = (Optional<DOMService>) mock(Optional.class);
+ doReturn(optionalMock).when(domMountPointMock).getService(DOMRpcService.class);
+ DOMRpcService domRpcServiceMock = mock(DOMRpcService.class);
+ doReturn(domRpcServiceMock).when(optionalMock).get();
+ CheckedFuture checkedFutureMock = mock(CheckedFuture.class);
+ doReturn(checkedFutureMock).when(domRpcServiceMock).invokeRpc(any(SchemaPath.class), any(ContainerNode.class));
+
+ netconfEventSource.onDataChanged(asyncDataChangeEventMock);
+ verify(dataObjectMock, times(1)).isConnected();
+ assertEquals("Size of set has not been set correctly.", 1, getActiveStreams().size());
+ }
+
+ private Set getActiveStreams() throws Exception{
+ Field nesField = NetconfEventSource.class.getDeclaredField("activeStreams");
+ nesField.setAccessible(true);
+ return (Set) nesField.get(netconfEventSource);
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.app.impl;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+
+public class TopicDOMNotificationTest {
+
+ ContainerNode containerNodeBodyMock;
+ TopicDOMNotification topicDOMNotification;
+
+ @BeforeClass
+ public static void initTestClass() throws IllegalAccessException, InstantiationException {
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ containerNodeBodyMock = mock(ContainerNode.class);
+ topicDOMNotification = new TopicDOMNotification(containerNodeBodyMock);
+ }
+
+ @Test
+ public void constructorTest() {
+ assertNotNull("Instance has not been created correctly.", topicDOMNotification);
+ }
+
+ @Test
+ public void getTypeTest() {
+ SchemaPath TOPIC_NOTIFICATION_ID = SchemaPath.create(true, TopicNotification.QNAME);
+ assertEquals("Type has not been created correctly.", TOPIC_NOTIFICATION_ID, topicDOMNotification.getType());
+ }
+
+ @Test
+ public void getBodyTest() {
+ assertEquals("String has not been created correctly.", containerNodeBodyMock, topicDOMNotification.getBody());
+ }
+
+ @Test
+ public void getToStringTest() {
+ String bodyString = "TopicDOMNotification [body=" + containerNodeBodyMock + "]";
+ assertEquals("String has not been created correctly.", bodyString, topicDOMNotification.toString());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.app.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.junit.Test;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+public class UtilTest {
+
+ @Test
+ public void testMD5Hash() throws Exception {
+ // empty string
+ createAndAssertHash("", "d41d8cd98f00b204e9800998ecf8427e");
+
+ // non-empty string
+ createAndAssertHash("The Guardian", "69b929ae473ed732d5fb8e0a55a8dc8d");
+
+ // the same hash for the same string
+ createAndAssertHash("The Independent", "db793706d70c37dcc16454fa8eb21b1c");
+ createAndAssertHash("The Independent", "db793706d70c37dcc16454fa8eb21b1c"); // one more time
+
+ // different strings must have different hashes
+ createAndAssertHash("orange", "fe01d67a002dfa0f3ac084298142eccd");
+ createAndAssertHash("yellow", "d487dd0b55dfcacdd920ccbdaeafa351");
+ }
+
+ //TODO: IllegalArgumentException would be better
+ @Test(expected = RuntimeException.class)
+ public void testMD5HashInvalidInput() throws Exception {
+ Util.md5String(null);
+ }
+
+ @Test
+ public void testWildcardToRegex() throws Exception {
+ // empty wildcard string
+ createAndAssertRegex("", "^$");
+
+ // wildcard string is a char to be replaced
+ createAndAssertRegex("*", "^.*$");
+ createAndAssertRegex("?", "^.$");
+ final String relevantChars = "()[]$^.{}|\\";
+ for (final char c : relevantChars.toCharArray()) {
+ final char oneChar[] = {c};
+ final String wildcardStr = new String(oneChar);
+ final String expectedRegex = "^\\" + c + "$";
+ createAndAssertRegex(wildcardStr, expectedRegex);
+ }
+
+ // wildcard string consists of more chars
+ createAndAssertRegex("a", "^a$");
+ createAndAssertRegex("aBc", "^aBc$");
+ createAndAssertRegex("a1b2C34", "^a1b2C34$");
+ createAndAssertRegex("*?()[]$^.{}|\\X", "^.*.\\(\\)\\[\\]\\$\\^\\.\\{\\}\\|\\\\X$");
+ createAndAssertRegex("a*BB?37|42$", "^a.*BB.37\\|42\\$$");
+ }
+
+ @Test
+ public void testResultFor() throws Exception {
+ {
+ final String expectedResult = "dummy string";
+ RpcResult<String> rpcResult = Util.resultFor(expectedResult).get();
+ assertEquals(expectedResult, rpcResult.getResult());
+ assertTrue(rpcResult.isSuccessful());
+ assertTrue(rpcResult.getErrors().isEmpty());
+ }
+ {
+ final Integer expectedResult = 42;
+ RpcResult<Integer> rpcResult = Util.resultFor(expectedResult).get();
+ assertEquals(expectedResult, rpcResult.getResult());
+ assertTrue(rpcResult.isSuccessful());
+ assertTrue(rpcResult.getErrors().isEmpty());
+ }
+ }
+
+ @Test
+ public void testExpandQname() throws Exception {
+ // match no path because the list of the allowed paths is empty
+ {
+ final List<SchemaPath> paths = new ArrayList<>();
+ final Pattern regexPattern = Pattern.compile(".*"); // match everything
+ final List<SchemaPath> matchingPaths = Util.expandQname(paths, regexPattern);
+ assertTrue(matchingPaths.isEmpty());
+ }
+
+ // match no path because of regex pattern
+ {
+ final List<SchemaPath> paths = createSchemaPathList();
+ final Pattern regexPattern = Pattern.compile("^@.*");
+ final List<SchemaPath> matchingPaths = Util.expandQname(paths, regexPattern);
+ assertTrue(matchingPaths.isEmpty());
+ }
+
+ // match all paths
+ {
+ final List<SchemaPath> paths = createSchemaPathList();
+ final Pattern regexPattern = Pattern.compile(".*");
+ final List<SchemaPath> matchingPaths = Util.expandQname(paths, regexPattern);
+ assertTrue(matchingPaths.contains(paths.get(0)));
+ assertTrue(matchingPaths.contains(paths.get(1)));
+ assertEquals(paths.size(), matchingPaths.size());
+ }
+
+ // match one path only
+ {
+ final List<SchemaPath> paths = createSchemaPathList();
+ final Pattern regexPattern = Pattern.compile(".*yyy$");
+ final List<SchemaPath> matchingPaths = Util.expandQname(paths, regexPattern);
+ assertTrue(matchingPaths.contains(paths.get(1)));
+ assertEquals(1, matchingPaths.size());
+ }
+ }
+
+ private static void createAndAssertHash(final String inString, final String expectedHash) {
+ assertEquals("Incorrect hash.", expectedHash, Util.md5String(inString));
+ }
+
+ private static void createAndAssertRegex(final String wildcardStr, final String expectedRegex) {
+ assertEquals("Incorrect regex string.", expectedRegex, Util.wildcardToRegex(wildcardStr));
+ }
+
+ private static List<SchemaPath> createSchemaPathList() {
+ final QName qname1 = QName.create("urn:odl:xxx", "2015-01-01", "localName");
+ final QName qname2 = QName.create("urn:odl:yyy", "2015-01-01", "localName");
+ final SchemaPath path1 = SchemaPath.create(true, qname1);
+ final SchemaPath path2 = SchemaPath.create(true, qname2);
+ return Arrays.asList(path1, path2);
+ }
+}
<!-- Message Bus -->
<module>messagebus-api</module>
<module>messagebus-impl</module>
+ <module>messagebus-config</module>
</modules>
<build>
</goals>
</pluginExecutionFilter>
<action>
- <execute></execute>
+ <execute/>
</action>
</pluginExecution>
</pluginExecutions>
</modules>
</profile>
</profiles>
-</project>
+</project>
\ No newline at end of file