From 277612ebea9b441977cdb8460b2e76090df6f9e8 Mon Sep 17 00:00:00 2001 From: Tomas Cere Date: Thu, 13 Aug 2015 11:06:22 +0200 Subject: [PATCH] Decouple message bus from netconf connector Change-Id: I6a143e868adc1e5c7a9b114798e7009bb6ef8675 Signed-off-by: Maros Marsalek Signed-off-by: Tomas Cere --- .../mdsal/src/main/resources/features.xml | 5 +- opendaylight/md-sal/mdsal-artifacts/pom.xml | 5 + .../main/resources/initial/05-message-bus.xml | 14 +- opendaylight/md-sal/messagebus-impl/pom.xml | 10 +- .../app/impl/MessageBusAppImplModule.java | 61 +--- .../app/impl/EventSourceTopology.java | 1 + .../netconf/NetconfEventSource.java | 340 ------------------ .../netconf/NetconfEventSourceManager.java | 213 ----------- .../NetconfEventSourceRegistration.java | 196 ---------- .../StreamNotificationTopicRegistration.java | 204 ----------- .../src/main/yang/messagebus-app-impl.yang | 31 +- .../NetconfEventSourceManagerTest.java | 177 --------- .../netconf/NetconfEventSourceTest.java | 141 -------- .../netconf/NetconfTestUtils.java | 95 ----- opendaylight/md-sal/messagebus-util/pom.xml | 55 +++ .../messagebus/app/util}/Providers.java | 2 +- .../app/util}/TopicDOMNotification.java | 2 +- .../controller/messagebus/app/util}/Util.java | 2 +- .../app/util}/TopicDOMNotificationTest.java | 2 +- .../messagebus/app/util}/UtilTest.java | 2 +- opendaylight/md-sal/pom.xml | 1 + .../src/main/resources/features.xml | 8 + .../netconf/messagebus-netconf/pom.xml | 135 +++++++ .../netconf/MessageBusNetconfModule.java | 39 ++ .../MessageBusNetconfModuleFactory.java | 13 + ...nnectionNotificationTopicRegistration.java | 84 ++--- .../netconf/NetconfEventSource.java | 340 ++++++++++++++++++ .../netconf/NetconfEventSourceManager.java | 207 +++++++++++ .../NetconfEventSourceRegistration.java | 191 ++++++++++ .../NotificationTopicRegistration.java | 15 +- .../StreamNotificationTopicRegistration.java | 203 +++++++++++ .../resources/initial/06-message-netconf.xml | 44 +++ .../src/main/yang/messagebus-netconf.yang | 68 ++++ .../NetconfEventSourceManagerTest.java | 173 +++++++++ .../netconf/NetconfEventSourceTest.java | 139 +++++++ .../netconf/NetconfTestUtils.java | 87 +++++ .../netconf/netconf-artifacts/pom.xml | 5 + opendaylight/netconf/pom.xml | 1 + 38 files changed, 1776 insertions(+), 1535 deletions(-) delete mode 100644 opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java delete mode 100644 opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java delete mode 100644 opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceRegistration.java delete mode 100644 opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java delete mode 100644 opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManagerTest.java delete mode 100644 opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceTest.java delete mode 100644 opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfTestUtils.java create mode 100644 opendaylight/md-sal/messagebus-util/pom.xml rename opendaylight/md-sal/{messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl => messagebus-util/src/main/java/org/opendaylight/controller/messagebus/app/util}/Providers.java (95%) rename opendaylight/md-sal/{messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl => messagebus-util/src/main/java/org/opendaylight/controller/messagebus/app/util}/TopicDOMNotification.java (95%) rename opendaylight/md-sal/{messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl => messagebus-util/src/main/java/org/opendaylight/controller/messagebus/app/util}/Util.java (97%) rename opendaylight/md-sal/{messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl => messagebus-util/src/test/java/org/opendaylight/controller/messagebus/app/util}/TopicDOMNotificationTest.java (97%) rename opendaylight/md-sal/{messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl => messagebus-util/src/test/java/org/opendaylight/controller/messagebus/app/util}/UtilTest.java (98%) create mode 100644 opendaylight/netconf/messagebus-netconf/pom.xml create mode 100644 opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/config/yang/messagebus/netconf/MessageBusNetconfModule.java create mode 100644 opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/config/yang/messagebus/netconf/MessageBusNetconfModuleFactory.java rename opendaylight/{md-sal/messagebus-impl => netconf/messagebus-netconf}/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/ConnectionNotificationTopicRegistration.java (77%) create mode 100644 opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java create mode 100644 opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java create mode 100644 opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceRegistration.java rename opendaylight/{md-sal/messagebus-impl => netconf/messagebus-netconf}/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NotificationTopicRegistration.java (91%) create mode 100644 opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java create mode 100644 opendaylight/netconf/messagebus-netconf/src/main/resources/initial/06-message-netconf.xml create mode 100644 opendaylight/netconf/messagebus-netconf/src/main/yang/messagebus-netconf.yang create mode 100644 opendaylight/netconf/messagebus-netconf/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManagerTest.java create mode 100644 opendaylight/netconf/messagebus-netconf/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceTest.java create mode 100644 opendaylight/netconf/messagebus-netconf/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfTestUtils.java diff --git a/features/mdsal/src/main/resources/features.xml b/features/mdsal/src/main/resources/features.xml index d5723b5c4f..01820510a9 100644 --- a/features/mdsal/src/main/resources/features.xml +++ b/features/mdsal/src/main/resources/features.xml @@ -97,13 +97,12 @@ mvn:org.opendaylight.controller.samples/clustering-it-config/${project.version}/xml/config - - - + mvn:org.opendaylight.controller.model/model-inventory/${mdsal.version} odl-mdsal-broker mvn:org.opendaylight.controller/messagebus-api/${project.version} mvn:org.opendaylight.controller/messagebus-spi/${project.version} + mvn:org.opendaylight.controller/messagebus-util/${project.version} mvn:org.opendaylight.controller/messagebus-impl/${project.version} mvn:org.opendaylight.controller/messagebus-config/${project.version}/xml/config diff --git a/opendaylight/md-sal/mdsal-artifacts/pom.xml b/opendaylight/md-sal/mdsal-artifacts/pom.xml index 05cff8141c..61ba3cc1a2 100644 --- a/opendaylight/md-sal/mdsal-artifacts/pom.xml +++ b/opendaylight/md-sal/mdsal-artifacts/pom.xml @@ -327,6 +327,11 @@ messagebus-impl ${project.version} + + org.opendaylight.controller + messagebus-util + ${project.version} + diff --git a/opendaylight/md-sal/messagebus-config/src/main/resources/initial/05-message-bus.xml b/opendaylight/md-sal/messagebus-config/src/main/resources/initial/05-message-bus.xml index 4714c075cb..3e00d93940 100644 --- a/opendaylight/md-sal/messagebus-config/src/main/resources/initial/05-message-bus.xml +++ b/opendaylight/md-sal/messagebus-config/src/main/resources/initial/05-message-bus.xml @@ -14,21 +14,9 @@ messagebus-app-impl binding-impl:messagebus-app-impl - md-sal-binding:binding-broker-osgi-registry + prefix:binding-broker-osgi-registry binding-osgi-broker - - dom:dom-broker-osgi-registry - dom-broker - - - urn:ietf:params:xml:ns:yang:smiv2 - SNMP - - - urn:ietf:params:xml:ns:yang:ietf-syslog-notification - SYSLOG - diff --git a/opendaylight/md-sal/messagebus-impl/pom.xml b/opendaylight/md-sal/messagebus-impl/pom.xml index 297056858e..108a295bb7 100644 --- a/opendaylight/md-sal/messagebus-impl/pom.xml +++ b/opendaylight/md-sal/messagebus-impl/pom.xml @@ -46,17 +46,15 @@ and is available at http://www.eclipse.org/legal/epl-v10.html org.opendaylight.controller messagebus-api - 1.3.0-SNAPSHOT + + + org.opendaylight.controller + messagebus-util org.opendaylight.controller messagebus-spi - 1.3.0-SNAPSHOT - - - - org.opendaylight.controller sal-binding-config diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModule.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModule.java index 10dd9eabf3..a4e5514211 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModule.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModule.java @@ -7,29 +7,17 @@ */ package org.opendaylight.controller.config.yang.messagebus.app.impl; -import java.util.HashSet; -import java.util.Set; - import org.opendaylight.controller.config.api.DependencyResolver; import org.opendaylight.controller.config.api.ModuleIdentifier; import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.MountPointService; -import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; -import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; import org.opendaylight.controller.messagebus.app.impl.EventSourceTopology; -//import org.opendaylight.controller.messagebus.eventsources.netconf.NetconfEventSourceManager; -import org.opendaylight.controller.messagebus.spi.EventSource; -import org.opendaylight.controller.messagebus.spi.EventSourceRegistration; -import org.opendaylight.controller.messagebus.spi.EventSourceRegistry; +import org.opendaylight.controller.messagebus.app.util.Providers; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext; import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; -import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; import org.osgi.framework.BundleContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; - public class MessageBusAppImplModule extends org.opendaylight.controller.config.yang.messagebus.app.impl.AbstractMessageBusAppImplModule { private static final Logger LOGGER = LoggerFactory.getLogger(MessageBusAppImplModule.class); @@ -58,55 +46,12 @@ public class MessageBusAppImplModule extends org.opendaylight.controller.config. @Override public java.lang.AutoCloseable createInstance() { - final ProviderContext bindingCtx = getBindingBrokerDependency().registerProvider(new Providers.BindingAware()); - final ProviderSession domCtx = getDomBrokerDependency().registerProvider(new Providers.BindingIndependent()); final DataBroker dataBroker = bindingCtx.getSALService(DataBroker.class); - final DOMNotificationPublishService domPublish = domCtx.getService(DOMNotificationPublishService.class); - final DOMMountPointService domMount = domCtx.getService(DOMMountPointService.class); final RpcProviderRegistry rpcRegistry = bindingCtx.getSALService(RpcProviderRegistry.class); - final MountPointService mountPointService = bindingCtx.getSALService(MountPointService.class); - final EventSourceRegistryWrapper eventSourceRegistryWrapper = new EventSourceRegistryWrapper(new EventSourceTopology(dataBroker, rpcRegistry)); -// final NetconfEventSourceManager netconfEventSourceManager -// = NetconfEventSourceManager.create(dataBroker, -// domPublish, -// domMount, -// mountPointService, -// eventSourceRegistryWrapper, -// getNamespaceToStream()); -// eventSourceRegistryWrapper.addAutoCloseable(netconfEventSourceManager); + final EventSourceTopology eventSourceTopology = new EventSourceTopology(dataBroker, rpcRegistry); LOGGER.info("Messagebus initialized"); - return eventSourceRegistryWrapper; - + return eventSourceTopology; } - //TODO: separate NetconfEventSource into separate bundle, remove this wrapper, return EventSourceTopology directly as EventSourceRegistry - private class EventSourceRegistryWrapper implements EventSourceRegistry{ - - private final EventSourceRegistry baseEventSourceRegistry; - private final Set autoCloseables = new HashSet<>(); - - public EventSourceRegistryWrapper(EventSourceRegistry baseEventSourceRegistry) { - this.baseEventSourceRegistry = baseEventSourceRegistry; - } - - public void addAutoCloseable(AutoCloseable ac){ - Preconditions.checkNotNull(ac); - autoCloseables.add(ac); - } - - @Override - public void close() throws Exception { - for(AutoCloseable ac : autoCloseables){ - ac.close(); - } - baseEventSourceRegistry.close(); - } - - @Override - public EventSourceRegistration registerEventSource(T eventSource) { - return this.baseEventSourceRegistry.registerEventSource(eventSource); - } - - } } diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java index 3aa470b10a..b79d12b9d6 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java @@ -16,6 +16,7 @@ import java.util.concurrent.Future; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.messagebus.app.util.Util; import org.opendaylight.controller.messagebus.spi.EventSource; import org.opendaylight.controller.messagebus.spi.EventSourceRegistration; import org.opendaylight.controller.messagebus.spi.EventSourceRegistry; diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java deleted file mode 100644 index 5eb32d64e7..0000000000 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java +++ /dev/null @@ -1,340 +0,0 @@ -///* -// * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. -// * -// * This program and the accompanying materials are made available under the -// * terms of the Eclipse Public License v1.0 which accompanies this distribution, -// * and is available at http://www.eclipse.org/legal/epl-v10.html -// */ -// -//package org.opendaylight.controller.messagebus.eventsources.netconf; -// -//import static com.google.common.util.concurrent.Futures.immediateFuture; -// -//import java.io.IOException; -//import java.util.ArrayList; -//import java.util.Date; -//import java.util.HashMap; -//import java.util.List; -//import java.util.Map; -//import java.util.Set; -//import java.util.concurrent.Future; -//import java.util.regex.Pattern; -// -//import javax.xml.stream.XMLStreamException; -//import javax.xml.transform.dom.DOMResult; -//import javax.xml.transform.dom.DOMSource; -// -//import org.opendaylight.controller.md.sal.binding.api.DataBroker; -//import org.opendaylight.controller.md.sal.binding.api.MountPoint; -//import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; -//import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -//import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -//import org.opendaylight.controller.md.sal.dom.api.DOMEvent; -//import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; -//import org.opendaylight.controller.md.sal.dom.api.DOMNotification; -//import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener; -//import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; -//import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; -//import org.opendaylight.controller.messagebus.app.impl.TopicDOMNotification; -//import org.opendaylight.controller.messagebus.app.impl.Util; -//import org.opendaylight.controller.messagebus.spi.EventSource; -//import org.opendaylight.controller.config.util.xml.XmlUtil; -//import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil; -//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; -//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; -//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification; -//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput; -//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput; -//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutputBuilder; -//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicStatus; -//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInput; -//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf; -//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams; -//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; -//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; -//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; -//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; -//import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -//import org.opendaylight.yangtools.yang.common.QName; -//import org.opendaylight.yangtools.yang.common.RpcResult; -//import org.opendaylight.yangtools.yang.common.RpcResultBuilder; -//import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; -//import org.opendaylight.yangtools.yang.data.api.schema.AnyXmlNode; -//import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; -//import org.opendaylight.yangtools.yang.data.impl.schema.Builders; -//import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -//import org.opendaylight.yangtools.yang.model.api.NotificationDefinition; -//import org.opendaylight.yangtools.yang.model.api.SchemaContext; -//import org.opendaylight.yangtools.yang.model.api.SchemaPath; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -//import org.w3c.dom.Document; -//import org.w3c.dom.Element; -// -//import com.google.common.base.Optional; -//import com.google.common.base.Preconditions; -//import com.google.common.base.Throwables; -//import com.google.common.util.concurrent.CheckedFuture; -// -//public class NetconfEventSource implements EventSource, DOMNotificationListener { -// -// private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSource.class); -// -// private static final NodeIdentifier TOPIC_NOTIFICATION_ARG = new NodeIdentifier(TopicNotification.QNAME); -// private static final NodeIdentifier EVENT_SOURCE_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "node-id")); -// private static final NodeIdentifier TOPIC_ID_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "topic-id")); -// private static final NodeIdentifier PAYLOAD_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "payload")); -// private static final String ConnectionNotificationSourceName = "ConnectionNotificationSource"; -// -// private final String nodeId; -// private final Node node; -// -// private final DOMMountPoint netconfMount; -// private final MountPoint mountPoint; -// private final DOMNotificationPublishService domPublish; -// -// private final Map urnPrefixToStreamMap; // key = urnPrefix, value = StreamName -// private final List notificationTopicRegistrationList = new ArrayList<>(); -// -// public NetconfEventSource(final Node node, final Map streamMap, final DOMMountPoint netconfMount, final MountPoint mountPoint, final DOMNotificationPublishService publishService) { -// this.netconfMount = Preconditions.checkNotNull(netconfMount); -// this.mountPoint = Preconditions.checkNotNull(mountPoint); -// this.node = Preconditions.checkNotNull(node); -// this.urnPrefixToStreamMap = Preconditions.checkNotNull(streamMap); -// this.domPublish = Preconditions.checkNotNull(publishService); -// this.nodeId = node.getNodeId().getValue(); -// this.initializeNotificationTopicRegistrationList(); -// -// LOG.info("NetconfEventSource [{}] created.", this.nodeId); -// } -// -// private void initializeNotificationTopicRegistrationList() { -// notificationTopicRegistrationList.add(new ConnectionNotificationTopicRegistration(ConnectionNotificationSourceName, this)); -// Optional> streamMap = getAvailableStreams(); -// if(streamMap.isPresent()){ -// LOG.debug("Stream configuration compare..."); -// for (String urnPrefix : this.urnPrefixToStreamMap.keySet()) { -// final String streamName = this.urnPrefixToStreamMap.get(urnPrefix); -// LOG.debug("urnPrefix: {} streamName: {}", urnPrefix, streamName); -// if(streamMap.get().containsKey(streamName)){ -// LOG.debug("Stream containig on device"); -// notificationTopicRegistrationList.add(new StreamNotificationTopicRegistration(streamMap.get().get(streamName),urnPrefix, this)); -// } -// } -// } -// } -// -// private Optional> getAvailableStreams(){ -// -// Map streamMap = null; -// InstanceIdentifier pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build(); -// Optional dataBroker = this.mountPoint.getService(DataBroker.class); -// -// if(dataBroker.isPresent()){ -// LOG.debug("GET Available streams ..."); -// ReadOnlyTransaction tx = dataBroker.get().newReadOnlyTransaction(); -// CheckedFuture, ReadFailedException> checkFeature = tx.read(LogicalDatastoreType.OPERATIONAL,pathStream); -// -// try { -// Optional streams = checkFeature.checkedGet(); -// if(streams.isPresent()){ -// streamMap = new HashMap<>(); -// for(Stream stream : streams.get().getStream()){ -// LOG.debug("*** find stream {}", stream.getName().getValue()); -// streamMap.put(stream.getName().getValue(), stream); -// } -// } -// } catch (ReadFailedException e) { -// LOG.warn("Can not read streams for node {}",this.nodeId); -// } -// -// } else { -// LOG.warn("No databroker on node {}", this.nodeId); -// } -// -// return Optional.fromNullable(streamMap); -// } -// -// @Override -// public Future> joinTopic(final JoinTopicInput input) { -// LOG.debug("Join topic {} on {}", input.getTopicId().getValue(), this.nodeId); -// final NotificationPattern notificationPattern = input.getNotificationPattern(); -// final List matchingNotifications = getMatchingNotifications(notificationPattern); -// return registerTopic(input.getTopicId(),matchingNotifications); -// -// } -// -// @Override -// public Future> disJoinTopic(DisJoinTopicInput input) { -// for(NotificationTopicRegistration reg : notificationTopicRegistrationList){ -// reg.unRegisterNotificationTopic(input.getTopicId()); -// } -// return Util.resultRpcSuccessFor((Void) null) ; -// } -// -// private synchronized Future> registerTopic(final TopicId topicId, final List notificationsToSubscribe){ -// LOG.debug("Join topic {} - register"); -// JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down; -// if(notificationsToSubscribe != null && notificationsToSubscribe.isEmpty() == false){ -// LOG.debug("Notifications to subscribe has found - count {}",notificationsToSubscribe.size() ); -// final Optional notifyService = getDOMMountPoint().getService(DOMNotificationService.class); -// if(notifyService.isPresent()){ -// int registeredNotificationCount = 0; -// for(SchemaPath schemaNotification : notificationsToSubscribe){ -// for(NotificationTopicRegistration reg : notificationTopicRegistrationList){ -// LOG.debug("Try notification registratio {} on SchemaPathNotification {}", reg.getSourceName(), schemaNotification.getLastComponent().getLocalName()); -// if(reg.checkNotificationPath(schemaNotification)){ -// LOG.info("Source of notification {} is activating, TopicId {}", reg.getSourceName(), topicId.getValue() ); -// boolean regSuccess = reg.registerNotificationTopic(schemaNotification, topicId); -// if(regSuccess){ -// registeredNotificationCount = registeredNotificationCount +1; -// } -// } -// } -// } -// if(registeredNotificationCount > 0){ -// joinTopicStatus = JoinTopicStatus.Up; -// } -// } else { -// LOG.warn("NO DOMNotification service on node {}", this.nodeId); -// } -// } else { -// LOG.debug("Notifications to subscribe has NOT found"); -// } -// -// final JoinTopicOutput output = new JoinTopicOutputBuilder().setStatus(joinTopicStatus).build(); -// return immediateFuture(RpcResultBuilder.success(output).build()); -// -// } -// -// public void reActivateStreams(){ -// for (NotificationTopicRegistration reg : notificationTopicRegistrationList) { -// LOG.info("Source of notification {} is reactivating on node {}", reg.getSourceName(), this.nodeId); -// reg.reActivateNotificationSource(); -// } -// } -// -// public void deActivateStreams(){ -// for (NotificationTopicRegistration reg : notificationTopicRegistrationList) { -// LOG.info("Source of notification {} is deactivating on node {}", reg.getSourceName(), this.nodeId); -// reg.deActivateNotificationSource(); -// } -// } -// -// @Override -// public void onNotification(final DOMNotification notification) { -// SchemaPath notificationPath = notification.getType(); -// Date notificationEventTime = null; -// if(notification instanceof DOMEvent){ -// notificationEventTime = ((DOMEvent) notification).getEventTime(); -// } -// for(NotificationTopicRegistration notifReg : notificationTopicRegistrationList){ -// ArrayList topicIdsForNotification = notifReg.getNotificationTopicIds(notificationPath); -// if(topicIdsForNotification != null && topicIdsForNotification.isEmpty() == false){ -// -// if(notifReg instanceof StreamNotificationTopicRegistration){ -// StreamNotificationTopicRegistration streamReg = (StreamNotificationTopicRegistration)notifReg; -// streamReg.setLastEventTime(notificationEventTime); -// } -// -// for(TopicId topicId : topicIdsForNotification){ -// publishNotification(notification, topicId); -// LOG.debug("Notification {} has been published for TopicId {}",notification.getType(), topicId.getValue()); -// } -// -// } -// } -// } -// -// private void publishNotification(final DOMNotification notification, TopicId topicId){ -// final ContainerNode topicNotification = Builders.containerBuilder() -// .withNodeIdentifier(TOPIC_NOTIFICATION_ARG) -// .withChild(ImmutableNodes.leafNode(TOPIC_ID_ARG, topicId)) -// .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, this.nodeId)) -// .withChild(encapsulate(notification)) -// .build(); -// try { -// domPublish.putNotification(new TopicDOMNotification(topicNotification)); -// } catch (final InterruptedException e) { -// throw Throwables.propagate(e); -// } -// } -// -// private AnyXmlNode encapsulate(final DOMNotification body) { -// // FIXME: Introduce something like AnyXmlWithNormalizedNodeData in Yangtools -// final Document doc = XmlUtil.newDocument(); -// final Optional namespace = Optional.of(PAYLOAD_ARG.getNodeType().getNamespace().toString()); -// final Element element = XmlUtil.createElement(doc , "payload", namespace); -// -// final DOMResult result = new DOMResult(element); -// -// final SchemaContext context = getDOMMountPoint().getSchemaContext(); -// final SchemaPath schemaPath = body.getType(); -// try { -// NetconfMessageTransformUtil.writeNormalizedNode(body.getBody(), result, schemaPath, context); -// return Builders.anyXmlBuilder().withNodeIdentifier(PAYLOAD_ARG) -// .withValue(new DOMSource(element)) -// .build(); -// } catch (IOException | XMLStreamException e) { -// LOG.error("Unable to encapsulate notification.",e); -// throw Throwables.propagate(e); -// } -// } -// -// private List getMatchingNotifications(NotificationPattern notificationPattern){ -// // FIXME: default language should already be regex -// final String regex = Util.wildcardToRegex(notificationPattern.getValue()); -// -// final Pattern pattern = Pattern.compile(regex); -// List availableNotifications = getAvailableNotifications(); -// if(availableNotifications == null || availableNotifications.isEmpty()){ -// return null; -// } -// return Util.expandQname(availableNotifications, pattern); -// } -// -// @Override -// public void close() throws Exception { -// for(NotificationTopicRegistration streamReg : notificationTopicRegistrationList){ -// streamReg.close(); -// } -// } -// -// @Override -// public NodeKey getSourceNodeKey(){ -// return getNode().getKey(); -// } -// -// @Override -// public List getAvailableNotifications() { -// -// final List availNotifList = new ArrayList<>(); -// // add Event Source Connection status notification -// availNotifList.add(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH); -// -// // FIXME: use SchemaContextListener to get changes asynchronously -// final Set availableNotifications = getDOMMountPoint().getSchemaContext().getNotifications(); -// // add all known notifications from netconf device -// for (final NotificationDefinition nd : availableNotifications) { -// availNotifList.add(nd.getPath()); -// } -// return availNotifList; -// } -// -// public Node getNode() { -// return node; -// } -// -// DOMMountPoint getDOMMountPoint() { -// return netconfMount; -// } -// -// MountPoint getMountPoint() { -// return mountPoint; -// } -// -// NetconfNode getNetconfNode(){ -// return node.getAugmentation(NetconfNode.class); -// } -// -//} diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java deleted file mode 100644 index 8106d902b1..0000000000 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java +++ /dev/null @@ -1,213 +0,0 @@ -///* -// * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. -// * -// * This program and the accompanying materials are made available under the -// * terms of the Eclipse Public License v1.0 which accompanies this distribution, -// * and is available at http://www.eclipse.org/legal/epl-v10.html -// */ -// -//package org.opendaylight.controller.messagebus.eventsources.netconf; -// -//import java.util.HashMap; -//import java.util.List; -//import java.util.Map; -//import java.util.concurrent.ConcurrentHashMap; -// -//import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream; -//import org.opendaylight.controller.md.sal.binding.api.DataBroker; -//import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; -//import org.opendaylight.controller.md.sal.binding.api.MountPointService; -//import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; -//import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; -//import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -//import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; -//import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; -//import org.opendaylight.controller.messagebus.spi.EventSourceRegistry; -//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; -//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf; -//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology; -//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId; -//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; -//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey; -//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; -//import org.opendaylight.yangtools.concepts.ListenerRegistration; -//import org.opendaylight.yangtools.yang.binding.DataObject; -//import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -//import com.google.common.base.Preconditions; -// -//public final class NetconfEventSourceManager implements DataChangeListener, AutoCloseable { -// -// private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSourceManager.class); -// private static final TopologyKey NETCONF_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())); -// private static final InstanceIdentifier NETCONF_DEVICE_PATH = InstanceIdentifier.create(NetworkTopology.class) -// .child(Topology.class, NETCONF_TOPOLOGY_KEY) -// .child(Node.class); -// -// private final Map streamMap; -// private final ConcurrentHashMap, NetconfEventSourceRegistration> registrationMap = new ConcurrentHashMap<>(); -// private final DOMNotificationPublishService publishService; -// private final DOMMountPointService domMounts; -// private final MountPointService mountPointService; -// private ListenerRegistration listenerRegistration; -// private final EventSourceRegistry eventSourceRegistry; -// -// public static NetconfEventSourceManager create(final DataBroker dataBroker, -// final DOMNotificationPublishService domPublish, -// final DOMMountPointService domMount, -// final MountPointService bindingMount, -// final EventSourceRegistry eventSourceRegistry, -// final List namespaceMapping){ -// -// final NetconfEventSourceManager eventSourceManager = -// new NetconfEventSourceManager(domPublish, domMount,bindingMount, eventSourceRegistry, namespaceMapping); -// -// eventSourceManager.initialize(dataBroker); -// -// return eventSourceManager; -// -// } -// -// private NetconfEventSourceManager(final DOMNotificationPublishService domPublish, -// final DOMMountPointService domMount, -// final MountPointService bindingMount, -// final EventSourceRegistry eventSourceRegistry, -// final List namespaceMapping) { -// -// Preconditions.checkNotNull(domPublish); -// Preconditions.checkNotNull(domMount); -// Preconditions.checkNotNull(bindingMount); -// Preconditions.checkNotNull(eventSourceRegistry); -// Preconditions.checkNotNull(namespaceMapping); -// this.streamMap = namespaceToStreamMapping(namespaceMapping); -// this.domMounts = domMount; -// this.mountPointService = bindingMount; -// this.publishService = domPublish; -// this.eventSourceRegistry = eventSourceRegistry; -// } -// -// private void initialize(final DataBroker dataBroker){ -// Preconditions.checkNotNull(dataBroker); -// listenerRegistration = dataBroker.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, NETCONF_DEVICE_PATH, this, DataChangeScope.SUBTREE); -// LOG.info("NetconfEventSourceManager initialized."); -// } -// -// private Map namespaceToStreamMapping(final List namespaceMapping) { -// final Map streamMap = new HashMap<>(namespaceMapping.size()); -// -// for (final NamespaceToStream nToS : namespaceMapping) { -// streamMap.put(nToS.getUrnPrefix(), nToS.getStreamName()); -// } -// -// return streamMap; -// } -// -// @Override -// public void onDataChanged(final AsyncDataChangeEvent, DataObject> event) { -// -// LOG.debug("[DataChangeEvent, DataObject>: {}]", event); -// for (final Map.Entry, DataObject> changeEntry : event.getCreatedData().entrySet()) { -// if (changeEntry.getValue() instanceof Node) { -// nodeCreated(changeEntry.getKey(),(Node) changeEntry.getValue()); -// } -// } -// -// for (final Map.Entry, DataObject> changeEntry : event.getUpdatedData().entrySet()) { -// if (changeEntry.getValue() instanceof Node) { -// nodeUpdated(changeEntry.getKey(),(Node) changeEntry.getValue()); -// } -// } -// -// for(InstanceIdentifier removePath : event.getRemovedPaths()){ -// DataObject removeObject = event.getOriginalData().get(removePath); -// if(removeObject instanceof Node){ -// nodeRemoved(removePath); -// } -// } -// -// } -// -// private void nodeCreated(final InstanceIdentifier key, final Node node){ -// Preconditions.checkNotNull(key); -// if(validateNode(node) == false){ -// LOG.warn("NodeCreated event : Node [{}] is null or not valid.", key.toString()); -// return; -// } -// LOG.info("Netconf event source [{}] is creating...", key.toString()); -// NetconfEventSourceRegistration nesr = NetconfEventSourceRegistration.create(key, node, this); -// if(nesr != null){ -// NetconfEventSourceRegistration nesrOld = registrationMap.put(key, nesr); -// if(nesrOld != null){ -// nesrOld.close(); -// } -// } -// } -// -// private void nodeUpdated(final InstanceIdentifier key, final Node node){ -// Preconditions.checkNotNull(key); -// if(validateNode(node) == false){ -// LOG.warn("NodeUpdated event : Node [{}] is null or not valid.", key.toString()); -// return; -// } -// -// LOG.info("Netconf event source [{}] is updating...", key.toString()); -// NetconfEventSourceRegistration nesr = registrationMap.get(key); -// if(nesr != null){ -// nesr.updateStatus(); -// } else { -// nodeCreated(key, node); -// } -// } -// -// private void nodeRemoved(final InstanceIdentifier key){ -// Preconditions.checkNotNull(key); -// LOG.info("Netconf event source [{}] is removing...", key.toString()); -// NetconfEventSourceRegistration nesr = registrationMap.remove(key); -// if(nesr != null){ -// nesr.close(); -// } -// } -// -// private boolean validateNode(final Node node){ -// if(node == null){ -// return false; -// } -// return isNetconfNode(node); -// } -// -// Map getStreamMap() { -// return streamMap; -// } -// -// DOMNotificationPublishService getPublishService() { -// return publishService; -// } -// -// DOMMountPointService getDomMounts() { -// return domMounts; -// } -// -// EventSourceRegistry getEventSourceRegistry() { -// return eventSourceRegistry; -// } -// -// MountPointService getMountPointService() { -// return mountPointService; -// } -// -// private boolean isNetconfNode(final Node node) { -// return node.getAugmentation(NetconfNode.class) != null ; -// } -// -// @Override -// public void close() { -// listenerRegistration.close(); -// for(final NetconfEventSourceRegistration reg : registrationMap.values()){ -// reg.close(); -// } -// registrationMap.clear(); -// } -// -//} \ No newline at end of file diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceRegistration.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceRegistration.java deleted file mode 100644 index 9bc0a462e4..0000000000 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceRegistration.java +++ /dev/null @@ -1,196 +0,0 @@ -///* -// * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. -// * -// * This program and the accompanying materials are made available under the -// * terms of the Eclipse Public License v1.0 which accompanies this distribution, -// * and is available at http://www.eclipse.org/legal/epl-v10.html -// */ -//package org.opendaylight.controller.messagebus.eventsources.netconf; -// -//import java.util.List; -// -//import org.opendaylight.controller.md.sal.binding.api.MountPoint; -//import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; -//import org.opendaylight.controller.messagebus.spi.EventSourceRegistration; -//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; -//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus; -//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf; -//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology; -//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; -//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; -//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; -//import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -//import org.opendaylight.yangtools.yang.common.QName; -//import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -//import com.google.common.base.Optional; -//import com.google.common.base.Preconditions; -// -///** -// * Helper class to keep connection status of netconf node and event source registration object -// * -// */ -//public class NetconfEventSourceRegistration implements AutoCloseable{ -// -// private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSourceRegistration.class); -// private static final YangInstanceIdentifier NETCONF_DEVICE_DOM_PATH = YangInstanceIdentifier.builder() -// .node(NetworkTopology.QNAME) -// .node(Topology.QNAME) -// .nodeWithKey(Topology.QNAME, QName.create(Topology.QNAME, "topology-id"),TopologyNetconf.QNAME.getLocalName()) -// .node(Node.QNAME) -// .build(); -// private static final QName NODE_ID_QNAME = QName.create(Node.QNAME,"node-id"); -// private static final String NotificationCapabilityPrefix = "(urn:ietf:params:xml:ns:netconf:notification"; -// -// private final Node node; -// private final InstanceIdentifier instanceIdent; -// private final NetconfEventSourceManager netconfEventSourceManager; -// private ConnectionStatus currentNetconfConnStatus; -// private EventSourceRegistration eventSourceRegistration; -// -// public static NetconfEventSourceRegistration create(final InstanceIdentifier instanceIdent, final Node node, -// final NetconfEventSourceManager netconfEventSourceManager){ -// Preconditions.checkNotNull(instanceIdent); -// Preconditions.checkNotNull(node); -// Preconditions.checkNotNull(netconfEventSourceManager); -// if(isEventSource(node) == false){ -// return null; -// } -// NetconfEventSourceRegistration nesr = new NetconfEventSourceRegistration(instanceIdent, node, netconfEventSourceManager); -// nesr.updateStatus(); -// LOG.debug("NetconfEventSourceRegistration for node {} has been initialized...",node.getNodeId().getValue()); -// return nesr; -// } -// -// private static boolean isEventSource(final Node node) { -// final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class); -// if(netconfNode == null){ -// return false; -// } -// if (netconfNode.getAvailableCapabilities() == null) { -// return false; -// } -// final List capabilities = netconfNode.getAvailableCapabilities().getAvailableCapability(); -// if(capabilities == null || capabilities.isEmpty()) { -// return false; -// } -// for (final String capability : netconfNode.getAvailableCapabilities().getAvailableCapability()) { -// if(capability.startsWith(NotificationCapabilityPrefix)) { -// return true; -// } -// } -// -// return false; -// } -// -// private NetconfEventSourceRegistration(final InstanceIdentifier instanceIdent, final Node node, final NetconfEventSourceManager netconfEventSourceManager) { -// this.instanceIdent = instanceIdent; -// this.node = node; -// this.netconfEventSourceManager = netconfEventSourceManager; -// this.eventSourceRegistration =null; -// } -// -// public Node getNode() { -// return node; -// } -// -// Optional> getEventSourceRegistration() { -// return Optional.fromNullable(eventSourceRegistration); -// } -// -// NetconfNode getNetconfNode(){ -// return node.getAugmentation(NetconfNode.class); -// } -// -// void updateStatus(){ -// ConnectionStatus netconfConnStatus = getNetconfNode().getConnectionStatus(); -// LOG.info("Change status on node {}, new status is {}",this.node.getNodeId().getValue(),netconfConnStatus); -// if(netconfConnStatus.equals(currentNetconfConnStatus)){ -// return; -// } -// changeStatus(netconfConnStatus); -// } -// -// private boolean checkConnectionStatusType(ConnectionStatus status){ -// if( status == ConnectionStatus.Connected -// || status == ConnectionStatus.Connecting -// || status == ConnectionStatus.UnableToConnect){ -// return true; -// } -// return false; -// } -// -// private void changeStatus(ConnectionStatus newStatus){ -// Preconditions.checkNotNull(newStatus); -// if(checkConnectionStatusType(newStatus) == false){ -// throw new IllegalStateException("Unknown new Netconf Connection Status"); -// } -// if(this.currentNetconfConnStatus == null){ -// if (newStatus == ConnectionStatus.Connected){ -// registrationEventSource(); -// } -// } else if (this.currentNetconfConnStatus == ConnectionStatus.Connecting){ -// if (newStatus == ConnectionStatus.Connected){ -// if(this.eventSourceRegistration == null){ -// registrationEventSource(); -// } else { -// // reactivate stream on registered event source (invoke publish notification about connection) -// this.eventSourceRegistration.getInstance().reActivateStreams(); -// } -// } -// } else if (this.currentNetconfConnStatus == ConnectionStatus.Connected) { -// -// if(newStatus == ConnectionStatus.Connecting || newStatus == ConnectionStatus.UnableToConnect){ -// // deactivate streams on registered event source (invoke publish notification about connection) -// this.eventSourceRegistration.getInstance().deActivateStreams(); -// } -// } else if (this.currentNetconfConnStatus == ConnectionStatus.UnableToConnect){ -// if(newStatus == ConnectionStatus.Connected){ -// if(this.eventSourceRegistration == null){ -// registrationEventSource(); -// } else { -// // reactivate stream on registered event source (invoke publish notification about connection) -// this.eventSourceRegistration.getInstance().reActivateStreams(); -// } -// } -// } else { -// throw new IllegalStateException("Unknown current Netconf Connection Status"); -// } -// this.currentNetconfConnStatus = newStatus; -// } -// -// private void registrationEventSource(){ -// final Optional mountPoint = netconfEventSourceManager.getMountPointService().getMountPoint(instanceIdent); -// final Optional domMountPoint = netconfEventSourceManager.getDomMounts().getMountPoint(domMountPath(node.getNodeId())); -// EventSourceRegistration registration = null; -// if(domMountPoint.isPresent() && mountPoint.isPresent()) { -// final NetconfEventSource netconfEventSource = new NetconfEventSource( -// node, -// netconfEventSourceManager.getStreamMap(), -// domMountPoint.get(), -// mountPoint.get(), -// netconfEventSourceManager.getPublishService()); -// registration = netconfEventSourceManager.getEventSourceRegistry().registerEventSource(netconfEventSource); -// LOG.info("Event source {} has been registered",node.getNodeId().getValue()); -// } -// this.eventSourceRegistration = registration; -// } -// -// private YangInstanceIdentifier domMountPath(final NodeId nodeId) { -// return YangInstanceIdentifier.builder(NETCONF_DEVICE_DOM_PATH).nodeWithKey(Node.QNAME, NODE_ID_QNAME, nodeId.getValue()).build(); -// } -// -// private void closeEventSourceRegistration(){ -// if(getEventSourceRegistration().isPresent()){ -// getEventSourceRegistration().get().close(); -// } -// } -// -// @Override -// public void close() { -// closeEventSourceRegistration(); -// } -// -//} diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java deleted file mode 100644 index 64ddb31d93..0000000000 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java +++ /dev/null @@ -1,204 +0,0 @@ -///* -// * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. -// * -// * This program and the accompanying materials are made available under the -// * terms of the Eclipse Public License v1.0 which accompanies this distribution, -// * and is available at http://www.eclipse.org/legal/epl-v10.html -// */ -//package org.opendaylight.controller.messagebus.eventsources.netconf; -// -//import java.util.ArrayList; -//import java.util.Date; -//import java.util.List; -//import java.util.concurrent.ConcurrentHashMap; -// -//import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; -//import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; -//import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; -//import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; -//import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; -//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; -//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput; -//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; -//import org.opendaylight.yangtools.concepts.ListenerRegistration; -//import org.opendaylight.yangtools.yang.common.QName; -//import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; -//import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; -//import org.opendaylight.yangtools.yang.data.impl.schema.Builders; -//import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -//import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder; -//import org.opendaylight.yangtools.yang.model.api.SchemaPath; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -//import com.google.common.base.Optional; -//import com.google.common.util.concurrent.CheckedFuture; -// -//public class StreamNotificationTopicRegistration extends NotificationTopicRegistration { -// -// private static final Logger LOG = LoggerFactory.getLogger(StreamNotificationTopicRegistration.class); -// private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"stream")); -// private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription")); -// private static final NodeIdentifier START_TIME_SUBSCRIPTION = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"startTime")); -// -// final private DOMMountPoint domMountPoint; -// final private String nodeId; -// final private NetconfEventSource netconfEventSource; -// final private Stream stream; -// private Date lastEventTime; -// -// private ConcurrentHashMap> notificationRegistrationMap = new ConcurrentHashMap<>(); -// private ConcurrentHashMap> notificationTopicMap = new ConcurrentHashMap<>(); -// -// public StreamNotificationTopicRegistration(final Stream stream, final String notificationPrefix, NetconfEventSource netconfEventSource) { -// super(NotificationSourceType.NetconfDeviceStream, stream.getName().getValue(), notificationPrefix); -// this.domMountPoint = netconfEventSource.getDOMMountPoint(); -// this.nodeId = netconfEventSource.getNode().getNodeId().getValue().toString(); -// this.netconfEventSource = netconfEventSource; -// this.stream = stream; -// this.lastEventTime= null; -// setReplaySupported(this.stream.isReplaySupport()); -// setActive(false); -// LOG.info("StreamNotificationTopicRegistration initialized for {}", getStreamName()); -// } -// -// void activateNotificationSource() { -// if(isActive() == false){ -// LOG.info("Stream {} is not active on node {}. Will subscribe.", this.getStreamName(), this.nodeId); -// final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME)) -// .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName())) -// .build(); -// CheckedFuture csFuture = domMountPoint.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input); -// try { -// csFuture.checkedGet(); -// setActive(true); -// } catch (DOMRpcException e) { -// LOG.warn("Can not subscribe stream {} on node {}", this.getSourceName(), this.nodeId); -// setActive(false); -// return; -// } -// } else { -// LOG.info("Stream {} is now active on node {}", this.getStreamName(), this.nodeId); -// } -// } -// -// void reActivateNotificationSource(){ -// if(isActive()){ -// LOG.info("Stream {} is reactivating on node {}.", this.getStreamName(), this.nodeId); -// DataContainerNodeAttrBuilder inputBuilder = -// Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME)) -// .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName())); -// if(isReplaySupported() && this.getLastEventTime() != null){ -// inputBuilder.withChild(ImmutableNodes.leafNode(START_TIME_SUBSCRIPTION, this.getLastEventTime())); -// } -// final ContainerNode input = inputBuilder.build(); -// CheckedFuture csFuture = domMountPoint.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input); -// try { -// csFuture.checkedGet(); -// setActive(true); -// } catch (DOMRpcException e) { -// LOG.warn("Can not resubscribe stream {} on node {}", this.getSourceName(), this.nodeId); -// setActive(false); -// return; -// } -// } -// } -// -// @Override -// void deActivateNotificationSource() { -// // no operations need -// } -// -// private void closeStream() { -// if(isActive()){ -// for(ListenerRegistration reg : notificationRegistrationMap.values()){ -// reg.close(); -// } -// notificationRegistrationMap.clear(); -// notificationTopicMap.clear(); -// setActive(false); -// } -// } -// -// private String getStreamName() { -// return getSourceName(); -// } -// -// @Override -// ArrayList getNotificationTopicIds(SchemaPath notificationPath){ -// return notificationTopicMap.get(notificationPath); -// } -// -// @Override -// boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId){ -// -// if(checkNotificationPath(notificationPath) == false){ -// LOG.debug("Bad SchemaPath for notification try to register"); -// return false; -// } -// -// final Optional notifyService = domMountPoint.getService(DOMNotificationService.class); -// if(notifyService.isPresent() == false){ -// LOG.debug("DOMNotificationService is not present"); -// return false; -// } -// -// activateNotificationSource(); -// if(isActive() == false){ -// LOG.warn("Stream {} is not active, listener for notification {} is not registered.", getStreamName(), notificationPath.toString()); -// return false; -// } -// -// ListenerRegistration registration = -// notifyService.get().registerNotificationListener(this.netconfEventSource,notificationPath); -// notificationRegistrationMap.put(notificationPath, registration); -// ArrayList topicIds = getNotificationTopicIds(notificationPath); -// if(topicIds == null){ -// topicIds = new ArrayList<>(); -// topicIds.add(topicId); -// } else { -// if(topicIds.contains(topicId) == false){ -// topicIds.add(topicId); -// } -// } -// -// notificationTopicMap.put(notificationPath, topicIds); -// return true; -// } -// -// @Override -// synchronized void unRegisterNotificationTopic(TopicId topicId) { -// List notificationPathToRemove = new ArrayList<>(); -// for(SchemaPath notifKey : notificationTopicMap.keySet()){ -// ArrayList topicList = notificationTopicMap.get(notifKey); -// if(topicList != null){ -// topicList.remove(topicId); -// if(topicList.isEmpty()){ -// notificationPathToRemove.add(notifKey); -// } -// } -// } -// for(SchemaPath notifKey : notificationPathToRemove){ -// notificationTopicMap.remove(notifKey); -// ListenerRegistration reg = notificationRegistrationMap.remove(notifKey); -// if(reg != null){ -// reg.close(); -// } -// } -// } -// -// Optional getLastEventTime() { -// return Optional.fromNullable(lastEventTime); -// } -// -// -// void setLastEventTime(Date lastEventTime) { -// this.lastEventTime = lastEventTime; -// } -// -// @Override -// public void close() throws Exception { -// closeStream(); -// } -// -//} diff --git a/opendaylight/md-sal/messagebus-impl/src/main/yang/messagebus-app-impl.yang b/opendaylight/md-sal/messagebus-impl/src/main/yang/messagebus-app-impl.yang index 320afccb2e..01c1ba296a 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/yang/messagebus-app-impl.yang +++ b/opendaylight/md-sal/messagebus-impl/src/main/yang/messagebus-app-impl.yang @@ -10,7 +10,7 @@ module messagebus-app-impl { description "Service definition for Message Bus application implementation."; - + revision "2015-02-03" { description "Second revision. Message Bus opensourcing"; } @@ -24,7 +24,7 @@ module messagebus-app-impl { augment "/config:modules/config:module/config:configuration" { case messagebus-app-impl { when "/config:modules/config:module/config:type = 'messagebus-app-impl'"; - + container binding-broker { uses config:service-ref { refine type { @@ -34,32 +34,7 @@ module messagebus-app-impl { } } - container dom-broker { - uses config:service-ref { - refine type { - mandatory true; - config:required-identity dom:dom-broker-osgi-registry; - } - } - } - - list namespace-to-stream { - key urn-prefix; - - leaf urn-prefix { - type string; - } - - leaf stream-name { - type string; - } - } - } - } - - augment "/config:modules/config:module/config:state" { - case messagebus-app-impl { - when "/config:modules/config:module/config:type = 'messagebus-app-impl'"; } } + } \ No newline at end of file diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManagerTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManagerTest.java deleted file mode 100644 index 2ae7de25ee..0000000000 --- a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManagerTest.java +++ /dev/null @@ -1,177 +0,0 @@ -///* -// * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. -// * -// * This program and the accompanying materials are made available under the -// * terms of the Eclipse Public License v1.0 which accompanies this distribution, -// * and is available at http://www.eclipse.org/legal/epl-v10.html -// */ -//package org.opendaylight.controller.messagebus.eventsources.netconf; -// -//import static org.mockito.Matchers.any; -//import static org.mockito.Matchers.eq; -//import static org.mockito.Matchers.notNull; -//import static org.mockito.Mockito.doReturn; -//import static org.mockito.Mockito.mock; -//import static org.mockito.Mockito.times; -//import static org.mockito.Mockito.verify; -// -//import java.util.ArrayList; -//import java.util.HashMap; -//import java.util.List; -//import java.util.Map; -// -//import org.junit.Before; -//import org.junit.BeforeClass; -//import org.junit.Test; -//import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream; -//import org.opendaylight.controller.md.sal.binding.api.DataBroker; -//import org.opendaylight.controller.md.sal.binding.api.MountPoint; -//import org.opendaylight.controller.md.sal.binding.api.MountPointService; -//import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; -//import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; -//import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; -//import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -//import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -//import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; -//import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; -//import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; -//import org.opendaylight.controller.messagebus.app.impl.EventSourceTopology; -//import org.opendaylight.controller.messagebus.spi.EventSource; -//import org.opendaylight.controller.messagebus.spi.EventSourceRegistration; -//import org.opendaylight.controller.messagebus.spi.EventSourceRegistry; -//import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; -//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf; -//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams; -//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus; -//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; -//import org.opendaylight.yangtools.concepts.ListenerRegistration; -//import org.opendaylight.yangtools.yang.binding.DataObject; -//import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -//import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -// -//import com.google.common.base.Optional; -//import com.google.common.util.concurrent.CheckedFuture; -// -//public class NetconfEventSourceManagerTest { -// -// NetconfEventSourceManager netconfEventSourceManager; -// ListenerRegistration listenerRegistrationMock; -// DOMMountPointService domMountPointServiceMock; -// MountPointService mountPointServiceMock; -// EventSourceTopology eventSourceTopologyMock; -// AsyncDataChangeEvent asyncDataChangeEventMock; -// RpcProviderRegistry rpcProviderRegistryMock; -// EventSourceRegistry eventSourceRegistry; -// @BeforeClass -// public static void initTestClass() throws IllegalAccessException, InstantiationException { -// } -// -// @Before -// public void setUp() throws Exception { -// DataBroker dataBrokerMock = mock(DataBroker.class); -// DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class); -// domMountPointServiceMock = mock(DOMMountPointService.class); -// mountPointServiceMock = mock(MountPointService.class); -// eventSourceTopologyMock = mock(EventSourceTopology.class); -// rpcProviderRegistryMock = mock(RpcProviderRegistry.class); -// eventSourceRegistry = mock(EventSourceRegistry.class); -// List namespaceToStreamList = new ArrayList<>(); -// -// listenerRegistrationMock = mock(ListenerRegistration.class); -// doReturn(listenerRegistrationMock).when(dataBrokerMock).registerDataChangeListener(eq(LogicalDatastoreType.OPERATIONAL), any(InstanceIdentifier.class), any(NetconfEventSourceManager.class), eq(AsyncDataBroker.DataChangeScope.SUBTREE)); -// -// Optional optionalDomMountServiceMock = (Optional) mock(Optional.class); -// doReturn(true).when(optionalDomMountServiceMock).isPresent(); -// doReturn(optionalDomMountServiceMock).when(domMountPointServiceMock).getMountPoint((YangInstanceIdentifier)notNull()); -// -// DOMMountPoint domMountPointMock = mock(DOMMountPoint.class); -// doReturn(domMountPointMock).when(optionalDomMountServiceMock).get(); -// -// -// Optional optionalBindingMountMock = mock(Optional.class); -// doReturn(true).when(optionalBindingMountMock).isPresent(); -// -// MountPoint mountPointMock = mock(MountPoint.class); -// doReturn(optionalBindingMountMock).when(mountPointServiceMock).getMountPoint(any(InstanceIdentifier.class)); -// doReturn(mountPointMock).when(optionalBindingMountMock).get(); -// -// Optional optionalMpDataBroker = mock(Optional.class); -// DataBroker mpDataBroker = mock(DataBroker.class); -// doReturn(optionalMpDataBroker).when(mountPointMock).getService(DataBroker.class); -// doReturn(true).when(optionalMpDataBroker).isPresent(); -// doReturn(mpDataBroker).when(optionalMpDataBroker).get(); -// -// ReadOnlyTransaction rtx = mock(ReadOnlyTransaction.class); -// doReturn(rtx).when(mpDataBroker).newReadOnlyTransaction(); -// CheckedFuture, ReadFailedException> checkFeature = (CheckedFuture, ReadFailedException>)mock(CheckedFuture.class); -// InstanceIdentifier pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build(); -// doReturn(checkFeature).when(rtx).read(LogicalDatastoreType.OPERATIONAL, pathStream); -// Optional avStreams = NetconfTestUtils.getAvailableStream("stream01", true); -// doReturn(avStreams).when(checkFeature).checkedGet(); -// -// EventSourceRegistration esrMock = mock(EventSourceRegistration.class); -// -// netconfEventSourceManager = -// NetconfEventSourceManager.create(dataBrokerMock, -// domNotificationPublishServiceMock, -// domMountPointServiceMock, -// mountPointServiceMock, -// eventSourceRegistry, -// namespaceToStreamList); -// } -// -// @Test -// public void onDataChangedCreateEventSourceTestByCreateEntry() throws Exception { -// onDataChangedTestHelper(true,false,true,NetconfTestUtils.notification_capability_prefix); -// netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock); -// verify(eventSourceRegistry, times(1)).registerEventSource(any(EventSource.class)); -// } -// -// @Test -// public void onDataChangedCreateEventSourceTestByUpdateEntry() throws Exception { -// onDataChangedTestHelper(false,true,true, NetconfTestUtils.notification_capability_prefix); -// netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock); -// verify(eventSourceRegistry, times(1)).registerEventSource(any(EventSource.class)); -// } -// -// @Test -// public void onDataChangedCreateEventSourceTestNotNeconf() throws Exception { -// onDataChangedTestHelper(false,true,false,NetconfTestUtils.notification_capability_prefix); -// netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock); -// verify(eventSourceRegistry, times(0)).registerEventSource(any(EventSource.class)); -// } -// -// @Test -// public void onDataChangedCreateEventSourceTestNotNotificationCapability() throws Exception { -// onDataChangedTestHelper(true,false,true,"bad-prefix"); -// netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock); -// verify(eventSourceRegistry, times(0)).registerEventSource(any(EventSource.class)); -// } -// -// private void onDataChangedTestHelper(boolean create, boolean update, boolean isNetconf, String notificationCapabilityPrefix) throws Exception{ -// asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class); -// Map mapCreate = new HashMap<>(); -// Map mapUpdate = new HashMap<>(); -// -// Node node01; -// String nodeId = "Node01"; -// doReturn(mapCreate).when(asyncDataChangeEventMock).getCreatedData(); -// doReturn(mapUpdate).when(asyncDataChangeEventMock).getUpdatedData(); -// -// if(isNetconf){ -// node01 = NetconfTestUtils.getNetconfNode(nodeId, "node01.test.local", ConnectionStatus.Connected, notificationCapabilityPrefix); -// -// } else { -// node01 = NetconfTestUtils.getNode(nodeId); -// } -// -// if(create){ -// mapCreate.put(NetconfTestUtils.getInstanceIdentifier(node01), node01); -// } -// if(update){ -// mapUpdate.put(NetconfTestUtils.getInstanceIdentifier(node01), node01); -// } -// -// } -// -//} \ No newline at end of file diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceTest.java deleted file mode 100644 index 88d7d4fb5d..0000000000 --- a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceTest.java +++ /dev/null @@ -1,141 +0,0 @@ -///* -// * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. -// * -// * This program and the accompanying materials are made available under the -// * terms of the Eclipse Public License v1.0 which accompanies this distribution, -// * and is available at http://www.eclipse.org/legal/epl-v10.html -// */ -//package org.opendaylight.controller.messagebus.eventsources.netconf; -// -// -//import static org.junit.Assert.assertNotNull; -//import static org.mockito.Matchers.any; -//import static org.mockito.Mockito.doReturn; -//import static org.mockito.Mockito.mock; -// -//import java.net.URI; -//import java.util.HashMap; -//import java.util.HashSet; -//import java.util.Map; -//import java.util.Set; -// -//import org.junit.Before; -//import org.junit.Test; -//import org.opendaylight.controller.md.sal.binding.api.BindingService; -//import org.opendaylight.controller.md.sal.binding.api.DataBroker; -//import org.opendaylight.controller.md.sal.binding.api.MountPoint; -//import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; -//import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -//import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -//import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; -//import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; -//import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; -//import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; -//import org.opendaylight.controller.md.sal.dom.api.DOMService; -//import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry; -//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; -//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; -//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput; -//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService; -//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf; -//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams; -//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus; -//import org.opendaylight.yangtools.concepts.ListenerRegistration; -//import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -//import org.opendaylight.yangtools.yang.common.QName; -//import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; -//import org.opendaylight.yangtools.yang.model.api.NotificationDefinition; -//import org.opendaylight.yangtools.yang.model.api.SchemaContext; -//import org.opendaylight.yangtools.yang.model.api.SchemaPath; -// -//import com.google.common.base.Optional; -//import com.google.common.util.concurrent.CheckedFuture; -// -//public class NetconfEventSourceTest { -// -// NetconfEventSource netconfEventSource; -// DOMMountPoint domMountPointMock; -// MountPoint mountPointMock; -// JoinTopicInput joinTopicInputMock; -// -// @Before -// public void setUp() throws Exception { -// Map streamMap = new HashMap<>(); -// streamMap.put("uriStr1", "string2"); -// domMountPointMock = mock(DOMMountPoint.class); -// mountPointMock = mock(MountPoint.class); -// DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class); -// RpcConsumerRegistry rpcConsumerRegistryMock = mock(RpcConsumerRegistry.class); -// Optional onlyOptionalMock = (Optional) mock(Optional.class); -// NotificationsService notificationsServiceMock = mock(NotificationsService.class); -// doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class); -// -// Optional optionalMpDataBroker = (Optional) mock(Optional.class); -// DataBroker mpDataBroker = mock(DataBroker.class); -// doReturn(optionalMpDataBroker).when(mountPointMock).getService(DataBroker.class); -// doReturn(true).when(optionalMpDataBroker).isPresent(); -// doReturn(mpDataBroker).when(optionalMpDataBroker).get(); -// -// ReadOnlyTransaction rtx = mock(ReadOnlyTransaction.class); -// doReturn(rtx).when(mpDataBroker).newReadOnlyTransaction(); -// CheckedFuture, ReadFailedException> checkFeature = (CheckedFuture, ReadFailedException>)mock(CheckedFuture.class); -// InstanceIdentifier pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build(); -// doReturn(checkFeature).when(rtx).read(LogicalDatastoreType.OPERATIONAL, pathStream); -// Optional avStreams = NetconfTestUtils.getAvailableStream("stream01", true); -// doReturn(avStreams).when(checkFeature).checkedGet(); -// -// netconfEventSource = new NetconfEventSource( -// NetconfTestUtils.getNetconfNode("NodeId1", "node.test.local", ConnectionStatus.Connected, NetconfTestUtils.notification_capability_prefix), -// streamMap, -// domMountPointMock, -// mountPointMock , -// domNotificationPublishServiceMock); -// -// } -// -// @Test -// public void joinTopicTest() throws Exception{ -// joinTopicTestHelper(); -// assertNotNull("JoinTopic return value has not been created correctly.", netconfEventSource.joinTopic(joinTopicInputMock)); -// } -// -// private void joinTopicTestHelper() throws Exception{ -// joinTopicInputMock = mock(JoinTopicInput.class); -// TopicId topicId = new TopicId("topicID007"); -// doReturn(topicId).when(joinTopicInputMock).getTopicId(); -// NotificationPattern notificationPatternMock = mock(NotificationPattern.class); -// doReturn(notificationPatternMock).when(joinTopicInputMock).getNotificationPattern(); -// doReturn("uriStr1").when(notificationPatternMock).getValue(); -// -// SchemaContext schemaContextMock = mock(SchemaContext.class); -// doReturn(schemaContextMock).when(domMountPointMock).getSchemaContext(); -// Set notificationDefinitionSet = new HashSet<>(); -// NotificationDefinition notificationDefinitionMock = mock(NotificationDefinition.class); -// notificationDefinitionSet.add(notificationDefinitionMock); -// -// URI uri = new URI("uriStr1"); -// QName qName = new QName(uri, "localName1"); -// org.opendaylight.yangtools.yang.model.api.SchemaPath schemaPath = SchemaPath.create(true, qName); -// doReturn(notificationDefinitionSet).when(schemaContextMock).getNotifications(); -// doReturn(schemaPath).when(notificationDefinitionMock).getPath(); -// -// Optional domNotificationServiceOptionalMock = (Optional) mock(Optional.class); -// doReturn(domNotificationServiceOptionalMock).when(domMountPointMock).getService(DOMNotificationService.class); -// doReturn(true).when(domNotificationServiceOptionalMock).isPresent(); -// -// DOMNotificationService domNotificationServiceMock = mock(DOMNotificationService.class); -// doReturn(domNotificationServiceMock).when(domNotificationServiceOptionalMock).get(); -// ListenerRegistration listenerRegistrationMock = (ListenerRegistration)mock(ListenerRegistration.class); -// doReturn(listenerRegistrationMock).when(domNotificationServiceMock).registerNotificationListener(any(NetconfEventSource.class), any(SchemaPath.class)); -// -// Optional optionalMock = (Optional) mock(Optional.class); -// doReturn(optionalMock).when(domMountPointMock).getService(DOMRpcService.class); -// doReturn(true).when(optionalMock).isPresent(); -// DOMRpcService domRpcServiceMock = mock(DOMRpcService.class); -// doReturn(domRpcServiceMock).when(optionalMock).get(); -// CheckedFuture checkedFutureMock = mock(CheckedFuture.class); -// doReturn(checkedFutureMock).when(domRpcServiceMock).invokeRpc(any(SchemaPath.class), any(ContainerNode.class)); -// -// } -// -//} \ No newline at end of file diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfTestUtils.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfTestUtils.java deleted file mode 100644 index b92dce4e0f..0000000000 --- a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfTestUtils.java +++ /dev/null @@ -1,95 +0,0 @@ -///* -// * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. -// * -// * This program and the accompanying materials are made available under the -// * terms of the Eclipse Public License v1.0 which accompanies this distribution, -// * and is available at http://www.eclipse.org/legal/epl-v10.html -// */ -//package org.opendaylight.controller.messagebus.eventsources.netconf; -// -//import java.util.ArrayList; -//import java.util.List; -// -//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType; -//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams; -//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.StreamsBuilder; -//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; -//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamBuilder; -//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.DomainName; -//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host; -//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; -//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder; -//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus; -//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.AvailableCapabilities; -//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.AvailableCapabilitiesBuilder; -//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf; -//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology; -//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; -//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId; -//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; -//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey; -//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; -//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder; -//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; -//import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -// -//import com.google.common.base.Optional; -// -//public final class NetconfTestUtils { -// -// public static final String notification_capability_prefix = "(urn:ietf:params:xml:ns:netconf:notification"; -// -// private NetconfTestUtils() { -// } -// -// public static Node getNetconfNode(String nodeIdent,String hostName,ConnectionStatus cs, String notificationCapabilityPrefix){ -// -// DomainName dn = new DomainName(hostName); -// Host host = new Host(dn); -// -// List avCapList = new ArrayList<>(); -// avCapList.add(notificationCapabilityPrefix +"_availableCapabilityString1"); -// AvailableCapabilities avCaps = new AvailableCapabilitiesBuilder().setAvailableCapability(avCapList).build(); -// NetconfNode nn = new NetconfNodeBuilder() -// .setConnectionStatus(cs) -// .setHost(host) -// .setAvailableCapabilities(avCaps) -// .build(); -// -// NodeId nodeId = new NodeId(nodeIdent); -// NodeKey nk = new NodeKey(nodeId); -// NodeBuilder nb = new NodeBuilder(); -// nb.setKey(nk); -// -// nb.addAugmentation(NetconfNode.class, nn); -// return nb.build(); -// } -// -// public static Node getNode(String nodeIdent){ -// NodeId nodeId = new NodeId(nodeIdent); -// NodeKey nk = new NodeKey(nodeId); -// NodeBuilder nb = new NodeBuilder(); -// nb.setKey(nk); -// return nb.build(); -// } -// -// public static InstanceIdentifier getInstanceIdentifier(Node node){ -// TopologyKey NETCONF_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())); -// InstanceIdentifier nodeII = InstanceIdentifier.create(NetworkTopology.class) -// .child(Topology.class, NETCONF_TOPOLOGY_KEY) -// .child(Node.class, node.getKey()); -// return nodeII; -// } -// -// public static Optional getAvailableStream(String Name, boolean replaySupport){ -// Stream stream = new StreamBuilder() -// .setName(new StreamNameType(Name)) -// .setReplaySupport(replaySupport) -// .build(); -// List streamList = new ArrayList<>(); -// streamList.add(stream); -// Streams streams = new StreamsBuilder().setStream(streamList).build(); -// return Optional.of(streams); -// } -// -//} diff --git a/opendaylight/md-sal/messagebus-util/pom.xml b/opendaylight/md-sal/messagebus-util/pom.xml new file mode 100644 index 0000000000..6cc572e27c --- /dev/null +++ b/opendaylight/md-sal/messagebus-util/pom.xml @@ -0,0 +1,55 @@ + + + + 4.0.0 + + + org.opendaylight.controller + sal-parent + 1.3.0-SNAPSHOT + + + messagebus-util + ${project.artifactId} + + bundle + + + + org.opendaylight.controller + sal-core-api + + + org.opendaylight.controller + sal-binding-api + + + org.opendaylight.controller + messagebus-api + + + + junit + junit + test + + + org.glassfish.jersey.test-framework.providers + jersey-test-framework-provider-grizzly2 + test + + + org.mockito + mockito-all + test + + + diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/Providers.java b/opendaylight/md-sal/messagebus-util/src/main/java/org/opendaylight/controller/messagebus/app/util/Providers.java similarity index 95% rename from opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/Providers.java rename to opendaylight/md-sal/messagebus-util/src/main/java/org/opendaylight/controller/messagebus/app/util/Providers.java index 2ac8336133..84c5455ad2 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/Providers.java +++ b/opendaylight/md-sal/messagebus-util/src/main/java/org/opendaylight/controller/messagebus/app/util/Providers.java @@ -6,7 +6,7 @@ * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.config.yang.messagebus.app.impl; +package org.opendaylight.controller.messagebus.app.util; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; import org.opendaylight.controller.sal.binding.api.BindingAwareProvider; diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/TopicDOMNotification.java b/opendaylight/md-sal/messagebus-util/src/main/java/org/opendaylight/controller/messagebus/app/util/TopicDOMNotification.java similarity index 95% rename from opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/TopicDOMNotification.java rename to opendaylight/md-sal/messagebus-util/src/main/java/org/opendaylight/controller/messagebus/app/util/TopicDOMNotification.java index dfde11a682..b6c6cdf8ab 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/TopicDOMNotification.java +++ b/opendaylight/md-sal/messagebus-util/src/main/java/org/opendaylight/controller/messagebus/app/util/TopicDOMNotification.java @@ -6,7 +6,7 @@ * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.messagebus.app.impl; +package org.opendaylight.controller.messagebus.app.util; import org.opendaylight.controller.md.sal.dom.api.DOMNotification; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification; diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Util.java b/opendaylight/md-sal/messagebus-util/src/main/java/org/opendaylight/controller/messagebus/app/util/Util.java similarity index 97% rename from opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Util.java rename to opendaylight/md-sal/messagebus-util/src/main/java/org/opendaylight/controller/messagebus/app/util/Util.java index c70fd3c403..cc2066256d 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Util.java +++ b/opendaylight/md-sal/messagebus-util/src/main/java/org/opendaylight/controller/messagebus/app/util/Util.java @@ -6,7 +6,7 @@ * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.messagebus.app.impl; +package org.opendaylight.controller.messagebus.app.util; import java.util.ArrayList; import java.util.List; diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/TopicDOMNotificationTest.java b/opendaylight/md-sal/messagebus-util/src/test/java/org/opendaylight/controller/messagebus/app/util/TopicDOMNotificationTest.java similarity index 97% rename from opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/TopicDOMNotificationTest.java rename to opendaylight/md-sal/messagebus-util/src/test/java/org/opendaylight/controller/messagebus/app/util/TopicDOMNotificationTest.java index b3f6438cc4..22595a2cbb 100644 --- a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/TopicDOMNotificationTest.java +++ b/opendaylight/md-sal/messagebus-util/src/test/java/org/opendaylight/controller/messagebus/app/util/TopicDOMNotificationTest.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.messagebus.app.impl; +package org.opendaylight.controller.messagebus.app.util; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/UtilTest.java b/opendaylight/md-sal/messagebus-util/src/test/java/org/opendaylight/controller/messagebus/app/util/UtilTest.java similarity index 98% rename from opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/UtilTest.java rename to opendaylight/md-sal/messagebus-util/src/test/java/org/opendaylight/controller/messagebus/app/util/UtilTest.java index 7aebb8fbdd..05425f01ad 100644 --- a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/UtilTest.java +++ b/opendaylight/md-sal/messagebus-util/src/test/java/org/opendaylight/controller/messagebus/app/util/UtilTest.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.messagebus.app.impl; +package org.opendaylight.controller.messagebus.app.util; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; diff --git a/opendaylight/md-sal/pom.xml b/opendaylight/md-sal/pom.xml index daa13337a6..920b066572 100644 --- a/opendaylight/md-sal/pom.xml +++ b/opendaylight/md-sal/pom.xml @@ -85,6 +85,7 @@ messagebus-api messagebus-spi messagebus-impl + messagebus-util messagebus-config diff --git a/opendaylight/netconf/features/netconf-connector/src/main/resources/features.xml b/opendaylight/netconf/features/netconf-connector/src/main/resources/features.xml index 70cbb4e0ea..963cd0ec5b 100644 --- a/opendaylight/netconf/features/netconf-connector/src/main/resources/features.xml +++ b/opendaylight/netconf/features/netconf-connector/src/main/resources/features.xml @@ -19,6 +19,14 @@ odl-netconf-connector-ssh + + + odl-netconf-connector-all + odl-message-bus-collector + mvn:org.opendaylight.controller/messagebus-netconf/${netconf.version} + mvn:org.opendaylight.controller/messagebus-netconf/${netconf.version}/xml/config + + odl-mdsal-broker odl-netconf-client diff --git a/opendaylight/netconf/messagebus-netconf/pom.xml b/opendaylight/netconf/messagebus-netconf/pom.xml new file mode 100644 index 0000000000..847327c93e --- /dev/null +++ b/opendaylight/netconf/messagebus-netconf/pom.xml @@ -0,0 +1,135 @@ + + + + 4.0.0 + + + org.opendaylight.controller + netconf-subsystem + 0.4.0-SNAPSHOT + + + messagebus-netconf + ${project.artifactId} + + bundle + + + + org.opendaylight.controller + ietf-netconf-notifications + + + org.opendaylight.controller + sal-binding-api + + + org.opendaylight.controller + sal-core-api + + + org.opendaylight.controller + sal-common-util + + + org.opendaylight.yangtools + yang-data-impl + + + org.opendaylight.controller + config-api + + + org.opendaylight.controller + messagebus-api + + + org.opendaylight.controller + messagebus-spi + + + org.opendaylight.controller + messagebus-util + + + org.opendaylight.controller + sal-netconf-connector + + + org.opendaylight.controller + sal-binding-config + + + + + junit + junit + test + + + org.glassfish.jersey.test-framework.providers + jersey-test-framework-provider-grizzly2 + test + + + org.mockito + mockito-all + test + + + + + + + org.apache.felix + maven-bundle-plugin + + + org.opendaylight.yangtools + yang-maven-plugin + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-source + generate-sources + + add-source + + + + ${project.build.directory}/generated-sources/config + + + + + attach-artifacts + + attach-artifact + + package + + + + ${project.build.directory}/classes/initial/06-message-netconf.xml + xml + config + + + + + + + + + diff --git a/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/config/yang/messagebus/netconf/MessageBusNetconfModule.java b/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/config/yang/messagebus/netconf/MessageBusNetconfModule.java new file mode 100644 index 0000000000..f27518d380 --- /dev/null +++ b/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/config/yang/messagebus/netconf/MessageBusNetconfModule.java @@ -0,0 +1,39 @@ +package org.opendaylight.controller.config.yang.messagebus.netconf; + +import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.MountPointService; +import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; +import org.opendaylight.controller.messagebus.app.util.Providers; +import org.opendaylight.controller.messagebus.eventsources.netconf.NetconfEventSourceManager; +import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; +import org.opendaylight.controller.sal.core.api.Broker; + +public class MessageBusNetconfModule extends org.opendaylight.controller.config.yang.messagebus.netconf.AbstractMessageBusNetconfModule { + public MessageBusNetconfModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) { + super(identifier, dependencyResolver); + } + + public MessageBusNetconfModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, org.opendaylight.controller.config.yang.messagebus.netconf.MessageBusNetconfModule oldModule, java.lang.AutoCloseable oldInstance) { + super(identifier, dependencyResolver, oldModule, oldInstance); + } + + @Override + public void customValidation() {} + + @Override + public java.lang.AutoCloseable createInstance() { + final BindingAwareBroker.ProviderContext bindingCtx = getBindingBrokerDependency().registerProvider(new Providers.BindingAware()); + final Broker.ProviderSession domCtx = getDomBrokerDependency().registerProvider(new Providers.BindingIndependent()); + + final MountPointService mountPointService = bindingCtx.getSALService(MountPointService.class); + final DataBroker dataBroker = bindingCtx.getSALService(DataBroker.class); + + final DOMNotificationPublishService domPublish = domCtx.getService(DOMNotificationPublishService.class); + final DOMMountPointService domMount = domCtx.getService(DOMMountPointService.class); + + return NetconfEventSourceManager.create(dataBroker, domPublish, domMount, + mountPointService, getEventSourceRegistryDependency(), getNamespaceToStream()); + } + +} diff --git a/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/config/yang/messagebus/netconf/MessageBusNetconfModuleFactory.java b/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/config/yang/messagebus/netconf/MessageBusNetconfModuleFactory.java new file mode 100644 index 0000000000..7681cdf02d --- /dev/null +++ b/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/config/yang/messagebus/netconf/MessageBusNetconfModuleFactory.java @@ -0,0 +1,13 @@ +/* +* Generated file +* +* Generated from: yang module name: messagebus-netconf yang module local name: messagebus-netconf +* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator +* Generated at: Wed Jul 29 14:15:30 CEST 2015 +* +* Do not modify this file unless it is present under src/main directory +*/ +package org.opendaylight.controller.config.yang.messagebus.netconf; +public class MessageBusNetconfModuleFactory extends org.opendaylight.controller.config.yang.messagebus.netconf.AbstractMessageBusNetconfModuleFactory { + +} diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/ConnectionNotificationTopicRegistration.java b/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/ConnectionNotificationTopicRegistration.java similarity index 77% rename from opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/ConnectionNotificationTopicRegistration.java rename to opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/ConnectionNotificationTopicRegistration.java index 61e1af84bc..9d3b2696aa 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/ConnectionNotificationTopicRegistration.java +++ b/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/ConnectionNotificationTopicRegistration.java @@ -7,15 +7,15 @@ */ package org.opendaylight.controller.messagebus.eventsources.netconf; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; - import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.ParserConfigurationException; import javax.xml.transform.dom.DOMSource; - import org.opendaylight.controller.md.sal.dom.api.DOMNotification; import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; @@ -33,15 +33,14 @@ import org.slf4j.LoggerFactory; import org.w3c.dom.Document; import org.w3c.dom.Element; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; - public class ConnectionNotificationTopicRegistration extends NotificationTopicRegistration { private static final Logger LOG = LoggerFactory.getLogger(ConnectionNotificationTopicRegistration.class); - public static final SchemaPath EVENT_SOURCE_STATUS_PATH = SchemaPath.create(true, QName.create(EventSourceStatusNotification.QNAME, "event-source-status")); - private static final NodeIdentifier EVENT_SOURCE_STATUS_ARG = new NodeIdentifier(EventSourceStatusNotification.QNAME); + public static final SchemaPath EVENT_SOURCE_STATUS_PATH = SchemaPath + .create(true, QName.create(EventSourceStatusNotification.QNAME, "event-source-status")); + private static final NodeIdentifier EVENT_SOURCE_STATUS_ARG = new NodeIdentifier( + EventSourceStatusNotification.QNAME); private static final String XMLNS_ATTRIBUTE_KEY = "xmlns"; private static final String XMLNS_URI = "http://www.w3.org/2000/xmlns/"; @@ -49,16 +48,16 @@ public class ConnectionNotificationTopicRegistration extends NotificationTopicRe private ConcurrentHashMap> notificationTopicMap = new ConcurrentHashMap<>(); public ConnectionNotificationTopicRegistration(String SourceName, DOMNotificationListener domNotificationListener) { - super(NotificationSourceType.ConnectionStatusChange, SourceName, EVENT_SOURCE_STATUS_PATH.getLastComponent().getNamespace().toString()); + super(NotificationSourceType.ConnectionStatusChange, SourceName, + EVENT_SOURCE_STATUS_PATH.getLastComponent().getNamespace().toString()); this.domNotificationListener = Preconditions.checkNotNull(domNotificationListener); LOG.info("Connection notification source has been initialized."); setActive(true); setReplaySupported(false); } - @Override - public void close() throws Exception { - if(isActive()){ + @Override public void close() throws Exception { + if (isActive()) { LOG.debug("Connection notification - publish Deactive"); publishNotification(EventSourceStatus.Deactive); notificationTopicMap.clear(); @@ -66,36 +65,32 @@ public class ConnectionNotificationTopicRegistration extends NotificationTopicRe } } - @Override - void activateNotificationSource() { + @Override void activateNotificationSource() { LOG.debug("Connection notification - publish Active"); publishNotification(EventSourceStatus.Active); } - @Override - void deActivateNotificationSource() { + @Override void deActivateNotificationSource() { LOG.debug("Connection notification - publish Inactive"); publishNotification(EventSourceStatus.Inactive); } - @Override - void reActivateNotificationSource() { + @Override void reActivateNotificationSource() { LOG.debug("Connection notification - reactivate - publish active"); publishNotification(EventSourceStatus.Active); } - @Override - boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId) { - if(checkNotificationPath(notificationPath) == false){ + @Override boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId) { + if (checkNotificationPath(notificationPath) == false) { LOG.debug("Bad SchemaPath for notification try to register"); return false; } ArrayList topicIds = getNotificationTopicIds(notificationPath); - if(topicIds == null){ + if (topicIds == null) { topicIds = new ArrayList<>(); topicIds.add(topicId); } else { - if(topicIds.contains(topicId) == false){ + if (topicIds.contains(topicId) == false) { topicIds.add(topicId); } } @@ -103,57 +98,50 @@ public class ConnectionNotificationTopicRegistration extends NotificationTopicRe return true; } - @Override - ArrayList getNotificationTopicIds(SchemaPath notificationPath) { + @Override ArrayList getNotificationTopicIds(SchemaPath notificationPath) { return notificationTopicMap.get(notificationPath); } - @Override - synchronized void unRegisterNotificationTopic(TopicId topicId) { + @Override synchronized void unRegisterNotificationTopic(TopicId topicId) { List notificationPathToRemove = new ArrayList<>(); - for(SchemaPath notifKey : notificationTopicMap.keySet()){ + for (SchemaPath notifKey : notificationTopicMap.keySet()) { ArrayList topicList = notificationTopicMap.get(notifKey); - if(topicList != null){ + if (topicList != null) { topicList.remove(topicId); - if(topicList.isEmpty()){ + if (topicList.isEmpty()) { notificationPathToRemove.add(notifKey); } } } - for(SchemaPath notifKey : notificationPathToRemove){ + for (SchemaPath notifKey : notificationPathToRemove) { notificationTopicMap.remove(notifKey); } } - private void publishNotification(EventSourceStatus eventSourceStatus){ + private void publishNotification(EventSourceStatus eventSourceStatus) { final EventSourceStatusNotification notification = new EventSourceStatusNotificationBuilder() - .setStatus(eventSourceStatus) - .build(); + .setStatus(eventSourceStatus).build(); domNotificationListener.onNotification(createNotification(notification)); } - private DOMNotification createNotification(EventSourceStatusNotification notification){ - final ContainerNode cn = Builders.containerBuilder() - .withNodeIdentifier(EVENT_SOURCE_STATUS_ARG) - .withChild(encapsulate(notification)) - .build(); + private DOMNotification createNotification(EventSourceStatusNotification notification) { + final ContainerNode cn = Builders.containerBuilder().withNodeIdentifier(EVENT_SOURCE_STATUS_ARG) + .withChild(encapsulate(notification)).build(); DOMNotification dn = new DOMNotification() { - @Override - public SchemaPath getType() { + @Override public SchemaPath getType() { return EVENT_SOURCE_STATUS_PATH; } - @Override - public ContainerNode getBody() { + @Override public ContainerNode getBody() { return cn; } }; return dn; } - private AnyXmlNode encapsulate(EventSourceStatusNotification notification){ + private AnyXmlNode encapsulate(EventSourceStatusNotification notification) { DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance(); DocumentBuilder docBuilder; @@ -167,25 +155,23 @@ public class ConnectionNotificationTopicRegistration extends NotificationTopicRe Document doc = docBuilder.newDocument(); final Optional namespace = Optional.of(EVENT_SOURCE_STATUS_ARG.getNodeType().getNamespace().toString()); - final Element rootElement = createElement(doc , "EventSourceStatusNotification", namespace); + final Element rootElement = createElement(doc, "EventSourceStatusNotification", namespace); final Element sourceElement = doc.createElement("status"); sourceElement.appendChild(doc.createTextNode(notification.getStatus().name())); rootElement.appendChild(sourceElement); - return Builders.anyXmlBuilder().withNodeIdentifier(EVENT_SOURCE_STATUS_ARG) - .withValue(new DOMSource(rootElement)) - .build(); + .withValue(new DOMSource(rootElement)).build(); } // Helper to create root XML element with correct namespace and attribute private Element createElement(final Document document, final String qName, final Optional namespaceURI) { - if(namespaceURI.isPresent()) { + if (namespaceURI.isPresent()) { final Element element = document.createElementNS(namespaceURI.get(), qName); String name = XMLNS_ATTRIBUTE_KEY; - if(element.getPrefix() != null) { + if (element.getPrefix() != null) { name += ":" + element.getPrefix(); } element.setAttributeNS(XMLNS_URI, name, namespaceURI.get()); diff --git a/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java b/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java new file mode 100644 index 0000000000..6ef3277f5a --- /dev/null +++ b/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java @@ -0,0 +1,340 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.messagebus.eventsources.netconf; + +import static com.google.common.util.concurrent.Futures.immediateFuture; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.CheckedFuture; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Future; +import java.util.regex.Pattern; +import javax.xml.stream.XMLStreamException; +import javax.xml.transform.dom.DOMResult; +import javax.xml.transform.dom.DOMSource; +import org.opendaylight.controller.config.util.xml.XmlUtil; +import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.MountPoint; +import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMEvent; +import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; +import org.opendaylight.controller.md.sal.dom.api.DOMNotification; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; +import org.opendaylight.controller.messagebus.app.util.TopicDOMNotification; +import org.opendaylight.controller.messagebus.app.util.Util; +import org.opendaylight.controller.messagebus.spi.EventSource; +import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInput; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicStatus; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.AnyXmlNode; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.impl.schema.Builders; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.model.api.NotificationDefinition; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Document; +import org.w3c.dom.Element; + +public class NetconfEventSource implements EventSource, DOMNotificationListener { + + private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSource.class); + + private static final NodeIdentifier TOPIC_NOTIFICATION_ARG = new NodeIdentifier(TopicNotification.QNAME); + private static final NodeIdentifier EVENT_SOURCE_ARG = new NodeIdentifier( + QName.create(TopicNotification.QNAME, "node-id")); + private static final NodeIdentifier TOPIC_ID_ARG = new NodeIdentifier( + QName.create(TopicNotification.QNAME, "topic-id")); + private static final NodeIdentifier PAYLOAD_ARG = new NodeIdentifier( + QName.create(TopicNotification.QNAME, "payload")); + private static final String ConnectionNotificationSourceName = "ConnectionNotificationSource"; + + private final String nodeId; + private final Node node; + + private final DOMMountPoint netconfMount; + private final MountPoint mountPoint; + private final DOMNotificationPublishService domPublish; + + private final Map urnPrefixToStreamMap; // key = urnPrefix, value = StreamName + private final List notificationTopicRegistrationList = new ArrayList<>(); + + public NetconfEventSource(final Node node, final Map streamMap, final DOMMountPoint netconfMount, + final MountPoint mountPoint, final DOMNotificationPublishService publishService) { + this.netconfMount = Preconditions.checkNotNull(netconfMount); + this.mountPoint = Preconditions.checkNotNull(mountPoint); + this.node = Preconditions.checkNotNull(node); + this.urnPrefixToStreamMap = Preconditions.checkNotNull(streamMap); + this.domPublish = Preconditions.checkNotNull(publishService); + this.nodeId = node.getNodeId().getValue(); + this.initializeNotificationTopicRegistrationList(); + + LOG.info("NetconfEventSource [{}] created.", this.nodeId); + } + + private void initializeNotificationTopicRegistrationList() { + notificationTopicRegistrationList + .add(new ConnectionNotificationTopicRegistration(ConnectionNotificationSourceName, this)); + Optional> streamMap = getAvailableStreams(); + if (streamMap.isPresent()) { + LOG.debug("Stream configuration compare..."); + for (String urnPrefix : this.urnPrefixToStreamMap.keySet()) { + final String streamName = this.urnPrefixToStreamMap.get(urnPrefix); + LOG.debug("urnPrefix: {} streamName: {}", urnPrefix, streamName); + if (streamMap.get().containsKey(streamName)) { + LOG.debug("Stream containig on device"); + notificationTopicRegistrationList + .add(new StreamNotificationTopicRegistration(streamMap.get().get(streamName), urnPrefix, this)); + } + } + } + } + + private Optional> getAvailableStreams() { + + Map streamMap = null; + InstanceIdentifier pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build(); + Optional dataBroker = this.mountPoint.getService(DataBroker.class); + + if (dataBroker.isPresent()) { + LOG.debug("GET Available streams ..."); + ReadOnlyTransaction tx = dataBroker.get().newReadOnlyTransaction(); + CheckedFuture, ReadFailedException> checkFeature = tx + .read(LogicalDatastoreType.OPERATIONAL, pathStream); + + try { + Optional streams = checkFeature.checkedGet(); + if (streams.isPresent()) { + streamMap = new HashMap<>(); + for (Stream stream : streams.get().getStream()) { + LOG.debug("*** find stream {}", stream.getName().getValue()); + streamMap.put(stream.getName().getValue(), stream); + } + } + } catch (ReadFailedException e) { + LOG.warn("Can not read streams for node {}", this.nodeId); + } + + } else { + LOG.warn("No databroker on node {}", this.nodeId); + } + + return Optional.fromNullable(streamMap); + } + + @Override public Future> joinTopic(final JoinTopicInput input) { + LOG.debug("Join topic {} on {}", input.getTopicId().getValue(), this.nodeId); + final NotificationPattern notificationPattern = input.getNotificationPattern(); + final List matchingNotifications = getMatchingNotifications(notificationPattern); + return registerTopic(input.getTopicId(), matchingNotifications); + + } + + @Override public Future> disJoinTopic(DisJoinTopicInput input) { + for (NotificationTopicRegistration reg : notificationTopicRegistrationList) { + reg.unRegisterNotificationTopic(input.getTopicId()); + } + return Util.resultRpcSuccessFor((Void) null); + } + + private synchronized Future> registerTopic(final TopicId topicId, + final List notificationsToSubscribe) { + LOG.debug("Join topic {} - register", topicId); + JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down; + if (notificationsToSubscribe != null && notificationsToSubscribe.isEmpty() == false) { + LOG.debug("Notifications to subscribe has found - count {}", notificationsToSubscribe.size()); + final Optional notifyService = getDOMMountPoint() + .getService(DOMNotificationService.class); + if (notifyService.isPresent()) { + int registeredNotificationCount = 0; + for (SchemaPath schemaNotification : notificationsToSubscribe) { + for (NotificationTopicRegistration reg : notificationTopicRegistrationList) { + LOG.debug("Try notification registratio {} on SchemaPathNotification {}", reg.getSourceName(), + schemaNotification.getLastComponent().getLocalName()); + if (reg.checkNotificationPath(schemaNotification)) { + LOG.info("Source of notification {} is activating, TopicId {}", reg.getSourceName(), + topicId.getValue()); + boolean regSuccess = reg.registerNotificationTopic(schemaNotification, topicId); + if (regSuccess) { + registeredNotificationCount = registeredNotificationCount + 1; + } + } + } + } + if (registeredNotificationCount > 0) { + joinTopicStatus = JoinTopicStatus.Up; + } + } else { + LOG.warn("NO DOMNotification service on node {}", this.nodeId); + } + } else { + LOG.debug("Notifications to subscribe has NOT found"); + } + + final JoinTopicOutput output = new JoinTopicOutputBuilder().setStatus(joinTopicStatus).build(); + return immediateFuture(RpcResultBuilder.success(output).build()); + + } + + public void reActivateStreams() { + for (NotificationTopicRegistration reg : notificationTopicRegistrationList) { + LOG.info("Source of notification {} is reactivating on node {}", reg.getSourceName(), this.nodeId); + reg.reActivateNotificationSource(); + } + } + + public void deActivateStreams() { + for (NotificationTopicRegistration reg : notificationTopicRegistrationList) { + LOG.info("Source of notification {} is deactivating on node {}", reg.getSourceName(), this.nodeId); + reg.deActivateNotificationSource(); + } + } + + @Override public void onNotification(final DOMNotification notification) { + SchemaPath notificationPath = notification.getType(); + Date notificationEventTime = null; + if (notification instanceof DOMEvent) { + notificationEventTime = ((DOMEvent) notification).getEventTime(); + } + for (NotificationTopicRegistration notifReg : notificationTopicRegistrationList) { + ArrayList topicIdsForNotification = notifReg.getNotificationTopicIds(notificationPath); + if (topicIdsForNotification != null && topicIdsForNotification.isEmpty() == false) { + + if (notifReg instanceof StreamNotificationTopicRegistration) { + StreamNotificationTopicRegistration streamReg = (StreamNotificationTopicRegistration) notifReg; + streamReg.setLastEventTime(notificationEventTime); + } + + for (TopicId topicId : topicIdsForNotification) { + publishNotification(notification, topicId); + LOG.debug("Notification {} has been published for TopicId {}", notification.getType(), + topicId.getValue()); + } + + } + } + } + + private void publishNotification(final DOMNotification notification, TopicId topicId) { + final ContainerNode topicNotification = Builders.containerBuilder().withNodeIdentifier(TOPIC_NOTIFICATION_ARG) + .withChild(ImmutableNodes.leafNode(TOPIC_ID_ARG, topicId)) + .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, this.nodeId)).withChild(encapsulate(notification)) + .build(); + try { + domPublish.putNotification(new TopicDOMNotification(topicNotification)); + } catch (final InterruptedException e) { + throw Throwables.propagate(e); + } + } + + private AnyXmlNode encapsulate(final DOMNotification body) { + // FIXME: Introduce something like AnyXmlWithNormalizedNodeData in Yangtools + final Document doc = XmlUtil.newDocument(); + final Optional namespace = Optional.of(PAYLOAD_ARG.getNodeType().getNamespace().toString()); + final Element element = XmlUtil.createElement(doc, "payload", namespace); + + final DOMResult result = new DOMResult(element); + + final SchemaContext context = getDOMMountPoint().getSchemaContext(); + final SchemaPath schemaPath = body.getType(); + try { + NetconfMessageTransformUtil.writeNormalizedNode(body.getBody(), result, schemaPath, context); + return Builders.anyXmlBuilder().withNodeIdentifier(PAYLOAD_ARG).withValue(new DOMSource(element)).build(); + } catch (IOException | XMLStreamException e) { + LOG.error("Unable to encapsulate notification.", e); + throw Throwables.propagate(e); + } + } + + private List getMatchingNotifications(NotificationPattern notificationPattern) { + // FIXME: default language should already be regex + final String regex = Util.wildcardToRegex(notificationPattern.getValue()); + + final Pattern pattern = Pattern.compile(regex); + List availableNotifications = getAvailableNotifications(); + if (availableNotifications == null || availableNotifications.isEmpty()) { + return null; + } + return Util.expandQname(availableNotifications, pattern); + } + + @Override public void close() throws Exception { + for (NotificationTopicRegistration streamReg : notificationTopicRegistrationList) { + streamReg.close(); + } + } + + @Override public NodeKey getSourceNodeKey() { + return getNode().getKey(); + } + + @Override public List getAvailableNotifications() { + + final List availNotifList = new ArrayList<>(); + // add Event Source Connection status notification + availNotifList.add(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH); + + // FIXME: use SchemaContextListener to get changes asynchronously + final Set availableNotifications = getDOMMountPoint().getSchemaContext() + .getNotifications(); + // add all known notifications from netconf device + for (final NotificationDefinition nd : availableNotifications) { + availNotifList.add(nd.getPath()); + } + return availNotifList; + } + + public Node getNode() { + return node; + } + + DOMMountPoint getDOMMountPoint() { + return netconfMount; + } + + MountPoint getMountPoint() { + return mountPoint; + } + + NetconfNode getNetconfNode() { + return node.getAugmentation(NetconfNode.class); + } + +} diff --git a/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java b/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java new file mode 100644 index 0000000000..738bd88b87 --- /dev/null +++ b/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java @@ -0,0 +1,207 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.messagebus.eventsources.netconf; + +import com.google.common.base.Preconditions; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.opendaylight.controller.config.yang.messagebus.netconf.NamespaceToStream; +import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; +import org.opendaylight.controller.md.sal.binding.api.MountPointService; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; +import org.opendaylight.controller.messagebus.spi.EventSourceRegistry; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class NetconfEventSourceManager implements DataChangeListener, AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSourceManager.class); + private static final TopologyKey NETCONF_TOPOLOGY_KEY = new TopologyKey( + new TopologyId(TopologyNetconf.QNAME.getLocalName())); + private static final InstanceIdentifier NETCONF_DEVICE_PATH = InstanceIdentifier.create(NetworkTopology.class) + .child(Topology.class, NETCONF_TOPOLOGY_KEY).child(Node.class); + + private final Map streamMap; + private final ConcurrentHashMap, NetconfEventSourceRegistration> registrationMap = new ConcurrentHashMap<>(); + private final DOMNotificationPublishService publishService; + private final DOMMountPointService domMounts; + private final MountPointService mountPointService; + private ListenerRegistration listenerRegistration; + private final EventSourceRegistry eventSourceRegistry; + + public static NetconfEventSourceManager create(final DataBroker dataBroker, + final DOMNotificationPublishService domPublish, final DOMMountPointService domMount, + final MountPointService bindingMount, final EventSourceRegistry eventSourceRegistry, + final List namespaceMapping) { + + final NetconfEventSourceManager eventSourceManager = new NetconfEventSourceManager(domPublish, domMount, + bindingMount, eventSourceRegistry, namespaceMapping); + + eventSourceManager.initialize(dataBroker); + + return eventSourceManager; + + } + + private NetconfEventSourceManager(final DOMNotificationPublishService domPublish, + final DOMMountPointService domMount, final MountPointService bindingMount, + final EventSourceRegistry eventSourceRegistry, final List namespaceMapping) { + + Preconditions.checkNotNull(domPublish); + Preconditions.checkNotNull(domMount); + Preconditions.checkNotNull(bindingMount); + Preconditions.checkNotNull(eventSourceRegistry); + Preconditions.checkNotNull(namespaceMapping); + this.streamMap = namespaceToStreamMapping(namespaceMapping); + this.domMounts = domMount; + this.mountPointService = bindingMount; + this.publishService = domPublish; + this.eventSourceRegistry = eventSourceRegistry; + } + + private void initialize(final DataBroker dataBroker) { + Preconditions.checkNotNull(dataBroker); + listenerRegistration = dataBroker + .registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, NETCONF_DEVICE_PATH, this, + DataChangeScope.SUBTREE); + LOG.info("NetconfEventSourceManager initialized."); + } + + private Map namespaceToStreamMapping(final List namespaceMapping) { + final Map streamMap = new HashMap<>(namespaceMapping.size()); + + for (final NamespaceToStream nToS : namespaceMapping) { + streamMap.put(nToS.getUrnPrefix(), nToS.getStreamName()); + } + + return streamMap; + } + + @Override public void onDataChanged(final AsyncDataChangeEvent, DataObject> event) { + + LOG.debug("[DataChangeEvent, DataObject>: {}]", event); + for (final Map.Entry, DataObject> changeEntry : event.getCreatedData().entrySet()) { + if (changeEntry.getValue() instanceof Node) { + nodeCreated(changeEntry.getKey(), (Node) changeEntry.getValue()); + } + } + + for (final Map.Entry, DataObject> changeEntry : event.getUpdatedData().entrySet()) { + if (changeEntry.getValue() instanceof Node) { + nodeUpdated(changeEntry.getKey(), (Node) changeEntry.getValue()); + } + } + + for (InstanceIdentifier removePath : event.getRemovedPaths()) { + DataObject removeObject = event.getOriginalData().get(removePath); + if (removeObject instanceof Node) { + nodeRemoved(removePath); + } + } + + } + + private void nodeCreated(final InstanceIdentifier key, final Node node) { + Preconditions.checkNotNull(key); + if (validateNode(node) == false) { + LOG.warn("NodeCreated event : Node [{}] is null or not valid.", key.toString()); + return; + } + LOG.info("Netconf event source [{}] is creating...", key.toString()); + NetconfEventSourceRegistration nesr = NetconfEventSourceRegistration.create(key, node, this); + if (nesr != null) { + NetconfEventSourceRegistration nesrOld = registrationMap.put(key, nesr); + if (nesrOld != null) { + nesrOld.close(); + } + } + } + + private void nodeUpdated(final InstanceIdentifier key, final Node node) { + Preconditions.checkNotNull(key); + if (validateNode(node) == false) { + LOG.warn("NodeUpdated event : Node [{}] is null or not valid.", key.toString()); + return; + } + + LOG.info("Netconf event source [{}] is updating...", key.toString()); + NetconfEventSourceRegistration nesr = registrationMap.get(key); + if (nesr != null) { + nesr.updateStatus(); + } else { + nodeCreated(key, node); + } + } + + private void nodeRemoved(final InstanceIdentifier key) { + Preconditions.checkNotNull(key); + LOG.info("Netconf event source [{}] is removing...", key.toString()); + NetconfEventSourceRegistration nesr = registrationMap.remove(key); + if (nesr != null) { + nesr.close(); + } + } + + private boolean validateNode(final Node node) { + if (node == null) { + return false; + } + return isNetconfNode(node); + } + + Map getStreamMap() { + return streamMap; + } + + DOMNotificationPublishService getPublishService() { + return publishService; + } + + DOMMountPointService getDomMounts() { + return domMounts; + } + + EventSourceRegistry getEventSourceRegistry() { + return eventSourceRegistry; + } + + MountPointService getMountPointService() { + return mountPointService; + } + + private boolean isNetconfNode(final Node node) { + return node.getAugmentation(NetconfNode.class) != null; + } + + @Override public void close() { + listenerRegistration.close(); + for (final NetconfEventSourceRegistration reg : registrationMap.values()) { + reg.close(); + } + registrationMap.clear(); + } + +} \ No newline at end of file diff --git a/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceRegistration.java b/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceRegistration.java new file mode 100644 index 0000000000..18cdccc764 --- /dev/null +++ b/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceRegistration.java @@ -0,0 +1,191 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.messagebus.eventsources.netconf; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import java.util.List; +import org.opendaylight.controller.md.sal.binding.api.MountPoint; +import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; +import org.opendaylight.controller.messagebus.spi.EventSourceRegistration; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Helper class to keep connection status of netconf node and event source registration object + */ +public class NetconfEventSourceRegistration implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSourceRegistration.class); + private static final YangInstanceIdentifier NETCONF_DEVICE_DOM_PATH = YangInstanceIdentifier.builder() + .node(NetworkTopology.QNAME).node(Topology.QNAME) + .nodeWithKey(Topology.QNAME, QName.create(Topology.QNAME, "topology-id"), TopologyNetconf.QNAME.getLocalName()) + .node(Node.QNAME).build(); + private static final QName NODE_ID_QNAME = QName.create(Node.QNAME, "node-id"); + private static final String NotificationCapabilityPrefix = "(urn:ietf:params:xml:ns:netconf:notification"; + + private final Node node; + private final InstanceIdentifier instanceIdent; + private final NetconfEventSourceManager netconfEventSourceManager; + private ConnectionStatus currentNetconfConnStatus; + private EventSourceRegistration eventSourceRegistration; + + public static NetconfEventSourceRegistration create(final InstanceIdentifier instanceIdent, final Node node, + final NetconfEventSourceManager netconfEventSourceManager) { + Preconditions.checkNotNull(instanceIdent); + Preconditions.checkNotNull(node); + Preconditions.checkNotNull(netconfEventSourceManager); + if (isEventSource(node) == false) { + return null; + } + NetconfEventSourceRegistration nesr = new NetconfEventSourceRegistration(instanceIdent, node, + netconfEventSourceManager); + nesr.updateStatus(); + LOG.debug("NetconfEventSourceRegistration for node {} has been initialized...", node.getNodeId().getValue()); + return nesr; + } + + private static boolean isEventSource(final Node node) { + final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class); + if (netconfNode == null) { + return false; + } + if (netconfNode.getAvailableCapabilities() == null) { + return false; + } + final List capabilities = netconfNode.getAvailableCapabilities().getAvailableCapability(); + if (capabilities == null || capabilities.isEmpty()) { + return false; + } + for (final String capability : netconfNode.getAvailableCapabilities().getAvailableCapability()) { + if (capability.startsWith(NotificationCapabilityPrefix)) { + return true; + } + } + + return false; + } + + private NetconfEventSourceRegistration(final InstanceIdentifier instanceIdent, final Node node, + final NetconfEventSourceManager netconfEventSourceManager) { + this.instanceIdent = instanceIdent; + this.node = node; + this.netconfEventSourceManager = netconfEventSourceManager; + this.eventSourceRegistration = null; + } + + public Node getNode() { + return node; + } + + Optional> getEventSourceRegistration() { + return Optional.fromNullable(eventSourceRegistration); + } + + NetconfNode getNetconfNode() { + return node.getAugmentation(NetconfNode.class); + } + + void updateStatus() { + ConnectionStatus netconfConnStatus = getNetconfNode().getConnectionStatus(); + LOG.info("Change status on node {}, new status is {}", this.node.getNodeId().getValue(), netconfConnStatus); + if (netconfConnStatus.equals(currentNetconfConnStatus)) { + return; + } + changeStatus(netconfConnStatus); + } + + private boolean checkConnectionStatusType(ConnectionStatus status) { + if (status == ConnectionStatus.Connected || status == ConnectionStatus.Connecting + || status == ConnectionStatus.UnableToConnect) { + return true; + } + return false; + } + + private void changeStatus(ConnectionStatus newStatus) { + Preconditions.checkNotNull(newStatus); + if (checkConnectionStatusType(newStatus) == false) { + throw new IllegalStateException("Unknown new Netconf Connection Status"); + } + if (this.currentNetconfConnStatus == null) { + if (newStatus == ConnectionStatus.Connected) { + registrationEventSource(); + } + } else if (this.currentNetconfConnStatus == ConnectionStatus.Connecting) { + if (newStatus == ConnectionStatus.Connected) { + if (this.eventSourceRegistration == null) { + registrationEventSource(); + } else { + // reactivate stream on registered event source (invoke publish notification about connection) + this.eventSourceRegistration.getInstance().reActivateStreams(); + } + } + } else if (this.currentNetconfConnStatus == ConnectionStatus.Connected) { + + if (newStatus == ConnectionStatus.Connecting || newStatus == ConnectionStatus.UnableToConnect) { + // deactivate streams on registered event source (invoke publish notification about connection) + this.eventSourceRegistration.getInstance().deActivateStreams(); + } + } else if (this.currentNetconfConnStatus == ConnectionStatus.UnableToConnect) { + if (newStatus == ConnectionStatus.Connected) { + if (this.eventSourceRegistration == null) { + registrationEventSource(); + } else { + // reactivate stream on registered event source (invoke publish notification about connection) + this.eventSourceRegistration.getInstance().reActivateStreams(); + } + } + } else { + throw new IllegalStateException("Unknown current Netconf Connection Status"); + } + this.currentNetconfConnStatus = newStatus; + } + + private void registrationEventSource() { + final Optional mountPoint = netconfEventSourceManager.getMountPointService() + .getMountPoint(instanceIdent); + final Optional domMountPoint = netconfEventSourceManager.getDomMounts() + .getMountPoint(domMountPath(node.getNodeId())); + EventSourceRegistration registration = null; + if (domMountPoint.isPresent() && mountPoint.isPresent()) { + final NetconfEventSource netconfEventSource = new NetconfEventSource(node, + netconfEventSourceManager.getStreamMap(), domMountPoint.get(), mountPoint.get(), + netconfEventSourceManager.getPublishService()); + registration = netconfEventSourceManager.getEventSourceRegistry().registerEventSource(netconfEventSource); + LOG.info("Event source {} has been registered", node.getNodeId().getValue()); + } + this.eventSourceRegistration = registration; + } + + private YangInstanceIdentifier domMountPath(final NodeId nodeId) { + return YangInstanceIdentifier.builder(NETCONF_DEVICE_DOM_PATH) + .nodeWithKey(Node.QNAME, NODE_ID_QNAME, nodeId.getValue()).build(); + } + + private void closeEventSourceRegistration() { + if (getEventSourceRegistration().isPresent()) { + getEventSourceRegistration().get().close(); + } + } + + @Override public void close() { + closeEventSourceRegistration(); + } + +} diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NotificationTopicRegistration.java b/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NotificationTopicRegistration.java similarity index 91% rename from opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NotificationTopicRegistration.java rename to opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NotificationTopicRegistration.java index c12b67ed24..a452ac59fd 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NotificationTopicRegistration.java +++ b/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NotificationTopicRegistration.java @@ -8,18 +8,16 @@ package org.opendaylight.controller.messagebus.eventsources.netconf; import java.util.ArrayList; - import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; import org.opendaylight.yangtools.yang.model.api.SchemaPath; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public abstract class NotificationTopicRegistration implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(NotificationTopicRegistration.class); - public enum NotificationSourceType{ + public enum NotificationSourceType { NetconfDeviceStream, ConnectionStatusChange; } @@ -30,7 +28,8 @@ public abstract class NotificationTopicRegistration implements AutoCloseable { private final String notificationUrnPrefix; private boolean replaySupported; - protected NotificationTopicRegistration(NotificationSourceType notificationSourceType, String sourceName, String notificationUrnPrefix) { + protected NotificationTopicRegistration(NotificationSourceType notificationSourceType, String sourceName, + String notificationUrnPrefix) { this.notificationSourceType = notificationSourceType; this.sourceName = sourceName; this.notificationUrnPrefix = notificationUrnPrefix; @@ -58,14 +57,16 @@ public abstract class NotificationTopicRegistration implements AutoCloseable { return notificationUrnPrefix; } - public boolean checkNotificationPath(SchemaPath notificationPath){ - if(notificationPath == null){ + public boolean checkNotificationPath(SchemaPath notificationPath) { + if (notificationPath == null) { return false; } String nameSpace = notificationPath.getLastComponent().toString(); - LOG.debug("CheckNotification - name space {} - NotificationUrnPrefix {}", nameSpace, getNotificationUrnPrefix()); + LOG.debug("CheckNotification - name space {} - NotificationUrnPrefix {}", nameSpace, + getNotificationUrnPrefix()); return nameSpace.startsWith(getNotificationUrnPrefix()); } + abstract void activateNotificationSource(); abstract void deActivateNotificationSource(); diff --git a/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java b/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java new file mode 100644 index 0000000000..21d4cde5c0 --- /dev/null +++ b/opendaylight/netconf/messagebus-netconf/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java @@ -0,0 +1,203 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.messagebus.eventsources.netconf; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.impl.schema.Builders; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StreamNotificationTopicRegistration extends NotificationTopicRegistration { + + private static final Logger LOG = LoggerFactory.getLogger(StreamNotificationTopicRegistration.class); + private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier( + QName.create(CreateSubscriptionInput.QNAME, "stream")); + private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath + .create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription")); + private static final NodeIdentifier START_TIME_SUBSCRIPTION = new NodeIdentifier( + QName.create(CreateSubscriptionInput.QNAME, "startTime")); + + final private DOMMountPoint domMountPoint; + final private String nodeId; + final private NetconfEventSource netconfEventSource; + final private Stream stream; + private Date lastEventTime; + + private ConcurrentHashMap> notificationRegistrationMap = new ConcurrentHashMap<>(); + private ConcurrentHashMap> notificationTopicMap = new ConcurrentHashMap<>(); + + public StreamNotificationTopicRegistration(final Stream stream, final String notificationPrefix, + NetconfEventSource netconfEventSource) { + super(NotificationSourceType.NetconfDeviceStream, stream.getName().getValue(), notificationPrefix); + this.domMountPoint = netconfEventSource.getDOMMountPoint(); + this.nodeId = netconfEventSource.getNode().getNodeId().getValue().toString(); + this.netconfEventSource = netconfEventSource; + this.stream = stream; + this.lastEventTime = null; + setReplaySupported(this.stream.isReplaySupport()); + setActive(false); + LOG.info("StreamNotificationTopicRegistration initialized for {}", getStreamName()); + } + + void activateNotificationSource() { + if (isActive() == false) { + LOG.info("Stream {} is not active on node {}. Will subscribe.", this.getStreamName(), this.nodeId); + final ContainerNode input = Builders.containerBuilder() + .withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME)) + .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName())).build(); + CheckedFuture csFuture = domMountPoint.getService(DOMRpcService.class).get() + .invokeRpc(CREATE_SUBSCRIPTION, input); + try { + csFuture.checkedGet(); + setActive(true); + } catch (DOMRpcException e) { + LOG.warn("Can not subscribe stream {} on node {}", this.getSourceName(), this.nodeId); + setActive(false); + return; + } + } else { + LOG.info("Stream {} is now active on node {}", this.getStreamName(), this.nodeId); + } + } + + void reActivateNotificationSource() { + if (isActive()) { + LOG.info("Stream {} is reactivating on node {}.", this.getStreamName(), this.nodeId); + DataContainerNodeAttrBuilder inputBuilder = Builders.containerBuilder() + .withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME)) + .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName())); + if (isReplaySupported() && this.getLastEventTime() != null) { + inputBuilder.withChild(ImmutableNodes.leafNode(START_TIME_SUBSCRIPTION, this.getLastEventTime())); + } + final ContainerNode input = inputBuilder.build(); + CheckedFuture csFuture = domMountPoint.getService(DOMRpcService.class).get() + .invokeRpc(CREATE_SUBSCRIPTION, input); + try { + csFuture.checkedGet(); + setActive(true); + } catch (DOMRpcException e) { + LOG.warn("Can not resubscribe stream {} on node {}", this.getSourceName(), this.nodeId); + setActive(false); + return; + } + } + } + + @Override void deActivateNotificationSource() { + // no operations need + } + + private void closeStream() { + if (isActive()) { + for (ListenerRegistration reg : notificationRegistrationMap.values()) { + reg.close(); + } + notificationRegistrationMap.clear(); + notificationTopicMap.clear(); + setActive(false); + } + } + + private String getStreamName() { + return getSourceName(); + } + + @Override ArrayList getNotificationTopicIds(SchemaPath notificationPath) { + return notificationTopicMap.get(notificationPath); + } + + @Override boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId) { + + if (checkNotificationPath(notificationPath) == false) { + LOG.debug("Bad SchemaPath for notification try to register"); + return false; + } + + final Optional notifyService = domMountPoint.getService(DOMNotificationService.class); + if (notifyService.isPresent() == false) { + LOG.debug("DOMNotificationService is not present"); + return false; + } + + activateNotificationSource(); + if (isActive() == false) { + LOG.warn("Stream {} is not active, listener for notification {} is not registered.", getStreamName(), + notificationPath.toString()); + return false; + } + + ListenerRegistration registration = notifyService.get() + .registerNotificationListener(this.netconfEventSource, notificationPath); + notificationRegistrationMap.put(notificationPath, registration); + ArrayList topicIds = getNotificationTopicIds(notificationPath); + if (topicIds == null) { + topicIds = new ArrayList<>(); + topicIds.add(topicId); + } else { + if (topicIds.contains(topicId) == false) { + topicIds.add(topicId); + } + } + + notificationTopicMap.put(notificationPath, topicIds); + return true; + } + + @Override synchronized void unRegisterNotificationTopic(TopicId topicId) { + List notificationPathToRemove = new ArrayList<>(); + for (SchemaPath notifKey : notificationTopicMap.keySet()) { + ArrayList topicList = notificationTopicMap.get(notifKey); + if (topicList != null) { + topicList.remove(topicId); + if (topicList.isEmpty()) { + notificationPathToRemove.add(notifKey); + } + } + } + for (SchemaPath notifKey : notificationPathToRemove) { + notificationTopicMap.remove(notifKey); + ListenerRegistration reg = notificationRegistrationMap.remove(notifKey); + if (reg != null) { + reg.close(); + } + } + } + + Optional getLastEventTime() { + return Optional.fromNullable(lastEventTime); + } + + void setLastEventTime(Date lastEventTime) { + this.lastEventTime = lastEventTime; + } + + @Override public void close() throws Exception { + closeStream(); + } + +} diff --git a/opendaylight/netconf/messagebus-netconf/src/main/resources/initial/06-message-netconf.xml b/opendaylight/netconf/messagebus-netconf/src/main/resources/initial/06-message-netconf.xml new file mode 100644 index 0000000000..421085a118 --- /dev/null +++ b/opendaylight/netconf/messagebus-netconf/src/main/resources/initial/06-message-netconf.xml @@ -0,0 +1,44 @@ + + + + + + + + messagebus-netconf + binding-impl:messagebus-netconf + + dom:dom-broker-osgi-registry + dom-broker + + + prefix:binding-broker-osgi-registry + binding-osgi-broker + + + mb-esr:event-source-registry + messagebus-app-impl + + + urn:ietf:params:xml:ns:yang:smiv2 + SNMP + + + urn:ietf:params:xml:ns:yang:ietf-syslog-notification + SYSLOG + + + + + + + urn:opendaylight:params:xml:ns:yang:controller:messagebus:netconf?module=messagebus-netconf&revision=2015-07-28 + urn:opendaylight:params:xml:ns:yang:controller:messagebus:spi:eventsourceregistry?module=messagebus-event-source-registry&revision=2015-04-02 + + diff --git a/opendaylight/netconf/messagebus-netconf/src/main/yang/messagebus-netconf.yang b/opendaylight/netconf/messagebus-netconf/src/main/yang/messagebus-netconf.yang new file mode 100644 index 0000000000..780175b917 --- /dev/null +++ b/opendaylight/netconf/messagebus-netconf/src/main/yang/messagebus-netconf.yang @@ -0,0 +1,68 @@ +module messagebus-netconf { + yang-version 1; + namespace "urn:opendaylight:params:xml:ns:yang:controller:messagebus:netconf"; + prefix "msgb-netconf"; + + import config { prefix config; revision-date 2013-04-05; } + import opendaylight-md-sal-binding {prefix sal;} + import opendaylight-md-sal-dom {prefix dom;} + import messagebus-event-source-registry {prefix esr;} + + description + "Message bus netconf event source"; + + revision "2015-07-28" { + description "Message bus netconf event source initial definition"; + } + + identity messagebus-netconf { + base config:module-type; + config:java-name-prefix MessageBusNetconf; + } + + augment "/config:modules/config:module/config:configuration" { + case messagebus-netconf { + when "/config:modules/config:module/config:type = 'messagebus-netconf'"; + + container event-source-registry { + uses config:service-ref { + refine type { + mandatory true; + config:required-identity esr:event-source-registry; + } + } + } + + container dom-broker { + uses config:service-ref { + refine type { + mandatory true; + config:required-identity dom:dom-broker-osgi-registry; + } + } + } + + container binding-broker { + uses config:service-ref { + refine type { + mandatory true; + config:required-identity sal:binding-broker-osgi-registry; + } + } + } + + list namespace-to-stream { + key urn-prefix; + + leaf urn-prefix { + type string; + } + + leaf stream-name { + type string; + } + } + + } + } +} \ No newline at end of file diff --git a/opendaylight/netconf/messagebus-netconf/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManagerTest.java b/opendaylight/netconf/messagebus-netconf/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManagerTest.java new file mode 100644 index 0000000000..c85bf4789c --- /dev/null +++ b/opendaylight/netconf/messagebus-netconf/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManagerTest.java @@ -0,0 +1,173 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.messagebus.eventsources.netconf; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.notNull; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.opendaylight.controller.config.yang.messagebus.netconf.NamespaceToStream; +import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.MountPoint; +import org.opendaylight.controller.md.sal.binding.api.MountPointService; +import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; +import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; +import org.opendaylight.controller.messagebus.spi.EventSource; +import org.opendaylight.controller.messagebus.spi.EventSourceRegistration; +import org.opendaylight.controller.messagebus.spi.EventSourceRegistry; +import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; + +public class NetconfEventSourceManagerTest { + + NetconfEventSourceManager netconfEventSourceManager; + ListenerRegistration listenerRegistrationMock; + DOMMountPointService domMountPointServiceMock; + MountPointService mountPointServiceMock; + EventSourceRegistry eventSourceTopologyMock; + AsyncDataChangeEvent asyncDataChangeEventMock; + RpcProviderRegistry rpcProviderRegistryMock; + EventSourceRegistry eventSourceRegistry; + @BeforeClass + public static void initTestClass() throws IllegalAccessException, InstantiationException { + } + + @Before + public void setUp() throws Exception { + DataBroker dataBrokerMock = mock(DataBroker.class); + DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class); + domMountPointServiceMock = mock(DOMMountPointService.class); + mountPointServiceMock = mock(MountPointService.class); + eventSourceTopologyMock = mock(EventSourceRegistry.class); + rpcProviderRegistryMock = mock(RpcProviderRegistry.class); + eventSourceRegistry = mock(EventSourceRegistry.class); + List namespaceToStreamList = new ArrayList<>(); + + listenerRegistrationMock = mock(ListenerRegistration.class); + doReturn(listenerRegistrationMock).when(dataBrokerMock).registerDataChangeListener(eq(LogicalDatastoreType.OPERATIONAL), any(InstanceIdentifier.class), any(NetconfEventSourceManager.class), eq( + AsyncDataBroker.DataChangeScope.SUBTREE)); + + Optional optionalDomMountServiceMock = (Optional) mock(Optional.class); + doReturn(true).when(optionalDomMountServiceMock).isPresent(); + doReturn(optionalDomMountServiceMock).when(domMountPointServiceMock).getMountPoint((YangInstanceIdentifier)notNull()); + + DOMMountPoint domMountPointMock = mock(DOMMountPoint.class); + doReturn(domMountPointMock).when(optionalDomMountServiceMock).get(); + + + Optional optionalBindingMountMock = mock(Optional.class); + doReturn(true).when(optionalBindingMountMock).isPresent(); + + MountPoint mountPointMock = mock(MountPoint.class); + doReturn(optionalBindingMountMock).when(mountPointServiceMock).getMountPoint(any(InstanceIdentifier.class)); + doReturn(mountPointMock).when(optionalBindingMountMock).get(); + + Optional optionalMpDataBroker = mock(Optional.class); + DataBroker mpDataBroker = mock(DataBroker.class); + doReturn(optionalMpDataBroker).when(mountPointMock).getService(DataBroker.class); + doReturn(true).when(optionalMpDataBroker).isPresent(); + doReturn(mpDataBroker).when(optionalMpDataBroker).get(); + + ReadOnlyTransaction rtx = mock(ReadOnlyTransaction.class); + doReturn(rtx).when(mpDataBroker).newReadOnlyTransaction(); + CheckedFuture, ReadFailedException> checkFeature = (CheckedFuture, ReadFailedException>)mock(CheckedFuture.class); + InstanceIdentifier pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build(); + doReturn(checkFeature).when(rtx).read(LogicalDatastoreType.OPERATIONAL, pathStream); + Optional avStreams = NetconfTestUtils.getAvailableStream("stream01", true); + doReturn(avStreams).when(checkFeature).checkedGet(); + + EventSourceRegistration esrMock = mock(EventSourceRegistration.class); + + netconfEventSourceManager = + NetconfEventSourceManager + .create(dataBrokerMock, domNotificationPublishServiceMock, domMountPointServiceMock, + mountPointServiceMock, eventSourceRegistry, namespaceToStreamList); + } + + @Test + public void onDataChangedCreateEventSourceTestByCreateEntry() throws Exception { + onDataChangedTestHelper(true,false,true, NetconfTestUtils.notification_capability_prefix); + netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock); + verify(eventSourceRegistry, times(1)).registerEventSource(any(EventSource.class)); + } + + @Test + public void onDataChangedCreateEventSourceTestByUpdateEntry() throws Exception { + onDataChangedTestHelper(false,true,true, NetconfTestUtils.notification_capability_prefix); + netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock); + verify(eventSourceRegistry, times(1)).registerEventSource(any(EventSource.class)); + } + + @Test + public void onDataChangedCreateEventSourceTestNotNeconf() throws Exception { + onDataChangedTestHelper(false,true,false, NetconfTestUtils.notification_capability_prefix); + netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock); + verify(eventSourceRegistry, times(0)).registerEventSource(any(EventSource.class)); + } + + @Test + public void onDataChangedCreateEventSourceTestNotNotificationCapability() throws Exception { + onDataChangedTestHelper(true,false,true,"bad-prefix"); + netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock); + verify(eventSourceRegistry, times(0)).registerEventSource(any(EventSource.class)); + } + + private void onDataChangedTestHelper(boolean create, boolean update, boolean isNetconf, String notificationCapabilityPrefix) throws Exception{ + asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class); + Map mapCreate = new HashMap<>(); + Map mapUpdate = new HashMap<>(); + + Node node01; + String nodeId = "Node01"; + doReturn(mapCreate).when(asyncDataChangeEventMock).getCreatedData(); + doReturn(mapUpdate).when(asyncDataChangeEventMock).getUpdatedData(); + + if(isNetconf){ + node01 = NetconfTestUtils + .getNetconfNode(nodeId, "node01.test.local", ConnectionStatus.Connected, notificationCapabilityPrefix); + + } else { + node01 = NetconfTestUtils.getNode(nodeId); + } + + if(create){ + mapCreate.put(NetconfTestUtils.getInstanceIdentifier(node01), node01); + } + if(update){ + mapUpdate.put(NetconfTestUtils.getInstanceIdentifier(node01), node01); + } + + } + +} \ No newline at end of file diff --git a/opendaylight/netconf/messagebus-netconf/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceTest.java b/opendaylight/netconf/messagebus-netconf/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceTest.java new file mode 100644 index 0000000000..ed548caeb3 --- /dev/null +++ b/opendaylight/netconf/messagebus-netconf/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceTest.java @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.messagebus.eventsources.netconf; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import java.net.URI; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.controller.md.sal.binding.api.BindingService; +import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.MountPoint; +import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.opendaylight.controller.md.sal.dom.api.DOMService; +import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.model.api.NotificationDefinition; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; + +public class NetconfEventSourceTest { + + NetconfEventSource netconfEventSource; + DOMMountPoint domMountPointMock; + MountPoint mountPointMock; + JoinTopicInput joinTopicInputMock; + + @Before + public void setUp() throws Exception { + Map streamMap = new HashMap<>(); + streamMap.put("uriStr1", "string2"); + domMountPointMock = mock(DOMMountPoint.class); + mountPointMock = mock(MountPoint.class); + DOMNotificationPublishService domNotificationPublishServiceMock = mock(DOMNotificationPublishService.class); + RpcConsumerRegistry rpcConsumerRegistryMock = mock(RpcConsumerRegistry.class); + Optional onlyOptionalMock = (Optional) mock(Optional.class); + NotificationsService notificationsServiceMock = mock(NotificationsService.class); + doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class); + + Optional optionalMpDataBroker = (Optional) mock(Optional.class); + DataBroker mpDataBroker = mock(DataBroker.class); + doReturn(optionalMpDataBroker).when(mountPointMock).getService(DataBroker.class); + doReturn(true).when(optionalMpDataBroker).isPresent(); + doReturn(mpDataBroker).when(optionalMpDataBroker).get(); + + ReadOnlyTransaction rtx = mock(ReadOnlyTransaction.class); + doReturn(rtx).when(mpDataBroker).newReadOnlyTransaction(); + CheckedFuture, ReadFailedException> checkFeature = (CheckedFuture, ReadFailedException>)mock(CheckedFuture.class); + InstanceIdentifier pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build(); + doReturn(checkFeature).when(rtx).read(LogicalDatastoreType.OPERATIONAL, pathStream); + Optional avStreams = NetconfTestUtils.getAvailableStream("stream01", true); + doReturn(avStreams).when(checkFeature).checkedGet(); + + netconfEventSource = new NetconfEventSource( + NetconfTestUtils.getNetconfNode("NodeId1", "node.test.local", ConnectionStatus.Connected, + NetconfTestUtils.notification_capability_prefix), + streamMap, + domMountPointMock, + mountPointMock , + domNotificationPublishServiceMock); + + } + + @Test + public void joinTopicTest() throws Exception{ + joinTopicTestHelper(); + assertNotNull("JoinTopic return value has not been created correctly.", netconfEventSource.joinTopic(joinTopicInputMock)); + } + + private void joinTopicTestHelper() throws Exception{ + joinTopicInputMock = mock(JoinTopicInput.class); + TopicId topicId = new TopicId("topicID007"); + doReturn(topicId).when(joinTopicInputMock).getTopicId(); + NotificationPattern notificationPatternMock = mock(NotificationPattern.class); + doReturn(notificationPatternMock).when(joinTopicInputMock).getNotificationPattern(); + doReturn("uriStr1").when(notificationPatternMock).getValue(); + + SchemaContext schemaContextMock = mock(SchemaContext.class); + doReturn(schemaContextMock).when(domMountPointMock).getSchemaContext(); + Set notificationDefinitionSet = new HashSet<>(); + NotificationDefinition notificationDefinitionMock = mock(NotificationDefinition.class); + notificationDefinitionSet.add(notificationDefinitionMock); + + URI uri = new URI("uriStr1"); + QName qName = new QName(uri, "localName1"); + org.opendaylight.yangtools.yang.model.api.SchemaPath schemaPath = SchemaPath.create(true, qName); + doReturn(notificationDefinitionSet).when(schemaContextMock).getNotifications(); + doReturn(schemaPath).when(notificationDefinitionMock).getPath(); + + Optional domNotificationServiceOptionalMock = (Optional) mock(Optional.class); + doReturn(domNotificationServiceOptionalMock).when(domMountPointMock).getService(DOMNotificationService.class); + doReturn(true).when(domNotificationServiceOptionalMock).isPresent(); + + DOMNotificationService domNotificationServiceMock = mock(DOMNotificationService.class); + doReturn(domNotificationServiceMock).when(domNotificationServiceOptionalMock).get(); + ListenerRegistration listenerRegistrationMock = (ListenerRegistration)mock(ListenerRegistration.class); + doReturn(listenerRegistrationMock).when(domNotificationServiceMock).registerNotificationListener(any(NetconfEventSource.class), any(SchemaPath.class)); + + Optional optionalMock = (Optional) mock(Optional.class); + doReturn(optionalMock).when(domMountPointMock).getService(DOMRpcService.class); + doReturn(true).when(optionalMock).isPresent(); + DOMRpcService domRpcServiceMock = mock(DOMRpcService.class); + doReturn(domRpcServiceMock).when(optionalMock).get(); + CheckedFuture checkedFutureMock = mock(CheckedFuture.class); + doReturn(checkedFutureMock).when(domRpcServiceMock).invokeRpc(any(SchemaPath.class), any(ContainerNode.class)); + + } + +} \ No newline at end of file diff --git a/opendaylight/netconf/messagebus-netconf/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfTestUtils.java b/opendaylight/netconf/messagebus-netconf/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfTestUtils.java new file mode 100644 index 0000000000..3261ddbc20 --- /dev/null +++ b/opendaylight/netconf/messagebus-netconf/src/test/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfTestUtils.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.messagebus.eventsources.netconf; + +import com.google.common.base.Optional; +import java.util.ArrayList; +import java.util.List; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.StreamsBuilder; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamBuilder; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.DomainName; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.AvailableCapabilities; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.AvailableCapabilitiesBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; + +public final class NetconfTestUtils { + + public static final String notification_capability_prefix = "(urn:ietf:params:xml:ns:netconf:notification"; + + private NetconfTestUtils() { + } + + public static Node getNetconfNode(String nodeIdent, String hostName, ConnectionStatus cs, + String notificationCapabilityPrefix) { + + DomainName dn = new DomainName(hostName); + Host host = new Host(dn); + + List avCapList = new ArrayList<>(); + avCapList.add(notificationCapabilityPrefix + "_availableCapabilityString1"); + AvailableCapabilities avCaps = new AvailableCapabilitiesBuilder().setAvailableCapability(avCapList).build(); + NetconfNode nn = new NetconfNodeBuilder().setConnectionStatus(cs).setHost(host).setAvailableCapabilities(avCaps) + .build(); + + NodeId nodeId = new NodeId(nodeIdent); + NodeKey nk = new NodeKey(nodeId); + NodeBuilder nb = new NodeBuilder(); + nb.setKey(nk); + + nb.addAugmentation(NetconfNode.class, nn); + return nb.build(); + } + + public static Node getNode(String nodeIdent) { + NodeId nodeId = new NodeId(nodeIdent); + NodeKey nk = new NodeKey(nodeId); + NodeBuilder nb = new NodeBuilder(); + nb.setKey(nk); + return nb.build(); + } + + public static InstanceIdentifier getInstanceIdentifier(Node node) { + TopologyKey NETCONF_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())); + InstanceIdentifier nodeII = InstanceIdentifier.create(NetworkTopology.class) + .child(Topology.class, NETCONF_TOPOLOGY_KEY).child(Node.class, node.getKey()); + return nodeII; + } + + public static Optional getAvailableStream(String Name, boolean replaySupport) { + Stream stream = new StreamBuilder().setName(new StreamNameType(Name)).setReplaySupport(replaySupport).build(); + List streamList = new ArrayList<>(); + streamList.add(stream); + Streams streams = new StreamsBuilder().setStream(streamList).build(); + return Optional.of(streams); + } + +} diff --git a/opendaylight/netconf/netconf-artifacts/pom.xml b/opendaylight/netconf/netconf-artifacts/pom.xml index 269dd4d68c..4607befced 100644 --- a/opendaylight/netconf/netconf-artifacts/pom.xml +++ b/opendaylight/netconf/netconf-artifacts/pom.xml @@ -138,6 +138,11 @@ sal-netconf-connector ${mdsal.version} + + ${project.groupId} + messagebus-netconf + ${project.version} + ${project.groupId} features-netconf-connector diff --git a/opendaylight/netconf/pom.xml b/opendaylight/netconf/pom.xml index a98f632e8f..f33eb112bf 100644 --- a/opendaylight/netconf/pom.xml +++ b/opendaylight/netconf/pom.xml @@ -35,6 +35,7 @@ netconf-notifications-impl netconf-notifications-api sal-netconf-connector + messagebus-netconf features models tools -- 2.36.6