5fd76de98bb51c4e0b322f6c16f0bd17c5d417d2
[controller.git] / opendaylight / md-sal / sal-rest-connector / src / main / java / org / opendaylight / controller / sal / streams / listeners / ListenerAdapter.java
1 package org.opendaylight.controller.sal.streams.listeners;
2
3 import io.netty.channel.Channel;
4 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
5 import io.netty.util.internal.ConcurrentSet;
6
7 import java.io.ByteArrayOutputStream;
8 import java.io.OutputStreamWriter;
9 import java.io.UnsupportedEncodingException;
10 import java.text.SimpleDateFormat;
11 import java.util.Collection;
12 import java.util.Date;
13 import java.util.HashMap;
14 import java.util.Map;
15 import java.util.Map.Entry;
16 import java.util.Random;
17 import java.util.Set;
18 import java.util.concurrent.Executors;
19
20 import javax.activation.UnsupportedDataTypeException;
21 import javax.xml.parsers.DocumentBuilder;
22 import javax.xml.parsers.DocumentBuilderFactory;
23 import javax.xml.parsers.ParserConfigurationException;
24 import javax.xml.transform.OutputKeys;
25 import javax.xml.transform.Transformer;
26 import javax.xml.transform.TransformerException;
27 import javax.xml.transform.TransformerFactory;
28 import javax.xml.transform.dom.DOMSource;
29 import javax.xml.transform.stream.StreamResult;
30
31 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
32 import org.opendaylight.controller.sal.core.api.data.DataChangeListener;
33 import org.opendaylight.controller.sal.rest.impl.XmlMapper;
34 import org.opendaylight.controller.sal.restconf.impl.ControllerContext;
35 import org.opendaylight.yangtools.concepts.ListenerRegistration;
36 import org.opendaylight.yangtools.yang.common.QName;
37 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
38 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
39 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifierWithPredicates;
40 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeWithValue;
41 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
42 import org.opendaylight.yangtools.yang.model.api.DataNodeContainer;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45 import org.w3c.dom.Document;
46 import org.w3c.dom.Element;
47 import org.w3c.dom.Node;
48
49 import com.google.common.base.Preconditions;
50 import com.google.common.eventbus.AsyncEventBus;
51 import com.google.common.eventbus.EventBus;
52 import com.google.common.eventbus.Subscribe;
53
54 /**
55  * {@link ListenerAdapter} is responsible to track events, which occurred by
56  * changing data in data source.
57  */
58 public class ListenerAdapter implements DataChangeListener {
59
60         private static final Logger logger = LoggerFactory
61                         .getLogger(ListenerAdapter.class);
62         private final XmlMapper xmlMapper = new XmlMapper();
63         private final SimpleDateFormat rfc3339 = new SimpleDateFormat(
64                         "yyyy-MM-dd'T'hh:mm:ssZ");
65
66         private final InstanceIdentifier path;
67         private ListenerRegistration<DataChangeListener> registration;
68         private final String streamName;
69         private Set<Channel> subscribers = new ConcurrentSet<>();
70         private final EventBus eventBus;
71         private final EventBusChangeRecorder eventBusChangeRecorder;
72
73         /**
74          * Creates new {@link ListenerAdapter} listener specified by path and stream
75          * name.
76          * 
77          * @param path
78          *            Path to data in data store.
79          * @param streamName
80          *            The name of the stream.
81          */
82         ListenerAdapter(InstanceIdentifier path, String streamName) {
83                 Preconditions.checkNotNull(path);
84                 Preconditions
85                                 .checkArgument(streamName != null && !streamName.isEmpty());
86                 this.path = path;
87                 this.streamName = streamName;
88                 eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
89                 eventBusChangeRecorder = new EventBusChangeRecorder();
90                 eventBus.register(eventBusChangeRecorder);
91         }
92
93         @Override
94         public void onDataChanged(
95                         DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
96                 if (!change.getCreatedConfigurationData().isEmpty()
97                                 || !change.getCreatedOperationalData().isEmpty()
98                                 || !change.getUpdatedConfigurationData().isEmpty()
99                                 || !change.getUpdatedOperationalData().isEmpty()
100                                 || !change.getRemovedConfigurationData().isEmpty()
101                                 || !change.getRemovedOperationalData().isEmpty()) {
102                         String xml = prepareXmlFrom(change);
103                         Event event = new Event(EventType.NOTIFY);
104                         event.setData(xml);
105                         eventBus.post(event);
106                 }
107         }
108
109         /**
110          * Tracks events of data change by customer.
111          */
112         private final class EventBusChangeRecorder {
113                 @Subscribe
114                 public void recordCustomerChange(Event event) {
115                         if (event.getType() == EventType.REGISTER) {
116                                 Channel subscriber = event.getSubscriber();
117                                 if (!subscribers.contains(subscriber)) {
118                                         subscribers.add(subscriber);
119                                 }
120                         } else if (event.getType() == EventType.DEREGISTER) {
121                                 subscribers.remove(event.getSubscriber());
122                                 Notificator
123                                                 .removeListenerIfNoSubscriberExists(ListenerAdapter.this);
124                         } else if (event.getType() == EventType.NOTIFY) {
125                                 for (Channel subscriber : subscribers) {
126                                         if (subscriber.isActive()) {
127                                                 logger.debug("Data are sent to subscriber {}:",
128                                                                 subscriber.remoteAddress());
129                                                 subscriber.writeAndFlush(new TextWebSocketFrame(event
130                                                                 .getData()));
131                                         } else {
132                                                 logger.debug(
133                                                                 "Subscriber {} is removed - channel is not active yet.",
134                                                                 subscriber.remoteAddress());
135                                                 subscribers.remove(subscriber);
136                                         }
137                                 }
138                         }
139                 }
140         }
141
142         /**
143          * Represents event of specific {@link EventType} type, holds data and
144          * {@link Channel} subscriber.
145          */
146         private final class Event {
147                 private final EventType type;
148                 private Channel subscriber;
149                 private String data;
150
151                 /**
152                  * Creates new event specified by {@link EventType} type.
153                  * 
154                  * @param type
155                  *            EventType
156                  */
157                 public Event(EventType type) {
158                         this.type = type;
159                 }
160
161                 /**
162                  * Gets the {@link Channel} subscriber.
163                  * 
164                  * @return Channel
165                  */
166                 public Channel getSubscriber() {
167                         return subscriber;
168                 }
169
170                 /**
171                  * Sets subscriber for event.
172                  * 
173                  * @param subscriber
174                  *            Channel
175                  */
176                 public void setSubscriber(Channel subscriber) {
177                         this.subscriber = subscriber;
178                 }
179
180                 /**
181                  * Gets event data.
182                  * 
183                  * @return String representation of event data.
184                  */
185                 public String getData() {
186                         return data;
187                 }
188
189                 /**
190                  * Sets event data.
191                  * 
192                  * @param String
193                  *            data.
194                  */
195                 public void setData(String data) {
196                         this.data = data;
197                 }
198
199                 /**
200                  * Gets event type.
201                  * 
202                  * @return The type of the event.
203                  */
204                 public EventType getType() {
205                         return type;
206                 }
207         }
208
209         /**
210          * Type of the event.
211          */
212         private enum EventType {
213                 REGISTER, DEREGISTER, NOTIFY;
214         }
215
216         /**
217          * Prepare data in printable form and transform it to String.
218          * 
219          * @param change
220          *            DataChangeEvent
221          * @return Data in printable form.
222          */
223         private String prepareXmlFrom(
224                         DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
225                 Document doc = createDocument();
226                 Element notificationElement = doc.createElementNS(
227                                 "urn:ietf:params:xml:ns:netconf:notification:1.0",
228                                 "notification");
229                 doc.appendChild(notificationElement);
230
231                 Element eventTimeElement = doc.createElement("eventTime");
232                 eventTimeElement.setTextContent(toRFC3339(new Date()));
233                 notificationElement.appendChild(eventTimeElement);
234
235                 Element dataChangedNotificationEventElement = doc.createElementNS(
236                                 "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote",
237                                 "data-changed-notification");
238                 addValuesToDataChangedNotificationEventElement(doc,
239                                 dataChangedNotificationEventElement, change);
240                 notificationElement.appendChild(dataChangedNotificationEventElement);
241
242                 try {
243                         ByteArrayOutputStream out = new ByteArrayOutputStream();
244                         TransformerFactory tf = TransformerFactory.newInstance();
245                         Transformer transformer = tf.newTransformer();
246                         transformer
247                                         .setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
248                         transformer.setOutputProperty(OutputKeys.METHOD, "xml");
249                         transformer.setOutputProperty(OutputKeys.INDENT, "yes");
250                         transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
251                         transformer.setOutputProperty(
252                                         "{http://xml.apache.org/xslt}indent-amount", "4");
253                         transformer.transform(new DOMSource(doc), new StreamResult(
254                                         new OutputStreamWriter(out, "UTF-8")));
255                         byte[] charData = out.toByteArray();
256                         return new String(charData, "UTF-8");
257                 } catch (TransformerException | UnsupportedEncodingException e) {
258                         String msg = "Error during transformation of Document into String";
259                         logger.error(msg, e);
260                         return msg;
261                 }
262         }
263
264         /**
265          * Formats data specified by RFC3339.
266          * 
267          * @param d
268          *            Date
269          * @return Data specified by RFC3339.
270          */
271         private String toRFC3339(Date d) {
272                 return rfc3339.format(d).replaceAll("(\\d\\d)(\\d\\d)$", "$1:$2");
273         }
274
275         /**
276          * Creates {@link Document} document.
277          * 
278          * @return {@link Document} document.
279          */
280         private Document createDocument() {
281                 DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
282                 Document doc = null;
283                 try {
284                         DocumentBuilder bob = dbf.newDocumentBuilder();
285                         doc = bob.newDocument();
286                 } catch (ParserConfigurationException e) {
287                         return null;
288                 }
289                 return doc;
290         }
291
292         /**
293          * Adds values to data changed notification event element.
294          * 
295          * @param doc
296          *            {@link Document}
297          * @param dataChangedNotificationEventElement
298          *            {@link Element}
299          * @param change
300          *            {@link DataChangeEvent}
301          */
302         private void addValuesToDataChangedNotificationEventElement(Document doc,
303                         Element dataChangedNotificationEventElement,
304                         DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
305                 addValuesFromDataToElement(doc, change.getCreatedConfigurationData(),
306                                 dataChangedNotificationEventElement, Store.CONFIG,
307                                 Operation.CREATED);
308                 addValuesFromDataToElement(doc, change.getCreatedOperationalData(),
309                                 dataChangedNotificationEventElement, Store.OPERATION,
310                                 Operation.CREATED);
311                 if (change.getCreatedConfigurationData().isEmpty()) {
312                         addValuesFromDataToElement(doc,
313                                         change.getUpdatedConfigurationData(),
314                                         dataChangedNotificationEventElement, Store.CONFIG,
315                                         Operation.UPDATED);
316                 }
317                 if (change.getCreatedOperationalData().isEmpty()) {
318                         addValuesFromDataToElement(doc, change.getUpdatedOperationalData(),
319                                         dataChangedNotificationEventElement, Store.OPERATION,
320                                         Operation.UPDATED);
321                 }
322                 addValuesFromDataToElement(doc, change.getRemovedConfigurationData(),
323                                 dataChangedNotificationEventElement, Store.CONFIG,
324                                 Operation.DELETED);
325                 addValuesFromDataToElement(doc, change.getRemovedOperationalData(),
326                                 dataChangedNotificationEventElement, Store.OPERATION,
327                                 Operation.DELETED);
328         }
329
330         /**
331          * Adds values from data to element.
332          * 
333          * @param doc
334          *            {@link Document}
335          * @param data
336          *            Set of {@link InstanceIdentifier}.
337          * @param element
338          *            {@link Element}
339          * @param store
340          *            {@link Store}
341          * @param operation
342          *            {@link Operation}
343          */
344         private void addValuesFromDataToElement(Document doc,
345                         Set<InstanceIdentifier> data, Element element, Store store,
346                         Operation operation) {
347                 if (data == null || data.isEmpty()) {
348                         return;
349                 }
350                 for (InstanceIdentifier path : data) {
351                         Node node = createDataChangeEventElement(doc, path, null, store,
352                                         operation);
353                         element.appendChild(node);
354                 }
355         }
356
357         /**
358          * Adds values from data to element.
359          * 
360          * @param doc
361          *            {@link Document}
362          * @param data
363          *            Map of {@link InstanceIdentifier} and {@link CompositeNode}.
364          * @param element
365          *            {@link Element}
366          * @param store
367          *            {@link Store}
368          * @param operation
369          *            {@link Operation}
370          */
371         private void addValuesFromDataToElement(Document doc,
372                         Map<InstanceIdentifier, CompositeNode> data, Element element,
373                         Store store, Operation operation) {
374                 if (data == null || data.isEmpty()) {
375                         return;
376                 }
377                 for (Entry<InstanceIdentifier, CompositeNode> entry : data.entrySet()) {
378                         Node node = createDataChangeEventElement(doc, entry.getKey(),
379                                         entry.getValue(), store, operation);
380                         element.appendChild(node);
381                 }
382         }
383
384         /**
385          * Creates changed event element from data.
386          * 
387          * @param doc
388          *            {@link Document}
389          * @param path
390          *            Path to data in data store.
391          * @param data
392          *            {@link CompositeNode}
393          * @param store
394          *            {@link Store}
395          * @param operation
396          *            {@link Operation}
397          * @return {@link Node} node represented by changed event element.
398          */
399         private Node createDataChangeEventElement(Document doc,
400                         InstanceIdentifier path, CompositeNode data, Store store,
401                         Operation operation) {
402                 Element dataChangeEventElement = doc.createElement("data-change-event");
403
404                 Element pathElement = doc.createElement("path");
405                 addPathAsValueToElement(path, pathElement);
406                 dataChangeEventElement.appendChild(pathElement);
407
408                 Element storeElement = doc.createElement("store");
409                 storeElement.setTextContent(store.value);
410                 dataChangeEventElement.appendChild(storeElement);
411
412                 Element operationElement = doc.createElement("operation");
413                 operationElement.setTextContent(operation.value);
414                 dataChangeEventElement.appendChild(operationElement);
415
416                 if (data != null) {
417                         Element dataElement = doc.createElement("data");
418                         Node dataAnyXml = translateToXml(path, data);
419                         Node adoptedNode = doc.adoptNode(dataAnyXml);
420                         dataElement.appendChild(adoptedNode);
421                         dataChangeEventElement.appendChild(dataElement);
422                 }
423
424                 return dataChangeEventElement;
425         }
426
427         /**
428          * Translates {@link CompositeNode} data to XML format.
429          * 
430          * @param path
431          *            Path to data in data store.
432          * @param data
433          *            {@link CompositeNode}
434          * @return Data in XML format.
435          */
436         private Node translateToXml(InstanceIdentifier path, CompositeNode data) {
437                 DataNodeContainer schemaNode = ControllerContext.getInstance()
438                                 .getDataNodeContainerFor(path);
439                 if (schemaNode == null) {
440                         logger.info(
441                                         "Path '{}' contains node with unsupported type (supported type is Container or List) or some node was not found.",
442                                         path);
443                         return null;
444                 }
445                 try {
446                         Document xml = xmlMapper.write(data, schemaNode);
447                         return xml.getFirstChild();
448                 } catch (UnsupportedDataTypeException e) {
449                         logger.error(
450                                         "Error occured during translation of notification to XML.",
451                                         e);
452                         return null;
453                 }
454         }
455
456         /**
457          * Adds path as value to element.
458          * 
459          * @param path
460          *            Path to data in data store.
461          * @param element
462          *            {@link Element}
463          */
464         private void addPathAsValueToElement(InstanceIdentifier path,
465                         Element element) {
466                 // Map< key = namespace, value = prefix>
467                 Map<String, String> prefixes = new HashMap<>();
468                 InstanceIdentifier instanceIdentifier = path;
469                 StringBuilder textContent = new StringBuilder();
470                 for (PathArgument pathArgument : instanceIdentifier.getPath()) {
471                         textContent.append("/");
472                         writeIdentifierWithNamespacePrefix(element, textContent,
473                                         pathArgument.getNodeType(), prefixes);
474                         if (pathArgument instanceof NodeIdentifierWithPredicates) {
475                                 Map<QName, Object> predicates = ((NodeIdentifierWithPredicates) pathArgument)
476                                                 .getKeyValues();
477                                 for (QName keyValue : predicates.keySet()) {
478                                         String predicateValue = String.valueOf(predicates
479                                                         .get(keyValue));
480                                         textContent.append("[");
481                                         writeIdentifierWithNamespacePrefix(element, textContent,
482                                                         keyValue, prefixes);
483                                         textContent.append("='");
484                                         textContent.append(predicateValue);
485                                         textContent.append("'");
486                                         textContent.append("]");
487                                 }
488                         } else if (pathArgument instanceof NodeWithValue) {
489                                 textContent.append("[.='");
490                                 textContent.append(((NodeWithValue) pathArgument).getValue());
491                                 textContent.append("'");
492                                 textContent.append("]");
493                         }
494                 }
495                 element.setTextContent(textContent.toString());
496         }
497
498         /**
499          * Writes identifier that consists of prefix and QName.
500          * 
501          * @param element
502          *            {@link Element}
503          * @param textContent
504          *            StringBuilder
505          * @param qName
506          *            QName
507          * @param prefixes
508          *            Map of namespaces and prefixes.
509          */
510         private static void writeIdentifierWithNamespacePrefix(Element element,
511                         StringBuilder textContent, QName qName, Map<String, String> prefixes) {
512                 String namespace = qName.getNamespace().toString();
513                 String prefix = prefixes.get(namespace);
514                 if (prefix == null) {
515                         prefix = qName.getPrefix();
516                         if (prefix == null || prefix.isEmpty()
517                                         || prefixes.containsValue(prefix)) {
518                                 prefix = generateNewPrefix(prefixes.values());
519                         }
520                 }
521
522                 element.setAttribute("xmlns:" + prefix, namespace.toString());
523                 textContent.append(prefix);
524                 prefixes.put(namespace, prefix);
525
526                 textContent.append(":");
527                 textContent.append(qName.getLocalName());
528         }
529
530         /**
531          * Generates new prefix which consists of four random characters <a-z>.
532          * 
533          * @param prefixes
534          *            Collection of prefixes.
535          * @return New prefix which consists of four random characters <a-z>.
536          */
537         private static String generateNewPrefix(Collection<String> prefixes) {
538                 StringBuilder result = null;
539                 Random random = new Random();
540                 do {
541                         result = new StringBuilder();
542                         for (int i = 0; i < 4; i++) {
543                                 int randomNumber = 0x61 + (Math.abs(random.nextInt()) % 26);
544                                 result.append(Character.toChars(randomNumber));
545                         }
546                 } while (prefixes.contains(result.toString()));
547
548                 return result.toString();
549         }
550
551         /**
552          * Gets path pointed to data in data store.
553          * 
554          * @return Path pointed to data in data store.
555          */
556         public InstanceIdentifier getPath() {
557                 return path;
558         }
559
560         /**
561          * Sets {@link ListenerRegistration} registration.
562          * 
563          * @param registration
564          *            ListenerRegistration<DataChangeListener>
565          */
566         public void setRegistration(
567                         ListenerRegistration<DataChangeListener> registration) {
568                 this.registration = registration;
569         }
570
571         /**
572          * Gets the name of the stream.
573          * 
574          * @return The name of the stream.
575          */
576         public String getStreamName() {
577                 return streamName;
578         }
579
580         /**
581          * Removes all subscribers and unregisters event bus change recorder form
582          * event bus.
583          */
584         public void close() throws Exception {
585                 subscribers = new ConcurrentSet<>();
586                 registration.close();
587                 registration = null;
588                 eventBus.unregister(eventBusChangeRecorder);
589         }
590
591         /**
592          * Checks if {@link ListenerRegistration} registration exist.
593          * 
594          * @return True if exist, false otherwise.
595          */
596         public boolean isListening() {
597                 return registration == null ? false : true;
598         }
599
600         /**
601          * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
602          * subscriber to the event and post event into event bus.
603          * 
604          * @param subscriber
605          *            Channel
606          */
607         public void addSubscriber(Channel subscriber) {
608                 if (!subscriber.isActive()) {
609                         logger.debug("Channel is not active between websocket server and subscriber {}"
610                                         + subscriber.remoteAddress());
611                 }
612                 Event event = new Event(EventType.REGISTER);
613                 event.setSubscriber(subscriber);
614                 eventBus.post(event);
615         }
616
617         /**
618          * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
619          * subscriber to the event and posts event into event bus.
620          * 
621          * @param subscriber
622          */
623         public void removeSubscriber(Channel subscriber) {
624                 logger.debug("Subscriber {} is removed.", subscriber.remoteAddress());
625                 Event event = new Event(EventType.DEREGISTER);
626                 event.setSubscriber(subscriber);
627                 eventBus.post(event);
628         }
629
630         /**
631          * Checks if exists at least one {@link Channel} subscriber.
632          * 
633          * @return True if exist at least one {@link Channel} subscriber, false
634          *         otherwise.
635          */
636         public boolean hasSubscribers() {
637                 return !subscribers.isEmpty();
638         }
639
640         /**
641          * Consists of two types {@link Store#CONFIG} and {@link Store#OPERATION}.
642          */
643         private static enum Store {
644                 CONFIG("config"), OPERATION("operation");
645
646                 private final String value;
647
648                 private Store(String value) {
649                         this.value = value;
650                 }
651         }
652
653         /**
654          * Consists of three types {@link Operation#CREATED},
655          * {@link Operation#UPDATED} and {@link Operation#DELETED}.
656          */
657         private static enum Operation {
658                 CREATED("created"), UPDATED("updated"), DELETED("deleted");
659
660                 private final String value;
661
662                 private Operation(String value) {
663                         this.value = value;
664                 }
665         }
666
667 }