Bug 3959 - support netconf notification
[netconf.git] / restconf / sal-rest-connector / src / main / java / org / opendaylight / netconf / sal / streams / listeners / NotificationListenerAdapter.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.sal.streams.listeners;
9
10 import com.google.common.base.Charsets;
11 import com.google.common.base.Preconditions;
12 import com.google.common.eventbus.AsyncEventBus;
13 import com.google.common.eventbus.EventBus;
14 import com.google.common.eventbus.Subscribe;
15 import io.netty.channel.Channel;
16 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
17 import io.netty.util.internal.ConcurrentSet;
18 import java.io.ByteArrayOutputStream;
19 import java.io.IOException;
20 import java.io.OutputStreamWriter;
21 import java.io.UnsupportedEncodingException;
22 import java.util.Collection;
23 import java.util.Date;
24 import java.util.Set;
25 import java.util.concurrent.Executors;
26 import javax.xml.stream.XMLOutputFactory;
27 import javax.xml.stream.XMLStreamException;
28 import javax.xml.stream.XMLStreamWriter;
29 import javax.xml.transform.OutputKeys;
30 import javax.xml.transform.Transformer;
31 import javax.xml.transform.TransformerException;
32 import javax.xml.transform.TransformerFactory;
33 import javax.xml.transform.dom.DOMResult;
34 import javax.xml.transform.dom.DOMSource;
35 import javax.xml.transform.stream.StreamResult;
36 import org.json.JSONObject;
37 import org.json.XML;
38 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
39 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
40 import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
41 import org.opendaylight.yangtools.concepts.ListenerRegistration;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
45 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
46 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
47 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
48 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
49 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
50 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
51 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
52 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55 import org.w3c.dom.Document;
56 import org.w3c.dom.Element;
57 import org.w3c.dom.Node;
58
59 /**
60  * {@link NotificationListenerAdapter} is responsible to track events on
61  * notifications.
62  *
63  */
64 public class NotificationListenerAdapter implements DOMNotificationListener {
65
66     private static final Logger LOG = LoggerFactory.getLogger(NotificationListenerAdapter.class);
67     private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
68
69     private final String streamName;
70     private ListenerRegistration<DOMNotificationListener> registration;
71     private Set<Channel> subscribers = new ConcurrentSet<>();
72     private final EventBus eventBus;
73     private final EventBusChangeRecorder eventBusChangeRecorder;
74
75     private final SchemaPath path;
76     private final String outputType;
77
78     /**
79      * Set path of listener and stream name, register event bus.
80      *
81      * @param path
82      *            - path of notification
83      * @param streamName
84      *            - stream name of listener
85      * @param outputType
86      *            - type of output on notification (JSON, XML)
87      */
88     NotificationListenerAdapter(final SchemaPath path, final String streamName, final String outputType) {
89         this.outputType = outputType;
90         Preconditions.checkArgument((streamName != null) && !streamName.isEmpty());
91         Preconditions.checkArgument(path != null);
92         this.path = path;
93         this.streamName = streamName;
94         this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
95         this.eventBusChangeRecorder = new EventBusChangeRecorder();
96         this.eventBus.register(this.eventBusChangeRecorder);
97     }
98
99     @Override
100     public void onNotification(final DOMNotification notification) {
101         final String xml = prepareXmlFrom(notification);
102         final Event event = new Event(EventType.NOTIFY);
103         if (this.outputType.equals("JSON")) {
104             final JSONObject jsonObject = XML.toJSONObject(xml);
105             event.setData(jsonObject.toString());
106         } else {
107             event.setData(xml);
108         }
109         this.eventBus.post(event);
110     }
111
112     /**
113      * Checks if exists at least one {@link Channel} subscriber.
114      *
115      * @return True if exist at least one {@link Channel} subscriber, false
116      *         otherwise.
117      */
118     public boolean hasSubscribers() {
119         return !this.subscribers.isEmpty();
120     }
121
122     /**
123      * Reset lists, close registration and unregister bus event.
124      */
125     public void close() {
126         this.subscribers = new ConcurrentSet<>();
127         this.registration.close();
128         this.registration = null;
129         this.eventBus.unregister(this.eventBusChangeRecorder);
130     }
131
132     /**
133      * Get stream name of this listener
134      *
135      * @return {@link String}
136      */
137     public String getStreamName() {
138         return this.streamName;
139     }
140
141     /**
142      * Check if is this listener registered.
143      *
144      * @return - true if is registered, otherwise null
145      */
146     public boolean isListening() {
147         return this.registration == null ? false : true;
148     }
149
150     /**
151      * Get schema path of notification
152      *
153      * @return {@link SchemaPath}
154      */
155     public SchemaPath getSchemaPath() {
156         return this.path;
157     }
158
159     /**
160      * Set registration for close after closing connection and check if this
161      * listener is registered
162      *
163      * @param registration
164      *            - registered listener
165      */
166     public void setRegistration(final ListenerRegistration<DOMNotificationListener> registration) {
167         Preconditions.checkNotNull(registration);
168         this.registration = registration;
169     }
170
171     /**
172      * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
173      * subscriber to the event and post event into event bus.
174      *
175      * @param subscriber
176      *            Channel
177      */
178     public void addSubscriber(final Channel subscriber) {
179         if (!subscriber.isActive()) {
180             LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
181         }
182         final Event event = new Event(EventType.REGISTER);
183         event.setSubscriber(subscriber);
184         this.eventBus.post(event);
185     }
186
187     /**
188      * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
189      * subscriber to the event and posts event into event bus.
190      *
191      * @param subscriber
192      */
193     public void removeSubscriber(final Channel subscriber) {
194         LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
195         final Event event = new Event(EventType.DEREGISTER);
196         event.setSubscriber(subscriber);
197         this.eventBus.post(event);
198     }
199
200     private String prepareXmlFrom(final DOMNotification notification) {
201         final SchemaContext schemaContext = ControllerContext.getInstance().getGlobalSchema();
202         final Document doc = ListenerAdapter.createDocument();
203         final Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
204                 "notification");
205         doc.appendChild(notificationElement);
206
207         final Element eventTimeElement = doc.createElement("eventTime");
208         eventTimeElement.setTextContent(ListenerAdapter.toRFC3339(new Date()));
209         notificationElement.appendChild(eventTimeElement);
210
211         final Element notificationEventElement = doc.createElementNS(
212                 "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "create-notification-stream");
213         addValuesToNotificationEventElement(doc, notificationEventElement, notification, schemaContext);
214         notificationElement.appendChild(notificationEventElement);
215
216         try {
217             final ByteArrayOutputStream out = new ByteArrayOutputStream();
218             final Transformer transformer = FACTORY.newTransformer();
219             transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
220             transformer.setOutputProperty(OutputKeys.METHOD, "xml");
221             transformer.setOutputProperty(OutputKeys.INDENT, "yes");
222             transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
223             transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
224             transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, Charsets.UTF_8)));
225             final byte[] charData = out.toByteArray();
226             return new String(charData, "UTF-8");
227         } catch (TransformerException | UnsupportedEncodingException e) {
228             final String msg = "Error during transformation of Document into String";
229             LOG.error(msg, e);
230             return msg;
231         }
232     }
233
234     private void addValuesToNotificationEventElement(final Document doc, final Element element,
235             final DOMNotification notification, final SchemaContext schemaContext) {
236         if (notification == null) {
237             return;
238         }
239
240         final NormalizedNode<NodeIdentifier, Collection<DataContainerChild<? extends PathArgument, ?>>> body = notification
241                 .getBody();
242         try {
243             final DOMResult domResult = writeNormalizedNode(body,
244                     YangInstanceIdentifier.create(body.getIdentifier()), schemaContext);
245             final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
246             final Element dataElement = doc.createElement("notification");
247             dataElement.appendChild(result);
248             element.appendChild(dataElement);
249         } catch (final IOException e) {
250             LOG.error("Error in writer ", e);
251         } catch (final XMLStreamException e) {
252             LOG.error("Error processing stream", e);
253         }
254     }
255
256     private DOMResult writeNormalizedNode(final NormalizedNode<?, ?> normalized, final YangInstanceIdentifier path,
257             final SchemaContext context) throws IOException, XMLStreamException {
258         final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
259         final Document doc = XmlDocumentUtils.getDocument();
260         final DOMResult result = new DOMResult(doc);
261         NormalizedNodeWriter normalizedNodeWriter = null;
262         NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
263         XMLStreamWriter writer = null;
264
265         try {
266             writer = XML_FACTORY.createXMLStreamWriter(result);
267             normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context,
268                     this.getSchemaPath());
269             normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
270
271             normalizedNodeWriter.write(normalized);
272
273             normalizedNodeWriter.flush();
274         } finally {
275             if (normalizedNodeWriter != null) {
276                 normalizedNodeWriter.close();
277             }
278             if (normalizedNodeStreamWriter != null) {
279                 normalizedNodeStreamWriter.close();
280             }
281             if (writer != null) {
282                 writer.close();
283             }
284         }
285
286         return result;
287     }
288
289     /**
290      * Tracks events of data change by customer.
291      */
292     private final class EventBusChangeRecorder {
293         @Subscribe
294         public void recordCustomerChange(final Event event) {
295             if (event.getType() == EventType.REGISTER) {
296                 final Channel subscriber = event.getSubscriber();
297                 if (!NotificationListenerAdapter.this.subscribers.contains(subscriber)) {
298                     NotificationListenerAdapter.this.subscribers.add(subscriber);
299                 }
300             } else if (event.getType() == EventType.DEREGISTER) {
301                 NotificationListenerAdapter.this.subscribers.remove(event.getSubscriber());
302                 Notificator.removeNotificationListenerIfNoSubscriberExists(NotificationListenerAdapter.this);
303             } else if (event.getType() == EventType.NOTIFY) {
304                 for (final Channel subscriber : NotificationListenerAdapter.this.subscribers) {
305                     if (subscriber.isActive()) {
306                         LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
307                         subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
308                     } else {
309                         LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
310                         NotificationListenerAdapter.this.subscribers.remove(subscriber);
311                     }
312                 }
313             }
314         }
315     }
316
317     /**
318      * Represents event of specific {@link EventType} type, holds data and
319      * {@link Channel} subscriber.
320      */
321     private final class Event {
322         private final EventType type;
323         private Channel subscriber;
324         private String data;
325
326         /**
327          * Creates new event specified by {@link EventType} type.
328          *
329          * @param type
330          *            EventType
331          */
332         public Event(final EventType type) {
333             this.type = type;
334         }
335
336         /**
337          * Gets the {@link Channel} subscriber.
338          *
339          * @return Channel
340          */
341         public Channel getSubscriber() {
342             return this.subscriber;
343         }
344
345         /**
346          * Sets subscriber for event.
347          *
348          * @param subscriber
349          *            Channel
350          */
351         public void setSubscriber(final Channel subscriber) {
352             this.subscriber = subscriber;
353         }
354
355         /**
356          * Gets event String.
357          *
358          * @return String representation of event data.
359          */
360         public String getData() {
361             return this.data;
362         }
363
364         /**
365          * Sets event data.
366          *
367          * @param data
368          *            String.
369          */
370         public void setData(final String data) {
371             this.data = data;
372         }
373
374         /**
375          * Gets event type.
376          *
377          * @return The type of the event.
378          */
379         public EventType getType() {
380             return this.type;
381         }
382     }
383
384     /**
385      * Type of the event.
386      */
387     private enum EventType {
388         REGISTER, DEREGISTER, NOTIFY;
389     }
390 }