Bug 6949 / Bug 6950 - Implementation of start-time and stop-time
[netconf.git] / restconf / sal-rest-connector / src / main / java / org / opendaylight / netconf / sal / streams / listeners / ListenerAdapter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.sal.streams.listeners;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.eventbus.AsyncEventBus;
12 import com.google.common.eventbus.EventBus;
13 import com.google.common.eventbus.Subscribe;
14 import io.netty.channel.Channel;
15 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
16 import io.netty.util.internal.ConcurrentSet;
17 import java.io.ByteArrayOutputStream;
18 import java.io.IOException;
19 import java.io.OutputStreamWriter;
20 import java.io.UnsupportedEncodingException;
21 import java.nio.charset.StandardCharsets;
22 import java.text.SimpleDateFormat;
23 import java.util.Collection;
24 import java.util.Date;
25 import java.util.HashMap;
26 import java.util.Map;
27 import java.util.Map.Entry;
28 import java.util.Random;
29 import java.util.Set;
30 import java.util.concurrent.Executors;
31 import java.util.regex.Pattern;
32 import javax.xml.parsers.DocumentBuilder;
33 import javax.xml.parsers.DocumentBuilderFactory;
34 import javax.xml.parsers.ParserConfigurationException;
35 import javax.xml.stream.XMLOutputFactory;
36 import javax.xml.stream.XMLStreamException;
37 import javax.xml.stream.XMLStreamWriter;
38 import javax.xml.transform.OutputKeys;
39 import javax.xml.transform.Transformer;
40 import javax.xml.transform.TransformerException;
41 import javax.xml.transform.TransformerFactory;
42 import javax.xml.transform.dom.DOMResult;
43 import javax.xml.transform.dom.DOMSource;
44 import javax.xml.transform.stream.StreamResult;
45 import org.json.JSONObject;
46 import org.json.XML;
47 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
48 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
49 import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
50 import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
51 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
52 import org.opendaylight.yangtools.concepts.ListenerRegistration;
53 import org.opendaylight.yangtools.yang.common.QName;
54 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
55 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
56 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeWithValue;
57 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
58 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
59 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
60 import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListEntryNode;
61 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
62 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
63 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
64 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
65 import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree;
66 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
67 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
70 import org.w3c.dom.Document;
71 import org.w3c.dom.Element;
72 import org.w3c.dom.Node;
73
74 /**
75  * {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source.
76  */
77 public class ListenerAdapter implements DOMDataChangeListener {
78
79     private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class);
80     private static final DocumentBuilderFactory DBF = DocumentBuilderFactory.newInstance();
81     private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
82     private static final Pattern RFC3339_PATTERN = Pattern.compile("(\\d\\d)(\\d\\d)$");
83
84     private static final SimpleDateFormat RFC3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ");
85
86     private final YangInstanceIdentifier path;
87     private ListenerRegistration<DOMDataChangeListener> registration;
88     private final String streamName;
89     private Set<Channel> subscribers = new ConcurrentSet<>();
90     private final EventBus eventBus;
91     private final EventBusChangeRecorder eventBusChangeRecorder;
92     private final NotificationOutputType outputType;
93     private Date start = null;
94     private Date stop = null;
95
96     /**
97      * Creates new {@link ListenerAdapter} listener specified by path and stream
98      * name.
99      *
100      * @param path
101      *            Path to data in data store.
102      * @param streamName
103      *            The name of the stream.
104      * @param outputType
105      *            - type of output on notification (JSON, XML)
106      */
107     ListenerAdapter(final YangInstanceIdentifier path, final String streamName,
108             final NotificationOutputType outputType) {
109         this.outputType = outputType;
110         Preconditions.checkNotNull(path);
111         Preconditions.checkArgument((streamName != null) && !streamName.isEmpty());
112         this.path = path;
113         this.streamName = streamName;
114         this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
115         this.eventBusChangeRecorder = new EventBusChangeRecorder();
116         this.eventBus.register(this.eventBusChangeRecorder);
117     }
118
119     @Override
120     public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
121         final Date now = new Date();
122         if (this.stop != null) {
123             if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) {
124                 prepareAndPostData(change);
125             }
126             if (this.stop.compareTo(now) < 0) {
127                 try {
128                     this.close();
129                 } catch (final Exception e) {
130                     throw new RestconfDocumentedException("Problem with unregister listener." + e);
131                 }
132             }
133         } else if (this.start != null) {
134             if (this.start.compareTo(now) < 0) {
135                 this.start = null;
136                 prepareAndPostData(change);
137             }
138         } else {
139             prepareAndPostData(change);
140         }
141     }
142
143     private void prepareAndPostData(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
144         if (!change.getCreatedData().isEmpty() || !change.getUpdatedData().isEmpty()
145                 || !change.getRemovedPaths().isEmpty()) {
146             final String xml = prepareXmlFrom(change);
147             final Event event = new Event(EventType.NOTIFY);
148             if (this.outputType.equals(NotificationOutputType.JSON)) {
149                 final JSONObject jsonObject = XML.toJSONObject(xml);
150                 event.setData(jsonObject.toString());
151             } else {
152                 event.setData(xml);
153             }
154             this.eventBus.post(event);
155         }
156     }
157
158     /**
159      * Tracks events of data change by customer.
160      */
161     private final class EventBusChangeRecorder {
162         @Subscribe
163         public void recordCustomerChange(final Event event) {
164             if (event.getType() == EventType.REGISTER) {
165                 final Channel subscriber = event.getSubscriber();
166                 if (!ListenerAdapter.this.subscribers.contains(subscriber)) {
167                     ListenerAdapter.this.subscribers.add(subscriber);
168                 }
169             } else if (event.getType() == EventType.DEREGISTER) {
170                 ListenerAdapter.this.subscribers.remove(event.getSubscriber());
171                 Notificator.removeListenerIfNoSubscriberExists(ListenerAdapter.this);
172             } else if (event.getType() == EventType.NOTIFY) {
173                 for (final Channel subscriber : ListenerAdapter.this.subscribers) {
174                     if (subscriber.isActive()) {
175                         LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
176                         subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
177                     } else {
178                         LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
179                         ListenerAdapter.this.subscribers.remove(subscriber);
180                     }
181                 }
182             }
183         }
184     }
185
186     /**
187      * Represents event of specific {@link EventType} type, holds data and {@link Channel} subscriber.
188      */
189     private final class Event {
190         private final EventType type;
191         private Channel subscriber;
192         private String data;
193
194         /**
195          * Creates new event specified by {@link EventType} type.
196          *
197          * @param type
198          *            EventType
199          */
200         public Event(final EventType type) {
201             this.type = type;
202         }
203
204         /**
205          * Gets the {@link Channel} subscriber.
206          *
207          * @return Channel
208          */
209         public Channel getSubscriber() {
210             return this.subscriber;
211         }
212
213         /**
214          * Sets subscriber for event.
215          *
216          * @param subscriber
217          *            Channel
218          */
219         public void setSubscriber(final Channel subscriber) {
220             this.subscriber = subscriber;
221         }
222
223         /**
224          * Gets event String.
225          *
226          * @return String representation of event data.
227          */
228         public String getData() {
229             return this.data;
230         }
231
232         /**
233          * Sets event data.
234          *
235          * @param data String.
236          */
237         public void setData(final String data) {
238             this.data = data;
239         }
240
241         /**
242          * Gets event type.
243          *
244          * @return The type of the event.
245          */
246         public EventType getType() {
247             return this.type;
248         }
249     }
250
251     /**
252      * Type of the event.
253      */
254     private enum EventType {
255         REGISTER,
256         DEREGISTER,
257         NOTIFY;
258     }
259
260     /**
261      * Prepare data in printable form and transform it to String.
262      *
263      * @param change
264      *            DataChangeEvent
265      * @return Data in printable form.
266      */
267     private String prepareXmlFrom(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
268         final SchemaContext schemaContext = ControllerContext.getInstance().getGlobalSchema();
269         final DataSchemaContextTree dataContextTree =  DataSchemaContextTree.from(schemaContext);
270         final Document doc = createDocument();
271         final Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
272                 "notification");
273
274         doc.appendChild(notificationElement);
275
276         final Element eventTimeElement = doc.createElement("eventTime");
277         eventTimeElement.setTextContent(toRFC3339(new Date()));
278         notificationElement.appendChild(eventTimeElement);
279
280         final Element dataChangedNotificationEventElement = doc.createElementNS(
281                 "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "data-changed-notification");
282
283         addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, change,
284                 schemaContext, dataContextTree);
285         notificationElement.appendChild(dataChangedNotificationEventElement);
286
287         try {
288             final ByteArrayOutputStream out = new ByteArrayOutputStream();
289             final Transformer transformer = FACTORY.newTransformer();
290             transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
291             transformer.setOutputProperty(OutputKeys.METHOD, "xml");
292             transformer.setOutputProperty(OutputKeys.INDENT, "yes");
293             transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
294             transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
295             transformer.transform(new DOMSource(doc),
296                     new StreamResult(new OutputStreamWriter(out, StandardCharsets.UTF_8)));
297             final byte[] charData = out.toByteArray();
298             return new String(charData, "UTF-8");
299         } catch (TransformerException | UnsupportedEncodingException e) {
300             final String msg = "Error during transformation of Document into String";
301             LOG.error(msg, e);
302             return msg;
303         }
304     }
305
306     /**
307      * Formats data specified by RFC3339.
308      *
309      * @param d
310      *            Date
311      * @return Data specified by RFC3339.
312      */
313     public static String toRFC3339(final Date d) {
314         return RFC3339_PATTERN.matcher(RFC3339.format(d)).replaceAll("$1:$2");
315     }
316
317     /**
318      * Creates {@link Document} document.
319      * @return {@link Document} document.
320      */
321     public static Document createDocument() {
322         final DocumentBuilder bob;
323         try {
324             bob = DBF.newDocumentBuilder();
325         } catch (final ParserConfigurationException e) {
326             return null;
327         }
328         return bob.newDocument();
329     }
330
331     /**
332      * Adds values to data changed notification event element.
333      *
334      * @param doc
335      *            {@link Document}
336      * @param dataChangedNotificationEventElement
337      *            {@link Element}
338      * @param change
339      *            {@link AsyncDataChangeEvent}
340      */
341     private void addValuesToDataChangedNotificationEventElement(final Document doc,
342             final Element dataChangedNotificationEventElement,
343             final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change,
344             final SchemaContext  schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
345
346         addCreatedChangedValuesFromDataToElement(doc, change.getCreatedData().entrySet(),
347                 dataChangedNotificationEventElement,
348                 Operation.CREATED, schemaContext, dataSchemaContextTree);
349
350         addCreatedChangedValuesFromDataToElement(doc, change.getUpdatedData().entrySet(),
351                     dataChangedNotificationEventElement,
352                     Operation.UPDATED, schemaContext, dataSchemaContextTree);
353
354         addValuesFromDataToElement(doc, change.getRemovedPaths(), dataChangedNotificationEventElement,
355                 Operation.DELETED);
356     }
357
358     /**
359      * Adds values from data to element.
360      *
361      * @param doc
362      *            {@link Document}
363      * @param data
364      *            Set of {@link YangInstanceIdentifier}.
365      * @param element
366      *            {@link Element}
367      * @param operation
368      *            {@link Operation}
369      */
370     private void addValuesFromDataToElement(final Document doc, final Set<YangInstanceIdentifier> data,
371             final Element element, final Operation operation) {
372         if ((data == null) || data.isEmpty()) {
373             return;
374         }
375         for (final YangInstanceIdentifier path : data) {
376             if (!ControllerContext.getInstance().isNodeMixin(path)) {
377                 final Node node = createDataChangeEventElement(doc, path, operation);
378                 element.appendChild(node);
379             }
380         }
381     }
382
383     private void addCreatedChangedValuesFromDataToElement(final Document doc, final Set<Entry<YangInstanceIdentifier,
384                 NormalizedNode<?,?>>> data, final Element element, final Operation operation, final SchemaContext
385             schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
386         if ((data == null) || data.isEmpty()) {
387             return;
388         }
389         for (final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry : data) {
390             if (!ControllerContext.getInstance().isNodeMixin(entry.getKey())) {
391                 final Node node = createCreatedChangedDataChangeEventElement(doc, entry, operation, schemaContext,
392                         dataSchemaContextTree);
393                 element.appendChild(node);
394             }
395         }
396     }
397
398     /**
399      * Creates changed event element from data.
400      *
401      * @param doc
402      *            {@link Document}
403      * @param path
404      *            Path to data in data store.
405      * @param operation
406      *            {@link Operation}
407      * @return {@link Node} node represented by changed event element.
408      */
409     private Node createDataChangeEventElement(final Document doc, final YangInstanceIdentifier path,
410             final Operation operation) {
411         final Element dataChangeEventElement = doc.createElement("data-change-event");
412         final Element pathElement = doc.createElement("path");
413         addPathAsValueToElement(path, pathElement);
414         dataChangeEventElement.appendChild(pathElement);
415
416         final Element operationElement = doc.createElement("operation");
417         operationElement.setTextContent(operation.value);
418         dataChangeEventElement.appendChild(operationElement);
419
420         return dataChangeEventElement;
421     }
422
423     private Node createCreatedChangedDataChangeEventElement(final Document doc, final Entry<YangInstanceIdentifier,
424             NormalizedNode<?, ?>> entry, final Operation operation, final SchemaContext
425             schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
426         final Element dataChangeEventElement = doc.createElement("data-change-event");
427         final Element pathElement = doc.createElement("path");
428         final YangInstanceIdentifier path = entry.getKey();
429         addPathAsValueToElement(path, pathElement);
430         dataChangeEventElement.appendChild(pathElement);
431
432         final Element operationElement = doc.createElement("operation");
433         operationElement.setTextContent(operation.value);
434         dataChangeEventElement.appendChild(operationElement);
435
436         try {
437             final DOMResult domResult = writeNormalizedNode(entry.getValue(), path,
438                     schemaContext, dataSchemaContextTree);
439             final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
440             final Element dataElement = doc.createElement("data");
441             dataElement.appendChild(result);
442             dataChangeEventElement.appendChild(dataElement);
443         } catch (final IOException e) {
444             LOG.error("Error in writer ", e);
445         } catch (final XMLStreamException e) {
446             LOG.error("Error processing stream", e);
447         }
448
449         return dataChangeEventElement;
450     }
451
452     private static DOMResult writeNormalizedNode(final NormalizedNode<?, ?> normalized,
453                                                  final YangInstanceIdentifier path, final SchemaContext context,
454                                                  final DataSchemaContextTree dataSchemaContextTree)
455             throws IOException, XMLStreamException {
456         final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
457         final Document doc = XmlDocumentUtils.getDocument();
458         final DOMResult result = new DOMResult(doc);
459         NormalizedNodeWriter normalizedNodeWriter = null;
460         NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
461         XMLStreamWriter writer = null;
462         final SchemaPath nodePath;
463
464         if ((normalized instanceof MapEntryNode) || (normalized instanceof UnkeyedListEntryNode)) {
465             nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath();
466         } else {
467             nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath().getParent();
468         }
469
470         try {
471             writer = XML_FACTORY.createXMLStreamWriter(result);
472             normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context, nodePath);
473             normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
474
475             normalizedNodeWriter.write(normalized);
476
477             normalizedNodeWriter.flush();
478         } finally {
479             if (normalizedNodeWriter != null) {
480                 normalizedNodeWriter.close();
481             }
482             if (normalizedNodeStreamWriter != null) {
483                 normalizedNodeStreamWriter.close();
484             }
485             if (writer != null) {
486                 writer.close();
487             }
488         }
489
490         return result;
491     }
492
493     /**
494      * Adds path as value to element.
495      *
496      * @param path
497      *            Path to data in data store.
498      * @param element
499      *            {@link Element}
500      */
501     private void addPathAsValueToElement(final YangInstanceIdentifier path, final Element element) {
502         // Map< key = namespace, value = prefix>
503         final Map<String, String> prefixes = new HashMap<>();
504         final YangInstanceIdentifier normalizedPath = ControllerContext.getInstance().toXpathRepresentation(path);
505         final StringBuilder textContent = new StringBuilder();
506
507         // FIXME: BUG-1281: this is duplicated code from yangtools (BUG-1275)
508         for (final PathArgument pathArgument : normalizedPath.getPathArguments()) {
509             if (pathArgument instanceof YangInstanceIdentifier.AugmentationIdentifier) {
510                 continue;
511             }
512             textContent.append("/");
513             writeIdentifierWithNamespacePrefix(element, textContent, pathArgument.getNodeType(), prefixes);
514             if (pathArgument instanceof NodeIdentifierWithPredicates) {
515                 final Map<QName, Object> predicates = ((NodeIdentifierWithPredicates) pathArgument).getKeyValues();
516                 for (final QName keyValue : predicates.keySet()) {
517                     final String predicateValue = String.valueOf(predicates.get(keyValue));
518                     textContent.append("[");
519                     writeIdentifierWithNamespacePrefix(element, textContent, keyValue, prefixes);
520                     textContent.append("='");
521                     textContent.append(predicateValue);
522                     textContent.append("'");
523                     textContent.append("]");
524                 }
525             } else if (pathArgument instanceof NodeWithValue) {
526                 textContent.append("[.='");
527                 textContent.append(((NodeWithValue) pathArgument).getValue());
528                 textContent.append("'");
529                 textContent.append("]");
530             }
531         }
532         element.setTextContent(textContent.toString());
533     }
534
535     /**
536      * Writes identifier that consists of prefix and QName.
537      *
538      * @param element
539      *            {@link Element}
540      * @param textContent
541      *            StringBuilder
542      * @param qName
543      *            QName
544      * @param prefixes
545      *            Map of namespaces and prefixes.
546      */
547     private static void writeIdentifierWithNamespacePrefix(final Element element, final StringBuilder textContent,
548             final QName qName, final Map<String, String> prefixes) {
549         final String namespace = qName.getNamespace().toString();
550         String prefix = prefixes.get(namespace);
551         if (prefix == null) {
552             prefix = generateNewPrefix(prefixes.values());
553         }
554
555         element.setAttribute("xmlns:" + prefix, namespace);
556         textContent.append(prefix);
557         prefixes.put(namespace, prefix);
558
559         textContent.append(":");
560         textContent.append(qName.getLocalName());
561     }
562
563     /**
564      * Generates new prefix which consists of four random characters <a-z>.
565      *
566      * @param prefixes
567      *            Collection of prefixes.
568      * @return New prefix which consists of four random characters <a-z>.
569      */
570     private static String generateNewPrefix(final Collection<String> prefixes) {
571         StringBuilder result = null;
572         final Random random = new Random();
573         do {
574             result = new StringBuilder();
575             for (int i = 0; i < 4; i++) {
576                 final int randomNumber = 0x61 + (Math.abs(random.nextInt()) % 26);
577                 result.append(Character.toChars(randomNumber));
578             }
579         } while (prefixes.contains(result.toString()));
580
581         return result.toString();
582     }
583
584     /**
585      * Gets path pointed to data in data store.
586      *
587      * @return Path pointed to data in data store.
588      */
589     public YangInstanceIdentifier getPath() {
590         return this.path;
591     }
592
593     /**
594      * Sets {@link ListenerRegistration} registration.
595      *
596      * @param registration DOMDataChangeListener registration
597      */
598     public void setRegistration(final ListenerRegistration<DOMDataChangeListener> registration) {
599         this.registration = registration;
600     }
601
602     /**
603      * Gets the name of the stream.
604      *
605      * @return The name of the stream.
606      */
607     public String getStreamName() {
608         return this.streamName;
609     }
610
611     /**
612      * Removes all subscribers and unregisters event bus change recorder form event bus.
613      */
614     public void close() throws Exception {
615         this.subscribers = new ConcurrentSet<>();
616         this.registration.close();
617         this.registration = null;
618         this.eventBus.unregister(this.eventBusChangeRecorder);
619     }
620
621     /**
622      * Checks if {@link ListenerRegistration} registration exist.
623      *
624      * @return True if exist, false otherwise.
625      */
626     public boolean isListening() {
627         return this.registration == null ? false : true;
628     }
629
630     /**
631      * Creates event of type {@link EventType#REGISTER}, set {@link Channel} subscriber to the event and post event into
632      * event bus.
633      *
634      * @param subscriber
635      *            Channel
636      */
637     public void addSubscriber(final Channel subscriber) {
638         if (!subscriber.isActive()) {
639             LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
640         }
641         final Event event = new Event(EventType.REGISTER);
642         event.setSubscriber(subscriber);
643         this.eventBus.post(event);
644     }
645
646     /**
647      * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel} subscriber to the event and posts event
648      * into event bus.
649      *
650      * @param subscriber
651      */
652     public void removeSubscriber(final Channel subscriber) {
653         LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
654         final Event event = new Event(EventType.DEREGISTER);
655         event.setSubscriber(subscriber);
656         this.eventBus.post(event);
657     }
658
659     /**
660      * Checks if exists at least one {@link Channel} subscriber.
661      *
662      * @return True if exist at least one {@link Channel} subscriber, false otherwise.
663      */
664     public boolean hasSubscribers() {
665         return !this.subscribers.isEmpty();
666     }
667
668     /**
669      * Consists of two types {@link Store#CONFIG} and {@link Store#OPERATION}.
670      */
671     private static enum Store {
672         CONFIG("config"),
673         OPERATION("operation");
674
675         private final String value;
676
677         private Store(final String value) {
678             this.value = value;
679         }
680     }
681
682     /**
683      * Consists of three types {@link Operation#CREATED}, {@link Operation#UPDATED} and {@link Operation#DELETED}.
684      */
685     private static enum Operation {
686         CREATED("created"),
687         UPDATED("updated"),
688         DELETED("deleted");
689
690         private final String value;
691
692         private Operation(final String value) {
693             this.value = value;
694         }
695     }
696
697     /**
698      * Set query parameters for listener
699      *
700      * @param start
701      *            - start-time of getting notification
702      * @param stop
703      *            - stop-time of getting notification
704      */
705     public void setTimer(final Date start, final Date stop) {
706         this.start = start;
707         this.stop = stop;
708     }
709
710 }