303bfb30b5a0519dcce3ebfdb41a903380f35445
[netconf.git] / restconf / sal-rest-connector / src / main / java / org / opendaylight / netconf / sal / streams / listeners / NotificationListenerAdapter.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.sal.streams.listeners;
9
10 import com.google.common.base.Charsets;
11 import com.google.common.base.Preconditions;
12 import com.google.common.eventbus.AsyncEventBus;
13 import com.google.common.eventbus.EventBus;
14 import com.google.common.eventbus.Subscribe;
15 import io.netty.channel.Channel;
16 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
17 import io.netty.util.internal.ConcurrentSet;
18 import java.io.ByteArrayOutputStream;
19 import java.io.IOException;
20 import java.io.OutputStreamWriter;
21 import java.io.UnsupportedEncodingException;
22 import java.util.Collection;
23 import java.util.Date;
24 import java.util.Set;
25 import java.util.concurrent.Executors;
26 import javax.xml.stream.XMLOutputFactory;
27 import javax.xml.stream.XMLStreamException;
28 import javax.xml.stream.XMLStreamWriter;
29 import javax.xml.transform.OutputKeys;
30 import javax.xml.transform.Transformer;
31 import javax.xml.transform.TransformerException;
32 import javax.xml.transform.TransformerFactory;
33 import javax.xml.transform.dom.DOMResult;
34 import javax.xml.transform.dom.DOMSource;
35 import javax.xml.transform.stream.StreamResult;
36 import org.json.JSONObject;
37 import org.json.XML;
38 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
39 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
40 import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
41 import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
42 import org.opendaylight.yangtools.concepts.ListenerRegistration;
43 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
45 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
46 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
47 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
48 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
49 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
50 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
51 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
52 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
53 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56 import org.w3c.dom.Document;
57 import org.w3c.dom.Element;
58 import org.w3c.dom.Node;
59
60 /**
61  * {@link NotificationListenerAdapter} is responsible to track events on
62  * notifications.
63  *
64  */
65 public class NotificationListenerAdapter implements DOMNotificationListener {
66
67     private static final Logger LOG = LoggerFactory.getLogger(NotificationListenerAdapter.class);
68     private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
69
70     private final String streamName;
71     private ListenerRegistration<DOMNotificationListener> registration;
72     private Set<Channel> subscribers = new ConcurrentSet<>();
73     private final EventBus eventBus;
74     private final EventBusChangeRecorder eventBusChangeRecorder;
75
76     private final SchemaPath path;
77     private final String outputType;
78     private Date start = null;
79     private Date stop = null;
80
81     /**
82      * Set path of listener and stream name, register event bus.
83      *
84      * @param path
85      *            - path of notification
86      * @param streamName
87      *            - stream name of listener
88      * @param outputType
89      *            - type of output on notification (JSON, XML)
90      */
91     NotificationListenerAdapter(final SchemaPath path, final String streamName, final String outputType) {
92         this.outputType = outputType;
93         Preconditions.checkArgument((streamName != null) && !streamName.isEmpty());
94         Preconditions.checkArgument(path != null);
95         this.path = path;
96         this.streamName = streamName;
97         this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
98         this.eventBusChangeRecorder = new EventBusChangeRecorder();
99         this.eventBus.register(this.eventBusChangeRecorder);
100     }
101
102     @Override
103     public void onNotification(final DOMNotification notification) {
104         final Date now = new Date();
105         if (this.stop != null) {
106             if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) {
107                 prepareAndPostData(notification);
108             }
109             if (this.stop.compareTo(now) < 0) {
110                 try {
111                     this.close();
112                 } catch (final Exception e) {
113                     throw new RestconfDocumentedException("Problem with unregister listener." + e);
114                 }
115             }
116         } else if (this.start != null) {
117             if (this.start.compareTo(now) < 0) {
118                 this.start = null;
119                 prepareAndPostData(notification);
120             }
121         } else {
122             prepareAndPostData(notification);
123         }
124     }
125
126     /**
127      * @param notification
128      */
129     private void prepareAndPostData(final DOMNotification notification) {
130         final String xml = prepareXmlFrom(notification);
131         final Event event = new Event(EventType.NOTIFY);
132         if (this.outputType.equals("JSON")) {
133             final JSONObject jsonObject = XML.toJSONObject(xml);
134             event.setData(jsonObject.toString());
135         } else {
136             event.setData(xml);
137         }
138         this.eventBus.post(event);
139     }
140
141     /**
142      * Checks if exists at least one {@link Channel} subscriber.
143      *
144      * @return True if exist at least one {@link Channel} subscriber, false
145      *         otherwise.
146      */
147     public boolean hasSubscribers() {
148         return !this.subscribers.isEmpty();
149     }
150
151     /**
152      * Reset lists, close registration and unregister bus event.
153      */
154     public void close() {
155         this.subscribers = new ConcurrentSet<>();
156         this.registration.close();
157         this.registration = null;
158         this.eventBus.unregister(this.eventBusChangeRecorder);
159     }
160
161     /**
162      * Get stream name of this listener
163      *
164      * @return {@link String}
165      */
166     public String getStreamName() {
167         return this.streamName;
168     }
169
170     /**
171      * Check if is this listener registered.
172      *
173      * @return - true if is registered, otherwise null
174      */
175     public boolean isListening() {
176         return this.registration == null ? false : true;
177     }
178
179     /**
180      * Get schema path of notification
181      *
182      * @return {@link SchemaPath}
183      */
184     public SchemaPath getSchemaPath() {
185         return this.path;
186     }
187
188     /**
189      * Set registration for close after closing connection and check if this
190      * listener is registered
191      *
192      * @param registration
193      *            - registered listener
194      */
195     public void setRegistration(final ListenerRegistration<DOMNotificationListener> registration) {
196         Preconditions.checkNotNull(registration);
197         this.registration = registration;
198     }
199
200     /**
201      * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
202      * subscriber to the event and post event into event bus.
203      *
204      * @param subscriber
205      *            Channel
206      */
207     public void addSubscriber(final Channel subscriber) {
208         if (!subscriber.isActive()) {
209             LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
210         }
211         final Event event = new Event(EventType.REGISTER);
212         event.setSubscriber(subscriber);
213         this.eventBus.post(event);
214     }
215
216     /**
217      * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
218      * subscriber to the event and posts event into event bus.
219      *
220      * @param subscriber
221      */
222     public void removeSubscriber(final Channel subscriber) {
223         LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
224         final Event event = new Event(EventType.DEREGISTER);
225         event.setSubscriber(subscriber);
226         this.eventBus.post(event);
227     }
228
229     private String prepareXmlFrom(final DOMNotification notification) {
230         final SchemaContext schemaContext = ControllerContext.getInstance().getGlobalSchema();
231         final Document doc = ListenerAdapter.createDocument();
232         final Element notificationElement =
233                 doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
234                 "notification");
235         doc.appendChild(notificationElement);
236
237         final Element eventTimeElement = doc.createElement("eventTime");
238         eventTimeElement.setTextContent(ListenerAdapter.toRFC3339(new Date()));
239         notificationElement.appendChild(eventTimeElement);
240         final String notificationNamespace = notification.getType().getLastComponent().getNamespace().toString();
241         final Element notificationEventElement = doc.createElementNS(
242                 notificationNamespace, "event");
243         addValuesToNotificationEventElement(doc, notificationEventElement, notification, schemaContext);
244         notificationElement.appendChild(notificationEventElement);
245
246         try {
247             final ByteArrayOutputStream out = new ByteArrayOutputStream();
248             final Transformer transformer = FACTORY.newTransformer();
249             transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
250             transformer.setOutputProperty(OutputKeys.METHOD, "xml");
251             transformer.setOutputProperty(OutputKeys.INDENT, "yes");
252             transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
253             transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
254             transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, Charsets.UTF_8)));
255             final byte[] charData = out.toByteArray();
256             return new String(charData, "UTF-8");
257         } catch (TransformerException | UnsupportedEncodingException e) {
258             final String msg = "Error during transformation of Document into String";
259             LOG.error(msg, e);
260             return msg;
261         }
262     }
263
264     private void addValuesToNotificationEventElement(final Document doc, final Element element,
265             final DOMNotification notification, final SchemaContext schemaContext) {
266         if (notification == null) {
267             return;
268         }
269
270         final NormalizedNode<NodeIdentifier, Collection<DataContainerChild<? extends PathArgument, ?>>> body = notification
271                 .getBody();
272         try {
273             final DOMResult domResult = writeNormalizedNode(body,
274                     YangInstanceIdentifier.create(body.getIdentifier()), schemaContext);
275             final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
276             element.appendChild(result);
277         } catch (final IOException e) {
278             LOG.error("Error in writer ", e);
279         } catch (final XMLStreamException e) {
280             LOG.error("Error processing stream", e);
281         }
282     }
283
284     private DOMResult writeNormalizedNode(final NormalizedNode<?, ?> normalized, final YangInstanceIdentifier path,
285             final SchemaContext context) throws IOException, XMLStreamException {
286         final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
287         final Document doc = XmlDocumentUtils.getDocument();
288         final DOMResult result = new DOMResult(doc);
289         NormalizedNodeWriter normalizedNodeWriter = null;
290         NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
291         XMLStreamWriter writer = null;
292
293         try {
294             writer = XML_FACTORY.createXMLStreamWriter(result);
295             normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context,
296                     this.getSchemaPath());
297             normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
298
299             normalizedNodeWriter.write(normalized);
300
301             normalizedNodeWriter.flush();
302         } finally {
303             if (normalizedNodeWriter != null) {
304                 normalizedNodeWriter.close();
305             }
306             if (normalizedNodeStreamWriter != null) {
307                 normalizedNodeStreamWriter.close();
308             }
309             if (writer != null) {
310                 writer.close();
311             }
312         }
313
314         return result;
315     }
316
317     /**
318      * Tracks events of data change by customer.
319      */
320     private final class EventBusChangeRecorder {
321         @Subscribe
322         public void recordCustomerChange(final Event event) {
323             if (event.getType() == EventType.REGISTER) {
324                 final Channel subscriber = event.getSubscriber();
325                 if (!NotificationListenerAdapter.this.subscribers.contains(subscriber)) {
326                     NotificationListenerAdapter.this.subscribers.add(subscriber);
327                 }
328             } else if (event.getType() == EventType.DEREGISTER) {
329                 NotificationListenerAdapter.this.subscribers.remove(event.getSubscriber());
330                 Notificator.removeNotificationListenerIfNoSubscriberExists(NotificationListenerAdapter.this);
331             } else if (event.getType() == EventType.NOTIFY) {
332                 for (final Channel subscriber : NotificationListenerAdapter.this.subscribers) {
333                     if (subscriber.isActive()) {
334                         LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
335                         subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
336                     } else {
337                         LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
338                         NotificationListenerAdapter.this.subscribers.remove(subscriber);
339                     }
340                 }
341             }
342         }
343     }
344
345     /**
346      * Represents event of specific {@link EventType} type, holds data and
347      * {@link Channel} subscriber.
348      */
349     private final class Event {
350         private final EventType type;
351         private Channel subscriber;
352         private String data;
353
354         /**
355          * Creates new event specified by {@link EventType} type.
356          *
357          * @param type
358          *            EventType
359          */
360         public Event(final EventType type) {
361             this.type = type;
362         }
363
364         /**
365          * Gets the {@link Channel} subscriber.
366          *
367          * @return Channel
368          */
369         public Channel getSubscriber() {
370             return this.subscriber;
371         }
372
373         /**
374          * Sets subscriber for event.
375          *
376          * @param subscriber
377          *            Channel
378          */
379         public void setSubscriber(final Channel subscriber) {
380             this.subscriber = subscriber;
381         }
382
383         /**
384          * Gets event String.
385          *
386          * @return String representation of event data.
387          */
388         public String getData() {
389             return this.data;
390         }
391
392         /**
393          * Sets event data.
394          *
395          * @param data
396          *            String.
397          */
398         public void setData(final String data) {
399             this.data = data;
400         }
401
402         /**
403          * Gets event type.
404          *
405          * @return The type of the event.
406          */
407         public EventType getType() {
408             return this.type;
409         }
410     }
411
412     /**
413      * Type of the event.
414      */
415     private enum EventType {
416         REGISTER, DEREGISTER, NOTIFY
417     }
418
419     /**
420      * Set query parameters for listener
421      *
422      * @param start
423      *            - start-time of getting notification
424      * @param stop
425      *            - stop-time of getting notification
426      */
427     public void setTime(final Date start, final Date stop) {
428         this.start = start;
429         this.stop = stop;
430     }
431 }