Removal of EventBusChangeRecorder
[netconf.git] / restconf / restconf-nb-rfc8040 / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / listeners / AbstractCommonSubscriber.java
index 7c65f5ca4f311da85591ba48b8e4e47750b41a7a..a7449565aa1048719885029ae5c1714c2fd0a8f8 100644 (file)
@@ -7,12 +7,10 @@
  */
 package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
 
-import com.google.common.eventbus.AsyncEventBus;
-import com.google.common.eventbus.EventBus;
 import io.netty.channel.Channel;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import java.util.HashSet;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -24,56 +22,44 @@ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements B
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class);
 
-    private final Set<Channel> subscribers = ConcurrentHashMap.newKeySet();
-    private final EventBus eventBus;
-
-    private EventBusChangeRecorder eventBusChangeRecorder;
-
+    private final Set<Channel> subscribers = new HashSet<>();
     private volatile ListenerRegistration<?> registration;
 
-    /**
-     * Creating {@link EventBus}.
-     */
-    AbstractCommonSubscriber() {
-        this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
-    }
-
     @Override
-    public final boolean hasSubscribers() {
+    public final synchronized boolean hasSubscribers() {
         return !this.subscribers.isEmpty();
     }
 
     @Override
-    public final Set<Channel> getSubscribers() {
-        return this.subscribers;
+    public final synchronized Set<Channel> getSubscribers() {
+        return new HashSet<>(this.subscribers);
     }
 
     @Override
-    public final void close() throws Exception {
+    public final synchronized void close() throws Exception {
         if (this.registration != null) {
             this.registration.close();
             this.registration = null;
         }
         deleteDataInDS();
-        unregister();
+        this.subscribers.clear();
     }
 
     @Override
-    public void addSubscriber(final Channel subscriber) {
+    public synchronized void addSubscriber(final Channel subscriber) {
         if (!subscriber.isActive()) {
             LOG.debug("Channel is not active between websocket server and subscriber {}", subscriber.remoteAddress());
         }
-        final Event event = new Event(EventType.REGISTER);
-        event.setSubscriber(subscriber);
-        this.eventBus.post(event);
+        subscribers.add(subscriber);
     }
 
     @Override
-    public void removeSubscriber(final Channel subscriber) {
+    public synchronized void removeSubscriber(final Channel subscriber) {
         LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
-        final Event event = new Event(EventType.DEREGISTER);
-        event.setSubscriber(subscriber);
-        this.eventBus.post(event);
+        subscribers.remove(subscriber);
+        if (!hasSubscribers()) {
+            ListenersBroker.getInstance().removeAndCloseListener(this);
+        }
     }
 
     @Override
@@ -87,31 +73,19 @@ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements B
     }
 
     /**
-     * Creating and registering {@link EventBusChangeRecorder} of specific
-     * listener on {@link EventBus}.
+     * Post data to subscribed channels.
      *
-     * @param listener Specific listener of notifications.
+     * @param data Data of incoming notifications.
      */
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    <T extends BaseListenerInterface> void register(final T listener) {
-        this.eventBusChangeRecorder = new EventBusChangeRecorder(listener);
-        this.eventBus.register(this.eventBusChangeRecorder);
-    }
-
-    /**
-     * Post event to event bus.
-     *
-     * @param event Data of incoming notifications.
-     */
-    protected void post(final Event event) {
-        this.eventBus.post(event);
-    }
-
-    /**
-     * Removes all subscribers and unregisters event bus change recorder form event bus.
-     */
-    private void unregister() {
-        this.subscribers.clear();
-        this.eventBus.unregister(this.eventBusChangeRecorder);
+    synchronized void post(final String data) {
+        for (final Channel subscriber : subscribers) {
+            if (subscriber.isActive()) {
+                LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
+                subscriber.writeAndFlush(new TextWebSocketFrame(data));
+            } else {
+                LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
+                subscribers.remove(subscriber);
+            }
+        }
     }
 }