Merge "BUG-614: migrate RuntimeGeneratedInvoker"
[controller.git] / opendaylight / md-sal / sal-rest-connector / src / main / java / org / opendaylight / controller / sal / streams / listeners / ListenerAdapter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.streams.listeners;
9
10 import io.netty.channel.Channel;
11 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
12 import io.netty.util.internal.ConcurrentSet;
13
14 import java.io.ByteArrayOutputStream;
15 import java.io.OutputStreamWriter;
16 import java.io.UnsupportedEncodingException;
17 import java.text.SimpleDateFormat;
18 import java.util.Collection;
19 import java.util.Date;
20 import java.util.HashMap;
21 import java.util.Map;
22 import java.util.Map.Entry;
23 import java.util.Random;
24 import java.util.Set;
25 import java.util.concurrent.Executors;
26
27 import javax.activation.UnsupportedDataTypeException;
28 import javax.xml.parsers.DocumentBuilder;
29 import javax.xml.parsers.DocumentBuilderFactory;
30 import javax.xml.parsers.ParserConfigurationException;
31 import javax.xml.transform.OutputKeys;
32 import javax.xml.transform.Transformer;
33 import javax.xml.transform.TransformerException;
34 import javax.xml.transform.TransformerFactory;
35 import javax.xml.transform.dom.DOMSource;
36 import javax.xml.transform.stream.StreamResult;
37
38 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
39 import org.opendaylight.controller.sal.core.api.data.DataChangeListener;
40 import org.opendaylight.controller.sal.rest.impl.XmlMapper;
41 import org.opendaylight.controller.sal.restconf.impl.ControllerContext;
42 import org.opendaylight.yangtools.concepts.ListenerRegistration;
43 import org.opendaylight.yangtools.yang.common.QName;
44 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
45 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
46 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifierWithPredicates;
47 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeWithValue;
48 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
49 import org.opendaylight.yangtools.yang.model.api.DataNodeContainer;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52 import org.w3c.dom.Document;
53 import org.w3c.dom.Element;
54 import org.w3c.dom.Node;
55
56 import com.google.common.base.Preconditions;
57 import com.google.common.eventbus.AsyncEventBus;
58 import com.google.common.eventbus.EventBus;
59 import com.google.common.eventbus.Subscribe;
60
61 /**
62  * {@link ListenerAdapter} is responsible to track events, which occurred by
63  * changing data in data source.
64  */
65 public class ListenerAdapter implements DataChangeListener {
66
67     private static final Logger logger = LoggerFactory
68             .getLogger(ListenerAdapter.class);
69     private final XmlMapper xmlMapper = new XmlMapper();
70     private final SimpleDateFormat rfc3339 = new SimpleDateFormat(
71             "yyyy-MM-dd'T'hh:mm:ssZ");
72
73     private final InstanceIdentifier path;
74     private ListenerRegistration<DataChangeListener> registration;
75     private final String streamName;
76     private Set<Channel> subscribers = new ConcurrentSet<>();
77     private final EventBus eventBus;
78     private final EventBusChangeRecorder eventBusChangeRecorder;
79
80     /**
81      * Creates new {@link ListenerAdapter} listener specified by path and stream
82      * name.
83      *
84      * @param path
85      *            Path to data in data store.
86      * @param streamName
87      *            The name of the stream.
88      */
89     ListenerAdapter(InstanceIdentifier path, String streamName) {
90         Preconditions.checkNotNull(path);
91         Preconditions
92                 .checkArgument(streamName != null && !streamName.isEmpty());
93         this.path = path;
94         this.streamName = streamName;
95         eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
96         eventBusChangeRecorder = new EventBusChangeRecorder();
97         eventBus.register(eventBusChangeRecorder);
98     }
99
100     @Override
101     public void onDataChanged(
102             DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
103         if (!change.getCreatedConfigurationData().isEmpty()
104                 || !change.getCreatedOperationalData().isEmpty()
105                 || !change.getUpdatedConfigurationData().isEmpty()
106                 || !change.getUpdatedOperationalData().isEmpty()
107                 || !change.getRemovedConfigurationData().isEmpty()
108                 || !change.getRemovedOperationalData().isEmpty()) {
109             String xml = prepareXmlFrom(change);
110             Event event = new Event(EventType.NOTIFY);
111             event.setData(xml);
112             eventBus.post(event);
113         }
114     }
115
116     /**
117      * Tracks events of data change by customer.
118      */
119     private final class EventBusChangeRecorder {
120         @Subscribe
121         public void recordCustomerChange(Event event) {
122             if (event.getType() == EventType.REGISTER) {
123                 Channel subscriber = event.getSubscriber();
124                 if (!subscribers.contains(subscriber)) {
125                     subscribers.add(subscriber);
126                 }
127             } else if (event.getType() == EventType.DEREGISTER) {
128                 subscribers.remove(event.getSubscriber());
129                 Notificator
130                         .removeListenerIfNoSubscriberExists(ListenerAdapter.this);
131             } else if (event.getType() == EventType.NOTIFY) {
132                 for (Channel subscriber : subscribers) {
133                     if (subscriber.isActive()) {
134                         logger.debug("Data are sent to subscriber {}:",
135                                 subscriber.remoteAddress());
136                         subscriber.writeAndFlush(new TextWebSocketFrame(event
137                                 .getData()));
138                     } else {
139                         logger.debug(
140                                 "Subscriber {} is removed - channel is not active yet.",
141                                 subscriber.remoteAddress());
142                         subscribers.remove(subscriber);
143                     }
144                 }
145             }
146         }
147     }
148
149     /**
150      * Represents event of specific {@link EventType} type, holds data and
151      * {@link Channel} subscriber.
152      */
153     private final class Event {
154         private final EventType type;
155         private Channel subscriber;
156         private String data;
157
158         /**
159          * Creates new event specified by {@link EventType} type.
160          *
161          * @param type
162          *            EventType
163          */
164         public Event(EventType type) {
165             this.type = type;
166         }
167
168         /**
169          * Gets the {@link Channel} subscriber.
170          *
171          * @return Channel
172          */
173         public Channel getSubscriber() {
174             return subscriber;
175         }
176
177         /**
178          * Sets subscriber for event.
179          *
180          * @param subscriber
181          *            Channel
182          */
183         public void setSubscriber(Channel subscriber) {
184             this.subscriber = subscriber;
185         }
186
187         /**
188          * Gets event data.
189          *
190          * @return String representation of event data.
191          */
192         public String getData() {
193             return data;
194         }
195
196         /**
197          * Sets event data.
198          *
199          * @param String
200          *            data.
201          */
202         public void setData(String data) {
203             this.data = data;
204         }
205
206         /**
207          * Gets event type.
208          *
209          * @return The type of the event.
210          */
211         public EventType getType() {
212             return type;
213         }
214     }
215
216     /**
217      * Type of the event.
218      */
219     private enum EventType {
220         REGISTER, DEREGISTER, NOTIFY;
221     }
222
223     /**
224      * Prepare data in printable form and transform it to String.
225      *
226      * @param change
227      *            DataChangeEvent
228      * @return Data in printable form.
229      */
230     private String prepareXmlFrom(
231             DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
232         Document doc = createDocument();
233         Element notificationElement = doc.createElementNS(
234                 "urn:ietf:params:xml:ns:netconf:notification:1.0",
235                 "notification");
236         doc.appendChild(notificationElement);
237
238         Element eventTimeElement = doc.createElement("eventTime");
239         eventTimeElement.setTextContent(toRFC3339(new Date()));
240         notificationElement.appendChild(eventTimeElement);
241
242         Element dataChangedNotificationEventElement = doc.createElementNS(
243                 "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote",
244                 "data-changed-notification");
245         addValuesToDataChangedNotificationEventElement(doc,
246                 dataChangedNotificationEventElement, change);
247         notificationElement.appendChild(dataChangedNotificationEventElement);
248
249         try {
250             ByteArrayOutputStream out = new ByteArrayOutputStream();
251             TransformerFactory tf = TransformerFactory.newInstance();
252             Transformer transformer = tf.newTransformer();
253             transformer
254                     .setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
255             transformer.setOutputProperty(OutputKeys.METHOD, "xml");
256             transformer.setOutputProperty(OutputKeys.INDENT, "yes");
257             transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
258             transformer.setOutputProperty(
259                     "{http://xml.apache.org/xslt}indent-amount", "4");
260             transformer.transform(new DOMSource(doc), new StreamResult(
261                     new OutputStreamWriter(out, "UTF-8")));
262             byte[] charData = out.toByteArray();
263             return new String(charData, "UTF-8");
264         } catch (TransformerException | UnsupportedEncodingException e) {
265             String msg = "Error during transformation of Document into String";
266             logger.error(msg, e);
267             return msg;
268         }
269     }
270
271     /**
272      * Formats data specified by RFC3339.
273      *
274      * @param d
275      *            Date
276      * @return Data specified by RFC3339.
277      */
278     private String toRFC3339(Date d) {
279         return rfc3339.format(d).replaceAll("(\\d\\d)(\\d\\d)$", "$1:$2");
280     }
281
282     /**
283      * Creates {@link Document} document.
284      *
285      * @return {@link Document} document.
286      */
287     private Document createDocument() {
288         DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
289         Document doc = null;
290         try {
291             DocumentBuilder bob = dbf.newDocumentBuilder();
292             doc = bob.newDocument();
293         } catch (ParserConfigurationException e) {
294             return null;
295         }
296         return doc;
297     }
298
299     /**
300      * Adds values to data changed notification event element.
301      *
302      * @param doc
303      *            {@link Document}
304      * @param dataChangedNotificationEventElement
305      *            {@link Element}
306      * @param change
307      *            {@link DataChangeEvent}
308      */
309     private void addValuesToDataChangedNotificationEventElement(Document doc,
310             Element dataChangedNotificationEventElement,
311             DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
312         addValuesFromDataToElement(doc, change.getCreatedConfigurationData(),
313                 dataChangedNotificationEventElement, Store.CONFIG,
314                 Operation.CREATED);
315         addValuesFromDataToElement(doc, change.getCreatedOperationalData(),
316                 dataChangedNotificationEventElement, Store.OPERATION,
317                 Operation.CREATED);
318         if (change.getCreatedConfigurationData().isEmpty()) {
319             addValuesFromDataToElement(doc,
320                     change.getUpdatedConfigurationData(),
321                     dataChangedNotificationEventElement, Store.CONFIG,
322                     Operation.UPDATED);
323         }
324         if (change.getCreatedOperationalData().isEmpty()) {
325             addValuesFromDataToElement(doc, change.getUpdatedOperationalData(),
326                     dataChangedNotificationEventElement, Store.OPERATION,
327                     Operation.UPDATED);
328         }
329         addValuesFromDataToElement(doc, change.getRemovedConfigurationData(),
330                 dataChangedNotificationEventElement, Store.CONFIG,
331                 Operation.DELETED);
332         addValuesFromDataToElement(doc, change.getRemovedOperationalData(),
333                 dataChangedNotificationEventElement, Store.OPERATION,
334                 Operation.DELETED);
335     }
336
337     /**
338      * Adds values from data to element.
339      *
340      * @param doc
341      *            {@link Document}
342      * @param data
343      *            Set of {@link InstanceIdentifier}.
344      * @param element
345      *            {@link Element}
346      * @param store
347      *            {@link Store}
348      * @param operation
349      *            {@link Operation}
350      */
351     private void addValuesFromDataToElement(Document doc,
352             Set<InstanceIdentifier> data, Element element, Store store,
353             Operation operation) {
354         if (data == null || data.isEmpty()) {
355             return;
356         }
357         for (InstanceIdentifier path : data) {
358             Node node = createDataChangeEventElement(doc, path, null, store,
359                     operation);
360             element.appendChild(node);
361         }
362     }
363
364     /**
365      * Adds values from data to element.
366      *
367      * @param doc
368      *            {@link Document}
369      * @param data
370      *            Map of {@link InstanceIdentifier} and {@link CompositeNode}.
371      * @param element
372      *            {@link Element}
373      * @param store
374      *            {@link Store}
375      * @param operation
376      *            {@link Operation}
377      */
378     private void addValuesFromDataToElement(Document doc,
379             Map<InstanceIdentifier, CompositeNode> data, Element element,
380             Store store, Operation operation) {
381         if (data == null || data.isEmpty()) {
382             return;
383         }
384         for (Entry<InstanceIdentifier, CompositeNode> entry : data.entrySet()) {
385             Node node = createDataChangeEventElement(doc, entry.getKey(),
386                     entry.getValue(), store, operation);
387             element.appendChild(node);
388         }
389     }
390
391     /**
392      * Creates changed event element from data.
393      *
394      * @param doc
395      *            {@link Document}
396      * @param path
397      *            Path to data in data store.
398      * @param data
399      *            {@link CompositeNode}
400      * @param store
401      *            {@link Store}
402      * @param operation
403      *            {@link Operation}
404      * @return {@link Node} node represented by changed event element.
405      */
406     private Node createDataChangeEventElement(Document doc,
407             InstanceIdentifier path, CompositeNode data, Store store,
408             Operation operation) {
409         Element dataChangeEventElement = doc.createElement("data-change-event");
410
411         Element pathElement = doc.createElement("path");
412         addPathAsValueToElement(path, pathElement);
413         dataChangeEventElement.appendChild(pathElement);
414
415         Element storeElement = doc.createElement("store");
416         storeElement.setTextContent(store.value);
417         dataChangeEventElement.appendChild(storeElement);
418
419         Element operationElement = doc.createElement("operation");
420         operationElement.setTextContent(operation.value);
421         dataChangeEventElement.appendChild(operationElement);
422
423         if (data != null) {
424             Element dataElement = doc.createElement("data");
425             Node dataAnyXml = translateToXml(path, data);
426             Node adoptedNode = doc.adoptNode(dataAnyXml);
427             dataElement.appendChild(adoptedNode);
428             dataChangeEventElement.appendChild(dataElement);
429         }
430
431         return dataChangeEventElement;
432     }
433
434     /**
435      * Translates {@link CompositeNode} data to XML format.
436      *
437      * @param path
438      *            Path to data in data store.
439      * @param data
440      *            {@link CompositeNode}
441      * @return Data in XML format.
442      */
443     private Node translateToXml(InstanceIdentifier path, CompositeNode data) {
444         DataNodeContainer schemaNode = ControllerContext.getInstance()
445                 .getDataNodeContainerFor(path);
446         if (schemaNode == null) {
447             logger.info(
448                     "Path '{}' contains node with unsupported type (supported type is Container or List) or some node was not found.",
449                     path);
450             return null;
451         }
452         try {
453             Document xml = xmlMapper.write(data, schemaNode);
454             return xml.getFirstChild();
455         } catch (UnsupportedDataTypeException e) {
456             logger.error(
457                     "Error occured during translation of notification to XML.",
458                     e);
459             return null;
460         }
461     }
462
463     /**
464      * Adds path as value to element.
465      *
466      * @param path
467      *            Path to data in data store.
468      * @param element
469      *            {@link Element}
470      */
471     private void addPathAsValueToElement(InstanceIdentifier path,
472             Element element) {
473         // Map< key = namespace, value = prefix>
474         Map<String, String> prefixes = new HashMap<>();
475         InstanceIdentifier instanceIdentifier = path;
476         StringBuilder textContent = new StringBuilder();
477         for (PathArgument pathArgument : instanceIdentifier.getPath()) {
478             textContent.append("/");
479             writeIdentifierWithNamespacePrefix(element, textContent,
480                     pathArgument.getNodeType(), prefixes);
481             if (pathArgument instanceof NodeIdentifierWithPredicates) {
482                 Map<QName, Object> predicates = ((NodeIdentifierWithPredicates) pathArgument)
483                         .getKeyValues();
484                 for (QName keyValue : predicates.keySet()) {
485                     String predicateValue = String.valueOf(predicates
486                             .get(keyValue));
487                     textContent.append("[");
488                     writeIdentifierWithNamespacePrefix(element, textContent,
489                             keyValue, prefixes);
490                     textContent.append("='");
491                     textContent.append(predicateValue);
492                     textContent.append("'");
493                     textContent.append("]");
494                 }
495             } else if (pathArgument instanceof NodeWithValue) {
496                 textContent.append("[.='");
497                 textContent.append(((NodeWithValue) pathArgument).getValue());
498                 textContent.append("'");
499                 textContent.append("]");
500             }
501         }
502         element.setTextContent(textContent.toString());
503     }
504
505     /**
506      * Writes identifier that consists of prefix and QName.
507      *
508      * @param element
509      *            {@link Element}
510      * @param textContent
511      *            StringBuilder
512      * @param qName
513      *            QName
514      * @param prefixes
515      *            Map of namespaces and prefixes.
516      */
517     private static void writeIdentifierWithNamespacePrefix(Element element,
518             StringBuilder textContent, QName qName, Map<String, String> prefixes) {
519         String namespace = qName.getNamespace().toString();
520         String prefix = prefixes.get(namespace);
521         if (prefix == null) {
522             prefix = qName.getPrefix();
523             if (prefix == null || prefix.isEmpty()
524                     || prefixes.containsValue(prefix)) {
525                 prefix = generateNewPrefix(prefixes.values());
526             }
527         }
528
529         element.setAttribute("xmlns:" + prefix, namespace);
530         textContent.append(prefix);
531         prefixes.put(namespace, prefix);
532
533         textContent.append(":");
534         textContent.append(qName.getLocalName());
535     }
536
537     /**
538      * Generates new prefix which consists of four random characters <a-z>.
539      *
540      * @param prefixes
541      *            Collection of prefixes.
542      * @return New prefix which consists of four random characters <a-z>.
543      */
544     private static String generateNewPrefix(Collection<String> prefixes) {
545         StringBuilder result = null;
546         Random random = new Random();
547         do {
548             result = new StringBuilder();
549             for (int i = 0; i < 4; i++) {
550                 int randomNumber = 0x61 + (Math.abs(random.nextInt()) % 26);
551                 result.append(Character.toChars(randomNumber));
552             }
553         } while (prefixes.contains(result.toString()));
554
555         return result.toString();
556     }
557
558     /**
559      * Gets path pointed to data in data store.
560      *
561      * @return Path pointed to data in data store.
562      */
563     public InstanceIdentifier getPath() {
564         return path;
565     }
566
567     /**
568      * Sets {@link ListenerRegistration} registration.
569      *
570      * @param registration
571      *            ListenerRegistration<DataChangeListener>
572      */
573     public void setRegistration(
574             ListenerRegistration<DataChangeListener> registration) {
575         this.registration = registration;
576     }
577
578     /**
579      * Gets the name of the stream.
580      *
581      * @return The name of the stream.
582      */
583     public String getStreamName() {
584         return streamName;
585     }
586
587     /**
588      * Removes all subscribers and unregisters event bus change recorder form
589      * event bus.
590      */
591     public void close() throws Exception {
592         subscribers = new ConcurrentSet<>();
593         registration.close();
594         registration = null;
595         eventBus.unregister(eventBusChangeRecorder);
596     }
597
598     /**
599      * Checks if {@link ListenerRegistration} registration exist.
600      *
601      * @return True if exist, false otherwise.
602      */
603     public boolean isListening() {
604         return registration == null ? false : true;
605     }
606
607     /**
608      * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
609      * subscriber to the event and post event into event bus.
610      *
611      * @param subscriber
612      *            Channel
613      */
614     public void addSubscriber(Channel subscriber) {
615         if (!subscriber.isActive()) {
616             logger.debug("Channel is not active between websocket server and subscriber {}"
617                     + subscriber.remoteAddress());
618         }
619         Event event = new Event(EventType.REGISTER);
620         event.setSubscriber(subscriber);
621         eventBus.post(event);
622     }
623
624     /**
625      * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
626      * subscriber to the event and posts event into event bus.
627      *
628      * @param subscriber
629      */
630     public void removeSubscriber(Channel subscriber) {
631         logger.debug("Subscriber {} is removed.", subscriber.remoteAddress());
632         Event event = new Event(EventType.DEREGISTER);
633         event.setSubscriber(subscriber);
634         eventBus.post(event);
635     }
636
637     /**
638      * Checks if exists at least one {@link Channel} subscriber.
639      *
640      * @return True if exist at least one {@link Channel} subscriber, false
641      *         otherwise.
642      */
643     public boolean hasSubscribers() {
644         return !subscribers.isEmpty();
645     }
646
647     /**
648      * Consists of two types {@link Store#CONFIG} and {@link Store#OPERATION}.
649      */
650     private static enum Store {
651         CONFIG("config"), OPERATION("operation");
652
653         private final String value;
654
655         private Store(String value) {
656             this.value = value;
657         }
658     }
659
660     /**
661      * Consists of three types {@link Operation#CREATED},
662      * {@link Operation#UPDATED} and {@link Operation#DELETED}.
663      */
664     private static enum Operation {
665         CREATED("created"), UPDATED("updated"), DELETED("deleted");
666
667         private final String value;
668
669         private Operation(String value) {
670             this.value = value;
671         }
672     }
673
674 }