Refactoring of web-sockets in RESTCONF RFC-8040
[netconf.git] / restconf / restconf-nb-rfc8040 / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / listeners / AbstractCommonSubscriber.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
9
10 import com.google.common.eventbus.AsyncEventBus;
11 import com.google.common.eventbus.EventBus;
12 import io.netty.channel.Channel;
13 import java.util.Set;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.Executors;
16 import org.opendaylight.yangtools.concepts.ListenerRegistration;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19
20 /**
21  * Features of subscribing part of both notifications.
22  */
23 abstract class AbstractCommonSubscriber extends AbstractQueryParams implements BaseListenerInterface {
24
25     private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class);
26
27     private final Set<Channel> subscribers = ConcurrentHashMap.newKeySet();
28     private final EventBus eventBus;
29
30     private EventBusChangeRecorder eventBusChangeRecorder;
31
32     private volatile ListenerRegistration<?> registration;
33
34     /**
35      * Creating {@link EventBus}.
36      */
37     AbstractCommonSubscriber() {
38         this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
39     }
40
41     @Override
42     public final boolean hasSubscribers() {
43         return !this.subscribers.isEmpty();
44     }
45
46     @Override
47     public final Set<Channel> getSubscribers() {
48         return this.subscribers;
49     }
50
51     @Override
52     public final void close() throws Exception {
53         if (this.registration != null) {
54             this.registration.close();
55             this.registration = null;
56         }
57         deleteDataInDS();
58         unregister();
59     }
60
61     @Override
62     public void addSubscriber(final Channel subscriber) {
63         if (!subscriber.isActive()) {
64             LOG.debug("Channel is not active between websocket server and subscriber {}", subscriber.remoteAddress());
65         }
66         final Event event = new Event(EventType.REGISTER);
67         event.setSubscriber(subscriber);
68         this.eventBus.post(event);
69     }
70
71     @Override
72     public void removeSubscriber(final Channel subscriber) {
73         LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
74         final Event event = new Event(EventType.DEREGISTER);
75         event.setSubscriber(subscriber);
76         this.eventBus.post(event);
77     }
78
79     @Override
80     public void setRegistration(final ListenerRegistration<?> registration) {
81         this.registration = registration;
82     }
83
84     @Override
85     public boolean isListening() {
86         return this.registration != null;
87     }
88
89     /**
90      * Creating and registering {@link EventBusChangeRecorder} of specific
91      * listener on {@link EventBus}.
92      *
93      * @param listener Specific listener of notifications.
94      */
95     @SuppressWarnings({"unchecked", "rawtypes"})
96     <T extends BaseListenerInterface> void register(final T listener) {
97         this.eventBusChangeRecorder = new EventBusChangeRecorder(listener);
98         this.eventBus.register(this.eventBusChangeRecorder);
99     }
100
101     /**
102      * Post event to event bus.
103      *
104      * @param event Data of incoming notifications.
105      */
106     protected void post(final Event event) {
107         this.eventBus.post(event);
108     }
109
110     /**
111      * Removes all subscribers and unregisters event bus change recorder form event bus.
112      */
113     private void unregister() {
114         this.subscribers.clear();
115         this.eventBus.unregister(this.eventBusChangeRecorder);
116     }
117 }