Bug 6324 - Notifications stream output is not same as restconf data
[netconf.git] / restconf / sal-rest-connector / src / main / java / org / opendaylight / netconf / sal / streams / listeners / NotificationListenerAdapter.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.sal.streams.listeners;
9
10 import com.google.common.base.Charsets;
11 import com.google.common.base.Preconditions;
12 import com.google.common.eventbus.AsyncEventBus;
13 import com.google.common.eventbus.EventBus;
14 import com.google.common.eventbus.Subscribe;
15 import io.netty.channel.Channel;
16 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
17 import io.netty.util.internal.ConcurrentSet;
18 import java.io.ByteArrayOutputStream;
19 import java.io.IOException;
20 import java.io.OutputStreamWriter;
21 import java.io.StringWriter;
22 import java.io.UnsupportedEncodingException;
23 import java.io.Writer;
24 import java.util.Collection;
25 import java.util.Date;
26 import java.util.LinkedList;
27 import java.util.List;
28 import java.util.Set;
29 import java.util.concurrent.Executors;
30 import javax.xml.stream.XMLOutputFactory;
31 import javax.xml.stream.XMLStreamException;
32 import javax.xml.stream.XMLStreamWriter;
33 import javax.xml.transform.OutputKeys;
34 import javax.xml.transform.Transformer;
35 import javax.xml.transform.TransformerException;
36 import javax.xml.transform.TransformerFactory;
37 import javax.xml.transform.dom.DOMResult;
38 import javax.xml.transform.dom.DOMSource;
39 import javax.xml.transform.stream.StreamResult;
40 import org.json.JSONObject;
41 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
42 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
43 import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
44 import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
45 import org.opendaylight.yangtools.concepts.ListenerRegistration;
46 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
47 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
48 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
49 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
50 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
51 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
52 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
53 import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactory;
54 import org.opendaylight.yangtools.yang.data.codec.gson.JSONNormalizedNodeStreamWriter;
55 import org.opendaylight.yangtools.yang.data.codec.gson.JsonWriterFactory;
56 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
57 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
58 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
59 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62 import org.w3c.dom.Document;
63 import org.w3c.dom.Element;
64 import org.w3c.dom.Node;
65
66 /**
67  * {@link NotificationListenerAdapter} is responsible to track events on
68  * notifications.
69  *
70  */
71 public class NotificationListenerAdapter implements DOMNotificationListener {
72
73     private static final Logger LOG = LoggerFactory.getLogger(NotificationListenerAdapter.class);
74     private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
75
76     private final String streamName;
77     private final EventBus eventBus;
78     private final EventBusChangeRecorder eventBusChangeRecorder;
79
80     private final SchemaPath path;
81     private final String outputType;
82
83     private SchemaContext schemaContext;
84     private DOMNotification notification;
85     private ListenerRegistration<DOMNotificationListener> registration;
86     private Set<Channel> subscribers = new ConcurrentSet<>();
87
88     /**
89      * Set path of listener and stream name, register event bus.
90      *
91      * @param path
92      *            - path of notification
93      * @param streamName
94      *            - stream name of listener
95      * @param outputType
96      *            - type of output on notification (JSON, XML)
97      */
98     NotificationListenerAdapter(final SchemaPath path, final String streamName, final String outputType) {
99         this.outputType = outputType;
100         Preconditions.checkArgument((streamName != null) && !streamName.isEmpty());
101         Preconditions.checkArgument(path != null);
102         this.path = path;
103         this.streamName = streamName;
104         this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
105         this.eventBusChangeRecorder = new EventBusChangeRecorder();
106         this.eventBus.register(this.eventBusChangeRecorder);
107     }
108
109     @Override
110     public void onNotification(final DOMNotification notification) {
111         this.schemaContext = ControllerContext.getInstance().getGlobalSchema();
112         this.notification = notification;
113         final Event event = new Event(EventType.NOTIFY);
114         if (this.outputType.equals("JSON")) {
115             event.setData(prepareJson());
116         } else {
117             event.setData(prepareXml());
118         }
119         this.eventBus.post(event);
120     }
121
122     /**
123      * Prepare json from notification data
124      *
125      * @return json as {@link String}
126      */
127     private String prepareJson() {
128         final JSONObject json = new JSONObject();
129         json.put("ietf-restconf:notification",
130                 new JSONObject(writeBodyToString()).put("event-time", ListenerAdapter.toRFC3339(new Date())));
131         return json.toString();
132     }
133
134     private String writeBodyToString() {
135         final Writer writer = new StringWriter();
136         final NormalizedNodeStreamWriter jsonStream =
137                 JSONNormalizedNodeStreamWriter.createExclusiveWriter(JSONCodecFactory.create(this.schemaContext),
138                         this.notification.getType(), null, JsonWriterFactory.createJsonWriter(writer));
139         final NormalizedNodeWriter nodeWriter = NormalizedNodeWriter.forStreamWriter(jsonStream);
140         try {
141             nodeWriter.write(this.notification.getBody());
142             nodeWriter.close();
143         } catch (final IOException e) {
144             throw new RestconfDocumentedException("Problem while writing body of notification to JSON. ", e);
145         }
146         return writer.toString();
147     }
148
149     /**
150      * Checks if exists at least one {@link Channel} subscriber.
151      *
152      * @return True if exist at least one {@link Channel} subscriber, false
153      *         otherwise.
154      */
155     public boolean hasSubscribers() {
156         return !this.subscribers.isEmpty();
157     }
158
159     /**
160      * Reset lists, close registration and unregister bus event.
161      */
162     public void close() {
163         this.subscribers = new ConcurrentSet<>();
164         this.registration.close();
165         this.registration = null;
166         this.eventBus.unregister(this.eventBusChangeRecorder);
167     }
168
169     /**
170      * Get stream name of this listener
171      *
172      * @return {@link String}
173      */
174     public String getStreamName() {
175         return this.streamName;
176     }
177
178     /**
179      * Check if is this listener registered.
180      *
181      * @return - true if is registered, otherwise null
182      */
183     public boolean isListening() {
184         return this.registration == null ? false : true;
185     }
186
187     /**
188      * Get schema path of notification
189      *
190      * @return {@link SchemaPath}
191      */
192     public SchemaPath getSchemaPath() {
193         return this.path;
194     }
195
196     /**
197      * Set registration for close after closing connection and check if this
198      * listener is registered
199      *
200      * @param registration
201      *            - registered listener
202      */
203     public void setRegistration(final ListenerRegistration<DOMNotificationListener> registration) {
204         Preconditions.checkNotNull(registration);
205         this.registration = registration;
206     }
207
208     /**
209      * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
210      * subscriber to the event and post event into event bus.
211      *
212      * @param subscriber
213      *            Channel
214      */
215     public void addSubscriber(final Channel subscriber) {
216         if (!subscriber.isActive()) {
217             LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
218         }
219         final Event event = new Event(EventType.REGISTER);
220         event.setSubscriber(subscriber);
221         this.eventBus.post(event);
222     }
223
224     /**
225      * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
226      * subscriber to the event and posts event into event bus.
227      *
228      * @param subscriber
229      */
230     public void removeSubscriber(final Channel subscriber) {
231         LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
232         final Event event = new Event(EventType.DEREGISTER);
233         event.setSubscriber(subscriber);
234         this.eventBus.post(event);
235     }
236
237     private String prepareXml() {
238         final Document doc = ListenerAdapter.createDocument();
239         final Element notificationElement =
240                 doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
241                 "notification");
242         doc.appendChild(notificationElement);
243
244         final Element eventTimeElement = doc.createElement("eventTime");
245         eventTimeElement.setTextContent(ListenerAdapter.toRFC3339(new Date()));
246         notificationElement.appendChild(eventTimeElement);
247
248         final Element notificationEventElement = doc.createElementNS(
249                 "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "create-notification-stream");
250         addValuesToNotificationEventElement(doc, notificationEventElement, this.notification, this.schemaContext);
251         notificationElement.appendChild(notificationEventElement);
252
253         try {
254             final ByteArrayOutputStream out = new ByteArrayOutputStream();
255             final Transformer transformer = FACTORY.newTransformer();
256             transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
257             transformer.setOutputProperty(OutputKeys.METHOD, "xml");
258             transformer.setOutputProperty(OutputKeys.INDENT, "yes");
259             transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
260             transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
261             transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, Charsets.UTF_8)));
262             final byte[] charData = out.toByteArray();
263             return new String(charData, "UTF-8");
264         } catch (TransformerException | UnsupportedEncodingException e) {
265             final String msg = "Error during transformation of Document into String";
266             LOG.error(msg, e);
267             return msg;
268         }
269     }
270
271     private void addValuesToNotificationEventElement(final Document doc, final Element element,
272             final DOMNotification notification, final SchemaContext schemaContext) {
273         if (notification == null) {
274             return;
275         }
276
277         final NormalizedNode<NodeIdentifier, Collection<DataContainerChild<? extends PathArgument, ?>>> body = notification
278                 .getBody();
279         try {
280             final DOMResult domResult = writeNormalizedNode(body,
281                     YangInstanceIdentifier.create(body.getIdentifier()), schemaContext);
282             final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
283             final Element dataElement = doc.createElement("notification");
284             dataElement.appendChild(result);
285             element.appendChild(dataElement);
286         } catch (final IOException e) {
287             LOG.error("Error in writer ", e);
288         } catch (final XMLStreamException e) {
289             LOG.error("Error processing stream", e);
290         }
291     }
292
293     private DOMResult writeNormalizedNode(final NormalizedNode<?, ?> normalized, final YangInstanceIdentifier path,
294             final SchemaContext context) throws IOException, XMLStreamException {
295         final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
296         final Document doc = XmlDocumentUtils.getDocument();
297         final DOMResult result = new DOMResult(doc);
298         NormalizedNodeWriter normalizedNodeWriter = null;
299         NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
300         XMLStreamWriter writer = null;
301
302         try {
303             writer = XML_FACTORY.createXMLStreamWriter(result);
304             normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context,
305                     this.getSchemaPath());
306             normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
307
308             normalizedNodeWriter.write(normalized);
309
310             normalizedNodeWriter.flush();
311         } finally {
312             if (normalizedNodeWriter != null) {
313                 normalizedNodeWriter.close();
314             }
315             if (normalizedNodeStreamWriter != null) {
316                 normalizedNodeStreamWriter.close();
317             }
318             if (writer != null) {
319                 writer.close();
320             }
321         }
322
323         return result;
324     }
325
326     /**
327      * Tracks events of data change by customer.
328      */
329     private final class EventBusChangeRecorder {
330         @Subscribe
331         public void recordCustomerChange(final Event event) {
332             if (event.getType() == EventType.REGISTER) {
333                 final Channel subscriber = event.getSubscriber();
334                 if (!NotificationListenerAdapter.this.subscribers.contains(subscriber)) {
335                     NotificationListenerAdapter.this.subscribers.add(subscriber);
336                 }
337             } else if (event.getType() == EventType.DEREGISTER) {
338                 NotificationListenerAdapter.this.subscribers.remove(event.getSubscriber());
339                 Notificator.removeNotificationListenerIfNoSubscriberExists(NotificationListenerAdapter.this);
340             } else if (event.getType() == EventType.NOTIFY) {
341                 for (final Channel subscriber : NotificationListenerAdapter.this.subscribers) {
342                     if (subscriber.isActive()) {
343                         LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
344                         subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
345                     } else {
346                         LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
347                         NotificationListenerAdapter.this.subscribers.remove(subscriber);
348                     }
349                 }
350             }
351         }
352     }
353
354     /**
355      * Represents event of specific {@link EventType} type, holds data and
356      * {@link Channel} subscriber.
357      */
358     private final class Event {
359         private final EventType type;
360         private Channel subscriber;
361         private String data;
362
363         /**
364          * Creates new event specified by {@link EventType} type.
365          *
366          * @param type
367          *            EventType
368          */
369         public Event(final EventType type) {
370             this.type = type;
371         }
372
373         /**
374          * Gets the {@link Channel} subscriber.
375          *
376          * @return Channel
377          */
378         public Channel getSubscriber() {
379             return this.subscriber;
380         }
381
382         /**
383          * Sets subscriber for event.
384          *
385          * @param subscriber
386          *            Channel
387          */
388         public void setSubscriber(final Channel subscriber) {
389             this.subscriber = subscriber;
390         }
391
392         /**
393          * Gets event String.
394          *
395          * @return String representation of event data.
396          */
397         public String getData() {
398             return this.data;
399         }
400
401         /**
402          * Sets event data.
403          *
404          * @param data
405          *            String.
406          */
407         public void setData(final String data) {
408             this.data = data;
409         }
410
411         /**
412          * Gets event type.
413          *
414          * @return The type of the event.
415          */
416         public EventType getType() {
417             return this.type;
418         }
419     }
420
421     /**
422      * Type of the event.
423      */
424     private enum EventType {
425         REGISTER, DEREGISTER, NOTIFY;
426     }
427 }