Merge "Bug 2517: Catch RuntimeExceptions thrown from the DCL in DataChangeListener"
[controller.git] / opendaylight / md-sal / sal-rest-connector / src / main / java / org / opendaylight / controller / sal / streams / listeners / ListenerAdapter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.streams.listeners;
9
10 import com.google.common.base.Charsets;
11 import com.google.common.base.Preconditions;
12 import com.google.common.eventbus.AsyncEventBus;
13 import com.google.common.eventbus.EventBus;
14 import com.google.common.eventbus.Subscribe;
15 import io.netty.channel.Channel;
16 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
17 import io.netty.util.internal.ConcurrentSet;
18 import java.io.ByteArrayOutputStream;
19 import java.io.OutputStreamWriter;
20 import java.io.UnsupportedEncodingException;
21 import java.text.SimpleDateFormat;
22 import java.util.Collection;
23 import java.util.Date;
24 import java.util.HashMap;
25 import java.util.Map;
26 import java.util.Map.Entry;
27 import java.util.Random;
28 import java.util.Set;
29 import java.util.concurrent.Executors;
30 import java.util.regex.Pattern;
31 import javax.activation.UnsupportedDataTypeException;
32 import javax.xml.parsers.DocumentBuilder;
33 import javax.xml.parsers.DocumentBuilderFactory;
34 import javax.xml.parsers.ParserConfigurationException;
35 import javax.xml.transform.OutputKeys;
36 import javax.xml.transform.Transformer;
37 import javax.xml.transform.TransformerException;
38 import javax.xml.transform.TransformerFactory;
39 import javax.xml.transform.dom.DOMSource;
40 import javax.xml.transform.stream.StreamResult;
41 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
42 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
43 import org.opendaylight.controller.sal.rest.impl.XmlMapper;
44 import org.opendaylight.controller.sal.restconf.impl.ControllerContext;
45 import org.opendaylight.yangtools.concepts.ListenerRegistration;
46 import org.opendaylight.yangtools.yang.common.QName;
47 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
48 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
49 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
50 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeWithValue;
51 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
52 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
53 import org.opendaylight.yangtools.yang.model.api.DataNodeContainer;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56 import org.w3c.dom.Document;
57 import org.w3c.dom.Element;
58 import org.w3c.dom.Node;
59
60 /**
61  * {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source.
62  */
63 public class ListenerAdapter implements DOMDataChangeListener {
64
65     private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class);
66     private static final DocumentBuilderFactory DBF = DocumentBuilderFactory.newInstance();
67     private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
68     private static final Pattern RFC3339_PATTERN = Pattern.compile("(\\d\\d)(\\d\\d)$");
69
70     private final XmlMapper xmlMapper = new XmlMapper();
71     private final SimpleDateFormat rfc3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ");
72
73     private final YangInstanceIdentifier path;
74     private ListenerRegistration<DOMDataChangeListener> registration;
75     private final String streamName;
76     private Set<Channel> subscribers = new ConcurrentSet<>();
77     private final EventBus eventBus;
78     private final EventBusChangeRecorder eventBusChangeRecorder;
79
80     /**
81      * Creates new {@link ListenerAdapter} listener specified by path and stream name.
82      *
83      * @param path
84      *            Path to data in data store.
85      * @param streamName
86      *            The name of the stream.
87      */
88     ListenerAdapter(final YangInstanceIdentifier path, final String streamName) {
89         Preconditions.checkNotNull(path);
90         Preconditions.checkArgument(streamName != null && !streamName.isEmpty());
91         this.path = path;
92         this.streamName = streamName;
93         eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
94         eventBusChangeRecorder = new EventBusChangeRecorder();
95         eventBus.register(eventBusChangeRecorder);
96     }
97
98     @Override
99     public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
100         // TODO Auto-generated method stub
101
102         if (!change.getCreatedData().isEmpty() || !change.getUpdatedData().isEmpty()
103                 || !change.getRemovedPaths().isEmpty()) {
104             final String xml = prepareXmlFrom(change);
105             final Event event = new Event(EventType.NOTIFY);
106             event.setData(xml);
107             eventBus.post(event);
108         }
109     }
110
111     /**
112      * Tracks events of data change by customer.
113      */
114     private final class EventBusChangeRecorder {
115         @Subscribe
116         public void recordCustomerChange(final Event event) {
117             if (event.getType() == EventType.REGISTER) {
118                 final Channel subscriber = event.getSubscriber();
119                 if (!subscribers.contains(subscriber)) {
120                     subscribers.add(subscriber);
121                 }
122             } else if (event.getType() == EventType.DEREGISTER) {
123                 subscribers.remove(event.getSubscriber());
124                 Notificator.removeListenerIfNoSubscriberExists(ListenerAdapter.this);
125             } else if (event.getType() == EventType.NOTIFY) {
126                 for (final Channel subscriber : subscribers) {
127                     if (subscriber.isActive()) {
128                         LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
129                         subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
130                     } else {
131                         LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
132                         subscribers.remove(subscriber);
133                     }
134                 }
135             }
136         }
137     }
138
139     /**
140      * Represents event of specific {@link EventType} type, holds data and {@link Channel} subscriber.
141      */
142     private final class Event {
143         private final EventType type;
144         private Channel subscriber;
145         private String data;
146
147         /**
148          * Creates new event specified by {@link EventType} type.
149          *
150          * @param type
151          *            EventType
152          */
153         public Event(final EventType type) {
154             this.type = type;
155         }
156
157         /**
158          * Gets the {@link Channel} subscriber.
159          *
160          * @return Channel
161          */
162         public Channel getSubscriber() {
163             return subscriber;
164         }
165
166         /**
167          * Sets subscriber for event.
168          *
169          * @param subscriber
170          *            Channel
171          */
172         public void setSubscriber(final Channel subscriber) {
173             this.subscriber = subscriber;
174         }
175
176         /**
177          * Gets event data.
178          *
179          * @return String representation of event data.
180          */
181         public String getData() {
182             return data;
183         }
184
185         /**
186          * Sets event data.
187          *
188          * @param String
189          *            data.
190          */
191         public void setData(final String data) {
192             this.data = data;
193         }
194
195         /**
196          * Gets event type.
197          *
198          * @return The type of the event.
199          */
200         public EventType getType() {
201             return type;
202         }
203     }
204
205     /**
206      * Type of the event.
207      */
208     private enum EventType {
209         REGISTER,
210         DEREGISTER,
211         NOTIFY;
212     }
213
214     /**
215      * Prepare data in printable form and transform it to String.
216      *
217      * @param change
218      *            DataChangeEvent
219      * @return Data in printable form.
220      */
221     private String prepareXmlFrom(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
222         final Document doc = createDocument();
223         final Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
224                 "notification");
225         doc.appendChild(notificationElement);
226
227         final Element eventTimeElement = doc.createElement("eventTime");
228         eventTimeElement.setTextContent(toRFC3339(new Date()));
229         notificationElement.appendChild(eventTimeElement);
230
231         final Element dataChangedNotificationEventElement = doc.createElementNS(
232                 "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "data-changed-notification");
233         addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, change);
234         notificationElement.appendChild(dataChangedNotificationEventElement);
235
236         try {
237             final ByteArrayOutputStream out = new ByteArrayOutputStream();
238             final Transformer transformer = FACTORY.newTransformer();
239             transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
240             transformer.setOutputProperty(OutputKeys.METHOD, "xml");
241             transformer.setOutputProperty(OutputKeys.INDENT, "yes");
242             transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
243             transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
244             transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, Charsets.UTF_8)));
245             final byte[] charData = out.toByteArray();
246             return new String(charData, "UTF-8");
247         } catch (TransformerException | UnsupportedEncodingException e) {
248             final String msg = "Error during transformation of Document into String";
249             LOG.error(msg, e);
250             return msg;
251         }
252     }
253
254     /**
255      * Formats data specified by RFC3339.
256      *
257      * @param d
258      *            Date
259      * @return Data specified by RFC3339.
260      */
261     private String toRFC3339(final Date d) {
262         return RFC3339_PATTERN.matcher(rfc3339.format(d)).replaceAll("$1:$2");
263     }
264
265     /**
266      * Creates {@link Document} document.
267      * @return {@link Document} document.
268      */
269     private Document createDocument() {
270         final DocumentBuilder bob;
271         try {
272             bob = DBF.newDocumentBuilder();
273         } catch (final ParserConfigurationException e) {
274             return null;
275         }
276         return bob.newDocument();
277     }
278
279     /**
280      * Adds values to data changed notification event element.
281      *
282      * @param doc
283      *            {@link Document}
284      * @param dataChangedNotificationEventElement
285      *            {@link Element}
286      * @param change
287      *            {@link DataChangeEvent}
288      */
289     private void addValuesToDataChangedNotificationEventElement(final Document doc,
290             final Element dataChangedNotificationEventElement,
291             final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
292         addValuesFromDataToElement(doc, change.getCreatedData().keySet(), dataChangedNotificationEventElement,
293                 Operation.CREATED);
294         if (change.getCreatedData().isEmpty()) {
295             addValuesFromDataToElement(doc, change.getUpdatedData().keySet(), dataChangedNotificationEventElement,
296                     Operation.UPDATED);
297         }
298         addValuesFromDataToElement(doc, change.getRemovedPaths(), dataChangedNotificationEventElement,
299                 Operation.DELETED);
300     }
301
302     /**
303      * Adds values from data to element.
304      *
305      * @param doc
306      *            {@link Document}
307      * @param data
308      *            Set of {@link YangInstanceIdentifier}.
309      * @param element
310      *            {@link Element}
311      * @param store
312      *            {@link Store}
313      * @param operation
314      *            {@link Operation}
315      */
316     private void addValuesFromDataToElement(final Document doc, final Set<YangInstanceIdentifier> data, final Element element,
317             final Operation operation) {
318         if (data == null || data.isEmpty()) {
319             return;
320         }
321         for (final YangInstanceIdentifier path : data) {
322             final Node node = createDataChangeEventElement(doc, path, null, operation);
323             element.appendChild(node);
324         }
325     }
326
327     /**
328      * Adds values from data to element.
329      *
330      * @param doc
331      *            {@link Document}
332      * @param data
333      *            Map of {@link YangInstanceIdentifier} and {@link CompositeNode}.
334      * @param element
335      *            {@link Element}
336      * @param store
337      *            {@link Store}
338      * @param operation
339      *            {@link Operation}
340      */
341     private void addValuesFromDataToElement(final Document doc, final Map<YangInstanceIdentifier, CompositeNode> data, final Element element,
342             final Operation operation) {
343         if (data == null || data.isEmpty()) {
344             return;
345         }
346         for (final Entry<YangInstanceIdentifier, CompositeNode> entry : data.entrySet()) {
347             final Node node = createDataChangeEventElement(doc, entry.getKey(), entry.getValue(), operation);
348             element.appendChild(node);
349         }
350     }
351
352     /**
353      * Creates changed event element from data.
354      *
355      * @param doc
356      *            {@link Document}
357      * @param path
358      *            Path to data in data store.
359      * @param data
360      *            {@link CompositeNode}
361      * @param store
362      *            {@link Store}
363      * @param operation
364      *            {@link Operation}
365      * @return {@link Node} node represented by changed event element.
366      */
367     private Node createDataChangeEventElement(final Document doc, final YangInstanceIdentifier path, final CompositeNode data,
368             final Operation operation) {
369         final Element dataChangeEventElement = doc.createElement("data-change-event");
370         final Element pathElement = doc.createElement("path");
371         addPathAsValueToElement(path, pathElement);
372         dataChangeEventElement.appendChild(pathElement);
373
374         // Element storeElement = doc.createElement("store");
375         // storeElement.setTextContent(store.value);
376         // dataChangeEventElement.appendChild(storeElement);
377
378         final Element operationElement = doc.createElement("operation");
379         operationElement.setTextContent(operation.value);
380         dataChangeEventElement.appendChild(operationElement);
381
382         if (data != null) {
383             final Element dataElement = doc.createElement("data");
384             final Node dataAnyXml = translateToXml(path, data);
385             final Node adoptedNode = doc.adoptNode(dataAnyXml);
386             dataElement.appendChild(adoptedNode);
387             dataChangeEventElement.appendChild(dataElement);
388         }
389
390         return dataChangeEventElement;
391     }
392
393     /**
394      * Translates {@link CompositeNode} data to XML format.
395      *
396      * @param path
397      *            Path to data in data store.
398      * @param data
399      *            {@link CompositeNode}
400      * @return Data in XML format.
401      */
402     private Node translateToXml(final YangInstanceIdentifier path, final CompositeNode data) {
403         final DataNodeContainer schemaNode = ControllerContext.getInstance().getDataNodeContainerFor(path);
404         if (schemaNode == null) {
405             LOG.info(
406                     "Path '{}' contains node with unsupported type (supported type is Container or List) or some node was not found.",
407                     path);
408             return null;
409         }
410         try {
411             final Document xml = xmlMapper.write(data, schemaNode);
412             return xml.getFirstChild();
413         } catch (final UnsupportedDataTypeException e) {
414             LOG.error("Error occured during translation of notification to XML.", e);
415             return null;
416         }
417     }
418
419     /**
420      * Adds path as value to element.
421      *
422      * @param path
423      *            Path to data in data store.
424      * @param element
425      *            {@link Element}
426      */
427     private void addPathAsValueToElement(final YangInstanceIdentifier path, final Element element) {
428         // Map< key = namespace, value = prefix>
429         final Map<String, String> prefixes = new HashMap<>();
430         final YangInstanceIdentifier instanceIdentifier = path;
431         final StringBuilder textContent = new StringBuilder();
432
433         // FIXME: BUG-1281: this is duplicated code from yangtools (BUG-1275)
434         for (final PathArgument pathArgument : instanceIdentifier.getPathArguments()) {
435             textContent.append("/");
436             writeIdentifierWithNamespacePrefix(element, textContent, pathArgument.getNodeType(), prefixes);
437             if (pathArgument instanceof NodeIdentifierWithPredicates) {
438                 final Map<QName, Object> predicates = ((NodeIdentifierWithPredicates) pathArgument).getKeyValues();
439                 for (final QName keyValue : predicates.keySet()) {
440                     final String predicateValue = String.valueOf(predicates.get(keyValue));
441                     textContent.append("[");
442                     writeIdentifierWithNamespacePrefix(element, textContent, keyValue, prefixes);
443                     textContent.append("='");
444                     textContent.append(predicateValue);
445                     textContent.append("'");
446                     textContent.append("]");
447                 }
448             } else if (pathArgument instanceof NodeWithValue) {
449                 textContent.append("[.='");
450                 textContent.append(((NodeWithValue) pathArgument).getValue());
451                 textContent.append("'");
452                 textContent.append("]");
453             }
454         }
455         element.setTextContent(textContent.toString());
456     }
457
458     /**
459      * Writes identifier that consists of prefix and QName.
460      *
461      * @param element
462      *            {@link Element}
463      * @param textContent
464      *            StringBuilder
465      * @param qName
466      *            QName
467      * @param prefixes
468      *            Map of namespaces and prefixes.
469      */
470     private static void writeIdentifierWithNamespacePrefix(final Element element, final StringBuilder textContent,
471             final QName qName, final Map<String, String> prefixes) {
472         final String namespace = qName.getNamespace().toString();
473         String prefix = prefixes.get(namespace);
474         if (prefix == null) {
475             prefix = generateNewPrefix(prefixes.values());
476         }
477
478         element.setAttribute("xmlns:" + prefix, namespace);
479         textContent.append(prefix);
480         prefixes.put(namespace, prefix);
481
482         textContent.append(":");
483         textContent.append(qName.getLocalName());
484     }
485
486     /**
487      * Generates new prefix which consists of four random characters <a-z>.
488      *
489      * @param prefixes
490      *            Collection of prefixes.
491      * @return New prefix which consists of four random characters <a-z>.
492      */
493     private static String generateNewPrefix(final Collection<String> prefixes) {
494         StringBuilder result = null;
495         final Random random = new Random();
496         do {
497             result = new StringBuilder();
498             for (int i = 0; i < 4; i++) {
499                 final int randomNumber = 0x61 + (Math.abs(random.nextInt()) % 26);
500                 result.append(Character.toChars(randomNumber));
501             }
502         } while (prefixes.contains(result.toString()));
503
504         return result.toString();
505     }
506
507     /**
508      * Gets path pointed to data in data store.
509      *
510      * @return Path pointed to data in data store.
511      */
512     public YangInstanceIdentifier getPath() {
513         return path;
514     }
515
516     /**
517      * Sets {@link ListenerRegistration} registration.
518      *
519      * @param registration
520      *            ListenerRegistration<DataChangeListener>
521      */
522     public void setRegistration(final ListenerRegistration<DOMDataChangeListener> registration) {
523         this.registration = registration;
524     }
525
526     /**
527      * Gets the name of the stream.
528      *
529      * @return The name of the stream.
530      */
531     public String getStreamName() {
532         return streamName;
533     }
534
535     /**
536      * Removes all subscribers and unregisters event bus change recorder form event bus.
537      */
538     public void close() throws Exception {
539         subscribers = new ConcurrentSet<>();
540         registration.close();
541         registration = null;
542         eventBus.unregister(eventBusChangeRecorder);
543     }
544
545     /**
546      * Checks if {@link ListenerRegistration} registration exist.
547      *
548      * @return True if exist, false otherwise.
549      */
550     public boolean isListening() {
551         return registration == null ? false : true;
552     }
553
554     /**
555      * Creates event of type {@link EventType#REGISTER}, set {@link Channel} subscriber to the event and post event into
556      * event bus.
557      *
558      * @param subscriber
559      *            Channel
560      */
561     public void addSubscriber(final Channel subscriber) {
562         if (!subscriber.isActive()) {
563             LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
564         }
565         final Event event = new Event(EventType.REGISTER);
566         event.setSubscriber(subscriber);
567         eventBus.post(event);
568     }
569
570     /**
571      * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel} subscriber to the event and posts event
572      * into event bus.
573      *
574      * @param subscriber
575      */
576     public void removeSubscriber(final Channel subscriber) {
577         LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
578         final Event event = new Event(EventType.DEREGISTER);
579         event.setSubscriber(subscriber);
580         eventBus.post(event);
581     }
582
583     /**
584      * Checks if exists at least one {@link Channel} subscriber.
585      *
586      * @return True if exist at least one {@link Channel} subscriber, false otherwise.
587      */
588     public boolean hasSubscribers() {
589         return !subscribers.isEmpty();
590     }
591
592     /**
593      * Consists of two types {@link Store#CONFIG} and {@link Store#OPERATION}.
594      */
595     private static enum Store {
596         CONFIG("config"),
597         OPERATION("operation");
598
599         private final String value;
600
601         private Store(final String value) {
602             this.value = value;
603         }
604     }
605
606     /**
607      * Consists of three types {@link Operation#CREATED}, {@link Operation#UPDATED} and {@link Operation#DELETED}.
608      */
609     private static enum Operation {
610         CREATED("created"),
611         UPDATED("updated"),
612         DELETED("deleted");
613
614         private final String value;
615
616         private Operation(final String value) {
617             this.value = value;
618         }
619     }
620
621 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.