Add missing copyright text
[controller.git] / opendaylight / md-sal / messagebus-impl / src / main / java / org / opendaylight / controller / messagebus / app / impl / NetconfEventSource.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.messagebus.app.impl;
10
11 import java.io.IOException;
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.Set;
16 import java.util.concurrent.ConcurrentSkipListSet;
17 import java.util.concurrent.Future;
18 import java.util.regex.Pattern;
19
20 import javax.xml.stream.XMLStreamException;
21 import javax.xml.transform.dom.DOMResult;
22 import javax.xml.transform.dom.DOMSource;
23
24 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
25 import org.opendaylight.controller.md.sal.binding.api.MountPoint;
26 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
27 import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
28 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
29 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
30 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
31 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
32 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
33 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
34 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
35 import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
36 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
37 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification;
38 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
39 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
40 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
41 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutputBuilder;
42 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
43 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
45 import org.opendaylight.yangtools.yang.binding.DataObject;
46 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
47 import org.opendaylight.yangtools.yang.common.QName;
48 import org.opendaylight.yangtools.yang.common.RpcResult;
49 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
50 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
51 import org.opendaylight.yangtools.yang.data.api.schema.AnyXmlNode;
52 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
53 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
54 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
55 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
56 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
57 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60 import org.w3c.dom.Document;
61 import org.w3c.dom.Element;
62
63 import com.google.common.base.Optional;
64 import com.google.common.base.Throwables;
65
66 public class NetconfEventSource implements EventSourceService, DOMNotificationListener, DataChangeListener {
67
68     private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSource.class);
69
70     private static final NodeIdentifier TOPIC_NOTIFICATION_ARG = new NodeIdentifier(TopicNotification.QNAME);
71     private static final NodeIdentifier EVENT_SOURCE_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "node-id"));
72     private static final NodeIdentifier PAYLOAD_ARG = new NodeIdentifier(QName.create(TopicNotification.QNAME, "payload"));
73
74     private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"stream"));
75     private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription"));
76
77
78     private final String nodeId;
79
80
81     private final DOMMountPoint netconfMount;
82     private final DOMNotificationPublishService domPublish;
83     private final NotificationsService notificationRpcService;
84
85     private final Set<String> activeStreams = new ConcurrentSkipListSet<>();
86
87     private final Map<String, String> urnPrefixToStreamMap;
88
89
90     public NetconfEventSource(final String nodeId, final Map<String, String> streamMap, final DOMMountPoint netconfMount, final DOMNotificationPublishService publishService, final MountPoint bindingMount) {
91         this.netconfMount = netconfMount;
92         this.notificationRpcService = bindingMount.getService(RpcConsumerRegistry.class).get().getRpcService(NotificationsService.class);
93         this.nodeId = nodeId;
94         this.urnPrefixToStreamMap = streamMap;
95         this.domPublish = publishService;
96         LOG.info("NetconfEventSource [{}] created.", nodeId);
97     }
98
99     @Override
100     public Future<RpcResult<JoinTopicOutput>> joinTopic(final JoinTopicInput input) {
101         final NotificationPattern notificationPattern = input.getNotificationPattern();
102
103         // FIXME: default language should already be regex
104         final String regex = Util.wildcardToRegex(notificationPattern.getValue());
105
106         final Pattern pattern = Pattern.compile(regex);
107         final List<SchemaPath> matchingNotifications = Util.expandQname(availableNotifications(), pattern);
108         registerNotificationListener(matchingNotifications);
109         final JoinTopicOutput output = new JoinTopicOutputBuilder().build();
110         return com.google.common.util.concurrent.Futures.immediateFuture(RpcResultBuilder.success(output).build());
111     }
112
113     private List<SchemaPath> availableNotifications() {
114         // FIXME: use SchemaContextListener to get changes asynchronously
115         final Set<NotificationDefinition> availableNotifications = netconfMount.getSchemaContext().getNotifications();
116         final List<SchemaPath> qNs = new ArrayList<>(availableNotifications.size());
117         for (final NotificationDefinition nd : availableNotifications) {
118             qNs.add(nd.getPath());
119         }
120         return qNs;
121     }
122
123     private void registerNotificationListener(final List<SchemaPath> notificationsToSubscribe) {
124
125         final Optional<DOMNotificationService> notifyService = netconfMount.getService(DOMNotificationService.class);
126         if(notifyService.isPresent()) {
127             for (final SchemaPath qName : notificationsToSubscribe) {
128                 startSubscription(qName);
129             }
130             // FIXME: Capture registration
131             notifyService.get().registerNotificationListener(this, notificationsToSubscribe);
132         }
133     }
134
135     private void startSubscription(final SchemaPath path) {
136         final String streamName = resolveStream(path.getLastComponent());
137
138         if (streamIsActive(streamName) == false) {
139             LOG.info("Stream {} is not active on node {}. Will subscribe.", streamName, nodeId);
140             startSubscription(streamName);
141         }
142     }
143
144     private void resubscribeToActiveStreams() {
145         for (final String streamName : activeStreams) {
146             startSubscription(streamName);
147         }
148     }
149
150     private synchronized void startSubscription(final String streamName) {
151         final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
152             .withChild(ImmutableNodes.leafNode(STREAM_QNAME, streamName))
153             .build();
154         netconfMount.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input);
155         activeStreams.add(streamName);
156     }
157
158     private String resolveStream(final QName qName) {
159         String streamName = null;
160
161         for (final Map.Entry<String, String> entry : urnPrefixToStreamMap.entrySet()) {
162             final String nameSpace = qName.getNamespace().toString();
163             final String urnPrefix = entry.getKey();
164             if( nameSpace.startsWith(urnPrefix) ) {
165                 streamName = entry.getValue();
166                 break;
167             }
168         }
169
170         return streamName;
171     }
172
173     private boolean streamIsActive(final String streamName) {
174         return activeStreams.contains(streamName);
175     }
176
177     @Override
178     public void onNotification(final DOMNotification notification) {
179         final ContainerNode topicNotification = Builders.containerBuilder()
180                 .withNodeIdentifier(TOPIC_NOTIFICATION_ARG)
181                 .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, nodeId))
182                 .withChild(encapsulate(notification))
183                 .build();
184         try {
185             domPublish.putNotification(new TopicDOMNotification(topicNotification));
186         } catch (final InterruptedException e) {
187             throw Throwables.propagate(e);
188         }
189     }
190
191     private AnyXmlNode encapsulate(final DOMNotification body) {
192         // FIXME: Introduce something like AnyXmlWithNormalizedNodeData in Yangtools
193         final Document doc = XmlUtil.newDocument();
194         final Optional<String> namespace = Optional.of(PAYLOAD_ARG.getNodeType().getNamespace().toString());
195         final Element element = XmlUtil.createElement(doc , "payload", namespace);
196
197
198         final DOMResult result = new DOMResult(element);
199
200         final SchemaContext context = netconfMount.getSchemaContext();
201         final SchemaPath schemaPath = body.getType();
202         try {
203             NetconfMessageTransformUtil.writeNormalizedNode(body.getBody(), result, schemaPath, context);
204             return Builders.anyXmlBuilder().withNodeIdentifier(PAYLOAD_ARG)
205                     .withValue(new DOMSource(element))
206                     .build();
207         } catch (IOException | XMLStreamException e) {
208             LOG.error("Unable to encapsulate notification.",e);
209             throw Throwables.propagate(e);
210         }
211     }
212
213     @Override
214     public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
215         boolean wasConnected = false;
216         boolean nowConnected = false;
217
218         for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : change.getOriginalData().entrySet()) {
219             if ( isNetconfNode(changeEntry) ) {
220                 final NetconfNode nn = (NetconfNode)changeEntry.getValue();
221                 wasConnected = nn.isConnected();
222             }
223         }
224
225         for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : change.getUpdatedData().entrySet()) {
226             if ( isNetconfNode(changeEntry) ) {
227                 final NetconfNode nn = (NetconfNode)changeEntry.getValue();
228                 nowConnected = nn.isConnected();
229             }
230         }
231
232         if (wasConnected == false && nowConnected == true) {
233             resubscribeToActiveStreams();
234         }
235     }
236
237     private static boolean isNetconfNode(final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry )  {
238         return NetconfNode.class.equals(changeEntry.getKey().getTargetType());
239     }
240
241 }