X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-rest-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fstreams%2Flisteners%2FListenerAdapter.java;h=2b7b0246e35ac2356ce90f87126d5e4f43076a2c;hp=5fd76de98bb51c4e0b322f6c16f0bd17c5d417d2;hb=de3e413b633b7555ae8f3fe2ec163dbb7dda5da8;hpb=ac317ecb6d40d51ed026272b3b6b788e0e87ea9d diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/ListenerAdapter.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/ListenerAdapter.java index 5fd76de98b..2b7b0246e3 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/ListenerAdapter.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/ListenerAdapter.java @@ -1,9 +1,20 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ package org.opendaylight.controller.sal.streams.listeners; +import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; +import com.google.common.eventbus.AsyncEventBus; +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; import io.netty.channel.Channel; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.internal.ConcurrentSet; - import java.io.ByteArrayOutputStream; import java.io.OutputStreamWriter; import java.io.UnsupportedEncodingException; @@ -16,7 +27,7 @@ import java.util.Map.Entry; import java.util.Random; import java.util.Set; import java.util.concurrent.Executors; - +import java.util.regex.Pattern; import javax.activation.UnsupportedDataTypeException; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; @@ -27,7 +38,6 @@ import javax.xml.transform.TransformerException; import javax.xml.transform.TransformerFactory; import javax.xml.transform.dom.DOMSource; import javax.xml.transform.stream.StreamResult; - import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent; import org.opendaylight.controller.sal.core.api.data.DataChangeListener; import org.opendaylight.controller.sal.rest.impl.XmlMapper; @@ -35,10 +45,10 @@ import org.opendaylight.controller.sal.restconf.impl.ControllerContext; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.CompositeNode; -import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifierWithPredicates; -import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeWithValue; -import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeWithValue; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.model.api.DataNodeContainer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,622 +56,577 @@ import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.Node; -import com.google.common.base.Preconditions; -import com.google.common.eventbus.AsyncEventBus; -import com.google.common.eventbus.EventBus; -import com.google.common.eventbus.Subscribe; - /** - * {@link ListenerAdapter} is responsible to track events, which occurred by - * changing data in data source. + * {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source. */ public class ListenerAdapter implements DataChangeListener { - private static final Logger logger = LoggerFactory - .getLogger(ListenerAdapter.class); - private final XmlMapper xmlMapper = new XmlMapper(); - private final SimpleDateFormat rfc3339 = new SimpleDateFormat( - "yyyy-MM-dd'T'hh:mm:ssZ"); - - private final InstanceIdentifier path; - private ListenerRegistration registration; - private final String streamName; - private Set subscribers = new ConcurrentSet<>(); - private final EventBus eventBus; - private final EventBusChangeRecorder eventBusChangeRecorder; - - /** - * Creates new {@link ListenerAdapter} listener specified by path and stream - * name. - * - * @param path - * Path to data in data store. - * @param streamName - * The name of the stream. - */ - ListenerAdapter(InstanceIdentifier path, String streamName) { - Preconditions.checkNotNull(path); - Preconditions - .checkArgument(streamName != null && !streamName.isEmpty()); - this.path = path; - this.streamName = streamName; - eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor()); - eventBusChangeRecorder = new EventBusChangeRecorder(); - eventBus.register(eventBusChangeRecorder); - } - - @Override - public void onDataChanged( - DataChangeEvent change) { - if (!change.getCreatedConfigurationData().isEmpty() - || !change.getCreatedOperationalData().isEmpty() - || !change.getUpdatedConfigurationData().isEmpty() - || !change.getUpdatedOperationalData().isEmpty() - || !change.getRemovedConfigurationData().isEmpty() - || !change.getRemovedOperationalData().isEmpty()) { - String xml = prepareXmlFrom(change); - Event event = new Event(EventType.NOTIFY); - event.setData(xml); - eventBus.post(event); - } - } - - /** - * Tracks events of data change by customer. - */ - private final class EventBusChangeRecorder { - @Subscribe - public void recordCustomerChange(Event event) { - if (event.getType() == EventType.REGISTER) { - Channel subscriber = event.getSubscriber(); - if (!subscribers.contains(subscriber)) { - subscribers.add(subscriber); - } - } else if (event.getType() == EventType.DEREGISTER) { - subscribers.remove(event.getSubscriber()); - Notificator - .removeListenerIfNoSubscriberExists(ListenerAdapter.this); - } else if (event.getType() == EventType.NOTIFY) { - for (Channel subscriber : subscribers) { - if (subscriber.isActive()) { - logger.debug("Data are sent to subscriber {}:", - subscriber.remoteAddress()); - subscriber.writeAndFlush(new TextWebSocketFrame(event - .getData())); - } else { - logger.debug( - "Subscriber {} is removed - channel is not active yet.", - subscriber.remoteAddress()); - subscribers.remove(subscriber); - } - } - } - } - } - - /** - * Represents event of specific {@link EventType} type, holds data and - * {@link Channel} subscriber. - */ - private final class Event { - private final EventType type; - private Channel subscriber; - private String data; - - /** - * Creates new event specified by {@link EventType} type. - * - * @param type - * EventType - */ - public Event(EventType type) { - this.type = type; - } - - /** - * Gets the {@link Channel} subscriber. - * - * @return Channel - */ - public Channel getSubscriber() { - return subscriber; - } - - /** - * Sets subscriber for event. - * - * @param subscriber - * Channel - */ - public void setSubscriber(Channel subscriber) { - this.subscriber = subscriber; - } - - /** - * Gets event data. - * - * @return String representation of event data. - */ - public String getData() { - return data; - } - - /** - * Sets event data. - * - * @param String - * data. - */ - public void setData(String data) { - this.data = data; - } - - /** - * Gets event type. - * - * @return The type of the event. - */ - public EventType getType() { - return type; - } - } - - /** - * Type of the event. - */ - private enum EventType { - REGISTER, DEREGISTER, NOTIFY; - } - - /** - * Prepare data in printable form and transform it to String. - * - * @param change - * DataChangeEvent - * @return Data in printable form. - */ - private String prepareXmlFrom( - DataChangeEvent change) { - Document doc = createDocument(); - Element notificationElement = doc.createElementNS( - "urn:ietf:params:xml:ns:netconf:notification:1.0", - "notification"); - doc.appendChild(notificationElement); - - Element eventTimeElement = doc.createElement("eventTime"); - eventTimeElement.setTextContent(toRFC3339(new Date())); - notificationElement.appendChild(eventTimeElement); - - Element dataChangedNotificationEventElement = doc.createElementNS( - "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", - "data-changed-notification"); - addValuesToDataChangedNotificationEventElement(doc, - dataChangedNotificationEventElement, change); - notificationElement.appendChild(dataChangedNotificationEventElement); - - try { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - TransformerFactory tf = TransformerFactory.newInstance(); - Transformer transformer = tf.newTransformer(); - transformer - .setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no"); - transformer.setOutputProperty(OutputKeys.METHOD, "xml"); - transformer.setOutputProperty(OutputKeys.INDENT, "yes"); - transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8"); - transformer.setOutputProperty( - "{http://xml.apache.org/xslt}indent-amount", "4"); - transformer.transform(new DOMSource(doc), new StreamResult( - new OutputStreamWriter(out, "UTF-8"))); - byte[] charData = out.toByteArray(); - return new String(charData, "UTF-8"); - } catch (TransformerException | UnsupportedEncodingException e) { - String msg = "Error during transformation of Document into String"; - logger.error(msg, e); - return msg; - } - } - - /** - * Formats data specified by RFC3339. - * - * @param d - * Date - * @return Data specified by RFC3339. - */ - private String toRFC3339(Date d) { - return rfc3339.format(d).replaceAll("(\\d\\d)(\\d\\d)$", "$1:$2"); - } - - /** - * Creates {@link Document} document. - * - * @return {@link Document} document. - */ - private Document createDocument() { - DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); - Document doc = null; - try { - DocumentBuilder bob = dbf.newDocumentBuilder(); - doc = bob.newDocument(); - } catch (ParserConfigurationException e) { - return null; - } - return doc; - } - - /** - * Adds values to data changed notification event element. - * - * @param doc - * {@link Document} - * @param dataChangedNotificationEventElement - * {@link Element} - * @param change - * {@link DataChangeEvent} - */ - private void addValuesToDataChangedNotificationEventElement(Document doc, - Element dataChangedNotificationEventElement, - DataChangeEvent change) { - addValuesFromDataToElement(doc, change.getCreatedConfigurationData(), - dataChangedNotificationEventElement, Store.CONFIG, - Operation.CREATED); - addValuesFromDataToElement(doc, change.getCreatedOperationalData(), - dataChangedNotificationEventElement, Store.OPERATION, - Operation.CREATED); - if (change.getCreatedConfigurationData().isEmpty()) { - addValuesFromDataToElement(doc, - change.getUpdatedConfigurationData(), - dataChangedNotificationEventElement, Store.CONFIG, - Operation.UPDATED); - } - if (change.getCreatedOperationalData().isEmpty()) { - addValuesFromDataToElement(doc, change.getUpdatedOperationalData(), - dataChangedNotificationEventElement, Store.OPERATION, - Operation.UPDATED); - } - addValuesFromDataToElement(doc, change.getRemovedConfigurationData(), - dataChangedNotificationEventElement, Store.CONFIG, - Operation.DELETED); - addValuesFromDataToElement(doc, change.getRemovedOperationalData(), - dataChangedNotificationEventElement, Store.OPERATION, - Operation.DELETED); - } - - /** - * Adds values from data to element. - * - * @param doc - * {@link Document} - * @param data - * Set of {@link InstanceIdentifier}. - * @param element - * {@link Element} - * @param store - * {@link Store} - * @param operation - * {@link Operation} - */ - private void addValuesFromDataToElement(Document doc, - Set data, Element element, Store store, - Operation operation) { - if (data == null || data.isEmpty()) { - return; - } - for (InstanceIdentifier path : data) { - Node node = createDataChangeEventElement(doc, path, null, store, - operation); - element.appendChild(node); - } - } - - /** - * Adds values from data to element. - * - * @param doc - * {@link Document} - * @param data - * Map of {@link InstanceIdentifier} and {@link CompositeNode}. - * @param element - * {@link Element} - * @param store - * {@link Store} - * @param operation - * {@link Operation} - */ - private void addValuesFromDataToElement(Document doc, - Map data, Element element, - Store store, Operation operation) { - if (data == null || data.isEmpty()) { - return; - } - for (Entry entry : data.entrySet()) { - Node node = createDataChangeEventElement(doc, entry.getKey(), - entry.getValue(), store, operation); - element.appendChild(node); - } - } - - /** - * Creates changed event element from data. - * - * @param doc - * {@link Document} - * @param path - * Path to data in data store. - * @param data - * {@link CompositeNode} - * @param store - * {@link Store} - * @param operation - * {@link Operation} - * @return {@link Node} node represented by changed event element. - */ - private Node createDataChangeEventElement(Document doc, - InstanceIdentifier path, CompositeNode data, Store store, - Operation operation) { - Element dataChangeEventElement = doc.createElement("data-change-event"); - - Element pathElement = doc.createElement("path"); - addPathAsValueToElement(path, pathElement); - dataChangeEventElement.appendChild(pathElement); - - Element storeElement = doc.createElement("store"); - storeElement.setTextContent(store.value); - dataChangeEventElement.appendChild(storeElement); - - Element operationElement = doc.createElement("operation"); - operationElement.setTextContent(operation.value); - dataChangeEventElement.appendChild(operationElement); - - if (data != null) { - Element dataElement = doc.createElement("data"); - Node dataAnyXml = translateToXml(path, data); - Node adoptedNode = doc.adoptNode(dataAnyXml); - dataElement.appendChild(adoptedNode); - dataChangeEventElement.appendChild(dataElement); - } - - return dataChangeEventElement; - } - - /** - * Translates {@link CompositeNode} data to XML format. - * - * @param path - * Path to data in data store. - * @param data - * {@link CompositeNode} - * @return Data in XML format. - */ - private Node translateToXml(InstanceIdentifier path, CompositeNode data) { - DataNodeContainer schemaNode = ControllerContext.getInstance() - .getDataNodeContainerFor(path); - if (schemaNode == null) { - logger.info( - "Path '{}' contains node with unsupported type (supported type is Container or List) or some node was not found.", - path); - return null; - } - try { - Document xml = xmlMapper.write(data, schemaNode); - return xml.getFirstChild(); - } catch (UnsupportedDataTypeException e) { - logger.error( - "Error occured during translation of notification to XML.", - e); - return null; - } - } - - /** - * Adds path as value to element. - * - * @param path - * Path to data in data store. - * @param element - * {@link Element} - */ - private void addPathAsValueToElement(InstanceIdentifier path, - Element element) { - // Map< key = namespace, value = prefix> - Map prefixes = new HashMap<>(); - InstanceIdentifier instanceIdentifier = path; - StringBuilder textContent = new StringBuilder(); - for (PathArgument pathArgument : instanceIdentifier.getPath()) { - textContent.append("/"); - writeIdentifierWithNamespacePrefix(element, textContent, - pathArgument.getNodeType(), prefixes); - if (pathArgument instanceof NodeIdentifierWithPredicates) { - Map predicates = ((NodeIdentifierWithPredicates) pathArgument) - .getKeyValues(); - for (QName keyValue : predicates.keySet()) { - String predicateValue = String.valueOf(predicates - .get(keyValue)); - textContent.append("["); - writeIdentifierWithNamespacePrefix(element, textContent, - keyValue, prefixes); - textContent.append("='"); - textContent.append(predicateValue); - textContent.append("'"); - textContent.append("]"); - } - } else if (pathArgument instanceof NodeWithValue) { - textContent.append("[.='"); - textContent.append(((NodeWithValue) pathArgument).getValue()); - textContent.append("'"); - textContent.append("]"); - } - } - element.setTextContent(textContent.toString()); - } - - /** - * Writes identifier that consists of prefix and QName. - * - * @param element - * {@link Element} - * @param textContent - * StringBuilder - * @param qName - * QName - * @param prefixes - * Map of namespaces and prefixes. - */ - private static void writeIdentifierWithNamespacePrefix(Element element, - StringBuilder textContent, QName qName, Map prefixes) { - String namespace = qName.getNamespace().toString(); - String prefix = prefixes.get(namespace); - if (prefix == null) { - prefix = qName.getPrefix(); - if (prefix == null || prefix.isEmpty() - || prefixes.containsValue(prefix)) { - prefix = generateNewPrefix(prefixes.values()); - } - } - - element.setAttribute("xmlns:" + prefix, namespace.toString()); - textContent.append(prefix); - prefixes.put(namespace, prefix); - - textContent.append(":"); - textContent.append(qName.getLocalName()); - } - - /** - * Generates new prefix which consists of four random characters . - * - * @param prefixes - * Collection of prefixes. - * @return New prefix which consists of four random characters . - */ - private static String generateNewPrefix(Collection prefixes) { - StringBuilder result = null; - Random random = new Random(); - do { - result = new StringBuilder(); - for (int i = 0; i < 4; i++) { - int randomNumber = 0x61 + (Math.abs(random.nextInt()) % 26); - result.append(Character.toChars(randomNumber)); - } - } while (prefixes.contains(result.toString())); - - return result.toString(); - } - - /** - * Gets path pointed to data in data store. - * - * @return Path pointed to data in data store. - */ - public InstanceIdentifier getPath() { - return path; - } - - /** - * Sets {@link ListenerRegistration} registration. - * - * @param registration - * ListenerRegistration - */ - public void setRegistration( - ListenerRegistration registration) { - this.registration = registration; - } - - /** - * Gets the name of the stream. - * - * @return The name of the stream. - */ - public String getStreamName() { - return streamName; - } - - /** - * Removes all subscribers and unregisters event bus change recorder form - * event bus. - */ - public void close() throws Exception { - subscribers = new ConcurrentSet<>(); - registration.close(); - registration = null; - eventBus.unregister(eventBusChangeRecorder); - } - - /** - * Checks if {@link ListenerRegistration} registration exist. - * - * @return True if exist, false otherwise. - */ - public boolean isListening() { - return registration == null ? false : true; - } - - /** - * Creates event of type {@link EventType#REGISTER}, set {@link Channel} - * subscriber to the event and post event into event bus. - * - * @param subscriber - * Channel - */ - public void addSubscriber(Channel subscriber) { - if (!subscriber.isActive()) { - logger.debug("Channel is not active between websocket server and subscriber {}" - + subscriber.remoteAddress()); - } - Event event = new Event(EventType.REGISTER); - event.setSubscriber(subscriber); - eventBus.post(event); - } - - /** - * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel} - * subscriber to the event and posts event into event bus. - * - * @param subscriber - */ - public void removeSubscriber(Channel subscriber) { - logger.debug("Subscriber {} is removed.", subscriber.remoteAddress()); - Event event = new Event(EventType.DEREGISTER); - event.setSubscriber(subscriber); - eventBus.post(event); - } - - /** - * Checks if exists at least one {@link Channel} subscriber. - * - * @return True if exist at least one {@link Channel} subscriber, false - * otherwise. - */ - public boolean hasSubscribers() { - return !subscribers.isEmpty(); - } - - /** - * Consists of two types {@link Store#CONFIG} and {@link Store#OPERATION}. - */ - private static enum Store { - CONFIG("config"), OPERATION("operation"); - - private final String value; - - private Store(String value) { - this.value = value; - } - } - - /** - * Consists of three types {@link Operation#CREATED}, - * {@link Operation#UPDATED} and {@link Operation#DELETED}. - */ - private static enum Operation { - CREATED("created"), UPDATED("updated"), DELETED("deleted"); - - private final String value; - - private Operation(String value) { - this.value = value; - } - } + private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class); + private static final DocumentBuilderFactory DBF = DocumentBuilderFactory.newInstance(); + private static final TransformerFactory FACTORY = TransformerFactory.newInstance(); + private static final Pattern RFC3339_PATTERN = Pattern.compile("(\\d\\d)(\\d\\d)$"); + + private final XmlMapper xmlMapper = new XmlMapper(); + private final SimpleDateFormat rfc3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ"); + + private final YangInstanceIdentifier path; + private ListenerRegistration registration; + private final String streamName; + private Set subscribers = new ConcurrentSet<>(); + private final EventBus eventBus; + private final EventBusChangeRecorder eventBusChangeRecorder; + + /** + * Creates new {@link ListenerAdapter} listener specified by path and stream name. + * + * @param path + * Path to data in data store. + * @param streamName + * The name of the stream. + */ + ListenerAdapter(final YangInstanceIdentifier path, final String streamName) { + Preconditions.checkNotNull(path); + Preconditions.checkArgument(streamName != null && !streamName.isEmpty()); + this.path = path; + this.streamName = streamName; + eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor()); + eventBusChangeRecorder = new EventBusChangeRecorder(); + eventBus.register(eventBusChangeRecorder); + } + + @Override + public void onDataChanged(final DataChangeEvent change) { + if (!change.getCreatedConfigurationData().isEmpty() || !change.getCreatedOperationalData().isEmpty() + || !change.getUpdatedConfigurationData().isEmpty() || !change.getUpdatedOperationalData().isEmpty() + || !change.getRemovedConfigurationData().isEmpty() || !change.getRemovedOperationalData().isEmpty()) { + String xml = prepareXmlFrom(change); + Event event = new Event(EventType.NOTIFY); + event.setData(xml); + eventBus.post(event); + } + } + + /** + * Tracks events of data change by customer. + */ + private final class EventBusChangeRecorder { + @Subscribe + public void recordCustomerChange(final Event event) { + if (event.getType() == EventType.REGISTER) { + Channel subscriber = event.getSubscriber(); + if (!subscribers.contains(subscriber)) { + subscribers.add(subscriber); + } + } else if (event.getType() == EventType.DEREGISTER) { + subscribers.remove(event.getSubscriber()); + Notificator.removeListenerIfNoSubscriberExists(ListenerAdapter.this); + } else if (event.getType() == EventType.NOTIFY) { + for (Channel subscriber : subscribers) { + if (subscriber.isActive()) { + LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress()); + subscriber.writeAndFlush(new TextWebSocketFrame(event.getData())); + } else { + LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress()); + subscribers.remove(subscriber); + } + } + } + } + } + + /** + * Represents event of specific {@link EventType} type, holds data and {@link Channel} subscriber. + */ + private final class Event { + private final EventType type; + private Channel subscriber; + private String data; + + /** + * Creates new event specified by {@link EventType} type. + * + * @param type + * EventType + */ + public Event(final EventType type) { + this.type = type; + } + + /** + * Gets the {@link Channel} subscriber. + * + * @return Channel + */ + public Channel getSubscriber() { + return subscriber; + } + + /** + * Sets subscriber for event. + * + * @param subscriber + * Channel + */ + public void setSubscriber(final Channel subscriber) { + this.subscriber = subscriber; + } + + /** + * Gets event data. + * + * @return String representation of event data. + */ + public String getData() { + return data; + } + + /** + * Sets event data. + * + * @param String + * data. + */ + public void setData(final String data) { + this.data = data; + } + + /** + * Gets event type. + * + * @return The type of the event. + */ + public EventType getType() { + return type; + } + } + + /** + * Type of the event. + */ + private enum EventType { + REGISTER, + DEREGISTER, + NOTIFY; + } + + /** + * Prepare data in printable form and transform it to String. + * + * @param change + * DataChangeEvent + * @return Data in printable form. + */ + private String prepareXmlFrom(final DataChangeEvent change) { + Document doc = createDocument(); + Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0", + "notification"); + doc.appendChild(notificationElement); + + Element eventTimeElement = doc.createElement("eventTime"); + eventTimeElement.setTextContent(toRFC3339(new Date())); + notificationElement.appendChild(eventTimeElement); + + Element dataChangedNotificationEventElement = doc.createElementNS( + "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "data-changed-notification"); + addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, change); + notificationElement.appendChild(dataChangedNotificationEventElement); + + try { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + Transformer transformer = FACTORY.newTransformer(); + transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no"); + transformer.setOutputProperty(OutputKeys.METHOD, "xml"); + transformer.setOutputProperty(OutputKeys.INDENT, "yes"); + transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8"); + transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4"); + transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, Charsets.UTF_8))); + byte[] charData = out.toByteArray(); + return new String(charData, "UTF-8"); + } catch (TransformerException | UnsupportedEncodingException e) { + String msg = "Error during transformation of Document into String"; + LOG.error(msg, e); + return msg; + } + } + + /** + * Formats data specified by RFC3339. + * + * @param d + * Date + * @return Data specified by RFC3339. + */ + private String toRFC3339(final Date d) { + return RFC3339_PATTERN.matcher(rfc3339.format(d)).replaceAll("$1:$2"); + } + + /** + * Creates {@link Document} document. + * + * @return {@link Document} document. + */ + private Document createDocument() { + final DocumentBuilder bob; + try { + bob = DBF.newDocumentBuilder(); + } catch (ParserConfigurationException e) { + return null; + } + return bob.newDocument(); + } + + /** + * Adds values to data changed notification event element. + * + * @param doc + * {@link Document} + * @param dataChangedNotificationEventElement + * {@link Element} + * @param change + * {@link DataChangeEvent} + */ + private void addValuesToDataChangedNotificationEventElement(final Document doc, + final Element dataChangedNotificationEventElement, + final DataChangeEvent change) { + addValuesFromDataToElement(doc, change.getCreatedConfigurationData(), dataChangedNotificationEventElement, + Store.CONFIG, Operation.CREATED); + addValuesFromDataToElement(doc, change.getCreatedOperationalData(), dataChangedNotificationEventElement, + Store.OPERATION, Operation.CREATED); + if (change.getCreatedConfigurationData().isEmpty()) { + addValuesFromDataToElement(doc, change.getUpdatedConfigurationData(), dataChangedNotificationEventElement, + Store.CONFIG, Operation.UPDATED); + } + if (change.getCreatedOperationalData().isEmpty()) { + addValuesFromDataToElement(doc, change.getUpdatedOperationalData(), dataChangedNotificationEventElement, + Store.OPERATION, Operation.UPDATED); + } + addValuesFromDataToElement(doc, change.getRemovedConfigurationData(), dataChangedNotificationEventElement, + Store.CONFIG, Operation.DELETED); + addValuesFromDataToElement(doc, change.getRemovedOperationalData(), dataChangedNotificationEventElement, + Store.OPERATION, Operation.DELETED); + } + + /** + * Adds values from data to element. + * + * @param doc + * {@link Document} + * @param data + * Set of {@link YangInstanceIdentifier}. + * @param element + * {@link Element} + * @param store + * {@link Store} + * @param operation + * {@link Operation} + */ + private void addValuesFromDataToElement(final Document doc, final Set data, + final Element element, final Store store, final Operation operation) { + if (data == null || data.isEmpty()) { + return; + } + for (YangInstanceIdentifier path : data) { + Node node = createDataChangeEventElement(doc, path, null, store, operation); + element.appendChild(node); + } + } + + /** + * Adds values from data to element. + * + * @param doc + * {@link Document} + * @param data + * Map of {@link YangInstanceIdentifier} and {@link CompositeNode}. + * @param element + * {@link Element} + * @param store + * {@link Store} + * @param operation + * {@link Operation} + */ + private void addValuesFromDataToElement(final Document doc, final Map data, + final Element element, final Store store, final Operation operation) { + if (data == null || data.isEmpty()) { + return; + } + for (Entry entry : data.entrySet()) { + Node node = createDataChangeEventElement(doc, entry.getKey(), entry.getValue(), store, operation); + element.appendChild(node); + } + } + + /** + * Creates changed event element from data. + * + * @param doc + * {@link Document} + * @param path + * Path to data in data store. + * @param data + * {@link CompositeNode} + * @param store + * {@link Store} + * @param operation + * {@link Operation} + * @return {@link Node} node represented by changed event element. + */ + private Node createDataChangeEventElement(final Document doc, final YangInstanceIdentifier path, + final CompositeNode data, final Store store, final Operation operation) { + Element dataChangeEventElement = doc.createElement("data-change-event"); + + Element pathElement = doc.createElement("path"); + addPathAsValueToElement(path, pathElement); + dataChangeEventElement.appendChild(pathElement); + + Element storeElement = doc.createElement("store"); + storeElement.setTextContent(store.value); + dataChangeEventElement.appendChild(storeElement); + + Element operationElement = doc.createElement("operation"); + operationElement.setTextContent(operation.value); + dataChangeEventElement.appendChild(operationElement); + + if (data != null) { + Element dataElement = doc.createElement("data"); + Node dataAnyXml = translateToXml(path, data); + Node adoptedNode = doc.adoptNode(dataAnyXml); + dataElement.appendChild(adoptedNode); + dataChangeEventElement.appendChild(dataElement); + } + + return dataChangeEventElement; + } + + /** + * Translates {@link CompositeNode} data to XML format. + * + * @param path + * Path to data in data store. + * @param data + * {@link CompositeNode} + * @return Data in XML format. + */ + private Node translateToXml(final YangInstanceIdentifier path, final CompositeNode data) { + DataNodeContainer schemaNode = ControllerContext.getInstance().getDataNodeContainerFor(path); + if (schemaNode == null) { + LOG.info( + "Path '{}' contains node with unsupported type (supported type is Container or List) or some node was not found.", + path); + return null; + } + try { + Document xml = xmlMapper.write(data, schemaNode); + return xml.getFirstChild(); + } catch (UnsupportedDataTypeException e) { + LOG.error("Error occured during translation of notification to XML.", e); + return null; + } + } + + /** + * Adds path as value to element. + * + * @param path + * Path to data in data store. + * @param element + * {@link Element} + */ + private void addPathAsValueToElement(final YangInstanceIdentifier path, final Element element) { + // Map< key = namespace, value = prefix> + Map prefixes = new HashMap<>(); + YangInstanceIdentifier instanceIdentifier = path; + StringBuilder textContent = new StringBuilder(); + + // FIXME: BUG-1281: this is duplicated code from yangtools (BUG-1275) + for (PathArgument pathArgument : instanceIdentifier.getPathArguments()) { + textContent.append("/"); + writeIdentifierWithNamespacePrefix(element, textContent, pathArgument.getNodeType(), prefixes); + if (pathArgument instanceof NodeIdentifierWithPredicates) { + Map predicates = ((NodeIdentifierWithPredicates) pathArgument).getKeyValues(); + for (QName keyValue : predicates.keySet()) { + String predicateValue = String.valueOf(predicates.get(keyValue)); + textContent.append("["); + writeIdentifierWithNamespacePrefix(element, textContent, keyValue, prefixes); + textContent.append("='"); + textContent.append(predicateValue); + textContent.append("'"); + textContent.append("]"); + } + } else if (pathArgument instanceof NodeWithValue) { + textContent.append("[.='"); + textContent.append(((NodeWithValue) pathArgument).getValue()); + textContent.append("'"); + textContent.append("]"); + } + } + element.setTextContent(textContent.toString()); + } + + /** + * Writes identifier that consists of prefix and QName. + * + * @param element + * {@link Element} + * @param textContent + * StringBuilder + * @param qName + * QName + * @param prefixes + * Map of namespaces and prefixes. + */ + private static void writeIdentifierWithNamespacePrefix(final Element element, final StringBuilder textContent, + final QName qName, final Map prefixes) { + String namespace = qName.getNamespace().toString(); + String prefix = prefixes.get(namespace); + if (prefix == null) { + prefix = qName.getPrefix(); + if (prefix == null || prefix.isEmpty() || prefixes.containsValue(prefix)) { + prefix = generateNewPrefix(prefixes.values()); + } + } + + element.setAttribute("xmlns:" + prefix, namespace); + textContent.append(prefix); + prefixes.put(namespace, prefix); + + textContent.append(":"); + textContent.append(qName.getLocalName()); + } + + /** + * Generates new prefix which consists of four random characters . + * + * @param prefixes + * Collection of prefixes. + * @return New prefix which consists of four random characters . + */ + private static String generateNewPrefix(final Collection prefixes) { + StringBuilder result = null; + Random random = new Random(); + do { + result = new StringBuilder(); + for (int i = 0; i < 4; i++) { + int randomNumber = 0x61 + (Math.abs(random.nextInt()) % 26); + result.append(Character.toChars(randomNumber)); + } + } while (prefixes.contains(result.toString())); + + return result.toString(); + } + + /** + * Gets path pointed to data in data store. + * + * @return Path pointed to data in data store. + */ + public YangInstanceIdentifier getPath() { + return path; + } + + /** + * Sets {@link ListenerRegistration} registration. + * + * @param registration + * ListenerRegistration + */ + public void setRegistration(final ListenerRegistration registration) { + this.registration = registration; + } + + /** + * Gets the name of the stream. + * + * @return The name of the stream. + */ + public String getStreamName() { + return streamName; + } + + /** + * Removes all subscribers and unregisters event bus change recorder form event bus. + */ + public void close() throws Exception { + subscribers = new ConcurrentSet<>(); + registration.close(); + registration = null; + eventBus.unregister(eventBusChangeRecorder); + } + + /** + * Checks if {@link ListenerRegistration} registration exist. + * + * @return True if exist, false otherwise. + */ + public boolean isListening() { + return registration == null ? false : true; + } + + /** + * Creates event of type {@link EventType#REGISTER}, set {@link Channel} subscriber to the event and post event into + * event bus. + * + * @param subscriber + * Channel + */ + public void addSubscriber(final Channel subscriber) { + if (!subscriber.isActive()) { + LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress()); + } + Event event = new Event(EventType.REGISTER); + event.setSubscriber(subscriber); + eventBus.post(event); + } + + /** + * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel} subscriber to the event and posts event + * into event bus. + * + * @param subscriber + */ + public void removeSubscriber(final Channel subscriber) { + LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress()); + Event event = new Event(EventType.DEREGISTER); + event.setSubscriber(subscriber); + eventBus.post(event); + } + + /** + * Checks if exists at least one {@link Channel} subscriber. + * + * @return True if exist at least one {@link Channel} subscriber, false otherwise. + */ + public boolean hasSubscribers() { + return !subscribers.isEmpty(); + } + + /** + * Consists of two types {@link Store#CONFIG} and {@link Store#OPERATION}. + */ + private static enum Store { + CONFIG("config"), + OPERATION("operation"); + + private final String value; + + private Store(final String value) { + this.value = value; + } + } + + /** + * Consists of three types {@link Operation#CREATED}, {@link Operation#UPDATED} and {@link Operation#DELETED}. + */ + private static enum Operation { + CREATED("created"), + UPDATED("updated"), + DELETED("deleted"); + + private final String value; + + private Operation(final String value) { + this.value = value; + } + } }