Merge "BUG 2854 : Do not add empty read write transactions to the replicable journal"
[controller.git] / opendaylight / md-sal / sal-rest-connector / src / main / java / org / opendaylight / controller / sal / streams / listeners / ListenerAdapter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.streams.listeners;
9
10 import com.google.common.base.Charsets;
11 import com.google.common.base.Preconditions;
12 import com.google.common.eventbus.AsyncEventBus;
13 import com.google.common.eventbus.EventBus;
14 import com.google.common.eventbus.Subscribe;
15 import io.netty.channel.Channel;
16 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
17 import io.netty.util.internal.ConcurrentSet;
18 import java.io.ByteArrayOutputStream;
19 import java.io.OutputStreamWriter;
20 import java.io.UnsupportedEncodingException;
21 import java.text.SimpleDateFormat;
22 import java.util.Collection;
23 import java.util.Date;
24 import java.util.HashMap;
25 import java.util.Map;
26 import java.util.Random;
27 import java.util.Set;
28 import java.util.concurrent.Executors;
29 import java.util.regex.Pattern;
30 import javax.xml.parsers.DocumentBuilder;
31 import javax.xml.parsers.DocumentBuilderFactory;
32 import javax.xml.parsers.ParserConfigurationException;
33 import javax.xml.transform.OutputKeys;
34 import javax.xml.transform.Transformer;
35 import javax.xml.transform.TransformerException;
36 import javax.xml.transform.TransformerFactory;
37 import javax.xml.transform.dom.DOMSource;
38 import javax.xml.transform.stream.StreamResult;
39 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
40 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
41 import org.opendaylight.controller.sal.restconf.impl.ControllerContext;
42 import org.opendaylight.yangtools.concepts.ListenerRegistration;
43 import org.opendaylight.yangtools.yang.common.QName;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
45 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
46 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeWithValue;
47 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
48 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51 import org.w3c.dom.Document;
52 import org.w3c.dom.Element;
53 import org.w3c.dom.Node;
54
55 /**
56  * {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source.
57  */
58 public class ListenerAdapter implements DOMDataChangeListener {
59
60     private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class);
61     private static final DocumentBuilderFactory DBF = DocumentBuilderFactory.newInstance();
62     private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
63     private static final Pattern RFC3339_PATTERN = Pattern.compile("(\\d\\d)(\\d\\d)$");
64
65     private final SimpleDateFormat rfc3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ");
66
67     private final YangInstanceIdentifier path;
68     private ListenerRegistration<DOMDataChangeListener> registration;
69     private final String streamName;
70     private Set<Channel> subscribers = new ConcurrentSet<>();
71     private final EventBus eventBus;
72     private final EventBusChangeRecorder eventBusChangeRecorder;
73
74     /**
75      * Creates new {@link ListenerAdapter} listener specified by path and stream name.
76      *
77      * @param path
78      *            Path to data in data store.
79      * @param streamName
80      *            The name of the stream.
81      */
82     ListenerAdapter(final YangInstanceIdentifier path, final String streamName) {
83         Preconditions.checkNotNull(path);
84         Preconditions.checkArgument(streamName != null && !streamName.isEmpty());
85         this.path = path;
86         this.streamName = streamName;
87         eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
88         eventBusChangeRecorder = new EventBusChangeRecorder();
89         eventBus.register(eventBusChangeRecorder);
90     }
91
92     @Override
93     public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
94         // TODO Auto-generated method stub
95
96         if (!change.getCreatedData().isEmpty() || !change.getUpdatedData().isEmpty()
97                 || !change.getRemovedPaths().isEmpty()) {
98             final String xml = prepareXmlFrom(change);
99             final Event event = new Event(EventType.NOTIFY);
100             event.setData(xml);
101             eventBus.post(event);
102         }
103     }
104
105     /**
106      * Tracks events of data change by customer.
107      */
108     private final class EventBusChangeRecorder {
109         @Subscribe
110         public void recordCustomerChange(final Event event) {
111             if (event.getType() == EventType.REGISTER) {
112                 final Channel subscriber = event.getSubscriber();
113                 if (!subscribers.contains(subscriber)) {
114                     subscribers.add(subscriber);
115                 }
116             } else if (event.getType() == EventType.DEREGISTER) {
117                 subscribers.remove(event.getSubscriber());
118                 Notificator.removeListenerIfNoSubscriberExists(ListenerAdapter.this);
119             } else if (event.getType() == EventType.NOTIFY) {
120                 for (final Channel subscriber : subscribers) {
121                     if (subscriber.isActive()) {
122                         LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
123                         subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
124                     } else {
125                         LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
126                         subscribers.remove(subscriber);
127                     }
128                 }
129             }
130         }
131     }
132
133     /**
134      * Represents event of specific {@link EventType} type, holds data and {@link Channel} subscriber.
135      */
136     private final class Event {
137         private final EventType type;
138         private Channel subscriber;
139         private String data;
140
141         /**
142          * Creates new event specified by {@link EventType} type.
143          *
144          * @param type
145          *            EventType
146          */
147         public Event(final EventType type) {
148             this.type = type;
149         }
150
151         /**
152          * Gets the {@link Channel} subscriber.
153          *
154          * @return Channel
155          */
156         public Channel getSubscriber() {
157             return subscriber;
158         }
159
160         /**
161          * Sets subscriber for event.
162          *
163          * @param subscriber
164          *            Channel
165          */
166         public void setSubscriber(final Channel subscriber) {
167             this.subscriber = subscriber;
168         }
169
170         /**
171          * Gets event String.
172          *
173          * @return String representation of event data.
174          */
175         public String getData() {
176             return data;
177         }
178
179         /**
180          * Sets event data.
181          *
182          * @param data String.
183          */
184         public void setData(final String data) {
185             this.data = data;
186         }
187
188         /**
189          * Gets event type.
190          *
191          * @return The type of the event.
192          */
193         public EventType getType() {
194             return type;
195         }
196     }
197
198     /**
199      * Type of the event.
200      */
201     private enum EventType {
202         REGISTER,
203         DEREGISTER,
204         NOTIFY;
205     }
206
207     /**
208      * Prepare data in printable form and transform it to String.
209      *
210      * @param change
211      *            DataChangeEvent
212      * @return Data in printable form.
213      */
214     private String prepareXmlFrom(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
215         final Document doc = createDocument();
216         final Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
217                 "notification");
218         doc.appendChild(notificationElement);
219
220         final Element eventTimeElement = doc.createElement("eventTime");
221         eventTimeElement.setTextContent(toRFC3339(new Date()));
222         notificationElement.appendChild(eventTimeElement);
223
224         final Element dataChangedNotificationEventElement = doc.createElementNS(
225                 "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "data-changed-notification");
226         addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, change);
227         notificationElement.appendChild(dataChangedNotificationEventElement);
228
229         try {
230             final ByteArrayOutputStream out = new ByteArrayOutputStream();
231             final Transformer transformer = FACTORY.newTransformer();
232             transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
233             transformer.setOutputProperty(OutputKeys.METHOD, "xml");
234             transformer.setOutputProperty(OutputKeys.INDENT, "yes");
235             transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
236             transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
237             transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, Charsets.UTF_8)));
238             final byte[] charData = out.toByteArray();
239             return new String(charData, "UTF-8");
240         } catch (TransformerException | UnsupportedEncodingException e) {
241             final String msg = "Error during transformation of Document into String";
242             LOG.error(msg, e);
243             return msg;
244         }
245     }
246
247     /**
248      * Formats data specified by RFC3339.
249      *
250      * @param d
251      *            Date
252      * @return Data specified by RFC3339.
253      */
254     private String toRFC3339(final Date d) {
255         return RFC3339_PATTERN.matcher(rfc3339.format(d)).replaceAll("$1:$2");
256     }
257
258     /**
259      * Creates {@link Document} document.
260      * @return {@link Document} document.
261      */
262     private Document createDocument() {
263         final DocumentBuilder bob;
264         try {
265             bob = DBF.newDocumentBuilder();
266         } catch (final ParserConfigurationException e) {
267             return null;
268         }
269         return bob.newDocument();
270     }
271
272     /**
273      * Adds values to data changed notification event element.
274      *
275      * @param doc
276      *            {@link Document}
277      * @param dataChangedNotificationEventElement
278      *            {@link Element}
279      * @param change
280      *            {@link AsyncDataChangeEvent}
281      */
282     private void addValuesToDataChangedNotificationEventElement(final Document doc,
283             final Element dataChangedNotificationEventElement,
284             final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
285         addValuesFromDataToElement(doc, change.getCreatedData().keySet(), dataChangedNotificationEventElement,
286                 Operation.CREATED);
287         if (change.getCreatedData().isEmpty()) {
288             addValuesFromDataToElement(doc, change.getUpdatedData().keySet(), dataChangedNotificationEventElement,
289                     Operation.UPDATED);
290         }
291         addValuesFromDataToElement(doc, change.getRemovedPaths(), dataChangedNotificationEventElement,
292                 Operation.DELETED);
293     }
294
295     /**
296      * Adds values from data to element.
297      *
298      * @param doc
299      *            {@link Document}
300      * @param data
301      *            Set of {@link YangInstanceIdentifier}.
302      * @param element
303      *            {@link Element}
304      * @param operation
305      *            {@link Operation}
306      */
307     private void addValuesFromDataToElement(final Document doc, final Set<YangInstanceIdentifier> data, final Element element,
308             final Operation operation) {
309         if (data == null || data.isEmpty()) {
310             return;
311         }
312         for (final YangInstanceIdentifier path : data) {
313             if (!ControllerContext.getInstance().isNodeMixin(path)) {
314                 final Node node = createDataChangeEventElement(doc, path, operation);
315                 element.appendChild(node);
316             }
317         }
318     }
319
320
321     /**
322      * Creates changed event element from data.
323      *
324      * @param doc
325      *            {@link Document}
326      * @param path
327      *            Path to data in data store.
328      * @param operation
329      *            {@link Operation}
330      * @return {@link Node} node represented by changed event element.
331      */
332     private Node createDataChangeEventElement(final Document doc, final YangInstanceIdentifier path, final Operation operation) {
333         final Element dataChangeEventElement = doc.createElement("data-change-event");
334         final Element pathElement = doc.createElement("path");
335         addPathAsValueToElement(path, pathElement);
336         dataChangeEventElement.appendChild(pathElement);
337
338         final Element operationElement = doc.createElement("operation");
339         operationElement.setTextContent(operation.value);
340         dataChangeEventElement.appendChild(operationElement);
341
342         return dataChangeEventElement;
343     }
344
345
346     /**
347      * Adds path as value to element.
348      *
349      * @param path
350      *            Path to data in data store.
351      * @param element
352      *            {@link Element}
353      */
354     private void addPathAsValueToElement(final YangInstanceIdentifier path, final Element element) {
355         // Map< key = namespace, value = prefix>
356         final Map<String, String> prefixes = new HashMap<>();
357         final YangInstanceIdentifier normalizedPath = ControllerContext.getInstance().toXpathRepresentation(path);
358         final StringBuilder textContent = new StringBuilder();
359
360         // FIXME: BUG-1281: this is duplicated code from yangtools (BUG-1275)
361         for (final PathArgument pathArgument : normalizedPath.getPathArguments()) {
362             if (pathArgument instanceof YangInstanceIdentifier.AugmentationIdentifier) {
363                 continue;
364             }
365             textContent.append("/");
366             writeIdentifierWithNamespacePrefix(element, textContent, pathArgument.getNodeType(), prefixes);
367             if (pathArgument instanceof NodeIdentifierWithPredicates) {
368                 final Map<QName, Object> predicates = ((NodeIdentifierWithPredicates) pathArgument).getKeyValues();
369                 for (final QName keyValue : predicates.keySet()) {
370                     final String predicateValue = String.valueOf(predicates.get(keyValue));
371                     textContent.append("[");
372                     writeIdentifierWithNamespacePrefix(element, textContent, keyValue, prefixes);
373                     textContent.append("='");
374                     textContent.append(predicateValue);
375                     textContent.append("'");
376                     textContent.append("]");
377                 }
378             } else if (pathArgument instanceof NodeWithValue) {
379                 textContent.append("[.='");
380                 textContent.append(((NodeWithValue) pathArgument).getValue());
381                 textContent.append("'");
382                 textContent.append("]");
383             }
384         }
385         element.setTextContent(textContent.toString());
386     }
387
388     /**
389      * Writes identifier that consists of prefix and QName.
390      *
391      * @param element
392      *            {@link Element}
393      * @param textContent
394      *            StringBuilder
395      * @param qName
396      *            QName
397      * @param prefixes
398      *            Map of namespaces and prefixes.
399      */
400     private static void writeIdentifierWithNamespacePrefix(final Element element, final StringBuilder textContent,
401             final QName qName, final Map<String, String> prefixes) {
402         final String namespace = qName.getNamespace().toString();
403         String prefix = prefixes.get(namespace);
404         if (prefix == null) {
405             prefix = generateNewPrefix(prefixes.values());
406         }
407
408         element.setAttribute("xmlns:" + prefix, namespace);
409         textContent.append(prefix);
410         prefixes.put(namespace, prefix);
411
412         textContent.append(":");
413         textContent.append(qName.getLocalName());
414     }
415
416     /**
417      * Generates new prefix which consists of four random characters <a-z>.
418      *
419      * @param prefixes
420      *            Collection of prefixes.
421      * @return New prefix which consists of four random characters <a-z>.
422      */
423     private static String generateNewPrefix(final Collection<String> prefixes) {
424         StringBuilder result = null;
425         final Random random = new Random();
426         do {
427             result = new StringBuilder();
428             for (int i = 0; i < 4; i++) {
429                 final int randomNumber = 0x61 + (Math.abs(random.nextInt()) % 26);
430                 result.append(Character.toChars(randomNumber));
431             }
432         } while (prefixes.contains(result.toString()));
433
434         return result.toString();
435     }
436
437     /**
438      * Gets path pointed to data in data store.
439      *
440      * @return Path pointed to data in data store.
441      */
442     public YangInstanceIdentifier getPath() {
443         return path;
444     }
445
446     /**
447      * Sets {@link ListenerRegistration} registration.
448      *
449      * @param registration
450      *            ListenerRegistration<DataChangeListener>
451      */
452     public void setRegistration(final ListenerRegistration<DOMDataChangeListener> registration) {
453         this.registration = registration;
454     }
455
456     /**
457      * Gets the name of the stream.
458      *
459      * @return The name of the stream.
460      */
461     public String getStreamName() {
462         return streamName;
463     }
464
465     /**
466      * Removes all subscribers and unregisters event bus change recorder form event bus.
467      */
468     public void close() throws Exception {
469         subscribers = new ConcurrentSet<>();
470         registration.close();
471         registration = null;
472         eventBus.unregister(eventBusChangeRecorder);
473     }
474
475     /**
476      * Checks if {@link ListenerRegistration} registration exist.
477      *
478      * @return True if exist, false otherwise.
479      */
480     public boolean isListening() {
481         return registration == null ? false : true;
482     }
483
484     /**
485      * Creates event of type {@link EventType#REGISTER}, set {@link Channel} subscriber to the event and post event into
486      * event bus.
487      *
488      * @param subscriber
489      *            Channel
490      */
491     public void addSubscriber(final Channel subscriber) {
492         if (!subscriber.isActive()) {
493             LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
494         }
495         final Event event = new Event(EventType.REGISTER);
496         event.setSubscriber(subscriber);
497         eventBus.post(event);
498     }
499
500     /**
501      * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel} subscriber to the event and posts event
502      * into event bus.
503      *
504      * @param subscriber
505      */
506     public void removeSubscriber(final Channel subscriber) {
507         LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
508         final Event event = new Event(EventType.DEREGISTER);
509         event.setSubscriber(subscriber);
510         eventBus.post(event);
511     }
512
513     /**
514      * Checks if exists at least one {@link Channel} subscriber.
515      *
516      * @return True if exist at least one {@link Channel} subscriber, false otherwise.
517      */
518     public boolean hasSubscribers() {
519         return !subscribers.isEmpty();
520     }
521
522     /**
523      * Consists of two types {@link Store#CONFIG} and {@link Store#OPERATION}.
524      */
525     private static enum Store {
526         CONFIG("config"),
527         OPERATION("operation");
528
529         private final String value;
530
531         private Store(final String value) {
532             this.value = value;
533         }
534     }
535
536     /**
537      * Consists of three types {@link Operation#CREATED}, {@link Operation#UPDATED} and {@link Operation#DELETED}.
538      */
539     private static enum Operation {
540         CREATED("created"),
541         UPDATED("updated"),
542         DELETED("deleted");
543
544         private final String value;
545
546         private Operation(final String value) {
547             this.value = value;
548         }
549     }
550
551 }