From: Igor Bartak Date: Wed, 18 Mar 2015 17:00:13 +0000 (+0100) Subject: BUG 2799: Migration of Message Bus from deprecated Helium MD-SAL APIs X-Git-Tag: release/lithium~381^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=35128aa4927b06a97e3d1f505a6852105dc81fed BUG 2799: Migration of Message Bus from deprecated Helium MD-SAL APIs to Lithium API (copyrights corrected) Change-Id: I2206b4b532e4feead26c166b793966b077f0f26f Signed-off-by: Igor Bartak --- diff --git a/features/mdsal/pom.xml b/features/mdsal/pom.xml index 35d5dc21b3..9222e13ebb 100644 --- a/features/mdsal/pom.xml +++ b/features/mdsal/pom.xml @@ -348,6 +348,7 @@ ${mdsal.version} + org.opendaylight.controller.samples @@ -462,48 +463,6 @@ - - org.opendaylight.yangtools - yang-maven-plugin - ${yangtools.version} - - - org.opendaylight.yangtools - maven-sal-api-gen-plugin - ${yangtools.version} - jar - - - org.opendaylight.yangtools - yang-binding - ${yangtools.version} - jar - - - org.opendaylight.controller - sal-rest-docgen-maven - ${mdsal.version} - jar - - - - - - generate-sources - - - src - - - org.opendaylight.controller.sal.rest.doc.maven.StaticDocGenerator - ${project.build.directory}/generated-resources/swagger-api-documentation/explorer/static - - - true - - - - diff --git a/features/mdsal/src/main/resources/features.xml b/features/mdsal/src/main/resources/features.xml index bec365cda0..fb368abf2c 100644 --- a/features/mdsal/src/main/resources/features.xml +++ b/features/mdsal/src/main/resources/features.xml @@ -110,5 +110,4 @@ mvn:org.opendaylight.controller.samples/clustering-it-provider/${project.version} mvn:org.opendaylight.controller.samples/clustering-it-config/${project.version}/xml/config - diff --git a/features/netconf-connector/pom.xml b/features/netconf-connector/pom.xml index 9cb2b1e33f..b98f839979 100644 --- a/features/netconf-connector/pom.xml +++ b/features/netconf-connector/pom.xml @@ -135,6 +135,24 @@ org.bouncycastle bcprov-jdk15on + + + + + org.opendaylight.controller + messagebus-api + + + org.opendaylight.controller + messagebus-impl + + + org.opendaylight.controller + messagebus-config + ${mdsal.version} + xml + config + org.opendaylight.controller netconf-connector-config diff --git a/features/netconf-connector/src/main/resources/features.xml b/features/netconf-connector/src/main/resources/features.xml index 7cabbb4929..24a92bdef4 100644 --- a/features/netconf-connector/src/main/resources/features.xml +++ b/features/netconf-connector/src/main/resources/features.xml @@ -117,6 +117,13 @@ mvn:org.opendaylight.controller/netconf-tcp/${netconf.version} + + odl-netconf-connector + odl-mdsal-broker + mvn:org.opendaylight.controller/messagebus-api/${project.version} + mvn:org.opendaylight.controller/messagebus-impl/${project.version} + mvn:org.opendaylight.controller/messagebus-config/${project.version}/xml/config + diff --git a/opendaylight/md-sal/mdsal-artifacts/pom.xml b/opendaylight/md-sal/mdsal-artifacts/pom.xml index 420f888cf1..d9f0f8800f 100644 --- a/opendaylight/md-sal/mdsal-artifacts/pom.xml +++ b/opendaylight/md-sal/mdsal-artifacts/pom.xml @@ -312,12 +312,12 @@ org.opendaylight.controller - message-bus-api + messagebus-api ${project.version} org.opendaylight.controller - message-bus-impl + messagebus-impl ${project.version} diff --git a/opendaylight/md-sal/messagebus-api/pom.xml b/opendaylight/md-sal/messagebus-api/pom.xml index 542308a96f..dda7af4f18 100644 --- a/opendaylight/md-sal/messagebus-api/pom.xml +++ b/opendaylight/md-sal/messagebus-api/pom.xml @@ -17,7 +17,7 @@ and is available at http://www.eclipse.org/legal/epl-v10.html 1.2.0-SNAPSHOT - message-bus-api + messagebus-api ${project.artifactId} bundle diff --git a/opendaylight/md-sal/messagebus-config/pom.xml b/opendaylight/md-sal/messagebus-config/pom.xml new file mode 100644 index 0000000000..262a53e4ec --- /dev/null +++ b/opendaylight/md-sal/messagebus-config/pom.xml @@ -0,0 +1,73 @@ + + + + + 4.0.0 + + + org.opendaylight.controller + sal-parent + 1.2.0-SNAPSHOT + + + messagebus-config + jar + Configuration files for message-bus + + + + org.opendaylight.controller + config-api + + + org.opendaylight.controller + messagebus-api + ${project.version} + + + org.opendaylight.controller + messagebus-impl + ${project.version} + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + attach-artifacts + + attach-artifact + + package + + + + ${project.build.directory}/classes/initial/05-message-bus.xml + xml + config + + + + + + + + + + + scm:git:ssh://git.opendaylight.org:29418/controller.git + scm:git:ssh://git.opendaylight.org:29418/controller.git + HEAD + https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=summary + + diff --git a/opendaylight/md-sal/messagebus-config/src/main/resources/initial/05-message-bus.xml b/opendaylight/md-sal/messagebus-config/src/main/resources/initial/05-message-bus.xml new file mode 100644 index 0000000000..eed06cfd3a --- /dev/null +++ b/opendaylight/md-sal/messagebus-config/src/main/resources/initial/05-message-bus.xml @@ -0,0 +1,39 @@ + + + + + + + + messagebus-app + binding-impl:messagebus-app-impl + + md-sal-binding:binding-broker-osgi-registry + binding-osgi-broker + + + dom:dom-broker-osgi-registry + dom-broker + + + urn:ietf:params:xml:ns:yang:smiv2 + SNMP + + + urn:ietf:params:xml:ns:yang:ietf-syslog-notification + SYSLOG + + + + + + + urn:opendaylight:params:xml:ns:yang:controller:messagebus:app:impl?module=messagebus-app-impl&revision=2015-02-03 + + diff --git a/opendaylight/md-sal/messagebus-impl/pom.xml b/opendaylight/md-sal/messagebus-impl/pom.xml index ccb72191c4..7e3b599ffe 100644 --- a/opendaylight/md-sal/messagebus-impl/pom.xml +++ b/opendaylight/md-sal/messagebus-impl/pom.xml @@ -1,4 +1,11 @@ + @@ -10,7 +17,7 @@ 1.2.0-SNAPSHOT - message-bus-impl + messagebus-impl ${project.artifactId} bundle @@ -24,6 +31,10 @@ org.opendaylight.controller sal-binding-api + + org.opendaylight.controller + sal-netconf-connector + org.opendaylight.controller sal-core-api @@ -42,12 +53,34 @@ org.opendaylight.controller - message-bus-api + messagebus-api + 1.2.0-SNAPSHOT + + + org.opendaylight.controller + sal-netconf-connector org.opendaylight.controller sal-binding-config + + + + junit + junit + test + + + org.glassfish.jersey.test-framework.providers + jersey-test-framework-provider-grizzly2 + test + + + org.mockito + mockito-all + test + diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModule.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModule.java index 1c2b78a85b..022292a6f3 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModule.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModule.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -7,17 +7,24 @@ */ package org.opendaylight.controller.config.yang.messagebus.app.impl; +import java.util.List; import org.opendaylight.controller.config.api.DependencyResolver; import org.opendaylight.controller.config.api.ModuleIdentifier; -import org.opendaylight.controller.mdsal.InitializationContext; -import org.opendaylight.controller.mdsal.Providers; +import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.MountPointService; +import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; +import org.opendaylight.controller.messagebus.app.impl.EventSourceTopology; +import org.opendaylight.controller.messagebus.app.impl.NetconfEventSourceManager; +import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext; +import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; +import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; import org.osgi.framework.BundleContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; - -public class MessageBusAppImplModule extends org.opendaylight.controller.config.yang.messagebus.app.impl.AbstractMessageBusAppImplModule { +public class MessageBusAppImplModule extends + org.opendaylight.controller.config.yang.messagebus.app.impl.AbstractMessageBusAppImplModule { private static final Logger LOGGER = LoggerFactory.getLogger(MessageBusAppImplModule.class); private BundleContext bundleContext; @@ -26,49 +33,55 @@ public class MessageBusAppImplModule extends org.opendaylight.controller.config. return bundleContext; } - public void setBundleContext(BundleContext bundleContext) { + public void setBundleContext(final BundleContext bundleContext) { this.bundleContext = bundleContext; } - public MessageBusAppImplModule( ModuleIdentifier identifier, DependencyResolver dependencyResolver) { + public MessageBusAppImplModule(final ModuleIdentifier identifier, final DependencyResolver dependencyResolver) { super(identifier, dependencyResolver); } - public MessageBusAppImplModule( ModuleIdentifier identifier, - DependencyResolver dependencyResolver, - MessageBusAppImplModule oldModule, - java.lang.AutoCloseable oldInstance) { + public MessageBusAppImplModule(final ModuleIdentifier identifier, final DependencyResolver dependencyResolver, + final MessageBusAppImplModule oldModule, final java.lang.AutoCloseable oldInstance) { super(identifier, dependencyResolver, oldModule, oldInstance); } @Override - protected void customValidation() {} + protected void customValidation() { + } @Override public java.lang.AutoCloseable createInstance() { - List namespaceMapping = getNamespaceToStream(); - InitializationContext ic = new InitializationContext(namespaceMapping); + final List namespaceMapping = getNamespaceToStream(); + + final ProviderContext bindingCtx = getBindingBrokerDependency().registerProvider(new Providers.BindingAware()); + final ProviderSession domCtx = getDomBrokerDependency().registerProvider(new Providers.BindingIndependent()); - final Providers.BindingAware bap = new Providers.BindingAware(ic); - final Providers.BindingIndependent bip = new Providers.BindingIndependent(ic); + final DataBroker dataBroker = bindingCtx.getSALService(DataBroker.class); + final DOMNotificationPublishService domPublish = domCtx.getService(DOMNotificationPublishService.class); + final DOMMountPointService domMount = domCtx.getService(DOMMountPointService.class); + final MountPointService bindingMount = bindingCtx.getSALService(MountPointService.class); + final RpcProviderRegistry rpcRegistry = bindingCtx.getSALService(RpcProviderRegistry.class); - getBindingBrokerDependency().registerProvider(bap, getBundleContext()); - getDomBrokerDependency().registerProvider(bip); + final EventSourceTopology eventSourceTopology = new EventSourceTopology(dataBroker, rpcRegistry); + final NetconfEventSourceManager eventSourceManager = new NetconfEventSourceManager(dataBroker, domPublish, + domMount, bindingMount, eventSourceTopology, getNamespaceToStream()); - AutoCloseable closer = new AutoCloseable() { - @Override public void close() { - closeProvider(bap); - closeProvider(bip); + final AutoCloseable closer = new AutoCloseable() { + @Override + public void close() { + eventSourceTopology.close(); + eventSourceManager.close(); } }; return closer; } - private void closeProvider(AutoCloseable closable) { + private void closeProvider(final AutoCloseable closable) { try { closable.close(); - } catch (Exception e) { + } catch (final Exception e) { LOGGER.error("Exception while closing: {}\n Exception: {}", closable, e); } } diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/Providers.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/Providers.java similarity index 62% rename from opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/Providers.java rename to opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/Providers.java index a28e588d43..7f7aa2cdf7 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/Providers.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/Providers.java @@ -1,12 +1,12 @@ /** - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.mdsal; +package org.opendaylight.controller.config.yang.messagebus.app.impl; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; import org.opendaylight.controller.sal.binding.api.BindingAwareProvider; @@ -19,16 +19,10 @@ public class Providers { private static final Logger LOGGER = LoggerFactory.getLogger(Providers.class); public static class BindingAware implements BindingAwareProvider, AutoCloseable { - private final InitializationContext initializationContext; - public BindingAware(InitializationContext ic) { - this.initializationContext = ic; - } @Override - public void onSessionInitiated(BindingAwareBroker.ProviderContext session) { - initializationContext.set(session); - + public void onSessionInitiated(final BindingAwareBroker.ProviderContext session) { LOGGER.info("BindingAwareBroker.ProviderContext initialized"); } @@ -37,16 +31,9 @@ public class Providers { } public static class BindingIndependent extends AbstractProvider implements AutoCloseable { - private final InitializationContext initializationContext; - - public BindingIndependent(InitializationContext ic) { - this.initializationContext = ic; - } @Override - public void onSessionInitiated(Broker.ProviderSession session) { - initializationContext.set(session); - + public void onSessionInitiated(final Broker.ProviderSession session) { LOGGER.info("Broker.ProviderSession initialized"); } diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/DataStore.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/DataStore.java deleted file mode 100644 index a881fac850..0000000000 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/DataStore.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.mdsal; - -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; -import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; -import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.binding.DataObject; -import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; - -public class DataStore { - private static final FutureCallback DEFAULT_CALLBACK = - new FutureCallback() { - public void onSuccess(Void result) { - // TODO: Implement default behaviour - } - - public void onFailure(Throwable t) { - // TODO: Implement default behaviour - }; - }; - - private final MdSAL mdSAL; - - public DataStore(MdSAL mdSAL) { - this.mdSAL = mdSAL; - } - - public ListenerRegistration registerDataChangeListener(LogicalDatastoreType store, - InstanceIdentifier path, - DataChangeListener listener, - AsyncDataBroker.DataChangeScope triggeringScope) { - return mdSAL.getDataBroker().registerDataChangeListener(store, path, listener, triggeringScope); - } - - public void asyncPUT(LogicalDatastoreType datastoreType, - InstanceIdentifier path, - T data) { - asyncPUT(datastoreType, path, data, DEFAULT_CALLBACK); - } - - public void asyncPUT(LogicalDatastoreType datastoreType, - InstanceIdentifier path, - T data, - FutureCallback callback) { - WriteTransaction tx = mdSAL.getDataBroker().newWriteOnlyTransaction(); - tx.put(datastoreType, path, data, true); - execPut(tx, callback); - } - - public T read(LogicalDatastoreType datastoreType, - InstanceIdentifier path) { - - ReadOnlyTransaction tx = mdSAL.getDataBroker().newReadOnlyTransaction(); - T result = null; - - try { - result = tx.read(datastoreType, path).get().get(); - } catch (Exception e) { - throw new RuntimeException(e); - } - - return result; - } - - private static void execPut(WriteTransaction tx, FutureCallback callback) { - Futures.addCallback(tx.submit(), callback); - } -} diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/InitializationContext.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/InitializationContext.java deleted file mode 100644 index c73fb2ad83..0000000000 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/InitializationContext.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.mdsal; - -import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream; -import org.opendaylight.controller.messagebus.app.impl.EventAggregator; -import org.opendaylight.controller.messagebus.app.impl.EventSourceManager; -import org.opendaylight.controller.messagebus.app.impl.EventSourceTopology; -import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; -import org.opendaylight.controller.sal.core.api.Broker; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -public class InitializationContext { - private static final Logger LOGGER = LoggerFactory.getLogger(InitializationContext.class); - - private final MdSAL mdSal; - private final DataStore dataStore; - private final EventSourceTopology eventSourceTopology; - private final EventSourceManager eventSourceManager; - private final EventAggregator eventAggregator; - - public InitializationContext(List namespaceMapping) { - this.mdSal = new MdSAL(); - this.dataStore = new DataStore(mdSal); - this.eventSourceTopology = new EventSourceTopology(dataStore); - this.eventSourceManager = new EventSourceManager(dataStore, mdSal, eventSourceTopology, namespaceMapping); - this.eventAggregator = new EventAggregator(mdSal, eventSourceTopology); - } - - public synchronized void set(BindingAwareBroker.ProviderContext session) { - mdSal.setBindingAwareContext(session); - - if (mdSal.isReady()) { - initialize(); - } - } - - public synchronized void set(Broker.ProviderSession session) { - mdSal.setBindingIndependentContext(session); - - if (mdSal.isReady()) { - initialize(); - } - } - - private void initialize() { - eventSourceTopology.mdsalReady(); - eventSourceManager.mdsalReady(); - eventAggregator.mdsalReady(); - - LOGGER.info("InitializationContext started."); - } -} diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/MdSAL.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/MdSAL.java deleted file mode 100644 index 03b220a5fd..0000000000 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/MdSAL.java +++ /dev/null @@ -1,188 +0,0 @@ -/** - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.mdsal; - -import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; -import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; -import org.opendaylight.controller.sal.binding.api.BindingAwareService; -import org.opendaylight.controller.sal.binding.api.mount.MountInstance; -import org.opendaylight.controller.sal.binding.api.mount.MountProviderService; -import org.opendaylight.controller.sal.core.api.Broker; -import org.opendaylight.controller.sal.core.api.BrokerService; -import org.opendaylight.controller.sal.core.api.notify.NotificationListener; -import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService; -import org.opendaylight.controller.sal.core.api.notify.NotificationService; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; -import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.binding.RpcService; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.data.api.CompositeNode; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MdSAL { - private static final Logger LOGGER = LoggerFactory.getLogger(MdSAL.class); - - private BindingAwareBroker.ProviderContext bindingAwareContext; - private Broker.ProviderSession bindingIndependentContext; - - // ----------------------------- - // ----- FRAMEWORK METHODS ----- - // ----------------------------- - public void setBindingAwareContext(BindingAwareBroker.ProviderContext bindingAwareContext) { - this.bindingAwareContext = bindingAwareContext; - } - - public void setBindingIndependentContext(Broker.ProviderSession bindingIndependentContext) { - this.bindingIndependentContext = bindingIndependentContext; - } - - //TODO: We should hide brokers and expose functionalities instead - public DataBroker getDataBroker() { - return getBaSalService(DataBroker.class); - } - - public synchronized boolean isReady() { - return (bindingAwareContext != null && bindingIndependentContext != null); - } - - // ----------------------- - // ----- API METHODS ----- - // ----------------------- - // TODO: Factor out API methods to interface - // method does not return registration object. Rather will hold references internally and manipulate using node id and API - public void addRpcImplementation(Class serviceInterface, - T implementation) - throws IllegalStateException { - bindingAwareContext.addRpcImplementation(serviceInterface, implementation); - } - - // method does not return registration object. Rather will hold references internally and manipulate using node id and API - public void addRpcImplementation(Node node, - Class serviceInterface, - T implementation) - throws IllegalStateException { - BindingAwareBroker.RoutedRpcRegistration registration - = addRoutedRpcImplementation(serviceInterface, implementation); - - NodeRef nodeRef = createNodeRef(node.getId()); - registration.registerPath(NodeContext.class, nodeRef.getValue()); - } - - public ListenerRegistration addNotificationListener(String nodeId, - QName notification, - NotificationListener listener) { - YangInstanceIdentifier yii = inventoryNodeBIIdentifier(nodeId); - - NotificationService notificationService = - getBiSalService(DOMMountPointService.class) - .getMountPoint(yii) - .get() - .getService(NotificationPublishService.class) - .get(); - - ListenerRegistration registration = - notificationService.addNotificationListener(notification, listener); - - LOGGER.info("Notification listener registered for {}, at node {}", notification, nodeId); - - return registration; - } - - public ListenerRegistration addNotificationListener(QName notification, - NotificationListener listener) { - NotificationService notificationService = - getBiSalService(NotificationPublishService.class); - - ListenerRegistration registration = - notificationService.addNotificationListener(notification, listener); - - LOGGER.info("Notification listener registered for {}.", notification); - - return registration; - } - - public T getRpcService(Class serviceInterface) { - return bindingAwareContext.getRpcService(serviceInterface); - } - - public T getRpcService(String nodeId, Class serviceInterface) { - MountProviderService mountProviderService = getBaSalService(MountProviderService.class); - - InstanceIdentifier key = InstanceIdentifier.create(Nodes.class) - .child(Node.class, - new NodeKey(new NodeId(nodeId))); - - MountInstance mountPoint = mountProviderService.getMountPoint(key); - return mountPoint.getRpcService(serviceInterface); - } - - public void publishNotification(CompositeNode notification) { - getBiSalService(NotificationPublishService.class).publish(notification); - } - - public SchemaContext getSchemaContext(String nodeId) { - YangInstanceIdentifier yii = inventoryNodeBIIdentifier(nodeId); - - SchemaContext schemaContext = - getBiSalService(DOMMountPointService.class) - .getMountPoint(yii) - .get().getSchemaContext(); - - return schemaContext; - } - - // --------------------------- - // ----- UTILITY METHODS ----- - // --------------------------- - private T getBaSalService(Class service) { - return bindingAwareContext.getSALService(service); - } - - private T getBiSalService(Class service) { - return bindingIndependentContext.getService(service); - } - - private static final String NODE_ID_NAME = "id"; - - public static YangInstanceIdentifier inventoryNodeBIIdentifier(String nodeId) { - return YangInstanceIdentifier.builder() - .node(Nodes.QNAME) - .nodeWithKey(Node.QNAME, - QName.create(Node.QNAME.getNamespace(), - Node.QNAME.getRevision(), - NODE_ID_NAME), - nodeId) - .build(); - } - - private BindingAwareBroker.RoutedRpcRegistration addRoutedRpcImplementation(Class serviceInterface, - T implementation) - throws IllegalStateException { - return bindingAwareContext.addRoutedRpcImplementation(serviceInterface, implementation); - } - - public static NodeRef createNodeRef(NodeId nodeId) { - NodeKey nodeKey = new NodeKey(nodeId); - InstanceIdentifier path = InstanceIdentifier - .builder(Nodes.class) - .child(Node.class, nodeKey) - .build(); - return new NodeRef(path); - } -} diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventAggregator.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventAggregator.java deleted file mode 100644 index 4b77bf2a4c..0000000000 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventAggregator.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.messagebus.app.impl; - -import java.util.List; -import java.util.concurrent.Future; -import org.opendaylight.controller.mdsal.MdSAL; -import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput; -import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutput; -import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutputBuilder; -import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput; -import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService; -import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; -import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; -import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -// TODO: implement topic created notification -public class EventAggregator implements EventAggregatorService { - private static final Logger LOGGER = LoggerFactory.getLogger(EventAggregator.class); - - private final MdSAL mdSAL; - private final EventSourceTopology eventSourceTopology; - - public EventAggregator(final MdSAL mdSAL, final EventSourceTopology eventSourceTopology) { - this.mdSAL = mdSAL; - this.eventSourceTopology = eventSourceTopology; - } - - public void mdsalReady() { - mdSAL.addRpcImplementation(EventAggregatorService.class, this); - } - - @Override - public Future> createTopic(final CreateTopicInput input) { - LOGGER.info("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}", - input.getNotificationPattern(), - input.getNodeIdPattern()); - - Topic topic = new Topic(new NotificationPattern(input.getNotificationPattern()), input.getNodeIdPattern().getValue(), mdSAL); - - //# Make sure we capture all nodes from now on - eventSourceTopology.registerDataChangeListener(topic); - - //# Notify existing nodes - //# Code reader note: Context of Node type is NetworkTopology - List nodes = eventSourceTopology.snapshot(); - for (Node node : nodes) { - NodeId nodeIdToNotify = node.getAugmentation(Node1.class).getEventSourceNode(); - topic.notifyNode(nodeIdToNotify); - } - - CreateTopicOutput cto = new CreateTopicOutputBuilder() - .setTopicId(topic.getTopicId()) - .build(); - - return Util.resultFor(cto); - } - - @Override - public Future> destroyTopic(final DestroyTopicInput input) { - // 1. UNREGISTER DATA CHANGE LISTENER -> ? - // 2. CLOSE TOPIC - return null; - } -} diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceManager.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceManager.java deleted file mode 100644 index a84eddd458..0000000000 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceManager.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.messagebus.app.impl; - -import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream; -import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.mdsal.DataStore; -import org.opendaylight.controller.mdsal.MdSAL; -import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; -import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode; -import org.opendaylight.yangtools.yang.binding.DataObject; -import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public final class EventSourceManager implements DataChangeListener { - private static final Logger LOGGER = LoggerFactory.getLogger(EventSourceManager.class); - private static final InstanceIdentifier INVENTORY_PATH = InstanceIdentifier.create(Nodes.class) - .child(Node.class); - private final DataStore dataStore; - private final MdSAL mdSal; - private final EventSourceTopology eventSourceTopology; - private final Map streamMap; - - public EventSourceManager(DataStore dataStore, - MdSAL mdSal, - EventSourceTopology eventSourceTopology, - List namespaceMapping) { - this.dataStore = dataStore; - this.mdSal = mdSal; - this.eventSourceTopology = eventSourceTopology; - this.streamMap = namespaceToStreamMapping(namespaceMapping); - } - - private Map namespaceToStreamMapping(List namespaceMapping) { - Map streamMap = new HashMap<>(namespaceMapping.size()); - - for (NamespaceToStream nToS : namespaceMapping) { - streamMap.put(nToS.getUrnPrefix(), nToS.getStreamName()); - } - - return streamMap; - } - - public void mdsalReady() { - dataStore.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, - INVENTORY_PATH, - this, - DataBroker.DataChangeScope.SUBTREE); - - LOGGER.info("EventSourceManager initialized."); - } - - @Override - public void onDataChanged(AsyncDataChangeEvent, DataObject> event) { - //FIXME: Prevent creating new event source on subsequent changes in inventory, like disconnect. - LOGGER.debug("[DataChangeEvent, DataObject>: {}]", event); - - Node node = Util.getAffectedNode(event); - // we listen on node tree, therefore we should rather throw IllegalStateException when node is null - if ( node == null ) { - LOGGER.debug("OnDataChanged Event. Node is null."); - return; - } - if ( isNetconfNode(node) == false ) { - LOGGER.debug("OnDataChanged Event. Not a Netconf node."); - return; - } - if ( isEventSource(node) == false ) { - LOGGER.debug("OnDataChanged Event. Node an EventSource node."); - return; - } - - NetconfEventSource netconfEventSource = new NetconfEventSource(mdSal, - node.getKey().getId().getValue(), - streamMap); - mdSal.addRpcImplementation(node, EventSourceService.class, netconfEventSource); - - InstanceIdentifier nodeInstanceIdentifier = - InstanceIdentifier.create(Nodes.class) - .child(Node.class, node.getKey()) - .augmentation(NetconfNode.class); - - dataStore.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, - nodeInstanceIdentifier, - netconfEventSource, - DataBroker.DataChangeScope.SUBTREE); - - eventSourceTopology.insert(node); - } - - private boolean isNetconfNode(Node node) { - return node.getAugmentation(NetconfNode.class) != null ; - } - - public boolean isEventSource(Node node) { - NetconfNode netconfNode = node.getAugmentation(NetconfNode.class); - - return isEventSource(netconfNode); - } - - private boolean isEventSource(NetconfNode node) { - for (String capability : node.getInitialCapability()) { - if(capability.startsWith("urn:ietf:params:xml:ns:netconf:notification")) { - return true; - } - } - - return false; - } -} \ No newline at end of file diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Topic.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopic.java similarity index 67% rename from opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Topic.java rename to opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopic.java index a32413064e..98e168eee9 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Topic.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopic.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -9,36 +9,36 @@ package org.opendaylight.controller.messagebus.app.impl; import com.google.common.base.Preconditions; +import java.util.Map; import java.util.regex.Pattern; import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; -import org.opendaylight.controller.mdsal.MdSAL; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.slf4j.LoggerFactory; -public class Topic implements DataChangeListener { - private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(Topic.class); +public class EventSourceTopic implements DataChangeListener { + private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(EventSourceTopic.class); private final NotificationPattern notificationPattern; + private final EventSourceService sourceService; private final Pattern nodeIdPattern; private final TopicId topicId; - private final MdSAL mdSal; - public Topic(final NotificationPattern notificationPattern, final String nodeIdPattern, final MdSAL mdSal) { + public EventSourceTopic(final NotificationPattern notificationPattern, final String nodeIdPattern, final EventSourceService eventSource) { this.notificationPattern = Preconditions.checkNotNull(notificationPattern); + this.sourceService = eventSource; // FIXME: regex should be the language of nodeIdPattern final String regex = Util.wildcardToRegex(nodeIdPattern); this.nodeIdPattern = Pattern.compile(regex); - this.mdSal = Preconditions.checkNotNull(mdSal); + // FIXME: We need to perform some salting in order to make // the topic IDs less predictable. @@ -51,26 +51,27 @@ public class Topic implements DataChangeListener { @Override public void onDataChanged(final AsyncDataChangeEvent, DataObject> event) { - // TODO: affected must return topologyNode !!! - final Node node = Util.getAffectedNode(event); - if (nodeIdPattern.matcher(node.getId().getValue()).matches()) { - notifyNode(node.getId()); - } else { - LOG.debug("Skipping node {}", node.getId()); + for (final Map.Entry, DataObject> changeEntry : event.getUpdatedData().entrySet()) { + if (changeEntry.getValue() instanceof Node) { + final Node node = (Node) changeEntry.getValue(); + if (nodeIdPattern.matcher(node.getId().getValue()).matches()) { + notifyNode(changeEntry.getKey()); + } + } } } - public void notifyNode(final NodeId nodeId) { - JoinTopicInput jti = getJoinTopicInputArgument(nodeId); - EventSourceService ess = mdSal.getRpcService(EventSourceService.class); - Preconditions.checkState(ess != null, "EventSourceService is not registered"); - - ess.joinTopic(jti); + public void notifyNode(final InstanceIdentifier nodeId) { + try { + sourceService.joinTopic(getJoinTopicInputArgument(nodeId)); + } catch (final Exception e) { + LOG.error("Could not invoke join topic for node {}", nodeId); + } } - private JoinTopicInput getJoinTopicInputArgument(final NodeId nodeId) { - NodeRef nodeRef = MdSAL.createNodeRef(nodeId); - JoinTopicInput jti = + private JoinTopicInput getJoinTopicInputArgument(final InstanceIdentifier path) { + final NodeRef nodeRef = new NodeRef(path); + final JoinTopicInput jti = new JoinTopicInputBuilder() .setNode(nodeRef.getValue()) .setTopicId(topicId) @@ -78,4 +79,6 @@ public class Topic implements DataChangeListener { .build(); return jti; } + + } diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java index c0700971dd..603f34bac9 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -8,99 +8,201 @@ package org.opendaylight.controller.messagebus.app.impl; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.Futures; + +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; +import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; +import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.mdsal.DataStore; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration; +import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutput; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1Builder; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1Builder; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSource; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSourceBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology; -import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes; import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; +import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.regex.Pattern; -public class EventSourceTopology { - private static final Logger LOGGER = LoggerFactory.getLogger(EventSourceTopology.class); +public class EventSourceTopology implements EventAggregatorService, AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class); - private static final String topologyId = "EVENT-SOURCE-TOPOLOGY" ; - private static final TopologyKey topologyKey = new TopologyKey(new TopologyId(topologyId)); - private static final LogicalDatastoreType datastoreType = LogicalDatastoreType.OPERATIONAL; + private static final String TOPOLOGY_ID = "EVENT-SOURCE-TOPOLOGY" ; + private static final TopologyKey EVENT_SOURCE_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TOPOLOGY_ID)); + private static final LogicalDatastoreType OPERATIONAL = LogicalDatastoreType.OPERATIONAL; - private static final InstanceIdentifier topologyInstanceIdentifier = + private static final InstanceIdentifier EVENT_SOURCE_TOPOLOGY_PATH = InstanceIdentifier.create(NetworkTopology.class) - .child(Topology.class, topologyKey); + .child(Topology.class, EVENT_SOURCE_TOPOLOGY_KEY); - private static final InstanceIdentifier topologyTypeInstanceIdentifier = - topologyInstanceIdentifier + private static final InstanceIdentifier TOPOLOGY_TYPE_PATH = + EVENT_SOURCE_TOPOLOGY_PATH .child(TopologyTypes.class) .augmentation(TopologyTypes1.class); - private static final InstanceIdentifier eventSourceTopologyPath = - InstanceIdentifier.create(NetworkTopology.class) - .child(Topology.class) - .child(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang - .network.topology.rev131021.network.topology.topology.Node.class); - private final Map> registrations = new ConcurrentHashMap<>(); - private final DataStore dataStore; + private final DataBroker dataBroker; + private final RpcRegistration aggregatorRpcReg; + private final EventSourceService eventSourceService; + private final RpcProviderRegistry rpcRegistry; + private final ExecutorService executorService; + + public EventSourceTopology(final DataBroker dataBroker, final RpcProviderRegistry rpcRegistry) { + this.dataBroker = dataBroker; + this.executorService = Executors.newCachedThreadPool(); + this.rpcRegistry = rpcRegistry; + aggregatorRpcReg = rpcRegistry.addRpcImplementation(EventAggregatorService.class, this); + eventSourceService = rpcRegistry.getRpcService(EventSourceService.class); + + final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build(); + final TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build(); + putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment); + } + + private void putData(final LogicalDatastoreType store, + final InstanceIdentifier path, final T data) { - public EventSourceTopology(DataStore dataStore) { - this.dataStore = dataStore; + final WriteTransaction tx = dataBroker.newWriteOnlyTransaction(); + tx.put(store, path, data, true); + tx.submit(); } - public void mdsalReady() { - TopologyEventSource topologySource = new TopologyEventSourceBuilder().build(); - TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build(); + private void insert(final KeyedInstanceIdentifier sourcePath, final Node node) { + final NodeKey nodeKey = node.getKey(); + final InstanceIdentifier augmentPath = sourcePath.augmentation(Node1.class); + final Node1 nodeAgument = new Node1Builder().setEventSourceNode(new NodeId(nodeKey.getNodeId().getValue())).build(); + putData(OPERATIONAL, augmentPath, nodeAgument); + } - dataStore.asyncPUT(datastoreType, topologyTypeInstanceIdentifier, topologyTypeAugment); + private void notifyExistingNodes(final Pattern nodeIdPatternRegex, final EventSourceTopic eventSourceTopic){ + executorService.execute(new NotifyAllNodeExecutor(dataBroker, nodeIdPatternRegex, eventSourceTopic)); } - public void insert(Node node) { - String nodeId = node.getKey().getId().getValue(); - NodeKey nodeKey = new NodeKey(new NodeId(nodeId)); - InstanceIdentifier topologyNodeAugment - = topologyInstanceIdentifier - .child(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang - .network.topology.rev131021.network.topology.topology.Node.class, nodeKey) - .augmentation(Node1.class); - - Node1 nodeAgument = new Node1Builder().setEventSourceNode(node.getId()).build(); - dataStore.asyncPUT(datastoreType, topologyNodeAugment, nodeAgument); + @Override + public Future> createTopic(final CreateTopicInput input) { + LOG.info("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}", + input.getNotificationPattern(), + input.getNodeIdPattern()); + + final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern()); + final String nodeIdPattern = input.getNodeIdPattern().getValue(); + final Pattern nodeIdPatternRegex = Pattern.compile(Util.wildcardToRegex(nodeIdPattern)); + final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, input.getNodeIdPattern().getValue(), eventSourceService); + + registerTopic(eventSourceTopic); + + notifyExistingNodes(nodeIdPatternRegex, eventSourceTopic); + + final CreateTopicOutput cto = new CreateTopicOutputBuilder() + .setTopicId(eventSourceTopic.getTopicId()) + .build(); + + return Util.resultFor(cto); + } + + @Override + public Future> destroyTopic(final DestroyTopicInput input) { + return Futures.immediateFailedFuture(new UnsupportedOperationException("Not Implemented")); } - // TODO: Should we expose this functioanlity over RPC? - public List snapshot() { - Topology topology = dataStore.read(datastoreType, topologyInstanceIdentifier); - return topology.getNode(); + @Override + public void close() { + aggregatorRpcReg.close(); } - public void registerDataChangeListener(DataChangeListener listener) { - ListenerRegistration listenerRegistration = dataStore.registerDataChangeListener(datastoreType, - eventSourceTopologyPath, + public void registerTopic(final EventSourceTopic listener) { + final ListenerRegistration listenerRegistration = dataBroker.registerDataChangeListener(OPERATIONAL, + EVENT_SOURCE_TOPOLOGY_PATH, listener, DataBroker.DataChangeScope.SUBTREE); registrations.put(listener, listenerRegistration); } + + public void register(final Node node, final NetconfEventSource netconfEventSource) { + final KeyedInstanceIdentifier sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()); + rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, netconfEventSource) + .registerPath(NodeContext.class, sourcePath); + insert(sourcePath,node); + // FIXME: Return registration object. + } + + private class NotifyAllNodeExecutor implements Runnable { + + private final EventSourceTopic topic; + private final DataBroker dataBroker; + private final Pattern nodeIdPatternRegex; + + public NotifyAllNodeExecutor(final DataBroker dataBroker, final Pattern nodeIdPatternRegex, final EventSourceTopic topic) { + this.topic = topic; + this.dataBroker = dataBroker; + this.nodeIdPatternRegex = nodeIdPatternRegex; + } + + @Override + public void run() { + //# Code reader note: Context of Node type is NetworkTopology + final List nodes = snapshot(); + for (final Node node : nodes) { + if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) { + topic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey())); + } + } + } + + private List snapshot() { + try (ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();) { + + final Optional data = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH).checkedGet(); + + if(data.isPresent()) { + final List nodeList = data.get().getNode(); + if(nodeList != null) { + return nodeList; + } + } + return Collections.emptyList(); + } catch (final ReadFailedException e) { + LOG.error("Unable to retrieve node list.", e); + return Collections.emptyList(); + } + } + } } diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSource.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSource.java index 9c0697f3fb..0d54beb644 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSource.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSource.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -8,57 +8,92 @@ package org.opendaylight.controller.messagebus.app.impl; -import com.google.common.base.Preconditions; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.Future; import java.util.regex.Pattern; + +import javax.xml.stream.XMLStreamException; +import javax.xml.transform.dom.DOMResult; +import javax.xml.transform.dom.DOMSource; + import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; +import org.opendaylight.controller.md.sal.binding.api.MountPoint; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; -import org.opendaylight.controller.mdsal.MdSAL; -import org.opendaylight.controller.sal.core.api.notify.NotificationListener; +import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; +import org.opendaylight.controller.md.sal.dom.api.DOMNotification; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.opendaylight.controller.netconf.util.xml.XmlUtil; +import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry; +import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutputBuilder; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType; import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode; -import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.data.api.CompositeNode; -import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.AnyXmlNode; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.impl.schema.Builders; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.NotificationDefinition; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.w3c.dom.Document; +import org.w3c.dom.Element; + +import com.google.common.base.Optional; +import com.google.common.base.Throwables; + +public class NetconfEventSource implements EventSourceService, DOMNotificationListener, DataChangeListener { + + private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSource.class); + + private static final NodeIdentifier TOPIC_NOTIFICATION_ARG = new NodeIdentifier(TopicNotification.QNAME); + private static final NodeIdentifier EVENT_SOURCE_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "node-id")); + private static final NodeIdentifier PAYLOAD_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "payload")); + + private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"stream")); + private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription")); -public class NetconfEventSource implements EventSourceService, NotificationListener, DataChangeListener { - private static final Logger LOGGER = LoggerFactory.getLogger(NetconfEventSource.class); - private final MdSAL mdSal; private final String nodeId; - private final List activeStreams = new ArrayList<>(); + + private final DOMMountPoint netconfMount; + private final DOMNotificationPublishService domPublish; + private final NotificationsService notificationRpcService; + + private final Set activeStreams = new ConcurrentSkipListSet<>(); private final Map urnPrefixToStreamMap; - public NetconfEventSource(final MdSAL mdSal, final String nodeId, final Map streamMap) { - Preconditions.checkNotNull(mdSal); - Preconditions.checkNotNull(nodeId); - this.mdSal = mdSal; + public NetconfEventSource(final String nodeId, final Map streamMap, final DOMMountPoint netconfMount, final DOMNotificationPublishService publishService, final MountPoint bindingMount) { + this.netconfMount = netconfMount; + this.notificationRpcService = bindingMount.getService(RpcConsumerRegistry.class).get().getRpcService(NotificationsService.class); this.nodeId = nodeId; this.urnPrefixToStreamMap = streamMap; - - LOGGER.info("NetconfEventSource [{}] created.", nodeId); + this.domPublish = publishService; + LOG.info("NetconfEventSource [{}] created.", nodeId); } @Override @@ -69,63 +104,63 @@ public class NetconfEventSource implements EventSourceService, NotificationListe final String regex = Util.wildcardToRegex(notificationPattern.getValue()); final Pattern pattern = Pattern.compile(regex); - List matchingNotifications = Util.expandQname(availableNotifications(), pattern); + final List matchingNotifications = Util.expandQname(availableNotifications(), pattern); registerNotificationListener(matchingNotifications); - return null; + final JoinTopicOutput output = new JoinTopicOutputBuilder().build(); + return com.google.common.util.concurrent.Futures.immediateFuture(RpcResultBuilder.success(output).build()); } - private List availableNotifications() { + private List availableNotifications() { // FIXME: use SchemaContextListener to get changes asynchronously - Set availableNotifications = mdSal.getSchemaContext(nodeId).getNotifications(); - List qNs = new ArrayList<>(availableNotifications.size()); - for (NotificationDefinition nd : availableNotifications) { - qNs.add(nd.getQName()); + final Set availableNotifications = netconfMount.getSchemaContext().getNotifications(); + final List qNs = new ArrayList<>(availableNotifications.size()); + for (final NotificationDefinition nd : availableNotifications) { + qNs.add(nd.getPath()); } - return qNs; } - private void registerNotificationListener(final List notificationsToSubscribe) { - for (QName qName : notificationsToSubscribe) { - startSubscription(qName); - // FIXME: do not lose this registration - final ListenerRegistration reg = mdSal.addNotificationListener(nodeId, qName, this); + private void registerNotificationListener(final List notificationsToSubscribe) { + + final Optional notifyService = netconfMount.getService(DOMNotificationService.class); + if(notifyService.isPresent()) { + for (final SchemaPath qName : notificationsToSubscribe) { + startSubscription(qName); + } + // FIXME: Capture registration + notifyService.get().registerNotificationListener(this, notificationsToSubscribe); } } - private synchronized void startSubscription(final QName qName) { - String streamName = resolveStream(qName); + private void startSubscription(final SchemaPath path) { + final String streamName = resolveStream(path.getLastComponent()); if (streamIsActive(streamName) == false) { - LOGGER.info("Stream {} is not active on node {}. Will subscribe.", streamName, nodeId); + LOG.info("Stream {} is not active on node {}. Will subscribe.", streamName, nodeId); startSubscription(streamName); } } - private synchronized void resubscribeToActiveStreams() { - for (String streamName : activeStreams) { + private void resubscribeToActiveStreams() { + for (final String streamName : activeStreams) { startSubscription(streamName); } } private synchronized void startSubscription(final String streamName) { - CreateSubscriptionInput subscriptionInput = getSubscriptionInput(streamName); - mdSal.getRpcService(nodeId, NotificationsService.class).createSubscription(subscriptionInput); + final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME)) + .withChild(ImmutableNodes.leafNode(STREAM_QNAME, streamName)) + .build(); + netconfMount.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input); activeStreams.add(streamName); } - private static CreateSubscriptionInput getSubscriptionInput(final String streamName) { - CreateSubscriptionInputBuilder csib = new CreateSubscriptionInputBuilder(); - csib.setStream(new StreamNameType(streamName)); - return csib.build(); - } - private String resolveStream(final QName qName) { String streamName = null; - for (Map.Entry entry : urnPrefixToStreamMap.entrySet()) { - String nameSpace = qName.getNamespace().toString(); - String urnPrefix = entry.getKey(); + for (final Map.Entry entry : urnPrefixToStreamMap.entrySet()) { + final String nameSpace = qName.getNamespace().toString(); + final String urnPrefix = entry.getKey(); if( nameSpace.startsWith(urnPrefix) ) { streamName = entry.getValue(); break; @@ -139,24 +174,40 @@ public class NetconfEventSource implements EventSourceService, NotificationListe return activeStreams.contains(streamName); } - // PASS - @Override public Set getSupportedNotifications() { - return null; + @Override + public void onNotification(final DOMNotification notification) { + final ContainerNode topicNotification = Builders.containerBuilder() + .withNodeIdentifier(TOPIC_NOTIFICATION_ARG) + .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, nodeId)) + .withChild(encapsulate(notification)) + .build(); + try { + domPublish.putNotification(new TopicDOMNotification(topicNotification)); + } catch (final InterruptedException e) { + throw Throwables.propagate(e); + } } - @Override - public void onNotification(final CompositeNode notification) { - LOGGER.info("NetconfEventSource {} received notification {}. Will publish to MD-SAL.", nodeId, notification); - ImmutableCompositeNode payload = ImmutableCompositeNode.builder() - .setQName(QName.create(TopicNotification.QNAME, "payload")) - .add(notification).toInstance(); - ImmutableCompositeNode icn = ImmutableCompositeNode.builder() - .setQName(TopicNotification.QNAME) - .add(payload) - .addLeaf("event-source", nodeId) - .toInstance(); - - mdSal.publishNotification(icn); + private AnyXmlNode encapsulate(final DOMNotification body) { + // FIXME: Introduce something like AnyXmlWithNormalizedNodeData in Yangtools + final Document doc = XmlUtil.newDocument(); + final Optional namespace = Optional.of(PAYLOAD_ARG.getNodeType().getNamespace().toString()); + final Element element = XmlUtil.createElement(doc , "payload", namespace); + + + final DOMResult result = new DOMResult(element); + + final SchemaContext context = netconfMount.getSchemaContext(); + final SchemaPath schemaPath = body.getType(); + try { + NetconfMessageTransformUtil.writeNormalizedNode(body.getBody(), result, schemaPath, context); + return Builders.anyXmlBuilder().withNodeIdentifier(PAYLOAD_ARG) + .withValue(new DOMSource(element)) + .build(); + } catch (IOException | XMLStreamException e) { + LOG.error("Unable to encapsulate notification.",e); + throw Throwables.propagate(e); + } } @Override @@ -164,16 +215,16 @@ public class NetconfEventSource implements EventSourceService, NotificationListe boolean wasConnected = false; boolean nowConnected = false; - for (Map.Entry, DataObject> changeEntry : change.getOriginalData().entrySet()) { + for (final Map.Entry, DataObject> changeEntry : change.getOriginalData().entrySet()) { if ( isNetconfNode(changeEntry) ) { - NetconfNode nn = (NetconfNode)changeEntry.getValue(); + final NetconfNode nn = (NetconfNode)changeEntry.getValue(); wasConnected = nn.isConnected(); } } - for (Map.Entry, DataObject> changeEntry : change.getUpdatedData().entrySet()) { + for (final Map.Entry, DataObject> changeEntry : change.getUpdatedData().entrySet()) { if ( isNetconfNode(changeEntry) ) { - NetconfNode nn = (NetconfNode)changeEntry.getValue(); + final NetconfNode nn = (NetconfNode)changeEntry.getValue(); nowConnected = nn.isConnected(); } } diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceManager.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceManager.java new file mode 100644 index 0000000000..6533136522 --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceManager.java @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.messagebus.app.impl; + +import com.google.common.base.Optional; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream; +import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; +import org.opendaylight.controller.md.sal.binding.api.MountPoint; +import org.opendaylight.controller.md.sal.binding.api.MountPointService; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; +import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class NetconfEventSourceManager implements DataChangeListener, AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(NetconfEventSourceManager.class); + private static final TopologyKey NETCONF_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())); + private static final InstanceIdentifier NETCONF_DEVICE_PATH = InstanceIdentifier.create(NetworkTopology.class) + .child(Topology.class, NETCONF_TOPOLOGY_KEY) + .child(Node.class); + + private static final YangInstanceIdentifier NETCONF_DEVICE_DOM_PATH = YangInstanceIdentifier.builder() + .node(NetworkTopology.QNAME) + .node(Topology.QNAME) + .nodeWithKey(Topology.QNAME, QName.create(Topology.QNAME, "topology-id"),TopologyNetconf.QNAME.getLocalName()) + .node(Node.QNAME) + .build(); + private static final QName NODE_ID_QNAME = QName.create(Node.QNAME,"node-id"); + + + private final EventSourceTopology eventSourceTopology; + private final Map streamMap; + + private final ConcurrentHashMap, NetconfEventSource> netconfSources = new ConcurrentHashMap<>(); + private final ListenerRegistration listenerReg; + private final DOMNotificationPublishService publishService; + private final DOMMountPointService domMounts; + private final MountPointService bindingMounts; + + public NetconfEventSourceManager(final DataBroker dataStore, + final DOMNotificationPublishService domPublish, + final DOMMountPointService domMount, + final MountPointService bindingMount, + final EventSourceTopology eventSourceTopology, + final List namespaceMapping) { + + listenerReg = dataStore.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, NETCONF_DEVICE_PATH, this, DataChangeScope.SUBTREE); + this.eventSourceTopology = eventSourceTopology; + this.streamMap = namespaceToStreamMapping(namespaceMapping); + this.domMounts = domMount; + this.bindingMounts = bindingMount; + this.publishService = domPublish; + LOGGER.info("EventSourceManager initialized."); + } + + private Map namespaceToStreamMapping(final List namespaceMapping) { + final Map streamMap = new HashMap<>(namespaceMapping.size()); + + for (final NamespaceToStream nToS : namespaceMapping) { + streamMap.put(nToS.getUrnPrefix(), nToS.getStreamName()); + } + + return streamMap; + } + + @Override + public void onDataChanged(final AsyncDataChangeEvent, DataObject> event) { + //FIXME: Prevent creating new event source on subsequent changes in inventory, like disconnect. + LOGGER.debug("[DataChangeEvent, DataObject>: {}]", event); + for (final Map.Entry, DataObject> changeEntry : event.getCreatedData().entrySet()) { + if (changeEntry.getValue() instanceof Node) { + nodeUpdated(changeEntry.getKey(),(Node) changeEntry.getValue()); + } + } + + + for (final Map.Entry, DataObject> changeEntry : event.getUpdatedData().entrySet()) { + if (changeEntry.getValue() instanceof Node) { + nodeUpdated(changeEntry.getKey(),(Node) changeEntry.getValue()); + } + } + + + } + + private void nodeUpdated(final InstanceIdentifier key, final Node node) { + + // we listen on node tree, therefore we should rather throw IllegalStateException when node is null + if ( node == null ) { + LOGGER.debug("OnDataChanged Event. Node is null."); + return; + } + if ( isNetconfNode(node) == false ) { + LOGGER.debug("OnDataChanged Event. Not a Netconf node."); + return; + } + if ( isEventSource(node) == false ) { + LOGGER.debug("OnDataChanged Event. Node an EventSource node."); + return; + } + if(node.getAugmentation(NetconfNode.class).getConnectionStatus() != ConnectionStatus.Connected ) { + return; + } + + if(!netconfSources.containsKey(key)) { + createEventSource(key,node); + } + } + + private void createEventSource(final InstanceIdentifier key, final Node node) { + final Optional netconfMount = domMounts.getMountPoint(domMountPath(node.getNodeId())); + final Optional bindingMount = bindingMounts.getMountPoint(key); + + if(netconfMount.isPresent() && bindingMount.isPresent()) { + final String nodeId = node.getNodeId().getValue(); + final NetconfEventSource netconfEventSource = new NetconfEventSource(nodeId, streamMap, netconfMount.get(), publishService, bindingMount.get()); + eventSourceTopology.register(node,netconfEventSource); + netconfSources.putIfAbsent(key, netconfEventSource); + } + } + + private YangInstanceIdentifier domMountPath(final NodeId nodeId) { + return YangInstanceIdentifier.builder(NETCONF_DEVICE_DOM_PATH).nodeWithKey(Node.QNAME, NODE_ID_QNAME, nodeId.getValue()).build(); + } + + private boolean isNetconfNode(final Node node) { + return node.getAugmentation(NetconfNode.class) != null ; + } + + public boolean isEventSource(final Node node) { + final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class); + + return isEventSource(netconfNode); + } + + private boolean isEventSource(final NetconfNode node) { + for (final String capability : node.getAvailableCapabilities().getAvailableCapability()) { + if(capability.startsWith("(urn:ietf:params:xml:ns:netconf:notification")) { + return true; + } + } + + return false; + } + + @Override + public void close() { + listenerReg.close(); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/TopicDOMNotification.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/TopicDOMNotification.java new file mode 100644 index 0000000000..bd27db7d52 --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/TopicDOMNotification.java @@ -0,0 +1,40 @@ + +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.messagebus.app.impl; + +import org.opendaylight.controller.md.sal.dom.api.DOMNotification; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; + +public class TopicDOMNotification implements DOMNotification { + + private static final SchemaPath TOPIC_NOTIFICATION_ID = SchemaPath.create(true, TopicNotification.QNAME); + private final ContainerNode body; + + public TopicDOMNotification(final ContainerNode body) { + this.body = body; + } + + @Override + public SchemaPath getType() { + return TOPIC_NOTIFICATION_ID; + } + + @Override + public ContainerNode getBody() { + return body; + } + + @Override + public String toString() { + return "TopicDOMNotification [body=" + body + "]"; + } +} diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Util.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Util.java index 9927d85c3e..1c0b8b3ef8 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Util.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Util.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -8,24 +8,19 @@ package org.opendaylight.controller.messagebus.app.impl; -import com.google.common.util.concurrent.Futures; import java.math.BigInteger; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.concurrent.Future; import java.util.regex.Pattern; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; -import org.opendaylight.controller.sal.common.util.Rpcs; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; -import org.opendaylight.yangtools.yang.binding.DataObject; -import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.common.RpcError; + import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; + +import com.google.common.util.concurrent.Futures; public final class Util { private static final MessageDigest messageDigestTemplate = getDigestInstance(); @@ -33,65 +28,43 @@ public final class Util { private static MessageDigest getDigestInstance() { try { return MessageDigest.getInstance("MD5"); - } catch (NoSuchAlgorithmException e) { + } catch (final NoSuchAlgorithmException e) { throw new RuntimeException("Unable to get MD5 instance"); } } - public static String md5String(final String inputString) { + static String md5String(final String inputString) { try { - MessageDigest md = (MessageDigest)messageDigestTemplate.clone(); + final MessageDigest md = (MessageDigest)messageDigestTemplate.clone(); md.update(inputString.getBytes("UTF-8"), 0, inputString.length()); return new BigInteger(1, md.digest()).toString(16); - } catch (Exception e) { + } catch (final Exception e) { throw new RuntimeException("Unable to get MD5 instance"); } } public static Future> resultFor(final T output) { - RpcResult result = Rpcs.getRpcResult(true, output, Collections.emptyList()); + final RpcResult result = RpcResultBuilder.success(output).build(); return Futures.immediateFuture(result); } - /** - * Extracts affected node from data change event. - * @param event - * @return - */ - public static Node getAffectedNode(final AsyncDataChangeEvent, DataObject> event) { - // TODO: expect listener method to be called even when change impact node - // TODO: test with change.getCreatedData() - for (Map.Entry, DataObject> changeEntry : event.getUpdatedData().entrySet()) { - if (isNode(changeEntry)) { - return (Node) changeEntry.getValue(); - } - } - - return null; - } - - private static boolean isNode(final Map.Entry, DataObject> changeEntry ) { - return Node.class.equals(changeEntry.getKey().getTargetType()); - } - /** * Method filters qnames based on wildcard strings * - * @param availableQnames + * @param list * @param patterh matching pattern * @return list of filtered qnames */ - public static List expandQname(final List availableQnames, final Pattern pattern) { - List matchingQnames = new ArrayList<>(); + public static List expandQname(final List list, final Pattern pattern) { + final List matchingQnames = new ArrayList<>(); - for (QName qname : availableQnames) { - String namespace = qname.getNamespace().toString(); + for (final SchemaPath notification : list) { + final String namespace = notification.getLastComponent().getNamespace().toString(); if (pattern.matcher(namespace).matches()) { - matchingQnames.add(qname); + matchingQnames.add(notification); } } - return matchingQnames; } @@ -101,9 +74,9 @@ public final class Util { * @return */ static String wildcardToRegex(final String wildcard){ - StringBuffer s = new StringBuffer(wildcard.length()); + final StringBuffer s = new StringBuffer(wildcard.length()); s.append('^'); - for (char c : wildcard.toCharArray()) { + for (final char c : wildcard.toCharArray()) { switch(c) { case '*': s.append(".*"); diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleFactoryTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleFactoryTest.java new file mode 100644 index 0000000000..13c4221025 --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleFactoryTest.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.config.yang.messagebus.app.impl; + +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.opendaylight.controller.config.api.DependencyResolver; +import org.opendaylight.controller.config.api.DynamicMBeanWithInstance; +import org.osgi.framework.BundleContext; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +public class MessageBusAppImplModuleFactoryTest { + + DependencyResolver dependencyResolverMock; + BundleContext bundleContextMock; + MessageBusAppImplModuleFactory messageBusAppImplModuleFactory; + DynamicMBeanWithInstance dynamicMBeanWithInstanceMock; + + @BeforeClass + public static void initTestClass() throws IllegalAccessException, InstantiationException { + } + + @Before + public void setUp() throws Exception { + dependencyResolverMock = mock(DependencyResolver.class); + bundleContextMock = mock(BundleContext.class); + dynamicMBeanWithInstanceMock = mock(DynamicMBeanWithInstance.class); + messageBusAppImplModuleFactory = new MessageBusAppImplModuleFactory(); + } + + @Test + public void createModuleTest() { + assertNotNull("Module has not been created correctly.", messageBusAppImplModuleFactory.createModule("instanceName1", dependencyResolverMock, bundleContextMock)); + } + + @Test + public void createModuleBTest() throws Exception{ + MessageBusAppImplModule messageBusAppImplModuleMock = mock(MessageBusAppImplModule.class); + doReturn(messageBusAppImplModuleMock).when(dynamicMBeanWithInstanceMock).getModule(); + assertNotNull("Module has not been created correctly.", messageBusAppImplModuleFactory.createModule("instanceName1", dependencyResolverMock, dynamicMBeanWithInstanceMock, bundleContextMock)); + } + +} diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleTest.java new file mode 100644 index 0000000000..079436422e --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleTest.java @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.config.yang.messagebus.app.impl; + +import com.google.common.util.concurrent.CheckedFuture; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.opendaylight.controller.config.api.DependencyResolver; +import org.opendaylight.controller.config.api.ModuleIdentifier; +import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.messagebus.app.impl.EventSourceTopology; +import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; +import org.opendaylight.controller.sal.binding.api.BindingAwareProvider; +import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; +import org.opendaylight.controller.sal.core.api.Broker; +import org.opendaylight.controller.sal.core.api.Provider; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.osgi.framework.BundleContext; + +import javax.management.ObjectName; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.notNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.doNothing; + +public class MessageBusAppImplModuleTest { + + MessageBusAppImplModule messageBusAppImplModule; + ModuleIdentifier moduleIdentifier; + DependencyResolver dependencyResolverMock; + + @BeforeClass + public static void initTestClass() throws IllegalAccessException, InstantiationException { + } + + @Before + public void setUp() throws Exception { + moduleIdentifier = new ModuleIdentifier("factoryName1", "instanceName1"); + dependencyResolverMock = mock(DependencyResolver.class); + messageBusAppImplModule = new MessageBusAppImplModule(moduleIdentifier, dependencyResolverMock); + } + + @Test + public void constructorTest() { + assertNotNull("Instance has not been created correctly.", messageBusAppImplModule); + } + + @Test + public void constructorBTest() { + MessageBusAppImplModule messageBusAppImplModuleOld = mock(MessageBusAppImplModule.class); + java.lang.AutoCloseable oldInstanceAutocloseableMock = mock(AutoCloseable.class); + MessageBusAppImplModule messageBusAppImplModule = new MessageBusAppImplModule(moduleIdentifier, dependencyResolverMock, messageBusAppImplModuleOld, oldInstanceAutocloseableMock); + assertNotNull("Instance has not been created correctly.", messageBusAppImplModule); + } + + @Test + public void setGetBundleContextTest() { + BundleContext bundleContext = mock(BundleContext.class); + messageBusAppImplModule.setBundleContext(bundleContext); + assertEquals("Set and/or get method/s don't work correctly.", bundleContext, messageBusAppImplModule.getBundleContext()); + } + + @Test + public void createInstanceTest() { + createInstanceTestHelper(); + messageBusAppImplModule.getInstance(); + assertNotNull("AutoCloseable instance has not been created correctly.", messageBusAppImplModule.createInstance()); + } + + private void createInstanceTestHelper(){ + NamespaceToStream namespaceToStream = mock(NamespaceToStream.class); + List listNamespaceToStreamMock = new ArrayList<>(); + listNamespaceToStreamMock.add(namespaceToStream); + messageBusAppImplModule.setNamespaceToStream(listNamespaceToStreamMock); + ObjectName objectName = mock(ObjectName.class); + org.opendaylight.controller.sal.core.api.Broker domBrokerDependency = mock(Broker.class); + org.opendaylight.controller.sal.binding.api.BindingAwareBroker bindingBrokerDependency = mock(BindingAwareBroker.class); + when(dependencyResolverMock.resolveInstance((java.lang.Class) notNull(), (javax.management.ObjectName) notNull(), eq(AbstractMessageBusAppImplModule.domBrokerJmxAttribute))).thenReturn(domBrokerDependency); + when(dependencyResolverMock.resolveInstance((java.lang.Class) notNull(), (javax.management.ObjectName) notNull(), eq(AbstractMessageBusAppImplModule.bindingBrokerJmxAttribute))).thenReturn(bindingBrokerDependency); + messageBusAppImplModule.setBindingBroker(objectName); + messageBusAppImplModule.setDomBroker(objectName); + BindingAwareBroker.ProviderContext providerContextMock = mock(BindingAwareBroker.ProviderContext.class); + doReturn(providerContextMock).when(bindingBrokerDependency).registerProvider(any(BindingAwareProvider.class)); + Broker.ProviderSession providerSessionMock = mock(Broker.ProviderSession.class); + doReturn(providerSessionMock).when(domBrokerDependency).registerProvider(any(Provider.class)); + + DataBroker dataBrokerMock = mock(DataBroker.class); + doReturn(dataBrokerMock).when(providerContextMock).getSALService(DataBroker.class); + RpcProviderRegistry rpcProviderRegistryMock = mock(RpcProviderRegistry.class); + doReturn(rpcProviderRegistryMock).when(providerContextMock).getSALService(RpcProviderRegistry.class); + BindingAwareBroker.RpcRegistration rpcRegistrationMock = mock(BindingAwareBroker.RpcRegistration.class); + doReturn(rpcRegistrationMock).when(rpcProviderRegistryMock).addRpcImplementation(eq(EventAggregatorService.class), any(EventSourceTopology.class)); + EventSourceService eventSourceServiceMock = mock(EventSourceService.class); + doReturn(eventSourceServiceMock).when(rpcProviderRegistryMock).getRpcService(EventSourceService.class); + + WriteTransaction writeTransactionMock = mock(WriteTransaction.class); + doReturn(writeTransactionMock).when(dataBrokerMock).newWriteOnlyTransaction(); + doNothing().when(writeTransactionMock).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(DataObject.class)); + CheckedFuture checkedFutureMock = mock(CheckedFuture.class); + doReturn(checkedFutureMock).when(writeTransactionMock).submit(); + } +} diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopicTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopicTest.java new file mode 100644 index 0000000000..5e26213646 --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopicTest.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.messagebus.app.impl; + +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.any; + +public class EventSourceTopicTest { + + EventSourceTopic eventSourceTopic; + org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node dataObjectMock; + NodeId nodeIdMock; + EventSourceService eventSourceServiceMock; + + @BeforeClass + public static void initTestClass() throws IllegalAccessException, InstantiationException { + } + + @Before + public void setUp() throws Exception { + NotificationPattern notificationPattern = new NotificationPattern("value1"); + eventSourceServiceMock = mock(EventSourceService.class); + eventSourceTopic = new EventSourceTopic(notificationPattern, "nodeIdPattern1", eventSourceServiceMock); + } + + @Test + public void createModuleTest() { + assertNotNull("Instance has not been created correctly.", eventSourceTopic); + } + + @Test + public void getTopicIdTest() { + assertNotNull("Topic has not been created correctly.", eventSourceTopic.getTopicId()); + } + + @Test + public void onDataChangedTest() { + AsyncDataChangeEvent asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class); + onDataChangedTestHelper(asyncDataChangeEventMock); + eventSourceTopic.onDataChanged(asyncDataChangeEventMock); + verify(dataObjectMock, times(1)).getId(); + verify(nodeIdMock, times(1)).getValue(); + } + + private void onDataChangedTestHelper(AsyncDataChangeEvent asyncDataChangeEventMock){ + Map, DataObject> map = new HashMap<>(); + InstanceIdentifier instanceIdentifierMock = mock(InstanceIdentifier.class); + dataObjectMock = mock(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class); + map.put(instanceIdentifierMock, dataObjectMock); + doReturn(map).when(asyncDataChangeEventMock).getUpdatedData(); + + nodeIdMock = mock(NodeId.class); + doReturn(nodeIdMock).when(dataObjectMock).getId(); + doReturn("0").when(nodeIdMock).getValue(); + } + + @Test + public void notifyNodeTest() { + InstanceIdentifier instanceIdentifierMock = mock(InstanceIdentifier.class); + eventSourceTopic.notifyNode(instanceIdentifierMock); + verify(eventSourceServiceMock, times(1)).joinTopic(any(JoinTopicInput.class)); + } + +} diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopologyTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopologyTest.java new file mode 100644 index 0000000000..c2f6ef54bf --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopologyTest.java @@ -0,0 +1,161 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.messagebus.app.impl; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; +import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; +import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.Pattern; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.eq; + +public class EventSourceTopologyTest { + + EventSourceTopology eventSourceTopology; + DataBroker dataBrokerMock; + RpcProviderRegistry rpcProviderRegistryMock; + CreateTopicInput createTopicInputMock; + ListenerRegistration listenerRegistrationMock; + NodeKey nodeKey; + + @BeforeClass + public static void initTestClass() throws IllegalAccessException, InstantiationException { + } + + @Before + public void setUp() throws Exception { + dataBrokerMock = mock(DataBroker.class); + rpcProviderRegistryMock = mock(RpcProviderRegistry.class); + + } + + @Test + public void constructorTest() { + constructorTestHelper(); + eventSourceTopology = new EventSourceTopology(dataBrokerMock, rpcProviderRegistryMock); + assertNotNull("Instance has not been created correctly.", eventSourceTopology); + } + + private void constructorTestHelper(){ + WriteTransaction writeTransactionMock = mock(WriteTransaction.class); + doReturn(writeTransactionMock).when(dataBrokerMock).newWriteOnlyTransaction(); + doNothing().when(writeTransactionMock).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(DataObject.class)); + CheckedFuture checkedFutureMock = mock(CheckedFuture.class); + doReturn(checkedFutureMock).when(writeTransactionMock).submit(); + } + + @Test + public void createTopicTest() throws Exception{ + createTopicTestHelper(); + assertNotNull("Topic has not been created correctly.", eventSourceTopology.createTopic(createTopicInputMock)); + } + + private void createTopicTestHelper() throws Exception{ + constructorTestHelper(); + createTopicInputMock = mock(CreateTopicInput.class); + eventSourceTopology = new EventSourceTopology(dataBrokerMock, rpcProviderRegistryMock); + + NotificationPattern notificationPattern = new NotificationPattern("value1"); + doReturn(notificationPattern).when(createTopicInputMock).getNotificationPattern(); + Pattern pattern = new Pattern("valuePattern1"); + doReturn(pattern).when(createTopicInputMock).getNodeIdPattern(); + + listenerRegistrationMock = mock(ListenerRegistration.class); + doReturn(listenerRegistrationMock).when(dataBrokerMock).registerDataChangeListener(eq(LogicalDatastoreType.OPERATIONAL), + any(InstanceIdentifier.class), + any(EventSourceTopic.class), + eq(DataBroker.DataChangeScope.SUBTREE)); + + ReadOnlyTransaction readOnlyTransactionMock = mock(ReadOnlyTransaction.class); + doReturn(readOnlyTransactionMock).when(dataBrokerMock).newReadOnlyTransaction(); + + CheckedFuture checkedFutureMock = mock(CheckedFuture.class); + doReturn(checkedFutureMock).when(readOnlyTransactionMock).read(eq(LogicalDatastoreType.OPERATIONAL), + any(InstanceIdentifier.class)); + Optional optionalMock = mock(Optional.class); + doReturn(optionalMock).when(checkedFutureMock).checkedGet(); + doReturn(true).when(optionalMock).isPresent(); + + Topology topologyMock = mock(Topology.class); + doReturn(topologyMock).when(optionalMock).get(); + Node nodeMock = mock(Node.class); + List nodeList = new ArrayList<>(); + nodeList.add(nodeMock); + doReturn(nodeList).when(topologyMock).getNode(); + + NodeId nodeId = new NodeId("nodeIdValue1"); + doReturn(nodeId).when(nodeMock).getNodeId(); + } + + @Test + public void destroyTopicTest() throws Exception{ + createTopicTestHelper(); + DestroyTopicInput destroyTopicInput = null; + assertNotNull("Instance has not been created correctly.", eventSourceTopology.destroyTopic(destroyTopicInput)); + } + + @Test + public void closeTest() throws Exception{ + BindingAwareBroker.RpcRegistration rpcRegistrationMock = mock(BindingAwareBroker.RpcRegistration.class); + doReturn(rpcRegistrationMock).when(rpcProviderRegistryMock).addRpcImplementation(eq(EventAggregatorService.class), any(EventSourceTopology.class)); + doNothing().when(rpcRegistrationMock).close(); + createTopicTestHelper(); + eventSourceTopology.createTopic(createTopicInputMock); + eventSourceTopology.close(); + verify(rpcRegistrationMock, times(1)).close(); + } + + @Test + public void registerTest() throws Exception { + createTopicTestHelper(); + Node nodeMock = mock(Node.class); + NetconfEventSource netconfEventSourceMock = mock(NetconfEventSource.class); + + NodeId nodeId = new NodeId("nodeIdValue1"); + nodeKey = new NodeKey(nodeId); + doReturn(nodeKey).when(nodeMock).getKey(); + + BindingAwareBroker.RoutedRpcRegistration routedRpcRegistrationMock = mock(BindingAwareBroker.RoutedRpcRegistration.class); + doReturn(routedRpcRegistrationMock).when(rpcProviderRegistryMock).addRoutedRpcImplementation(EventSourceService.class, netconfEventSourceMock); + eventSourceTopology.register(nodeMock, netconfEventSourceMock); + verify(routedRpcRegistrationMock, times(1)).registerPath(eq(NodeContext.class), any(KeyedInstanceIdentifier.class)); + } + +} diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceManagerTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceManagerTest.java new file mode 100644 index 0000000000..911c5db1c1 --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceManagerTest.java @@ -0,0 +1,180 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.messagebus.app.impl; + +import com.google.common.base.Optional; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream; +import org.opendaylight.controller.md.sal.binding.api.BindingService; +import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.MountPoint; +import org.opendaylight.controller.md.sal.binding.api.MountPointService; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; +import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; +import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.AvailableCapabilities; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.notNull; + +public class NetconfEventSourceManagerTest { + + NetconfEventSourceManager netconfEventSourceManager; + ListenerRegistration listenerRegistrationMock; + DOMMountPointService domMountPointServiceMock; + MountPointService mountPointServiceMock; + EventSourceTopology eventSourceTopologyMock; + AsyncDataChangeEvent asyncDataChangeEventMock; + + @BeforeClass + public static void initTestClass() throws IllegalAccessException, InstantiationException { + } + + @Before + public void setUp() throws Exception { + DataBroker dataBrokerMock = mock(DataBroker.class); + DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class); + domMountPointServiceMock = mock(DOMMountPointService.class); + mountPointServiceMock = mock(MountPointService.class); + eventSourceTopologyMock = mock(EventSourceTopology.class); + List namespaceToStreamList = new ArrayList<>(); + + listenerRegistrationMock = mock(ListenerRegistration.class); + doReturn(listenerRegistrationMock).when(dataBrokerMock).registerDataChangeListener(eq(LogicalDatastoreType.OPERATIONAL), any(InstanceIdentifier.class), any(NetconfEventSourceManager.class), eq(AsyncDataBroker.DataChangeScope.SUBTREE)); + + netconfEventSourceManager = new NetconfEventSourceManager(dataBrokerMock, domNotificationPublishServiceMock, domMountPointServiceMock, + mountPointServiceMock, eventSourceTopologyMock, namespaceToStreamList); + } + + @Test + public void constructorTest() { + assertNotNull("Instance has not been created correctly.", netconfEventSourceManager); + } + + @Test + public void onDataChangedTest() { + AsyncDataChangeEvent asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class); + Map map = new HashMap<>(); + InstanceIdentifier instanceIdentifierMock = mock(InstanceIdentifier.class); + Node dataObjectMock = mock(Node.class); + map.put(instanceIdentifierMock, dataObjectMock); + doReturn(map).when(asyncDataChangeEventMock).getCreatedData(); + doReturn(map).when(asyncDataChangeEventMock).getUpdatedData(); + netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock); + verify(dataObjectMock, times(2)).getAugmentation(NetconfNode.class); + } + + @Test + public void onDataChangedCreateEventSourceTest() { + onDataChangedCreateEventSourceTestHelper(); + netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock); + verify(eventSourceTopologyMock, times(1)).register(any(Node.class), any(NetconfEventSource.class)); + } + + private void onDataChangedCreateEventSourceTestHelper(){ + asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class); + Map map = new HashMap<>(); + InstanceIdentifier instanceIdentifierMock = mock(InstanceIdentifier.class); + Node dataObjectMock = mock(Node.class); + map.put(instanceIdentifierMock, dataObjectMock); + doReturn(map).when(asyncDataChangeEventMock).getCreatedData(); + doReturn(map).when(asyncDataChangeEventMock).getUpdatedData(); + + NetconfNode netconfNodeMock = mock(NetconfNode.class); + AvailableCapabilities availableCapabilitiesMock = mock(AvailableCapabilities.class); + doReturn(netconfNodeMock).when(dataObjectMock).getAugmentation(NetconfNode.class); + doReturn(availableCapabilitiesMock).when(netconfNodeMock).getAvailableCapabilities(); + List availableCapabilityList = new ArrayList<>(); + availableCapabilityList.add("(urn:ietf:params:xml:ns:netconf:notification_availableCapabilityString1"); + doReturn(availableCapabilityList).when(availableCapabilitiesMock).getAvailableCapability(); + + doReturn(NetconfNodeFields.ConnectionStatus.Connected).when(netconfNodeMock).getConnectionStatus(); + + Optional optionalMock = mock(Optional.class); + Optional optionalBindingMountMock = mock(Optional.class); + NodeId nodeId = new NodeId("nodeId1"); + doReturn(nodeId).when(dataObjectMock).getNodeId(); + doReturn(optionalMock).when(domMountPointServiceMock).getMountPoint((YangInstanceIdentifier)notNull()); + doReturn(optionalBindingMountMock).when(mountPointServiceMock).getMountPoint(any(InstanceIdentifier.class)); + doReturn(true).when(optionalMock).isPresent(); + doReturn(true).when(optionalBindingMountMock).isPresent(); + + DOMMountPoint domMountPointMock = mock(DOMMountPoint.class); + MountPoint mountPointMock = mock(MountPoint.class); + doReturn(domMountPointMock).when(optionalMock).get(); + doReturn(mountPointMock).when(optionalBindingMountMock).get(); + + RpcConsumerRegistry rpcConsumerRegistryMock = mock(RpcConsumerRegistry.class); + Optional onlyOptionalMock = (Optional) mock(Optional.class); + NotificationsService notificationsServiceMock = mock(NotificationsService.class); + + doReturn(onlyOptionalMock).when(mountPointMock).getService(RpcConsumerRegistry.class); + doReturn(rpcConsumerRegistryMock).when(onlyOptionalMock).get(); + doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class); + } + + @Test + public void isEventSourceTest() { + Node nodeMock = mock(Node.class); + NetconfNode netconfNodeMock = mock(NetconfNode.class); + AvailableCapabilities availableCapabilitiesMock = mock(AvailableCapabilities.class); + doReturn(netconfNodeMock).when(nodeMock).getAugmentation(NetconfNode.class); + doReturn(availableCapabilitiesMock).when(netconfNodeMock).getAvailableCapabilities(); + List availableCapabilityList = new ArrayList<>(); + availableCapabilityList.add("(urn:ietf:params:xml:ns:netconf:notification_availableCapabilityString1"); + doReturn(availableCapabilityList).when(availableCapabilitiesMock).getAvailableCapability(); + assertTrue("Method has not been run correctly.", netconfEventSourceManager.isEventSource(nodeMock)); + } + + @Test + public void isNotEventSourceTest() { + Node nodeMock = mock(Node.class); + NetconfNode netconfNodeMock = mock(NetconfNode.class); + AvailableCapabilities availableCapabilitiesMock = mock(AvailableCapabilities.class); + doReturn(netconfNodeMock).when(nodeMock).getAugmentation(NetconfNode.class); + doReturn(availableCapabilitiesMock).when(netconfNodeMock).getAvailableCapabilities(); + List availableCapabilityList = new ArrayList<>(); + availableCapabilityList.add("availableCapabilityString1"); + doReturn(availableCapabilityList).when(availableCapabilitiesMock).getAvailableCapability(); + assertFalse("Method has not been run correctly.", netconfEventSourceManager.isEventSource(nodeMock)); + } + + @Test + public void closeTest() { + netconfEventSourceManager.close(); + verify(listenerRegistrationMock, times(1)).close(); + } +} diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceTest.java new file mode 100644 index 0000000000..73117c12ba --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceTest.java @@ -0,0 +1,185 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.messagebus.app.impl; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.opendaylight.controller.md.sal.binding.api.BindingService; +import org.opendaylight.controller.md.sal.binding.api.MountPoint; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; +import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; +import org.opendaylight.controller.md.sal.dom.api.DOMNotification; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; +import org.opendaylight.controller.md.sal.dom.api.DOMService; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.model.api.NotificationDefinition; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; + +import org.opendaylight.yangtools.yang.common.QName; + +import java.lang.reflect.Field; +import java.net.URI; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; + +public class NetconfEventSourceTest { + + NetconfEventSource netconfEventSource; + DOMMountPoint domMountPointMock; + JoinTopicInput joinTopicInputMock; + + @BeforeClass + public static void initTestClass() throws IllegalAccessException, InstantiationException { + } + + @Before + public void setUp() throws Exception { + Map streamMap = new HashMap<>(); + streamMap.put("string1", "string2"); + domMountPointMock = mock(DOMMountPoint.class); + DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class); + MountPoint mountPointMock = mock(MountPoint.class); + + RpcConsumerRegistry rpcConsumerRegistryMock = mock(RpcConsumerRegistry.class); + Optional onlyOptionalMock = (Optional) mock(Optional.class); + NotificationsService notificationsServiceMock = mock(NotificationsService.class); + + doReturn(onlyOptionalMock).when(mountPointMock).getService(RpcConsumerRegistry.class); + doReturn(rpcConsumerRegistryMock).when(onlyOptionalMock).get(); + doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class); + netconfEventSource = new NetconfEventSource("nodeId1", streamMap, domMountPointMock, domNotificationPublishServiceMock, mountPointMock); + } + + @Test + public void constructorTest() { + assertNotNull("Instance has not been created correctly.", netconfEventSource); + } + + @Test + public void joinTopicTest() throws Exception{ + joinTopicTestHelper(); + assertNotNull("JoinTopic return value has not been created correctly.", netconfEventSource.joinTopic(joinTopicInputMock)); + } + + private void joinTopicTestHelper() throws Exception{ + joinTopicInputMock = mock(JoinTopicInput.class); + NotificationPattern notificationPatternMock = mock(NotificationPattern.class); + doReturn(notificationPatternMock).when(joinTopicInputMock).getNotificationPattern(); + doReturn("regexString1").when(notificationPatternMock).getValue(); + + SchemaContext schemaContextMock = mock(SchemaContext.class); + doReturn(schemaContextMock).when(domMountPointMock).getSchemaContext(); + Set notificationDefinitionSet = new HashSet<>(); + NotificationDefinition notificationDefinitionMock = mock(NotificationDefinition.class); + notificationDefinitionSet.add(notificationDefinitionMock); + + URI uri = new URI("uriStr1"); + QName qName = new QName(uri, "localName1"); + org.opendaylight.yangtools.yang.model.api.SchemaPath schemaPath = SchemaPath.create(true, qName); + doReturn(notificationDefinitionSet).when(schemaContextMock).getNotifications(); + doReturn(schemaPath).when(notificationDefinitionMock).getPath(); + + Optional domNotificationServiceOptionalMock = (Optional) mock(Optional.class); + doReturn(domNotificationServiceOptionalMock).when(domMountPointMock).getService(DOMNotificationService.class); + doReturn(true).when(domNotificationServiceOptionalMock).isPresent(); + + DOMNotificationService domNotificationServiceMock = mock(DOMNotificationService.class); + doReturn(domNotificationServiceMock).when(domNotificationServiceOptionalMock).get(); + ListenerRegistration listenerRegistrationMock = mock(ListenerRegistration.class); + doReturn(listenerRegistrationMock).when(domNotificationServiceMock).registerNotificationListener(any(NetconfEventSource.class), any(List.class)); + } + + @Test (expected=NullPointerException.class) + public void onNotificationTest() { + DOMNotification domNotificationMock = mock(DOMNotification.class); + ContainerNode containerNodeMock = mock(ContainerNode.class); + SchemaContext schemaContextMock = mock(SchemaContext.class); + SchemaPath schemaPathMock = mock(SchemaPath.class); + doReturn(schemaContextMock).when(domMountPointMock).getSchemaContext(); + doReturn(schemaPathMock).when(domNotificationMock).getType(); + doReturn(containerNodeMock).when(domNotificationMock).getBody(); + netconfEventSource.onNotification(domNotificationMock); + } + + @Test + public void onDataChangedTest() { + InstanceIdentifier brmIdent = InstanceIdentifier.create(Nodes.class) + .child(Node.class, new NodeKey(new NodeId("brm"))).augmentation(NetconfNode.class); + AsyncDataChangeEvent asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class); + NetconfNode dataObjectMock = mock(NetconfNode.class); + Map dataChangeMap = new HashMap<>(); + dataChangeMap.put(brmIdent, dataObjectMock); + doReturn(dataChangeMap).when(asyncDataChangeEventMock).getOriginalData(); + doReturn(dataChangeMap).when(asyncDataChangeEventMock).getUpdatedData(); + + netconfEventSource.onDataChanged(asyncDataChangeEventMock); + verify(dataObjectMock, times(2)).isConnected(); + } + + @Test + public void onDataChangedResubscribeTest() throws Exception{ + InstanceIdentifier brmIdent = InstanceIdentifier.create(Nodes.class) + .child(Node.class, new NodeKey(new NodeId("brm"))).augmentation(NetconfNode.class); + AsyncDataChangeEvent asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class); + NetconfNode dataObjectMock = mock(NetconfNode.class); + Map dataChangeMap = new HashMap<>(); + dataChangeMap.put(brmIdent, dataObjectMock); + doReturn(dataChangeMap).when(asyncDataChangeEventMock).getUpdatedData(); + doReturn(true).when(dataObjectMock).isConnected(); + + Set localSet = getActiveStreams(); + localSet.add("activeStream1"); + + Optional optionalMock = (Optional) mock(Optional.class); + doReturn(optionalMock).when(domMountPointMock).getService(DOMRpcService.class); + DOMRpcService domRpcServiceMock = mock(DOMRpcService.class); + doReturn(domRpcServiceMock).when(optionalMock).get(); + CheckedFuture checkedFutureMock = mock(CheckedFuture.class); + doReturn(checkedFutureMock).when(domRpcServiceMock).invokeRpc(any(SchemaPath.class), any(ContainerNode.class)); + + netconfEventSource.onDataChanged(asyncDataChangeEventMock); + verify(dataObjectMock, times(1)).isConnected(); + assertEquals("Size of set has not been set correctly.", 1, getActiveStreams().size()); + } + + private Set getActiveStreams() throws Exception{ + Field nesField = NetconfEventSource.class.getDeclaredField("activeStreams"); + nesField.setAccessible(true); + return (Set) nesField.get(netconfEventSource); + } + +} diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/TopicDOMNotificationTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/TopicDOMNotificationTest.java new file mode 100644 index 0000000000..4872127e83 --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/TopicDOMNotificationTest.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.messagebus.app.impl; + +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; + +public class TopicDOMNotificationTest { + + ContainerNode containerNodeBodyMock; + TopicDOMNotification topicDOMNotification; + + @BeforeClass + public static void initTestClass() throws IllegalAccessException, InstantiationException { + } + + @Before + public void setUp() throws Exception { + containerNodeBodyMock = mock(ContainerNode.class); + topicDOMNotification = new TopicDOMNotification(containerNodeBodyMock); + } + + @Test + public void constructorTest() { + assertNotNull("Instance has not been created correctly.", topicDOMNotification); + } + + @Test + public void getTypeTest() { + SchemaPath TOPIC_NOTIFICATION_ID = SchemaPath.create(true, TopicNotification.QNAME); + assertEquals("Type has not been created correctly.", TOPIC_NOTIFICATION_ID, topicDOMNotification.getType()); + } + + @Test + public void getBodyTest() { + assertEquals("String has not been created correctly.", containerNodeBodyMock, topicDOMNotification.getBody()); + } + + @Test + public void getToStringTest() { + String bodyString = "TopicDOMNotification [body=" + containerNodeBodyMock + "]"; + assertEquals("String has not been created correctly.", bodyString, topicDOMNotification.toString()); + } +} diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/UtilTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/UtilTest.java new file mode 100644 index 0000000000..2ff4654155 --- /dev/null +++ b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/UtilTest.java @@ -0,0 +1,143 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.messagebus.app.impl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.regex.Pattern; + +import org.junit.Test; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; + +public class UtilTest { + + @Test + public void testMD5Hash() throws Exception { + // empty string + createAndAssertHash("", "d41d8cd98f00b204e9800998ecf8427e"); + + // non-empty string + createAndAssertHash("The Guardian", "69b929ae473ed732d5fb8e0a55a8dc8d"); + + // the same hash for the same string + createAndAssertHash("The Independent", "db793706d70c37dcc16454fa8eb21b1c"); + createAndAssertHash("The Independent", "db793706d70c37dcc16454fa8eb21b1c"); // one more time + + // different strings must have different hashes + createAndAssertHash("orange", "fe01d67a002dfa0f3ac084298142eccd"); + createAndAssertHash("yellow", "d487dd0b55dfcacdd920ccbdaeafa351"); + } + + //TODO: IllegalArgumentException would be better + @Test(expected = RuntimeException.class) + public void testMD5HashInvalidInput() throws Exception { + Util.md5String(null); + } + + @Test + public void testWildcardToRegex() throws Exception { + // empty wildcard string + createAndAssertRegex("", "^$"); + + // wildcard string is a char to be replaced + createAndAssertRegex("*", "^.*$"); + createAndAssertRegex("?", "^.$"); + final String relevantChars = "()[]$^.{}|\\"; + for (final char c : relevantChars.toCharArray()) { + final char oneChar[] = {c}; + final String wildcardStr = new String(oneChar); + final String expectedRegex = "^\\" + c + "$"; + createAndAssertRegex(wildcardStr, expectedRegex); + } + + // wildcard string consists of more chars + createAndAssertRegex("a", "^a$"); + createAndAssertRegex("aBc", "^aBc$"); + createAndAssertRegex("a1b2C34", "^a1b2C34$"); + createAndAssertRegex("*?()[]$^.{}|\\X", "^.*.\\(\\)\\[\\]\\$\\^\\.\\{\\}\\|\\\\X$"); + createAndAssertRegex("a*BB?37|42$", "^a.*BB.37\\|42\\$$"); + } + + @Test + public void testResultFor() throws Exception { + { + final String expectedResult = "dummy string"; + RpcResult rpcResult = Util.resultFor(expectedResult).get(); + assertEquals(expectedResult, rpcResult.getResult()); + assertTrue(rpcResult.isSuccessful()); + assertTrue(rpcResult.getErrors().isEmpty()); + } + { + final Integer expectedResult = 42; + RpcResult rpcResult = Util.resultFor(expectedResult).get(); + assertEquals(expectedResult, rpcResult.getResult()); + assertTrue(rpcResult.isSuccessful()); + assertTrue(rpcResult.getErrors().isEmpty()); + } + } + + @Test + public void testExpandQname() throws Exception { + // match no path because the list of the allowed paths is empty + { + final List paths = new ArrayList<>(); + final Pattern regexPattern = Pattern.compile(".*"); // match everything + final List matchingPaths = Util.expandQname(paths, regexPattern); + assertTrue(matchingPaths.isEmpty()); + } + + // match no path because of regex pattern + { + final List paths = createSchemaPathList(); + final Pattern regexPattern = Pattern.compile("^@.*"); + final List matchingPaths = Util.expandQname(paths, regexPattern); + assertTrue(matchingPaths.isEmpty()); + } + + // match all paths + { + final List paths = createSchemaPathList(); + final Pattern regexPattern = Pattern.compile(".*"); + final List matchingPaths = Util.expandQname(paths, regexPattern); + assertTrue(matchingPaths.contains(paths.get(0))); + assertTrue(matchingPaths.contains(paths.get(1))); + assertEquals(paths.size(), matchingPaths.size()); + } + + // match one path only + { + final List paths = createSchemaPathList(); + final Pattern regexPattern = Pattern.compile(".*yyy$"); + final List matchingPaths = Util.expandQname(paths, regexPattern); + assertTrue(matchingPaths.contains(paths.get(1))); + assertEquals(1, matchingPaths.size()); + } + } + + private static void createAndAssertHash(final String inString, final String expectedHash) { + assertEquals("Incorrect hash.", expectedHash, Util.md5String(inString)); + } + + private static void createAndAssertRegex(final String wildcardStr, final String expectedRegex) { + assertEquals("Incorrect regex string.", expectedRegex, Util.wildcardToRegex(wildcardStr)); + } + + private static List createSchemaPathList() { + final QName qname1 = QName.create("urn:odl:xxx", "2015-01-01", "localName"); + final QName qname2 = QName.create("urn:odl:yyy", "2015-01-01", "localName"); + final SchemaPath path1 = SchemaPath.create(true, qname1); + final SchemaPath path2 = SchemaPath.create(true, qname2); + return Arrays.asList(path1, path2); + } +} diff --git a/opendaylight/md-sal/pom.xml b/opendaylight/md-sal/pom.xml index eca5213905..bf30a16842 100644 --- a/opendaylight/md-sal/pom.xml +++ b/opendaylight/md-sal/pom.xml @@ -84,6 +84,7 @@ messagebus-api messagebus-impl + messagebus-config @@ -150,7 +151,7 @@ - + @@ -234,4 +235,4 @@ - + \ No newline at end of file