Refactoring of web-sockets in RESTCONF RFC-8040
[netconf.git] / restconf / restconf-nb-rfc8040 / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / listeners / AbstractCommonSubscriber.java
index ebf5da4a0d5e24cccb7142a760e92c3e46c9cb10..7c65f5ca4f311da85591ba48b8e4e47750b41a7a 100644 (file)
@@ -10,8 +10,8 @@ package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
 import com.google.common.eventbus.AsyncEventBus;
 import com.google.common.eventbus.EventBus;
 import io.netty.channel.Channel;
-import io.netty.util.internal.ConcurrentSet;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.slf4j.Logger;
@@ -24,10 +24,9 @@ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements B
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class);
 
-    private final Set<Channel> subscribers = new ConcurrentSet<>();
+    private final Set<Channel> subscribers = ConcurrentHashMap.newKeySet();
     private final EventBus eventBus;
 
-    @SuppressWarnings("rawtypes")
     private EventBusChangeRecorder eventBusChangeRecorder;
 
     private volatile ListenerRegistration<?> registration;
@@ -35,7 +34,7 @@ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements B
     /**
      * Creating {@link EventBus}.
      */
-    protected AbstractCommonSubscriber() {
+    AbstractCommonSubscriber() {
         this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
     }
 
@@ -55,18 +54,11 @@ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements B
             this.registration.close();
             this.registration = null;
         }
-
         deleteDataInDS();
         unregister();
     }
 
-    /**
-     * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
-     * subscriber to the event and post event into event bus.
-     *
-     * @param subscriber
-     *            Channel
-     */
+    @Override
     public void addSubscriber(final Channel subscriber) {
         if (!subscriber.isActive()) {
             LOG.debug("Channel is not active between websocket server and subscriber {}", subscriber.remoteAddress());
@@ -76,12 +68,7 @@ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements B
         this.eventBus.post(event);
     }
 
-    /**
-     * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
-     * subscriber to the event and posts event into event bus.
-     *
-     * @param subscriber subscriber channel
-     */
+    @Override
     public void removeSubscriber(final Channel subscriber) {
         LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
         final Event event = new Event(EventType.DEREGISTER);
@@ -89,21 +76,12 @@ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements B
         this.eventBus.post(event);
     }
 
-    /**
-     * Sets {@link ListenerRegistration} registration.
-     *
-     * @param registration
-     *            DOMDataChangeListener registration
-     */
+    @Override
     public void setRegistration(final ListenerRegistration<?> registration) {
         this.registration = registration;
     }
 
-    /**
-     * Checks if {@link ListenerRegistration} registration exist.
-     *
-     * @return True if exist, false otherwise.
-     */
+    @Override
     public boolean isListening() {
         return this.registration != null;
     }
@@ -112,11 +90,10 @@ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements B
      * Creating and registering {@link EventBusChangeRecorder} of specific
      * listener on {@link EventBus}.
      *
-     * @param listener
-     *            specific listener of notifications
+     * @param listener Specific listener of notifications.
      */
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    protected <T extends BaseListenerInterface> void register(final T listener) {
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    <T extends BaseListenerInterface> void register(final T listener) {
         this.eventBusChangeRecorder = new EventBusChangeRecorder(listener);
         this.eventBus.register(this.eventBusChangeRecorder);
     }
@@ -124,18 +101,16 @@ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements B
     /**
      * Post event to event bus.
      *
-     * @param event
-     *            data of incoming notifications
+     * @param event Data of incoming notifications.
      */
     protected void post(final Event event) {
         this.eventBus.post(event);
     }
 
     /**
-     * Removes all subscribers and unregisters event bus change recorder form
-     * event bus.
+     * Removes all subscribers and unregisters event bus change recorder form event bus.
      */
-    protected void unregister() {
+    private void unregister() {
         this.subscribers.clear();
         this.eventBus.unregister(this.eventBusChangeRecorder);
     }