2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.sal.streams.listeners;
10 import com.google.common.base.Charsets;
11 import com.google.common.base.Preconditions;
12 import com.google.common.eventbus.AsyncEventBus;
13 import com.google.common.eventbus.EventBus;
14 import com.google.common.eventbus.Subscribe;
15 import io.netty.channel.Channel;
16 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
17 import io.netty.util.internal.ConcurrentSet;
18 import java.io.ByteArrayOutputStream;
19 import java.io.IOException;
20 import java.io.OutputStreamWriter;
21 import java.io.StringWriter;
22 import java.io.UnsupportedEncodingException;
23 import java.io.Writer;
24 import java.util.Collection;
25 import java.util.Date;
26 import java.util.LinkedList;
27 import java.util.List;
29 import java.util.concurrent.Executors;
30 import javax.xml.stream.XMLOutputFactory;
31 import javax.xml.stream.XMLStreamException;
32 import javax.xml.stream.XMLStreamWriter;
33 import javax.xml.transform.OutputKeys;
34 import javax.xml.transform.Transformer;
35 import javax.xml.transform.TransformerException;
36 import javax.xml.transform.TransformerFactory;
37 import javax.xml.transform.dom.DOMResult;
38 import javax.xml.transform.dom.DOMSource;
39 import javax.xml.transform.stream.StreamResult;
40 import org.json.JSONObject;
41 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
42 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
43 import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
44 import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
45 import org.opendaylight.yangtools.concepts.ListenerRegistration;
46 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
47 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
48 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
49 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
50 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
51 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
52 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
53 import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactory;
54 import org.opendaylight.yangtools.yang.data.codec.gson.JSONNormalizedNodeStreamWriter;
55 import org.opendaylight.yangtools.yang.data.codec.gson.JsonWriterFactory;
56 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
57 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
58 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
59 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62 import org.w3c.dom.Document;
63 import org.w3c.dom.Element;
64 import org.w3c.dom.Node;
67 * {@link NotificationListenerAdapter} is responsible to track events on
71 public class NotificationListenerAdapter implements DOMNotificationListener {
73 private static final Logger LOG = LoggerFactory.getLogger(NotificationListenerAdapter.class);
74 private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
76 private final String streamName;
77 private final EventBus eventBus;
78 private final EventBusChangeRecorder eventBusChangeRecorder;
80 private final SchemaPath path;
81 private final String outputType;
83 private SchemaContext schemaContext;
84 private DOMNotification notification;
85 private ListenerRegistration<DOMNotificationListener> registration;
86 private Set<Channel> subscribers = new ConcurrentSet<>();
89 * Set path of listener and stream name, register event bus.
92 * - path of notification
94 * - stream name of listener
96 * - type of output on notification (JSON, XML)
98 NotificationListenerAdapter(final SchemaPath path, final String streamName, final String outputType) {
99 this.outputType = outputType;
100 Preconditions.checkArgument((streamName != null) && !streamName.isEmpty());
101 Preconditions.checkArgument(path != null);
103 this.streamName = streamName;
104 this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
105 this.eventBusChangeRecorder = new EventBusChangeRecorder();
106 this.eventBus.register(this.eventBusChangeRecorder);
110 public void onNotification(final DOMNotification notification) {
111 this.schemaContext = ControllerContext.getInstance().getGlobalSchema();
112 this.notification = notification;
113 final Event event = new Event(EventType.NOTIFY);
114 if (this.outputType.equals("JSON")) {
115 event.setData(prepareJson());
117 event.setData(prepareXml());
119 this.eventBus.post(event);
123 * Prepare json from notification data
125 * @return json as {@link String}
127 private String prepareJson() {
128 final JSONObject json = new JSONObject();
129 json.put("ietf-restconf:notification",
130 new JSONObject(writeBodyToString()).put("event-time", ListenerAdapter.toRFC3339(new Date())));
131 return json.toString();
134 private String writeBodyToString() {
135 final Writer writer = new StringWriter();
136 final NormalizedNodeStreamWriter jsonStream =
137 JSONNormalizedNodeStreamWriter.createExclusiveWriter(JSONCodecFactory.create(this.schemaContext),
138 this.notification.getType(), null, JsonWriterFactory.createJsonWriter(writer));
139 final NormalizedNodeWriter nodeWriter = NormalizedNodeWriter.forStreamWriter(jsonStream);
141 nodeWriter.write(this.notification.getBody());
143 } catch (final IOException e) {
144 throw new RestconfDocumentedException("Problem while writing body of notification to JSON. ", e);
146 return writer.toString();
150 * Checks if exists at least one {@link Channel} subscriber.
152 * @return True if exist at least one {@link Channel} subscriber, false
155 public boolean hasSubscribers() {
156 return !this.subscribers.isEmpty();
160 * Reset lists, close registration and unregister bus event.
162 public void close() {
163 this.subscribers = new ConcurrentSet<>();
164 this.registration.close();
165 this.registration = null;
166 this.eventBus.unregister(this.eventBusChangeRecorder);
170 * Get stream name of this listener
172 * @return {@link String}
174 public String getStreamName() {
175 return this.streamName;
179 * Check if is this listener registered.
181 * @return - true if is registered, otherwise null
183 public boolean isListening() {
184 return this.registration == null ? false : true;
188 * Get schema path of notification
190 * @return {@link SchemaPath}
192 public SchemaPath getSchemaPath() {
197 * Set registration for close after closing connection and check if this
198 * listener is registered
200 * @param registration
201 * - registered listener
203 public void setRegistration(final ListenerRegistration<DOMNotificationListener> registration) {
204 Preconditions.checkNotNull(registration);
205 this.registration = registration;
209 * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
210 * subscriber to the event and post event into event bus.
215 public void addSubscriber(final Channel subscriber) {
216 if (!subscriber.isActive()) {
217 LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
219 final Event event = new Event(EventType.REGISTER);
220 event.setSubscriber(subscriber);
221 this.eventBus.post(event);
225 * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
226 * subscriber to the event and posts event into event bus.
230 public void removeSubscriber(final Channel subscriber) {
231 LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
232 final Event event = new Event(EventType.DEREGISTER);
233 event.setSubscriber(subscriber);
234 this.eventBus.post(event);
237 private String prepareXml() {
238 final Document doc = ListenerAdapter.createDocument();
239 final Element notificationElement =
240 doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
242 doc.appendChild(notificationElement);
244 final Element eventTimeElement = doc.createElement("eventTime");
245 eventTimeElement.setTextContent(ListenerAdapter.toRFC3339(new Date()));
246 notificationElement.appendChild(eventTimeElement);
248 final Element notificationEventElement = doc.createElementNS(
249 "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "create-notification-stream");
250 addValuesToNotificationEventElement(doc, notificationEventElement, this.notification, this.schemaContext);
251 notificationElement.appendChild(notificationEventElement);
254 final ByteArrayOutputStream out = new ByteArrayOutputStream();
255 final Transformer transformer = FACTORY.newTransformer();
256 transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
257 transformer.setOutputProperty(OutputKeys.METHOD, "xml");
258 transformer.setOutputProperty(OutputKeys.INDENT, "yes");
259 transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
260 transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
261 transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, Charsets.UTF_8)));
262 final byte[] charData = out.toByteArray();
263 return new String(charData, "UTF-8");
264 } catch (TransformerException | UnsupportedEncodingException e) {
265 final String msg = "Error during transformation of Document into String";
271 private void addValuesToNotificationEventElement(final Document doc, final Element element,
272 final DOMNotification notification, final SchemaContext schemaContext) {
273 if (notification == null) {
277 final NormalizedNode<NodeIdentifier, Collection<DataContainerChild<? extends PathArgument, ?>>> body = notification
280 final DOMResult domResult = writeNormalizedNode(body,
281 YangInstanceIdentifier.create(body.getIdentifier()), schemaContext);
282 final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
283 final Element dataElement = doc.createElement("notification");
284 dataElement.appendChild(result);
285 element.appendChild(dataElement);
286 } catch (final IOException e) {
287 LOG.error("Error in writer ", e);
288 } catch (final XMLStreamException e) {
289 LOG.error("Error processing stream", e);
293 private DOMResult writeNormalizedNode(final NormalizedNode<?, ?> normalized, final YangInstanceIdentifier path,
294 final SchemaContext context) throws IOException, XMLStreamException {
295 final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
296 final Document doc = XmlDocumentUtils.getDocument();
297 final DOMResult result = new DOMResult(doc);
298 NormalizedNodeWriter normalizedNodeWriter = null;
299 NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
300 XMLStreamWriter writer = null;
303 writer = XML_FACTORY.createXMLStreamWriter(result);
304 normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context,
305 this.getSchemaPath());
306 normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
308 normalizedNodeWriter.write(normalized);
310 normalizedNodeWriter.flush();
312 if (normalizedNodeWriter != null) {
313 normalizedNodeWriter.close();
315 if (normalizedNodeStreamWriter != null) {
316 normalizedNodeStreamWriter.close();
318 if (writer != null) {
327 * Tracks events of data change by customer.
329 private final class EventBusChangeRecorder {
331 public void recordCustomerChange(final Event event) {
332 if (event.getType() == EventType.REGISTER) {
333 final Channel subscriber = event.getSubscriber();
334 if (!NotificationListenerAdapter.this.subscribers.contains(subscriber)) {
335 NotificationListenerAdapter.this.subscribers.add(subscriber);
337 } else if (event.getType() == EventType.DEREGISTER) {
338 NotificationListenerAdapter.this.subscribers.remove(event.getSubscriber());
339 Notificator.removeNotificationListenerIfNoSubscriberExists(NotificationListenerAdapter.this);
340 } else if (event.getType() == EventType.NOTIFY) {
341 for (final Channel subscriber : NotificationListenerAdapter.this.subscribers) {
342 if (subscriber.isActive()) {
343 LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
344 subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
346 LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
347 NotificationListenerAdapter.this.subscribers.remove(subscriber);
355 * Represents event of specific {@link EventType} type, holds data and
356 * {@link Channel} subscriber.
358 private final class Event {
359 private final EventType type;
360 private Channel subscriber;
364 * Creates new event specified by {@link EventType} type.
369 public Event(final EventType type) {
374 * Gets the {@link Channel} subscriber.
378 public Channel getSubscriber() {
379 return this.subscriber;
383 * Sets subscriber for event.
388 public void setSubscriber(final Channel subscriber) {
389 this.subscriber = subscriber;
395 * @return String representation of event data.
397 public String getData() {
407 public void setData(final String data) {
414 * @return The type of the event.
416 public EventType getType() {
424 private enum EventType {
425 REGISTER, DEREGISTER, NOTIFY;