Merge changes from topic 'Upgrade ietf-restconf draft16 to draft17'
[netconf.git] / restconf / sal-rest-connector / src / main / java / org / opendaylight / netconf / sal / streams / listeners / NotificationListenerAdapter.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.sal.streams.listeners;
9
10 import com.google.common.base.Charsets;
11 import com.google.common.base.Preconditions;
12 import com.google.common.eventbus.AsyncEventBus;
13 import com.google.common.eventbus.EventBus;
14 import com.google.common.eventbus.Subscribe;
15 import io.netty.channel.Channel;
16 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
17 import io.netty.util.internal.ConcurrentSet;
18 import java.io.ByteArrayOutputStream;
19 import java.io.IOException;
20 import java.io.OutputStreamWriter;
21 import java.io.UnsupportedEncodingException;
22 import java.util.Collection;
23 import java.util.Date;
24 import java.util.Set;
25 import java.util.concurrent.Executors;
26 import javax.xml.stream.XMLOutputFactory;
27 import javax.xml.stream.XMLStreamException;
28 import javax.xml.stream.XMLStreamWriter;
29 import javax.xml.transform.OutputKeys;
30 import javax.xml.transform.Transformer;
31 import javax.xml.transform.TransformerException;
32 import javax.xml.transform.TransformerFactory;
33 import javax.xml.transform.dom.DOMResult;
34 import javax.xml.transform.dom.DOMSource;
35 import javax.xml.transform.stream.StreamResult;
36 import org.json.JSONObject;
37 import org.json.XML;
38 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
39 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
40 import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
41 import org.opendaylight.yangtools.concepts.ListenerRegistration;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
45 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
46 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
47 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
48 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
49 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
50 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
51 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
52 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55 import org.w3c.dom.Document;
56 import org.w3c.dom.Element;
57 import org.w3c.dom.Node;
58
59 /**
60  * {@link NotificationListenerAdapter} is responsible to track events on
61  * notifications.
62  *
63  */
64 public class NotificationListenerAdapter implements DOMNotificationListener {
65
66     private static final Logger LOG = LoggerFactory.getLogger(NotificationListenerAdapter.class);
67     private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
68
69     private final String streamName;
70     private ListenerRegistration<DOMNotificationListener> registration;
71     private Set<Channel> subscribers = new ConcurrentSet<>();
72     private final EventBus eventBus;
73     private final EventBusChangeRecorder eventBusChangeRecorder;
74
75     private final SchemaPath path;
76     private final String outputType;
77
78     /**
79      * Set path of listener and stream name, register event bus.
80      *
81      * @param path
82      *            - path of notification
83      * @param streamName
84      *            - stream name of listener
85      * @param outputType
86      *            - type of output on notification (JSON, XML)
87      */
88     NotificationListenerAdapter(final SchemaPath path, final String streamName, final String outputType) {
89         this.outputType = outputType;
90         Preconditions.checkArgument((streamName != null) && !streamName.isEmpty());
91         Preconditions.checkArgument(path != null);
92         this.path = path;
93         this.streamName = streamName;
94         this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
95         this.eventBusChangeRecorder = new EventBusChangeRecorder();
96         this.eventBus.register(this.eventBusChangeRecorder);
97     }
98
99     @Override
100     public void onNotification(final DOMNotification notification) {
101         final String xml = prepareXmlFrom(notification);
102         final Event event = new Event(EventType.NOTIFY);
103         if (this.outputType.equals("JSON")) {
104             final JSONObject jsonObject = XML.toJSONObject(xml);
105             event.setData(jsonObject.toString());
106         } else {
107             event.setData(xml);
108         }
109         this.eventBus.post(event);
110     }
111
112     /**
113      * Checks if exists at least one {@link Channel} subscriber.
114      *
115      * @return True if exist at least one {@link Channel} subscriber, false
116      *         otherwise.
117      */
118     public boolean hasSubscribers() {
119         return !this.subscribers.isEmpty();
120     }
121
122     /**
123      * Reset lists, close registration and unregister bus event.
124      */
125     public void close() {
126         this.subscribers = new ConcurrentSet<>();
127         this.registration.close();
128         this.registration = null;
129         this.eventBus.unregister(this.eventBusChangeRecorder);
130     }
131
132     /**
133      * Get stream name of this listener
134      *
135      * @return {@link String}
136      */
137     public String getStreamName() {
138         return this.streamName;
139     }
140
141     /**
142      * Check if is this listener registered.
143      *
144      * @return - true if is registered, otherwise null
145      */
146     public boolean isListening() {
147         return this.registration == null ? false : true;
148     }
149
150     /**
151      * Get schema path of notification
152      *
153      * @return {@link SchemaPath}
154      */
155     public SchemaPath getSchemaPath() {
156         return this.path;
157     }
158
159     /**
160      * Set registration for close after closing connection and check if this
161      * listener is registered
162      *
163      * @param registration
164      *            - registered listener
165      */
166     public void setRegistration(final ListenerRegistration<DOMNotificationListener> registration) {
167         Preconditions.checkNotNull(registration);
168         this.registration = registration;
169     }
170
171     /**
172      * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
173      * subscriber to the event and post event into event bus.
174      *
175      * @param subscriber
176      *            Channel
177      */
178     public void addSubscriber(final Channel subscriber) {
179         if (!subscriber.isActive()) {
180             LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
181         }
182         final Event event = new Event(EventType.REGISTER);
183         event.setSubscriber(subscriber);
184         this.eventBus.post(event);
185     }
186
187     /**
188      * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
189      * subscriber to the event and posts event into event bus.
190      *
191      * @param subscriber
192      */
193     public void removeSubscriber(final Channel subscriber) {
194         LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
195         final Event event = new Event(EventType.DEREGISTER);
196         event.setSubscriber(subscriber);
197         this.eventBus.post(event);
198     }
199
200     private String prepareXmlFrom(final DOMNotification notification) {
201         final SchemaContext schemaContext = ControllerContext.getInstance().getGlobalSchema();
202         final Document doc = ListenerAdapter.createDocument();
203         final Element notificationElement =
204                 doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
205                 "notification");
206         doc.appendChild(notificationElement);
207
208         final Element eventTimeElement = doc.createElement("eventTime");
209         eventTimeElement.setTextContent(ListenerAdapter.toRFC3339(new Date()));
210         notificationElement.appendChild(eventTimeElement);
211         final String notificationNamespace = notification.getType().getLastComponent().getNamespace().toString();
212         final Element notificationEventElement = doc.createElementNS(
213                 notificationNamespace, "event");
214         addValuesToNotificationEventElement(doc, notificationEventElement, notification, schemaContext);
215         notificationElement.appendChild(notificationEventElement);
216
217         try {
218             final ByteArrayOutputStream out = new ByteArrayOutputStream();
219             final Transformer transformer = FACTORY.newTransformer();
220             transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
221             transformer.setOutputProperty(OutputKeys.METHOD, "xml");
222             transformer.setOutputProperty(OutputKeys.INDENT, "yes");
223             transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
224             transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
225             transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, Charsets.UTF_8)));
226             final byte[] charData = out.toByteArray();
227             return new String(charData, "UTF-8");
228         } catch (TransformerException | UnsupportedEncodingException e) {
229             final String msg = "Error during transformation of Document into String";
230             LOG.error(msg, e);
231             return msg;
232         }
233     }
234
235     private void addValuesToNotificationEventElement(final Document doc, final Element element,
236             final DOMNotification notification, final SchemaContext schemaContext) {
237         if (notification == null) {
238             return;
239         }
240
241         final NormalizedNode<NodeIdentifier, Collection<DataContainerChild<? extends PathArgument, ?>>> body = notification
242                 .getBody();
243         try {
244             final DOMResult domResult = writeNormalizedNode(body,
245                     YangInstanceIdentifier.create(body.getIdentifier()), schemaContext);
246             final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
247             element.appendChild(result);
248         } catch (final IOException e) {
249             LOG.error("Error in writer ", e);
250         } catch (final XMLStreamException e) {
251             LOG.error("Error processing stream", e);
252         }
253     }
254
255     private DOMResult writeNormalizedNode(final NormalizedNode<?, ?> normalized, final YangInstanceIdentifier path,
256             final SchemaContext context) throws IOException, XMLStreamException {
257         final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
258         final Document doc = XmlDocumentUtils.getDocument();
259         final DOMResult result = new DOMResult(doc);
260         NormalizedNodeWriter normalizedNodeWriter = null;
261         NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
262         XMLStreamWriter writer = null;
263
264         try {
265             writer = XML_FACTORY.createXMLStreamWriter(result);
266             normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context,
267                     this.getSchemaPath());
268             normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
269
270             normalizedNodeWriter.write(normalized);
271
272             normalizedNodeWriter.flush();
273         } finally {
274             if (normalizedNodeWriter != null) {
275                 normalizedNodeWriter.close();
276             }
277             if (normalizedNodeStreamWriter != null) {
278                 normalizedNodeStreamWriter.close();
279             }
280             if (writer != null) {
281                 writer.close();
282             }
283         }
284
285         return result;
286     }
287
288     /**
289      * Tracks events of data change by customer.
290      */
291     private final class EventBusChangeRecorder {
292         @Subscribe
293         public void recordCustomerChange(final Event event) {
294             if (event.getType() == EventType.REGISTER) {
295                 final Channel subscriber = event.getSubscriber();
296                 if (!NotificationListenerAdapter.this.subscribers.contains(subscriber)) {
297                     NotificationListenerAdapter.this.subscribers.add(subscriber);
298                 }
299             } else if (event.getType() == EventType.DEREGISTER) {
300                 NotificationListenerAdapter.this.subscribers.remove(event.getSubscriber());
301                 Notificator.removeNotificationListenerIfNoSubscriberExists(NotificationListenerAdapter.this);
302             } else if (event.getType() == EventType.NOTIFY) {
303                 for (final Channel subscriber : NotificationListenerAdapter.this.subscribers) {
304                     if (subscriber.isActive()) {
305                         LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
306                         subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
307                     } else {
308                         LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
309                         NotificationListenerAdapter.this.subscribers.remove(subscriber);
310                     }
311                 }
312             }
313         }
314     }
315
316     /**
317      * Represents event of specific {@link EventType} type, holds data and
318      * {@link Channel} subscriber.
319      */
320     private final class Event {
321         private final EventType type;
322         private Channel subscriber;
323         private String data;
324
325         /**
326          * Creates new event specified by {@link EventType} type.
327          *
328          * @param type
329          *            EventType
330          */
331         public Event(final EventType type) {
332             this.type = type;
333         }
334
335         /**
336          * Gets the {@link Channel} subscriber.
337          *
338          * @return Channel
339          */
340         public Channel getSubscriber() {
341             return this.subscriber;
342         }
343
344         /**
345          * Sets subscriber for event.
346          *
347          * @param subscriber
348          *            Channel
349          */
350         public void setSubscriber(final Channel subscriber) {
351             this.subscriber = subscriber;
352         }
353
354         /**
355          * Gets event String.
356          *
357          * @return String representation of event data.
358          */
359         public String getData() {
360             return this.data;
361         }
362
363         /**
364          * Sets event data.
365          *
366          * @param data
367          *            String.
368          */
369         public void setData(final String data) {
370             this.data = data;
371         }
372
373         /**
374          * Gets event type.
375          *
376          * @return The type of the event.
377          */
378         public EventType getType() {
379             return this.type;
380         }
381     }
382
383     /**
384      * Type of the event.
385      */
386     private enum EventType {
387         REGISTER, DEREGISTER, NOTIFY
388     }
389 }