2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.sal.streams.listeners;
10 import com.google.common.base.Charsets;
11 import com.google.common.base.Preconditions;
12 import com.google.common.eventbus.AsyncEventBus;
13 import com.google.common.eventbus.EventBus;
14 import com.google.common.eventbus.Subscribe;
15 import io.netty.channel.Channel;
16 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
17 import io.netty.util.internal.ConcurrentSet;
18 import java.io.ByteArrayOutputStream;
19 import java.io.IOException;
20 import java.io.OutputStreamWriter;
21 import java.io.UnsupportedEncodingException;
22 import java.util.Collection;
23 import java.util.Date;
25 import java.util.concurrent.Executors;
26 import javax.xml.stream.XMLOutputFactory;
27 import javax.xml.stream.XMLStreamException;
28 import javax.xml.stream.XMLStreamWriter;
29 import javax.xml.transform.OutputKeys;
30 import javax.xml.transform.Transformer;
31 import javax.xml.transform.TransformerException;
32 import javax.xml.transform.TransformerFactory;
33 import javax.xml.transform.dom.DOMResult;
34 import javax.xml.transform.dom.DOMSource;
35 import javax.xml.transform.stream.StreamResult;
36 import org.json.JSONObject;
38 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
39 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
40 import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
41 import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
42 import org.opendaylight.yangtools.concepts.ListenerRegistration;
43 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
45 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
46 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
47 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
48 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
49 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
50 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
51 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
52 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
53 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56 import org.w3c.dom.Document;
57 import org.w3c.dom.Element;
58 import org.w3c.dom.Node;
61 * {@link NotificationListenerAdapter} is responsible to track events on
65 public class NotificationListenerAdapter implements DOMNotificationListener {
67 private static final Logger LOG = LoggerFactory.getLogger(NotificationListenerAdapter.class);
68 private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
70 private final String streamName;
71 private ListenerRegistration<DOMNotificationListener> registration;
72 private Set<Channel> subscribers = new ConcurrentSet<>();
73 private final EventBus eventBus;
74 private final EventBusChangeRecorder eventBusChangeRecorder;
76 private final SchemaPath path;
77 private final String outputType;
78 private Date start = null;
79 private Date stop = null;
82 * Set path of listener and stream name, register event bus.
85 * - path of notification
87 * - stream name of listener
89 * - type of output on notification (JSON, XML)
91 NotificationListenerAdapter(final SchemaPath path, final String streamName, final String outputType) {
92 this.outputType = outputType;
93 Preconditions.checkArgument((streamName != null) && !streamName.isEmpty());
94 Preconditions.checkArgument(path != null);
96 this.streamName = streamName;
97 this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
98 this.eventBusChangeRecorder = new EventBusChangeRecorder();
99 this.eventBus.register(this.eventBusChangeRecorder);
103 public void onNotification(final DOMNotification notification) {
104 final Date now = new Date();
105 if (this.stop != null) {
106 if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) {
107 prepareAndPostData(notification);
109 if (this.stop.compareTo(now) < 0) {
112 } catch (final Exception e) {
113 throw new RestconfDocumentedException("Problem with unregister listener." + e);
116 } else if (this.start != null) {
117 if (this.start.compareTo(now) < 0) {
119 prepareAndPostData(notification);
122 prepareAndPostData(notification);
127 * @param notification
129 private void prepareAndPostData(final DOMNotification notification) {
130 final String xml = prepareXmlFrom(notification);
131 final Event event = new Event(EventType.NOTIFY);
132 if (this.outputType.equals("JSON")) {
133 final JSONObject jsonObject = XML.toJSONObject(xml);
134 event.setData(jsonObject.toString());
138 this.eventBus.post(event);
142 * Checks if exists at least one {@link Channel} subscriber.
144 * @return True if exist at least one {@link Channel} subscriber, false
147 public boolean hasSubscribers() {
148 return !this.subscribers.isEmpty();
152 * Reset lists, close registration and unregister bus event.
154 public void close() {
155 this.subscribers = new ConcurrentSet<>();
156 this.registration.close();
157 this.registration = null;
158 this.eventBus.unregister(this.eventBusChangeRecorder);
162 * Get stream name of this listener
164 * @return {@link String}
166 public String getStreamName() {
167 return this.streamName;
171 * Check if is this listener registered.
173 * @return - true if is registered, otherwise null
175 public boolean isListening() {
176 return this.registration == null ? false : true;
180 * Get schema path of notification
182 * @return {@link SchemaPath}
184 public SchemaPath getSchemaPath() {
189 * Set registration for close after closing connection and check if this
190 * listener is registered
192 * @param registration
193 * - registered listener
195 public void setRegistration(final ListenerRegistration<DOMNotificationListener> registration) {
196 Preconditions.checkNotNull(registration);
197 this.registration = registration;
201 * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
202 * subscriber to the event and post event into event bus.
207 public void addSubscriber(final Channel subscriber) {
208 if (!subscriber.isActive()) {
209 LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
211 final Event event = new Event(EventType.REGISTER);
212 event.setSubscriber(subscriber);
213 this.eventBus.post(event);
217 * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
218 * subscriber to the event and posts event into event bus.
222 public void removeSubscriber(final Channel subscriber) {
223 LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
224 final Event event = new Event(EventType.DEREGISTER);
225 event.setSubscriber(subscriber);
226 this.eventBus.post(event);
229 private String prepareXmlFrom(final DOMNotification notification) {
230 final SchemaContext schemaContext = ControllerContext.getInstance().getGlobalSchema();
231 final Document doc = ListenerAdapter.createDocument();
232 final Element notificationElement =
233 doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
235 doc.appendChild(notificationElement);
237 final Element eventTimeElement = doc.createElement("eventTime");
238 eventTimeElement.setTextContent(ListenerAdapter.toRFC3339(new Date()));
239 notificationElement.appendChild(eventTimeElement);
240 final String notificationNamespace = notification.getType().getLastComponent().getNamespace().toString();
241 final Element notificationEventElement = doc.createElementNS(
242 notificationNamespace, "event");
243 addValuesToNotificationEventElement(doc, notificationEventElement, notification, schemaContext);
244 notificationElement.appendChild(notificationEventElement);
247 final ByteArrayOutputStream out = new ByteArrayOutputStream();
248 final Transformer transformer = FACTORY.newTransformer();
249 transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
250 transformer.setOutputProperty(OutputKeys.METHOD, "xml");
251 transformer.setOutputProperty(OutputKeys.INDENT, "yes");
252 transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
253 transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
254 transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, Charsets.UTF_8)));
255 final byte[] charData = out.toByteArray();
256 return new String(charData, "UTF-8");
257 } catch (TransformerException | UnsupportedEncodingException e) {
258 final String msg = "Error during transformation of Document into String";
264 private void addValuesToNotificationEventElement(final Document doc, final Element element,
265 final DOMNotification notification, final SchemaContext schemaContext) {
266 if (notification == null) {
270 final NormalizedNode<NodeIdentifier, Collection<DataContainerChild<? extends PathArgument, ?>>> body = notification
273 final DOMResult domResult = writeNormalizedNode(body,
274 YangInstanceIdentifier.create(body.getIdentifier()), schemaContext);
275 final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
276 element.appendChild(result);
277 } catch (final IOException e) {
278 LOG.error("Error in writer ", e);
279 } catch (final XMLStreamException e) {
280 LOG.error("Error processing stream", e);
284 private DOMResult writeNormalizedNode(final NormalizedNode<?, ?> normalized, final YangInstanceIdentifier path,
285 final SchemaContext context) throws IOException, XMLStreamException {
286 final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
287 final Document doc = XmlDocumentUtils.getDocument();
288 final DOMResult result = new DOMResult(doc);
289 NormalizedNodeWriter normalizedNodeWriter = null;
290 NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
291 XMLStreamWriter writer = null;
294 writer = XML_FACTORY.createXMLStreamWriter(result);
295 normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context,
296 this.getSchemaPath());
297 normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
299 normalizedNodeWriter.write(normalized);
301 normalizedNodeWriter.flush();
303 if (normalizedNodeWriter != null) {
304 normalizedNodeWriter.close();
306 if (normalizedNodeStreamWriter != null) {
307 normalizedNodeStreamWriter.close();
309 if (writer != null) {
318 * Tracks events of data change by customer.
320 private final class EventBusChangeRecorder {
322 public void recordCustomerChange(final Event event) {
323 if (event.getType() == EventType.REGISTER) {
324 final Channel subscriber = event.getSubscriber();
325 if (!NotificationListenerAdapter.this.subscribers.contains(subscriber)) {
326 NotificationListenerAdapter.this.subscribers.add(subscriber);
328 } else if (event.getType() == EventType.DEREGISTER) {
329 NotificationListenerAdapter.this.subscribers.remove(event.getSubscriber());
330 Notificator.removeNotificationListenerIfNoSubscriberExists(NotificationListenerAdapter.this);
331 } else if (event.getType() == EventType.NOTIFY) {
332 for (final Channel subscriber : NotificationListenerAdapter.this.subscribers) {
333 if (subscriber.isActive()) {
334 LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
335 subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
337 LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
338 NotificationListenerAdapter.this.subscribers.remove(subscriber);
346 * Represents event of specific {@link EventType} type, holds data and
347 * {@link Channel} subscriber.
349 private final class Event {
350 private final EventType type;
351 private Channel subscriber;
355 * Creates new event specified by {@link EventType} type.
360 public Event(final EventType type) {
365 * Gets the {@link Channel} subscriber.
369 public Channel getSubscriber() {
370 return this.subscriber;
374 * Sets subscriber for event.
379 public void setSubscriber(final Channel subscriber) {
380 this.subscriber = subscriber;
386 * @return String representation of event data.
388 public String getData() {
398 public void setData(final String data) {
405 * @return The type of the event.
407 public EventType getType() {
415 private enum EventType {
416 REGISTER, DEREGISTER, NOTIFY
420 * Set query parameters for listener
423 * - start-time of getting notification
425 * - stop-time of getting notification
427 public void setTime(final Date start, final Date stop) {