X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-rest-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fstreams%2Flisteners%2FListenerAdapter.java;h=6282f37602be8236a737fe3e8ef42852816756bc;hb=48814d6a264b8f13e5db1422336d9ef25cb05fa9;hp=fdd6ba0317d597dd1483c4a54dcd647e4734b6cc;hpb=24fa75eae25771889b94c316f55282c39795d166;p=controller.git diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/ListenerAdapter.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/ListenerAdapter.java index fdd6ba0317..6282f37602 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/ListenerAdapter.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/ListenerAdapter.java @@ -1,3 +1,10 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ package org.opendaylight.controller.sal.streams.listeners; import io.netty.channel.Channel; @@ -51,11 +58,17 @@ import com.google.common.eventbus.AsyncEventBus; import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; +/** + * {@link ListenerAdapter} is responsible to track events, which occurred by + * changing data in data source. + */ public class ListenerAdapter implements DataChangeListener { - private static final Logger logger = LoggerFactory.getLogger(ListenerAdapter.class); + private static final Logger logger = LoggerFactory + .getLogger(ListenerAdapter.class); private final XmlMapper xmlMapper = new XmlMapper(); - private final SimpleDateFormat rfc3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ"); + private final SimpleDateFormat rfc3339 = new SimpleDateFormat( + "yyyy-MM-dd'T'hh:mm:ssZ"); private final InstanceIdentifier path; private ListenerRegistration registration; @@ -64,9 +77,19 @@ public class ListenerAdapter implements DataChangeListener { private final EventBus eventBus; private final EventBusChangeRecorder eventBusChangeRecorder; + /** + * Creates new {@link ListenerAdapter} listener specified by path and stream + * name. + * + * @param path + * Path to data in data store. + * @param streamName + * The name of the stream. + */ ListenerAdapter(InstanceIdentifier path, String streamName) { Preconditions.checkNotNull(path); - Preconditions.checkArgument(streamName != null && !streamName.isEmpty()); + Preconditions + .checkArgument(streamName != null && !streamName.isEmpty()); this.path = path; this.streamName = streamName; eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor()); @@ -75,10 +98,14 @@ public class ListenerAdapter implements DataChangeListener { } @Override - public void onDataChanged(DataChangeEvent change) { - if (!change.getCreatedConfigurationData().isEmpty() || !change.getCreatedOperationalData().isEmpty() - || !change.getUpdatedConfigurationData().isEmpty() || !change.getUpdatedOperationalData().isEmpty() - || !change.getRemovedConfigurationData().isEmpty() || !change.getRemovedOperationalData().isEmpty()) { + public void onDataChanged( + DataChangeEvent change) { + if (!change.getCreatedConfigurationData().isEmpty() + || !change.getCreatedOperationalData().isEmpty() + || !change.getUpdatedConfigurationData().isEmpty() + || !change.getUpdatedOperationalData().isEmpty() + || !change.getRemovedConfigurationData().isEmpty() + || !change.getRemovedOperationalData().isEmpty()) { String xml = prepareXmlFrom(change); Event event = new Event(EventType.NOTIFY); event.setData(xml); @@ -86,8 +113,12 @@ public class ListenerAdapter implements DataChangeListener { } } + /** + * Tracks events of data change by customer. + */ private final class EventBusChangeRecorder { - @Subscribe public void recordCustomerChange(Event event) { + @Subscribe + public void recordCustomerChange(Event event) { if (event.getType() == EventType.REGISTER) { Channel subscriber = event.getSubscriber(); if (!subscribers.contains(subscriber)) { @@ -95,14 +126,19 @@ public class ListenerAdapter implements DataChangeListener { } } else if (event.getType() == EventType.DEREGISTER) { subscribers.remove(event.getSubscriber()); - Notificator.removeListenerIfNoSubscriberExists(ListenerAdapter.this); + Notificator + .removeListenerIfNoSubscriberExists(ListenerAdapter.this); } else if (event.getType() == EventType.NOTIFY) { for (Channel subscriber : subscribers) { if (subscriber.isActive()) { - logger.debug("Data are sent to subscriber {}:", subscriber.remoteAddress()); - subscriber.writeAndFlush(new TextWebSocketFrame(event.getData())); + logger.debug("Data are sent to subscriber {}:", + subscriber.remoteAddress()); + subscriber.writeAndFlush(new TextWebSocketFrame(event + .getData())); } else { - logger.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress()); + logger.debug( + "Subscriber {} is removed - channel is not active yet.", + subscriber.remoteAddress()); subscribers.remove(subscriber); } } @@ -110,45 +146,92 @@ public class ListenerAdapter implements DataChangeListener { } } + /** + * Represents event of specific {@link EventType} type, holds data and + * {@link Channel} subscriber. + */ private final class Event { private final EventType type; private Channel subscriber; private String data; + /** + * Creates new event specified by {@link EventType} type. + * + * @param type + * EventType + */ public Event(EventType type) { this.type = type; } + /** + * Gets the {@link Channel} subscriber. + * + * @return Channel + */ public Channel getSubscriber() { return subscriber; } + /** + * Sets subscriber for event. + * + * @param subscriber + * Channel + */ public void setSubscriber(Channel subscriber) { this.subscriber = subscriber; } + /** + * Gets event data. + * + * @return String representation of event data. + */ public String getData() { return data; } + /** + * Sets event data. + * + * @param String + * data. + */ public void setData(String data) { this.data = data; } + /** + * Gets event type. + * + * @return The type of the event. + */ public EventType getType() { return type; } } + /** + * Type of the event. + */ private enum EventType { - REGISTER, - DEREGISTER, - NOTIFY; + REGISTER, DEREGISTER, NOTIFY; } - private String prepareXmlFrom(DataChangeEvent change) { + /** + * Prepare data in printable form and transform it to String. + * + * @param change + * DataChangeEvent + * @return Data in printable form. + */ + private String prepareXmlFrom( + DataChangeEvent change) { Document doc = createDocument(); - Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0", + Element notificationElement = doc.createElementNS( + "urn:ietf:params:xml:ns:netconf:notification:1.0", "notification"); doc.appendChild(notificationElement); @@ -157,20 +240,25 @@ public class ListenerAdapter implements DataChangeListener { notificationElement.appendChild(eventTimeElement); Element dataChangedNotificationEventElement = doc.createElementNS( - "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "data-changed-notification"); - addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, change); + "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", + "data-changed-notification"); + addValuesToDataChangedNotificationEventElement(doc, + dataChangedNotificationEventElement, change); notificationElement.appendChild(dataChangedNotificationEventElement); try { ByteArrayOutputStream out = new ByteArrayOutputStream(); TransformerFactory tf = TransformerFactory.newInstance(); Transformer transformer = tf.newTransformer(); - transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no"); + transformer + .setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no"); transformer.setOutputProperty(OutputKeys.METHOD, "xml"); transformer.setOutputProperty(OutputKeys.INDENT, "yes"); transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8"); - transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4"); - transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, "UTF-8"))); + transformer.setOutputProperty( + "{http://xml.apache.org/xslt}indent-amount", "4"); + transformer.transform(new DOMSource(doc), new StreamResult( + new OutputStreamWriter(out, "UTF-8"))); byte[] charData = out.toByteArray(); return new String(charData, "UTF-8"); } catch (TransformerException | UnsupportedEncodingException e) { @@ -180,10 +268,22 @@ public class ListenerAdapter implements DataChangeListener { } } + /** + * Formats data specified by RFC3339. + * + * @param d + * Date + * @return Data specified by RFC3339. + */ private String toRFC3339(Date d) { return rfc3339.format(d).replaceAll("(\\d\\d)(\\d\\d)$", "$1:$2"); } + /** + * Creates {@link Document} document. + * + * @return {@link Document} document. + */ private Document createDocument() { DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); Document doc = null; @@ -196,43 +296,115 @@ public class ListenerAdapter implements DataChangeListener { return doc; } + /** + * Adds values to data changed notification event element. + * + * @param doc + * {@link Document} + * @param dataChangedNotificationEventElement + * {@link Element} + * @param change + * {@link DataChangeEvent} + */ private void addValuesToDataChangedNotificationEventElement(Document doc, - Element dataChangedNotificationEventElement, DataChangeEvent change) { - addValuesFromDataToElement(doc, change.getCreatedConfigurationData(), dataChangedNotificationEventElement, Store.CONFIG, Operation.CREATED); - addValuesFromDataToElement(doc, change.getCreatedOperationalData(), dataChangedNotificationEventElement, Store.OPERATION, Operation.CREATED); + Element dataChangedNotificationEventElement, + DataChangeEvent change) { + addValuesFromDataToElement(doc, change.getCreatedConfigurationData(), + dataChangedNotificationEventElement, Store.CONFIG, + Operation.CREATED); + addValuesFromDataToElement(doc, change.getCreatedOperationalData(), + dataChangedNotificationEventElement, Store.OPERATION, + Operation.CREATED); if (change.getCreatedConfigurationData().isEmpty()) { - addValuesFromDataToElement(doc, change.getUpdatedConfigurationData(), dataChangedNotificationEventElement, Store.CONFIG, Operation.UPDATED); + addValuesFromDataToElement(doc, + change.getUpdatedConfigurationData(), + dataChangedNotificationEventElement, Store.CONFIG, + Operation.UPDATED); } if (change.getCreatedOperationalData().isEmpty()) { - addValuesFromDataToElement(doc, change.getUpdatedOperationalData(), dataChangedNotificationEventElement, Store.OPERATION, Operation.UPDATED); + addValuesFromDataToElement(doc, change.getUpdatedOperationalData(), + dataChangedNotificationEventElement, Store.OPERATION, + Operation.UPDATED); } - addValuesFromDataToElement(doc, change.getRemovedConfigurationData(), dataChangedNotificationEventElement, Store.CONFIG, Operation.DELETED); - addValuesFromDataToElement(doc, change.getRemovedOperationalData(), dataChangedNotificationEventElement, Store.OPERATION, Operation.DELETED); + addValuesFromDataToElement(doc, change.getRemovedConfigurationData(), + dataChangedNotificationEventElement, Store.CONFIG, + Operation.DELETED); + addValuesFromDataToElement(doc, change.getRemovedOperationalData(), + dataChangedNotificationEventElement, Store.OPERATION, + Operation.DELETED); } - private void addValuesFromDataToElement(Document doc, Set data, Element element, Store store, + /** + * Adds values from data to element. + * + * @param doc + * {@link Document} + * @param data + * Set of {@link InstanceIdentifier}. + * @param element + * {@link Element} + * @param store + * {@link Store} + * @param operation + * {@link Operation} + */ + private void addValuesFromDataToElement(Document doc, + Set data, Element element, Store store, Operation operation) { if (data == null || data.isEmpty()) { return; } for (InstanceIdentifier path : data) { - Node node = createDataChangeEventElement(doc, path, null, store, operation); + Node node = createDataChangeEventElement(doc, path, null, store, + operation); element.appendChild(node); } } - private void addValuesFromDataToElement(Document doc, Map data, Element element, Store store, - Operation operation) { + /** + * Adds values from data to element. + * + * @param doc + * {@link Document} + * @param data + * Map of {@link InstanceIdentifier} and {@link CompositeNode}. + * @param element + * {@link Element} + * @param store + * {@link Store} + * @param operation + * {@link Operation} + */ + private void addValuesFromDataToElement(Document doc, + Map data, Element element, + Store store, Operation operation) { if (data == null || data.isEmpty()) { return; } for (Entry entry : data.entrySet()) { - Node node = createDataChangeEventElement(doc, entry.getKey(), entry.getValue(), store, operation); + Node node = createDataChangeEventElement(doc, entry.getKey(), + entry.getValue(), store, operation); element.appendChild(node); } } - private Node createDataChangeEventElement(Document doc, InstanceIdentifier path, CompositeNode data, Store store, + /** + * Creates changed event element from data. + * + * @param doc + * {@link Document} + * @param path + * Path to data in data store. + * @param data + * {@link CompositeNode} + * @param store + * {@link Store} + * @param operation + * {@link Operation} + * @return {@link Node} node represented by changed event element. + */ + private Node createDataChangeEventElement(Document doc, + InstanceIdentifier path, CompositeNode data, Store store, Operation operation) { Element dataChangeEventElement = doc.createElement("data-change-event"); @@ -259,35 +431,62 @@ public class ListenerAdapter implements DataChangeListener { return dataChangeEventElement; } + /** + * Translates {@link CompositeNode} data to XML format. + * + * @param path + * Path to data in data store. + * @param data + * {@link CompositeNode} + * @return Data in XML format. + */ private Node translateToXml(InstanceIdentifier path, CompositeNode data) { - DataNodeContainer schemaNode = ControllerContext.getInstance().getDataNodeContainerFor(path); + DataNodeContainer schemaNode = ControllerContext.getInstance() + .getDataNodeContainerFor(path); if (schemaNode == null) { - logger.info("Path '{}' contains node with unsupported type (supported type is Container or List) or some node was not found.", path); + logger.info( + "Path '{}' contains node with unsupported type (supported type is Container or List) or some node was not found.", + path); return null; } try { Document xml = xmlMapper.write(data, schemaNode); return xml.getFirstChild(); } catch (UnsupportedDataTypeException e) { - logger.error("Error occured during translation of notification to XML.", e); + logger.error( + "Error occured during translation of notification to XML.", + e); return null; } } - private void addPathAsValueToElement(InstanceIdentifier path, Element element) { + /** + * Adds path as value to element. + * + * @param path + * Path to data in data store. + * @param element + * {@link Element} + */ + private void addPathAsValueToElement(InstanceIdentifier path, + Element element) { // Map< key = namespace, value = prefix> Map prefixes = new HashMap<>(); InstanceIdentifier instanceIdentifier = path; StringBuilder textContent = new StringBuilder(); for (PathArgument pathArgument : instanceIdentifier.getPath()) { textContent.append("/"); - writeIdentifierWithNamespacePrefix(element, textContent, pathArgument.getNodeType(), prefixes); + writeIdentifierWithNamespacePrefix(element, textContent, + pathArgument.getNodeType(), prefixes); if (pathArgument instanceof NodeIdentifierWithPredicates) { - Map predicates = ((NodeIdentifierWithPredicates) pathArgument).getKeyValues(); + Map predicates = ((NodeIdentifierWithPredicates) pathArgument) + .getKeyValues(); for (QName keyValue : predicates.keySet()) { - String predicateValue = String.valueOf(predicates.get(keyValue)); + String predicateValue = String.valueOf(predicates + .get(keyValue)); textContent.append("["); - writeIdentifierWithNamespacePrefix(element, textContent, keyValue, prefixes); + writeIdentifierWithNamespacePrefix(element, textContent, + keyValue, prefixes); textContent.append("='"); textContent.append(predicateValue); textContent.append("'"); @@ -295,7 +494,7 @@ public class ListenerAdapter implements DataChangeListener { } } else if (pathArgument instanceof NodeWithValue) { textContent.append("[.='"); - textContent.append(((NodeWithValue)pathArgument).getValue()); + textContent.append(((NodeWithValue) pathArgument).getValue()); textContent.append("'"); textContent.append("]"); } @@ -303,18 +502,31 @@ public class ListenerAdapter implements DataChangeListener { element.setTextContent(textContent.toString()); } - private static void writeIdentifierWithNamespacePrefix(Element element, StringBuilder textContent, QName qName, - Map prefixes) { + /** + * Writes identifier that consists of prefix and QName. + * + * @param element + * {@link Element} + * @param textContent + * StringBuilder + * @param qName + * QName + * @param prefixes + * Map of namespaces and prefixes. + */ + private static void writeIdentifierWithNamespacePrefix(Element element, + StringBuilder textContent, QName qName, Map prefixes) { String namespace = qName.getNamespace().toString(); String prefix = prefixes.get(namespace); if (prefix == null) { prefix = qName.getPrefix(); - if (prefix == null || prefix.isEmpty() || prefixes.containsValue(prefix)) { + if (prefix == null || prefix.isEmpty() + || prefixes.containsValue(prefix)) { prefix = generateNewPrefix(prefixes.values()); } } - element.setAttribute("xmlns:" + prefix, namespace.toString()); + element.setAttribute("xmlns:" + prefix, namespace); textContent.append(prefix); prefixes.put(namespace, prefix); @@ -322,6 +534,13 @@ public class ListenerAdapter implements DataChangeListener { textContent.append(qName.getLocalName()); } + /** + * Generates new prefix which consists of four random characters . + * + * @param prefixes + * Collection of prefixes. + * @return New prefix which consists of four random characters . + */ private static String generateNewPrefix(Collection prefixes) { StringBuilder result = null; Random random = new Random(); @@ -336,18 +555,39 @@ public class ListenerAdapter implements DataChangeListener { return result.toString(); } + /** + * Gets path pointed to data in data store. + * + * @return Path pointed to data in data store. + */ public InstanceIdentifier getPath() { return path; } - public void setRegistration(ListenerRegistration registration) { + /** + * Sets {@link ListenerRegistration} registration. + * + * @param registration + * ListenerRegistration + */ + public void setRegistration( + ListenerRegistration registration) { this.registration = registration; } + /** + * Gets the name of the stream. + * + * @return The name of the stream. + */ public String getStreamName() { return streamName; } + /** + * Removes all subscribers and unregisters event bus change recorder form + * event bus. + */ public void close() throws Exception { subscribers = new ConcurrentSet<>(); registration.close(); @@ -355,10 +595,22 @@ public class ListenerAdapter implements DataChangeListener { eventBus.unregister(eventBusChangeRecorder); } + /** + * Checks if {@link ListenerRegistration} registration exist. + * + * @return True if exist, false otherwise. + */ public boolean isListening() { return registration == null ? false : true; } + /** + * Creates event of type {@link EventType#REGISTER}, set {@link Channel} + * subscriber to the event and post event into event bus. + * + * @param subscriber + * Channel + */ public void addSubscriber(Channel subscriber) { if (!subscriber.isActive()) { logger.debug("Channel is not active between websocket server and subscriber {}" @@ -369,6 +621,12 @@ public class ListenerAdapter implements DataChangeListener { eventBus.post(event); } + /** + * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel} + * subscriber to the event and posts event into event bus. + * + * @param subscriber + */ public void removeSubscriber(Channel subscriber) { logger.debug("Subscriber {} is removed.", subscriber.remoteAddress()); Event event = new Event(EventType.DEREGISTER); @@ -376,13 +634,21 @@ public class ListenerAdapter implements DataChangeListener { eventBus.post(event); } + /** + * Checks if exists at least one {@link Channel} subscriber. + * + * @return True if exist at least one {@link Channel} subscriber, false + * otherwise. + */ public boolean hasSubscribers() { return !subscribers.isEmpty(); } + /** + * Consists of two types {@link Store#CONFIG} and {@link Store#OPERATION}. + */ private static enum Store { - CONFIG("config"), - OPERATION("operation"); + CONFIG("config"), OPERATION("operation"); private final String value; @@ -391,10 +657,12 @@ public class ListenerAdapter implements DataChangeListener { } } + /** + * Consists of three types {@link Operation#CREATED}, + * {@link Operation#UPDATED} and {@link Operation#DELETED}. + */ private static enum Operation { - CREATED("created"), - UPDATED("updated"), - DELETED("deleted"); + CREATED("created"), UPDATED("updated"), DELETED("deleted"); private final String value;