*/
package org.opendaylight.controller.sal.streams.listeners;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.eventbus.AsyncEventBus;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.internal.ConcurrentSet;
-
import java.io.ByteArrayOutputStream;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Executors;
-
+import java.util.regex.Pattern;
import javax.activation.UnsupportedDataTypeException;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
-
-import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
-import org.opendaylight.controller.sal.core.api.data.DataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
import org.opendaylight.controller.sal.rest.impl.XmlMapper;
import org.opendaylight.controller.sal.restconf.impl.ControllerContext;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifierWithPredicates;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeWithValue;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeWithValue;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.DataNodeContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
-import com.google.common.base.Preconditions;
-import com.google.common.eventbus.AsyncEventBus;
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
-
/**
- * {@link ListenerAdapter} is responsible to track events, which occurred by
- * changing data in data source.
+ * {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source.
*/
-public class ListenerAdapter implements DataChangeListener {
+public class ListenerAdapter implements DOMDataChangeListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class);
+ private static final DocumentBuilderFactory DBF = DocumentBuilderFactory.newInstance();
+ private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
+ private static final Pattern RFC3339_PATTERN = Pattern.compile("(\\d\\d)(\\d\\d)$");
- private static final Logger logger = LoggerFactory
- .getLogger(ListenerAdapter.class);
private final XmlMapper xmlMapper = new XmlMapper();
- private final SimpleDateFormat rfc3339 = new SimpleDateFormat(
- "yyyy-MM-dd'T'hh:mm:ssZ");
+ private final SimpleDateFormat rfc3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ");
- private final InstanceIdentifier path;
- private ListenerRegistration<DataChangeListener> registration;
+ private final YangInstanceIdentifier path;
+ private ListenerRegistration<DOMDataChangeListener> registration;
private final String streamName;
private Set<Channel> subscribers = new ConcurrentSet<>();
private final EventBus eventBus;
private final EventBusChangeRecorder eventBusChangeRecorder;
/**
- * Creates new {@link ListenerAdapter} listener specified by path and stream
- * name.
+ * Creates new {@link ListenerAdapter} listener specified by path and stream name.
*
* @param path
* Path to data in data store.
* @param streamName
* The name of the stream.
*/
- ListenerAdapter(InstanceIdentifier path, String streamName) {
+ ListenerAdapter(final YangInstanceIdentifier path, final String streamName) {
Preconditions.checkNotNull(path);
- Preconditions
- .checkArgument(streamName != null && !streamName.isEmpty());
+ Preconditions.checkArgument(streamName != null && !streamName.isEmpty());
this.path = path;
this.streamName = streamName;
eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
}
@Override
- public void onDataChanged(
- DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
- if (!change.getCreatedConfigurationData().isEmpty()
- || !change.getCreatedOperationalData().isEmpty()
- || !change.getUpdatedConfigurationData().isEmpty()
- || !change.getUpdatedOperationalData().isEmpty()
- || !change.getRemovedConfigurationData().isEmpty()
- || !change.getRemovedOperationalData().isEmpty()) {
+ public void onDataChanged(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+ // TODO Auto-generated method stub
+
+ if (!change.getCreatedData().isEmpty() || !change.getUpdatedData().isEmpty()
+ || !change.getRemovedPaths().isEmpty()) {
String xml = prepareXmlFrom(change);
Event event = new Event(EventType.NOTIFY);
event.setData(xml);
*/
private final class EventBusChangeRecorder {
@Subscribe
- public void recordCustomerChange(Event event) {
+ public void recordCustomerChange(final Event event) {
if (event.getType() == EventType.REGISTER) {
Channel subscriber = event.getSubscriber();
if (!subscribers.contains(subscriber)) {
}
} else if (event.getType() == EventType.DEREGISTER) {
subscribers.remove(event.getSubscriber());
- Notificator
- .removeListenerIfNoSubscriberExists(ListenerAdapter.this);
+ Notificator.removeListenerIfNoSubscriberExists(ListenerAdapter.this);
} else if (event.getType() == EventType.NOTIFY) {
for (Channel subscriber : subscribers) {
if (subscriber.isActive()) {
- logger.debug("Data are sent to subscriber {}:",
- subscriber.remoteAddress());
- subscriber.writeAndFlush(new TextWebSocketFrame(event
- .getData()));
+ LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
+ subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
} else {
- logger.debug(
- "Subscriber {} is removed - channel is not active yet.",
- subscriber.remoteAddress());
+ LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
subscribers.remove(subscriber);
}
}
}
/**
- * Represents event of specific {@link EventType} type, holds data and
- * {@link Channel} subscriber.
+ * Represents event of specific {@link EventType} type, holds data and {@link Channel} subscriber.
*/
private final class Event {
private final EventType type;
* @param type
* EventType
*/
- public Event(EventType type) {
+ public Event(final EventType type) {
this.type = type;
}
* @param subscriber
* Channel
*/
- public void setSubscriber(Channel subscriber) {
+ public void setSubscriber(final Channel subscriber) {
this.subscriber = subscriber;
}
* @param String
* data.
*/
- public void setData(String data) {
+ public void setData(final String data) {
this.data = data;
}
* Type of the event.
*/
private enum EventType {
- REGISTER, DEREGISTER, NOTIFY;
+ REGISTER,
+ DEREGISTER,
+ NOTIFY;
}
/**
* DataChangeEvent
* @return Data in printable form.
*/
- private String prepareXmlFrom(
- DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
+ private String prepareXmlFrom(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
Document doc = createDocument();
- Element notificationElement = doc.createElementNS(
- "urn:ietf:params:xml:ns:netconf:notification:1.0",
+ Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
"notification");
doc.appendChild(notificationElement);
notificationElement.appendChild(eventTimeElement);
Element dataChangedNotificationEventElement = doc.createElementNS(
- "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote",
- "data-changed-notification");
- addValuesToDataChangedNotificationEventElement(doc,
- dataChangedNotificationEventElement, change);
+ "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "data-changed-notification");
+ addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, change);
notificationElement.appendChild(dataChangedNotificationEventElement);
try {
ByteArrayOutputStream out = new ByteArrayOutputStream();
- TransformerFactory tf = TransformerFactory.newInstance();
- Transformer transformer = tf.newTransformer();
- transformer
- .setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
+ Transformer transformer = FACTORY.newTransformer();
+ transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
transformer.setOutputProperty(OutputKeys.METHOD, "xml");
transformer.setOutputProperty(OutputKeys.INDENT, "yes");
transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
- transformer.setOutputProperty(
- "{http://xml.apache.org/xslt}indent-amount", "4");
- transformer.transform(new DOMSource(doc), new StreamResult(
- new OutputStreamWriter(out, "UTF-8")));
+ transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
+ transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, Charsets.UTF_8)));
byte[] charData = out.toByteArray();
return new String(charData, "UTF-8");
} catch (TransformerException | UnsupportedEncodingException e) {
String msg = "Error during transformation of Document into String";
- logger.error(msg, e);
+ LOG.error(msg, e);
return msg;
}
}
* Date
* @return Data specified by RFC3339.
*/
- private String toRFC3339(Date d) {
- return rfc3339.format(d).replaceAll("(\\d\\d)(\\d\\d)$", "$1:$2");
+ private String toRFC3339(final Date d) {
+ return RFC3339_PATTERN.matcher(rfc3339.format(d)).replaceAll("$1:$2");
}
/**
* Creates {@link Document} document.
- *
* @return {@link Document} document.
*/
private Document createDocument() {
- DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
- Document doc = null;
+ final DocumentBuilder bob;
try {
- DocumentBuilder bob = dbf.newDocumentBuilder();
- doc = bob.newDocument();
+ bob = DBF.newDocumentBuilder();
} catch (ParserConfigurationException e) {
return null;
}
- return doc;
+ return bob.newDocument();
}
/**
* @param change
* {@link DataChangeEvent}
*/
- private void addValuesToDataChangedNotificationEventElement(Document doc,
- Element dataChangedNotificationEventElement,
- DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
- addValuesFromDataToElement(doc, change.getCreatedConfigurationData(),
- dataChangedNotificationEventElement, Store.CONFIG,
+ private void addValuesToDataChangedNotificationEventElement(final Document doc,
+ final Element dataChangedNotificationEventElement,
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+ addValuesFromDataToElement(doc, change.getCreatedData().keySet(), dataChangedNotificationEventElement,
Operation.CREATED);
- addValuesFromDataToElement(doc, change.getCreatedOperationalData(),
- dataChangedNotificationEventElement, Store.OPERATION,
- Operation.CREATED);
- if (change.getCreatedConfigurationData().isEmpty()) {
- addValuesFromDataToElement(doc,
- change.getUpdatedConfigurationData(),
- dataChangedNotificationEventElement, Store.CONFIG,
- Operation.UPDATED);
- }
- if (change.getCreatedOperationalData().isEmpty()) {
- addValuesFromDataToElement(doc, change.getUpdatedOperationalData(),
- dataChangedNotificationEventElement, Store.OPERATION,
+ if (change.getCreatedData().isEmpty()) {
+ addValuesFromDataToElement(doc, change.getUpdatedData().keySet(), dataChangedNotificationEventElement,
Operation.UPDATED);
}
- addValuesFromDataToElement(doc, change.getRemovedConfigurationData(),
- dataChangedNotificationEventElement, Store.CONFIG,
- Operation.DELETED);
- addValuesFromDataToElement(doc, change.getRemovedOperationalData(),
- dataChangedNotificationEventElement, Store.OPERATION,
+ addValuesFromDataToElement(doc, change.getRemovedPaths(), dataChangedNotificationEventElement,
Operation.DELETED);
}
* @param doc
* {@link Document}
* @param data
- * Set of {@link InstanceIdentifier}.
+ * Set of {@link YangInstanceIdentifier}.
* @param element
* {@link Element}
* @param store
* @param operation
* {@link Operation}
*/
- private void addValuesFromDataToElement(Document doc,
- Set<InstanceIdentifier> data, Element element, Store store,
+ private void addValuesFromDataToElement(Document doc, Set<YangInstanceIdentifier> data, Element element,
Operation operation) {
if (data == null || data.isEmpty()) {
return;
}
- for (InstanceIdentifier path : data) {
- Node node = createDataChangeEventElement(doc, path, null, store,
- operation);
+ for (YangInstanceIdentifier path : data) {
+ Node node = createDataChangeEventElement(doc, path, null, operation);
element.appendChild(node);
}
}
* @param doc
* {@link Document}
* @param data
- * Map of {@link InstanceIdentifier} and {@link CompositeNode}.
+ * Map of {@link YangInstanceIdentifier} and {@link CompositeNode}.
* @param element
* {@link Element}
* @param store
* @param operation
* {@link Operation}
*/
- private void addValuesFromDataToElement(Document doc,
- Map<InstanceIdentifier, CompositeNode> data, Element element,
- Store store, Operation operation) {
+ private void addValuesFromDataToElement(Document doc, Map<YangInstanceIdentifier, CompositeNode> data, Element element,
+ Operation operation) {
if (data == null || data.isEmpty()) {
return;
}
- for (Entry<InstanceIdentifier, CompositeNode> entry : data.entrySet()) {
- Node node = createDataChangeEventElement(doc, entry.getKey(),
- entry.getValue(), store, operation);
+ for (Entry<YangInstanceIdentifier, CompositeNode> entry : data.entrySet()) {
+ Node node = createDataChangeEventElement(doc, entry.getKey(), entry.getValue(), operation);
element.appendChild(node);
}
}
* {@link Operation}
* @return {@link Node} node represented by changed event element.
*/
- private Node createDataChangeEventElement(Document doc,
- InstanceIdentifier path, CompositeNode data, Store store,
+ private Node createDataChangeEventElement(Document doc, YangInstanceIdentifier path, CompositeNode data,
Operation operation) {
Element dataChangeEventElement = doc.createElement("data-change-event");
addPathAsValueToElement(path, pathElement);
dataChangeEventElement.appendChild(pathElement);
- Element storeElement = doc.createElement("store");
- storeElement.setTextContent(store.value);
- dataChangeEventElement.appendChild(storeElement);
+ // Element storeElement = doc.createElement("store");
+ // storeElement.setTextContent(store.value);
+ // dataChangeEventElement.appendChild(storeElement);
Element operationElement = doc.createElement("operation");
operationElement.setTextContent(operation.value);
* {@link CompositeNode}
* @return Data in XML format.
*/
- private Node translateToXml(InstanceIdentifier path, CompositeNode data) {
- DataNodeContainer schemaNode = ControllerContext.getInstance()
- .getDataNodeContainerFor(path);
+ private Node translateToXml(final YangInstanceIdentifier path, final CompositeNode data) {
+ DataNodeContainer schemaNode = ControllerContext.getInstance().getDataNodeContainerFor(path);
if (schemaNode == null) {
- logger.info(
+ LOG.info(
"Path '{}' contains node with unsupported type (supported type is Container or List) or some node was not found.",
path);
return null;
Document xml = xmlMapper.write(data, schemaNode);
return xml.getFirstChild();
} catch (UnsupportedDataTypeException e) {
- logger.error(
- "Error occured during translation of notification to XML.",
- e);
+ LOG.error("Error occured during translation of notification to XML.", e);
return null;
}
}
* @param element
* {@link Element}
*/
- private void addPathAsValueToElement(InstanceIdentifier path,
- Element element) {
+ private void addPathAsValueToElement(final YangInstanceIdentifier path, final Element element) {
// Map< key = namespace, value = prefix>
Map<String, String> prefixes = new HashMap<>();
- InstanceIdentifier instanceIdentifier = path;
+ YangInstanceIdentifier instanceIdentifier = path;
StringBuilder textContent = new StringBuilder();
- for (PathArgument pathArgument : instanceIdentifier.getPath()) {
+
+ // FIXME: BUG-1281: this is duplicated code from yangtools (BUG-1275)
+ for (PathArgument pathArgument : instanceIdentifier.getPathArguments()) {
textContent.append("/");
- writeIdentifierWithNamespacePrefix(element, textContent,
- pathArgument.getNodeType(), prefixes);
+ writeIdentifierWithNamespacePrefix(element, textContent, pathArgument.getNodeType(), prefixes);
if (pathArgument instanceof NodeIdentifierWithPredicates) {
- Map<QName, Object> predicates = ((NodeIdentifierWithPredicates) pathArgument)
- .getKeyValues();
+ Map<QName, Object> predicates = ((NodeIdentifierWithPredicates) pathArgument).getKeyValues();
for (QName keyValue : predicates.keySet()) {
- String predicateValue = String.valueOf(predicates
- .get(keyValue));
+ String predicateValue = String.valueOf(predicates.get(keyValue));
textContent.append("[");
- writeIdentifierWithNamespacePrefix(element, textContent,
- keyValue, prefixes);
+ writeIdentifierWithNamespacePrefix(element, textContent, keyValue, prefixes);
textContent.append("='");
textContent.append(predicateValue);
textContent.append("'");
* @param prefixes
* Map of namespaces and prefixes.
*/
- private static void writeIdentifierWithNamespacePrefix(Element element,
- StringBuilder textContent, QName qName, Map<String, String> prefixes) {
+ private static void writeIdentifierWithNamespacePrefix(final Element element, final StringBuilder textContent,
+ final QName qName, final Map<String, String> prefixes) {
String namespace = qName.getNamespace().toString();
String prefix = prefixes.get(namespace);
if (prefix == null) {
prefix = qName.getPrefix();
- if (prefix == null || prefix.isEmpty()
- || prefixes.containsValue(prefix)) {
+ if (prefix == null || prefix.isEmpty() || prefixes.containsValue(prefix)) {
prefix = generateNewPrefix(prefixes.values());
}
}
* Collection of prefixes.
* @return New prefix which consists of four random characters <a-z>.
*/
- private static String generateNewPrefix(Collection<String> prefixes) {
+ private static String generateNewPrefix(final Collection<String> prefixes) {
StringBuilder result = null;
Random random = new Random();
do {
*
* @return Path pointed to data in data store.
*/
- public InstanceIdentifier getPath() {
+ public YangInstanceIdentifier getPath() {
return path;
}
* @param registration
* ListenerRegistration<DataChangeListener>
*/
- public void setRegistration(
- ListenerRegistration<DataChangeListener> registration) {
+ public void setRegistration(final ListenerRegistration<DOMDataChangeListener> registration) {
this.registration = registration;
}
}
/**
- * Removes all subscribers and unregisters event bus change recorder form
- * event bus.
+ * Removes all subscribers and unregisters event bus change recorder form event bus.
*/
public void close() throws Exception {
subscribers = new ConcurrentSet<>();
}
/**
- * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
- * subscriber to the event and post event into event bus.
+ * Creates event of type {@link EventType#REGISTER}, set {@link Channel} subscriber to the event and post event into
+ * event bus.
*
* @param subscriber
* Channel
*/
- public void addSubscriber(Channel subscriber) {
+ public void addSubscriber(final Channel subscriber) {
if (!subscriber.isActive()) {
- logger.debug("Channel is not active between websocket server and subscriber {}"
- + subscriber.remoteAddress());
+ LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
}
Event event = new Event(EventType.REGISTER);
event.setSubscriber(subscriber);
}
/**
- * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
- * subscriber to the event and posts event into event bus.
+ * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel} subscriber to the event and posts event
+ * into event bus.
*
* @param subscriber
*/
- public void removeSubscriber(Channel subscriber) {
- logger.debug("Subscriber {} is removed.", subscriber.remoteAddress());
+ public void removeSubscriber(final Channel subscriber) {
+ LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
Event event = new Event(EventType.DEREGISTER);
event.setSubscriber(subscriber);
eventBus.post(event);
/**
* Checks if exists at least one {@link Channel} subscriber.
*
- * @return True if exist at least one {@link Channel} subscriber, false
- * otherwise.
+ * @return True if exist at least one {@link Channel} subscriber, false otherwise.
*/
public boolean hasSubscribers() {
return !subscribers.isEmpty();
* Consists of two types {@link Store#CONFIG} and {@link Store#OPERATION}.
*/
private static enum Store {
- CONFIG("config"), OPERATION("operation");
+ CONFIG("config"),
+ OPERATION("operation");
private final String value;
- private Store(String value) {
+ private Store(final String value) {
this.value = value;
}
}
/**
- * Consists of three types {@link Operation#CREATED},
- * {@link Operation#UPDATED} and {@link Operation#DELETED}.
+ * Consists of three types {@link Operation#CREATED}, {@link Operation#UPDATED} and {@link Operation#DELETED}.
*/
private static enum Operation {
- CREATED("created"), UPDATED("updated"), DELETED("deleted");
+ CREATED("created"),
+ UPDATED("updated"),
+ DELETED("deleted");
private final String value;
- private Operation(String value) {
+ private Operation(final String value) {
this.value = value;
}
}