2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.sal.streams.listeners;
10 import com.google.common.base.Charsets;
11 import com.google.common.base.Preconditions;
12 import com.google.common.eventbus.AsyncEventBus;
13 import com.google.common.eventbus.EventBus;
14 import com.google.common.eventbus.Subscribe;
15 import io.netty.channel.Channel;
16 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
17 import io.netty.util.internal.ConcurrentSet;
18 import java.io.ByteArrayOutputStream;
19 import java.io.IOException;
20 import java.io.OutputStreamWriter;
21 import java.io.StringReader;
22 import java.io.StringWriter;
23 import java.io.UnsupportedEncodingException;
24 import java.io.Writer;
25 import java.util.Collection;
26 import java.util.Date;
28 import java.util.concurrent.Executors;
29 import javax.xml.parsers.DocumentBuilder;
30 import javax.xml.parsers.DocumentBuilderFactory;
31 import javax.xml.stream.XMLOutputFactory;
32 import javax.xml.stream.XMLStreamException;
33 import javax.xml.stream.XMLStreamWriter;
34 import javax.xml.transform.OutputKeys;
35 import javax.xml.transform.Transformer;
36 import javax.xml.transform.TransformerException;
37 import javax.xml.transform.TransformerFactory;
38 import javax.xml.transform.dom.DOMResult;
39 import javax.xml.transform.dom.DOMSource;
40 import javax.xml.transform.stream.StreamResult;
41 import javax.xml.xpath.XPath;
42 import javax.xml.xpath.XPathConstants;
43 import javax.xml.xpath.XPathFactory;
44 import org.json.JSONObject;
45 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
46 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
47 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
48 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
49 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
50 import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
51 import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
52 import org.opendaylight.restconf.Draft18.MonitoringModule;
53 import org.opendaylight.restconf.handlers.SchemaContextHandler;
54 import org.opendaylight.restconf.handlers.TransactionChainHandler;
55 import org.opendaylight.restconf.parser.IdentifierCodec;
56 import org.opendaylight.yangtools.concepts.ListenerRegistration;
57 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
58 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
59 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
60 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
61 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
62 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
63 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
64 import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactory;
65 import org.opendaylight.yangtools.yang.data.codec.gson.JSONNormalizedNodeStreamWriter;
66 import org.opendaylight.yangtools.yang.data.codec.gson.JsonWriterFactory;
67 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
68 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
69 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
70 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
71 import org.slf4j.Logger;
72 import org.slf4j.LoggerFactory;
73 import org.w3c.dom.Document;
74 import org.w3c.dom.Element;
75 import org.w3c.dom.Node;
76 import org.xml.sax.InputSource;
79 * {@link NotificationListenerAdapter} is responsible to track events on
83 public class NotificationListenerAdapter implements DOMNotificationListener {
85 private static final Logger LOG = LoggerFactory.getLogger(NotificationListenerAdapter.class);
86 private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
88 private final String streamName;
89 private final EventBus eventBus;
90 private final EventBusChangeRecorder eventBusChangeRecorder;
92 private final SchemaPath path;
93 private final String outputType;
94 private Date start = null;
95 private Date stop = null;
96 private String filter;
98 private SchemaContext schemaContext;
99 private DOMNotification notification;
100 private ListenerRegistration<DOMNotificationListener> registration;
101 private Set<Channel> subscribers = new ConcurrentSet<>();
102 private TransactionChainHandler transactionChainHandler;
103 private SchemaContextHandler schemaHandler;
106 * Set path of listener and stream name, register event bus.
109 * - path of notification
111 * - stream name of listener
113 * - type of output on notification (JSON, XML)
115 NotificationListenerAdapter(final SchemaPath path, final String streamName, final String outputType) {
116 this.outputType = outputType;
117 Preconditions.checkArgument((streamName != null) && !streamName.isEmpty());
118 Preconditions.checkArgument(path != null);
120 this.streamName = streamName;
121 this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
122 this.eventBusChangeRecorder = new EventBusChangeRecorder();
123 this.eventBus.register(this.eventBusChangeRecorder);
127 public void onNotification(final DOMNotification notification) {
128 this.schemaContext = ControllerContext.getInstance().getGlobalSchema();
129 this.notification = notification;
130 final Date now = new Date();
131 if (this.stop != null) {
132 if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) {
135 if (this.stop.compareTo(now) < 0) {
138 } catch (final Exception e) {
139 throw new RestconfDocumentedException("Problem with unregister listener." + e);
142 } else if (this.start != null) {
143 if (this.start.compareTo(now) < 0) {
153 * Check if is filter used and then prepare and post data do client
156 private void checkFilter() {
157 final String xml = prepareXml();
158 if (this.filter == null) {
159 prepareAndPostData(xml);
162 if (parseFilterParam(xml)) {
163 prepareAndPostData(xml);
165 } catch (final Exception e) {
166 throw new RestconfDocumentedException("Problem while parsing filter.", e);
172 * Parse and evaluate filter value by xml
175 * - notification data in xml
176 * @return true or false - depends on filter expression and data of
180 private boolean parseFilterParam(final String xml) throws Exception {
181 final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
182 final DocumentBuilder builder = factory.newDocumentBuilder();
183 final Document docOfXml = builder.parse(new InputSource(new StringReader(xml)));
184 final XPath xPath = XPathFactory.newInstance().newXPath();
185 return (boolean) xPath.compile(this.filter).evaluate(docOfXml, XPathConstants.BOOLEAN);
189 * Prepare data of notification and data to client
193 private void prepareAndPostData(final String xml) {
194 final Event event = new Event(EventType.NOTIFY);
195 if (this.outputType.equals("JSON")) {
196 event.setData(prepareJson());
200 this.eventBus.post(event);
204 * Prepare json from notification data
206 * @return json as {@link String}
208 private String prepareJson() {
209 final JSONObject json = new JSONObject();
210 json.put("ietf-restconf:notification",
211 new JSONObject(writeBodyToString()).put("event-time", ListenerAdapter.toRFC3339(new Date())));
212 return json.toString();
215 private String writeBodyToString() {
216 final Writer writer = new StringWriter();
217 final NormalizedNodeStreamWriter jsonStream =
218 JSONNormalizedNodeStreamWriter.createExclusiveWriter(JSONCodecFactory.create(this.schemaContext),
219 this.notification.getType(), null, JsonWriterFactory.createJsonWriter(writer));
220 final NormalizedNodeWriter nodeWriter = NormalizedNodeWriter.forStreamWriter(jsonStream);
222 nodeWriter.write(this.notification.getBody());
224 } catch (final IOException e) {
225 throw new RestconfDocumentedException("Problem while writing body of notification to JSON. ", e);
227 return writer.toString();
231 * Checks if exists at least one {@link Channel} subscriber.
233 * @return True if exist at least one {@link Channel} subscriber, false
236 public boolean hasSubscribers() {
237 return !this.subscribers.isEmpty();
241 * Reset lists, close registration and unregister bus event and delete data in DS.
243 public void close() {
244 final DOMDataWriteTransaction wTx = this.transactionChainHandler.get().newWriteOnlyTransaction();
245 wTx.delete(LogicalDatastoreType.OPERATIONAL,
246 IdentifierCodec.deserialize(
247 MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + this.path.getLastComponent().getLocalName(),
248 this.schemaHandler.get()));
250 wTx.submit().checkedGet();
251 } catch (final TransactionCommitFailedException e) {
252 throw new RestconfDocumentedException("Problem while deleting data from DS.", e);
255 this.subscribers = new ConcurrentSet<>();
256 this.registration.close();
257 this.registration = null;
258 this.eventBus.unregister(this.eventBusChangeRecorder);
262 * Get stream name of this listener
264 * @return {@link String}
266 public String getStreamName() {
267 return this.streamName;
271 * Check if is this listener registered.
273 * @return - true if is registered, otherwise null
275 public boolean isListening() {
276 return this.registration == null ? false : true;
280 * Get schema path of notification
282 * @return {@link SchemaPath}
284 public SchemaPath getSchemaPath() {
289 * Set registration for close after closing connection and check if this
290 * listener is registered
292 * @param registration
293 * - registered listener
295 public void setRegistration(final ListenerRegistration<DOMNotificationListener> registration) {
296 Preconditions.checkNotNull(registration);
297 this.registration = registration;
301 * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
302 * subscriber to the event and post event into event bus.
307 public void addSubscriber(final Channel subscriber) {
308 if (!subscriber.isActive()) {
309 LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
311 final Event event = new Event(EventType.REGISTER);
312 event.setSubscriber(subscriber);
313 this.eventBus.post(event);
317 * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
318 * subscriber to the event and posts event into event bus.
322 public void removeSubscriber(final Channel subscriber) {
323 LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
324 final Event event = new Event(EventType.DEREGISTER);
325 event.setSubscriber(subscriber);
326 this.eventBus.post(event);
329 private String prepareXml() {
330 final Document doc = ListenerAdapter.createDocument();
331 final Element notificationElement =
332 doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
334 doc.appendChild(notificationElement);
336 final Element eventTimeElement = doc.createElement("eventTime");
337 eventTimeElement.setTextContent(ListenerAdapter.toRFC3339(new Date()));
338 notificationElement.appendChild(eventTimeElement);
340 final Element notificationEventElement = doc.createElementNS(
341 "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "create-notification-stream");
342 addValuesToNotificationEventElement(doc, notificationEventElement, this.notification, this.schemaContext);
343 notificationElement.appendChild(notificationEventElement);
346 final ByteArrayOutputStream out = new ByteArrayOutputStream();
347 final Transformer transformer = FACTORY.newTransformer();
348 transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
349 transformer.setOutputProperty(OutputKeys.METHOD, "xml");
350 transformer.setOutputProperty(OutputKeys.INDENT, "yes");
351 transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
352 transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
353 transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, Charsets.UTF_8)));
354 final byte[] charData = out.toByteArray();
355 return new String(charData, "UTF-8");
356 } catch (TransformerException | UnsupportedEncodingException e) {
357 final String msg = "Error during transformation of Document into String";
363 private void addValuesToNotificationEventElement(final Document doc, final Element element,
364 final DOMNotification notification, final SchemaContext schemaContext) {
365 if (notification == null) {
369 final NormalizedNode<NodeIdentifier, Collection<DataContainerChild<? extends PathArgument, ?>>> body = notification
372 final DOMResult domResult = writeNormalizedNode(body,
373 YangInstanceIdentifier.create(body.getIdentifier()), schemaContext);
374 final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
375 final Element dataElement = doc.createElement("notification");
376 dataElement.appendChild(result);
377 element.appendChild(dataElement);
378 } catch (final IOException e) {
379 LOG.error("Error in writer ", e);
380 } catch (final XMLStreamException e) {
381 LOG.error("Error processing stream", e);
385 private DOMResult writeNormalizedNode(final NormalizedNode<?, ?> normalized, final YangInstanceIdentifier path,
386 final SchemaContext context) throws IOException, XMLStreamException {
387 final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
388 final Document doc = XmlDocumentUtils.getDocument();
389 final DOMResult result = new DOMResult(doc);
390 NormalizedNodeWriter normalizedNodeWriter = null;
391 NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
392 XMLStreamWriter writer = null;
395 writer = XML_FACTORY.createXMLStreamWriter(result);
396 normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context,
397 this.getSchemaPath());
398 normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
400 normalizedNodeWriter.write(normalized);
402 normalizedNodeWriter.flush();
404 if (normalizedNodeWriter != null) {
405 normalizedNodeWriter.close();
407 if (normalizedNodeStreamWriter != null) {
408 normalizedNodeStreamWriter.close();
410 if (writer != null) {
419 * Tracks events of data change by customer.
421 private final class EventBusChangeRecorder {
423 public void recordCustomerChange(final Event event) {
424 if (event.getType() == EventType.REGISTER) {
425 final Channel subscriber = event.getSubscriber();
426 if (!NotificationListenerAdapter.this.subscribers.contains(subscriber)) {
427 NotificationListenerAdapter.this.subscribers.add(subscriber);
429 } else if (event.getType() == EventType.DEREGISTER) {
430 NotificationListenerAdapter.this.subscribers.remove(event.getSubscriber());
431 Notificator.removeNotificationListenerIfNoSubscriberExists(NotificationListenerAdapter.this);
432 } else if (event.getType() == EventType.NOTIFY) {
433 for (final Channel subscriber : NotificationListenerAdapter.this.subscribers) {
434 if (subscriber.isActive()) {
435 LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
436 subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
438 LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
439 NotificationListenerAdapter.this.subscribers.remove(subscriber);
447 * Represents event of specific {@link EventType} type, holds data and
448 * {@link Channel} subscriber.
450 private final class Event {
451 private final EventType type;
452 private Channel subscriber;
456 * Creates new event specified by {@link EventType} type.
461 public Event(final EventType type) {
466 * Gets the {@link Channel} subscriber.
470 public Channel getSubscriber() {
471 return this.subscriber;
475 * Sets subscriber for event.
480 public void setSubscriber(final Channel subscriber) {
481 this.subscriber = subscriber;
487 * @return String representation of event data.
489 public String getData() {
499 public void setData(final String data) {
506 * @return The type of the event.
508 public EventType getType() {
516 private enum EventType {
517 REGISTER, DEREGISTER, NOTIFY
521 * Set query parameters for listener
524 * - start-time of getting notification
526 * - stop-time of getting notification
528 * - indicate which subset of all possible events are of interest
530 public void setQueryParams(final Date start, final Date stop, final String filter) {
533 this.filter = filter;
537 * Get outputType of listenere
539 * @return the outputType
541 public String getOutputType() {
542 return this.outputType;
546 * Transaction chain to delete data in DS on close()
548 * @param transactionChainHandler
549 * - creating new write transaction to delete data on close
550 * @param schemaHandler
551 * - for getting schema to deserialize
552 * {@link MonitoringModule#PATH_TO_STREAM_WITHOUT_KEY} to
553 * {@link YangInstanceIdentifier}
555 public void setCloseVars(final TransactionChainHandler transactionChainHandler,
556 final SchemaContextHandler schemaHandler) {
557 this.transactionChainHandler = transactionChainHandler;
558 this.schemaHandler = schemaHandler;