925a09337ca3ee1e8fba707020e3c7a19b8e03f6
[controller.git] / opendaylight / md-sal / sal-rest-connector / src / main / java / org / opendaylight / controller / sal / streams / listeners / ListenerAdapter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.streams.listeners;
9
10 import com.google.common.base.Charsets;
11 import com.google.common.base.Preconditions;
12 import com.google.common.eventbus.AsyncEventBus;
13 import com.google.common.eventbus.EventBus;
14 import com.google.common.eventbus.Subscribe;
15
16 import io.netty.channel.Channel;
17 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
18 import io.netty.util.internal.ConcurrentSet;
19
20 import java.io.ByteArrayOutputStream;
21 import java.io.OutputStreamWriter;
22 import java.io.UnsupportedEncodingException;
23 import java.text.SimpleDateFormat;
24 import java.util.Collection;
25 import java.util.Date;
26 import java.util.HashMap;
27 import java.util.Map;
28 import java.util.Map.Entry;
29 import java.util.Random;
30 import java.util.Set;
31 import java.util.concurrent.Executors;
32 import java.util.regex.Pattern;
33
34 import javax.activation.UnsupportedDataTypeException;
35 import javax.xml.parsers.DocumentBuilder;
36 import javax.xml.parsers.DocumentBuilderFactory;
37 import javax.xml.parsers.ParserConfigurationException;
38 import javax.xml.transform.OutputKeys;
39 import javax.xml.transform.Transformer;
40 import javax.xml.transform.TransformerException;
41 import javax.xml.transform.TransformerFactory;
42 import javax.xml.transform.dom.DOMSource;
43 import javax.xml.transform.stream.StreamResult;
44
45 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
46 import org.opendaylight.controller.sal.core.api.data.DataChangeListener;
47 import org.opendaylight.controller.sal.rest.impl.XmlMapper;
48 import org.opendaylight.controller.sal.restconf.impl.ControllerContext;
49 import org.opendaylight.yangtools.concepts.ListenerRegistration;
50 import org.opendaylight.yangtools.yang.common.QName;
51 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
52 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
53 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifierWithPredicates;
54 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeWithValue;
55 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
56 import org.opendaylight.yangtools.yang.model.api.DataNodeContainer;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59 import org.w3c.dom.Document;
60 import org.w3c.dom.Element;
61 import org.w3c.dom.Node;
62
63 /**
64  * {@link ListenerAdapter} is responsible to track events, which occurred by
65  * changing data in data source.
66  */
67 public class ListenerAdapter implements DataChangeListener {
68
69     private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class);
70     private static final DocumentBuilderFactory DBF = DocumentBuilderFactory.newInstance();
71     private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
72     private static final Pattern RFC3339_PATTERN = Pattern.compile("(\\d\\d)(\\d\\d)$");
73
74     private final XmlMapper xmlMapper = new XmlMapper();
75     private final SimpleDateFormat rfc3339 = new SimpleDateFormat(
76             "yyyy-MM-dd'T'hh:mm:ssZ");
77
78     private final InstanceIdentifier path;
79     private ListenerRegistration<DataChangeListener> registration;
80     private final String streamName;
81     private Set<Channel> subscribers = new ConcurrentSet<>();
82     private final EventBus eventBus;
83     private final EventBusChangeRecorder eventBusChangeRecorder;
84
85
86     /**
87      * Creates new {@link ListenerAdapter} listener specified by path and stream
88      * name.
89      *
90      * @param path
91      *            Path to data in data store.
92      * @param streamName
93      *            The name of the stream.
94      */
95     ListenerAdapter(final InstanceIdentifier path, final String streamName) {
96         Preconditions.checkNotNull(path);
97         Preconditions
98         .checkArgument(streamName != null && !streamName.isEmpty());
99         this.path = path;
100         this.streamName = streamName;
101         eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
102         eventBusChangeRecorder = new EventBusChangeRecorder();
103         eventBus.register(eventBusChangeRecorder);
104     }
105
106     @Override
107     public void onDataChanged(
108             final DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
109         if (!change.getCreatedConfigurationData().isEmpty()
110                 || !change.getCreatedOperationalData().isEmpty()
111                 || !change.getUpdatedConfigurationData().isEmpty()
112                 || !change.getUpdatedOperationalData().isEmpty()
113                 || !change.getRemovedConfigurationData().isEmpty()
114                 || !change.getRemovedOperationalData().isEmpty()) {
115             String xml = prepareXmlFrom(change);
116             Event event = new Event(EventType.NOTIFY);
117             event.setData(xml);
118             eventBus.post(event);
119         }
120     }
121
122     /**
123      * Tracks events of data change by customer.
124      */
125     private final class EventBusChangeRecorder {
126         @Subscribe
127         public void recordCustomerChange(final Event event) {
128             if (event.getType() == EventType.REGISTER) {
129                 Channel subscriber = event.getSubscriber();
130                 if (!subscribers.contains(subscriber)) {
131                     subscribers.add(subscriber);
132                 }
133             } else if (event.getType() == EventType.DEREGISTER) {
134                 subscribers.remove(event.getSubscriber());
135                 Notificator
136                 .removeListenerIfNoSubscriberExists(ListenerAdapter.this);
137             } else if (event.getType() == EventType.NOTIFY) {
138                 for (Channel subscriber : subscribers) {
139                     if (subscriber.isActive()) {
140                         LOG.debug("Data are sent to subscriber {}:",
141                                 subscriber.remoteAddress());
142                         subscriber.writeAndFlush(new TextWebSocketFrame(event
143                                 .getData()));
144                     } else {
145                         LOG.debug(
146                                 "Subscriber {} is removed - channel is not active yet.",
147                                 subscriber.remoteAddress());
148                         subscribers.remove(subscriber);
149                     }
150                 }
151             }
152         }
153     }
154
155     /**
156      * Represents event of specific {@link EventType} type, holds data and
157      * {@link Channel} subscriber.
158      */
159     private final class Event {
160         private final EventType type;
161         private Channel subscriber;
162         private String data;
163
164         /**
165          * Creates new event specified by {@link EventType} type.
166          *
167          * @param type
168          *            EventType
169          */
170         public Event(final EventType type) {
171             this.type = type;
172         }
173
174         /**
175          * Gets the {@link Channel} subscriber.
176          *
177          * @return Channel
178          */
179         public Channel getSubscriber() {
180             return subscriber;
181         }
182
183         /**
184          * Sets subscriber for event.
185          *
186          * @param subscriber
187          *            Channel
188          */
189         public void setSubscriber(final Channel subscriber) {
190             this.subscriber = subscriber;
191         }
192
193         /**
194          * Gets event data.
195          *
196          * @return String representation of event data.
197          */
198         public String getData() {
199             return data;
200         }
201
202         /**
203          * Sets event data.
204          *
205          * @param String
206          *            data.
207          */
208         public void setData(final String data) {
209             this.data = data;
210         }
211
212         /**
213          * Gets event type.
214          *
215          * @return The type of the event.
216          */
217         public EventType getType() {
218             return type;
219         }
220     }
221
222     /**
223      * Type of the event.
224      */
225     private enum EventType {
226         REGISTER, DEREGISTER, NOTIFY;
227     }
228
229     /**
230      * Prepare data in printable form and transform it to String.
231      *
232      * @param change
233      *            DataChangeEvent
234      * @return Data in printable form.
235      */
236     private String prepareXmlFrom(
237             final DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
238         Document doc = createDocument();
239         Element notificationElement = doc.createElementNS(
240                 "urn:ietf:params:xml:ns:netconf:notification:1.0",
241                 "notification");
242         doc.appendChild(notificationElement);
243
244         Element eventTimeElement = doc.createElement("eventTime");
245         eventTimeElement.setTextContent(toRFC3339(new Date()));
246         notificationElement.appendChild(eventTimeElement);
247
248         Element dataChangedNotificationEventElement = doc.createElementNS(
249                 "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote",
250                 "data-changed-notification");
251         addValuesToDataChangedNotificationEventElement(doc,
252                 dataChangedNotificationEventElement, change);
253         notificationElement.appendChild(dataChangedNotificationEventElement);
254
255         try {
256             ByteArrayOutputStream out = new ByteArrayOutputStream();
257             Transformer transformer = FACTORY.newTransformer();
258             transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
259             transformer.setOutputProperty(OutputKeys.METHOD, "xml");
260             transformer.setOutputProperty(OutputKeys.INDENT, "yes");
261             transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
262             transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
263             transformer.transform(new DOMSource(doc), new StreamResult(
264                     new OutputStreamWriter(out, Charsets.UTF_8)));
265             byte[] charData = out.toByteArray();
266             return new String(charData, "UTF-8");
267         } catch (TransformerException | UnsupportedEncodingException e) {
268             String msg = "Error during transformation of Document into String";
269             LOG.error(msg, e);
270             return msg;
271         }
272     }
273
274     /**
275      * Formats data specified by RFC3339.
276      *
277      * @param d
278      *            Date
279      * @return Data specified by RFC3339.
280      */
281     private String toRFC3339(final Date d) {
282         return RFC3339_PATTERN.matcher(rfc3339.format(d)).replaceAll("$1:$2");
283     }
284
285     /**
286      * Creates {@link Document} document.
287      *
288      * @return {@link Document} document.
289      */
290     private Document createDocument() {
291         final DocumentBuilder bob;
292         try {
293             bob = DBF.newDocumentBuilder();
294         } catch (ParserConfigurationException e) {
295             return null;
296         }
297         return bob.newDocument();
298     }
299
300     /**
301      * Adds values to data changed notification event element.
302      *
303      * @param doc
304      *            {@link Document}
305      * @param dataChangedNotificationEventElement
306      *            {@link Element}
307      * @param change
308      *            {@link DataChangeEvent}
309      */
310     private void addValuesToDataChangedNotificationEventElement(final Document doc,
311             final Element dataChangedNotificationEventElement,
312             final DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
313         addValuesFromDataToElement(doc, change.getCreatedConfigurationData(),
314                 dataChangedNotificationEventElement, Store.CONFIG,
315                 Operation.CREATED);
316         addValuesFromDataToElement(doc, change.getCreatedOperationalData(),
317                 dataChangedNotificationEventElement, Store.OPERATION,
318                 Operation.CREATED);
319         if (change.getCreatedConfigurationData().isEmpty()) {
320             addValuesFromDataToElement(doc,
321                     change.getUpdatedConfigurationData(),
322                     dataChangedNotificationEventElement, Store.CONFIG,
323                     Operation.UPDATED);
324         }
325         if (change.getCreatedOperationalData().isEmpty()) {
326             addValuesFromDataToElement(doc, change.getUpdatedOperationalData(),
327                     dataChangedNotificationEventElement, Store.OPERATION,
328                     Operation.UPDATED);
329         }
330         addValuesFromDataToElement(doc, change.getRemovedConfigurationData(),
331                 dataChangedNotificationEventElement, Store.CONFIG,
332                 Operation.DELETED);
333         addValuesFromDataToElement(doc, change.getRemovedOperationalData(),
334                 dataChangedNotificationEventElement, Store.OPERATION,
335                 Operation.DELETED);
336     }
337
338     /**
339      * Adds values from data to element.
340      *
341      * @param doc
342      *            {@link Document}
343      * @param data
344      *            Set of {@link InstanceIdentifier}.
345      * @param element
346      *            {@link Element}
347      * @param store
348      *            {@link Store}
349      * @param operation
350      *            {@link Operation}
351      */
352     private void addValuesFromDataToElement(final Document doc,
353             final Set<InstanceIdentifier> data, final Element element, final Store store,
354             final Operation operation) {
355         if (data == null || data.isEmpty()) {
356             return;
357         }
358         for (InstanceIdentifier path : data) {
359             Node node = createDataChangeEventElement(doc, path, null, store,
360                     operation);
361             element.appendChild(node);
362         }
363     }
364
365     /**
366      * Adds values from data to element.
367      *
368      * @param doc
369      *            {@link Document}
370      * @param data
371      *            Map of {@link InstanceIdentifier} and {@link CompositeNode}.
372      * @param element
373      *            {@link Element}
374      * @param store
375      *            {@link Store}
376      * @param operation
377      *            {@link Operation}
378      */
379     private void addValuesFromDataToElement(final Document doc,
380             final Map<InstanceIdentifier, CompositeNode> data, final Element element,
381             final Store store, final Operation operation) {
382         if (data == null || data.isEmpty()) {
383             return;
384         }
385         for (Entry<InstanceIdentifier, CompositeNode> entry : data.entrySet()) {
386             Node node = createDataChangeEventElement(doc, entry.getKey(),
387                     entry.getValue(), store, operation);
388             element.appendChild(node);
389         }
390     }
391
392     /**
393      * Creates changed event element from data.
394      *
395      * @param doc
396      *            {@link Document}
397      * @param path
398      *            Path to data in data store.
399      * @param data
400      *            {@link CompositeNode}
401      * @param store
402      *            {@link Store}
403      * @param operation
404      *            {@link Operation}
405      * @return {@link Node} node represented by changed event element.
406      */
407     private Node createDataChangeEventElement(final Document doc,
408             final InstanceIdentifier path, final CompositeNode data, final Store store,
409             final Operation operation) {
410         Element dataChangeEventElement = doc.createElement("data-change-event");
411
412         Element pathElement = doc.createElement("path");
413         addPathAsValueToElement(path, pathElement);
414         dataChangeEventElement.appendChild(pathElement);
415
416         Element storeElement = doc.createElement("store");
417         storeElement.setTextContent(store.value);
418         dataChangeEventElement.appendChild(storeElement);
419
420         Element operationElement = doc.createElement("operation");
421         operationElement.setTextContent(operation.value);
422         dataChangeEventElement.appendChild(operationElement);
423
424         if (data != null) {
425             Element dataElement = doc.createElement("data");
426             Node dataAnyXml = translateToXml(path, data);
427             Node adoptedNode = doc.adoptNode(dataAnyXml);
428             dataElement.appendChild(adoptedNode);
429             dataChangeEventElement.appendChild(dataElement);
430         }
431
432         return dataChangeEventElement;
433     }
434
435     /**
436      * Translates {@link CompositeNode} data to XML format.
437      *
438      * @param path
439      *            Path to data in data store.
440      * @param data
441      *            {@link CompositeNode}
442      * @return Data in XML format.
443      */
444     private Node translateToXml(final InstanceIdentifier path, final CompositeNode data) {
445         DataNodeContainer schemaNode = ControllerContext.getInstance()
446                 .getDataNodeContainerFor(path);
447         if (schemaNode == null) {
448             LOG.info(
449                     "Path '{}' contains node with unsupported type (supported type is Container or List) or some node was not found.",
450                     path);
451             return null;
452         }
453         try {
454             Document xml = xmlMapper.write(data, schemaNode);
455             return xml.getFirstChild();
456         } catch (UnsupportedDataTypeException e) {
457             LOG.error(
458                     "Error occured during translation of notification to XML.",
459                     e);
460             return null;
461         }
462     }
463
464     /**
465      * Adds path as value to element.
466      *
467      * @param path
468      *            Path to data in data store.
469      * @param element
470      *            {@link Element}
471      */
472     private void addPathAsValueToElement(final InstanceIdentifier path,
473             final Element element) {
474         // Map< key = namespace, value = prefix>
475         Map<String, String> prefixes = new HashMap<>();
476         InstanceIdentifier instanceIdentifier = path;
477         StringBuilder textContent = new StringBuilder();
478
479         // FIXME: BUG-1281: this is duplicated code from yangtools (BUG-1275)
480         for (PathArgument pathArgument : instanceIdentifier.getPathArguments()) {
481             textContent.append("/");
482             writeIdentifierWithNamespacePrefix(element, textContent,
483                     pathArgument.getNodeType(), prefixes);
484             if (pathArgument instanceof NodeIdentifierWithPredicates) {
485                 Map<QName, Object> predicates = ((NodeIdentifierWithPredicates) pathArgument)
486                         .getKeyValues();
487                 for (QName keyValue : predicates.keySet()) {
488                     String predicateValue = String.valueOf(predicates
489                             .get(keyValue));
490                     textContent.append("[");
491                     writeIdentifierWithNamespacePrefix(element, textContent,
492                             keyValue, prefixes);
493                     textContent.append("='");
494                     textContent.append(predicateValue);
495                     textContent.append("'");
496                     textContent.append("]");
497                 }
498             } else if (pathArgument instanceof NodeWithValue) {
499                 textContent.append("[.='");
500                 textContent.append(((NodeWithValue) pathArgument).getValue());
501                 textContent.append("'");
502                 textContent.append("]");
503             }
504         }
505         element.setTextContent(textContent.toString());
506     }
507
508     /**
509      * Writes identifier that consists of prefix and QName.
510      *
511      * @param element
512      *            {@link Element}
513      * @param textContent
514      *            StringBuilder
515      * @param qName
516      *            QName
517      * @param prefixes
518      *            Map of namespaces and prefixes.
519      */
520     private static void writeIdentifierWithNamespacePrefix(final Element element,
521             final StringBuilder textContent, final QName qName, final Map<String, String> prefixes) {
522         String namespace = qName.getNamespace().toString();
523         String prefix = prefixes.get(namespace);
524         if (prefix == null) {
525             prefix = qName.getPrefix();
526             if (prefix == null || prefix.isEmpty()
527                     || prefixes.containsValue(prefix)) {
528                 prefix = generateNewPrefix(prefixes.values());
529             }
530         }
531
532         element.setAttribute("xmlns:" + prefix, namespace);
533         textContent.append(prefix);
534         prefixes.put(namespace, prefix);
535
536         textContent.append(":");
537         textContent.append(qName.getLocalName());
538     }
539
540     /**
541      * Generates new prefix which consists of four random characters <a-z>.
542      *
543      * @param prefixes
544      *            Collection of prefixes.
545      * @return New prefix which consists of four random characters <a-z>.
546      */
547     private static String generateNewPrefix(final Collection<String> prefixes) {
548         StringBuilder result = null;
549         Random random = new Random();
550         do {
551             result = new StringBuilder();
552             for (int i = 0; i < 4; i++) {
553                 int randomNumber = 0x61 + (Math.abs(random.nextInt()) % 26);
554                 result.append(Character.toChars(randomNumber));
555             }
556         } while (prefixes.contains(result.toString()));
557
558         return result.toString();
559     }
560
561     /**
562      * Gets path pointed to data in data store.
563      *
564      * @return Path pointed to data in data store.
565      */
566     public InstanceIdentifier getPath() {
567         return path;
568     }
569
570     /**
571      * Sets {@link ListenerRegistration} registration.
572      *
573      * @param registration
574      *            ListenerRegistration<DataChangeListener>
575      */
576     public void setRegistration(
577             final ListenerRegistration<DataChangeListener> registration) {
578         this.registration = registration;
579     }
580
581     /**
582      * Gets the name of the stream.
583      *
584      * @return The name of the stream.
585      */
586     public String getStreamName() {
587         return streamName;
588     }
589
590     /**
591      * Removes all subscribers and unregisters event bus change recorder form
592      * event bus.
593      */
594     public void close() throws Exception {
595         subscribers = new ConcurrentSet<>();
596         registration.close();
597         registration = null;
598         eventBus.unregister(eventBusChangeRecorder);
599     }
600
601     /**
602      * Checks if {@link ListenerRegistration} registration exist.
603      *
604      * @return True if exist, false otherwise.
605      */
606     public boolean isListening() {
607         return registration == null ? false : true;
608     }
609
610     /**
611      * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
612      * subscriber to the event and post event into event bus.
613      *
614      * @param subscriber
615      *            Channel
616      */
617     public void addSubscriber(final Channel subscriber) {
618         if (!subscriber.isActive()) {
619             LOG.debug("Channel is not active between websocket server and subscriber {}"
620                     + subscriber.remoteAddress());
621         }
622         Event event = new Event(EventType.REGISTER);
623         event.setSubscriber(subscriber);
624         eventBus.post(event);
625     }
626
627     /**
628      * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
629      * subscriber to the event and posts event into event bus.
630      *
631      * @param subscriber
632      */
633     public void removeSubscriber(final Channel subscriber) {
634         LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
635         Event event = new Event(EventType.DEREGISTER);
636         event.setSubscriber(subscriber);
637         eventBus.post(event);
638     }
639
640     /**
641      * Checks if exists at least one {@link Channel} subscriber.
642      *
643      * @return True if exist at least one {@link Channel} subscriber, false
644      *         otherwise.
645      */
646     public boolean hasSubscribers() {
647         return !subscribers.isEmpty();
648     }
649
650     /**
651      * Consists of two types {@link Store#CONFIG} and {@link Store#OPERATION}.
652      */
653     private static enum Store {
654         CONFIG("config"), OPERATION("operation");
655
656         private final String value;
657
658         private Store(final String value) {
659             this.value = value;
660         }
661     }
662
663     /**
664      * Consists of three types {@link Operation#CREATED},
665      * {@link Operation#UPDATED} and {@link Operation#DELETED}.
666      */
667     private static enum Operation {
668         CREATED("created"), UPDATED("updated"), DELETED("deleted");
669
670         private final String value;
671
672         private Operation(final String value) {
673             this.value = value;
674         }
675     }
676
677 }