76827d4837aab083c672f44eb0879abc588191aa
[netconf.git] / restconf / restconf-nb-bierman02 / src / main / java / org / opendaylight / netconf / sal / streams / listeners / AbstractCommonSubscriber.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.sal.streams.listeners;
9
10 import com.google.common.eventbus.AsyncEventBus;
11 import com.google.common.eventbus.EventBus;
12 import io.netty.channel.Channel;
13 import java.util.Set;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.Executors;
16 import org.opendaylight.yangtools.concepts.ListenerRegistration;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19
20 /**
21  * Features of subscribing part of both notifications.
22  */
23 abstract class AbstractCommonSubscriber extends AbstractQueryParams implements BaseListenerInterface {
24
25     private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class);
26
27     private final Set<Channel> subscribers = ConcurrentHashMap.newKeySet();
28     private final EventBus eventBus;
29
30     @SuppressWarnings("rawtypes")
31     private EventBusChangeRecorder eventBusChangeRecorder;
32     @SuppressWarnings("rawtypes")
33     private ListenerRegistration registration;
34
35     /**
36      * Creating {@link EventBus}.
37      */
38     protected AbstractCommonSubscriber() {
39         this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
40     }
41
42     @Override
43     public final boolean hasSubscribers() {
44         return !this.subscribers.isEmpty();
45     }
46
47     @Override
48     public final Set<Channel> getSubscribers() {
49         return this.subscribers;
50     }
51
52     @Override
53     public final void close() {
54         if (registration != null) {
55             this.registration.close();
56             this.registration = null;
57         }
58
59         unregister();
60     }
61
62     /**
63      * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
64      * subscriber to the event and post event into event bus.
65      *
66      * @param subscriber
67      *            Channel
68      */
69     public void addSubscriber(final Channel subscriber) {
70         if (!subscriber.isActive()) {
71             LOG.debug("Channel is not active between websocket server and subscriber {}", subscriber.remoteAddress());
72         }
73         final Event event = new Event(EventType.REGISTER);
74         event.setSubscriber(subscriber);
75         this.eventBus.post(event);
76     }
77
78     /**
79      * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
80      * subscriber to the event and posts event into event bus.
81      *
82      * @param subscriber subscriber channel
83      */
84     public void removeSubscriber(final Channel subscriber) {
85         LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
86         final Event event = new Event(EventType.DEREGISTER);
87         event.setSubscriber(subscriber);
88         this.eventBus.post(event);
89     }
90
91     /**
92      * Sets {@link ListenerRegistration} registration.
93      *
94      * @param registration
95      *            DOMDataChangeListener registration
96      */
97     @SuppressWarnings("rawtypes")
98     public void setRegistration(final ListenerRegistration registration) {
99         this.registration = registration;
100     }
101
102     /**
103      * Checks if {@link ListenerRegistration} registration exist.
104      *
105      * @return True if exist, false otherwise.
106      */
107     public boolean isListening() {
108         return this.registration != null;
109     }
110
111     /**
112      * Creating and registering {@link EventBusChangeRecorder} of specific
113      * listener on {@link EventBus}.
114      *
115      * @param listener
116      *            specific listener of notifications
117      */
118     @SuppressWarnings({ "unchecked", "rawtypes" })
119     protected <T extends BaseListenerInterface> void register(final T listener) {
120         this.eventBusChangeRecorder = new EventBusChangeRecorder(listener);
121         this.eventBus.register(this.eventBusChangeRecorder);
122     }
123
124     /**
125      * Post event to event bus.
126      *
127      * @param event
128      *            data of incoming notifications
129      */
130     protected void post(final Event event) {
131         this.eventBus.post(event);
132     }
133
134     /**
135      * Removes all subscribers and unregisters event bus change recorder form
136      * event bus.
137      */
138     protected void unregister() {
139         this.subscribers.clear();
140         this.eventBus.unregister(this.eventBusChangeRecorder);
141     }
142 }