X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fmessagebus-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmessagebus%2Feventsources%2Fnetconf%2FNetconfEventSource.java;h=5eb32d64e75dc07e459647777316231eb97dfeaf;hb=refs%2Fchanges%2F13%2F23413%2F26;hp=615fa34b7c4d302aa005cd5b236a6661e4c4c354;hpb=08631886ab131bdd74a8364c894792a9ef7253e8;p=controller.git diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java index 615fa34b7c..5eb32d64e7 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java @@ -1,271 +1,340 @@ -/* - * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.messagebus.eventsources.netconf; - -import static com.google.common.util.concurrent.Futures.immediateFuture; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.Future; -import java.util.regex.Pattern; - -import javax.xml.stream.XMLStreamException; -import javax.xml.transform.dom.DOMResult; -import javax.xml.transform.dom.DOMSource; - -import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; -import org.opendaylight.controller.md.sal.binding.api.MountPoint; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; -import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; -import org.opendaylight.controller.md.sal.dom.api.DOMNotification; -import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener; -import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; -import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; -import org.opendaylight.controller.messagebus.app.impl.TopicDOMNotification; -import org.opendaylight.controller.messagebus.app.impl.Util; -import org.opendaylight.controller.messagebus.spi.EventSource; -import org.opendaylight.controller.netconf.util.xml.XmlUtil; -import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil; -import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; -import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; -import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification; -import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput; -import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput; -import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutputBuilder; -import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicStatus; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode; -import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; -import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; -import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.binding.DataObject; -import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.common.RpcResultBuilder; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.AnyXmlNode; -import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; -import org.opendaylight.yangtools.yang.data.impl.schema.Builders; -import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.model.api.NotificationDefinition; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import org.opendaylight.yangtools.yang.model.api.SchemaPath; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.w3c.dom.Document; -import org.w3c.dom.Element; - -import com.google.common.base.Optional; -import com.google.common.base.Throwables; - -public class NetconfEventSource implements EventSource, DOMNotificationListener, DataChangeListener { - - private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSource.class); - - private static final NodeIdentifier TOPIC_NOTIFICATION_ARG = new NodeIdentifier(TopicNotification.QNAME); - private static final NodeIdentifier EVENT_SOURCE_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "node-id")); - private static final NodeIdentifier PAYLOAD_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "payload")); - - private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"stream")); - private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription")); - - private final String nodeId; - private final Node node; - - private final DOMMountPoint netconfMount; - private final DOMNotificationPublishService domPublish; - private final Set activeStreams = new ConcurrentSkipListSet<>(); - - private final Map urnPrefixToStreamMap; - private final ConcurrentHashMap> listenerRegistrationMap = new ConcurrentHashMap<>(); - - public NetconfEventSource(final Node node, final Map streamMap, final DOMMountPoint netconfMount, final DOMNotificationPublishService publishService, final MountPoint bindingMount) { - this.netconfMount = netconfMount; - this.node = node; - this.nodeId = node.getNodeId().getValue(); - this.urnPrefixToStreamMap = streamMap; - this.domPublish = publishService; - LOG.info("NetconfEventSource [{}] created.", nodeId); - } - - @Override - public Future> joinTopic(final JoinTopicInput input) { - final NotificationPattern notificationPattern = input.getNotificationPattern(); - final List matchingNotifications = getMatchingNotifications(notificationPattern); - return registerNotificationListener(input.getTopicId(),matchingNotifications); - } - - private synchronized Future> registerNotificationListener(final TopicId topicId, final List notificationsToSubscribe){ - if(listenerRegistrationMap.containsKey(topicId)){ - final String errMessage = "Can not join topic twice. Topic " + topicId.getValue() + " has been joined to node " + this.nodeId; - return immediateFuture(RpcResultBuilder.failed().withError(ErrorType.APPLICATION, errMessage).build()); - } - ListenerRegistration registration = null; - JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down; - final Optional notifyService = netconfMount.getService(DOMNotificationService.class); - - if(notifyService.isPresent()) { - for (final SchemaPath qName : notificationsToSubscribe) { - startSubscription(qName); - } - registration = notifyService.get().registerNotificationListener(this, notificationsToSubscribe); - } - - if(registration != null){ - listenerRegistrationMap.put(topicId,registration); - joinTopicStatus = JoinTopicStatus.Up; - } - final JoinTopicOutput output = new JoinTopicOutputBuilder().setStatus(joinTopicStatus).build(); - return immediateFuture(RpcResultBuilder.success(output).build()); - } - - private void startSubscription(final SchemaPath path) { - final String streamName = resolveStream(path.getLastComponent()); - startSubscription(streamName); - } - - private void resubscribeToActiveStreams() { - for (final String streamName : activeStreams) { - startSubscription(streamName); - } - } - - private synchronized void startSubscription(final String streamName) { - if(streamIsActive(streamName) == false){ - LOG.info("Stream {} is not active on node {}. Will subscribe.", streamName, nodeId); - final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME)) - .withChild(ImmutableNodes.leafNode(STREAM_QNAME, streamName)) - .build(); - netconfMount.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input); - activeStreams.add(streamName); - } - } - - private String resolveStream(final QName qName) { - String streamName = null; - - for (final Map.Entry entry : urnPrefixToStreamMap.entrySet()) { - final String nameSpace = qName.getNamespace().toString(); - final String urnPrefix = entry.getKey(); - if( nameSpace.startsWith(urnPrefix) ) { - streamName = entry.getValue(); - break; - } - } - - return streamName; - } - - private boolean streamIsActive(final String streamName) { - return activeStreams.contains(streamName); - } - - @Override - public void onNotification(final DOMNotification notification) { - final ContainerNode topicNotification = Builders.containerBuilder() - .withNodeIdentifier(TOPIC_NOTIFICATION_ARG) - .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, nodeId)) - .withChild(encapsulate(notification)) - .build(); - try { - domPublish.putNotification(new TopicDOMNotification(topicNotification)); - } catch (final InterruptedException e) { - throw Throwables.propagate(e); - } - } - - private AnyXmlNode encapsulate(final DOMNotification body) { - // FIXME: Introduce something like AnyXmlWithNormalizedNodeData in Yangtools - final Document doc = XmlUtil.newDocument(); - final Optional namespace = Optional.of(PAYLOAD_ARG.getNodeType().getNamespace().toString()); - final Element element = XmlUtil.createElement(doc , "payload", namespace); - - final DOMResult result = new DOMResult(element); - - final SchemaContext context = netconfMount.getSchemaContext(); - final SchemaPath schemaPath = body.getType(); - try { - NetconfMessageTransformUtil.writeNormalizedNode(body.getBody(), result, schemaPath, context); - return Builders.anyXmlBuilder().withNodeIdentifier(PAYLOAD_ARG) - .withValue(new DOMSource(element)) - .build(); - } catch (IOException | XMLStreamException e) { - LOG.error("Unable to encapsulate notification.",e); - throw Throwables.propagate(e); - } - } - - @Override - public void onDataChanged(final AsyncDataChangeEvent, DataObject> change) { - boolean wasConnected = false; - boolean nowConnected = false; - - for (final Map.Entry, DataObject> changeEntry : change.getOriginalData().entrySet()) { - if ( isNetconfNode(changeEntry) ) { - final NetconfNode nn = (NetconfNode)changeEntry.getValue(); - wasConnected = nn.isConnected(); - } - } - - for (final Map.Entry, DataObject> changeEntry : change.getUpdatedData().entrySet()) { - if ( isNetconfNode(changeEntry) ) { - final NetconfNode nn = (NetconfNode)changeEntry.getValue(); - nowConnected = nn.isConnected(); - } - } - - if (wasConnected == false && nowConnected == true) { - resubscribeToActiveStreams(); - } - } - - private static boolean isNetconfNode(final Map.Entry, DataObject> changeEntry ) { - return NetconfNode.class.equals(changeEntry.getKey().getTargetType()); - } - - private List getMatchingNotifications(NotificationPattern notificationPattern){ - // FIXME: default language should already be regex - final String regex = Util.wildcardToRegex(notificationPattern.getValue()); - - final Pattern pattern = Pattern.compile(regex); - return Util.expandQname(getAvailableNotifications(), pattern); - } - - @Override - public void close() throws Exception { - for(ListenerRegistration registration : listenerRegistrationMap.values()){ - registration.close(); - } - } - - @Override - public NodeKey getSourceNodeKey(){ - return node.getKey(); - } - - @Override - public List getAvailableNotifications() { - // FIXME: use SchemaContextListener to get changes asynchronously - final Set availableNotifications = netconfMount.getSchemaContext().getNotifications(); - final List qNs = new ArrayList<>(availableNotifications.size()); - for (final NotificationDefinition nd : availableNotifications) { - qNs.add(nd.getPath()); - } - return qNs; - } - -} +///* +// * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. +// * +// * This program and the accompanying materials are made available under the +// * terms of the Eclipse Public License v1.0 which accompanies this distribution, +// * and is available at http://www.eclipse.org/legal/epl-v10.html +// */ +// +//package org.opendaylight.controller.messagebus.eventsources.netconf; +// +//import static com.google.common.util.concurrent.Futures.immediateFuture; +// +//import java.io.IOException; +//import java.util.ArrayList; +//import java.util.Date; +//import java.util.HashMap; +//import java.util.List; +//import java.util.Map; +//import java.util.Set; +//import java.util.concurrent.Future; +//import java.util.regex.Pattern; +// +//import javax.xml.stream.XMLStreamException; +//import javax.xml.transform.dom.DOMResult; +//import javax.xml.transform.dom.DOMSource; +// +//import org.opendaylight.controller.md.sal.binding.api.DataBroker; +//import org.opendaylight.controller.md.sal.binding.api.MountPoint; +//import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; +//import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +//import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +//import org.opendaylight.controller.md.sal.dom.api.DOMEvent; +//import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; +//import org.opendaylight.controller.md.sal.dom.api.DOMNotification; +//import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener; +//import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; +//import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; +//import org.opendaylight.controller.messagebus.app.impl.TopicDOMNotification; +//import org.opendaylight.controller.messagebus.app.impl.Util; +//import org.opendaylight.controller.messagebus.spi.EventSource; +//import org.opendaylight.controller.config.util.xml.XmlUtil; +//import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil; +//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; +//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; +//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification; +//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput; +//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput; +//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutputBuilder; +//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicStatus; +//import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInput; +//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf; +//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams; +//import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; +//import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; +//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; +//import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; +//import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +//import org.opendaylight.yangtools.yang.common.QName; +//import org.opendaylight.yangtools.yang.common.RpcResult; +//import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +//import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +//import org.opendaylight.yangtools.yang.data.api.schema.AnyXmlNode; +//import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +//import org.opendaylight.yangtools.yang.data.impl.schema.Builders; +//import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +//import org.opendaylight.yangtools.yang.model.api.NotificationDefinition; +//import org.opendaylight.yangtools.yang.model.api.SchemaContext; +//import org.opendaylight.yangtools.yang.model.api.SchemaPath; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +//import org.w3c.dom.Document; +//import org.w3c.dom.Element; +// +//import com.google.common.base.Optional; +//import com.google.common.base.Preconditions; +//import com.google.common.base.Throwables; +//import com.google.common.util.concurrent.CheckedFuture; +// +//public class NetconfEventSource implements EventSource, DOMNotificationListener { +// +// private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSource.class); +// +// private static final NodeIdentifier TOPIC_NOTIFICATION_ARG = new NodeIdentifier(TopicNotification.QNAME); +// private static final NodeIdentifier EVENT_SOURCE_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "node-id")); +// private static final NodeIdentifier TOPIC_ID_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "topic-id")); +// private static final NodeIdentifier PAYLOAD_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "payload")); +// private static final String ConnectionNotificationSourceName = "ConnectionNotificationSource"; +// +// private final String nodeId; +// private final Node node; +// +// private final DOMMountPoint netconfMount; +// private final MountPoint mountPoint; +// private final DOMNotificationPublishService domPublish; +// +// private final Map urnPrefixToStreamMap; // key = urnPrefix, value = StreamName +// private final List notificationTopicRegistrationList = new ArrayList<>(); +// +// public NetconfEventSource(final Node node, final Map streamMap, final DOMMountPoint netconfMount, final MountPoint mountPoint, final DOMNotificationPublishService publishService) { +// this.netconfMount = Preconditions.checkNotNull(netconfMount); +// this.mountPoint = Preconditions.checkNotNull(mountPoint); +// this.node = Preconditions.checkNotNull(node); +// this.urnPrefixToStreamMap = Preconditions.checkNotNull(streamMap); +// this.domPublish = Preconditions.checkNotNull(publishService); +// this.nodeId = node.getNodeId().getValue(); +// this.initializeNotificationTopicRegistrationList(); +// +// LOG.info("NetconfEventSource [{}] created.", this.nodeId); +// } +// +// private void initializeNotificationTopicRegistrationList() { +// notificationTopicRegistrationList.add(new ConnectionNotificationTopicRegistration(ConnectionNotificationSourceName, this)); +// Optional> streamMap = getAvailableStreams(); +// if(streamMap.isPresent()){ +// LOG.debug("Stream configuration compare..."); +// for (String urnPrefix : this.urnPrefixToStreamMap.keySet()) { +// final String streamName = this.urnPrefixToStreamMap.get(urnPrefix); +// LOG.debug("urnPrefix: {} streamName: {}", urnPrefix, streamName); +// if(streamMap.get().containsKey(streamName)){ +// LOG.debug("Stream containig on device"); +// notificationTopicRegistrationList.add(new StreamNotificationTopicRegistration(streamMap.get().get(streamName),urnPrefix, this)); +// } +// } +// } +// } +// +// private Optional> getAvailableStreams(){ +// +// Map streamMap = null; +// InstanceIdentifier pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build(); +// Optional dataBroker = this.mountPoint.getService(DataBroker.class); +// +// if(dataBroker.isPresent()){ +// LOG.debug("GET Available streams ..."); +// ReadOnlyTransaction tx = dataBroker.get().newReadOnlyTransaction(); +// CheckedFuture, ReadFailedException> checkFeature = tx.read(LogicalDatastoreType.OPERATIONAL,pathStream); +// +// try { +// Optional streams = checkFeature.checkedGet(); +// if(streams.isPresent()){ +// streamMap = new HashMap<>(); +// for(Stream stream : streams.get().getStream()){ +// LOG.debug("*** find stream {}", stream.getName().getValue()); +// streamMap.put(stream.getName().getValue(), stream); +// } +// } +// } catch (ReadFailedException e) { +// LOG.warn("Can not read streams for node {}",this.nodeId); +// } +// +// } else { +// LOG.warn("No databroker on node {}", this.nodeId); +// } +// +// return Optional.fromNullable(streamMap); +// } +// +// @Override +// public Future> joinTopic(final JoinTopicInput input) { +// LOG.debug("Join topic {} on {}", input.getTopicId().getValue(), this.nodeId); +// final NotificationPattern notificationPattern = input.getNotificationPattern(); +// final List matchingNotifications = getMatchingNotifications(notificationPattern); +// return registerTopic(input.getTopicId(),matchingNotifications); +// +// } +// +// @Override +// public Future> disJoinTopic(DisJoinTopicInput input) { +// for(NotificationTopicRegistration reg : notificationTopicRegistrationList){ +// reg.unRegisterNotificationTopic(input.getTopicId()); +// } +// return Util.resultRpcSuccessFor((Void) null) ; +// } +// +// private synchronized Future> registerTopic(final TopicId topicId, final List notificationsToSubscribe){ +// LOG.debug("Join topic {} - register"); +// JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down; +// if(notificationsToSubscribe != null && notificationsToSubscribe.isEmpty() == false){ +// LOG.debug("Notifications to subscribe has found - count {}",notificationsToSubscribe.size() ); +// final Optional notifyService = getDOMMountPoint().getService(DOMNotificationService.class); +// if(notifyService.isPresent()){ +// int registeredNotificationCount = 0; +// for(SchemaPath schemaNotification : notificationsToSubscribe){ +// for(NotificationTopicRegistration reg : notificationTopicRegistrationList){ +// LOG.debug("Try notification registratio {} on SchemaPathNotification {}", reg.getSourceName(), schemaNotification.getLastComponent().getLocalName()); +// if(reg.checkNotificationPath(schemaNotification)){ +// LOG.info("Source of notification {} is activating, TopicId {}", reg.getSourceName(), topicId.getValue() ); +// boolean regSuccess = reg.registerNotificationTopic(schemaNotification, topicId); +// if(regSuccess){ +// registeredNotificationCount = registeredNotificationCount +1; +// } +// } +// } +// } +// if(registeredNotificationCount > 0){ +// joinTopicStatus = JoinTopicStatus.Up; +// } +// } else { +// LOG.warn("NO DOMNotification service on node {}", this.nodeId); +// } +// } else { +// LOG.debug("Notifications to subscribe has NOT found"); +// } +// +// final JoinTopicOutput output = new JoinTopicOutputBuilder().setStatus(joinTopicStatus).build(); +// return immediateFuture(RpcResultBuilder.success(output).build()); +// +// } +// +// public void reActivateStreams(){ +// for (NotificationTopicRegistration reg : notificationTopicRegistrationList) { +// LOG.info("Source of notification {} is reactivating on node {}", reg.getSourceName(), this.nodeId); +// reg.reActivateNotificationSource(); +// } +// } +// +// public void deActivateStreams(){ +// for (NotificationTopicRegistration reg : notificationTopicRegistrationList) { +// LOG.info("Source of notification {} is deactivating on node {}", reg.getSourceName(), this.nodeId); +// reg.deActivateNotificationSource(); +// } +// } +// +// @Override +// public void onNotification(final DOMNotification notification) { +// SchemaPath notificationPath = notification.getType(); +// Date notificationEventTime = null; +// if(notification instanceof DOMEvent){ +// notificationEventTime = ((DOMEvent) notification).getEventTime(); +// } +// for(NotificationTopicRegistration notifReg : notificationTopicRegistrationList){ +// ArrayList topicIdsForNotification = notifReg.getNotificationTopicIds(notificationPath); +// if(topicIdsForNotification != null && topicIdsForNotification.isEmpty() == false){ +// +// if(notifReg instanceof StreamNotificationTopicRegistration){ +// StreamNotificationTopicRegistration streamReg = (StreamNotificationTopicRegistration)notifReg; +// streamReg.setLastEventTime(notificationEventTime); +// } +// +// for(TopicId topicId : topicIdsForNotification){ +// publishNotification(notification, topicId); +// LOG.debug("Notification {} has been published for TopicId {}",notification.getType(), topicId.getValue()); +// } +// +// } +// } +// } +// +// private void publishNotification(final DOMNotification notification, TopicId topicId){ +// final ContainerNode topicNotification = Builders.containerBuilder() +// .withNodeIdentifier(TOPIC_NOTIFICATION_ARG) +// .withChild(ImmutableNodes.leafNode(TOPIC_ID_ARG, topicId)) +// .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, this.nodeId)) +// .withChild(encapsulate(notification)) +// .build(); +// try { +// domPublish.putNotification(new TopicDOMNotification(topicNotification)); +// } catch (final InterruptedException e) { +// throw Throwables.propagate(e); +// } +// } +// +// private AnyXmlNode encapsulate(final DOMNotification body) { +// // FIXME: Introduce something like AnyXmlWithNormalizedNodeData in Yangtools +// final Document doc = XmlUtil.newDocument(); +// final Optional namespace = Optional.of(PAYLOAD_ARG.getNodeType().getNamespace().toString()); +// final Element element = XmlUtil.createElement(doc , "payload", namespace); +// +// final DOMResult result = new DOMResult(element); +// +// final SchemaContext context = getDOMMountPoint().getSchemaContext(); +// final SchemaPath schemaPath = body.getType(); +// try { +// NetconfMessageTransformUtil.writeNormalizedNode(body.getBody(), result, schemaPath, context); +// return Builders.anyXmlBuilder().withNodeIdentifier(PAYLOAD_ARG) +// .withValue(new DOMSource(element)) +// .build(); +// } catch (IOException | XMLStreamException e) { +// LOG.error("Unable to encapsulate notification.",e); +// throw Throwables.propagate(e); +// } +// } +// +// private List getMatchingNotifications(NotificationPattern notificationPattern){ +// // FIXME: default language should already be regex +// final String regex = Util.wildcardToRegex(notificationPattern.getValue()); +// +// final Pattern pattern = Pattern.compile(regex); +// List availableNotifications = getAvailableNotifications(); +// if(availableNotifications == null || availableNotifications.isEmpty()){ +// return null; +// } +// return Util.expandQname(availableNotifications, pattern); +// } +// +// @Override +// public void close() throws Exception { +// for(NotificationTopicRegistration streamReg : notificationTopicRegistrationList){ +// streamReg.close(); +// } +// } +// +// @Override +// public NodeKey getSourceNodeKey(){ +// return getNode().getKey(); +// } +// +// @Override +// public List getAvailableNotifications() { +// +// final List availNotifList = new ArrayList<>(); +// // add Event Source Connection status notification +// availNotifList.add(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH); +// +// // FIXME: use SchemaContextListener to get changes asynchronously +// final Set availableNotifications = getDOMMountPoint().getSchemaContext().getNotifications(); +// // add all known notifications from netconf device +// for (final NotificationDefinition nd : availableNotifications) { +// availNotifList.add(nd.getPath()); +// } +// return availNotifList; +// } +// +// public Node getNode() { +// return node; +// } +// +// DOMMountPoint getDOMMountPoint() { +// return netconfMount; +// } +// +// MountPoint getMountPoint() { +// return mountPoint; +// } +// +// NetconfNode getNetconfNode(){ +// return node.getAugmentation(NetconfNode.class); +// } +// +//}