Bug 3934: Websockets: Scope ONE doesn't work correctly
[netconf.git] / opendaylight / restconf / sal-rest-connector / src / main / java / org / opendaylight / netconf / sal / streams / listeners / ListenerAdapter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.sal.streams.listeners;
9
10 import com.google.common.base.Charsets;
11 import com.google.common.base.Preconditions;
12 import com.google.common.eventbus.AsyncEventBus;
13 import com.google.common.eventbus.EventBus;
14 import com.google.common.eventbus.Subscribe;
15 import io.netty.channel.Channel;
16 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
17 import io.netty.util.internal.ConcurrentSet;
18 import java.io.ByteArrayOutputStream;
19 import java.io.IOException;
20 import java.io.OutputStreamWriter;
21 import java.io.UnsupportedEncodingException;
22 import java.text.SimpleDateFormat;
23 import java.util.Collection;
24 import java.util.Date;
25 import java.util.HashMap;
26 import java.util.Map;
27 import java.util.Map.Entry;
28 import java.util.Random;
29 import java.util.Set;
30 import java.util.concurrent.Executors;
31 import java.util.regex.Pattern;
32 import javax.xml.parsers.DocumentBuilder;
33 import javax.xml.parsers.DocumentBuilderFactory;
34 import javax.xml.parsers.ParserConfigurationException;
35 import javax.xml.stream.XMLOutputFactory;
36 import javax.xml.stream.XMLStreamException;
37 import javax.xml.stream.XMLStreamWriter;
38 import javax.xml.transform.OutputKeys;
39 import javax.xml.transform.Transformer;
40 import javax.xml.transform.TransformerException;
41 import javax.xml.transform.TransformerFactory;
42 import javax.xml.transform.dom.DOMResult;
43 import javax.xml.transform.dom.DOMSource;
44 import javax.xml.transform.stream.StreamResult;
45 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
46 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
47 import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
48 import org.opendaylight.yangtools.concepts.ListenerRegistration;
49 import org.opendaylight.yangtools.yang.common.QName;
50 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
51 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
52 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeWithValue;
53 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
54 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
55 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodeContainer;
56 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
57 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
58 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
59 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
60 import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree;
61 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64 import org.w3c.dom.Document;
65 import org.w3c.dom.Element;
66 import org.w3c.dom.Node;
67
68 /**
69  * {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source.
70  */
71 public class ListenerAdapter implements DOMDataChangeListener {
72
73     private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class);
74     private static final DocumentBuilderFactory DBF = DocumentBuilderFactory.newInstance();
75     private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
76     private static final Pattern RFC3339_PATTERN = Pattern.compile("(\\d\\d)(\\d\\d)$");
77
78     private final SimpleDateFormat rfc3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ");
79
80     private final YangInstanceIdentifier path;
81     private ListenerRegistration<DOMDataChangeListener> registration;
82     private final String streamName;
83     private Set<Channel> subscribers = new ConcurrentSet<>();
84     private final EventBus eventBus;
85     private final EventBusChangeRecorder eventBusChangeRecorder;
86
87     /**
88      * Creates new {@link ListenerAdapter} listener specified by path and stream name.
89      *
90      * @param path
91      *            Path to data in data store.
92      * @param streamName
93      *            The name of the stream.
94      */
95     ListenerAdapter(final YangInstanceIdentifier path, final String streamName) {
96         Preconditions.checkNotNull(path);
97         Preconditions.checkArgument(streamName != null && !streamName.isEmpty());
98         this.path = path;
99         this.streamName = streamName;
100         eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
101         eventBusChangeRecorder = new EventBusChangeRecorder();
102         eventBus.register(eventBusChangeRecorder);
103     }
104
105     @Override
106     public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
107         if (!change.getCreatedData().isEmpty() || !change.getUpdatedData().isEmpty()
108                 || !change.getRemovedPaths().isEmpty()) {
109             final String xml = prepareXmlFrom(change);
110             final Event event = new Event(EventType.NOTIFY);
111             event.setData(xml);
112             eventBus.post(event);
113         }
114     }
115
116     /**
117      * Tracks events of data change by customer.
118      */
119     private final class EventBusChangeRecorder {
120         @Subscribe
121         public void recordCustomerChange(final Event event) {
122             if (event.getType() == EventType.REGISTER) {
123                 final Channel subscriber = event.getSubscriber();
124                 if (!subscribers.contains(subscriber)) {
125                     subscribers.add(subscriber);
126                 }
127             } else if (event.getType() == EventType.DEREGISTER) {
128                 subscribers.remove(event.getSubscriber());
129                 Notificator.removeListenerIfNoSubscriberExists(ListenerAdapter.this);
130             } else if (event.getType() == EventType.NOTIFY) {
131                 for (final Channel subscriber : subscribers) {
132                     if (subscriber.isActive()) {
133                         LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
134                         subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
135                     } else {
136                         LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
137                         subscribers.remove(subscriber);
138                     }
139                 }
140             }
141         }
142     }
143
144     /**
145      * Represents event of specific {@link EventType} type, holds data and {@link Channel} subscriber.
146      */
147     private final class Event {
148         private final EventType type;
149         private Channel subscriber;
150         private String data;
151
152         /**
153          * Creates new event specified by {@link EventType} type.
154          *
155          * @param type
156          *            EventType
157          */
158         public Event(final EventType type) {
159             this.type = type;
160         }
161
162         /**
163          * Gets the {@link Channel} subscriber.
164          *
165          * @return Channel
166          */
167         public Channel getSubscriber() {
168             return subscriber;
169         }
170
171         /**
172          * Sets subscriber for event.
173          *
174          * @param subscriber
175          *            Channel
176          */
177         public void setSubscriber(final Channel subscriber) {
178             this.subscriber = subscriber;
179         }
180
181         /**
182          * Gets event String.
183          *
184          * @return String representation of event data.
185          */
186         public String getData() {
187             return data;
188         }
189
190         /**
191          * Sets event data.
192          *
193          * @param data String.
194          */
195         public void setData(final String data) {
196             this.data = data;
197         }
198
199         /**
200          * Gets event type.
201          *
202          * @return The type of the event.
203          */
204         public EventType getType() {
205             return type;
206         }
207     }
208
209     /**
210      * Type of the event.
211      */
212     private enum EventType {
213         REGISTER,
214         DEREGISTER,
215         NOTIFY;
216     }
217
218     /**
219      * Prepare data in printable form and transform it to String.
220      *
221      * @param change
222      *            DataChangeEvent
223      * @return Data in printable form.
224      */
225     private String prepareXmlFrom(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
226         final SchemaContext schemaContext = ControllerContext.getInstance().getGlobalSchema();
227         final DataSchemaContextTree dataContextTree =  DataSchemaContextTree.from(schemaContext);
228         final Document doc = createDocument();
229         final Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
230                 "notification");
231         doc.appendChild(notificationElement);
232
233         final Element eventTimeElement = doc.createElement("eventTime");
234         eventTimeElement.setTextContent(toRFC3339(new Date()));
235         notificationElement.appendChild(eventTimeElement);
236
237         final Element dataChangedNotificationEventElement = doc.createElementNS(
238                 "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "data-changed-notification");
239         addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, change,
240                 schemaContext, dataContextTree);
241         notificationElement.appendChild(dataChangedNotificationEventElement);
242
243         try {
244             final ByteArrayOutputStream out = new ByteArrayOutputStream();
245             final Transformer transformer = FACTORY.newTransformer();
246             transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
247             transformer.setOutputProperty(OutputKeys.METHOD, "xml");
248             transformer.setOutputProperty(OutputKeys.INDENT, "yes");
249             transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
250             transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
251             transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, Charsets.UTF_8)));
252             final byte[] charData = out.toByteArray();
253             return new String(charData, "UTF-8");
254         } catch (TransformerException | UnsupportedEncodingException e) {
255             final String msg = "Error during transformation of Document into String";
256             LOG.error(msg, e);
257             return msg;
258         }
259     }
260
261     /**
262      * Formats data specified by RFC3339.
263      *
264      * @param d
265      *            Date
266      * @return Data specified by RFC3339.
267      */
268     private String toRFC3339(final Date d) {
269         return RFC3339_PATTERN.matcher(rfc3339.format(d)).replaceAll("$1:$2");
270     }
271
272     /**
273      * Creates {@link Document} document.
274      * @return {@link Document} document.
275      */
276     private Document createDocument() {
277         final DocumentBuilder bob;
278         try {
279             bob = DBF.newDocumentBuilder();
280         } catch (final ParserConfigurationException e) {
281             return null;
282         }
283         return bob.newDocument();
284     }
285
286     /**
287      * Adds values to data changed notification event element.
288      *
289      * @param doc
290      *            {@link Document}
291      * @param dataChangedNotificationEventElement
292      *            {@link Element}
293      * @param change
294      *            {@link AsyncDataChangeEvent}
295      */
296     private void addValuesToDataChangedNotificationEventElement(final Document doc,
297             final Element dataChangedNotificationEventElement,
298             final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change,
299             final SchemaContext  schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
300
301         addCreatedChangedValuesFromDataToElement(doc, change.getCreatedData().entrySet(),
302                 dataChangedNotificationEventElement,
303                 Operation.CREATED, schemaContext, dataSchemaContextTree);
304         if (change.getCreatedData().isEmpty()) {
305             addCreatedChangedValuesFromDataToElement(doc, change.getUpdatedData().entrySet(),
306                     dataChangedNotificationEventElement,
307                     Operation.UPDATED, schemaContext, dataSchemaContextTree);
308         }
309         addValuesFromDataToElement(doc, change.getRemovedPaths(), dataChangedNotificationEventElement,
310                 Operation.DELETED);
311     }
312
313     /**
314      * Adds values from data to element.
315      *
316      * @param doc
317      *            {@link Document}
318      * @param data
319      *            Set of {@link YangInstanceIdentifier}.
320      * @param element
321      *            {@link Element}
322      * @param operation
323      *            {@link Operation}
324      */
325     private void addValuesFromDataToElement(final Document doc, final Set<YangInstanceIdentifier> data, final Element element,
326             final Operation operation) {
327         if (data == null || data.isEmpty()) {
328             return;
329         }
330         for (final YangInstanceIdentifier path : data) {
331             if (!ControllerContext.getInstance().isNodeMixin(path)) {
332                 final Node node = createDataChangeEventElement(doc, path, operation);
333                 element.appendChild(node);
334             }
335         }
336     }
337
338     private void addCreatedChangedValuesFromDataToElement(final Document doc, final Set<Entry<YangInstanceIdentifier,
339                 NormalizedNode<?,?>>> data, final Element element, final Operation operation, final SchemaContext
340             schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
341         if (data == null || data.isEmpty()) {
342             return;
343         }
344         for (Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry : data) {
345             if (!ControllerContext.getInstance().isNodeMixin(entry.getKey())) {
346                 final Node node = createCreatedChangedDataChangeEventElement(doc, entry, operation, schemaContext,
347                         dataSchemaContextTree);
348                 element.appendChild(node);
349             }
350         }
351     }
352
353     /**
354      * Creates changed event element from data.
355      *
356      * @param doc
357      *            {@link Document}
358      * @param path
359      *            Path to data in data store.
360      * @param operation
361      *            {@link Operation}
362      * @return {@link Node} node represented by changed event element.
363      */
364     private Node createDataChangeEventElement(final Document doc, final YangInstanceIdentifier path, final Operation operation) {
365         final Element dataChangeEventElement = doc.createElement("data-change-event");
366         final Element pathElement = doc.createElement("path");
367         addPathAsValueToElement(path, pathElement);
368         dataChangeEventElement.appendChild(pathElement);
369
370         final Element operationElement = doc.createElement("operation");
371         operationElement.setTextContent(operation.value);
372         dataChangeEventElement.appendChild(operationElement);
373
374         return dataChangeEventElement;
375     }
376
377     private Node createCreatedChangedDataChangeEventElement(final Document doc, final Entry<YangInstanceIdentifier,
378             NormalizedNode<?, ?>> entry, final Operation operation, final SchemaContext
379             schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
380         final Element dataChangeEventElement = doc.createElement("data-change-event");
381         final Element pathElement = doc.createElement("path");
382         final YangInstanceIdentifier path = entry.getKey();
383         addPathAsValueToElement(path, pathElement);
384         dataChangeEventElement.appendChild(pathElement);
385
386         final Element operationElement = doc.createElement("operation");
387         operationElement.setTextContent(operation.value);
388         dataChangeEventElement.appendChild(operationElement);
389
390         try {
391             final DOMResult domResult = writeNormalizedNode((NormalizedNodeContainer<?,?,?>) entry.getValue(), path,
392                     schemaContext, dataSchemaContextTree);
393             final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
394             final Element dataElement = doc.createElement("data");
395             dataElement.appendChild(result);
396             dataChangeEventElement.appendChild(dataElement);
397         } catch (IOException e) {
398             LOG.error("Error in writer ", e);
399         } catch (XMLStreamException e) {
400             LOG.error("Error processing stream", e);
401         }
402
403         return dataChangeEventElement;
404     }
405
406     private static DOMResult writeNormalizedNode(final NormalizedNodeContainer<?,?,?> normalized, final
407         YangInstanceIdentifier path, final SchemaContext context, final DataSchemaContextTree dataSchemaContextTree) throws
408             IOException, XMLStreamException {
409         final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
410         final Document doc = XmlDocumentUtils.getDocument();
411         final DOMResult result = new DOMResult(doc);
412         NormalizedNodeWriter normalizedNodeWriter = null;
413         NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
414         XMLStreamWriter writer = null;
415         try {
416             writer = XML_FACTORY.createXMLStreamWriter(result);
417             normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context, dataSchemaContextTree
418                     .getChild(path).getDataSchemaNode().getPath().getParent());
419             normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
420
421             normalizedNodeWriter.write(normalized);
422
423             normalizedNodeWriter.flush();
424         } finally {
425             if (normalizedNodeWriter != null) {
426                 normalizedNodeWriter.close();
427             }
428             if (normalizedNodeStreamWriter != null) {
429                 normalizedNodeStreamWriter.close();
430             }
431             if (writer != null) {
432                 writer.close();
433             }
434         }
435
436         return result;
437     }
438
439     /**
440      * Adds path as value to element.
441      *
442      * @param path
443      *            Path to data in data store.
444      * @param element
445      *            {@link Element}
446      */
447     private void addPathAsValueToElement(final YangInstanceIdentifier path, final Element element) {
448         // Map< key = namespace, value = prefix>
449         final Map<String, String> prefixes = new HashMap<>();
450         final YangInstanceIdentifier normalizedPath = ControllerContext.getInstance().toXpathRepresentation(path);
451         final StringBuilder textContent = new StringBuilder();
452
453         // FIXME: BUG-1281: this is duplicated code from yangtools (BUG-1275)
454         for (final PathArgument pathArgument : normalizedPath.getPathArguments()) {
455             if (pathArgument instanceof YangInstanceIdentifier.AugmentationIdentifier) {
456                 continue;
457             }
458             textContent.append("/");
459             writeIdentifierWithNamespacePrefix(element, textContent, pathArgument.getNodeType(), prefixes);
460             if (pathArgument instanceof NodeIdentifierWithPredicates) {
461                 final Map<QName, Object> predicates = ((NodeIdentifierWithPredicates) pathArgument).getKeyValues();
462                 for (final QName keyValue : predicates.keySet()) {
463                     final String predicateValue = String.valueOf(predicates.get(keyValue));
464                     textContent.append("[");
465                     writeIdentifierWithNamespacePrefix(element, textContent, keyValue, prefixes);
466                     textContent.append("='");
467                     textContent.append(predicateValue);
468                     textContent.append("'");
469                     textContent.append("]");
470                 }
471             } else if (pathArgument instanceof NodeWithValue) {
472                 textContent.append("[.='");
473                 textContent.append(((NodeWithValue) pathArgument).getValue());
474                 textContent.append("'");
475                 textContent.append("]");
476             }
477         }
478         element.setTextContent(textContent.toString());
479     }
480
481     /**
482      * Writes identifier that consists of prefix and QName.
483      *
484      * @param element
485      *            {@link Element}
486      * @param textContent
487      *            StringBuilder
488      * @param qName
489      *            QName
490      * @param prefixes
491      *            Map of namespaces and prefixes.
492      */
493     private static void writeIdentifierWithNamespacePrefix(final Element element, final StringBuilder textContent,
494             final QName qName, final Map<String, String> prefixes) {
495         final String namespace = qName.getNamespace().toString();
496         String prefix = prefixes.get(namespace);
497         if (prefix == null) {
498             prefix = generateNewPrefix(prefixes.values());
499         }
500
501         element.setAttribute("xmlns:" + prefix, namespace);
502         textContent.append(prefix);
503         prefixes.put(namespace, prefix);
504
505         textContent.append(":");
506         textContent.append(qName.getLocalName());
507     }
508
509     /**
510      * Generates new prefix which consists of four random characters <a-z>.
511      *
512      * @param prefixes
513      *            Collection of prefixes.
514      * @return New prefix which consists of four random characters <a-z>.
515      */
516     private static String generateNewPrefix(final Collection<String> prefixes) {
517         StringBuilder result = null;
518         final Random random = new Random();
519         do {
520             result = new StringBuilder();
521             for (int i = 0; i < 4; i++) {
522                 final int randomNumber = 0x61 + (Math.abs(random.nextInt()) % 26);
523                 result.append(Character.toChars(randomNumber));
524             }
525         } while (prefixes.contains(result.toString()));
526
527         return result.toString();
528     }
529
530     /**
531      * Gets path pointed to data in data store.
532      *
533      * @return Path pointed to data in data store.
534      */
535     public YangInstanceIdentifier getPath() {
536         return path;
537     }
538
539     /**
540      * Sets {@link ListenerRegistration} registration.
541      *
542      * @param registration DOMDataChangeListener registration
543      */
544     public void setRegistration(final ListenerRegistration<DOMDataChangeListener> registration) {
545         this.registration = registration;
546     }
547
548     /**
549      * Gets the name of the stream.
550      *
551      * @return The name of the stream.
552      */
553     public String getStreamName() {
554         return streamName;
555     }
556
557     /**
558      * Removes all subscribers and unregisters event bus change recorder form event bus.
559      */
560     public void close() throws Exception {
561         subscribers = new ConcurrentSet<>();
562         registration.close();
563         registration = null;
564         eventBus.unregister(eventBusChangeRecorder);
565     }
566
567     /**
568      * Checks if {@link ListenerRegistration} registration exist.
569      *
570      * @return True if exist, false otherwise.
571      */
572     public boolean isListening() {
573         return registration == null ? false : true;
574     }
575
576     /**
577      * Creates event of type {@link EventType#REGISTER}, set {@link Channel} subscriber to the event and post event into
578      * event bus.
579      *
580      * @param subscriber
581      *            Channel
582      */
583     public void addSubscriber(final Channel subscriber) {
584         if (!subscriber.isActive()) {
585             LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
586         }
587         final Event event = new Event(EventType.REGISTER);
588         event.setSubscriber(subscriber);
589         eventBus.post(event);
590     }
591
592     /**
593      * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel} subscriber to the event and posts event
594      * into event bus.
595      *
596      * @param subscriber
597      */
598     public void removeSubscriber(final Channel subscriber) {
599         LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
600         final Event event = new Event(EventType.DEREGISTER);
601         event.setSubscriber(subscriber);
602         eventBus.post(event);
603     }
604
605     /**
606      * Checks if exists at least one {@link Channel} subscriber.
607      *
608      * @return True if exist at least one {@link Channel} subscriber, false otherwise.
609      */
610     public boolean hasSubscribers() {
611         return !subscribers.isEmpty();
612     }
613
614     /**
615      * Consists of two types {@link Store#CONFIG} and {@link Store#OPERATION}.
616      */
617     private static enum Store {
618         CONFIG("config"),
619         OPERATION("operation");
620
621         private final String value;
622
623         private Store(final String value) {
624             this.value = value;
625         }
626     }
627
628     /**
629      * Consists of three types {@link Operation#CREATED}, {@link Operation#UPDATED} and {@link Operation#DELETED}.
630      */
631     private static enum Operation {
632         CREATED("created"),
633         UPDATED("updated"),
634         DELETED("deleted");
635
636         private final String value;
637
638         private Operation(final String value) {
639             this.value = value;
640         }
641     }
642
643 }