Bug 5679 - implement ietf-restconf-monitoring - streams
[netconf.git] / restconf / sal-rest-connector / src / main / java / org / opendaylight / netconf / sal / streams / listeners / NotificationListenerAdapter.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.sal.streams.listeners;
9
10 import com.google.common.base.Charsets;
11 import com.google.common.base.Preconditions;
12 import com.google.common.eventbus.AsyncEventBus;
13 import com.google.common.eventbus.EventBus;
14 import com.google.common.eventbus.Subscribe;
15 import io.netty.channel.Channel;
16 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
17 import io.netty.util.internal.ConcurrentSet;
18 import java.io.ByteArrayOutputStream;
19 import java.io.IOException;
20 import java.io.OutputStreamWriter;
21 import java.io.StringReader;
22 import java.io.StringWriter;
23 import java.io.UnsupportedEncodingException;
24 import java.io.Writer;
25 import java.util.Collection;
26 import java.util.Date;
27 import java.util.Set;
28 import java.util.concurrent.Executors;
29 import javax.xml.parsers.DocumentBuilder;
30 import javax.xml.parsers.DocumentBuilderFactory;
31 import javax.xml.stream.XMLOutputFactory;
32 import javax.xml.stream.XMLStreamException;
33 import javax.xml.stream.XMLStreamWriter;
34 import javax.xml.transform.OutputKeys;
35 import javax.xml.transform.Transformer;
36 import javax.xml.transform.TransformerException;
37 import javax.xml.transform.TransformerFactory;
38 import javax.xml.transform.dom.DOMResult;
39 import javax.xml.transform.dom.DOMSource;
40 import javax.xml.transform.stream.StreamResult;
41 import javax.xml.xpath.XPath;
42 import javax.xml.xpath.XPathConstants;
43 import javax.xml.xpath.XPathFactory;
44 import org.json.JSONObject;
45 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
46 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
47 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
48 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
49 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
50 import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
51 import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
52 import org.opendaylight.restconf.Draft18.MonitoringModule;
53 import org.opendaylight.restconf.handlers.SchemaContextHandler;
54 import org.opendaylight.restconf.handlers.TransactionChainHandler;
55 import org.opendaylight.restconf.parser.IdentifierCodec;
56 import org.opendaylight.yangtools.concepts.ListenerRegistration;
57 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
58 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
59 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
60 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
61 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
62 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
63 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
64 import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactory;
65 import org.opendaylight.yangtools.yang.data.codec.gson.JSONNormalizedNodeStreamWriter;
66 import org.opendaylight.yangtools.yang.data.codec.gson.JsonWriterFactory;
67 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
68 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
69 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
70 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
71 import org.slf4j.Logger;
72 import org.slf4j.LoggerFactory;
73 import org.w3c.dom.Document;
74 import org.w3c.dom.Element;
75 import org.w3c.dom.Node;
76 import org.xml.sax.InputSource;
77
78 /**
79  * {@link NotificationListenerAdapter} is responsible to track events on
80  * notifications.
81  *
82  */
83 public class NotificationListenerAdapter implements DOMNotificationListener {
84
    /** Logger of this class. */
    private static final Logger LOG = LoggerFactory.getLogger(NotificationListenerAdapter.class);
    /** Factory of the transformers used by prepareXml() to serialize the notification document. */
    private static final TransformerFactory FACTORY = TransformerFactory.newInstance();

    /** Name of the stream; the constructor rejects null and empty values. */
    private final String streamName;
    /** Bus created in the constructor as an AsyncEventBus on a single-thread executor. */
    private final EventBus eventBus;
    /** Recorder registered on {@link #eventBus} in the constructor and unregistered in close(). */
    private final EventBusChangeRecorder eventBusChangeRecorder;

    /** Schema path of the notification; the constructor rejects null. */
    private final SchemaPath path;
    /** Output type of notification (JSON, XML); prepareAndPostData() sends JSON only for "JSON". */
    private final String outputType;
    /** Start-time set by setQueryParams(); null when not set (or already passed, see onNotification()). */
    private Date start = null;
    /** Stop-time set by setQueryParams(); null when not set. */
    private Date stop = null;
    /** Filter set by setQueryParams(); compiled as an XPath expression in parseFilterParam(), null = no filter. */
    private String filter;

    /** Global schema context, re-read from ControllerContext on every onNotification() call. */
    private SchemaContext schemaContext;
    /** The notification passed to the latest onNotification() call. */
    private DOMNotification notification;
    /** Registration set by setRegistration() and reset to null by close(). */
    private ListenerRegistration<DOMNotificationListener> registration;
    /** Channels that receive the data of NOTIFY events; replaced by a fresh set in close(). */
    private Set<Channel> subscribers = new ConcurrentSet<>();
    /** Set by setCloseVars(); provides the write transaction used in close(). */
    private TransactionChainHandler transactionChainHandler;
    /** Set by setCloseVars(); provides the schema used to deserialize the stream path in close(). */
    private SchemaContextHandler schemaHandler;
104
105     /**
106      * Set path of listener and stream name, register event bus.
107      *
108      * @param path
109      *            - path of notification
110      * @param streamName
111      *            - stream name of listener
112      * @param outputType
113      *            - type of output on notification (JSON, XML)
114      */
115     NotificationListenerAdapter(final SchemaPath path, final String streamName, final String outputType) {
116         this.outputType = outputType;
117         Preconditions.checkArgument((streamName != null) && !streamName.isEmpty());
118         Preconditions.checkArgument(path != null);
119         this.path = path;
120         this.streamName = streamName;
121         this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
122         this.eventBusChangeRecorder = new EventBusChangeRecorder();
123         this.eventBus.register(this.eventBusChangeRecorder);
124     }
125
126     @Override
127     public void onNotification(final DOMNotification notification) {
128         this.schemaContext = ControllerContext.getInstance().getGlobalSchema();
129         this.notification = notification;
130         final Date now = new Date();
131         if (this.stop != null) {
132             if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) {
133                 checkFilter();
134             }
135             if (this.stop.compareTo(now) < 0) {
136                 try {
137                     this.close();
138                 } catch (final Exception e) {
139                     throw new RestconfDocumentedException("Problem with unregister listener." + e);
140                 }
141             }
142         } else if (this.start != null) {
143             if (this.start.compareTo(now) < 0) {
144                 this.start = null;
145                 checkFilter();
146             }
147         } else {
148             checkFilter();
149         }
150     }
151
152     /**
153      * Check if is filter used and then prepare and post data do client
154      *
155      */
156     private void checkFilter() {
157         final String xml = prepareXml();
158         if (this.filter == null) {
159             prepareAndPostData(xml);
160         } else {
161             try {
162                 if (parseFilterParam(xml)) {
163                     prepareAndPostData(xml);
164                 }
165             } catch (final Exception e) {
166                 throw new RestconfDocumentedException("Problem while parsing filter.", e);
167             }
168         }
169     }
170
171     /**
172      * Parse and evaluate filter value by xml
173      *
174      * @param xml
175      *            - notification data in xml
176      * @return true or false - depends on filter expression and data of
177      *         notifiaction
178      * @throws Exception
179      */
180     private boolean parseFilterParam(final String xml) throws Exception {
181         final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
182         final DocumentBuilder builder = factory.newDocumentBuilder();
183         final Document docOfXml = builder.parse(new InputSource(new StringReader(xml)));
184         final XPath xPath = XPathFactory.newInstance().newXPath();
185         return (boolean) xPath.compile(this.filter).evaluate(docOfXml, XPathConstants.BOOLEAN);
186     }
187
188     /**
189      * Prepare data of notification and data to client
190      *
191      * @param xml
192      */
193     private void prepareAndPostData(final String xml) {
194         final Event event = new Event(EventType.NOTIFY);
195         if (this.outputType.equals("JSON")) {
196             event.setData(prepareJson());
197         } else {
198             event.setData(xml);
199         }
200         this.eventBus.post(event);
201     }
202
203     /**
204      * Prepare json from notification data
205      *
206      * @return json as {@link String}
207      */
208     private String prepareJson() {
209         final JSONObject json = new JSONObject();
210         json.put("ietf-restconf:notification",
211                 new JSONObject(writeBodyToString()).put("event-time", ListenerAdapter.toRFC3339(new Date())));
212         return json.toString();
213     }
214
215     private String writeBodyToString() {
216         final Writer writer = new StringWriter();
217         final NormalizedNodeStreamWriter jsonStream =
218                 JSONNormalizedNodeStreamWriter.createExclusiveWriter(JSONCodecFactory.create(this.schemaContext),
219                         this.notification.getType(), null, JsonWriterFactory.createJsonWriter(writer));
220         final NormalizedNodeWriter nodeWriter = NormalizedNodeWriter.forStreamWriter(jsonStream);
221         try {
222             nodeWriter.write(this.notification.getBody());
223             nodeWriter.close();
224         } catch (final IOException e) {
225             throw new RestconfDocumentedException("Problem while writing body of notification to JSON. ", e);
226         }
227         return writer.toString();
228     }
229
230     /**
231      * Checks if exists at least one {@link Channel} subscriber.
232      *
233      * @return True if exist at least one {@link Channel} subscriber, false
234      *         otherwise.
235      */
236     public boolean hasSubscribers() {
237         return !this.subscribers.isEmpty();
238     }
239
240     /**
241      * Reset lists, close registration and unregister bus event and delete data in DS.
242      */
243     public void close() {
244         final DOMDataWriteTransaction wTx = this.transactionChainHandler.get().newWriteOnlyTransaction();
245         wTx.delete(LogicalDatastoreType.OPERATIONAL,
246                 IdentifierCodec.deserialize(
247                         MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + this.path.getLastComponent().getLocalName(),
248                         this.schemaHandler.get()));
249         try {
250             wTx.submit().checkedGet();
251         } catch (final TransactionCommitFailedException e) {
252             throw new RestconfDocumentedException("Problem while deleting data from DS.", e);
253         }
254
255         this.subscribers = new ConcurrentSet<>();
256         this.registration.close();
257         this.registration = null;
258         this.eventBus.unregister(this.eventBusChangeRecorder);
259     }
260
261     /**
262      * Get stream name of this listener
263      *
264      * @return {@link String}
265      */
266     public String getStreamName() {
267         return this.streamName;
268     }
269
270     /**
271      * Check if is this listener registered.
272      *
273      * @return - true if is registered, otherwise null
274      */
275     public boolean isListening() {
276         return this.registration == null ? false : true;
277     }
278
279     /**
280      * Get schema path of notification
281      *
282      * @return {@link SchemaPath}
283      */
284     public SchemaPath getSchemaPath() {
285         return this.path;
286     }
287
288     /**
289      * Set registration for close after closing connection and check if this
290      * listener is registered
291      *
292      * @param registration
293      *            - registered listener
294      */
295     public void setRegistration(final ListenerRegistration<DOMNotificationListener> registration) {
296         Preconditions.checkNotNull(registration);
297         this.registration = registration;
298     }
299
300     /**
301      * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
302      * subscriber to the event and post event into event bus.
303      *
304      * @param subscriber
305      *            Channel
306      */
307     public void addSubscriber(final Channel subscriber) {
308         if (!subscriber.isActive()) {
309             LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
310         }
311         final Event event = new Event(EventType.REGISTER);
312         event.setSubscriber(subscriber);
313         this.eventBus.post(event);
314     }
315
316     /**
317      * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
318      * subscriber to the event and posts event into event bus.
319      *
320      * @param subscriber
321      */
322     public void removeSubscriber(final Channel subscriber) {
323         LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
324         final Event event = new Event(EventType.DEREGISTER);
325         event.setSubscriber(subscriber);
326         this.eventBus.post(event);
327     }
328
329     private String prepareXml() {
330         final Document doc = ListenerAdapter.createDocument();
331         final Element notificationElement =
332                 doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
333                 "notification");
334         doc.appendChild(notificationElement);
335
336         final Element eventTimeElement = doc.createElement("eventTime");
337         eventTimeElement.setTextContent(ListenerAdapter.toRFC3339(new Date()));
338         notificationElement.appendChild(eventTimeElement);
339
340         final Element notificationEventElement = doc.createElementNS(
341                 "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "create-notification-stream");
342         addValuesToNotificationEventElement(doc, notificationEventElement, this.notification, this.schemaContext);
343         notificationElement.appendChild(notificationEventElement);
344
345         try {
346             final ByteArrayOutputStream out = new ByteArrayOutputStream();
347             final Transformer transformer = FACTORY.newTransformer();
348             transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
349             transformer.setOutputProperty(OutputKeys.METHOD, "xml");
350             transformer.setOutputProperty(OutputKeys.INDENT, "yes");
351             transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
352             transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
353             transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, Charsets.UTF_8)));
354             final byte[] charData = out.toByteArray();
355             return new String(charData, "UTF-8");
356         } catch (TransformerException | UnsupportedEncodingException e) {
357             final String msg = "Error during transformation of Document into String";
358             LOG.error(msg, e);
359             return msg;
360         }
361     }
362
363     private void addValuesToNotificationEventElement(final Document doc, final Element element,
364             final DOMNotification notification, final SchemaContext schemaContext) {
365         if (notification == null) {
366             return;
367         }
368
369         final NormalizedNode<NodeIdentifier, Collection<DataContainerChild<? extends PathArgument, ?>>> body = notification
370                 .getBody();
371         try {
372             final DOMResult domResult = writeNormalizedNode(body,
373                     YangInstanceIdentifier.create(body.getIdentifier()), schemaContext);
374             final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
375             final Element dataElement = doc.createElement("notification");
376             dataElement.appendChild(result);
377             element.appendChild(dataElement);
378         } catch (final IOException e) {
379             LOG.error("Error in writer ", e);
380         } catch (final XMLStreamException e) {
381             LOG.error("Error processing stream", e);
382         }
383     }
384
385     private DOMResult writeNormalizedNode(final NormalizedNode<?, ?> normalized, final YangInstanceIdentifier path,
386             final SchemaContext context) throws IOException, XMLStreamException {
387         final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
388         final Document doc = XmlDocumentUtils.getDocument();
389         final DOMResult result = new DOMResult(doc);
390         NormalizedNodeWriter normalizedNodeWriter = null;
391         NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
392         XMLStreamWriter writer = null;
393
394         try {
395             writer = XML_FACTORY.createXMLStreamWriter(result);
396             normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context,
397                     this.getSchemaPath());
398             normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
399
400             normalizedNodeWriter.write(normalized);
401
402             normalizedNodeWriter.flush();
403         } finally {
404             if (normalizedNodeWriter != null) {
405                 normalizedNodeWriter.close();
406             }
407             if (normalizedNodeStreamWriter != null) {
408                 normalizedNodeStreamWriter.close();
409             }
410             if (writer != null) {
411                 writer.close();
412             }
413         }
414
415         return result;
416     }
417
418     /**
419      * Tracks events of data change by customer.
420      */
421     private final class EventBusChangeRecorder {
422         @Subscribe
423         public void recordCustomerChange(final Event event) {
424             if (event.getType() == EventType.REGISTER) {
425                 final Channel subscriber = event.getSubscriber();
426                 if (!NotificationListenerAdapter.this.subscribers.contains(subscriber)) {
427                     NotificationListenerAdapter.this.subscribers.add(subscriber);
428                 }
429             } else if (event.getType() == EventType.DEREGISTER) {
430                 NotificationListenerAdapter.this.subscribers.remove(event.getSubscriber());
431                 Notificator.removeNotificationListenerIfNoSubscriberExists(NotificationListenerAdapter.this);
432             } else if (event.getType() == EventType.NOTIFY) {
433                 for (final Channel subscriber : NotificationListenerAdapter.this.subscribers) {
434                     if (subscriber.isActive()) {
435                         LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
436                         subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
437                     } else {
438                         LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
439                         NotificationListenerAdapter.this.subscribers.remove(subscriber);
440                     }
441                 }
442             }
443         }
444     }
445
446     /**
447      * Represents event of specific {@link EventType} type, holds data and
448      * {@link Channel} subscriber.
449      */
450     private final class Event {
451         private final EventType type;
452         private Channel subscriber;
453         private String data;
454
455         /**
456          * Creates new event specified by {@link EventType} type.
457          *
458          * @param type
459          *            EventType
460          */
461         public Event(final EventType type) {
462             this.type = type;
463         }
464
465         /**
466          * Gets the {@link Channel} subscriber.
467          *
468          * @return Channel
469          */
470         public Channel getSubscriber() {
471             return this.subscriber;
472         }
473
474         /**
475          * Sets subscriber for event.
476          *
477          * @param subscriber
478          *            Channel
479          */
480         public void setSubscriber(final Channel subscriber) {
481             this.subscriber = subscriber;
482         }
483
484         /**
485          * Gets event String.
486          *
487          * @return String representation of event data.
488          */
489         public String getData() {
490             return this.data;
491         }
492
493         /**
494          * Sets event data.
495          *
496          * @param data
497          *            String.
498          */
499         public void setData(final String data) {
500             this.data = data;
501         }
502
503         /**
504          * Gets event type.
505          *
506          * @return The type of the event.
507          */
508         public EventType getType() {
509             return this.type;
510         }
511     }
512
513     /**
514      * Type of the event.
515      */
516     private enum EventType {
517         REGISTER, DEREGISTER, NOTIFY
518     }
519
520     /**
521      * Set query parameters for listener
522      *
523      * @param start
524      *            - start-time of getting notification
525      * @param stop
526      *            - stop-time of getting notification
527      * @param filter
528      *            - indicate which subset of all possible events are of interest
529      */
530     public void setQueryParams(final Date start, final Date stop, final String filter) {
531         this.start = start;
532         this.stop = stop;
533         this.filter = filter;
534     }
535
536     /**
537      * Get outputType of listenere
538      *
539      * @return the outputType
540      */
541     public String getOutputType() {
542         return this.outputType;
543     }
544
545     /**
546      * Transaction chain to delete data in DS on close()
547      *
548      * @param transactionChainHandler
549      *            - creating new write transaction to delete data on close
550      * @param schemaHandler
551      *            - for getting schema to deserialize
552      *            {@link MonitoringModule#PATH_TO_STREAM_WITHOUT_KEY} to
553      *            {@link YangInstanceIdentifier}
554      */
555     public void setCloseVars(final TransactionChainHandler transactionChainHandler,
556             final SchemaContextHandler schemaHandler) {
557         this.transactionChainHandler = transactionChainHandler;
558         this.schemaHandler = schemaHandler;
559
560     }
561 }