2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.sal.streams.listeners;
10 import com.google.common.base.Charsets;
11 import com.google.common.base.Preconditions;
12 import com.google.common.eventbus.AsyncEventBus;
13 import com.google.common.eventbus.EventBus;
14 import com.google.common.eventbus.Subscribe;
15 import io.netty.channel.Channel;
16 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
17 import io.netty.util.internal.ConcurrentSet;
18 import java.io.ByteArrayOutputStream;
19 import java.io.IOException;
20 import java.io.OutputStreamWriter;
21 import java.io.StringReader;
22 import java.io.UnsupportedEncodingException;
23 import java.util.Collection;
24 import java.util.Date;
26 import java.util.concurrent.Executors;
27 import javax.xml.parsers.DocumentBuilder;
28 import javax.xml.parsers.DocumentBuilderFactory;
29 import javax.xml.stream.XMLOutputFactory;
30 import javax.xml.stream.XMLStreamException;
31 import javax.xml.stream.XMLStreamWriter;
32 import javax.xml.transform.OutputKeys;
33 import javax.xml.transform.Transformer;
34 import javax.xml.transform.TransformerException;
35 import javax.xml.transform.TransformerFactory;
36 import javax.xml.transform.dom.DOMResult;
37 import javax.xml.transform.dom.DOMSource;
38 import javax.xml.transform.stream.StreamResult;
39 import javax.xml.xpath.XPath;
40 import javax.xml.xpath.XPathConstants;
41 import javax.xml.xpath.XPathFactory;
42 import org.json.JSONObject;
44 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
45 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
46 import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
47 import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
48 import org.opendaylight.yangtools.concepts.ListenerRegistration;
49 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
50 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
51 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
52 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
53 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
54 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
55 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
56 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
57 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
58 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
59 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62 import org.w3c.dom.Document;
63 import org.w3c.dom.Element;
64 import org.w3c.dom.Node;
65 import org.xml.sax.InputSource;
68 * {@link NotificationListenerAdapter} tracks notification events and forwards them to subscribed channels.
72 public class NotificationListenerAdapter implements DOMNotificationListener {
// Logger for this class.
74 private static final Logger LOG = LoggerFactory.getLogger(NotificationListenerAdapter.class);
// Shared factory from which prepareXmlFrom obtains a Transformer.
75 private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
// Stream name; the constructor rejects null and empty values.
77 private final String streamName;
// Set by setRegistration, reset to null by close(); isListening() tests it for null.
78 private ListenerRegistration<DOMNotificationListener> registration;
// Channels that receive notification data; changed by EventBusChangeRecorder and replaced by an empty set in close().
79 private Set<Channel> subscribers = new ConcurrentSet<>();
// Bus on which REGISTER, DEREGISTER and NOTIFY events are posted.
80 private final EventBus eventBus;
// Handler registered on the bus in the constructor and unregistered in close().
81 private final EventBusChangeRecorder eventBusChangeRecorder;
// Schema path of the notification; the constructor requires a non-null path argument.
83 private final SchemaPath path;
// Output format; prepareAndPostData compares it with "JSON".
84 private final String outputType;
// Optional time bounds compared with the current time in onNotification; null means unset.
85 private Date start = null;
86 private Date stop = null;
// Optional XPath expression evaluated by parseFilterParam; null means every notification is posted.
87 private String filter;
90 * Set path of listener and stream name, register event bus.
93 * - path of notification
95 * - stream name of listener
97 * - type of output on notification (JSON, XML)
// Builds the adapter: stores the output type and stream name, creates the
// asynchronous event bus and registers a new EventBusChangeRecorder on it.
// NOTE(review): the assignment of the final field `path` is not visible in this listing - confirm it is present.
99 NotificationListenerAdapter(final SchemaPath path, final String streamName, final String outputType) {
100 this.outputType = outputType;
// Reject a null or empty stream name and a null path.
101 Preconditions.checkArgument((streamName != null) && !streamName.isEmpty());
102 Preconditions.checkArgument(path != null);
104 this.streamName = streamName;
// Events posted on this bus are dispatched by a dedicated single-thread executor.
// NOTE(review): nothing visible in this class shuts that executor down; close() only unregisters the recorder.
105 this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
106 this.eventBusChangeRecorder = new EventBusChangeRecorder();
107 this.eventBus.register(this.eventBusChangeRecorder);
// Callback for an incoming notification: decides, from the optional start/stop
// bounds, whether the notification is handed to checkFilter.
// NOTE(review): several lines of this method are not visible in this listing;
// the comments below describe only the statements shown.
111 public void onNotification(final DOMNotification notification) {
112 final Date now = new Date();
113 if (this.stop != null) {
// Forward only while now lies strictly between start and stop.
// NOTE(review): this.start is dereferenced without a null check - confirm a stop time is never set without a start time.
114 if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) {
115 checkFilter(notification);
// The stop time has already passed.
117 if (this.stop.compareTo(now) < 0) {
// NOTE(review): the caught exception is concatenated into the message instead of being passed as the cause,
// unlike the two-argument form used in checkFilter.
120 } catch (final Exception e) {
121 throw new RestconfDocumentedException("Problem with unregister listener." + e);
// Only a start time is set: forward once the start time has passed.
124 } else if (this.start != null) {
125 if (this.start.compareTo(now) < 0) {
127 checkFilter(notification);
// Presumably the branch taken when neither bound is set - the enclosing else is not visible here.
130 checkFilter(notification);
135 * Check if is filter used and then prepare and post data do client
137 * @param notification
138 * - data of notification
140 private void checkFilter(final DOMNotification notification) {
141 final String xml = prepareXmlFrom(notification);
142 if (this.filter == null) {
143 prepareAndPostData(xml);
146 if (parseFilterParam(xml)) {
147 prepareAndPostData(xml);
149 } catch (final Exception e) {
150 throw new RestconfDocumentedException("Problem while parsing filter.", e);
156 * Parse and evaluate filter value by xml
159 * - notification data in xml
160 * @return true or false - depends on filter expression and data of
164 private boolean parseFilterParam(final String xml) throws Exception {
165 final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
166 final DocumentBuilder builder = factory.newDocumentBuilder();
167 final Document docOfXml = builder.parse(new InputSource(new StringReader(xml)));
168 final XPath xPath = XPathFactory.newInstance().newXPath();
169 return (boolean) xPath.compile(this.filter).evaluate(docOfXml, XPathConstants.BOOLEAN);
173 * Prepare the notification data and post it to the client
// Wraps the notification XML in a NOTIFY event and posts it on the event bus;
// when the output type is "JSON" the XML is first converted with XML.toJSONObject.
// NOTE(review): the lines between the JSON branch and the post are not visible in this
// listing - presumably the non-JSON branch stores the XML unchanged; confirm.
177 private void prepareAndPostData(final String xml) {
178 final Event event = new Event(EventType.NOTIFY);
// NOTE(review): outputType is dereferenced here without a null check and the constructor does not validate it.
179 if (this.outputType.equals("JSON")) {
180 final JSONObject jsonObject = XML.toJSONObject(xml);
181 event.setData(jsonObject.toString());
// Delivery to the subscribers happens in EventBusChangeRecorder.
185 this.eventBus.post(event);
189 * Checks if exists at least one {@link Channel} subscriber.
191 * @return True if exist at least one {@link Channel} subscriber, false
194 public boolean hasSubscribers() {
195 return !this.subscribers.isEmpty();
199 * Reset lists, close registration and unregister bus event.
201 public void close() {
202 this.subscribers = new ConcurrentSet<>();
203 this.registration.close();
204 this.registration = null;
205 this.eventBus.unregister(this.eventBusChangeRecorder);
209 * Get stream name of this listener
211 * @return {@link String}
213 public String getStreamName() {
214 return this.streamName;
218 * Check if is this listener registered.
220 * @return - true if is registered, otherwise null
222 public boolean isListening() {
223 return this.registration == null ? false : true;
227 * Get schema path of notification
229 * @return {@link SchemaPath}
// Accessor declared to return the notification's SchemaPath; writeNormalizedNode calls it.
// NOTE(review): the method body is not visible in this listing - presumably it returns the `path` field; confirm.
231 public SchemaPath getSchemaPath() {
236 * Set registration for close after closing connection and check if this
237 * listener is registered
239 * @param registration
240 * - registered listener
242 public void setRegistration(final ListenerRegistration<DOMNotificationListener> registration) {
243 Preconditions.checkNotNull(registration);
244 this.registration = registration;
248 * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
249 * subscriber to the event and post event into event bus.
254 public void addSubscriber(final Channel subscriber) {
255 if (!subscriber.isActive()) {
256 LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
258 final Event event = new Event(EventType.REGISTER);
259 event.setSubscriber(subscriber);
260 this.eventBus.post(event);
264 * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
265 * subscriber to the event and posts event into event bus.
269 public void removeSubscriber(final Channel subscriber) {
270 LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
271 final Event event = new Event(EventType.DEREGISTER);
272 event.setSubscriber(subscriber);
273 this.eventBus.post(event);
// Serializes the notification into an XML string: a root element in the NETCONF
// notification namespace with an eventTime child and an "event" element in the
// notification's own namespace, rendered through a Transformer.
// NOTE(review): some lines of this method (the root element's local name, the opening
// of the try block and the body of the catch) are not visible in this listing.
276 private String prepareXmlFrom(final DOMNotification notification) {
277 final SchemaContext schemaContext = ControllerContext.getInstance().getGlobalSchema();
278 final Document doc = ListenerAdapter.createDocument();
279 final Element notificationElement =
280 doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
282 doc.appendChild(notificationElement);
// eventTime is the moment of serialization (new Date()), not a value read from the notification.
284 final Element eventTimeElement = doc.createElement("eventTime");
285 eventTimeElement.setTextContent(ListenerAdapter.toRFC3339(new Date()));
286 notificationElement.appendChild(eventTimeElement);
// The namespace of the last component of the notification type becomes the namespace of the "event" element.
287 final String notificationNamespace = notification.getType().getLastComponent().getNamespace().toString();
288 final Element notificationEventElement = doc.createElementNS(
289 notificationNamespace, "event");
// The event element is filled first and only then attached to the root element.
290 addValuesToNotificationEventElement(doc, notificationEventElement, notification, schemaContext);
291 notificationElement.appendChild(notificationEventElement);
// Render the DOM document as indented UTF-8 XML that keeps the XML declaration.
294 final ByteArrayOutputStream out = new ByteArrayOutputStream();
295 final Transformer transformer = FACTORY.newTransformer();
296 transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
297 transformer.setOutputProperty(OutputKeys.METHOD, "xml");
298 transformer.setOutputProperty(OutputKeys.INDENT, "yes");
299 transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
300 transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
301 transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, Charsets.UTF_8)));
// The bytes are written as UTF-8 above and decoded as UTF-8 here.
302 final byte[] charData = out.toByteArray();
303 return new String(charData, "UTF-8");
// NOTE(review): what happens with msg after this point is not visible - confirm how the failure is reported.
304 } catch (TransformerException | UnsupportedEncodingException e) {
305 final String msg = "Error during transformation of Document into String";
// Writes the notification body into a DOM tree via writeNormalizedNode, imports the
// first child of that tree into doc and appends it to the given element.
// NOTE(review): some lines of this method (the null-check branch body, the expression
// that yields `body` and the opening of the try block) are not visible in this listing.
311 private void addValuesToNotificationEventElement(final Document doc, final Element element,
312 final DOMNotification notification, final SchemaContext schemaContext) {
// NOTE(review): prepareXmlFrom already dereferences the notification before calling this method.
313 if (notification == null) {
317 final NormalizedNode<NodeIdentifier, Collection<DataContainerChild<? extends PathArgument, ?>>> body = notification
320 final DOMResult domResult = writeNormalizedNode(body,
321 YangInstanceIdentifier.create(body.getIdentifier()), schemaContext);
// Deep import (second argument true) because the node belongs to a different document.
322 final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
323 element.appendChild(result);
// Both failures are only logged; in the visible lines nothing is rethrown.
324 } catch (final IOException e) {
325 LOG.error("Error in writer ", e);
326 } catch (final XMLStreamException e) {
327 LOG.error("Error processing stream", e);
// Streams a NormalizedNode into a new DOM document wrapped in a DOMResult, using an
// XMLStreamWriter chained into the normalized-node writers.
// NOTE(review): the try/finally structure, the closing of `writer` and the return
// statement are not visible in this listing.
// NOTE(review): the `path` parameter is not referenced in the visible lines; this.getSchemaPath() is used instead.
331 private DOMResult writeNormalizedNode(final NormalizedNode<?, ?> normalized, final YangInstanceIdentifier path,
332 final SchemaContext context) throws IOException, XMLStreamException {
// A new output factory is created on every call, despite the constant-style name.
333 final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
334 final Document doc = XmlDocumentUtils.getDocument();
335 final DOMResult result = new DOMResult(doc);
// Declared up front and initialized to null so that the null-checked close calls below can see them.
336 NormalizedNodeWriter normalizedNodeWriter = null;
337 NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
338 XMLStreamWriter writer = null;
// Writer chain: XMLStreamWriter -> NormalizedNodeStreamWriter -> NormalizedNodeWriter.
341 writer = XML_FACTORY.createXMLStreamWriter(result);
342 normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context,
343 this.getSchemaPath());
344 normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
346 normalizedNodeWriter.write(normalized);
348 normalizedNodeWriter.flush();
// Close whatever was created, outermost writer first.
350 if (normalizedNodeWriter != null) {
351 normalizedNodeWriter.close();
353 if (normalizedNodeStreamWriter != null) {
354 normalizedNodeStreamWriter.close();
356 if (writer != null) {
365 * Tracks events of data change by customer.
// Event handler that the constructor registers on the event bus; it applies each
// Event to the subscriber set of the enclosing adapter.
// NOTE(review): the line directly above recordCustomerChange is not visible in this
// listing - confirm that the method carries the event bus subscription annotation.
367 private final class EventBusChangeRecorder {
369 public void recordCustomerChange(final Event event) {
// REGISTER: add the channel unless it is already a subscriber.
370 if (event.getType() == EventType.REGISTER) {
371 final Channel subscriber = event.getSubscriber();
372 if (!NotificationListenerAdapter.this.subscribers.contains(subscriber)) {
373 NotificationListenerAdapter.this.subscribers.add(subscriber);
// DEREGISTER: remove the channel, then let Notificator decide whether this listener is still needed.
375 } else if (event.getType() == EventType.DEREGISTER) {
376 NotificationListenerAdapter.this.subscribers.remove(event.getSubscriber());
377 Notificator.removeNotificationListenerIfNoSubscriberExists(NotificationListenerAdapter.this);
// NOTIFY: send the event data as a text websocket frame to every active subscriber.
378 } else if (event.getType() == EventType.NOTIFY) {
379 for (final Channel subscriber : NotificationListenerAdapter.this.subscribers) {
380 if (subscriber.isActive()) {
381 LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
382 subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
// Presumably the inactive-channel branch (its else line is not visible): the subscriber is dropped
// while the set is being iterated.
384 LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
385 NotificationListenerAdapter.this.subscribers.remove(subscriber);
393 * Represents event of specific {@link EventType} type, holds data and
394 * {@link Channel} subscriber.
// Message posted on the event bus: an EventType plus an optional Channel subscriber
// and optional String data.
// NOTE(review): the declaration of the data field and the bodies of the constructor,
// getData, setData and getType are not visible in this listing.
396 private final class Event {
// Fixed at construction; EventBusChangeRecorder branches on it.
397 private final EventType type;
// Set for REGISTER and DEREGISTER events by addSubscriber and removeSubscriber.
398 private Channel subscriber;
402 * Creates new event specified by {@link EventType} type.
407 public Event(final EventType type) {
412 * Gets the {@link Channel} subscriber.
416 public Channel getSubscriber() {
417 return this.subscriber;
421 * Sets subscriber for event.
426 public void setSubscriber(final Channel subscriber) {
427 this.subscriber = subscriber;
433 * @return String representation of event data.
// Read by EventBusChangeRecorder when it builds the websocket frame for a NOTIFY event.
435 public String getData() {
// Called by prepareAndPostData with the payload to deliver.
445 public void setData(final String data) {
452 * @return The type of the event.
454 public EventType getType() {
462 private enum EventType {
463 REGISTER, DEREGISTER, NOTIFY
467 * Set query parameters for listener
470 * - start-time of getting notification
472 * - stop-time of getting notification
474 * - indicate which subset of all possible events are of interest
// Sets the filter that checkFilter reads before posting a notification.
// NOTE(review): only the filter assignment is visible in this listing - presumably start and stop
// are stored in the same way for use by onNotification; confirm.
476 public void setQueryParams(final Date start, final Date stop, final String filter) {
479 this.filter = filter;