Bug 5679 - implement ietf-restconf-monitoring - streams
[netconf.git] / restconf / sal-rest-connector / src / main / java / org / opendaylight / netconf / sal / streams / listeners / ListenerAdapter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.sal.streams.listeners;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.eventbus.AsyncEventBus;
12 import com.google.common.eventbus.EventBus;
13 import com.google.common.eventbus.Subscribe;
14 import io.netty.channel.Channel;
15 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
16 import io.netty.util.internal.ConcurrentSet;
17 import java.io.ByteArrayOutputStream;
18 import java.io.IOException;
19 import java.io.OutputStreamWriter;
20 import java.io.StringReader;
21 import java.io.UnsupportedEncodingException;
22 import java.nio.charset.StandardCharsets;
23 import java.text.SimpleDateFormat;
24 import java.util.Date;
25 import java.util.Map;
26 import java.util.Map.Entry;
27 import java.util.Set;
28 import java.util.concurrent.Executors;
29 import java.util.regex.Pattern;
30 import javax.xml.parsers.DocumentBuilder;
31 import javax.xml.parsers.DocumentBuilderFactory;
32 import javax.xml.parsers.ParserConfigurationException;
33 import javax.xml.stream.XMLOutputFactory;
34 import javax.xml.stream.XMLStreamException;
35 import javax.xml.stream.XMLStreamWriter;
36 import javax.xml.transform.OutputKeys;
37 import javax.xml.transform.Transformer;
38 import javax.xml.transform.TransformerException;
39 import javax.xml.transform.TransformerFactory;
40 import javax.xml.transform.dom.DOMResult;
41 import javax.xml.transform.dom.DOMSource;
42 import javax.xml.transform.stream.StreamResult;
43 import javax.xml.xpath.XPath;
44 import javax.xml.xpath.XPathConstants;
45 import javax.xml.xpath.XPathFactory;
46 import org.json.JSONObject;
47 import org.json.XML;
48 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
49 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
50 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
51 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
52 import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
53 import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
54 import org.opendaylight.restconf.Draft18.MonitoringModule;
55 import org.opendaylight.restconf.handlers.SchemaContextHandler;
56 import org.opendaylight.restconf.handlers.TransactionChainHandler;
57 import org.opendaylight.restconf.parser.IdentifierCodec;
58 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
59 import org.opendaylight.yangtools.concepts.ListenerRegistration;
60 import org.opendaylight.yangtools.yang.common.QName;
61 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
62 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
63 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeWithValue;
64 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
65 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
66 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
67 import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListEntryNode;
68 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
69 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
70 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
71 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
72 import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree;
73 import org.opendaylight.yangtools.yang.model.api.Module;
74 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
75 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
76 import org.slf4j.Logger;
77 import org.slf4j.LoggerFactory;
78 import org.w3c.dom.Document;
79 import org.w3c.dom.Element;
80 import org.w3c.dom.Node;
81 import org.xml.sax.InputSource;
82
83 /**
84  * {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source.
85  */
86 public class ListenerAdapter implements DOMDataChangeListener {
87
88     private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class);
89     private static final DocumentBuilderFactory DBF = DocumentBuilderFactory.newInstance();
90     private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
91     private static final Pattern RFC3339_PATTERN = Pattern.compile("(\\d\\d)(\\d\\d)$");
92
93     private static final SimpleDateFormat RFC3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ");
94
95     private final YangInstanceIdentifier path;
96     private ListenerRegistration<DOMDataChangeListener> registration;
97     private final String streamName;
98     private Set<Channel> subscribers = new ConcurrentSet<>();
99     private final EventBus eventBus;
100     private final EventBusChangeRecorder eventBusChangeRecorder;
101     private final NotificationOutputType outputType;
102     private Date start = null;
103     private Date stop = null;
104     private String filter = null;
105     private TransactionChainHandler transactionChainHandler;
106     private SchemaContextHandler schemaHandler;
107
108     /**
109      * Creates new {@link ListenerAdapter} listener specified by path and stream
110      * name.
111      *
112      * @param path
113      *            Path to data in data store.
114      * @param streamName
115      *            The name of the stream.
116      * @param outputType
117      *            - type of output on notification (JSON, XML)
118      */
119     ListenerAdapter(final YangInstanceIdentifier path, final String streamName,
120             final NotificationOutputType outputType) {
121         this.outputType = outputType;
122         Preconditions.checkNotNull(path);
123         Preconditions.checkArgument((streamName != null) && !streamName.isEmpty());
124         this.path = path;
125         this.streamName = streamName;
126         this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
127         this.eventBusChangeRecorder = new EventBusChangeRecorder();
128         this.eventBus.register(this.eventBusChangeRecorder);
129     }
130
131     @Override
132     public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
133         final Date now = new Date();
134         if (this.stop != null) {
135             if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) {
136                 checkFilter(change);
137             }
138             if (this.stop.compareTo(now) < 0) {
139                 try {
140                     this.close();
141                 } catch (final Exception e) {
142                     throw new RestconfDocumentedException("Problem with unregister listener." + e);
143                 }
144             }
145         } else if (this.start != null) {
146             if (this.start.compareTo(now) < 0) {
147                 this.start = null;
148                 checkFilter(change);
149             }
150         } else {
151             checkFilter(change);
152         }
153     }
154
155     /**
156      * Check if is filter used and then prepare and post data do client
157      *
158      * @param change
159      *            - data of notification
160      */
161     private void checkFilter(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
162         final String xml = prepareXmlFrom(change);
163         if (this.filter == null) {
164             prepareAndPostData(xml);
165         } else {
166             try {
167                 if (parseFilterParam(xml)) {
168                     prepareAndPostData(xml);
169                 }
170             } catch (final Exception e) {
171                 throw new RestconfDocumentedException("Problem while parsing filter.", e);
172             }
173         }
174     }
175
176     /**
177      * Parse and evaluate filter value by xml
178      *
179      * @param xml
180      *            - notification data in xml
181      * @return true or false - depends on filter expression and data of
182      *         notifiaction
183      * @throws Exception
184      */
185     private boolean parseFilterParam(final String xml) throws Exception {
186         final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
187         final DocumentBuilder builder = factory.newDocumentBuilder();
188         final Document docOfXml = builder.parse(new InputSource(new StringReader(xml)));
189         final XPath xPath = XPathFactory.newInstance().newXPath();
190         return (boolean) xPath.compile(this.filter).evaluate(docOfXml, XPathConstants.BOOLEAN);
191     }
192
193     /**
194      * Prepare data of notification and data to client
195      *
196      * @param xml
197      */
198     private void prepareAndPostData(final String xml) {
199             final Event event = new Event(EventType.NOTIFY);
200             if (this.outputType.equals(NotificationOutputType.JSON)) {
201                 final JSONObject jsonObject = XML.toJSONObject(xml);
202                 event.setData(jsonObject.toString());
203             } else {
204                 event.setData(xml);
205             }
206             this.eventBus.post(event);
207     }
208
209     /**
210      * Tracks events of data change by customer.
211      */
212     private final class EventBusChangeRecorder {
213         @Subscribe
214         public void recordCustomerChange(final Event event) {
215             if (event.getType() == EventType.REGISTER) {
216                 final Channel subscriber = event.getSubscriber();
217                 if (!ListenerAdapter.this.subscribers.contains(subscriber)) {
218                     ListenerAdapter.this.subscribers.add(subscriber);
219                 }
220             } else if (event.getType() == EventType.DEREGISTER) {
221                 ListenerAdapter.this.subscribers.remove(event.getSubscriber());
222                 Notificator.removeListenerIfNoSubscriberExists(ListenerAdapter.this);
223             } else if (event.getType() == EventType.NOTIFY) {
224                 for (final Channel subscriber : ListenerAdapter.this.subscribers) {
225                     if (subscriber.isActive()) {
226                         LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
227                         subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
228                     } else {
229                         LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
230                         ListenerAdapter.this.subscribers.remove(subscriber);
231                     }
232                 }
233             }
234         }
235     }
236
237     /**
238      * Represents event of specific {@link EventType} type, holds data and {@link Channel} subscriber.
239      */
240     private final class Event {
241         private final EventType type;
242         private Channel subscriber;
243         private String data;
244
245         /**
246          * Creates new event specified by {@link EventType} type.
247          *
248          * @param type
249          *            EventType
250          */
251         public Event(final EventType type) {
252             this.type = type;
253         }
254
255         /**
256          * Gets the {@link Channel} subscriber.
257          *
258          * @return Channel
259          */
260         public Channel getSubscriber() {
261             return this.subscriber;
262         }
263
264         /**
265          * Sets subscriber for event.
266          *
267          * @param subscriber
268          *            Channel
269          */
270         public void setSubscriber(final Channel subscriber) {
271             this.subscriber = subscriber;
272         }
273
274         /**
275          * Gets event String.
276          *
277          * @return String representation of event data.
278          */
279         public String getData() {
280             return this.data;
281         }
282
283         /**
284          * Sets event data.
285          *
286          * @param data String.
287          */
288         public void setData(final String data) {
289             this.data = data;
290         }
291
292         /**
293          * Gets event type.
294          *
295          * @return The type of the event.
296          */
297         public EventType getType() {
298             return this.type;
299         }
300     }
301
302     /**
303      * Type of the event.
304      */
305     private enum EventType {
306         REGISTER,
307         DEREGISTER,
308         NOTIFY;
309     }
310
311     /**
312      * Prepare data in printable form and transform it to String.
313      *
314      * @param change
315      *            DataChangeEvent
316      * @return Data in printable form.
317      */
318     private String prepareXmlFrom(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
319         final SchemaContext schemaContext = ControllerContext.getInstance().getGlobalSchema();
320         final DataSchemaContextTree dataContextTree =  DataSchemaContextTree.from(schemaContext);
321         final Document doc = createDocument();
322         final Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
323                 "notification");
324
325         doc.appendChild(notificationElement);
326
327         final Element eventTimeElement = doc.createElement("eventTime");
328         eventTimeElement.setTextContent(toRFC3339(new Date()));
329         notificationElement.appendChild(eventTimeElement);
330
331         final Element dataChangedNotificationEventElement = doc.createElementNS(
332                 "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "data-changed-notification");
333
334         addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, change,
335                 schemaContext, dataContextTree);
336         notificationElement.appendChild(dataChangedNotificationEventElement);
337
338         try {
339             final ByteArrayOutputStream out = new ByteArrayOutputStream();
340             final Transformer transformer = FACTORY.newTransformer();
341             transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
342             transformer.setOutputProperty(OutputKeys.METHOD, "xml");
343             transformer.setOutputProperty(OutputKeys.INDENT, "yes");
344             transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
345             transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
346             transformer.transform(new DOMSource(doc),
347                     new StreamResult(new OutputStreamWriter(out, StandardCharsets.UTF_8)));
348             final byte[] charData = out.toByteArray();
349             return new String(charData, "UTF-8");
350         } catch (TransformerException | UnsupportedEncodingException e) {
351             final String msg = "Error during transformation of Document into String";
352             LOG.error(msg, e);
353             return msg;
354         }
355     }
356
357     /**
358      * Formats data specified by RFC3339.
359      *
360      * @param d
361      *            Date
362      * @return Data specified by RFC3339.
363      */
364     public static String toRFC3339(final Date d) {
365         return RFC3339_PATTERN.matcher(RFC3339.format(d)).replaceAll("$1:$2");
366     }
367
368     /**
369      * Creates {@link Document} document.
370      * @return {@link Document} document.
371      */
372     public static Document createDocument() {
373         final DocumentBuilder bob;
374         try {
375             bob = DBF.newDocumentBuilder();
376         } catch (final ParserConfigurationException e) {
377             return null;
378         }
379         return bob.newDocument();
380     }
381
382     /**
383      * Adds values to data changed notification event element.
384      *
385      * @param doc
386      *            {@link Document}
387      * @param dataChangedNotificationEventElement
388      *            {@link Element}
389      * @param change
390      *            {@link AsyncDataChangeEvent}
391      */
392     private void addValuesToDataChangedNotificationEventElement(final Document doc,
393             final Element dataChangedNotificationEventElement,
394             final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change,
395             final SchemaContext  schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
396
397         addCreatedChangedValuesFromDataToElement(doc, change.getCreatedData().entrySet(),
398                 dataChangedNotificationEventElement,
399                 Operation.CREATED, schemaContext, dataSchemaContextTree);
400
401         addCreatedChangedValuesFromDataToElement(doc, change.getUpdatedData().entrySet(),
402                     dataChangedNotificationEventElement,
403                     Operation.UPDATED, schemaContext, dataSchemaContextTree);
404
405         addValuesFromDataToElement(doc, change.getRemovedPaths(), dataChangedNotificationEventElement,
406                 Operation.DELETED);
407     }
408
409     /**
410      * Adds values from data to element.
411      *
412      * @param doc
413      *            {@link Document}
414      * @param data
415      *            Set of {@link YangInstanceIdentifier}.
416      * @param element
417      *            {@link Element}
418      * @param operation
419      *            {@link Operation}
420      */
421     private void addValuesFromDataToElement(final Document doc, final Set<YangInstanceIdentifier> data,
422             final Element element, final Operation operation) {
423         if ((data == null) || data.isEmpty()) {
424             return;
425         }
426         for (final YangInstanceIdentifier path : data) {
427             if (!ControllerContext.getInstance().isNodeMixin(path)) {
428                 final Node node = createDataChangeEventElement(doc, path, operation);
429                 element.appendChild(node);
430             }
431         }
432     }
433
434     private void addCreatedChangedValuesFromDataToElement(final Document doc, final Set<Entry<YangInstanceIdentifier,
435                 NormalizedNode<?,?>>> data, final Element element, final Operation operation, final SchemaContext
436             schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
437         if ((data == null) || data.isEmpty()) {
438             return;
439         }
440         for (final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry : data) {
441             if (!ControllerContext.getInstance().isNodeMixin(entry.getKey())) {
442                 final Node node = createCreatedChangedDataChangeEventElement(doc, entry, operation, schemaContext,
443                         dataSchemaContextTree);
444                 element.appendChild(node);
445             }
446         }
447     }
448
449     /**
450      * Creates changed event element from data.
451      *
452      * @param doc
453      *            {@link Document}
454      * @param path
455      *            Path to data in data store.
456      * @param operation
457      *            {@link Operation}
458      * @return {@link Node} node represented by changed event element.
459      */
460     private Node createDataChangeEventElement(final Document doc, final YangInstanceIdentifier path,
461             final Operation operation) {
462         final Element dataChangeEventElement = doc.createElement("data-change-event");
463         final Element pathElement = doc.createElement("path");
464         addPathAsValueToElement(path, pathElement);
465         dataChangeEventElement.appendChild(pathElement);
466
467         final Element operationElement = doc.createElement("operation");
468         operationElement.setTextContent(operation.value);
469         dataChangeEventElement.appendChild(operationElement);
470
471         return dataChangeEventElement;
472     }
473
474     private Node createCreatedChangedDataChangeEventElement(final Document doc, final Entry<YangInstanceIdentifier,
475             NormalizedNode<?, ?>> entry, final Operation operation, final SchemaContext
476             schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
477         final Element dataChangeEventElement = doc.createElement("data-change-event");
478         final Element pathElement = doc.createElement("path");
479         final YangInstanceIdentifier path = entry.getKey();
480         addPathAsValueToElement(path, pathElement);
481         dataChangeEventElement.appendChild(pathElement);
482
483         final Element operationElement = doc.createElement("operation");
484         operationElement.setTextContent(operation.value);
485         dataChangeEventElement.appendChild(operationElement);
486
487         try {
488             final DOMResult domResult = writeNormalizedNode(entry.getValue(), path,
489                     schemaContext, dataSchemaContextTree);
490             final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
491             final Element dataElement = doc.createElement("data");
492             dataElement.appendChild(result);
493             dataChangeEventElement.appendChild(dataElement);
494         } catch (final IOException e) {
495             LOG.error("Error in writer ", e);
496         } catch (final XMLStreamException e) {
497             LOG.error("Error processing stream", e);
498         }
499
500         return dataChangeEventElement;
501     }
502
503     private static DOMResult writeNormalizedNode(final NormalizedNode<?, ?> normalized,
504                                                  final YangInstanceIdentifier path, final SchemaContext context,
505                                                  final DataSchemaContextTree dataSchemaContextTree)
506             throws IOException, XMLStreamException {
507         final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
508         final Document doc = XmlDocumentUtils.getDocument();
509         final DOMResult result = new DOMResult(doc);
510         NormalizedNodeWriter normalizedNodeWriter = null;
511         NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
512         XMLStreamWriter writer = null;
513         final SchemaPath nodePath;
514
515         if ((normalized instanceof MapEntryNode) || (normalized instanceof UnkeyedListEntryNode)) {
516             nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath();
517         } else {
518             nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath().getParent();
519         }
520
521         try {
522             writer = XML_FACTORY.createXMLStreamWriter(result);
523             normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context, nodePath);
524             normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
525
526             normalizedNodeWriter.write(normalized);
527
528             normalizedNodeWriter.flush();
529         } finally {
530             if (normalizedNodeWriter != null) {
531                 normalizedNodeWriter.close();
532             }
533             if (normalizedNodeStreamWriter != null) {
534                 normalizedNodeStreamWriter.close();
535             }
536             if (writer != null) {
537                 writer.close();
538             }
539         }
540
541         return result;
542     }
543
544     /**
545      * Adds path as value to element.
546      *
547      * @param path
548      *            Path to data in data store.
549      * @param element
550      *            {@link Element}
551      */
552     private void addPathAsValueToElement(final YangInstanceIdentifier path, final Element element) {
553         final YangInstanceIdentifier normalizedPath = ControllerContext.getInstance().toXpathRepresentation(path);
554         final StringBuilder textContent = new StringBuilder();
555
556         for (final PathArgument pathArgument : normalizedPath.getPathArguments()) {
557             if (pathArgument instanceof YangInstanceIdentifier.AugmentationIdentifier) {
558                 continue;
559             }
560             textContent.append("/");
561             writeIdentifierWithNamespacePrefix(element, textContent, pathArgument.getNodeType());
562             if (pathArgument instanceof NodeIdentifierWithPredicates) {
563                 final Map<QName, Object> predicates = ((NodeIdentifierWithPredicates) pathArgument).getKeyValues();
564                 for (final QName keyValue : predicates.keySet()) {
565                     final String predicateValue = String.valueOf(predicates.get(keyValue));
566                     textContent.append("[");
567                     writeIdentifierWithNamespacePrefix(element, textContent, keyValue);
568                     textContent.append("='");
569                     textContent.append(predicateValue);
570                     textContent.append("'");
571                     textContent.append("]");
572                 }
573             } else if (pathArgument instanceof NodeWithValue) {
574                 textContent.append("[.='");
575                 textContent.append(((NodeWithValue) pathArgument).getValue());
576                 textContent.append("'");
577                 textContent.append("]");
578             }
579         }
580         element.setTextContent(textContent.toString());
581     }
582
583     /**
584      * Writes identifier that consists of prefix and QName.
585      *
586      * @param element
587      *            {@link Element}
588      * @param textContent
589      *            StringBuilder
590      * @param qName
591      *            QName
592      */
593     private static void writeIdentifierWithNamespacePrefix(final Element element, final StringBuilder textContent,
594             final QName qName) {
595         final Module module = ControllerContext.getInstance().getGlobalSchema()
596                 .findModuleByNamespaceAndRevision(qName.getNamespace(), qName.getRevision());
597
598         textContent.append(module.getName());
599         textContent.append(":");
600         textContent.append(qName.getLocalName());
601     }
602
603     /**
604      * Gets path pointed to data in data store.
605      *
606      * @return Path pointed to data in data store.
607      */
608     public YangInstanceIdentifier getPath() {
609         return this.path;
610     }
611
612     /**
613      * Sets {@link ListenerRegistration} registration.
614      *
615      * @param registration DOMDataChangeListener registration
616      */
617     public void setRegistration(final ListenerRegistration<DOMDataChangeListener> registration) {
618         this.registration = registration;
619     }
620
621     /**
622      * Gets the name of the stream.
623      *
624      * @return The name of the stream.
625      */
626     public String getStreamName() {
627         return this.streamName;
628     }
629
630     /**
631      * Removes all subscribers and unregisters event bus change recorder form
632      * event bus and delete data in DS
633      */
634     public void close() throws Exception {
635         final DOMDataWriteTransaction wTx = this.transactionChainHandler.get().newWriteOnlyTransaction();
636         wTx.delete(LogicalDatastoreType.OPERATIONAL, IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY
637                         + this.path.getLastPathArgument().getNodeType().getLocalName(), this.schemaHandler.get()));
638         wTx.submit().checkedGet();
639
640         this.subscribers = new ConcurrentSet<>();
641         this.registration.close();
642         this.registration = null;
643         this.eventBus.unregister(this.eventBusChangeRecorder);
644     }
645
646     /**
647      * Checks if {@link ListenerRegistration} registration exist.
648      *
649      * @return True if exist, false otherwise.
650      */
651     public boolean isListening() {
652         return this.registration == null ? false : true;
653     }
654
655     /**
656      * Creates event of type {@link EventType#REGISTER}, set {@link Channel} subscriber to the event and post event into
657      * event bus.
658      *
659      * @param subscriber
660      *            Channel
661      */
662     public void addSubscriber(final Channel subscriber) {
663         if (!subscriber.isActive()) {
664             LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
665         }
666         final Event event = new Event(EventType.REGISTER);
667         event.setSubscriber(subscriber);
668         this.eventBus.post(event);
669     }
670
671     /**
672      * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel} subscriber to the event and posts event
673      * into event bus.
674      *
675      * @param subscriber
676      */
677     public void removeSubscriber(final Channel subscriber) {
678         LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
679         final Event event = new Event(EventType.DEREGISTER);
680         event.setSubscriber(subscriber);
681         this.eventBus.post(event);
682     }
683
684     /**
685      * Checks if exists at least one {@link Channel} subscriber.
686      *
687      * @return True if exist at least one {@link Channel} subscriber, false otherwise.
688      */
689     public boolean hasSubscribers() {
690         return !this.subscribers.isEmpty();
691     }
692
693     /**
694      * Consists of two types {@link Store#CONFIG} and {@link Store#OPERATION}.
695      */
696     private static enum Store {
697         CONFIG("config"),
698         OPERATION("operation");
699
700         private final String value;
701
702         private Store(final String value) {
703             this.value = value;
704         }
705     }
706
707     /**
708      * Consists of three types {@link Operation#CREATED}, {@link Operation#UPDATED} and {@link Operation#DELETED}.
709      */
710     private static enum Operation {
711         CREATED("created"),
712         UPDATED("updated"),
713         DELETED("deleted");
714
715         private final String value;
716
717         private Operation(final String value) {
718             this.value = value;
719         }
720     }
721
722     /**
723      * Set query parameters for listener
724      *
725      * @param start
726      *            - start-time of getting notification
727      * @param stop
728      *            - stop-time of getting notification
729      * @param filter
730      *            - indicate which subset of all possible events are of interest
731      */
732     public void setQueryParams(final Date start, final Date stop, final String filter) {
733         this.start = start;
734         this.stop = stop;
735         this.filter = filter;
736     }
737
738     /**
739      * Get output type
740      *
741      * @return outputType
742      */
743     public String getOutputType() {
744         return this.outputType.getName();
745     }
746
747     /**
748      * Transaction chain to delete data in DS on close()
749      *
750      * @param transactionChainHandler
751      *            - creating new write transaction to delete data on close
752      * @param schemaHandler
753      *            - for getting schema to deserialize
754      *            {@link MonitoringModule#PATH_TO_STREAM_WITHOUT_KEY} to
755      *            {@link YangInstanceIdentifier}
756      */
757     public void setCloseVars(final TransactionChainHandler transactionChainHandler,
758             final SchemaContextHandler schemaHandler) {
759         this.transactionChainHandler = transactionChainHandler;
760         this.schemaHandler = schemaHandler;
761     }
762
763 }