3e0bdd984794735088866dd4a07143b8e7d867c5
[netconf.git] / restconf / sal-rest-connector / src / main / java / org / opendaylight / netconf / sal / streams / listeners / NotificationListenerAdapter.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.sal.streams.listeners;
9
10 import com.google.common.base.Charsets;
11 import com.google.common.base.Preconditions;
12 import com.google.common.eventbus.AsyncEventBus;
13 import com.google.common.eventbus.EventBus;
14 import com.google.common.eventbus.Subscribe;
15 import io.netty.channel.Channel;
16 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
17 import io.netty.util.internal.ConcurrentSet;
18 import java.io.ByteArrayOutputStream;
19 import java.io.IOException;
20 import java.io.OutputStreamWriter;
21 import java.io.UnsupportedEncodingException;
22 import java.util.Collection;
23 import java.util.Date;
24 import java.util.Set;
25 import java.util.concurrent.Executors;
26 import javax.xml.stream.XMLOutputFactory;
27 import javax.xml.stream.XMLStreamException;
28 import javax.xml.stream.XMLStreamWriter;
29 import javax.xml.transform.OutputKeys;
30 import javax.xml.transform.Transformer;
31 import javax.xml.transform.TransformerException;
32 import javax.xml.transform.TransformerFactory;
33 import javax.xml.transform.dom.DOMResult;
34 import javax.xml.transform.dom.DOMSource;
35 import javax.xml.transform.stream.StreamResult;
36 import org.json.JSONObject;
37 import org.json.XML;
38 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
39 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
40 import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
41 import org.opendaylight.yangtools.concepts.ListenerRegistration;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
45 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
46 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
47 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
48 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
49 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
50 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
51 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
52 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55 import org.w3c.dom.Document;
56 import org.w3c.dom.Element;
57 import org.w3c.dom.Node;
58
59 /**
60  * {@link NotificationListenerAdapter} is responsible to track events on
61  * notifications.
62  *
63  */
64 public class NotificationListenerAdapter implements DOMNotificationListener {
65
66     private static final Logger LOG = LoggerFactory.getLogger(NotificationListenerAdapter.class);
67     private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
68
69     private final String streamName;
70     private ListenerRegistration<DOMNotificationListener> registration;
71     private Set<Channel> subscribers = new ConcurrentSet<>();
72     private final EventBus eventBus;
73     private final EventBusChangeRecorder eventBusChangeRecorder;
74
75     private final SchemaPath path;
76     private final String outputType;
77
78     /**
79      * Set path of listener and stream name, register event bus.
80      *
81      * @param path
82      *            - path of notification
83      * @param streamName
84      *            - stream name of listener
85      * @param outputType
86      *            - type of output on notification (JSON, XML)
87      */
88     NotificationListenerAdapter(final SchemaPath path, final String streamName, final String outputType) {
89         this.outputType = outputType;
90         Preconditions.checkArgument((streamName != null) && !streamName.isEmpty());
91         Preconditions.checkArgument(path != null);
92         this.path = path;
93         this.streamName = streamName;
94         this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
95         this.eventBusChangeRecorder = new EventBusChangeRecorder();
96         this.eventBus.register(this.eventBusChangeRecorder);
97     }
98
99     @Override
100     public void onNotification(final DOMNotification notification) {
101         final String xml = prepareXmlFrom(notification);
102         final Event event = new Event(EventType.NOTIFY);
103         if (this.outputType.equals("JSON")) {
104             final JSONObject jsonObject = XML.toJSONObject(xml);
105             event.setData(jsonObject.toString());
106         } else {
107             event.setData(xml);
108         }
109         this.eventBus.post(event);
110     }
111
112     /**
113      * Checks if exists at least one {@link Channel} subscriber.
114      *
115      * @return True if exist at least one {@link Channel} subscriber, false
116      *         otherwise.
117      */
118     public boolean hasSubscribers() {
119         return !this.subscribers.isEmpty();
120     }
121
122     /**
123      * Reset lists, close registration and unregister bus event.
124      */
125     public void close() {
126         this.subscribers = new ConcurrentSet<>();
127         this.registration.close();
128         this.registration = null;
129         this.eventBus.unregister(this.eventBusChangeRecorder);
130     }
131
132     /**
133      * Get stream name of this listener
134      *
135      * @return {@link String}
136      */
137     public String getStreamName() {
138         return this.streamName;
139     }
140
141     /**
142      * Check if is this listener registered.
143      *
144      * @return - true if is registered, otherwise null
145      */
146     public boolean isListening() {
147         return this.registration == null ? false : true;
148     }
149
150     /**
151      * Get schema path of notification
152      *
153      * @return {@link SchemaPath}
154      */
155     public SchemaPath getSchemaPath() {
156         return this.path;
157     }
158
159     /**
160      * Set registration for close after closing connection and check if this
161      * listener is registered
162      *
163      * @param registration
164      *            - registered listener
165      */
166     public void setRegistration(final ListenerRegistration<DOMNotificationListener> registration) {
167         Preconditions.checkNotNull(registration);
168         this.registration = registration;
169     }
170
171     /**
172      * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
173      * subscriber to the event and post event into event bus.
174      *
175      * @param subscriber
176      *            Channel
177      */
178     public void addSubscriber(final Channel subscriber) {
179         if (!subscriber.isActive()) {
180             LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
181         }
182         final Event event = new Event(EventType.REGISTER);
183         event.setSubscriber(subscriber);
184         this.eventBus.post(event);
185     }
186
187     /**
188      * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
189      * subscriber to the event and posts event into event bus.
190      *
191      * @param subscriber
192      */
193     public void removeSubscriber(final Channel subscriber) {
194         LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
195         final Event event = new Event(EventType.DEREGISTER);
196         event.setSubscriber(subscriber);
197         this.eventBus.post(event);
198     }
199
200     private String prepareXmlFrom(final DOMNotification notification) {
201         final SchemaContext schemaContext = ControllerContext.getInstance().getGlobalSchema();
202         final Document doc = ListenerAdapter.createDocument();
203         final Element notificationElement =
204                 doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
205                 "notification");
206         doc.appendChild(notificationElement);
207
208         final Element eventTimeElement = doc.createElement("eventTime");
209         eventTimeElement.setTextContent(ListenerAdapter.toRFC3339(new Date()));
210         notificationElement.appendChild(eventTimeElement);
211
212         final Element notificationEventElement = doc.createElementNS(
213                 "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "create-notification-stream");
214         addValuesToNotificationEventElement(doc, notificationEventElement, notification, schemaContext);
215         notificationElement.appendChild(notificationEventElement);
216
217         try {
218             final ByteArrayOutputStream out = new ByteArrayOutputStream();
219             final Transformer transformer = FACTORY.newTransformer();
220             transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
221             transformer.setOutputProperty(OutputKeys.METHOD, "xml");
222             transformer.setOutputProperty(OutputKeys.INDENT, "yes");
223             transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
224             transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
225             transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, Charsets.UTF_8)));
226             final byte[] charData = out.toByteArray();
227             return new String(charData, "UTF-8");
228         } catch (TransformerException | UnsupportedEncodingException e) {
229             final String msg = "Error during transformation of Document into String";
230             LOG.error(msg, e);
231             return msg;
232         }
233     }
234
235     private void addValuesToNotificationEventElement(final Document doc, final Element element,
236             final DOMNotification notification, final SchemaContext schemaContext) {
237         if (notification == null) {
238             return;
239         }
240
241         final NormalizedNode<NodeIdentifier, Collection<DataContainerChild<? extends PathArgument, ?>>> body = notification
242                 .getBody();
243         try {
244             final DOMResult domResult = writeNormalizedNode(body,
245                     YangInstanceIdentifier.create(body.getIdentifier()), schemaContext);
246             final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
247             final Element dataElement = doc.createElement("notification");
248             dataElement.appendChild(result);
249             element.appendChild(dataElement);
250         } catch (final IOException e) {
251             LOG.error("Error in writer ", e);
252         } catch (final XMLStreamException e) {
253             LOG.error("Error processing stream", e);
254         }
255     }
256
257     private DOMResult writeNormalizedNode(final NormalizedNode<?, ?> normalized, final YangInstanceIdentifier path,
258             final SchemaContext context) throws IOException, XMLStreamException {
259         final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
260         final Document doc = XmlDocumentUtils.getDocument();
261         final DOMResult result = new DOMResult(doc);
262         NormalizedNodeWriter normalizedNodeWriter = null;
263         NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
264         XMLStreamWriter writer = null;
265
266         try {
267             writer = XML_FACTORY.createXMLStreamWriter(result);
268             normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context,
269                     this.getSchemaPath());
270             normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
271
272             normalizedNodeWriter.write(normalized);
273
274             normalizedNodeWriter.flush();
275         } finally {
276             if (normalizedNodeWriter != null) {
277                 normalizedNodeWriter.close();
278             }
279             if (normalizedNodeStreamWriter != null) {
280                 normalizedNodeStreamWriter.close();
281             }
282             if (writer != null) {
283                 writer.close();
284             }
285         }
286
287         return result;
288     }
289
290     /**
291      * Tracks events of data change by customer.
292      */
293     private final class EventBusChangeRecorder {
294         @Subscribe
295         public void recordCustomerChange(final Event event) {
296             if (event.getType() == EventType.REGISTER) {
297                 final Channel subscriber = event.getSubscriber();
298                 if (!NotificationListenerAdapter.this.subscribers.contains(subscriber)) {
299                     NotificationListenerAdapter.this.subscribers.add(subscriber);
300                 }
301             } else if (event.getType() == EventType.DEREGISTER) {
302                 NotificationListenerAdapter.this.subscribers.remove(event.getSubscriber());
303                 Notificator.removeNotificationListenerIfNoSubscriberExists(NotificationListenerAdapter.this);
304             } else if (event.getType() == EventType.NOTIFY) {
305                 for (final Channel subscriber : NotificationListenerAdapter.this.subscribers) {
306                     if (subscriber.isActive()) {
307                         LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
308                         subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
309                     } else {
310                         LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
311                         NotificationListenerAdapter.this.subscribers.remove(subscriber);
312                     }
313                 }
314             }
315         }
316     }
317
318     /**
319      * Represents event of specific {@link EventType} type, holds data and
320      * {@link Channel} subscriber.
321      */
322     private final class Event {
323         private final EventType type;
324         private Channel subscriber;
325         private String data;
326
327         /**
328          * Creates new event specified by {@link EventType} type.
329          *
330          * @param type
331          *            EventType
332          */
333         public Event(final EventType type) {
334             this.type = type;
335         }
336
337         /**
338          * Gets the {@link Channel} subscriber.
339          *
340          * @return Channel
341          */
342         public Channel getSubscriber() {
343             return this.subscriber;
344         }
345
346         /**
347          * Sets subscriber for event.
348          *
349          * @param subscriber
350          *            Channel
351          */
352         public void setSubscriber(final Channel subscriber) {
353             this.subscriber = subscriber;
354         }
355
356         /**
357          * Gets event String.
358          *
359          * @return String representation of event data.
360          */
361         public String getData() {
362             return this.data;
363         }
364
365         /**
366          * Sets event data.
367          *
368          * @param data
369          *            String.
370          */
371         public void setData(final String data) {
372             this.data = data;
373         }
374
375         /**
376          * Gets event type.
377          *
378          * @return The type of the event.
379          */
380         public EventType getType() {
381             return this.type;
382         }
383     }
384
385     /**
386      * Type of the event.
387      */
388     private enum EventType {
389         REGISTER, DEREGISTER, NOTIFY;
390     }
391 }