///* // * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. // * // * This program and the accompanying materials are made available under the // * terms of the Eclipse Public License v1.0 which accompanies this distribution, // * and is available at http://www.eclipse.org/legal/epl-v10.html // */ // //package org.opendaylight.controller.messagebus.eventsources.netconf; // //import static com.google.common.util.concurrent.Futures.immediateFuture; // //import java.io.IOException; //import java.util.ArrayList; //import java.util.Date; //import java.util.HashMap; //import java.util.List; //import java.util.Map; //import java.util.Set; //import java.util.concurrent.Future; //import java.util.regex.Pattern; // //import javax.xml.stream.XMLStreamException; //import javax.xml.transform.dom.DOMResult; //import javax.xml.transform.dom.DOMSource; // //import org.opendaylight.controller.md.sal.binding.api.DataBroker; //import org.opendaylight.controller.md.sal.binding.api.MountPoint; //import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; //import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; //import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; //import org.opendaylight.controller.md.sal.dom.api.DOMEvent; //import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; //import org.opendaylight.controller.md.sal.dom.api.DOMNotification; //import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener; //import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService; //import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; //import org.opendaylight.controller.messagebus.app.impl.TopicDOMNotification; //import org.opendaylight.controller.messagebus.app.impl.Util; //import org.opendaylight.controller.messagebus.spi.EventSource; //import org.opendaylight.controller.config.util.xml.XmlUtil; //import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil; //import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; //import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; //import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification; //import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput; //import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput; //import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutputBuilder; //import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicStatus; //import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInput; //import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf; //import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams; //import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; //import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; //import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; //import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; //import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; //import org.opendaylight.yangtools.yang.common.QName; //import org.opendaylight.yangtools.yang.common.RpcResult; //import org.opendaylight.yangtools.yang.common.RpcResultBuilder; //import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; //import org.opendaylight.yangtools.yang.data.api.schema.AnyXmlNode; //import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; //import org.opendaylight.yangtools.yang.data.impl.schema.Builders; //import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; //import org.opendaylight.yangtools.yang.model.api.NotificationDefinition; //import org.opendaylight.yangtools.yang.model.api.SchemaContext; //import org.opendaylight.yangtools.yang.model.api.SchemaPath; //import org.slf4j.Logger; //import org.slf4j.LoggerFactory; //import org.w3c.dom.Document; //import org.w3c.dom.Element; // //import com.google.common.base.Optional; //import com.google.common.base.Preconditions; //import com.google.common.base.Throwables; //import com.google.common.util.concurrent.CheckedFuture; // //public class NetconfEventSource implements EventSource, DOMNotificationListener { // // private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSource.class); // // private static final NodeIdentifier TOPIC_NOTIFICATION_ARG = new NodeIdentifier(TopicNotification.QNAME); // private static final NodeIdentifier EVENT_SOURCE_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "node-id")); // private static final NodeIdentifier TOPIC_ID_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "topic-id")); // private static final NodeIdentifier PAYLOAD_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "payload")); // private static final String ConnectionNotificationSourceName = "ConnectionNotificationSource"; // // private final String nodeId; // private final Node node; // // private final DOMMountPoint netconfMount; // private final MountPoint mountPoint; // private final DOMNotificationPublishService domPublish; // // private final Map urnPrefixToStreamMap; // key = urnPrefix, value = StreamName // private final List notificationTopicRegistrationList = new ArrayList<>(); // // public NetconfEventSource(final Node node, final Map streamMap, final DOMMountPoint netconfMount, final MountPoint mountPoint, final DOMNotificationPublishService publishService) { // this.netconfMount = Preconditions.checkNotNull(netconfMount); // this.mountPoint = Preconditions.checkNotNull(mountPoint); // this.node = Preconditions.checkNotNull(node); // this.urnPrefixToStreamMap = Preconditions.checkNotNull(streamMap); // this.domPublish = Preconditions.checkNotNull(publishService); // this.nodeId = node.getNodeId().getValue(); // this.initializeNotificationTopicRegistrationList(); // // LOG.info("NetconfEventSource [{}] created.", this.nodeId); // } // // private void initializeNotificationTopicRegistrationList() { // notificationTopicRegistrationList.add(new ConnectionNotificationTopicRegistration(ConnectionNotificationSourceName, this)); // Optional> streamMap = getAvailableStreams(); // if(streamMap.isPresent()){ // LOG.debug("Stream configuration compare..."); // for (String urnPrefix : this.urnPrefixToStreamMap.keySet()) { // final String streamName = this.urnPrefixToStreamMap.get(urnPrefix); // LOG.debug("urnPrefix: {} streamName: {}", urnPrefix, streamName); // if(streamMap.get().containsKey(streamName)){ // LOG.debug("Stream containig on device"); // notificationTopicRegistrationList.add(new StreamNotificationTopicRegistration(streamMap.get().get(streamName),urnPrefix, this)); // } // } // } // } // // private Optional> getAvailableStreams(){ // // Map streamMap = null; // InstanceIdentifier pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build(); // Optional dataBroker = this.mountPoint.getService(DataBroker.class); // // if(dataBroker.isPresent()){ // LOG.debug("GET Available streams ..."); // ReadOnlyTransaction tx = dataBroker.get().newReadOnlyTransaction(); // CheckedFuture, ReadFailedException> checkFeature = tx.read(LogicalDatastoreType.OPERATIONAL,pathStream); // // try { // Optional streams = checkFeature.checkedGet(); // if(streams.isPresent()){ // streamMap = new HashMap<>(); // for(Stream stream : streams.get().getStream()){ // LOG.debug("*** find stream {}", stream.getName().getValue()); // streamMap.put(stream.getName().getValue(), stream); // } // } // } catch (ReadFailedException e) { // LOG.warn("Can not read streams for node {}",this.nodeId); // } // // } else { // LOG.warn("No databroker on node {}", this.nodeId); // } // // return Optional.fromNullable(streamMap); // } // // @Override // public Future> joinTopic(final JoinTopicInput input) { // LOG.debug("Join topic {} on {}", input.getTopicId().getValue(), this.nodeId); // final NotificationPattern notificationPattern = input.getNotificationPattern(); // final List matchingNotifications = getMatchingNotifications(notificationPattern); // return registerTopic(input.getTopicId(),matchingNotifications); // // } // // @Override // public Future> disJoinTopic(DisJoinTopicInput input) { // for(NotificationTopicRegistration reg : notificationTopicRegistrationList){ // reg.unRegisterNotificationTopic(input.getTopicId()); // } // return Util.resultRpcSuccessFor((Void) null) ; // } // // private synchronized Future> registerTopic(final TopicId topicId, final List notificationsToSubscribe){ // LOG.debug("Join topic {} - register"); // JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down; // if(notificationsToSubscribe != null && notificationsToSubscribe.isEmpty() == false){ // LOG.debug("Notifications to subscribe has found - count {}",notificationsToSubscribe.size() ); // final Optional notifyService = getDOMMountPoint().getService(DOMNotificationService.class); // if(notifyService.isPresent()){ // int registeredNotificationCount = 0; // for(SchemaPath schemaNotification : notificationsToSubscribe){ // for(NotificationTopicRegistration reg : notificationTopicRegistrationList){ // LOG.debug("Try notification registratio {} on SchemaPathNotification {}", reg.getSourceName(), schemaNotification.getLastComponent().getLocalName()); // if(reg.checkNotificationPath(schemaNotification)){ // LOG.info("Source of notification {} is activating, TopicId {}", reg.getSourceName(), topicId.getValue() ); // boolean regSuccess = reg.registerNotificationTopic(schemaNotification, topicId); // if(regSuccess){ // registeredNotificationCount = registeredNotificationCount +1; // } // } // } // } // if(registeredNotificationCount > 0){ // joinTopicStatus = JoinTopicStatus.Up; // } // } else { // LOG.warn("NO DOMNotification service on node {}", this.nodeId); // } // } else { // LOG.debug("Notifications to subscribe has NOT found"); // } // // final JoinTopicOutput output = new JoinTopicOutputBuilder().setStatus(joinTopicStatus).build(); // return immediateFuture(RpcResultBuilder.success(output).build()); // // } // // public void reActivateStreams(){ // for (NotificationTopicRegistration reg : notificationTopicRegistrationList) { // LOG.info("Source of notification {} is reactivating on node {}", reg.getSourceName(), this.nodeId); // reg.reActivateNotificationSource(); // } // } // // public void deActivateStreams(){ // for (NotificationTopicRegistration reg : notificationTopicRegistrationList) { // LOG.info("Source of notification {} is deactivating on node {}", reg.getSourceName(), this.nodeId); // reg.deActivateNotificationSource(); // } // } // // @Override // public void onNotification(final DOMNotification notification) { // SchemaPath notificationPath = notification.getType(); // Date notificationEventTime = null; // if(notification instanceof DOMEvent){ // notificationEventTime = ((DOMEvent) notification).getEventTime(); // } // for(NotificationTopicRegistration notifReg : notificationTopicRegistrationList){ // ArrayList topicIdsForNotification = notifReg.getNotificationTopicIds(notificationPath); // if(topicIdsForNotification != null && topicIdsForNotification.isEmpty() == false){ // // if(notifReg instanceof StreamNotificationTopicRegistration){ // StreamNotificationTopicRegistration streamReg = (StreamNotificationTopicRegistration)notifReg; // streamReg.setLastEventTime(notificationEventTime); // } // // for(TopicId topicId : topicIdsForNotification){ // publishNotification(notification, topicId); // LOG.debug("Notification {} has been published for TopicId {}",notification.getType(), topicId.getValue()); // } // // } // } // } // // private void publishNotification(final DOMNotification notification, TopicId topicId){ // final ContainerNode topicNotification = Builders.containerBuilder() // .withNodeIdentifier(TOPIC_NOTIFICATION_ARG) // .withChild(ImmutableNodes.leafNode(TOPIC_ID_ARG, topicId)) // .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, this.nodeId)) // .withChild(encapsulate(notification)) // .build(); // try { // domPublish.putNotification(new TopicDOMNotification(topicNotification)); // } catch (final InterruptedException e) { // throw Throwables.propagate(e); // } // } // // private AnyXmlNode encapsulate(final DOMNotification body) { // // FIXME: Introduce something like AnyXmlWithNormalizedNodeData in Yangtools // final Document doc = XmlUtil.newDocument(); // final Optional namespace = Optional.of(PAYLOAD_ARG.getNodeType().getNamespace().toString()); // final Element element = XmlUtil.createElement(doc , "payload", namespace); // // final DOMResult result = new DOMResult(element); // // final SchemaContext context = getDOMMountPoint().getSchemaContext(); // final SchemaPath schemaPath = body.getType(); // try { // NetconfMessageTransformUtil.writeNormalizedNode(body.getBody(), result, schemaPath, context); // return Builders.anyXmlBuilder().withNodeIdentifier(PAYLOAD_ARG) // .withValue(new DOMSource(element)) // .build(); // } catch (IOException | XMLStreamException e) { // LOG.error("Unable to encapsulate notification.",e); // throw Throwables.propagate(e); // } // } // // private List getMatchingNotifications(NotificationPattern notificationPattern){ // // FIXME: default language should already be regex // final String regex = Util.wildcardToRegex(notificationPattern.getValue()); // // final Pattern pattern = Pattern.compile(regex); // List availableNotifications = getAvailableNotifications(); // if(availableNotifications == null || availableNotifications.isEmpty()){ // return null; // } // return Util.expandQname(availableNotifications, pattern); // } // // @Override // public void close() throws Exception { // for(NotificationTopicRegistration streamReg : notificationTopicRegistrationList){ // streamReg.close(); // } // } // // @Override // public NodeKey getSourceNodeKey(){ // return getNode().getKey(); // } // // @Override // public List getAvailableNotifications() { // // final List availNotifList = new ArrayList<>(); // // add Event Source Connection status notification // availNotifList.add(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH); // // // FIXME: use SchemaContextListener to get changes asynchronously // final Set availableNotifications = getDOMMountPoint().getSchemaContext().getNotifications(); // // add all known notifications from netconf device // for (final NotificationDefinition nd : availableNotifications) { // availNotifList.add(nd.getPath()); // } // return availNotifList; // } // // public Node getNode() { // return node; // } // // DOMMountPoint getDOMMountPoint() { // return netconfMount; // } // // MountPoint getMountPoint() { // return mountPoint; // } // // NetconfNode getNetconfNode(){ // return node.getAugmentation(NetconfNode.class); // } // //}