1d32b39e23bb077c59120d7bcd5a78c09db0fe35
[netconf.git] / restconf / sal-rest-connector / src / main / java / org / opendaylight / netconf / sal / streams / listeners / NotificationListenerAdapter.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.sal.streams.listeners;
9
10 import com.google.common.base.Charsets;
11 import com.google.common.base.Preconditions;
12 import com.google.common.eventbus.AsyncEventBus;
13 import com.google.common.eventbus.EventBus;
14 import com.google.common.eventbus.Subscribe;
15 import io.netty.channel.Channel;
16 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
17 import io.netty.util.internal.ConcurrentSet;
18 import java.io.ByteArrayOutputStream;
19 import java.io.IOException;
20 import java.io.OutputStreamWriter;
21 import java.io.StringReader;
22 import java.io.UnsupportedEncodingException;
23 import java.util.Collection;
24 import java.util.Date;
25 import java.util.Set;
26 import java.util.concurrent.Executors;
27 import javax.xml.parsers.DocumentBuilder;
28 import javax.xml.parsers.DocumentBuilderFactory;
29 import javax.xml.stream.XMLOutputFactory;
30 import javax.xml.stream.XMLStreamException;
31 import javax.xml.stream.XMLStreamWriter;
32 import javax.xml.transform.OutputKeys;
33 import javax.xml.transform.Transformer;
34 import javax.xml.transform.TransformerException;
35 import javax.xml.transform.TransformerFactory;
36 import javax.xml.transform.dom.DOMResult;
37 import javax.xml.transform.dom.DOMSource;
38 import javax.xml.transform.stream.StreamResult;
39 import javax.xml.xpath.XPath;
40 import javax.xml.xpath.XPathConstants;
41 import javax.xml.xpath.XPathFactory;
42 import org.json.JSONObject;
43 import org.json.XML;
44 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
45 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
46 import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
47 import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
48 import org.opendaylight.yangtools.concepts.ListenerRegistration;
49 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
50 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
51 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
52 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
53 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
54 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
55 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
56 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
57 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
58 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
59 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62 import org.w3c.dom.Document;
63 import org.w3c.dom.Element;
64 import org.w3c.dom.Node;
65 import org.xml.sax.InputSource;
66
67 /**
68  * {@link NotificationListenerAdapter} is responsible to track events on
69  * notifications.
70  *
71  */
72 public class NotificationListenerAdapter implements DOMNotificationListener {
73
74     private static final Logger LOG = LoggerFactory.getLogger(NotificationListenerAdapter.class);
75     private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
76
77     private final String streamName;
78     private ListenerRegistration<DOMNotificationListener> registration;
79     private Set<Channel> subscribers = new ConcurrentSet<>();
80     private final EventBus eventBus;
81     private final EventBusChangeRecorder eventBusChangeRecorder;
82
83     private final SchemaPath path;
84     private final String outputType;
85     private Date start = null;
86     private Date stop = null;
87     private String filter;
88
89     /**
90      * Set path of listener and stream name, register event bus.
91      *
92      * @param path
93      *            - path of notification
94      * @param streamName
95      *            - stream name of listener
96      * @param outputType
97      *            - type of output on notification (JSON, XML)
98      */
99     NotificationListenerAdapter(final SchemaPath path, final String streamName, final String outputType) {
100         this.outputType = outputType;
101         Preconditions.checkArgument((streamName != null) && !streamName.isEmpty());
102         Preconditions.checkArgument(path != null);
103         this.path = path;
104         this.streamName = streamName;
105         this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
106         this.eventBusChangeRecorder = new EventBusChangeRecorder();
107         this.eventBus.register(this.eventBusChangeRecorder);
108     }
109
110     @Override
111     public void onNotification(final DOMNotification notification) {
112         final Date now = new Date();
113         if (this.stop != null) {
114             if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) {
115                 checkFilter(notification);
116             }
117             if (this.stop.compareTo(now) < 0) {
118                 try {
119                     this.close();
120                 } catch (final Exception e) {
121                     throw new RestconfDocumentedException("Problem with unregister listener." + e);
122                 }
123             }
124         } else if (this.start != null) {
125             if (this.start.compareTo(now) < 0) {
126                 this.start = null;
127                 checkFilter(notification);
128             }
129         } else {
130             checkFilter(notification);
131         }
132     }
133
134     /**
135      * Check if is filter used and then prepare and post data do client
136      *
137      * @param notification
138      *            - data of notification
139      */
140     private void checkFilter(final DOMNotification notification) {
141         final String xml = prepareXmlFrom(notification);
142         if (this.filter == null) {
143             prepareAndPostData(xml);
144         } else {
145             try {
146                 if (parseFilterParam(xml)) {
147                     prepareAndPostData(xml);
148                 }
149             } catch (final Exception e) {
150                 throw new RestconfDocumentedException("Problem while parsing filter.", e);
151             }
152         }
153     }
154
155     /**
156      * Parse and evaluate filter value by xml
157      *
158      * @param xml
159      *            - notification data in xml
160      * @return true or false - depends on filter expression and data of
161      *         notifiaction
162      * @throws Exception
163      */
164     private boolean parseFilterParam(final String xml) throws Exception {
165         final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
166         final DocumentBuilder builder = factory.newDocumentBuilder();
167         final Document docOfXml = builder.parse(new InputSource(new StringReader(xml)));
168         final XPath xPath = XPathFactory.newInstance().newXPath();
169         return (boolean) xPath.compile(this.filter).evaluate(docOfXml, XPathConstants.BOOLEAN);
170     }
171
172     /**
173      * Prepare data of notification and data to client
174      *
175      * @param xml
176      */
177     private void prepareAndPostData(final String xml) {
178         final Event event = new Event(EventType.NOTIFY);
179         if (this.outputType.equals("JSON")) {
180             final JSONObject jsonObject = XML.toJSONObject(xml);
181             event.setData(jsonObject.toString());
182         } else {
183             event.setData(xml);
184         }
185         this.eventBus.post(event);
186     }
187
188     /**
189      * Checks if exists at least one {@link Channel} subscriber.
190      *
191      * @return True if exist at least one {@link Channel} subscriber, false
192      *         otherwise.
193      */
194     public boolean hasSubscribers() {
195         return !this.subscribers.isEmpty();
196     }
197
198     /**
199      * Reset lists, close registration and unregister bus event.
200      */
201     public void close() {
202         this.subscribers = new ConcurrentSet<>();
203         this.registration.close();
204         this.registration = null;
205         this.eventBus.unregister(this.eventBusChangeRecorder);
206     }
207
208     /**
209      * Get stream name of this listener
210      *
211      * @return {@link String}
212      */
213     public String getStreamName() {
214         return this.streamName;
215     }
216
217     /**
218      * Check if is this listener registered.
219      *
220      * @return - true if is registered, otherwise null
221      */
222     public boolean isListening() {
223         return this.registration == null ? false : true;
224     }
225
226     /**
227      * Get schema path of notification
228      *
229      * @return {@link SchemaPath}
230      */
231     public SchemaPath getSchemaPath() {
232         return this.path;
233     }
234
235     /**
236      * Set registration for close after closing connection and check if this
237      * listener is registered
238      *
239      * @param registration
240      *            - registered listener
241      */
242     public void setRegistration(final ListenerRegistration<DOMNotificationListener> registration) {
243         Preconditions.checkNotNull(registration);
244         this.registration = registration;
245     }
246
247     /**
248      * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
249      * subscriber to the event and post event into event bus.
250      *
251      * @param subscriber
252      *            Channel
253      */
254     public void addSubscriber(final Channel subscriber) {
255         if (!subscriber.isActive()) {
256             LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
257         }
258         final Event event = new Event(EventType.REGISTER);
259         event.setSubscriber(subscriber);
260         this.eventBus.post(event);
261     }
262
263     /**
264      * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
265      * subscriber to the event and posts event into event bus.
266      *
267      * @param subscriber
268      */
269     public void removeSubscriber(final Channel subscriber) {
270         LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
271         final Event event = new Event(EventType.DEREGISTER);
272         event.setSubscriber(subscriber);
273         this.eventBus.post(event);
274     }
275
276     private String prepareXmlFrom(final DOMNotification notification) {
277         final SchemaContext schemaContext = ControllerContext.getInstance().getGlobalSchema();
278         final Document doc = ListenerAdapter.createDocument();
279         final Element notificationElement =
280                 doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
281                 "notification");
282         doc.appendChild(notificationElement);
283
284         final Element eventTimeElement = doc.createElement("eventTime");
285         eventTimeElement.setTextContent(ListenerAdapter.toRFC3339(new Date()));
286         notificationElement.appendChild(eventTimeElement);
287         final String notificationNamespace = notification.getType().getLastComponent().getNamespace().toString();
288         final Element notificationEventElement = doc.createElementNS(
289                 notificationNamespace, "event");
290         addValuesToNotificationEventElement(doc, notificationEventElement, notification, schemaContext);
291         notificationElement.appendChild(notificationEventElement);
292
293         try {
294             final ByteArrayOutputStream out = new ByteArrayOutputStream();
295             final Transformer transformer = FACTORY.newTransformer();
296             transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
297             transformer.setOutputProperty(OutputKeys.METHOD, "xml");
298             transformer.setOutputProperty(OutputKeys.INDENT, "yes");
299             transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
300             transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
301             transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, Charsets.UTF_8)));
302             final byte[] charData = out.toByteArray();
303             return new String(charData, "UTF-8");
304         } catch (TransformerException | UnsupportedEncodingException e) {
305             final String msg = "Error during transformation of Document into String";
306             LOG.error(msg, e);
307             return msg;
308         }
309     }
310
311     private void addValuesToNotificationEventElement(final Document doc, final Element element,
312             final DOMNotification notification, final SchemaContext schemaContext) {
313         if (notification == null) {
314             return;
315         }
316
317         final NormalizedNode<NodeIdentifier, Collection<DataContainerChild<? extends PathArgument, ?>>> body = notification
318                 .getBody();
319         try {
320             final DOMResult domResult = writeNormalizedNode(body,
321                     YangInstanceIdentifier.create(body.getIdentifier()), schemaContext);
322             final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
323             element.appendChild(result);
324         } catch (final IOException e) {
325             LOG.error("Error in writer ", e);
326         } catch (final XMLStreamException e) {
327             LOG.error("Error processing stream", e);
328         }
329     }
330
331     private DOMResult writeNormalizedNode(final NormalizedNode<?, ?> normalized, final YangInstanceIdentifier path,
332             final SchemaContext context) throws IOException, XMLStreamException {
333         final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
334         final Document doc = XmlDocumentUtils.getDocument();
335         final DOMResult result = new DOMResult(doc);
336         NormalizedNodeWriter normalizedNodeWriter = null;
337         NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
338         XMLStreamWriter writer = null;
339
340         try {
341             writer = XML_FACTORY.createXMLStreamWriter(result);
342             normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context,
343                     this.getSchemaPath());
344             normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
345
346             normalizedNodeWriter.write(normalized);
347
348             normalizedNodeWriter.flush();
349         } finally {
350             if (normalizedNodeWriter != null) {
351                 normalizedNodeWriter.close();
352             }
353             if (normalizedNodeStreamWriter != null) {
354                 normalizedNodeStreamWriter.close();
355             }
356             if (writer != null) {
357                 writer.close();
358             }
359         }
360
361         return result;
362     }
363
364     /**
365      * Tracks events of data change by customer.
366      */
367     private final class EventBusChangeRecorder {
368         @Subscribe
369         public void recordCustomerChange(final Event event) {
370             if (event.getType() == EventType.REGISTER) {
371                 final Channel subscriber = event.getSubscriber();
372                 if (!NotificationListenerAdapter.this.subscribers.contains(subscriber)) {
373                     NotificationListenerAdapter.this.subscribers.add(subscriber);
374                 }
375             } else if (event.getType() == EventType.DEREGISTER) {
376                 NotificationListenerAdapter.this.subscribers.remove(event.getSubscriber());
377                 Notificator.removeNotificationListenerIfNoSubscriberExists(NotificationListenerAdapter.this);
378             } else if (event.getType() == EventType.NOTIFY) {
379                 for (final Channel subscriber : NotificationListenerAdapter.this.subscribers) {
380                     if (subscriber.isActive()) {
381                         LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
382                         subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
383                     } else {
384                         LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
385                         NotificationListenerAdapter.this.subscribers.remove(subscriber);
386                     }
387                 }
388             }
389         }
390     }
391
392     /**
393      * Represents event of specific {@link EventType} type, holds data and
394      * {@link Channel} subscriber.
395      */
396     private final class Event {
397         private final EventType type;
398         private Channel subscriber;
399         private String data;
400
401         /**
402          * Creates new event specified by {@link EventType} type.
403          *
404          * @param type
405          *            EventType
406          */
407         public Event(final EventType type) {
408             this.type = type;
409         }
410
411         /**
412          * Gets the {@link Channel} subscriber.
413          *
414          * @return Channel
415          */
416         public Channel getSubscriber() {
417             return this.subscriber;
418         }
419
420         /**
421          * Sets subscriber for event.
422          *
423          * @param subscriber
424          *            Channel
425          */
426         public void setSubscriber(final Channel subscriber) {
427             this.subscriber = subscriber;
428         }
429
430         /**
431          * Gets event String.
432          *
433          * @return String representation of event data.
434          */
435         public String getData() {
436             return this.data;
437         }
438
439         /**
440          * Sets event data.
441          *
442          * @param data
443          *            String.
444          */
445         public void setData(final String data) {
446             this.data = data;
447         }
448
449         /**
450          * Gets event type.
451          *
452          * @return The type of the event.
453          */
454         public EventType getType() {
455             return this.type;
456         }
457     }
458
459     /**
460      * Type of the event.
461      */
462     private enum EventType {
463         REGISTER, DEREGISTER, NOTIFY
464     }
465
466     /**
467      * Set query parameters for listener
468      *
469      * @param start
470      *            - start-time of getting notification
471      * @param stop
472      *            - stop-time of getting notification
473      * @param filter
474      *            - indicate which subset of all possible events are of interest
475      */
476     public void setQueryParams(final Date start, final Date stop, final String filter) {
477         this.start = start;
478         this.stop = stop;
479         this.filter = filter;
480     }
481 }