BUG-868: use a single version of ClassLoaderUtils
[controller.git] / opendaylight / md-sal / sal-rest-connector / src / main / java / org / opendaylight / controller / sal / streams / listeners / ListenerAdapter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.streams.listeners;
9
10 import io.netty.channel.Channel;
11 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
12 import io.netty.util.internal.ConcurrentSet;
13
14 import java.io.ByteArrayOutputStream;
15 import java.io.OutputStreamWriter;
16 import java.io.UnsupportedEncodingException;
17 import java.text.SimpleDateFormat;
18 import java.util.Collection;
19 import java.util.Date;
20 import java.util.HashMap;
21 import java.util.Map;
22 import java.util.Map.Entry;
23 import java.util.Random;
24 import java.util.Set;
25 import java.util.concurrent.Executors;
26
27 import javax.activation.UnsupportedDataTypeException;
28 import javax.xml.parsers.DocumentBuilder;
29 import javax.xml.parsers.DocumentBuilderFactory;
30 import javax.xml.parsers.ParserConfigurationException;
31 import javax.xml.transform.OutputKeys;
32 import javax.xml.transform.Transformer;
33 import javax.xml.transform.TransformerException;
34 import javax.xml.transform.TransformerFactory;
35 import javax.xml.transform.dom.DOMSource;
36 import javax.xml.transform.stream.StreamResult;
37
38 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
39 import org.opendaylight.controller.sal.core.api.data.DataChangeListener;
40 import org.opendaylight.controller.sal.rest.impl.XmlMapper;
41 import org.opendaylight.controller.sal.restconf.impl.ControllerContext;
42 import org.opendaylight.yangtools.concepts.ListenerRegistration;
43 import org.opendaylight.yangtools.yang.common.QName;
44 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
45 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
46 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifierWithPredicates;
47 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeWithValue;
48 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
49 import org.opendaylight.yangtools.yang.model.api.DataNodeContainer;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52 import org.w3c.dom.Document;
53 import org.w3c.dom.Element;
54 import org.w3c.dom.Node;
55
56 import com.google.common.base.Preconditions;
57 import com.google.common.eventbus.AsyncEventBus;
58 import com.google.common.eventbus.EventBus;
59 import com.google.common.eventbus.Subscribe;
60
61 /**
62  * {@link ListenerAdapter} is responsible to track events, which occurred by
63  * changing data in data source.
64  */
65 public class ListenerAdapter implements DataChangeListener {
66
67         private static final Logger logger = LoggerFactory
68                         .getLogger(ListenerAdapter.class);
69         private final XmlMapper xmlMapper = new XmlMapper();
70         private final SimpleDateFormat rfc3339 = new SimpleDateFormat(
71                         "yyyy-MM-dd'T'hh:mm:ssZ");
72
73         private final InstanceIdentifier path;
74         private ListenerRegistration<DataChangeListener> registration;
75         private final String streamName;
76         private Set<Channel> subscribers = new ConcurrentSet<>();
77         private final EventBus eventBus;
78         private final EventBusChangeRecorder eventBusChangeRecorder;
79
80         /**
81          * Creates new {@link ListenerAdapter} listener specified by path and stream
82          * name.
83          * 
84          * @param path
85          *            Path to data in data store.
86          * @param streamName
87          *            The name of the stream.
88          */
89         ListenerAdapter(InstanceIdentifier path, String streamName) {
90                 Preconditions.checkNotNull(path);
91                 Preconditions
92                                 .checkArgument(streamName != null && !streamName.isEmpty());
93                 this.path = path;
94                 this.streamName = streamName;
95                 eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
96                 eventBusChangeRecorder = new EventBusChangeRecorder();
97                 eventBus.register(eventBusChangeRecorder);
98         }
99
100         @Override
101         public void onDataChanged(
102                         DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
103                 if (!change.getCreatedConfigurationData().isEmpty()
104                                 || !change.getCreatedOperationalData().isEmpty()
105                                 || !change.getUpdatedConfigurationData().isEmpty()
106                                 || !change.getUpdatedOperationalData().isEmpty()
107                                 || !change.getRemovedConfigurationData().isEmpty()
108                                 || !change.getRemovedOperationalData().isEmpty()) {
109                         String xml = prepareXmlFrom(change);
110                         Event event = new Event(EventType.NOTIFY);
111                         event.setData(xml);
112                         eventBus.post(event);
113                 }
114         }
115
116         /**
117          * Tracks events of data change by customer.
118          */
119         private final class EventBusChangeRecorder {
120                 @Subscribe
121                 public void recordCustomerChange(Event event) {
122                         if (event.getType() == EventType.REGISTER) {
123                                 Channel subscriber = event.getSubscriber();
124                                 if (!subscribers.contains(subscriber)) {
125                                         subscribers.add(subscriber);
126                                 }
127                         } else if (event.getType() == EventType.DEREGISTER) {
128                                 subscribers.remove(event.getSubscriber());
129                                 Notificator
130                                                 .removeListenerIfNoSubscriberExists(ListenerAdapter.this);
131                         } else if (event.getType() == EventType.NOTIFY) {
132                                 for (Channel subscriber : subscribers) {
133                                         if (subscriber.isActive()) {
134                                                 logger.debug("Data are sent to subscriber {}:",
135                                                                 subscriber.remoteAddress());
136                                                 subscriber.writeAndFlush(new TextWebSocketFrame(event
137                                                                 .getData()));
138                                         } else {
139                                                 logger.debug(
140                                                                 "Subscriber {} is removed - channel is not active yet.",
141                                                                 subscriber.remoteAddress());
142                                                 subscribers.remove(subscriber);
143                                         }
144                                 }
145                         }
146                 }
147         }
148
149         /**
150          * Represents event of specific {@link EventType} type, holds data and
151          * {@link Channel} subscriber.
152          */
153         private final class Event {
154                 private final EventType type;
155                 private Channel subscriber;
156                 private String data;
157
158                 /**
159                  * Creates new event specified by {@link EventType} type.
160                  * 
161                  * @param type
162                  *            EventType
163                  */
164                 public Event(EventType type) {
165                         this.type = type;
166                 }
167
168                 /**
169                  * Gets the {@link Channel} subscriber.
170                  * 
171                  * @return Channel
172                  */
173                 public Channel getSubscriber() {
174                         return subscriber;
175                 }
176
177                 /**
178                  * Sets subscriber for event.
179                  * 
180                  * @param subscriber
181                  *            Channel
182                  */
183                 public void setSubscriber(Channel subscriber) {
184                         this.subscriber = subscriber;
185                 }
186
187                 /**
188                  * Gets event data.
189                  * 
190                  * @return String representation of event data.
191                  */
192                 public String getData() {
193                         return data;
194                 }
195
196                 /**
197                  * Sets event data.
198                  * 
199                  * @param String
200                  *            data.
201                  */
202                 public void setData(String data) {
203                         this.data = data;
204                 }
205
206                 /**
207                  * Gets event type.
208                  * 
209                  * @return The type of the event.
210                  */
211                 public EventType getType() {
212                         return type;
213                 }
214         }
215
216         /**
217          * Type of the event.
218          */
219         private enum EventType {
220                 REGISTER, DEREGISTER, NOTIFY;
221         }
222
223         /**
224          * Prepare data in printable form and transform it to String.
225          * 
226          * @param change
227          *            DataChangeEvent
228          * @return Data in printable form.
229          */
230         private String prepareXmlFrom(
231                         DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
232                 Document doc = createDocument();
233                 Element notificationElement = doc.createElementNS(
234                                 "urn:ietf:params:xml:ns:netconf:notification:1.0",
235                                 "notification");
236                 doc.appendChild(notificationElement);
237
238                 Element eventTimeElement = doc.createElement("eventTime");
239                 eventTimeElement.setTextContent(toRFC3339(new Date()));
240                 notificationElement.appendChild(eventTimeElement);
241
242                 Element dataChangedNotificationEventElement = doc.createElementNS(
243                                 "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote",
244                                 "data-changed-notification");
245                 addValuesToDataChangedNotificationEventElement(doc,
246                                 dataChangedNotificationEventElement, change);
247                 notificationElement.appendChild(dataChangedNotificationEventElement);
248
249                 try {
250                         ByteArrayOutputStream out = new ByteArrayOutputStream();
251                         TransformerFactory tf = TransformerFactory.newInstance();
252                         Transformer transformer = tf.newTransformer();
253                         transformer
254                                         .setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
255                         transformer.setOutputProperty(OutputKeys.METHOD, "xml");
256                         transformer.setOutputProperty(OutputKeys.INDENT, "yes");
257                         transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
258                         transformer.setOutputProperty(
259                                         "{http://xml.apache.org/xslt}indent-amount", "4");
260                         transformer.transform(new DOMSource(doc), new StreamResult(
261                                         new OutputStreamWriter(out, "UTF-8")));
262                         byte[] charData = out.toByteArray();
263                         return new String(charData, "UTF-8");
264                 } catch (TransformerException | UnsupportedEncodingException e) {
265                         String msg = "Error during transformation of Document into String";
266                         logger.error(msg, e);
267                         return msg;
268                 }
269         }
270
271         /**
272          * Formats data specified by RFC3339.
273          * 
274          * @param d
275          *            Date
276          * @return Data specified by RFC3339.
277          */
278         private String toRFC3339(Date d) {
279                 return rfc3339.format(d).replaceAll("(\\d\\d)(\\d\\d)$", "$1:$2");
280         }
281
282         /**
283          * Creates {@link Document} document.
284          * 
285          * @return {@link Document} document.
286          */
287         private Document createDocument() {
288                 DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
289                 Document doc = null;
290                 try {
291                         DocumentBuilder bob = dbf.newDocumentBuilder();
292                         doc = bob.newDocument();
293                 } catch (ParserConfigurationException e) {
294                         return null;
295                 }
296                 return doc;
297         }
298
299         /**
300          * Adds values to data changed notification event element.
301          * 
302          * @param doc
303          *            {@link Document}
304          * @param dataChangedNotificationEventElement
305          *            {@link Element}
306          * @param change
307          *            {@link DataChangeEvent}
308          */
309         private void addValuesToDataChangedNotificationEventElement(Document doc,
310                         Element dataChangedNotificationEventElement,
311                         DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
312                 addValuesFromDataToElement(doc, change.getCreatedConfigurationData(),
313                                 dataChangedNotificationEventElement, Store.CONFIG,
314                                 Operation.CREATED);
315                 addValuesFromDataToElement(doc, change.getCreatedOperationalData(),
316                                 dataChangedNotificationEventElement, Store.OPERATION,
317                                 Operation.CREATED);
318                 if (change.getCreatedConfigurationData().isEmpty()) {
319                         addValuesFromDataToElement(doc,
320                                         change.getUpdatedConfigurationData(),
321                                         dataChangedNotificationEventElement, Store.CONFIG,
322                                         Operation.UPDATED);
323                 }
324                 if (change.getCreatedOperationalData().isEmpty()) {
325                         addValuesFromDataToElement(doc, change.getUpdatedOperationalData(),
326                                         dataChangedNotificationEventElement, Store.OPERATION,
327                                         Operation.UPDATED);
328                 }
329                 addValuesFromDataToElement(doc, change.getRemovedConfigurationData(),
330                                 dataChangedNotificationEventElement, Store.CONFIG,
331                                 Operation.DELETED);
332                 addValuesFromDataToElement(doc, change.getRemovedOperationalData(),
333                                 dataChangedNotificationEventElement, Store.OPERATION,
334                                 Operation.DELETED);
335         }
336
337         /**
338          * Adds values from data to element.
339          * 
340          * @param doc
341          *            {@link Document}
342          * @param data
343          *            Set of {@link InstanceIdentifier}.
344          * @param element
345          *            {@link Element}
346          * @param store
347          *            {@link Store}
348          * @param operation
349          *            {@link Operation}
350          */
351         private void addValuesFromDataToElement(Document doc,
352                         Set<InstanceIdentifier> data, Element element, Store store,
353                         Operation operation) {
354                 if (data == null || data.isEmpty()) {
355                         return;
356                 }
357                 for (InstanceIdentifier path : data) {
358                         Node node = createDataChangeEventElement(doc, path, null, store,
359                                         operation);
360                         element.appendChild(node);
361                 }
362         }
363
364         /**
365          * Adds values from data to element.
366          * 
367          * @param doc
368          *            {@link Document}
369          * @param data
370          *            Map of {@link InstanceIdentifier} and {@link CompositeNode}.
371          * @param element
372          *            {@link Element}
373          * @param store
374          *            {@link Store}
375          * @param operation
376          *            {@link Operation}
377          */
378         private void addValuesFromDataToElement(Document doc,
379                         Map<InstanceIdentifier, CompositeNode> data, Element element,
380                         Store store, Operation operation) {
381                 if (data == null || data.isEmpty()) {
382                         return;
383                 }
384                 for (Entry<InstanceIdentifier, CompositeNode> entry : data.entrySet()) {
385                         Node node = createDataChangeEventElement(doc, entry.getKey(),
386                                         entry.getValue(), store, operation);
387                         element.appendChild(node);
388                 }
389         }
390
391         /**
392          * Creates changed event element from data.
393          * 
394          * @param doc
395          *            {@link Document}
396          * @param path
397          *            Path to data in data store.
398          * @param data
399          *            {@link CompositeNode}
400          * @param store
401          *            {@link Store}
402          * @param operation
403          *            {@link Operation}
404          * @return {@link Node} node represented by changed event element.
405          */
406         private Node createDataChangeEventElement(Document doc,
407                         InstanceIdentifier path, CompositeNode data, Store store,
408                         Operation operation) {
409                 Element dataChangeEventElement = doc.createElement("data-change-event");
410
411                 Element pathElement = doc.createElement("path");
412                 addPathAsValueToElement(path, pathElement);
413                 dataChangeEventElement.appendChild(pathElement);
414
415                 Element storeElement = doc.createElement("store");
416                 storeElement.setTextContent(store.value);
417                 dataChangeEventElement.appendChild(storeElement);
418
419                 Element operationElement = doc.createElement("operation");
420                 operationElement.setTextContent(operation.value);
421                 dataChangeEventElement.appendChild(operationElement);
422
423                 if (data != null) {
424                         Element dataElement = doc.createElement("data");
425                         Node dataAnyXml = translateToXml(path, data);
426                         Node adoptedNode = doc.adoptNode(dataAnyXml);
427                         dataElement.appendChild(adoptedNode);
428                         dataChangeEventElement.appendChild(dataElement);
429                 }
430
431                 return dataChangeEventElement;
432         }
433
434         /**
435          * Translates {@link CompositeNode} data to XML format.
436          * 
437          * @param path
438          *            Path to data in data store.
439          * @param data
440          *            {@link CompositeNode}
441          * @return Data in XML format.
442          */
443         private Node translateToXml(InstanceIdentifier path, CompositeNode data) {
444                 DataNodeContainer schemaNode = ControllerContext.getInstance()
445                                 .getDataNodeContainerFor(path);
446                 if (schemaNode == null) {
447                         logger.info(
448                                         "Path '{}' contains node with unsupported type (supported type is Container or List) or some node was not found.",
449                                         path);
450                         return null;
451                 }
452                 try {
453                         Document xml = xmlMapper.write(data, schemaNode);
454                         return xml.getFirstChild();
455                 } catch (UnsupportedDataTypeException e) {
456                         logger.error(
457                                         "Error occured during translation of notification to XML.",
458                                         e);
459                         return null;
460                 }
461         }
462
463         /**
464          * Adds path as value to element.
465          * 
466          * @param path
467          *            Path to data in data store.
468          * @param element
469          *            {@link Element}
470          */
471         private void addPathAsValueToElement(InstanceIdentifier path,
472                         Element element) {
473                 // Map< key = namespace, value = prefix>
474                 Map<String, String> prefixes = new HashMap<>();
475                 InstanceIdentifier instanceIdentifier = path;
476                 StringBuilder textContent = new StringBuilder();
477                 for (PathArgument pathArgument : instanceIdentifier.getPath()) {
478                         textContent.append("/");
479                         writeIdentifierWithNamespacePrefix(element, textContent,
480                                         pathArgument.getNodeType(), prefixes);
481                         if (pathArgument instanceof NodeIdentifierWithPredicates) {
482                                 Map<QName, Object> predicates = ((NodeIdentifierWithPredicates) pathArgument)
483                                                 .getKeyValues();
484                                 for (QName keyValue : predicates.keySet()) {
485                                         String predicateValue = String.valueOf(predicates
486                                                         .get(keyValue));
487                                         textContent.append("[");
488                                         writeIdentifierWithNamespacePrefix(element, textContent,
489                                                         keyValue, prefixes);
490                                         textContent.append("='");
491                                         textContent.append(predicateValue);
492                                         textContent.append("'");
493                                         textContent.append("]");
494                                 }
495                         } else if (pathArgument instanceof NodeWithValue) {
496                                 textContent.append("[.='");
497                                 textContent.append(((NodeWithValue) pathArgument).getValue());
498                                 textContent.append("'");
499                                 textContent.append("]");
500                         }
501                 }
502                 element.setTextContent(textContent.toString());
503         }
504
505         /**
506          * Writes identifier that consists of prefix and QName.
507          * 
508          * @param element
509          *            {@link Element}
510          * @param textContent
511          *            StringBuilder
512          * @param qName
513          *            QName
514          * @param prefixes
515          *            Map of namespaces and prefixes.
516          */
517         private static void writeIdentifierWithNamespacePrefix(Element element,
518                         StringBuilder textContent, QName qName, Map<String, String> prefixes) {
519                 String namespace = qName.getNamespace().toString();
520                 String prefix = prefixes.get(namespace);
521                 if (prefix == null) {
522                         prefix = qName.getPrefix();
523                         if (prefix == null || prefix.isEmpty()
524                                         || prefixes.containsValue(prefix)) {
525                                 prefix = generateNewPrefix(prefixes.values());
526                         }
527                 }
528
529                 element.setAttribute("xmlns:" + prefix, namespace);
530                 textContent.append(prefix);
531                 prefixes.put(namespace, prefix);
532
533                 textContent.append(":");
534                 textContent.append(qName.getLocalName());
535         }
536
537         /**
538          * Generates new prefix which consists of four random characters <a-z>.
539          * 
540          * @param prefixes
541          *            Collection of prefixes.
542          * @return New prefix which consists of four random characters <a-z>.
543          */
544         private static String generateNewPrefix(Collection<String> prefixes) {
545                 StringBuilder result = null;
546                 Random random = new Random();
547                 do {
548                         result = new StringBuilder();
549                         for (int i = 0; i < 4; i++) {
550                                 int randomNumber = 0x61 + (Math.abs(random.nextInt()) % 26);
551                                 result.append(Character.toChars(randomNumber));
552                         }
553                 } while (prefixes.contains(result.toString()));
554
555                 return result.toString();
556         }
557
558         /**
559          * Gets path pointed to data in data store.
560          * 
561          * @return Path pointed to data in data store.
562          */
563         public InstanceIdentifier getPath() {
564                 return path;
565         }
566
567         /**
568          * Sets {@link ListenerRegistration} registration.
569          * 
570          * @param registration
571          *            ListenerRegistration<DataChangeListener>
572          */
573         public void setRegistration(
574                         ListenerRegistration<DataChangeListener> registration) {
575                 this.registration = registration;
576         }
577
578         /**
579          * Gets the name of the stream.
580          * 
581          * @return The name of the stream.
582          */
583         public String getStreamName() {
584                 return streamName;
585         }
586
587         /**
588          * Removes all subscribers and unregisters event bus change recorder form
589          * event bus.
590          */
591         public void close() throws Exception {
592                 subscribers = new ConcurrentSet<>();
593                 registration.close();
594                 registration = null;
595                 eventBus.unregister(eventBusChangeRecorder);
596         }
597
598         /**
599          * Checks if {@link ListenerRegistration} registration exist.
600          * 
601          * @return True if exist, false otherwise.
602          */
603         public boolean isListening() {
604                 return registration == null ? false : true;
605         }
606
607         /**
608          * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
609          * subscriber to the event and post event into event bus.
610          * 
611          * @param subscriber
612          *            Channel
613          */
614         public void addSubscriber(Channel subscriber) {
615                 if (!subscriber.isActive()) {
616                         logger.debug("Channel is not active between websocket server and subscriber {}"
617                                         + subscriber.remoteAddress());
618                 }
619                 Event event = new Event(EventType.REGISTER);
620                 event.setSubscriber(subscriber);
621                 eventBus.post(event);
622         }
623
624         /**
625          * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
626          * subscriber to the event and posts event into event bus.
627          * 
628          * @param subscriber
629          */
630         public void removeSubscriber(Channel subscriber) {
631                 logger.debug("Subscriber {} is removed.", subscriber.remoteAddress());
632                 Event event = new Event(EventType.DEREGISTER);
633                 event.setSubscriber(subscriber);
634                 eventBus.post(event);
635         }
636
637         /**
638          * Checks if exists at least one {@link Channel} subscriber.
639          * 
640          * @return True if exist at least one {@link Channel} subscriber, false
641          *         otherwise.
642          */
643         public boolean hasSubscribers() {
644                 return !subscribers.isEmpty();
645         }
646
647         /**
648          * Consists of two types {@link Store#CONFIG} and {@link Store#OPERATION}.
649          */
650         private static enum Store {
651                 CONFIG("config"), OPERATION("operation");
652
653                 private final String value;
654
655                 private Store(String value) {
656                         this.value = value;
657                 }
658         }
659
660         /**
661          * Consists of three types {@link Operation#CREATED},
662          * {@link Operation#UPDATED} and {@link Operation#DELETED}.
663          */
664         private static enum Operation {
665                 CREATED("created"), UPDATED("updated"), DELETED("deleted");
666
667                 private final String value;
668
669                 private Operation(String value) {
670                         this.value = value;
671                 }
672         }
673
674 }