608ddcbca1f8d12e572536b4fba4f2f5b1c4a4a6
[netconf.git] / restconf / sal-rest-connector / src / main / java / org / opendaylight / netconf / sal / streams / listeners / NotificationListenerAdapter.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.sal.streams.listeners;
9
10 import com.google.common.base.Charsets;
11 import com.google.common.base.Preconditions;
12 import com.google.common.eventbus.AsyncEventBus;
13 import com.google.common.eventbus.EventBus;
14 import com.google.common.eventbus.Subscribe;
15 import io.netty.channel.Channel;
16 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
17 import io.netty.util.internal.ConcurrentSet;
18 import java.io.ByteArrayOutputStream;
19 import java.io.IOException;
20 import java.io.OutputStreamWriter;
21 import java.io.StringReader;
22 import java.io.StringWriter;
23 import java.io.UnsupportedEncodingException;
24 import java.io.Writer;
25 import java.util.Collection;
26 import java.util.Date;
27 import java.util.Set;
28 import java.util.concurrent.Executors;
29 import javax.xml.parsers.DocumentBuilder;
30 import javax.xml.parsers.DocumentBuilderFactory;
31 import javax.xml.stream.XMLOutputFactory;
32 import javax.xml.stream.XMLStreamException;
33 import javax.xml.stream.XMLStreamWriter;
34 import javax.xml.transform.OutputKeys;
35 import javax.xml.transform.Transformer;
36 import javax.xml.transform.TransformerException;
37 import javax.xml.transform.TransformerFactory;
38 import javax.xml.transform.dom.DOMResult;
39 import javax.xml.transform.dom.DOMSource;
40 import javax.xml.transform.stream.StreamResult;
41 import javax.xml.xpath.XPath;
42 import javax.xml.xpath.XPathConstants;
43 import javax.xml.xpath.XPathFactory;
44 import org.json.JSONObject;
45 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
46 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
47 import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
48 import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
49 import org.opendaylight.yangtools.concepts.ListenerRegistration;
50 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
51 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
52 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
53 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
54 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
55 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
56 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
57 import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactory;
58 import org.opendaylight.yangtools.yang.data.codec.gson.JSONNormalizedNodeStreamWriter;
59 import org.opendaylight.yangtools.yang.data.codec.gson.JsonWriterFactory;
60 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
61 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
62 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
63 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
66 import org.w3c.dom.Document;
67 import org.w3c.dom.Element;
68 import org.w3c.dom.Node;
69 import org.xml.sax.InputSource;
70
/**
 * {@link NotificationListenerAdapter} listens for DOM notifications and forwards
 * them, serialized as XML or JSON, to the channels subscribed to its stream.
 */
76 public class NotificationListenerAdapter implements DOMNotificationListener {
77
78     private static final Logger LOG = LoggerFactory.getLogger(NotificationListenerAdapter.class);
79     private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
80
81     private final String streamName;
82     private final EventBus eventBus;
83     private final EventBusChangeRecorder eventBusChangeRecorder;
84
85     private final SchemaPath path;
86     private final String outputType;
87     private Date start = null;
88     private Date stop = null;
89     private String filter;
90
91     private SchemaContext schemaContext;
92     private DOMNotification notification;
93     private ListenerRegistration<DOMNotificationListener> registration;
94     private Set<Channel> subscribers = new ConcurrentSet<>();
95
96     /**
97      * Set path of listener and stream name, register event bus.
98      *
99      * @param path
100      *            - path of notification
101      * @param streamName
102      *            - stream name of listener
103      * @param outputType
104      *            - type of output on notification (JSON, XML)
105      */
106     NotificationListenerAdapter(final SchemaPath path, final String streamName, final String outputType) {
107         this.outputType = outputType;
108         Preconditions.checkArgument((streamName != null) && !streamName.isEmpty());
109         Preconditions.checkArgument(path != null);
110         this.path = path;
111         this.streamName = streamName;
112         this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
113         this.eventBusChangeRecorder = new EventBusChangeRecorder();
114         this.eventBus.register(this.eventBusChangeRecorder);
115     }
116
117     @Override
118     public void onNotification(final DOMNotification notification) {
119         this.schemaContext = ControllerContext.getInstance().getGlobalSchema();
120         this.notification = notification;
121         final Date now = new Date();
122         if (this.stop != null) {
123             if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) {
124                 checkFilter();
125             }
126             if (this.stop.compareTo(now) < 0) {
127                 try {
128                     this.close();
129                 } catch (final Exception e) {
130                     throw new RestconfDocumentedException("Problem with unregister listener." + e);
131                 }
132             }
133         } else if (this.start != null) {
134             if (this.start.compareTo(now) < 0) {
135                 this.start = null;
136                 checkFilter();
137             }
138         } else {
139             checkFilter();
140         }
141     }
142
143     /**
144      * Check if is filter used and then prepare and post data do client
145      *
146      */
147     private void checkFilter() {
148         final String xml = prepareXml();
149         if (this.filter == null) {
150             prepareAndPostData(xml);
151         } else {
152             try {
153                 if (parseFilterParam(xml)) {
154                     prepareAndPostData(xml);
155                 }
156             } catch (final Exception e) {
157                 throw new RestconfDocumentedException("Problem while parsing filter.", e);
158             }
159         }
160     }
161
162     /**
163      * Parse and evaluate filter value by xml
164      *
165      * @param xml
166      *            - notification data in xml
167      * @return true or false - depends on filter expression and data of
168      *         notifiaction
169      * @throws Exception
170      */
171     private boolean parseFilterParam(final String xml) throws Exception {
172         final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
173         final DocumentBuilder builder = factory.newDocumentBuilder();
174         final Document docOfXml = builder.parse(new InputSource(new StringReader(xml)));
175         final XPath xPath = XPathFactory.newInstance().newXPath();
176         return (boolean) xPath.compile(this.filter).evaluate(docOfXml, XPathConstants.BOOLEAN);
177     }
178
179     /**
180      * Prepare data of notification and data to client
181      *
182      * @param xml
183      */
184     private void prepareAndPostData(final String xml) {
185         final Event event = new Event(EventType.NOTIFY);
186         if (this.outputType.equals("JSON")) {
187             event.setData(prepareJson());
188         } else {
189             event.setData(xml);
190         }
191         this.eventBus.post(event);
192     }
193
194     /**
195      * Prepare json from notification data
196      *
197      * @return json as {@link String}
198      */
199     private String prepareJson() {
200         final JSONObject json = new JSONObject();
201         json.put("ietf-restconf:notification",
202                 new JSONObject(writeBodyToString()).put("event-time", ListenerAdapter.toRFC3339(new Date())));
203         return json.toString();
204     }
205
206     private String writeBodyToString() {
207         final Writer writer = new StringWriter();
208         final NormalizedNodeStreamWriter jsonStream =
209                 JSONNormalizedNodeStreamWriter.createExclusiveWriter(JSONCodecFactory.create(this.schemaContext),
210                         this.notification.getType(), null, JsonWriterFactory.createJsonWriter(writer));
211         final NormalizedNodeWriter nodeWriter = NormalizedNodeWriter.forStreamWriter(jsonStream);
212         try {
213             nodeWriter.write(this.notification.getBody());
214             nodeWriter.close();
215         } catch (final IOException e) {
216             throw new RestconfDocumentedException("Problem while writing body of notification to JSON. ", e);
217         }
218         return writer.toString();
219     }
220
221     /**
222      * Checks if exists at least one {@link Channel} subscriber.
223      *
224      * @return True if exist at least one {@link Channel} subscriber, false
225      *         otherwise.
226      */
227     public boolean hasSubscribers() {
228         return !this.subscribers.isEmpty();
229     }
230
231     /**
232      * Reset lists, close registration and unregister bus event.
233      */
234     public void close() {
235         this.subscribers = new ConcurrentSet<>();
236         this.registration.close();
237         this.registration = null;
238         this.eventBus.unregister(this.eventBusChangeRecorder);
239     }
240
241     /**
242      * Get stream name of this listener
243      *
244      * @return {@link String}
245      */
246     public String getStreamName() {
247         return this.streamName;
248     }
249
250     /**
251      * Check if is this listener registered.
252      *
253      * @return - true if is registered, otherwise null
254      */
255     public boolean isListening() {
256         return this.registration == null ? false : true;
257     }
258
259     /**
260      * Get schema path of notification
261      *
262      * @return {@link SchemaPath}
263      */
264     public SchemaPath getSchemaPath() {
265         return this.path;
266     }
267
268     /**
269      * Set registration for close after closing connection and check if this
270      * listener is registered
271      *
272      * @param registration
273      *            - registered listener
274      */
275     public void setRegistration(final ListenerRegistration<DOMNotificationListener> registration) {
276         Preconditions.checkNotNull(registration);
277         this.registration = registration;
278     }
279
280     /**
281      * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
282      * subscriber to the event and post event into event bus.
283      *
284      * @param subscriber
285      *            Channel
286      */
287     public void addSubscriber(final Channel subscriber) {
288         if (!subscriber.isActive()) {
289             LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
290         }
291         final Event event = new Event(EventType.REGISTER);
292         event.setSubscriber(subscriber);
293         this.eventBus.post(event);
294     }
295
296     /**
297      * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
298      * subscriber to the event and posts event into event bus.
299      *
300      * @param subscriber
301      */
302     public void removeSubscriber(final Channel subscriber) {
303         LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
304         final Event event = new Event(EventType.DEREGISTER);
305         event.setSubscriber(subscriber);
306         this.eventBus.post(event);
307     }
308
309     private String prepareXml() {
310         final Document doc = ListenerAdapter.createDocument();
311         final Element notificationElement =
312                 doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
313                 "notification");
314         doc.appendChild(notificationElement);
315
316         final Element eventTimeElement = doc.createElement("eventTime");
317         eventTimeElement.setTextContent(ListenerAdapter.toRFC3339(new Date()));
318         notificationElement.appendChild(eventTimeElement);
319
320         final Element notificationEventElement = doc.createElementNS(
321                 "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "create-notification-stream");
322         addValuesToNotificationEventElement(doc, notificationEventElement, this.notification, this.schemaContext);
323         notificationElement.appendChild(notificationEventElement);
324
325         try {
326             final ByteArrayOutputStream out = new ByteArrayOutputStream();
327             final Transformer transformer = FACTORY.newTransformer();
328             transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
329             transformer.setOutputProperty(OutputKeys.METHOD, "xml");
330             transformer.setOutputProperty(OutputKeys.INDENT, "yes");
331             transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
332             transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
333             transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, Charsets.UTF_8)));
334             final byte[] charData = out.toByteArray();
335             return new String(charData, "UTF-8");
336         } catch (TransformerException | UnsupportedEncodingException e) {
337             final String msg = "Error during transformation of Document into String";
338             LOG.error(msg, e);
339             return msg;
340         }
341     }
342
343     private void addValuesToNotificationEventElement(final Document doc, final Element element,
344             final DOMNotification notification, final SchemaContext schemaContext) {
345         if (notification == null) {
346             return;
347         }
348
349         final NormalizedNode<NodeIdentifier, Collection<DataContainerChild<? extends PathArgument, ?>>> body = notification
350                 .getBody();
351         try {
352             final DOMResult domResult = writeNormalizedNode(body,
353                     YangInstanceIdentifier.create(body.getIdentifier()), schemaContext);
354             final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
355             final Element dataElement = doc.createElement("notification");
356             dataElement.appendChild(result);
357             element.appendChild(dataElement);
358         } catch (final IOException e) {
359             LOG.error("Error in writer ", e);
360         } catch (final XMLStreamException e) {
361             LOG.error("Error processing stream", e);
362         }
363     }
364
365     private DOMResult writeNormalizedNode(final NormalizedNode<?, ?> normalized, final YangInstanceIdentifier path,
366             final SchemaContext context) throws IOException, XMLStreamException {
367         final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
368         final Document doc = XmlDocumentUtils.getDocument();
369         final DOMResult result = new DOMResult(doc);
370         NormalizedNodeWriter normalizedNodeWriter = null;
371         NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
372         XMLStreamWriter writer = null;
373
374         try {
375             writer = XML_FACTORY.createXMLStreamWriter(result);
376             normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context,
377                     this.getSchemaPath());
378             normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
379
380             normalizedNodeWriter.write(normalized);
381
382             normalizedNodeWriter.flush();
383         } finally {
384             if (normalizedNodeWriter != null) {
385                 normalizedNodeWriter.close();
386             }
387             if (normalizedNodeStreamWriter != null) {
388                 normalizedNodeStreamWriter.close();
389             }
390             if (writer != null) {
391                 writer.close();
392             }
393         }
394
395         return result;
396     }
397
398     /**
399      * Tracks events of data change by customer.
400      */
401     private final class EventBusChangeRecorder {
402         @Subscribe
403         public void recordCustomerChange(final Event event) {
404             if (event.getType() == EventType.REGISTER) {
405                 final Channel subscriber = event.getSubscriber();
406                 if (!NotificationListenerAdapter.this.subscribers.contains(subscriber)) {
407                     NotificationListenerAdapter.this.subscribers.add(subscriber);
408                 }
409             } else if (event.getType() == EventType.DEREGISTER) {
410                 NotificationListenerAdapter.this.subscribers.remove(event.getSubscriber());
411                 Notificator.removeNotificationListenerIfNoSubscriberExists(NotificationListenerAdapter.this);
412             } else if (event.getType() == EventType.NOTIFY) {
413                 for (final Channel subscriber : NotificationListenerAdapter.this.subscribers) {
414                     if (subscriber.isActive()) {
415                         LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
416                         subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
417                     } else {
418                         LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
419                         NotificationListenerAdapter.this.subscribers.remove(subscriber);
420                     }
421                 }
422             }
423         }
424     }
425
426     /**
427      * Represents event of specific {@link EventType} type, holds data and
428      * {@link Channel} subscriber.
429      */
430     private final class Event {
431         private final EventType type;
432         private Channel subscriber;
433         private String data;
434
435         /**
436          * Creates new event specified by {@link EventType} type.
437          *
438          * @param type
439          *            EventType
440          */
441         public Event(final EventType type) {
442             this.type = type;
443         }
444
445         /**
446          * Gets the {@link Channel} subscriber.
447          *
448          * @return Channel
449          */
450         public Channel getSubscriber() {
451             return this.subscriber;
452         }
453
454         /**
455          * Sets subscriber for event.
456          *
457          * @param subscriber
458          *            Channel
459          */
460         public void setSubscriber(final Channel subscriber) {
461             this.subscriber = subscriber;
462         }
463
464         /**
465          * Gets event String.
466          *
467          * @return String representation of event data.
468          */
469         public String getData() {
470             return this.data;
471         }
472
473         /**
474          * Sets event data.
475          *
476          * @param data
477          *            String.
478          */
479         public void setData(final String data) {
480             this.data = data;
481         }
482
483         /**
484          * Gets event type.
485          *
486          * @return The type of the event.
487          */
488         public EventType getType() {
489             return this.type;
490         }
491     }
492
493     /**
494      * Type of the event.
495      */
496     private enum EventType {
497         REGISTER, DEREGISTER, NOTIFY
498     }
499
500     /**
501      * Set query parameters for listener
502      *
503      * @param start
504      *            - start-time of getting notification
505      * @param stop
506      *            - stop-time of getting notification
507      * @param filter
508      *            - indicate which subset of all possible events are of interest
509      */
510     public void setQueryParams(final Date start, final Date stop, final String filter) {
511         this.start = start;
512         this.stop = stop;
513         this.filter = filter;
514     }
515 }