Merge "Changed NetconfDeviceDatastoreAdapter and NetconfDeviceTopologyAdapter to...
[controller.git] / opendaylight / md-sal / sal-rest-connector / src / main / java / org / opendaylight / controller / sal / streams / listeners / ListenerAdapter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.streams.listeners;
9
10 import com.google.common.base.Charsets;
11 import com.google.common.base.Preconditions;
12 import com.google.common.eventbus.AsyncEventBus;
13 import com.google.common.eventbus.EventBus;
14 import com.google.common.eventbus.Subscribe;
15 import io.netty.channel.Channel;
16 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
17 import io.netty.util.internal.ConcurrentSet;
18 import java.io.ByteArrayOutputStream;
19 import java.io.OutputStreamWriter;
20 import java.io.UnsupportedEncodingException;
21 import java.text.SimpleDateFormat;
22 import java.util.Collection;
23 import java.util.Date;
24 import java.util.HashMap;
25 import java.util.Map;
26 import java.util.Random;
27 import java.util.Set;
28 import java.util.concurrent.Executors;
29 import java.util.regex.Pattern;
30 import javax.xml.parsers.DocumentBuilder;
31 import javax.xml.parsers.DocumentBuilderFactory;
32 import javax.xml.parsers.ParserConfigurationException;
33 import javax.xml.transform.OutputKeys;
34 import javax.xml.transform.Transformer;
35 import javax.xml.transform.TransformerException;
36 import javax.xml.transform.TransformerFactory;
37 import javax.xml.transform.dom.DOMSource;
38 import javax.xml.transform.stream.StreamResult;
39 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
40 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
41 import org.opendaylight.controller.sal.rest.impl.XmlMapper;
42 import org.opendaylight.controller.sal.restconf.impl.ControllerContext;
43 import org.opendaylight.yangtools.concepts.ListenerRegistration;
44 import org.opendaylight.yangtools.yang.common.QName;
45 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
46 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
47 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeWithValue;
48 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
49 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52 import org.w3c.dom.Document;
53 import org.w3c.dom.Element;
54 import org.w3c.dom.Node;
55
56 /**
57  * {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source.
58  */
59 public class ListenerAdapter implements DOMDataChangeListener {
60
61     private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class);
62     private static final DocumentBuilderFactory DBF = DocumentBuilderFactory.newInstance();
63     private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
64     private static final Pattern RFC3339_PATTERN = Pattern.compile("(\\d\\d)(\\d\\d)$");
65
66     private final XmlMapper xmlMapper = new XmlMapper();
67     private final SimpleDateFormat rfc3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ");
68
69     private final YangInstanceIdentifier path;
70     private ListenerRegistration<DOMDataChangeListener> registration;
71     private final String streamName;
72     private Set<Channel> subscribers = new ConcurrentSet<>();
73     private final EventBus eventBus;
74     private final EventBusChangeRecorder eventBusChangeRecorder;
75
76     /**
77      * Creates new {@link ListenerAdapter} listener specified by path and stream name.
78      *
79      * @param path
80      *            Path to data in data store.
81      * @param streamName
82      *            The name of the stream.
83      */
84     ListenerAdapter(final YangInstanceIdentifier path, final String streamName) {
85         Preconditions.checkNotNull(path);
86         Preconditions.checkArgument(streamName != null && !streamName.isEmpty());
87         this.path = path;
88         this.streamName = streamName;
89         eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
90         eventBusChangeRecorder = new EventBusChangeRecorder();
91         eventBus.register(eventBusChangeRecorder);
92     }
93
94     @Override
95     public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
96         // TODO Auto-generated method stub
97
98         if (!change.getCreatedData().isEmpty() || !change.getUpdatedData().isEmpty()
99                 || !change.getRemovedPaths().isEmpty()) {
100             final String xml = prepareXmlFrom(change);
101             final Event event = new Event(EventType.NOTIFY);
102             event.setData(xml);
103             eventBus.post(event);
104         }
105     }
106
107     /**
108      * Tracks events of data change by customer.
109      */
110     private final class EventBusChangeRecorder {
111         @Subscribe
112         public void recordCustomerChange(final Event event) {
113             if (event.getType() == EventType.REGISTER) {
114                 final Channel subscriber = event.getSubscriber();
115                 if (!subscribers.contains(subscriber)) {
116                     subscribers.add(subscriber);
117                 }
118             } else if (event.getType() == EventType.DEREGISTER) {
119                 subscribers.remove(event.getSubscriber());
120                 Notificator.removeListenerIfNoSubscriberExists(ListenerAdapter.this);
121             } else if (event.getType() == EventType.NOTIFY) {
122                 for (final Channel subscriber : subscribers) {
123                     if (subscriber.isActive()) {
124                         LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
125                         subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
126                     } else {
127                         LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
128                         subscribers.remove(subscriber);
129                     }
130                 }
131             }
132         }
133     }
134
135     /**
136      * Represents event of specific {@link EventType} type, holds data and {@link Channel} subscriber.
137      */
138     private final class Event {
139         private final EventType type;
140         private Channel subscriber;
141         private String data;
142
143         /**
144          * Creates new event specified by {@link EventType} type.
145          *
146          * @param type
147          *            EventType
148          */
149         public Event(final EventType type) {
150             this.type = type;
151         }
152
153         /**
154          * Gets the {@link Channel} subscriber.
155          *
156          * @return Channel
157          */
158         public Channel getSubscriber() {
159             return subscriber;
160         }
161
162         /**
163          * Sets subscriber for event.
164          *
165          * @param subscriber
166          *            Channel
167          */
168         public void setSubscriber(final Channel subscriber) {
169             this.subscriber = subscriber;
170         }
171
172         /**
173          * Gets event String.
174          *
175          * @return String representation of event data.
176          */
177         public String getData() {
178             return data;
179         }
180
181         /**
182          * Sets event data.
183          *
184          * @param data String.
185          */
186         public void setData(final String data) {
187             this.data = data;
188         }
189
190         /**
191          * Gets event type.
192          *
193          * @return The type of the event.
194          */
195         public EventType getType() {
196             return type;
197         }
198     }
199
200     /**
201      * Type of the event.
202      */
203     private enum EventType {
204         REGISTER,
205         DEREGISTER,
206         NOTIFY;
207     }
208
209     /**
210      * Prepare data in printable form and transform it to String.
211      *
212      * @param change
213      *            DataChangeEvent
214      * @return Data in printable form.
215      */
216     private String prepareXmlFrom(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
217         final Document doc = createDocument();
218         final Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
219                 "notification");
220         doc.appendChild(notificationElement);
221
222         final Element eventTimeElement = doc.createElement("eventTime");
223         eventTimeElement.setTextContent(toRFC3339(new Date()));
224         notificationElement.appendChild(eventTimeElement);
225
226         final Element dataChangedNotificationEventElement = doc.createElementNS(
227                 "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "data-changed-notification");
228         addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, change);
229         notificationElement.appendChild(dataChangedNotificationEventElement);
230
231         try {
232             final ByteArrayOutputStream out = new ByteArrayOutputStream();
233             final Transformer transformer = FACTORY.newTransformer();
234             transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
235             transformer.setOutputProperty(OutputKeys.METHOD, "xml");
236             transformer.setOutputProperty(OutputKeys.INDENT, "yes");
237             transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
238             transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
239             transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, Charsets.UTF_8)));
240             final byte[] charData = out.toByteArray();
241             return new String(charData, "UTF-8");
242         } catch (TransformerException | UnsupportedEncodingException e) {
243             final String msg = "Error during transformation of Document into String";
244             LOG.error(msg, e);
245             return msg;
246         }
247     }
248
249     /**
250      * Formats data specified by RFC3339.
251      *
252      * @param d
253      *            Date
254      * @return Data specified by RFC3339.
255      */
256     private String toRFC3339(final Date d) {
257         return RFC3339_PATTERN.matcher(rfc3339.format(d)).replaceAll("$1:$2");
258     }
259
260     /**
261      * Creates {@link Document} document.
262      * @return {@link Document} document.
263      */
264     private Document createDocument() {
265         final DocumentBuilder bob;
266         try {
267             bob = DBF.newDocumentBuilder();
268         } catch (final ParserConfigurationException e) {
269             return null;
270         }
271         return bob.newDocument();
272     }
273
274     /**
275      * Adds values to data changed notification event element.
276      *
277      * @param doc
278      *            {@link Document}
279      * @param dataChangedNotificationEventElement
280      *            {@link Element}
281      * @param change
282      *            {@link AsyncDataChangeEvent}
283      */
284     private void addValuesToDataChangedNotificationEventElement(final Document doc,
285             final Element dataChangedNotificationEventElement,
286             final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
287         addValuesFromDataToElement(doc, change.getCreatedData().keySet(), dataChangedNotificationEventElement,
288                 Operation.CREATED);
289         if (change.getCreatedData().isEmpty()) {
290             addValuesFromDataToElement(doc, change.getUpdatedData().keySet(), dataChangedNotificationEventElement,
291                     Operation.UPDATED);
292         }
293         addValuesFromDataToElement(doc, change.getRemovedPaths(), dataChangedNotificationEventElement,
294                 Operation.DELETED);
295     }
296
297     /**
298      * Adds values from data to element.
299      *
300      * @param doc
301      *            {@link Document}
302      * @param data
303      *            Set of {@link YangInstanceIdentifier}.
304      * @param element
305      *            {@link Element}
306      * @param operation
307      *            {@link Operation}
308      */
309     private void addValuesFromDataToElement(final Document doc, final Set<YangInstanceIdentifier> data, final Element element,
310             final Operation operation) {
311         if (data == null || data.isEmpty()) {
312             return;
313         }
314         for (final YangInstanceIdentifier path : data) {
315             if (!ControllerContext.getInstance().isNodeMixin(path)) {
316                 final Node node = createDataChangeEventElement(doc, path, operation);
317                 element.appendChild(node);
318             }
319         }
320     }
321
322
323     /**
324      * Creates changed event element from data.
325      *
326      * @param doc
327      *            {@link Document}
328      * @param path
329      *            Path to data in data store.
330      * @param operation
331      *            {@link Operation}
332      * @return {@link Node} node represented by changed event element.
333      */
334     private Node createDataChangeEventElement(final Document doc, final YangInstanceIdentifier path, final Operation operation) {
335         final Element dataChangeEventElement = doc.createElement("data-change-event");
336         final Element pathElement = doc.createElement("path");
337         addPathAsValueToElement(path, pathElement);
338         dataChangeEventElement.appendChild(pathElement);
339
340         final Element operationElement = doc.createElement("operation");
341         operationElement.setTextContent(operation.value);
342         dataChangeEventElement.appendChild(operationElement);
343
344         return dataChangeEventElement;
345     }
346
347
348     /**
349      * Adds path as value to element.
350      *
351      * @param path
352      *            Path to data in data store.
353      * @param element
354      *            {@link Element}
355      */
356     private void addPathAsValueToElement(final YangInstanceIdentifier path, final Element element) {
357         // Map< key = namespace, value = prefix>
358         final Map<String, String> prefixes = new HashMap<>();
359         final YangInstanceIdentifier normalizedPath = ControllerContext.getInstance().toXpathRepresentation(path);
360         final StringBuilder textContent = new StringBuilder();
361
362         // FIXME: BUG-1281: this is duplicated code from yangtools (BUG-1275)
363         for (final PathArgument pathArgument : normalizedPath.getPathArguments()) {
364             if (pathArgument instanceof YangInstanceIdentifier.AugmentationIdentifier) {
365                 continue;
366             }
367             textContent.append("/");
368             writeIdentifierWithNamespacePrefix(element, textContent, pathArgument.getNodeType(), prefixes);
369             if (pathArgument instanceof NodeIdentifierWithPredicates) {
370                 final Map<QName, Object> predicates = ((NodeIdentifierWithPredicates) pathArgument).getKeyValues();
371                 for (final QName keyValue : predicates.keySet()) {
372                     final String predicateValue = String.valueOf(predicates.get(keyValue));
373                     textContent.append("[");
374                     writeIdentifierWithNamespacePrefix(element, textContent, keyValue, prefixes);
375                     textContent.append("='");
376                     textContent.append(predicateValue);
377                     textContent.append("'");
378                     textContent.append("]");
379                 }
380             } else if (pathArgument instanceof NodeWithValue) {
381                 textContent.append("[.='");
382                 textContent.append(((NodeWithValue) pathArgument).getValue());
383                 textContent.append("'");
384                 textContent.append("]");
385             }
386         }
387         element.setTextContent(textContent.toString());
388     }
389
390     /**
391      * Writes identifier that consists of prefix and QName.
392      *
393      * @param element
394      *            {@link Element}
395      * @param textContent
396      *            StringBuilder
397      * @param qName
398      *            QName
399      * @param prefixes
400      *            Map of namespaces and prefixes.
401      */
402     private static void writeIdentifierWithNamespacePrefix(final Element element, final StringBuilder textContent,
403             final QName qName, final Map<String, String> prefixes) {
404         final String namespace = qName.getNamespace().toString();
405         String prefix = prefixes.get(namespace);
406         if (prefix == null) {
407             prefix = generateNewPrefix(prefixes.values());
408         }
409
410         element.setAttribute("xmlns:" + prefix, namespace);
411         textContent.append(prefix);
412         prefixes.put(namespace, prefix);
413
414         textContent.append(":");
415         textContent.append(qName.getLocalName());
416     }
417
418     /**
419      * Generates new prefix which consists of four random characters <a-z>.
420      *
421      * @param prefixes
422      *            Collection of prefixes.
423      * @return New prefix which consists of four random characters <a-z>.
424      */
425     private static String generateNewPrefix(final Collection<String> prefixes) {
426         StringBuilder result = null;
427         final Random random = new Random();
428         do {
429             result = new StringBuilder();
430             for (int i = 0; i < 4; i++) {
431                 final int randomNumber = 0x61 + (Math.abs(random.nextInt()) % 26);
432                 result.append(Character.toChars(randomNumber));
433             }
434         } while (prefixes.contains(result.toString()));
435
436         return result.toString();
437     }
438
439     /**
440      * Gets path pointed to data in data store.
441      *
442      * @return Path pointed to data in data store.
443      */
444     public YangInstanceIdentifier getPath() {
445         return path;
446     }
447
448     /**
449      * Sets {@link ListenerRegistration} registration.
450      *
451      * @param registration
452      *            ListenerRegistration<DataChangeListener>
453      */
454     public void setRegistration(final ListenerRegistration<DOMDataChangeListener> registration) {
455         this.registration = registration;
456     }
457
458     /**
459      * Gets the name of the stream.
460      *
461      * @return The name of the stream.
462      */
463     public String getStreamName() {
464         return streamName;
465     }
466
467     /**
468      * Removes all subscribers and unregisters event bus change recorder form event bus.
469      */
470     public void close() throws Exception {
471         subscribers = new ConcurrentSet<>();
472         registration.close();
473         registration = null;
474         eventBus.unregister(eventBusChangeRecorder);
475     }
476
477     /**
478      * Checks if {@link ListenerRegistration} registration exist.
479      *
480      * @return True if exist, false otherwise.
481      */
482     public boolean isListening() {
483         return registration == null ? false : true;
484     }
485
486     /**
487      * Creates event of type {@link EventType#REGISTER}, set {@link Channel} subscriber to the event and post event into
488      * event bus.
489      *
490      * @param subscriber
491      *            Channel
492      */
493     public void addSubscriber(final Channel subscriber) {
494         if (!subscriber.isActive()) {
495             LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
496         }
497         final Event event = new Event(EventType.REGISTER);
498         event.setSubscriber(subscriber);
499         eventBus.post(event);
500     }
501
502     /**
503      * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel} subscriber to the event and posts event
504      * into event bus.
505      *
506      * @param subscriber
507      */
508     public void removeSubscriber(final Channel subscriber) {
509         LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
510         final Event event = new Event(EventType.DEREGISTER);
511         event.setSubscriber(subscriber);
512         eventBus.post(event);
513     }
514
515     /**
516      * Checks if exists at least one {@link Channel} subscriber.
517      *
518      * @return True if exist at least one {@link Channel} subscriber, false otherwise.
519      */
520     public boolean hasSubscribers() {
521         return !subscribers.isEmpty();
522     }
523
524     /**
525      * Consists of two types {@link Store#CONFIG} and {@link Store#OPERATION}.
526      */
527     private static enum Store {
528         CONFIG("config"),
529         OPERATION("operation");
530
531         private final String value;
532
533         private Store(final String value) {
534             this.value = value;
535         }
536     }
537
538     /**
539      * Consists of three types {@link Operation#CREATED}, {@link Operation#UPDATED} and {@link Operation#DELETED}.
540      */
541     private static enum Operation {
542         CREATED("created"),
543         UPDATED("updated"),
544         DELETED("deleted");
545
546         private final String value;
547
548         private Operation(final String value) {
549             this.value = value;
550         }
551     }
552
553 }