2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.sal.streams.listeners;
10 import com.google.common.base.Preconditions;
11 import com.google.common.eventbus.AsyncEventBus;
12 import com.google.common.eventbus.EventBus;
13 import com.google.common.eventbus.Subscribe;
14 import io.netty.channel.Channel;
15 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
16 import io.netty.util.internal.ConcurrentSet;
17 import java.io.ByteArrayOutputStream;
18 import java.io.IOException;
19 import java.io.OutputStreamWriter;
20 import java.io.StringReader;
21 import java.io.UnsupportedEncodingException;
22 import java.nio.charset.StandardCharsets;
23 import java.text.SimpleDateFormat;
24 import java.util.Date;
26 import java.util.Map.Entry;
28 import java.util.concurrent.Executors;
29 import java.util.regex.Pattern;
30 import javax.xml.parsers.DocumentBuilder;
31 import javax.xml.parsers.DocumentBuilderFactory;
32 import javax.xml.parsers.ParserConfigurationException;
33 import javax.xml.stream.XMLOutputFactory;
34 import javax.xml.stream.XMLStreamException;
35 import javax.xml.stream.XMLStreamWriter;
36 import javax.xml.transform.OutputKeys;
37 import javax.xml.transform.Transformer;
38 import javax.xml.transform.TransformerException;
39 import javax.xml.transform.TransformerFactory;
40 import javax.xml.transform.dom.DOMResult;
41 import javax.xml.transform.dom.DOMSource;
42 import javax.xml.transform.stream.StreamResult;
43 import javax.xml.xpath.XPath;
44 import javax.xml.xpath.XPathConstants;
45 import javax.xml.xpath.XPathFactory;
46 import org.json.JSONObject;
48 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
49 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
50 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
51 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
52 import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
53 import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
54 import org.opendaylight.restconf.Draft18.MonitoringModule;
55 import org.opendaylight.restconf.handlers.SchemaContextHandler;
56 import org.opendaylight.restconf.handlers.TransactionChainHandler;
57 import org.opendaylight.restconf.parser.IdentifierCodec;
58 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
59 import org.opendaylight.yangtools.concepts.ListenerRegistration;
60 import org.opendaylight.yangtools.yang.common.QName;
61 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
62 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
63 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeWithValue;
64 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
65 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
66 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
67 import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListEntryNode;
68 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
69 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
70 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
71 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
72 import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree;
73 import org.opendaylight.yangtools.yang.model.api.Module;
74 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
75 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
76 import org.slf4j.Logger;
77 import org.slf4j.LoggerFactory;
78 import org.w3c.dom.Document;
79 import org.w3c.dom.Element;
80 import org.w3c.dom.Node;
81 import org.xml.sax.InputSource;
84 * {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source.
86 public class ListenerAdapter implements DOMDataChangeListener {
88 private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class);
89 private static final DocumentBuilderFactory DBF = DocumentBuilderFactory.newInstance();
90 private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
91 private static final Pattern RFC3339_PATTERN = Pattern.compile("(\\d\\d)(\\d\\d)$");
93 private static final SimpleDateFormat RFC3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ");
95 private final YangInstanceIdentifier path;
96 private ListenerRegistration<DOMDataChangeListener> registration;
97 private final String streamName;
98 private Set<Channel> subscribers = new ConcurrentSet<>();
99 private final EventBus eventBus;
100 private final EventBusChangeRecorder eventBusChangeRecorder;
101 private final NotificationOutputType outputType;
102 private Date start = null;
103 private Date stop = null;
104 private String filter = null;
105 private TransactionChainHandler transactionChainHandler;
106 private SchemaContextHandler schemaHandler;
109 * Creates new {@link ListenerAdapter} listener specified by path and stream
113 * Path to data in data store.
115 * The name of the stream.
117 * - type of output on notification (JSON, XML)
119 ListenerAdapter(final YangInstanceIdentifier path, final String streamName,
120 final NotificationOutputType outputType) {
121 this.outputType = outputType;
122 Preconditions.checkNotNull(path);
123 Preconditions.checkArgument((streamName != null) && !streamName.isEmpty());
125 this.streamName = streamName;
126 this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
127 this.eventBusChangeRecorder = new EventBusChangeRecorder();
128 this.eventBus.register(this.eventBusChangeRecorder);
132 public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
133 final Date now = new Date();
134 if (this.stop != null) {
135 if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) {
138 if (this.stop.compareTo(now) < 0) {
141 } catch (final Exception e) {
142 throw new RestconfDocumentedException("Problem with unregister listener." + e);
145 } else if (this.start != null) {
146 if (this.start.compareTo(now) < 0) {
156 * Check if is filter used and then prepare and post data do client
159 * - data of notification
161 private void checkFilter(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
162 final String xml = prepareXmlFrom(change);
163 if (this.filter == null) {
164 prepareAndPostData(xml);
167 if (parseFilterParam(xml)) {
168 prepareAndPostData(xml);
170 } catch (final Exception e) {
171 throw new RestconfDocumentedException("Problem while parsing filter.", e);
177 * Parse and evaluate filter value by xml
180 * - notification data in xml
181 * @return true or false - depends on filter expression and data of
185 private boolean parseFilterParam(final String xml) throws Exception {
186 final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
187 final DocumentBuilder builder = factory.newDocumentBuilder();
188 final Document docOfXml = builder.parse(new InputSource(new StringReader(xml)));
189 final XPath xPath = XPathFactory.newInstance().newXPath();
190 return (boolean) xPath.compile(this.filter).evaluate(docOfXml, XPathConstants.BOOLEAN);
194 * Prepare data of notification and data to client
198 private void prepareAndPostData(final String xml) {
199 final Event event = new Event(EventType.NOTIFY);
200 if (this.outputType.equals(NotificationOutputType.JSON)) {
201 final JSONObject jsonObject = XML.toJSONObject(xml);
202 event.setData(jsonObject.toString());
206 this.eventBus.post(event);
210 * Tracks events of data change by customer.
212 private final class EventBusChangeRecorder {
214 public void recordCustomerChange(final Event event) {
215 if (event.getType() == EventType.REGISTER) {
216 final Channel subscriber = event.getSubscriber();
217 if (!ListenerAdapter.this.subscribers.contains(subscriber)) {
218 ListenerAdapter.this.subscribers.add(subscriber);
220 } else if (event.getType() == EventType.DEREGISTER) {
221 ListenerAdapter.this.subscribers.remove(event.getSubscriber());
222 Notificator.removeListenerIfNoSubscriberExists(ListenerAdapter.this);
223 } else if (event.getType() == EventType.NOTIFY) {
224 for (final Channel subscriber : ListenerAdapter.this.subscribers) {
225 if (subscriber.isActive()) {
226 LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
227 subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
229 LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
230 ListenerAdapter.this.subscribers.remove(subscriber);
238 * Represents event of specific {@link EventType} type, holds data and {@link Channel} subscriber.
240 private final class Event {
241 private final EventType type;
242 private Channel subscriber;
246 * Creates new event specified by {@link EventType} type.
251 public Event(final EventType type) {
256 * Gets the {@link Channel} subscriber.
260 public Channel getSubscriber() {
261 return this.subscriber;
265 * Sets subscriber for event.
270 public void setSubscriber(final Channel subscriber) {
271 this.subscriber = subscriber;
277 * @return String representation of event data.
279 public String getData() {
286 * @param data String.
288 public void setData(final String data) {
295 * @return The type of the event.
297 public EventType getType() {
305 private enum EventType {
312 * Prepare data in printable form and transform it to String.
316 * @return Data in printable form.
318 private String prepareXmlFrom(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
319 final SchemaContext schemaContext = ControllerContext.getInstance().getGlobalSchema();
320 final DataSchemaContextTree dataContextTree = DataSchemaContextTree.from(schemaContext);
321 final Document doc = createDocument();
322 final Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
325 doc.appendChild(notificationElement);
327 final Element eventTimeElement = doc.createElement("eventTime");
328 eventTimeElement.setTextContent(toRFC3339(new Date()));
329 notificationElement.appendChild(eventTimeElement);
331 final Element dataChangedNotificationEventElement = doc.createElementNS(
332 "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "data-changed-notification");
334 addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, change,
335 schemaContext, dataContextTree);
336 notificationElement.appendChild(dataChangedNotificationEventElement);
339 final ByteArrayOutputStream out = new ByteArrayOutputStream();
340 final Transformer transformer = FACTORY.newTransformer();
341 transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
342 transformer.setOutputProperty(OutputKeys.METHOD, "xml");
343 transformer.setOutputProperty(OutputKeys.INDENT, "yes");
344 transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
345 transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
346 transformer.transform(new DOMSource(doc),
347 new StreamResult(new OutputStreamWriter(out, StandardCharsets.UTF_8)));
348 final byte[] charData = out.toByteArray();
349 return new String(charData, "UTF-8");
350 } catch (TransformerException | UnsupportedEncodingException e) {
351 final String msg = "Error during transformation of Document into String";
358 * Formats data specified by RFC3339.
362 * @return Data specified by RFC3339.
364 public static String toRFC3339(final Date d) {
365 return RFC3339_PATTERN.matcher(RFC3339.format(d)).replaceAll("$1:$2");
369 * Creates {@link Document} document.
370 * @return {@link Document} document.
372 public static Document createDocument() {
373 final DocumentBuilder bob;
375 bob = DBF.newDocumentBuilder();
376 } catch (final ParserConfigurationException e) {
379 return bob.newDocument();
383 * Adds values to data changed notification event element.
387 * @param dataChangedNotificationEventElement
390 * {@link AsyncDataChangeEvent}
392 private void addValuesToDataChangedNotificationEventElement(final Document doc,
393 final Element dataChangedNotificationEventElement,
394 final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change,
395 final SchemaContext schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
397 addCreatedChangedValuesFromDataToElement(doc, change.getCreatedData().entrySet(),
398 dataChangedNotificationEventElement,
399 Operation.CREATED, schemaContext, dataSchemaContextTree);
401 addCreatedChangedValuesFromDataToElement(doc, change.getUpdatedData().entrySet(),
402 dataChangedNotificationEventElement,
403 Operation.UPDATED, schemaContext, dataSchemaContextTree);
405 addValuesFromDataToElement(doc, change.getRemovedPaths(), dataChangedNotificationEventElement,
410 * Adds values from data to element.
415 * Set of {@link YangInstanceIdentifier}.
421 private void addValuesFromDataToElement(final Document doc, final Set<YangInstanceIdentifier> data,
422 final Element element, final Operation operation) {
423 if ((data == null) || data.isEmpty()) {
426 for (final YangInstanceIdentifier path : data) {
427 if (!ControllerContext.getInstance().isNodeMixin(path)) {
428 final Node node = createDataChangeEventElement(doc, path, operation);
429 element.appendChild(node);
434 private void addCreatedChangedValuesFromDataToElement(final Document doc, final Set<Entry<YangInstanceIdentifier,
435 NormalizedNode<?,?>>> data, final Element element, final Operation operation, final SchemaContext
436 schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
437 if ((data == null) || data.isEmpty()) {
440 for (final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry : data) {
441 if (!ControllerContext.getInstance().isNodeMixin(entry.getKey())) {
442 final Node node = createCreatedChangedDataChangeEventElement(doc, entry, operation, schemaContext,
443 dataSchemaContextTree);
444 element.appendChild(node);
450 * Creates changed event element from data.
455 * Path to data in data store.
458 * @return {@link Node} node represented by changed event element.
460 private Node createDataChangeEventElement(final Document doc, final YangInstanceIdentifier path,
461 final Operation operation) {
462 final Element dataChangeEventElement = doc.createElement("data-change-event");
463 final Element pathElement = doc.createElement("path");
464 addPathAsValueToElement(path, pathElement);
465 dataChangeEventElement.appendChild(pathElement);
467 final Element operationElement = doc.createElement("operation");
468 operationElement.setTextContent(operation.value);
469 dataChangeEventElement.appendChild(operationElement);
471 return dataChangeEventElement;
474 private Node createCreatedChangedDataChangeEventElement(final Document doc, final Entry<YangInstanceIdentifier,
475 NormalizedNode<?, ?>> entry, final Operation operation, final SchemaContext
476 schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
477 final Element dataChangeEventElement = doc.createElement("data-change-event");
478 final Element pathElement = doc.createElement("path");
479 final YangInstanceIdentifier path = entry.getKey();
480 addPathAsValueToElement(path, pathElement);
481 dataChangeEventElement.appendChild(pathElement);
483 final Element operationElement = doc.createElement("operation");
484 operationElement.setTextContent(operation.value);
485 dataChangeEventElement.appendChild(operationElement);
488 final DOMResult domResult = writeNormalizedNode(entry.getValue(), path,
489 schemaContext, dataSchemaContextTree);
490 final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
491 final Element dataElement = doc.createElement("data");
492 dataElement.appendChild(result);
493 dataChangeEventElement.appendChild(dataElement);
494 } catch (final IOException e) {
495 LOG.error("Error in writer ", e);
496 } catch (final XMLStreamException e) {
497 LOG.error("Error processing stream", e);
500 return dataChangeEventElement;
503 private static DOMResult writeNormalizedNode(final NormalizedNode<?, ?> normalized,
504 final YangInstanceIdentifier path, final SchemaContext context,
505 final DataSchemaContextTree dataSchemaContextTree)
506 throws IOException, XMLStreamException {
507 final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
508 final Document doc = XmlDocumentUtils.getDocument();
509 final DOMResult result = new DOMResult(doc);
510 NormalizedNodeWriter normalizedNodeWriter = null;
511 NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
512 XMLStreamWriter writer = null;
513 final SchemaPath nodePath;
515 if ((normalized instanceof MapEntryNode) || (normalized instanceof UnkeyedListEntryNode)) {
516 nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath();
518 nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath().getParent();
522 writer = XML_FACTORY.createXMLStreamWriter(result);
523 normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context, nodePath);
524 normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
526 normalizedNodeWriter.write(normalized);
528 normalizedNodeWriter.flush();
530 if (normalizedNodeWriter != null) {
531 normalizedNodeWriter.close();
533 if (normalizedNodeStreamWriter != null) {
534 normalizedNodeStreamWriter.close();
536 if (writer != null) {
545 * Adds path as value to element.
548 * Path to data in data store.
552 private void addPathAsValueToElement(final YangInstanceIdentifier path, final Element element) {
553 final YangInstanceIdentifier normalizedPath = ControllerContext.getInstance().toXpathRepresentation(path);
554 final StringBuilder textContent = new StringBuilder();
556 for (final PathArgument pathArgument : normalizedPath.getPathArguments()) {
557 if (pathArgument instanceof YangInstanceIdentifier.AugmentationIdentifier) {
560 textContent.append("/");
561 writeIdentifierWithNamespacePrefix(element, textContent, pathArgument.getNodeType());
562 if (pathArgument instanceof NodeIdentifierWithPredicates) {
563 final Map<QName, Object> predicates = ((NodeIdentifierWithPredicates) pathArgument).getKeyValues();
564 for (final QName keyValue : predicates.keySet()) {
565 final String predicateValue = String.valueOf(predicates.get(keyValue));
566 textContent.append("[");
567 writeIdentifierWithNamespacePrefix(element, textContent, keyValue);
568 textContent.append("='");
569 textContent.append(predicateValue);
570 textContent.append("'");
571 textContent.append("]");
573 } else if (pathArgument instanceof NodeWithValue) {
574 textContent.append("[.='");
575 textContent.append(((NodeWithValue) pathArgument).getValue());
576 textContent.append("'");
577 textContent.append("]");
580 element.setTextContent(textContent.toString());
584 * Writes identifier that consists of prefix and QName.
593 private static void writeIdentifierWithNamespacePrefix(final Element element, final StringBuilder textContent,
595 final Module module = ControllerContext.getInstance().getGlobalSchema()
596 .findModuleByNamespaceAndRevision(qName.getNamespace(), qName.getRevision());
598 textContent.append(module.getName());
599 textContent.append(":");
600 textContent.append(qName.getLocalName());
604 * Gets path pointed to data in data store.
606 * @return Path pointed to data in data store.
608 public YangInstanceIdentifier getPath() {
613 * Sets {@link ListenerRegistration} registration.
615 * @param registration DOMDataChangeListener registration
617 public void setRegistration(final ListenerRegistration<DOMDataChangeListener> registration) {
618 this.registration = registration;
622 * Gets the name of the stream.
624 * @return The name of the stream.
626 public String getStreamName() {
627 return this.streamName;
631 * Removes all subscribers and unregisters event bus change recorder form
632 * event bus and delete data in DS
634 public void close() throws Exception {
635 final DOMDataWriteTransaction wTx = this.transactionChainHandler.get().newWriteOnlyTransaction();
636 wTx.delete(LogicalDatastoreType.OPERATIONAL, IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY
637 + this.path.getLastPathArgument().getNodeType().getLocalName(), this.schemaHandler.get()));
638 wTx.submit().checkedGet();
640 this.subscribers = new ConcurrentSet<>();
641 this.registration.close();
642 this.registration = null;
643 this.eventBus.unregister(this.eventBusChangeRecorder);
647 * Checks if {@link ListenerRegistration} registration exist.
649 * @return True if exist, false otherwise.
651 public boolean isListening() {
652 return this.registration == null ? false : true;
656 * Creates event of type {@link EventType#REGISTER}, set {@link Channel} subscriber to the event and post event into
662 public void addSubscriber(final Channel subscriber) {
663 if (!subscriber.isActive()) {
664 LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
666 final Event event = new Event(EventType.REGISTER);
667 event.setSubscriber(subscriber);
668 this.eventBus.post(event);
672 * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel} subscriber to the event and posts event
677 public void removeSubscriber(final Channel subscriber) {
678 LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
679 final Event event = new Event(EventType.DEREGISTER);
680 event.setSubscriber(subscriber);
681 this.eventBus.post(event);
685 * Checks if exists at least one {@link Channel} subscriber.
687 * @return True if exist at least one {@link Channel} subscriber, false otherwise.
689 public boolean hasSubscribers() {
690 return !this.subscribers.isEmpty();
694 * Consists of two types {@link Store#CONFIG} and {@link Store#OPERATION}.
696 private static enum Store {
698 OPERATION("operation");
700 private final String value;
702 private Store(final String value) {
708 * Consists of three types {@link Operation#CREATED}, {@link Operation#UPDATED} and {@link Operation#DELETED}.
710 private static enum Operation {
715 private final String value;
717 private Operation(final String value) {
723 * Set query parameters for listener
726 * - start-time of getting notification
728 * - stop-time of getting notification
730 * - indicate which subset of all possible events are of interest
732 public void setQueryParams(final Date start, final Date stop, final String filter) {
735 this.filter = filter;
743 public String getOutputType() {
744 return this.outputType.getName();
748 * Transaction chain to delete data in DS on close()
750 * @param transactionChainHandler
751 * - creating new write transaction to delete data on close
752 * @param schemaHandler
753 * - for getting schema to deserialize
754 * {@link MonitoringModule#PATH_TO_STREAM_WITHOUT_KEY} to
755 * {@link YangInstanceIdentifier}
757 public void setCloseVars(final TransactionChainHandler transactionChainHandler,
758 final SchemaContextHandler schemaHandler) {
759 this.transactionChainHandler = transactionChainHandler;
760 this.schemaHandler = schemaHandler;