X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-rest-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fstreams%2Flisteners%2FListenerAdapter.java;h=e526ec13be75fad83ff6c33b95dfe9ba0d809298;hp=fdd6ba0317d597dd1483c4a54dcd647e4734b6cc;hb=73c9fecf86aad02761df47fe0cc943af4ea1f2bc;hpb=c24246d2ae43ab30f1a80218cb90ed538db8d25c diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/ListenerAdapter.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/ListenerAdapter.java index fdd6ba0317..e526ec13be 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/ListenerAdapter.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/ListenerAdapter.java @@ -1,9 +1,20 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ package org.opendaylight.controller.sal.streams.listeners; +import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; +import com.google.common.eventbus.AsyncEventBus; +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; import io.netty.channel.Channel; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.internal.ConcurrentSet; - import java.io.ByteArrayOutputStream; import java.io.OutputStreamWriter; import java.io.UnsupportedEncodingException; @@ -16,7 +27,7 @@ import java.util.Map.Entry; import java.util.Random; import java.util.Set; import java.util.concurrent.Executors; - +import java.util.regex.Pattern; import javax.activation.UnsupportedDataTypeException; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; @@ -27,7 +38,6 @@ import javax.xml.transform.TransformerException; import javax.xml.transform.TransformerFactory; import javax.xml.transform.dom.DOMSource; import javax.xml.transform.stream.StreamResult; - import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent; import org.opendaylight.controller.sal.core.api.data.DataChangeListener; import org.opendaylight.controller.sal.rest.impl.XmlMapper; @@ -46,14 +56,16 @@ import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.Node; -import com.google.common.base.Preconditions; -import com.google.common.eventbus.AsyncEventBus; -import com.google.common.eventbus.EventBus; -import com.google.common.eventbus.Subscribe; - +/** + * {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source. + */ public class ListenerAdapter implements DataChangeListener { - private static final Logger logger = LoggerFactory.getLogger(ListenerAdapter.class); + private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class); + private static final DocumentBuilderFactory DBF = DocumentBuilderFactory.newInstance(); + private static final TransformerFactory FACTORY = TransformerFactory.newInstance(); + private static final Pattern RFC3339_PATTERN = Pattern.compile("(\\d\\d)(\\d\\d)$"); + private final XmlMapper xmlMapper = new XmlMapper(); private final SimpleDateFormat rfc3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ"); @@ -64,7 +76,15 @@ public class ListenerAdapter implements DataChangeListener { private final EventBus eventBus; private final EventBusChangeRecorder eventBusChangeRecorder; - ListenerAdapter(InstanceIdentifier path, String streamName) { + /** + * Creates new {@link ListenerAdapter} listener specified by path and stream name. + * + * @param path + * Path to data in data store. + * @param streamName + * The name of the stream. + */ + ListenerAdapter(final InstanceIdentifier path, final String streamName) { Preconditions.checkNotNull(path); Preconditions.checkArgument(streamName != null && !streamName.isEmpty()); this.path = path; @@ -75,7 +95,7 @@ public class ListenerAdapter implements DataChangeListener { } @Override - public void onDataChanged(DataChangeEvent change) { + public void onDataChanged(final DataChangeEvent change) { if (!change.getCreatedConfigurationData().isEmpty() || !change.getCreatedOperationalData().isEmpty() || !change.getUpdatedConfigurationData().isEmpty() || !change.getUpdatedOperationalData().isEmpty() || !change.getRemovedConfigurationData().isEmpty() || !change.getRemovedOperationalData().isEmpty()) { @@ -86,8 +106,12 @@ public class ListenerAdapter implements DataChangeListener { } } + /** + * Tracks events of data change by customer. + */ private final class EventBusChangeRecorder { - @Subscribe public void recordCustomerChange(Event event) { + @Subscribe + public void recordCustomerChange(final Event event) { if (event.getType() == EventType.REGISTER) { Channel subscriber = event.getSubscriber(); if (!subscribers.contains(subscriber)) { @@ -99,10 +123,10 @@ public class ListenerAdapter implements DataChangeListener { } else if (event.getType() == EventType.NOTIFY) { for (Channel subscriber : subscribers) { if (subscriber.isActive()) { - logger.debug("Data are sent to subscriber {}:", subscriber.remoteAddress()); + LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress()); subscriber.writeAndFlush(new TextWebSocketFrame(event.getData())); } else { - logger.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress()); + LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress()); subscribers.remove(subscriber); } } @@ -110,43 +134,89 @@ public class ListenerAdapter implements DataChangeListener { } } + /** + * Represents event of specific {@link EventType} type, holds data and {@link Channel} subscriber. + */ private final class Event { private final EventType type; private Channel subscriber; private String data; - public Event(EventType type) { + /** + * Creates new event specified by {@link EventType} type. + * + * @param type + * EventType + */ + public Event(final EventType type) { this.type = type; } + /** + * Gets the {@link Channel} subscriber. + * + * @return Channel + */ public Channel getSubscriber() { return subscriber; } - public void setSubscriber(Channel subscriber) { + /** + * Sets subscriber for event. + * + * @param subscriber + * Channel + */ + public void setSubscriber(final Channel subscriber) { this.subscriber = subscriber; } + /** + * Gets event data. + * + * @return String representation of event data. + */ public String getData() { return data; } - public void setData(String data) { + /** + * Sets event data. + * + * @param String + * data. + */ + public void setData(final String data) { this.data = data; } + /** + * Gets event type. + * + * @return The type of the event. + */ public EventType getType() { return type; } } + /** + * Type of the event. + */ private enum EventType { REGISTER, DEREGISTER, NOTIFY; } - private String prepareXmlFrom(DataChangeEvent change) { + /** + * Prepare data in printable form and transform it to String. + * + * @param change + * DataChangeEvent + * @return Data in printable form. + */ + private String prepareXmlFrom(final DataChangeEvent change) { Document doc = createDocument(); Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0", "notification"); @@ -163,55 +233,95 @@ public class ListenerAdapter implements DataChangeListener { try { ByteArrayOutputStream out = new ByteArrayOutputStream(); - TransformerFactory tf = TransformerFactory.newInstance(); - Transformer transformer = tf.newTransformer(); + Transformer transformer = FACTORY.newTransformer(); transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no"); transformer.setOutputProperty(OutputKeys.METHOD, "xml"); transformer.setOutputProperty(OutputKeys.INDENT, "yes"); transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8"); transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4"); - transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, "UTF-8"))); + transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, Charsets.UTF_8))); byte[] charData = out.toByteArray(); return new String(charData, "UTF-8"); } catch (TransformerException | UnsupportedEncodingException e) { String msg = "Error during transformation of Document into String"; - logger.error(msg, e); + LOG.error(msg, e); return msg; } } - private String toRFC3339(Date d) { - return rfc3339.format(d).replaceAll("(\\d\\d)(\\d\\d)$", "$1:$2"); + /** + * Formats data specified by RFC3339. + * + * @param d + * Date + * @return Data specified by RFC3339. + */ + private String toRFC3339(final Date d) { + return RFC3339_PATTERN.matcher(rfc3339.format(d)).replaceAll("$1:$2"); } + /** + * Creates {@link Document} document. + * + * @return {@link Document} document. + */ private Document createDocument() { - DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); - Document doc = null; + final DocumentBuilder bob; try { - DocumentBuilder bob = dbf.newDocumentBuilder(); - doc = bob.newDocument(); + bob = DBF.newDocumentBuilder(); } catch (ParserConfigurationException e) { return null; } - return doc; + return bob.newDocument(); } - private void addValuesToDataChangedNotificationEventElement(Document doc, - Element dataChangedNotificationEventElement, DataChangeEvent change) { - addValuesFromDataToElement(doc, change.getCreatedConfigurationData(), dataChangedNotificationEventElement, Store.CONFIG, Operation.CREATED); - addValuesFromDataToElement(doc, change.getCreatedOperationalData(), dataChangedNotificationEventElement, Store.OPERATION, Operation.CREATED); + /** + * Adds values to data changed notification event element. + * + * @param doc + * {@link Document} + * @param dataChangedNotificationEventElement + * {@link Element} + * @param change + * {@link DataChangeEvent} + */ + private void addValuesToDataChangedNotificationEventElement(final Document doc, + final Element dataChangedNotificationEventElement, + final DataChangeEvent change) { + addValuesFromDataToElement(doc, change.getCreatedConfigurationData(), dataChangedNotificationEventElement, + Store.CONFIG, Operation.CREATED); + addValuesFromDataToElement(doc, change.getCreatedOperationalData(), dataChangedNotificationEventElement, + Store.OPERATION, Operation.CREATED); if (change.getCreatedConfigurationData().isEmpty()) { - addValuesFromDataToElement(doc, change.getUpdatedConfigurationData(), dataChangedNotificationEventElement, Store.CONFIG, Operation.UPDATED); + addValuesFromDataToElement(doc, change.getUpdatedConfigurationData(), dataChangedNotificationEventElement, + Store.CONFIG, Operation.UPDATED); } if (change.getCreatedOperationalData().isEmpty()) { - addValuesFromDataToElement(doc, change.getUpdatedOperationalData(), dataChangedNotificationEventElement, Store.OPERATION, Operation.UPDATED); + addValuesFromDataToElement(doc, change.getUpdatedOperationalData(), dataChangedNotificationEventElement, + Store.OPERATION, Operation.UPDATED); } - addValuesFromDataToElement(doc, change.getRemovedConfigurationData(), dataChangedNotificationEventElement, Store.CONFIG, Operation.DELETED); - addValuesFromDataToElement(doc, change.getRemovedOperationalData(), dataChangedNotificationEventElement, Store.OPERATION, Operation.DELETED); + addValuesFromDataToElement(doc, change.getRemovedConfigurationData(), dataChangedNotificationEventElement, + Store.CONFIG, Operation.DELETED); + addValuesFromDataToElement(doc, change.getRemovedOperationalData(), dataChangedNotificationEventElement, + Store.OPERATION, Operation.DELETED); } - private void addValuesFromDataToElement(Document doc, Set data, Element element, Store store, - Operation operation) { + /** + * Adds values from data to element. + * + * @param doc + * {@link Document} + * @param data + * Set of {@link InstanceIdentifier}. + * @param element + * {@link Element} + * @param store + * {@link Store} + * @param operation + * {@link Operation} + */ + private void addValuesFromDataToElement(final Document doc, final Set data, + final Element element, final Store store, final Operation operation) { if (data == null || data.isEmpty()) { return; } @@ -221,8 +331,22 @@ public class ListenerAdapter implements DataChangeListener { } } - private void addValuesFromDataToElement(Document doc, Map data, Element element, Store store, - Operation operation) { + /** + * Adds values from data to element. + * + * @param doc + * {@link Document} + * @param data + * Map of {@link InstanceIdentifier} and {@link CompositeNode}. + * @param element + * {@link Element} + * @param store + * {@link Store} + * @param operation + * {@link Operation} + */ + private void addValuesFromDataToElement(final Document doc, final Map data, + final Element element, final Store store, final Operation operation) { if (data == null || data.isEmpty()) { return; } @@ -232,8 +356,23 @@ public class ListenerAdapter implements DataChangeListener { } } - private Node createDataChangeEventElement(Document doc, InstanceIdentifier path, CompositeNode data, Store store, - Operation operation) { + /** + * Creates changed event element from data. + * + * @param doc + * {@link Document} + * @param path + * Path to data in data store. + * @param data + * {@link CompositeNode} + * @param store + * {@link Store} + * @param operation + * {@link Operation} + * @return {@link Node} node represented by changed event element. + */ + private Node createDataChangeEventElement(final Document doc, final InstanceIdentifier path, + final CompositeNode data, final Store store, final Operation operation) { Element dataChangeEventElement = doc.createElement("data-change-event"); Element pathElement = doc.createElement("path"); @@ -259,27 +398,48 @@ public class ListenerAdapter implements DataChangeListener { return dataChangeEventElement; } - private Node translateToXml(InstanceIdentifier path, CompositeNode data) { + /** + * Translates {@link CompositeNode} data to XML format. + * + * @param path + * Path to data in data store. + * @param data + * {@link CompositeNode} + * @return Data in XML format. + */ + private Node translateToXml(final InstanceIdentifier path, final CompositeNode data) { DataNodeContainer schemaNode = ControllerContext.getInstance().getDataNodeContainerFor(path); if (schemaNode == null) { - logger.info("Path '{}' contains node with unsupported type (supported type is Container or List) or some node was not found.", path); + LOG.info( + "Path '{}' contains node with unsupported type (supported type is Container or List) or some node was not found.", + path); return null; } try { Document xml = xmlMapper.write(data, schemaNode); return xml.getFirstChild(); } catch (UnsupportedDataTypeException e) { - logger.error("Error occured during translation of notification to XML.", e); + LOG.error("Error occured during translation of notification to XML.", e); return null; } } - private void addPathAsValueToElement(InstanceIdentifier path, Element element) { + /** + * Adds path as value to element. + * + * @param path + * Path to data in data store. + * @param element + * {@link Element} + */ + private void addPathAsValueToElement(final InstanceIdentifier path, final Element element) { // Map< key = namespace, value = prefix> Map prefixes = new HashMap<>(); InstanceIdentifier instanceIdentifier = path; StringBuilder textContent = new StringBuilder(); - for (PathArgument pathArgument : instanceIdentifier.getPath()) { + + // FIXME: BUG-1281: this is duplicated code from yangtools (BUG-1275) + for (PathArgument pathArgument : instanceIdentifier.getPathArguments()) { textContent.append("/"); writeIdentifierWithNamespacePrefix(element, textContent, pathArgument.getNodeType(), prefixes); if (pathArgument instanceof NodeIdentifierWithPredicates) { @@ -295,7 +455,7 @@ public class ListenerAdapter implements DataChangeListener { } } else if (pathArgument instanceof NodeWithValue) { textContent.append("[.='"); - textContent.append(((NodeWithValue)pathArgument).getValue()); + textContent.append(((NodeWithValue) pathArgument).getValue()); textContent.append("'"); textContent.append("]"); } @@ -303,8 +463,20 @@ public class ListenerAdapter implements DataChangeListener { element.setTextContent(textContent.toString()); } - private static void writeIdentifierWithNamespacePrefix(Element element, StringBuilder textContent, QName qName, - Map prefixes) { + /** + * Writes identifier that consists of prefix and QName. + * + * @param element + * {@link Element} + * @param textContent + * StringBuilder + * @param qName + * QName + * @param prefixes + * Map of namespaces and prefixes. + */ + private static void writeIdentifierWithNamespacePrefix(final Element element, final StringBuilder textContent, + final QName qName, final Map prefixes) { String namespace = qName.getNamespace().toString(); String prefix = prefixes.get(namespace); if (prefix == null) { @@ -314,7 +486,7 @@ public class ListenerAdapter implements DataChangeListener { } } - element.setAttribute("xmlns:" + prefix, namespace.toString()); + element.setAttribute("xmlns:" + prefix, namespace); textContent.append(prefix); prefixes.put(namespace, prefix); @@ -322,7 +494,14 @@ public class ListenerAdapter implements DataChangeListener { textContent.append(qName.getLocalName()); } - private static String generateNewPrefix(Collection prefixes) { + /** + * Generates new prefix which consists of four random characters . + * + * @param prefixes + * Collection of prefixes. + * @return New prefix which consists of four random characters . + */ + private static String generateNewPrefix(final Collection prefixes) { StringBuilder result = null; Random random = new Random(); do { @@ -336,18 +515,37 @@ public class ListenerAdapter implements DataChangeListener { return result.toString(); } + /** + * Gets path pointed to data in data store. + * + * @return Path pointed to data in data store. + */ public InstanceIdentifier getPath() { return path; } - public void setRegistration(ListenerRegistration registration) { + /** + * Sets {@link ListenerRegistration} registration. + * + * @param registration + * ListenerRegistration + */ + public void setRegistration(final ListenerRegistration registration) { this.registration = registration; } + /** + * Gets the name of the stream. + * + * @return The name of the stream. + */ public String getStreamName() { return streamName; } + /** + * Removes all subscribers and unregisters event bus change recorder form event bus. + */ public void close() throws Exception { subscribers = new ConcurrentSet<>(); registration.close(); @@ -355,42 +553,70 @@ public class ListenerAdapter implements DataChangeListener { eventBus.unregister(eventBusChangeRecorder); } + /** + * Checks if {@link ListenerRegistration} registration exist. + * + * @return True if exist, false otherwise. + */ public boolean isListening() { return registration == null ? false : true; } - public void addSubscriber(Channel subscriber) { + /** + * Creates event of type {@link EventType#REGISTER}, set {@link Channel} subscriber to the event and post event into + * event bus. + * + * @param subscriber + * Channel + */ + public void addSubscriber(final Channel subscriber) { if (!subscriber.isActive()) { - logger.debug("Channel is not active between websocket server and subscriber {}" - + subscriber.remoteAddress()); + LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress()); } Event event = new Event(EventType.REGISTER); event.setSubscriber(subscriber); eventBus.post(event); } - public void removeSubscriber(Channel subscriber) { - logger.debug("Subscriber {} is removed.", subscriber.remoteAddress()); + /** + * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel} subscriber to the event and posts event + * into event bus. + * + * @param subscriber + */ + public void removeSubscriber(final Channel subscriber) { + LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress()); Event event = new Event(EventType.DEREGISTER); event.setSubscriber(subscriber); eventBus.post(event); } + /** + * Checks if exists at least one {@link Channel} subscriber. + * + * @return True if exist at least one {@link Channel} subscriber, false otherwise. + */ public boolean hasSubscribers() { return !subscribers.isEmpty(); } + /** + * Consists of two types {@link Store#CONFIG} and {@link Store#OPERATION}. + */ private static enum Store { CONFIG("config"), OPERATION("operation"); private final String value; - private Store(String value) { + private Store(final String value) { this.value = value; } } + /** + * Consists of three types {@link Operation#CREATED}, {@link Operation#UPDATED} and {@link Operation#DELETED}. + */ private static enum Operation { CREATED("created"), UPDATED("updated"), @@ -398,7 +624,7 @@ public class ListenerAdapter implements DataChangeListener { private final String value; - private Operation(String value) { + private Operation(final String value) { this.value = value; } }