Switch to using StandardCharsets
[netconf.git] / restconf / sal-rest-connector / src / main / java / org / opendaylight / netconf / sal / streams / listeners / ListenerAdapter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.sal.streams.listeners;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.eventbus.AsyncEventBus;
12 import com.google.common.eventbus.EventBus;
13 import com.google.common.eventbus.Subscribe;
14 import io.netty.channel.Channel;
15 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
16 import io.netty.util.internal.ConcurrentSet;
17 import java.io.ByteArrayOutputStream;
18 import java.io.IOException;
19 import java.io.OutputStreamWriter;
20 import java.io.UnsupportedEncodingException;
21 import java.nio.charset.StandardCharsets;
22 import java.text.SimpleDateFormat;
23 import java.util.Collection;
24 import java.util.Date;
25 import java.util.HashMap;
26 import java.util.Map;
27 import java.util.Map.Entry;
28 import java.util.Random;
29 import java.util.Set;
30 import java.util.concurrent.Executors;
31 import java.util.regex.Pattern;
32 import javax.xml.parsers.DocumentBuilder;
33 import javax.xml.parsers.DocumentBuilderFactory;
34 import javax.xml.parsers.ParserConfigurationException;
35 import javax.xml.stream.XMLOutputFactory;
36 import javax.xml.stream.XMLStreamException;
37 import javax.xml.stream.XMLStreamWriter;
38 import javax.xml.transform.OutputKeys;
39 import javax.xml.transform.Transformer;
40 import javax.xml.transform.TransformerException;
41 import javax.xml.transform.TransformerFactory;
42 import javax.xml.transform.dom.DOMResult;
43 import javax.xml.transform.dom.DOMSource;
44 import javax.xml.transform.stream.StreamResult;
45 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
46 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
47 import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
48 import org.opendaylight.yangtools.concepts.ListenerRegistration;
49 import org.opendaylight.yangtools.yang.common.QName;
50 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
51 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
52 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeWithValue;
53 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
54 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
55 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
56 import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListEntryNode;
57 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
58 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
59 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
60 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
61 import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree;
62 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
63 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
66 import org.w3c.dom.Document;
67 import org.w3c.dom.Element;
68 import org.w3c.dom.Node;
69
70 /**
71  * {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source.
72  */
73 public class ListenerAdapter implements DOMDataChangeListener {
74
75     private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class);
76     private static final DocumentBuilderFactory DBF = DocumentBuilderFactory.newInstance();
77     private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
78     private static final Pattern RFC3339_PATTERN = Pattern.compile("(\\d\\d)(\\d\\d)$");
79
80     private final SimpleDateFormat rfc3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ");
81
82     private final YangInstanceIdentifier path;
83     private ListenerRegistration<DOMDataChangeListener> registration;
84     private final String streamName;
85     private Set<Channel> subscribers = new ConcurrentSet<>();
86     private final EventBus eventBus;
87     private final EventBusChangeRecorder eventBusChangeRecorder;
88
89     /**
90      * Creates new {@link ListenerAdapter} listener specified by path and stream name.
91      *
92      * @param path
93      *            Path to data in data store.
94      * @param streamName
95      *            The name of the stream.
96      */
97     ListenerAdapter(final YangInstanceIdentifier path, final String streamName) {
98         Preconditions.checkNotNull(path);
99         Preconditions.checkArgument(streamName != null && !streamName.isEmpty());
100         this.path = path;
101         this.streamName = streamName;
102         eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
103         eventBusChangeRecorder = new EventBusChangeRecorder();
104         eventBus.register(eventBusChangeRecorder);
105     }
106
107     @Override
108     public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
109         if (!change.getCreatedData().isEmpty() || !change.getUpdatedData().isEmpty()
110                 || !change.getRemovedPaths().isEmpty()) {
111             final String xml = prepareXmlFrom(change);
112             final Event event = new Event(EventType.NOTIFY);
113             event.setData(xml);
114             eventBus.post(event);
115         }
116     }
117
118     /**
119      * Tracks events of data change by customer.
120      */
121     private final class EventBusChangeRecorder {
122         @Subscribe
123         public void recordCustomerChange(final Event event) {
124             if (event.getType() == EventType.REGISTER) {
125                 final Channel subscriber = event.getSubscriber();
126                 if (!subscribers.contains(subscriber)) {
127                     subscribers.add(subscriber);
128                 }
129             } else if (event.getType() == EventType.DEREGISTER) {
130                 subscribers.remove(event.getSubscriber());
131                 Notificator.removeListenerIfNoSubscriberExists(ListenerAdapter.this);
132             } else if (event.getType() == EventType.NOTIFY) {
133                 for (final Channel subscriber : subscribers) {
134                     if (subscriber.isActive()) {
135                         LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
136                         subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
137                     } else {
138                         LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
139                         subscribers.remove(subscriber);
140                     }
141                 }
142             }
143         }
144     }
145
146     /**
147      * Represents event of specific {@link EventType} type, holds data and {@link Channel} subscriber.
148      */
149     private final class Event {
150         private final EventType type;
151         private Channel subscriber;
152         private String data;
153
154         /**
155          * Creates new event specified by {@link EventType} type.
156          *
157          * @param type
158          *            EventType
159          */
160         public Event(final EventType type) {
161             this.type = type;
162         }
163
164         /**
165          * Gets the {@link Channel} subscriber.
166          *
167          * @return Channel
168          */
169         public Channel getSubscriber() {
170             return subscriber;
171         }
172
173         /**
174          * Sets subscriber for event.
175          *
176          * @param subscriber
177          *            Channel
178          */
179         public void setSubscriber(final Channel subscriber) {
180             this.subscriber = subscriber;
181         }
182
183         /**
184          * Gets event String.
185          *
186          * @return String representation of event data.
187          */
188         public String getData() {
189             return data;
190         }
191
192         /**
193          * Sets event data.
194          *
195          * @param data String.
196          */
197         public void setData(final String data) {
198             this.data = data;
199         }
200
201         /**
202          * Gets event type.
203          *
204          * @return The type of the event.
205          */
206         public EventType getType() {
207             return type;
208         }
209     }
210
211     /**
212      * Type of the event.
213      */
214     private enum EventType {
215         REGISTER,
216         DEREGISTER,
217         NOTIFY;
218     }
219
220     /**
221      * Prepare data in printable form and transform it to String.
222      *
223      * @param change
224      *            DataChangeEvent
225      * @return Data in printable form.
226      */
227     private String prepareXmlFrom(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
228         final SchemaContext schemaContext = ControllerContext.getInstance().getGlobalSchema();
229         final DataSchemaContextTree dataContextTree =  DataSchemaContextTree.from(schemaContext);
230         final Document doc = createDocument();
231         final Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
232                 "notification");
233         doc.appendChild(notificationElement);
234
235         final Element eventTimeElement = doc.createElement("eventTime");
236         eventTimeElement.setTextContent(toRFC3339(new Date()));
237         notificationElement.appendChild(eventTimeElement);
238
239         final Element dataChangedNotificationEventElement = doc.createElementNS(
240                 "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "data-changed-notification");
241         addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, change,
242                 schemaContext, dataContextTree);
243         notificationElement.appendChild(dataChangedNotificationEventElement);
244
245         try {
246             final ByteArrayOutputStream out = new ByteArrayOutputStream();
247             final Transformer transformer = FACTORY.newTransformer();
248             transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
249             transformer.setOutputProperty(OutputKeys.METHOD, "xml");
250             transformer.setOutputProperty(OutputKeys.INDENT, "yes");
251             transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
252             transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
253             transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, StandardCharsets.UTF_8)));
254             final byte[] charData = out.toByteArray();
255             return new String(charData, "UTF-8");
256         } catch (TransformerException | UnsupportedEncodingException e) {
257             final String msg = "Error during transformation of Document into String";
258             LOG.error(msg, e);
259             return msg;
260         }
261     }
262
263     /**
264      * Formats data specified by RFC3339.
265      *
266      * @param d
267      *            Date
268      * @return Data specified by RFC3339.
269      */
270     private String toRFC3339(final Date d) {
271         return RFC3339_PATTERN.matcher(rfc3339.format(d)).replaceAll("$1:$2");
272     }
273
274     /**
275      * Creates {@link Document} document.
276      * @return {@link Document} document.
277      */
278     private Document createDocument() {
279         final DocumentBuilder bob;
280         try {
281             bob = DBF.newDocumentBuilder();
282         } catch (final ParserConfigurationException e) {
283             return null;
284         }
285         return bob.newDocument();
286     }
287
288     /**
289      * Adds values to data changed notification event element.
290      *
291      * @param doc
292      *            {@link Document}
293      * @param dataChangedNotificationEventElement
294      *            {@link Element}
295      * @param change
296      *            {@link AsyncDataChangeEvent}
297      */
298     private void addValuesToDataChangedNotificationEventElement(final Document doc,
299             final Element dataChangedNotificationEventElement,
300             final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change,
301             final SchemaContext  schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
302
303         addCreatedChangedValuesFromDataToElement(doc, change.getCreatedData().entrySet(),
304                 dataChangedNotificationEventElement,
305                 Operation.CREATED, schemaContext, dataSchemaContextTree);
306
307         addCreatedChangedValuesFromDataToElement(doc, change.getUpdatedData().entrySet(),
308                     dataChangedNotificationEventElement,
309                     Operation.UPDATED, schemaContext, dataSchemaContextTree);
310
311         addValuesFromDataToElement(doc, change.getRemovedPaths(), dataChangedNotificationEventElement,
312                 Operation.DELETED);
313     }
314
315     /**
316      * Adds values from data to element.
317      *
318      * @param doc
319      *            {@link Document}
320      * @param data
321      *            Set of {@link YangInstanceIdentifier}.
322      * @param element
323      *            {@link Element}
324      * @param operation
325      *            {@link Operation}
326      */
327     private void addValuesFromDataToElement(final Document doc, final Set<YangInstanceIdentifier> data, final Element element,
328             final Operation operation) {
329         if (data == null || data.isEmpty()) {
330             return;
331         }
332         for (final YangInstanceIdentifier path : data) {
333             if (!ControllerContext.getInstance().isNodeMixin(path)) {
334                 final Node node = createDataChangeEventElement(doc, path, operation);
335                 element.appendChild(node);
336             }
337         }
338     }
339
340     private void addCreatedChangedValuesFromDataToElement(final Document doc, final Set<Entry<YangInstanceIdentifier,
341                 NormalizedNode<?,?>>> data, final Element element, final Operation operation, final SchemaContext
342             schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
343         if (data == null || data.isEmpty()) {
344             return;
345         }
346         for (Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry : data) {
347             if (!ControllerContext.getInstance().isNodeMixin(entry.getKey())) {
348                 final Node node = createCreatedChangedDataChangeEventElement(doc, entry, operation, schemaContext,
349                         dataSchemaContextTree);
350                 element.appendChild(node);
351             }
352         }
353     }
354
355     /**
356      * Creates changed event element from data.
357      *
358      * @param doc
359      *            {@link Document}
360      * @param path
361      *            Path to data in data store.
362      * @param operation
363      *            {@link Operation}
364      * @return {@link Node} node represented by changed event element.
365      */
366     private Node createDataChangeEventElement(final Document doc, final YangInstanceIdentifier path, final Operation operation) {
367         final Element dataChangeEventElement = doc.createElement("data-change-event");
368         final Element pathElement = doc.createElement("path");
369         addPathAsValueToElement(path, pathElement);
370         dataChangeEventElement.appendChild(pathElement);
371
372         final Element operationElement = doc.createElement("operation");
373         operationElement.setTextContent(operation.value);
374         dataChangeEventElement.appendChild(operationElement);
375
376         return dataChangeEventElement;
377     }
378
379     private Node createCreatedChangedDataChangeEventElement(final Document doc, final Entry<YangInstanceIdentifier,
380             NormalizedNode<?, ?>> entry, final Operation operation, final SchemaContext
381             schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
382         final Element dataChangeEventElement = doc.createElement("data-change-event");
383         final Element pathElement = doc.createElement("path");
384         final YangInstanceIdentifier path = entry.getKey();
385         addPathAsValueToElement(path, pathElement);
386         dataChangeEventElement.appendChild(pathElement);
387
388         final Element operationElement = doc.createElement("operation");
389         operationElement.setTextContent(operation.value);
390         dataChangeEventElement.appendChild(operationElement);
391
392         try {
393             final DOMResult domResult = writeNormalizedNode(entry.getValue(), path,
394                     schemaContext, dataSchemaContextTree);
395             final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
396             final Element dataElement = doc.createElement("data");
397             dataElement.appendChild(result);
398             dataChangeEventElement.appendChild(dataElement);
399         } catch (IOException e) {
400             LOG.error("Error in writer ", e);
401         } catch (XMLStreamException e) {
402             LOG.error("Error processing stream", e);
403         }
404
405         return dataChangeEventElement;
406     }
407
408     private static DOMResult writeNormalizedNode(final NormalizedNode<?,?> normalized, final
409         YangInstanceIdentifier path, final SchemaContext context, final DataSchemaContextTree dataSchemaContextTree) throws
410             IOException, XMLStreamException {
411         final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
412         final Document doc = XmlDocumentUtils.getDocument();
413         final DOMResult result = new DOMResult(doc);
414         NormalizedNodeWriter normalizedNodeWriter = null;
415         NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
416         XMLStreamWriter writer = null;
417         final SchemaPath nodePath;
418
419         if (normalized instanceof MapEntryNode || normalized instanceof UnkeyedListEntryNode) {
420             nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath();
421         } else {
422             nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath().getParent();
423         }
424
425         try {
426             writer = XML_FACTORY.createXMLStreamWriter(result);
427             normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context, nodePath);
428             normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
429
430             normalizedNodeWriter.write(normalized);
431
432             normalizedNodeWriter.flush();
433         } finally {
434             if (normalizedNodeWriter != null) {
435                 normalizedNodeWriter.close();
436             }
437             if (normalizedNodeStreamWriter != null) {
438                 normalizedNodeStreamWriter.close();
439             }
440             if (writer != null) {
441                 writer.close();
442             }
443         }
444
445         return result;
446     }
447
448     /**
449      * Adds path as value to element.
450      *
451      * @param path
452      *            Path to data in data store.
453      * @param element
454      *            {@link Element}
455      */
456     private void addPathAsValueToElement(final YangInstanceIdentifier path, final Element element) {
457         // Map< key = namespace, value = prefix>
458         final Map<String, String> prefixes = new HashMap<>();
459         final YangInstanceIdentifier normalizedPath = ControllerContext.getInstance().toXpathRepresentation(path);
460         final StringBuilder textContent = new StringBuilder();
461
462         // FIXME: BUG-1281: this is duplicated code from yangtools (BUG-1275)
463         for (final PathArgument pathArgument : normalizedPath.getPathArguments()) {
464             if (pathArgument instanceof YangInstanceIdentifier.AugmentationIdentifier) {
465                 continue;
466             }
467             textContent.append("/");
468             writeIdentifierWithNamespacePrefix(element, textContent, pathArgument.getNodeType(), prefixes);
469             if (pathArgument instanceof NodeIdentifierWithPredicates) {
470                 final Map<QName, Object> predicates = ((NodeIdentifierWithPredicates) pathArgument).getKeyValues();
471                 for (final QName keyValue : predicates.keySet()) {
472                     final String predicateValue = String.valueOf(predicates.get(keyValue));
473                     textContent.append("[");
474                     writeIdentifierWithNamespacePrefix(element, textContent, keyValue, prefixes);
475                     textContent.append("='");
476                     textContent.append(predicateValue);
477                     textContent.append("'");
478                     textContent.append("]");
479                 }
480             } else if (pathArgument instanceof NodeWithValue) {
481                 textContent.append("[.='");
482                 textContent.append(((NodeWithValue) pathArgument).getValue());
483                 textContent.append("'");
484                 textContent.append("]");
485             }
486         }
487         element.setTextContent(textContent.toString());
488     }
489
490     /**
491      * Writes identifier that consists of prefix and QName.
492      *
493      * @param element
494      *            {@link Element}
495      * @param textContent
496      *            StringBuilder
497      * @param qName
498      *            QName
499      * @param prefixes
500      *            Map of namespaces and prefixes.
501      */
502     private static void writeIdentifierWithNamespacePrefix(final Element element, final StringBuilder textContent,
503             final QName qName, final Map<String, String> prefixes) {
504         final String namespace = qName.getNamespace().toString();
505         String prefix = prefixes.get(namespace);
506         if (prefix == null) {
507             prefix = generateNewPrefix(prefixes.values());
508         }
509
510         element.setAttribute("xmlns:" + prefix, namespace);
511         textContent.append(prefix);
512         prefixes.put(namespace, prefix);
513
514         textContent.append(":");
515         textContent.append(qName.getLocalName());
516     }
517
518     /**
519      * Generates new prefix which consists of four random characters <a-z>.
520      *
521      * @param prefixes
522      *            Collection of prefixes.
523      * @return New prefix which consists of four random characters <a-z>.
524      */
525     private static String generateNewPrefix(final Collection<String> prefixes) {
526         StringBuilder result = null;
527         final Random random = new Random();
528         do {
529             result = new StringBuilder();
530             for (int i = 0; i < 4; i++) {
531                 final int randomNumber = 0x61 + (Math.abs(random.nextInt()) % 26);
532                 result.append(Character.toChars(randomNumber));
533             }
534         } while (prefixes.contains(result.toString()));
535
536         return result.toString();
537     }
538
539     /**
540      * Gets path pointed to data in data store.
541      *
542      * @return Path pointed to data in data store.
543      */
544     public YangInstanceIdentifier getPath() {
545         return path;
546     }
547
548     /**
549      * Sets {@link ListenerRegistration} registration.
550      *
551      * @param registration DOMDataChangeListener registration
552      */
553     public void setRegistration(final ListenerRegistration<DOMDataChangeListener> registration) {
554         this.registration = registration;
555     }
556
557     /**
558      * Gets the name of the stream.
559      *
560      * @return The name of the stream.
561      */
562     public String getStreamName() {
563         return streamName;
564     }
565
566     /**
567      * Removes all subscribers and unregisters event bus change recorder form event bus.
568      */
569     public void close() throws Exception {
570         subscribers = new ConcurrentSet<>();
571         registration.close();
572         registration = null;
573         eventBus.unregister(eventBusChangeRecorder);
574     }
575
576     /**
577      * Checks if {@link ListenerRegistration} registration exist.
578      *
579      * @return True if exist, false otherwise.
580      */
581     public boolean isListening() {
582         return registration == null ? false : true;
583     }
584
585     /**
586      * Creates event of type {@link EventType#REGISTER}, set {@link Channel} subscriber to the event and post event into
587      * event bus.
588      *
589      * @param subscriber
590      *            Channel
591      */
592     public void addSubscriber(final Channel subscriber) {
593         if (!subscriber.isActive()) {
594             LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
595         }
596         final Event event = new Event(EventType.REGISTER);
597         event.setSubscriber(subscriber);
598         eventBus.post(event);
599     }
600
601     /**
602      * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel} subscriber to the event and posts event
603      * into event bus.
604      *
605      * @param subscriber
606      */
607     public void removeSubscriber(final Channel subscriber) {
608         LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
609         final Event event = new Event(EventType.DEREGISTER);
610         event.setSubscriber(subscriber);
611         eventBus.post(event);
612     }
613
614     /**
615      * Checks if exists at least one {@link Channel} subscriber.
616      *
617      * @return True if exist at least one {@link Channel} subscriber, false otherwise.
618      */
619     public boolean hasSubscribers() {
620         return !subscribers.isEmpty();
621     }
622
623     /**
624      * Consists of two types {@link Store#CONFIG} and {@link Store#OPERATION}.
625      */
626     private static enum Store {
627         CONFIG("config"),
628         OPERATION("operation");
629
630         private final String value;
631
632         private Store(final String value) {
633             this.value = value;
634         }
635     }
636
637     /**
638      * Consists of three types {@link Operation#CREATED}, {@link Operation#UPDATED} and {@link Operation#DELETED}.
639      */
640     private static enum Operation {
641         CREATED("created"),
642         UPDATED("updated"),
643         DELETED("deleted");
644
645         private final String value;
646
647         private Operation(final String value) {
648             this.value = value;
649         }
650     }
651
652 }