Merge "Bug 1125: Added regression test"
[controller.git] / opendaylight / md-sal / sal-rest-connector / src / main / java / org / opendaylight / controller / sal / streams / listeners / ListenerAdapter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.streams.listeners;
9
10 import com.google.common.base.Charsets;
11 import com.google.common.base.Preconditions;
12 import com.google.common.eventbus.AsyncEventBus;
13 import com.google.common.eventbus.EventBus;
14 import com.google.common.eventbus.Subscribe;
15 import io.netty.channel.Channel;
16 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
17 import io.netty.util.internal.ConcurrentSet;
18 import java.io.ByteArrayOutputStream;
19 import java.io.OutputStreamWriter;
20 import java.io.UnsupportedEncodingException;
21 import java.text.SimpleDateFormat;
22 import java.util.Collection;
23 import java.util.Date;
24 import java.util.HashMap;
25 import java.util.Map;
26 import java.util.Map.Entry;
27 import java.util.Random;
28 import java.util.Set;
29 import java.util.concurrent.Executors;
30 import java.util.regex.Pattern;
31 import javax.activation.UnsupportedDataTypeException;
32 import javax.xml.parsers.DocumentBuilder;
33 import javax.xml.parsers.DocumentBuilderFactory;
34 import javax.xml.parsers.ParserConfigurationException;
35 import javax.xml.transform.OutputKeys;
36 import javax.xml.transform.Transformer;
37 import javax.xml.transform.TransformerException;
38 import javax.xml.transform.TransformerFactory;
39 import javax.xml.transform.dom.DOMSource;
40 import javax.xml.transform.stream.StreamResult;
41 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
42 import org.opendaylight.controller.sal.core.api.data.DataChangeListener;
43 import org.opendaylight.controller.sal.rest.impl.XmlMapper;
44 import org.opendaylight.controller.sal.restconf.impl.ControllerContext;
45 import org.opendaylight.yangtools.concepts.ListenerRegistration;
46 import org.opendaylight.yangtools.yang.common.QName;
47 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
48 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
49 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifierWithPredicates;
50 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeWithValue;
51 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
52 import org.opendaylight.yangtools.yang.model.api.DataNodeContainer;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55 import org.w3c.dom.Document;
56 import org.w3c.dom.Element;
57 import org.w3c.dom.Node;
58
59 /**
60  * {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source.
61  */
62 public class ListenerAdapter implements DataChangeListener {
63
64     private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class);
65     private static final DocumentBuilderFactory DBF = DocumentBuilderFactory.newInstance();
66     private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
67     private static final Pattern RFC3339_PATTERN = Pattern.compile("(\\d\\d)(\\d\\d)$");
68
69     private final XmlMapper xmlMapper = new XmlMapper();
70     private final SimpleDateFormat rfc3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ");
71
72     private final InstanceIdentifier path;
73     private ListenerRegistration<DataChangeListener> registration;
74     private final String streamName;
75     private Set<Channel> subscribers = new ConcurrentSet<>();
76     private final EventBus eventBus;
77     private final EventBusChangeRecorder eventBusChangeRecorder;
78
79     /**
80      * Creates new {@link ListenerAdapter} listener specified by path and stream name.
81      *
82      * @param path
83      *            Path to data in data store.
84      * @param streamName
85      *            The name of the stream.
86      */
87     ListenerAdapter(final InstanceIdentifier path, final String streamName) {
88         Preconditions.checkNotNull(path);
89         Preconditions.checkArgument(streamName != null && !streamName.isEmpty());
90         this.path = path;
91         this.streamName = streamName;
92         eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
93         eventBusChangeRecorder = new EventBusChangeRecorder();
94         eventBus.register(eventBusChangeRecorder);
95     }
96
97     @Override
98     public void onDataChanged(final DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
99         if (!change.getCreatedConfigurationData().isEmpty() || !change.getCreatedOperationalData().isEmpty()
100                 || !change.getUpdatedConfigurationData().isEmpty() || !change.getUpdatedOperationalData().isEmpty()
101                 || !change.getRemovedConfigurationData().isEmpty() || !change.getRemovedOperationalData().isEmpty()) {
102             String xml = prepareXmlFrom(change);
103             Event event = new Event(EventType.NOTIFY);
104             event.setData(xml);
105             eventBus.post(event);
106         }
107     }
108
109     /**
110      * Tracks events of data change by customer.
111      */
112     private final class EventBusChangeRecorder {
113         @Subscribe
114         public void recordCustomerChange(final Event event) {
115             if (event.getType() == EventType.REGISTER) {
116                 Channel subscriber = event.getSubscriber();
117                 if (!subscribers.contains(subscriber)) {
118                     subscribers.add(subscriber);
119                 }
120             } else if (event.getType() == EventType.DEREGISTER) {
121                 subscribers.remove(event.getSubscriber());
122                 Notificator.removeListenerIfNoSubscriberExists(ListenerAdapter.this);
123             } else if (event.getType() == EventType.NOTIFY) {
124                 for (Channel subscriber : subscribers) {
125                     if (subscriber.isActive()) {
126                         LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
127                         subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
128                     } else {
129                         LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
130                         subscribers.remove(subscriber);
131                     }
132                 }
133             }
134         }
135     }
136
137     /**
138      * Represents event of specific {@link EventType} type, holds data and {@link Channel} subscriber.
139      */
140     private final class Event {
141         private final EventType type;
142         private Channel subscriber;
143         private String data;
144
145         /**
146          * Creates new event specified by {@link EventType} type.
147          *
148          * @param type
149          *            EventType
150          */
151         public Event(final EventType type) {
152             this.type = type;
153         }
154
155         /**
156          * Gets the {@link Channel} subscriber.
157          *
158          * @return Channel
159          */
160         public Channel getSubscriber() {
161             return subscriber;
162         }
163
164         /**
165          * Sets subscriber for event.
166          *
167          * @param subscriber
168          *            Channel
169          */
170         public void setSubscriber(final Channel subscriber) {
171             this.subscriber = subscriber;
172         }
173
174         /**
175          * Gets event data.
176          *
177          * @return String representation of event data.
178          */
179         public String getData() {
180             return data;
181         }
182
183         /**
184          * Sets event data.
185          *
186          * @param String
187          *            data.
188          */
189         public void setData(final String data) {
190             this.data = data;
191         }
192
193         /**
194          * Gets event type.
195          *
196          * @return The type of the event.
197          */
198         public EventType getType() {
199             return type;
200         }
201     }
202
203     /**
204      * Type of the event.
205      */
206     private enum EventType {
207         REGISTER,
208         DEREGISTER,
209         NOTIFY;
210     }
211
212     /**
213      * Prepare data in printable form and transform it to String.
214      *
215      * @param change
216      *            DataChangeEvent
217      * @return Data in printable form.
218      */
219     private String prepareXmlFrom(final DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
220         Document doc = createDocument();
221         Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
222                 "notification");
223         doc.appendChild(notificationElement);
224
225         Element eventTimeElement = doc.createElement("eventTime");
226         eventTimeElement.setTextContent(toRFC3339(new Date()));
227         notificationElement.appendChild(eventTimeElement);
228
229         Element dataChangedNotificationEventElement = doc.createElementNS(
230                 "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "data-changed-notification");
231         addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, change);
232         notificationElement.appendChild(dataChangedNotificationEventElement);
233
234         try {
235             ByteArrayOutputStream out = new ByteArrayOutputStream();
236             Transformer transformer = FACTORY.newTransformer();
237             transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
238             transformer.setOutputProperty(OutputKeys.METHOD, "xml");
239             transformer.setOutputProperty(OutputKeys.INDENT, "yes");
240             transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
241             transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
242             transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, Charsets.UTF_8)));
243             byte[] charData = out.toByteArray();
244             return new String(charData, "UTF-8");
245         } catch (TransformerException | UnsupportedEncodingException e) {
246             String msg = "Error during transformation of Document into String";
247             LOG.error(msg, e);
248             return msg;
249         }
250     }
251
252     /**
253      * Formats data specified by RFC3339.
254      *
255      * @param d
256      *            Date
257      * @return Data specified by RFC3339.
258      */
259     private String toRFC3339(final Date d) {
260         return RFC3339_PATTERN.matcher(rfc3339.format(d)).replaceAll("$1:$2");
261     }
262
263     /**
264      * Creates {@link Document} document.
265      *
266      * @return {@link Document} document.
267      */
268     private Document createDocument() {
269         final DocumentBuilder bob;
270         try {
271             bob = DBF.newDocumentBuilder();
272         } catch (ParserConfigurationException e) {
273             return null;
274         }
275         return bob.newDocument();
276     }
277
278     /**
279      * Adds values to data changed notification event element.
280      *
281      * @param doc
282      *            {@link Document}
283      * @param dataChangedNotificationEventElement
284      *            {@link Element}
285      * @param change
286      *            {@link DataChangeEvent}
287      */
288     private void addValuesToDataChangedNotificationEventElement(final Document doc,
289             final Element dataChangedNotificationEventElement,
290             final DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
291         addValuesFromDataToElement(doc, change.getCreatedConfigurationData(), dataChangedNotificationEventElement,
292                 Store.CONFIG, Operation.CREATED);
293         addValuesFromDataToElement(doc, change.getCreatedOperationalData(), dataChangedNotificationEventElement,
294                 Store.OPERATION, Operation.CREATED);
295         if (change.getCreatedConfigurationData().isEmpty()) {
296             addValuesFromDataToElement(doc, change.getUpdatedConfigurationData(), dataChangedNotificationEventElement,
297                     Store.CONFIG, Operation.UPDATED);
298         }
299         if (change.getCreatedOperationalData().isEmpty()) {
300             addValuesFromDataToElement(doc, change.getUpdatedOperationalData(), dataChangedNotificationEventElement,
301                     Store.OPERATION, Operation.UPDATED);
302         }
303         addValuesFromDataToElement(doc, change.getRemovedConfigurationData(), dataChangedNotificationEventElement,
304                 Store.CONFIG, Operation.DELETED);
305         addValuesFromDataToElement(doc, change.getRemovedOperationalData(), dataChangedNotificationEventElement,
306                 Store.OPERATION, Operation.DELETED);
307     }
308
309     /**
310      * Adds values from data to element.
311      *
312      * @param doc
313      *            {@link Document}
314      * @param data
315      *            Set of {@link InstanceIdentifier}.
316      * @param element
317      *            {@link Element}
318      * @param store
319      *            {@link Store}
320      * @param operation
321      *            {@link Operation}
322      */
323     private void addValuesFromDataToElement(final Document doc, final Set<InstanceIdentifier> data,
324             final Element element, final Store store, final Operation operation) {
325         if (data == null || data.isEmpty()) {
326             return;
327         }
328         for (InstanceIdentifier path : data) {
329             Node node = createDataChangeEventElement(doc, path, null, store, operation);
330             element.appendChild(node);
331         }
332     }
333
334     /**
335      * Adds values from data to element.
336      *
337      * @param doc
338      *            {@link Document}
339      * @param data
340      *            Map of {@link InstanceIdentifier} and {@link CompositeNode}.
341      * @param element
342      *            {@link Element}
343      * @param store
344      *            {@link Store}
345      * @param operation
346      *            {@link Operation}
347      */
348     private void addValuesFromDataToElement(final Document doc, final Map<InstanceIdentifier, CompositeNode> data,
349             final Element element, final Store store, final Operation operation) {
350         if (data == null || data.isEmpty()) {
351             return;
352         }
353         for (Entry<InstanceIdentifier, CompositeNode> entry : data.entrySet()) {
354             Node node = createDataChangeEventElement(doc, entry.getKey(), entry.getValue(), store, operation);
355             element.appendChild(node);
356         }
357     }
358
359     /**
360      * Creates changed event element from data.
361      *
362      * @param doc
363      *            {@link Document}
364      * @param path
365      *            Path to data in data store.
366      * @param data
367      *            {@link CompositeNode}
368      * @param store
369      *            {@link Store}
370      * @param operation
371      *            {@link Operation}
372      * @return {@link Node} node represented by changed event element.
373      */
374     private Node createDataChangeEventElement(final Document doc, final InstanceIdentifier path,
375             final CompositeNode data, final Store store, final Operation operation) {
376         Element dataChangeEventElement = doc.createElement("data-change-event");
377
378         Element pathElement = doc.createElement("path");
379         addPathAsValueToElement(path, pathElement);
380         dataChangeEventElement.appendChild(pathElement);
381
382         Element storeElement = doc.createElement("store");
383         storeElement.setTextContent(store.value);
384         dataChangeEventElement.appendChild(storeElement);
385
386         Element operationElement = doc.createElement("operation");
387         operationElement.setTextContent(operation.value);
388         dataChangeEventElement.appendChild(operationElement);
389
390         if (data != null) {
391             Element dataElement = doc.createElement("data");
392             Node dataAnyXml = translateToXml(path, data);
393             Node adoptedNode = doc.adoptNode(dataAnyXml);
394             dataElement.appendChild(adoptedNode);
395             dataChangeEventElement.appendChild(dataElement);
396         }
397
398         return dataChangeEventElement;
399     }
400
401     /**
402      * Translates {@link CompositeNode} data to XML format.
403      *
404      * @param path
405      *            Path to data in data store.
406      * @param data
407      *            {@link CompositeNode}
408      * @return Data in XML format.
409      */
410     private Node translateToXml(final InstanceIdentifier path, final CompositeNode data) {
411         DataNodeContainer schemaNode = ControllerContext.getInstance().getDataNodeContainerFor(path);
412         if (schemaNode == null) {
413             LOG.info(
414                     "Path '{}' contains node with unsupported type (supported type is Container or List) or some node was not found.",
415                     path);
416             return null;
417         }
418         try {
419             Document xml = xmlMapper.write(data, schemaNode);
420             return xml.getFirstChild();
421         } catch (UnsupportedDataTypeException e) {
422             LOG.error("Error occured during translation of notification to XML.", e);
423             return null;
424         }
425     }
426
427     /**
428      * Adds path as value to element.
429      *
430      * @param path
431      *            Path to data in data store.
432      * @param element
433      *            {@link Element}
434      */
435     private void addPathAsValueToElement(final InstanceIdentifier path, final Element element) {
436         // Map< key = namespace, value = prefix>
437         Map<String, String> prefixes = new HashMap<>();
438         InstanceIdentifier instanceIdentifier = path;
439         StringBuilder textContent = new StringBuilder();
440
441         // FIXME: BUG-1281: this is duplicated code from yangtools (BUG-1275)
442         for (PathArgument pathArgument : instanceIdentifier.getPathArguments()) {
443             textContent.append("/");
444             writeIdentifierWithNamespacePrefix(element, textContent, pathArgument.getNodeType(), prefixes);
445             if (pathArgument instanceof NodeIdentifierWithPredicates) {
446                 Map<QName, Object> predicates = ((NodeIdentifierWithPredicates) pathArgument).getKeyValues();
447                 for (QName keyValue : predicates.keySet()) {
448                     String predicateValue = String.valueOf(predicates.get(keyValue));
449                     textContent.append("[");
450                     writeIdentifierWithNamespacePrefix(element, textContent, keyValue, prefixes);
451                     textContent.append("='");
452                     textContent.append(predicateValue);
453                     textContent.append("'");
454                     textContent.append("]");
455                 }
456             } else if (pathArgument instanceof NodeWithValue) {
457                 textContent.append("[.='");
458                 textContent.append(((NodeWithValue) pathArgument).getValue());
459                 textContent.append("'");
460                 textContent.append("]");
461             }
462         }
463         element.setTextContent(textContent.toString());
464     }
465
466     /**
467      * Writes identifier that consists of prefix and QName.
468      *
469      * @param element
470      *            {@link Element}
471      * @param textContent
472      *            StringBuilder
473      * @param qName
474      *            QName
475      * @param prefixes
476      *            Map of namespaces and prefixes.
477      */
478     private static void writeIdentifierWithNamespacePrefix(final Element element, final StringBuilder textContent,
479             final QName qName, final Map<String, String> prefixes) {
480         String namespace = qName.getNamespace().toString();
481         String prefix = prefixes.get(namespace);
482         if (prefix == null) {
483             prefix = qName.getPrefix();
484             if (prefix == null || prefix.isEmpty() || prefixes.containsValue(prefix)) {
485                 prefix = generateNewPrefix(prefixes.values());
486             }
487         }
488
489         element.setAttribute("xmlns:" + prefix, namespace);
490         textContent.append(prefix);
491         prefixes.put(namespace, prefix);
492
493         textContent.append(":");
494         textContent.append(qName.getLocalName());
495     }
496
497     /**
498      * Generates new prefix which consists of four random characters <a-z>.
499      *
500      * @param prefixes
501      *            Collection of prefixes.
502      * @return New prefix which consists of four random characters <a-z>.
503      */
504     private static String generateNewPrefix(final Collection<String> prefixes) {
505         StringBuilder result = null;
506         Random random = new Random();
507         do {
508             result = new StringBuilder();
509             for (int i = 0; i < 4; i++) {
510                 int randomNumber = 0x61 + (Math.abs(random.nextInt()) % 26);
511                 result.append(Character.toChars(randomNumber));
512             }
513         } while (prefixes.contains(result.toString()));
514
515         return result.toString();
516     }
517
518     /**
519      * Gets path pointed to data in data store.
520      *
521      * @return Path pointed to data in data store.
522      */
523     public InstanceIdentifier getPath() {
524         return path;
525     }
526
527     /**
528      * Sets {@link ListenerRegistration} registration.
529      *
530      * @param registration
531      *            ListenerRegistration<DataChangeListener>
532      */
533     public void setRegistration(final ListenerRegistration<DataChangeListener> registration) {
534         this.registration = registration;
535     }
536
537     /**
538      * Gets the name of the stream.
539      *
540      * @return The name of the stream.
541      */
542     public String getStreamName() {
543         return streamName;
544     }
545
546     /**
547      * Removes all subscribers and unregisters event bus change recorder form event bus.
548      */
549     public void close() throws Exception {
550         subscribers = new ConcurrentSet<>();
551         registration.close();
552         registration = null;
553         eventBus.unregister(eventBusChangeRecorder);
554     }
555
556     /**
557      * Checks if {@link ListenerRegistration} registration exist.
558      *
559      * @return True if exist, false otherwise.
560      */
561     public boolean isListening() {
562         return registration == null ? false : true;
563     }
564
565     /**
566      * Creates event of type {@link EventType#REGISTER}, set {@link Channel} subscriber to the event and post event into
567      * event bus.
568      *
569      * @param subscriber
570      *            Channel
571      */
572     public void addSubscriber(final Channel subscriber) {
573         if (!subscriber.isActive()) {
574             LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
575         }
576         Event event = new Event(EventType.REGISTER);
577         event.setSubscriber(subscriber);
578         eventBus.post(event);
579     }
580
581     /**
582      * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel} subscriber to the event and posts event
583      * into event bus.
584      *
585      * @param subscriber
586      */
587     public void removeSubscriber(final Channel subscriber) {
588         LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
589         Event event = new Event(EventType.DEREGISTER);
590         event.setSubscriber(subscriber);
591         eventBus.post(event);
592     }
593
594     /**
595      * Checks if exists at least one {@link Channel} subscriber.
596      *
597      * @return True if exist at least one {@link Channel} subscriber, false otherwise.
598      */
599     public boolean hasSubscribers() {
600         return !subscribers.isEmpty();
601     }
602
603     /**
604      * Consists of two types {@link Store#CONFIG} and {@link Store#OPERATION}.
605      */
606     private static enum Store {
607         CONFIG("config"),
608         OPERATION("operation");
609
610         private final String value;
611
612         private Store(final String value) {
613             this.value = value;
614         }
615     }
616
617     /**
618      * Consists of three types {@link Operation#CREATED}, {@link Operation#UPDATED} and {@link Operation#DELETED}.
619      */
620     private static enum Operation {
621         CREATED("created"),
622         UPDATED("updated"),
623         DELETED("deleted");
624
625         private final String value;
626
627         private Operation(final String value) {
628             this.value = value;
629         }
630     }
631
632 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.