ccc30a5616c82b7c082b17048afdc8c493557ec3
[netconf.git] / restconf / sal-rest-connector / src / main / java / org / opendaylight / netconf / sal / streams / listeners / ListenerAdapter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.sal.streams.listeners;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.eventbus.AsyncEventBus;
12 import com.google.common.eventbus.EventBus;
13 import com.google.common.eventbus.Subscribe;
14 import io.netty.channel.Channel;
15 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
16 import io.netty.util.internal.ConcurrentSet;
17 import java.io.ByteArrayOutputStream;
18 import java.io.IOException;
19 import java.io.OutputStreamWriter;
20 import java.io.StringReader;
21 import java.io.UnsupportedEncodingException;
22 import java.nio.charset.StandardCharsets;
23 import java.text.SimpleDateFormat;
24 import java.util.Collection;
25 import java.util.Date;
26 import java.util.Map;
27 import java.util.Map.Entry;
28 import java.util.Random;
29 import java.util.Set;
30 import java.util.concurrent.Executors;
31 import java.util.regex.Pattern;
32 import javax.xml.parsers.DocumentBuilder;
33 import javax.xml.parsers.DocumentBuilderFactory;
34 import javax.xml.parsers.ParserConfigurationException;
35 import javax.xml.stream.XMLOutputFactory;
36 import javax.xml.stream.XMLStreamException;
37 import javax.xml.stream.XMLStreamWriter;
38 import javax.xml.transform.OutputKeys;
39 import javax.xml.transform.Transformer;
40 import javax.xml.transform.TransformerException;
41 import javax.xml.transform.TransformerFactory;
42 import javax.xml.transform.dom.DOMResult;
43 import javax.xml.transform.dom.DOMSource;
44 import javax.xml.transform.stream.StreamResult;
45 import javax.xml.xpath.XPath;
46 import javax.xml.xpath.XPathConstants;
47 import javax.xml.xpath.XPathFactory;
48 import org.json.JSONObject;
49 import org.json.XML;
50 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
51 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
52 import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
53 import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
54 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
55 import org.opendaylight.yangtools.concepts.ListenerRegistration;
56 import org.opendaylight.yangtools.yang.common.QName;
57 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
58 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
59 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeWithValue;
60 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
61 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
62 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
63 import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListEntryNode;
64 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
65 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
66 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
67 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
68 import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree;
69 import org.opendaylight.yangtools.yang.model.api.Module;
70 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
71 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
72 import org.slf4j.Logger;
73 import org.slf4j.LoggerFactory;
74 import org.w3c.dom.Document;
75 import org.w3c.dom.Element;
76 import org.w3c.dom.Node;
77 import org.xml.sax.InputSource;
78
79 /**
80  * {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source.
81  */
82 public class ListenerAdapter implements DOMDataChangeListener {
83
84     private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class);
85     private static final DocumentBuilderFactory DBF = DocumentBuilderFactory.newInstance();
86     private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
87     private static final Pattern RFC3339_PATTERN = Pattern.compile("(\\d\\d)(\\d\\d)$");
88
89     private static final SimpleDateFormat RFC3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ");
90
91     private final YangInstanceIdentifier path;
92     private ListenerRegistration<DOMDataChangeListener> registration;
93     private final String streamName;
94     private Set<Channel> subscribers = new ConcurrentSet<>();
95     private final EventBus eventBus;
96     private final EventBusChangeRecorder eventBusChangeRecorder;
97     private final NotificationOutputType outputType;
98     private Date start = null;
99     private Date stop = null;
100     private String filter = null;
101
102     /**
103      * Creates new {@link ListenerAdapter} listener specified by path and stream
104      * name.
105      *
106      * @param path
107      *            Path to data in data store.
108      * @param streamName
109      *            The name of the stream.
110      * @param outputType
111      *            - type of output on notification (JSON, XML)
112      */
113     ListenerAdapter(final YangInstanceIdentifier path, final String streamName,
114             final NotificationOutputType outputType) {
115         this.outputType = outputType;
116         Preconditions.checkNotNull(path);
117         Preconditions.checkArgument((streamName != null) && !streamName.isEmpty());
118         this.path = path;
119         this.streamName = streamName;
120         this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
121         this.eventBusChangeRecorder = new EventBusChangeRecorder();
122         this.eventBus.register(this.eventBusChangeRecorder);
123     }
124
125     @Override
126     public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
127         final Date now = new Date();
128         if (this.stop != null) {
129             if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) {
130                 checkFilter(change);
131             }
132             if (this.stop.compareTo(now) < 0) {
133                 try {
134                     this.close();
135                 } catch (final Exception e) {
136                     throw new RestconfDocumentedException("Problem with unregister listener." + e);
137                 }
138             }
139         } else if (this.start != null) {
140             if (this.start.compareTo(now) < 0) {
141                 this.start = null;
142                 checkFilter(change);
143             }
144         } else {
145             checkFilter(change);
146         }
147     }
148
149     /**
150      * Check if is filter used and then prepare and post data do client
151      *
152      * @param change
153      *            - data of notification
154      */
155     private void checkFilter(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
156         final String xml = prepareXmlFrom(change);
157         if (this.filter == null) {
158             prepareAndPostData(xml);
159         } else {
160             try {
161                 if (parseFilterParam(xml)) {
162                     prepareAndPostData(xml);
163                 }
164             } catch (final Exception e) {
165                 throw new RestconfDocumentedException("Problem while parsing filter.", e);
166             }
167         }
168     }
169
170     /**
171      * Parse and evaluate filter value by xml
172      *
173      * @param xml
174      *            - notification data in xml
175      * @return true or false - depends on filter expression and data of
176      *         notifiaction
177      * @throws Exception
178      */
179     private boolean parseFilterParam(final String xml) throws Exception {
180         final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
181         final DocumentBuilder builder = factory.newDocumentBuilder();
182         final Document docOfXml = builder.parse(new InputSource(new StringReader(xml)));
183         final XPath xPath = XPathFactory.newInstance().newXPath();
184         return (boolean) xPath.compile(this.filter).evaluate(docOfXml, XPathConstants.BOOLEAN);
185     }
186
187     /**
188      * Prepare data of notification and data to client
189      *
190      * @param xml
191      */
192     private void prepareAndPostData(final String xml) {
193             final Event event = new Event(EventType.NOTIFY);
194             if (this.outputType.equals(NotificationOutputType.JSON)) {
195                 final JSONObject jsonObject = XML.toJSONObject(xml);
196                 event.setData(jsonObject.toString());
197             } else {
198                 event.setData(xml);
199             }
200             this.eventBus.post(event);
201     }
202
203     /**
204      * Tracks events of data change by customer.
205      */
206     private final class EventBusChangeRecorder {
207         @Subscribe
208         public void recordCustomerChange(final Event event) {
209             if (event.getType() == EventType.REGISTER) {
210                 final Channel subscriber = event.getSubscriber();
211                 if (!ListenerAdapter.this.subscribers.contains(subscriber)) {
212                     ListenerAdapter.this.subscribers.add(subscriber);
213                 }
214             } else if (event.getType() == EventType.DEREGISTER) {
215                 ListenerAdapter.this.subscribers.remove(event.getSubscriber());
216                 Notificator.removeListenerIfNoSubscriberExists(ListenerAdapter.this);
217             } else if (event.getType() == EventType.NOTIFY) {
218                 for (final Channel subscriber : ListenerAdapter.this.subscribers) {
219                     if (subscriber.isActive()) {
220                         LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
221                         subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
222                     } else {
223                         LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
224                         ListenerAdapter.this.subscribers.remove(subscriber);
225                     }
226                 }
227             }
228         }
229     }
230
231     /**
232      * Represents event of specific {@link EventType} type, holds data and {@link Channel} subscriber.
233      */
234     private final class Event {
235         private final EventType type;
236         private Channel subscriber;
237         private String data;
238
239         /**
240          * Creates new event specified by {@link EventType} type.
241          *
242          * @param type
243          *            EventType
244          */
245         public Event(final EventType type) {
246             this.type = type;
247         }
248
249         /**
250          * Gets the {@link Channel} subscriber.
251          *
252          * @return Channel
253          */
254         public Channel getSubscriber() {
255             return this.subscriber;
256         }
257
258         /**
259          * Sets subscriber for event.
260          *
261          * @param subscriber
262          *            Channel
263          */
264         public void setSubscriber(final Channel subscriber) {
265             this.subscriber = subscriber;
266         }
267
268         /**
269          * Gets event String.
270          *
271          * @return String representation of event data.
272          */
273         public String getData() {
274             return this.data;
275         }
276
277         /**
278          * Sets event data.
279          *
280          * @param data String.
281          */
282         public void setData(final String data) {
283             this.data = data;
284         }
285
286         /**
287          * Gets event type.
288          *
289          * @return The type of the event.
290          */
291         public EventType getType() {
292             return this.type;
293         }
294     }
295
296     /**
297      * Type of the event.
298      */
299     private enum EventType {
300         REGISTER,
301         DEREGISTER,
302         NOTIFY;
303     }
304
305     /**
306      * Prepare data in printable form and transform it to String.
307      *
308      * @param change
309      *            DataChangeEvent
310      * @return Data in printable form.
311      */
312     private String prepareXmlFrom(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
313         final SchemaContext schemaContext = ControllerContext.getInstance().getGlobalSchema();
314         final DataSchemaContextTree dataContextTree =  DataSchemaContextTree.from(schemaContext);
315         final Document doc = createDocument();
316         final Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
317                 "notification");
318
319         doc.appendChild(notificationElement);
320
321         final Element eventTimeElement = doc.createElement("eventTime");
322         eventTimeElement.setTextContent(toRFC3339(new Date()));
323         notificationElement.appendChild(eventTimeElement);
324
325         final Element dataChangedNotificationEventElement = doc.createElementNS(
326                 "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "data-changed-notification");
327
328         addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, change,
329                 schemaContext, dataContextTree);
330         notificationElement.appendChild(dataChangedNotificationEventElement);
331
332         try {
333             final ByteArrayOutputStream out = new ByteArrayOutputStream();
334             final Transformer transformer = FACTORY.newTransformer();
335             transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
336             transformer.setOutputProperty(OutputKeys.METHOD, "xml");
337             transformer.setOutputProperty(OutputKeys.INDENT, "yes");
338             transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
339             transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
340             transformer.transform(new DOMSource(doc),
341                     new StreamResult(new OutputStreamWriter(out, StandardCharsets.UTF_8)));
342             final byte[] charData = out.toByteArray();
343             return new String(charData, "UTF-8");
344         } catch (TransformerException | UnsupportedEncodingException e) {
345             final String msg = "Error during transformation of Document into String";
346             LOG.error(msg, e);
347             return msg;
348         }
349     }
350
351     /**
352      * Formats data specified by RFC3339.
353      *
354      * @param d
355      *            Date
356      * @return Data specified by RFC3339.
357      */
358     public static String toRFC3339(final Date d) {
359         return RFC3339_PATTERN.matcher(RFC3339.format(d)).replaceAll("$1:$2");
360     }
361
362     /**
363      * Creates {@link Document} document.
364      * @return {@link Document} document.
365      */
366     public static Document createDocument() {
367         final DocumentBuilder bob;
368         try {
369             bob = DBF.newDocumentBuilder();
370         } catch (final ParserConfigurationException e) {
371             return null;
372         }
373         return bob.newDocument();
374     }
375
376     /**
377      * Adds values to data changed notification event element.
378      *
379      * @param doc
380      *            {@link Document}
381      * @param dataChangedNotificationEventElement
382      *            {@link Element}
383      * @param change
384      *            {@link AsyncDataChangeEvent}
385      */
386     private void addValuesToDataChangedNotificationEventElement(final Document doc,
387             final Element dataChangedNotificationEventElement,
388             final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change,
389             final SchemaContext  schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
390
391         addCreatedChangedValuesFromDataToElement(doc, change.getCreatedData().entrySet(),
392                 dataChangedNotificationEventElement,
393                 Operation.CREATED, schemaContext, dataSchemaContextTree);
394
395         addCreatedChangedValuesFromDataToElement(doc, change.getUpdatedData().entrySet(),
396                     dataChangedNotificationEventElement,
397                     Operation.UPDATED, schemaContext, dataSchemaContextTree);
398
399         addValuesFromDataToElement(doc, change.getRemovedPaths(), dataChangedNotificationEventElement,
400                 Operation.DELETED);
401     }
402
403     /**
404      * Adds values from data to element.
405      *
406      * @param doc
407      *            {@link Document}
408      * @param data
409      *            Set of {@link YangInstanceIdentifier}.
410      * @param element
411      *            {@link Element}
412      * @param operation
413      *            {@link Operation}
414      */
415     private void addValuesFromDataToElement(final Document doc, final Set<YangInstanceIdentifier> data,
416             final Element element, final Operation operation) {
417         if ((data == null) || data.isEmpty()) {
418             return;
419         }
420         for (final YangInstanceIdentifier path : data) {
421             if (!ControllerContext.getInstance().isNodeMixin(path)) {
422                 final Node node = createDataChangeEventElement(doc, path, operation);
423                 element.appendChild(node);
424             }
425         }
426     }
427
428     private void addCreatedChangedValuesFromDataToElement(final Document doc, final Set<Entry<YangInstanceIdentifier,
429                 NormalizedNode<?,?>>> data, final Element element, final Operation operation, final SchemaContext
430             schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
431         if ((data == null) || data.isEmpty()) {
432             return;
433         }
434         for (final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry : data) {
435             if (!ControllerContext.getInstance().isNodeMixin(entry.getKey())) {
436                 final Node node = createCreatedChangedDataChangeEventElement(doc, entry, operation, schemaContext,
437                         dataSchemaContextTree);
438                 element.appendChild(node);
439             }
440         }
441     }
442
443     /**
444      * Creates changed event element from data.
445      *
446      * @param doc
447      *            {@link Document}
448      * @param path
449      *            Path to data in data store.
450      * @param operation
451      *            {@link Operation}
452      * @return {@link Node} node represented by changed event element.
453      */
454     private Node createDataChangeEventElement(final Document doc, final YangInstanceIdentifier path,
455             final Operation operation) {
456         final Element dataChangeEventElement = doc.createElement("data-change-event");
457         final Element pathElement = doc.createElement("path");
458         addPathAsValueToElement(path, pathElement);
459         dataChangeEventElement.appendChild(pathElement);
460
461         final Element operationElement = doc.createElement("operation");
462         operationElement.setTextContent(operation.value);
463         dataChangeEventElement.appendChild(operationElement);
464
465         return dataChangeEventElement;
466     }
467
468     private Node createCreatedChangedDataChangeEventElement(final Document doc, final Entry<YangInstanceIdentifier,
469             NormalizedNode<?, ?>> entry, final Operation operation, final SchemaContext
470             schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
471         final Element dataChangeEventElement = doc.createElement("data-change-event");
472         final Element pathElement = doc.createElement("path");
473         final YangInstanceIdentifier path = entry.getKey();
474         addPathAsValueToElement(path, pathElement);
475         dataChangeEventElement.appendChild(pathElement);
476
477         final Element operationElement = doc.createElement("operation");
478         operationElement.setTextContent(operation.value);
479         dataChangeEventElement.appendChild(operationElement);
480
481         try {
482             final DOMResult domResult = writeNormalizedNode(entry.getValue(), path,
483                     schemaContext, dataSchemaContextTree);
484             final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
485             final Element dataElement = doc.createElement("data");
486             dataElement.appendChild(result);
487             dataChangeEventElement.appendChild(dataElement);
488         } catch (final IOException e) {
489             LOG.error("Error in writer ", e);
490         } catch (final XMLStreamException e) {
491             LOG.error("Error processing stream", e);
492         }
493
494         return dataChangeEventElement;
495     }
496
497     private static DOMResult writeNormalizedNode(final NormalizedNode<?, ?> normalized,
498                                                  final YangInstanceIdentifier path, final SchemaContext context,
499                                                  final DataSchemaContextTree dataSchemaContextTree)
500             throws IOException, XMLStreamException {
501         final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
502         final Document doc = XmlDocumentUtils.getDocument();
503         final DOMResult result = new DOMResult(doc);
504         NormalizedNodeWriter normalizedNodeWriter = null;
505         NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
506         XMLStreamWriter writer = null;
507         final SchemaPath nodePath;
508
509         if ((normalized instanceof MapEntryNode) || (normalized instanceof UnkeyedListEntryNode)) {
510             nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath();
511         } else {
512             nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath().getParent();
513         }
514
515         try {
516             writer = XML_FACTORY.createXMLStreamWriter(result);
517             normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context, nodePath);
518             normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
519
520             normalizedNodeWriter.write(normalized);
521
522             normalizedNodeWriter.flush();
523         } finally {
524             if (normalizedNodeWriter != null) {
525                 normalizedNodeWriter.close();
526             }
527             if (normalizedNodeStreamWriter != null) {
528                 normalizedNodeStreamWriter.close();
529             }
530             if (writer != null) {
531                 writer.close();
532             }
533         }
534
535         return result;
536     }
537
538     /**
539      * Adds path as value to element.
540      *
541      * @param path
542      *            Path to data in data store.
543      * @param element
544      *            {@link Element}
545      */
546     private void addPathAsValueToElement(final YangInstanceIdentifier path, final Element element) {
547         final YangInstanceIdentifier normalizedPath = ControllerContext.getInstance().toXpathRepresentation(path);
548         final StringBuilder textContent = new StringBuilder();
549
550         for (final PathArgument pathArgument : normalizedPath.getPathArguments()) {
551             if (pathArgument instanceof YangInstanceIdentifier.AugmentationIdentifier) {
552                 continue;
553             }
554             textContent.append("/");
555             writeIdentifierWithNamespacePrefix(element, textContent, pathArgument.getNodeType());
556             if (pathArgument instanceof NodeIdentifierWithPredicates) {
557                 final Map<QName, Object> predicates = ((NodeIdentifierWithPredicates) pathArgument).getKeyValues();
558                 for (final QName keyValue : predicates.keySet()) {
559                     final String predicateValue = String.valueOf(predicates.get(keyValue));
560                     textContent.append("[");
561                     writeIdentifierWithNamespacePrefix(element, textContent, keyValue);
562                     textContent.append("='");
563                     textContent.append(predicateValue);
564                     textContent.append("'");
565                     textContent.append("]");
566                 }
567             } else if (pathArgument instanceof NodeWithValue) {
568                 textContent.append("[.='");
569                 textContent.append(((NodeWithValue) pathArgument).getValue());
570                 textContent.append("'");
571                 textContent.append("]");
572             }
573         }
574         element.setTextContent(textContent.toString());
575     }
576
577     /**
578      * Writes identifier that consists of prefix and QName.
579      *
580      * @param element
581      *            {@link Element}
582      * @param textContent
583      *            StringBuilder
584      * @param qName
585      *            QName
586      */
587     private static void writeIdentifierWithNamespacePrefix(final Element element, final StringBuilder textContent,
588             final QName qName) {
589         final Module module = ControllerContext.getInstance().getGlobalSchema()
590                 .findModuleByNamespaceAndRevision(qName.getNamespace(), qName.getRevision());
591
592         textContent.append(module.getName());
593         textContent.append(":");
594         textContent.append(qName.getLocalName());
595     }
596
597     /**
598      * Generates new prefix which consists of four random characters <a-z>.
599      *
600      * @param prefixes
601      *            Collection of prefixes.
602      * @return New prefix which consists of four random characters <a-z>.
603      */
604     private static String generateNewPrefix(final Collection<String> prefixes) {
605         StringBuilder result = null;
606         final Random random = new Random();
607         do {
608             result = new StringBuilder();
609             for (int i = 0; i < 4; i++) {
610                 final int randomNumber = 0x61 + (Math.abs(random.nextInt()) % 26);
611                 result.append(Character.toChars(randomNumber));
612             }
613         } while (prefixes.contains(result.toString()));
614
615         return result.toString();
616     }
617
618     /**
619      * Gets path pointed to data in data store.
620      *
621      * @return Path pointed to data in data store.
622      */
623     public YangInstanceIdentifier getPath() {
624         return this.path;
625     }
626
627     /**
628      * Sets {@link ListenerRegistration} registration.
629      *
630      * @param registration DOMDataChangeListener registration
631      */
632     public void setRegistration(final ListenerRegistration<DOMDataChangeListener> registration) {
633         this.registration = registration;
634     }
635
636     /**
637      * Gets the name of the stream.
638      *
639      * @return The name of the stream.
640      */
641     public String getStreamName() {
642         return this.streamName;
643     }
644
645     /**
646      * Removes all subscribers and unregisters event bus change recorder form event bus.
647      */
648     public void close() throws Exception {
649         this.subscribers = new ConcurrentSet<>();
650         this.registration.close();
651         this.registration = null;
652         this.eventBus.unregister(this.eventBusChangeRecorder);
653     }
654
655     /**
656      * Checks if {@link ListenerRegistration} registration exist.
657      *
658      * @return True if exist, false otherwise.
659      */
660     public boolean isListening() {
661         return this.registration == null ? false : true;
662     }
663
664     /**
665      * Creates event of type {@link EventType#REGISTER}, set {@link Channel} subscriber to the event and post event into
666      * event bus.
667      *
668      * @param subscriber
669      *            Channel
670      */
671     public void addSubscriber(final Channel subscriber) {
672         if (!subscriber.isActive()) {
673             LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
674         }
675         final Event event = new Event(EventType.REGISTER);
676         event.setSubscriber(subscriber);
677         this.eventBus.post(event);
678     }
679
680     /**
681      * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel} subscriber to the event and posts event
682      * into event bus.
683      *
684      * @param subscriber
685      */
686     public void removeSubscriber(final Channel subscriber) {
687         LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
688         final Event event = new Event(EventType.DEREGISTER);
689         event.setSubscriber(subscriber);
690         this.eventBus.post(event);
691     }
692
693     /**
694      * Checks if exists at least one {@link Channel} subscriber.
695      *
696      * @return True if exist at least one {@link Channel} subscriber, false otherwise.
697      */
698     public boolean hasSubscribers() {
699         return !this.subscribers.isEmpty();
700     }
701
702     /**
703      * Consists of two types {@link Store#CONFIG} and {@link Store#OPERATION}.
704      */
705     private static enum Store {
706         CONFIG("config"),
707         OPERATION("operation");
708
709         private final String value;
710
711         private Store(final String value) {
712             this.value = value;
713         }
714     }
715
716     /**
717      * Consists of three types {@link Operation#CREATED}, {@link Operation#UPDATED} and {@link Operation#DELETED}.
718      */
719     private static enum Operation {
720         CREATED("created"),
721         UPDATED("updated"),
722         DELETED("deleted");
723
724         private final String value;
725
726         private Operation(final String value) {
727             this.value = value;
728         }
729     }
730
731     /**
732      * Set query parameters for listener
733      *
734      * @param start
735      *            - start-time of getting notification
736      * @param stop
737      *            - stop-time of getting notification
738      * @param filter
739      *            - indicate which subset of all possible events are of interest
740      */
741     public void setQueryParams(final Date start, final Date stop, final String filter) {
742         this.start = start;
743         this.stop = stop;
744         this.filter = filter;
745     }
746
747 }