X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fmessagebus-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmessagebus%2Fapp%2Fimpl%2FNetconfEventSource.java;h=0d54beb644b1b6f23823c11283e9d03898b3f3b7;hp=9c0697f3fb3d666d7d4871e79a9de571f6d627ab;hb=35128aa4927b06a97e3d1f505a6852105dc81fed;hpb=ef65f3fb0d384aa06c119b8f88add65b073971e5;ds=sidebyside diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSource.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSource.java index 9c0697f3fb..0d54beb644 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSource.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSource.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -8,57 +8,92 @@ package org.opendaylight.controller.messagebus.app.impl; -import com.google.common.base.Preconditions; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.Future; import java.util.regex.Pattern; + +import javax.xml.stream.XMLStreamException; +import javax.xml.transform.dom.DOMResult; +import javax.xml.transform.dom.DOMSource; + import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; +import org.opendaylight.controller.md.sal.binding.api.MountPoint; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; -import org.opendaylight.controller.mdsal.MdSAL; -import org.opendaylight.controller.sal.core.api.notify.NotificationListener; +import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; +import org.opendaylight.controller.md.sal.dom.api.DOMNotification; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; +import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import org.opendaylight.controller.netconf.util.xml.XmlUtil; +import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry; +import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutputBuilder; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType; import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode; -import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.data.api.CompositeNode; -import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.AnyXmlNode; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.impl.schema.Builders; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.NotificationDefinition; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.w3c.dom.Document; +import org.w3c.dom.Element; + +import com.google.common.base.Optional; +import com.google.common.base.Throwables; + +public class NetconfEventSource implements EventSourceService, DOMNotificationListener, DataChangeListener { + + private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSource.class); + + private static final NodeIdentifier TOPIC_NOTIFICATION_ARG = new NodeIdentifier(TopicNotification.QNAME); + private static final NodeIdentifier EVENT_SOURCE_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "node-id")); + private static final NodeIdentifier PAYLOAD_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "payload")); + + private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"stream")); + private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription")); -public class NetconfEventSource implements EventSourceService, NotificationListener, DataChangeListener { - private static final Logger LOGGER = LoggerFactory.getLogger(NetconfEventSource.class); - private final MdSAL mdSal; private final String nodeId; - private final List activeStreams = new ArrayList<>(); + + private final DOMMountPoint netconfMount; + private final DOMNotificationPublishService domPublish; + private final NotificationsService notificationRpcService; + + private final Set activeStreams = new ConcurrentSkipListSet<>(); private final Map urnPrefixToStreamMap; - public NetconfEventSource(final MdSAL mdSal, final String nodeId, final Map streamMap) { - Preconditions.checkNotNull(mdSal); - Preconditions.checkNotNull(nodeId); - this.mdSal = mdSal; + public NetconfEventSource(final String nodeId, final Map streamMap, final DOMMountPoint netconfMount, final DOMNotificationPublishService publishService, final MountPoint bindingMount) { + this.netconfMount = netconfMount; + this.notificationRpcService = bindingMount.getService(RpcConsumerRegistry.class).get().getRpcService(NotificationsService.class); this.nodeId = nodeId; this.urnPrefixToStreamMap = streamMap; - - LOGGER.info("NetconfEventSource [{}] created.", nodeId); + this.domPublish = publishService; + LOG.info("NetconfEventSource [{}] created.", nodeId); } @Override @@ -69,63 +104,63 @@ public class NetconfEventSource implements EventSourceService, NotificationListe final String regex = Util.wildcardToRegex(notificationPattern.getValue()); final Pattern pattern = Pattern.compile(regex); - List matchingNotifications = Util.expandQname(availableNotifications(), pattern); + final List matchingNotifications = Util.expandQname(availableNotifications(), pattern); registerNotificationListener(matchingNotifications); - return null; + final JoinTopicOutput output = new JoinTopicOutputBuilder().build(); + return com.google.common.util.concurrent.Futures.immediateFuture(RpcResultBuilder.success(output).build()); } - private List availableNotifications() { + private List availableNotifications() { // FIXME: use SchemaContextListener to get changes asynchronously - Set availableNotifications = mdSal.getSchemaContext(nodeId).getNotifications(); - List qNs = new ArrayList<>(availableNotifications.size()); - for (NotificationDefinition nd : availableNotifications) { - qNs.add(nd.getQName()); + final Set availableNotifications = netconfMount.getSchemaContext().getNotifications(); + final List qNs = new ArrayList<>(availableNotifications.size()); + for (final NotificationDefinition nd : availableNotifications) { + qNs.add(nd.getPath()); } - return qNs; } - private void registerNotificationListener(final List notificationsToSubscribe) { - for (QName qName : notificationsToSubscribe) { - startSubscription(qName); - // FIXME: do not lose this registration - final ListenerRegistration reg = mdSal.addNotificationListener(nodeId, qName, this); + private void registerNotificationListener(final List notificationsToSubscribe) { + + final Optional notifyService = netconfMount.getService(DOMNotificationService.class); + if(notifyService.isPresent()) { + for (final SchemaPath qName : notificationsToSubscribe) { + startSubscription(qName); + } + // FIXME: Capture registration + notifyService.get().registerNotificationListener(this, notificationsToSubscribe); } } - private synchronized void startSubscription(final QName qName) { - String streamName = resolveStream(qName); + private void startSubscription(final SchemaPath path) { + final String streamName = resolveStream(path.getLastComponent()); if (streamIsActive(streamName) == false) { - LOGGER.info("Stream {} is not active on node {}. Will subscribe.", streamName, nodeId); + LOG.info("Stream {} is not active on node {}. Will subscribe.", streamName, nodeId); startSubscription(streamName); } } - private synchronized void resubscribeToActiveStreams() { - for (String streamName : activeStreams) { + private void resubscribeToActiveStreams() { + for (final String streamName : activeStreams) { startSubscription(streamName); } } private synchronized void startSubscription(final String streamName) { - CreateSubscriptionInput subscriptionInput = getSubscriptionInput(streamName); - mdSal.getRpcService(nodeId, NotificationsService.class).createSubscription(subscriptionInput); + final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME)) + .withChild(ImmutableNodes.leafNode(STREAM_QNAME, streamName)) + .build(); + netconfMount.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input); activeStreams.add(streamName); } - private static CreateSubscriptionInput getSubscriptionInput(final String streamName) { - CreateSubscriptionInputBuilder csib = new CreateSubscriptionInputBuilder(); - csib.setStream(new StreamNameType(streamName)); - return csib.build(); - } - private String resolveStream(final QName qName) { String streamName = null; - for (Map.Entry entry : urnPrefixToStreamMap.entrySet()) { - String nameSpace = qName.getNamespace().toString(); - String urnPrefix = entry.getKey(); + for (final Map.Entry entry : urnPrefixToStreamMap.entrySet()) { + final String nameSpace = qName.getNamespace().toString(); + final String urnPrefix = entry.getKey(); if( nameSpace.startsWith(urnPrefix) ) { streamName = entry.getValue(); break; @@ -139,24 +174,40 @@ public class NetconfEventSource implements EventSourceService, NotificationListe return activeStreams.contains(streamName); } - // PASS - @Override public Set getSupportedNotifications() { - return null; + @Override + public void onNotification(final DOMNotification notification) { + final ContainerNode topicNotification = Builders.containerBuilder() + .withNodeIdentifier(TOPIC_NOTIFICATION_ARG) + .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, nodeId)) + .withChild(encapsulate(notification)) + .build(); + try { + domPublish.putNotification(new TopicDOMNotification(topicNotification)); + } catch (final InterruptedException e) { + throw Throwables.propagate(e); + } } - @Override - public void onNotification(final CompositeNode notification) { - LOGGER.info("NetconfEventSource {} received notification {}. Will publish to MD-SAL.", nodeId, notification); - ImmutableCompositeNode payload = ImmutableCompositeNode.builder() - .setQName(QName.create(TopicNotification.QNAME, "payload")) - .add(notification).toInstance(); - ImmutableCompositeNode icn = ImmutableCompositeNode.builder() - .setQName(TopicNotification.QNAME) - .add(payload) - .addLeaf("event-source", nodeId) - .toInstance(); - - mdSal.publishNotification(icn); + private AnyXmlNode encapsulate(final DOMNotification body) { + // FIXME: Introduce something like AnyXmlWithNormalizedNodeData in Yangtools + final Document doc = XmlUtil.newDocument(); + final Optional namespace = Optional.of(PAYLOAD_ARG.getNodeType().getNamespace().toString()); + final Element element = XmlUtil.createElement(doc , "payload", namespace); + + + final DOMResult result = new DOMResult(element); + + final SchemaContext context = netconfMount.getSchemaContext(); + final SchemaPath schemaPath = body.getType(); + try { + NetconfMessageTransformUtil.writeNormalizedNode(body.getBody(), result, schemaPath, context); + return Builders.anyXmlBuilder().withNodeIdentifier(PAYLOAD_ARG) + .withValue(new DOMSource(element)) + .build(); + } catch (IOException | XMLStreamException e) { + LOG.error("Unable to encapsulate notification.",e); + throw Throwables.propagate(e); + } } @Override @@ -164,16 +215,16 @@ public class NetconfEventSource implements EventSourceService, NotificationListe boolean wasConnected = false; boolean nowConnected = false; - for (Map.Entry, DataObject> changeEntry : change.getOriginalData().entrySet()) { + for (final Map.Entry, DataObject> changeEntry : change.getOriginalData().entrySet()) { if ( isNetconfNode(changeEntry) ) { - NetconfNode nn = (NetconfNode)changeEntry.getValue(); + final NetconfNode nn = (NetconfNode)changeEntry.getValue(); wasConnected = nn.isConnected(); } } - for (Map.Entry, DataObject> changeEntry : change.getUpdatedData().entrySet()) { + for (final Map.Entry, DataObject> changeEntry : change.getUpdatedData().entrySet()) { if ( isNetconfNode(changeEntry) ) { - NetconfNode nn = (NetconfNode)changeEntry.getValue(); + final NetconfNode nn = (NetconfNode)changeEntry.getValue(); nowConnected = nn.isConnected(); } }