Fix defensive subscriber removal
[netconf.git] / restconf / restconf-nb-rfc8040 / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / listeners / AbstractCommonSubscriber.java
index a7449565aa1048719885029ae5c1714c2fd0a8f8..2f312aeb5b5db446eea3765254ac01d223a80dbd 100644 (file)
@@ -7,10 +7,13 @@
  */
 package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
 
-import io.netty.channel.Channel;
-import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import com.google.common.base.Preconditions;
+import java.net.InetSocketAddress;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Optional;
 import java.util.Set;
+import org.opendaylight.restconf.nb.rfc8040.streams.websockets.WebSocketSessionHandler;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -22,7 +25,7 @@ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements B
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class);
 
-    private final Set<Channel> subscribers = new HashSet<>();
+    private final Set<WebSocketSessionHandler> subscribers = new HashSet<>();
     private volatile ListenerRegistration<?> registration;
 
     @Override
@@ -31,7 +34,7 @@ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements B
     }
 
     @Override
-    public final synchronized Set<Channel> getSubscribers() {
+    public final synchronized Set<WebSocketSessionHandler> getSubscribers() {
         return new HashSet<>(this.subscribers);
     }
 
@@ -46,16 +49,18 @@ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements B
     }
 
     @Override
-    public synchronized void addSubscriber(final Channel subscriber) {
-        if (!subscriber.isActive()) {
-            LOG.debug("Channel is not active between websocket server and subscriber {}", subscriber.remoteAddress());
-        }
+    public synchronized void addSubscriber(final WebSocketSessionHandler subscriber) {
+        final Optional<InetSocketAddress> remoteEndpointAddress = subscriber.getRemoteEndpointAddress();
+        Preconditions.checkState(remoteEndpointAddress.isPresent());
+        LOG.debug("Subscriber {} is added.", remoteEndpointAddress.get());
         subscribers.add(subscriber);
     }
 
     @Override
-    public synchronized void removeSubscriber(final Channel subscriber) {
-        LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
+    public synchronized void removeSubscriber(final WebSocketSessionHandler subscriber) {
+        final Optional<InetSocketAddress> remoteEndpointAddress = subscriber.getRemoteEndpointAddress();
+        Preconditions.checkState(remoteEndpointAddress.isPresent());
+        LOG.debug("Subscriber {} is removed.", remoteEndpointAddress.get());
         subscribers.remove(subscriber);
         if (!hasSubscribers()) {
             ListenersBroker.getInstance().removeAndCloseListener(this);
@@ -73,18 +78,23 @@ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements B
     }
 
     /**
-     * Post data to subscribed channels.
+     * Post data to subscribed web-socket session handlers.
      *
      * @param data Data of incoming notifications.
      */
     synchronized void post(final String data) {
-        for (final Channel subscriber : subscribers) {
-            if (subscriber.isActive()) {
-                LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
-                subscriber.writeAndFlush(new TextWebSocketFrame(data));
+        final Iterator<WebSocketSessionHandler> iterator = subscribers.iterator();
+        while (iterator.hasNext()) {
+            final WebSocketSessionHandler subscriber = iterator.next();
+            final Optional<InetSocketAddress> remoteEndpointAddress = subscriber.getRemoteEndpointAddress();
+            if (remoteEndpointAddress.isPresent()) {
+                subscriber.sendDataMessage(data);
+                LOG.debug("Data was sent to subscriber {} on address {}:", this, remoteEndpointAddress.get());
             } else {
-                LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
-                subscribers.remove(subscriber);
+                // removal is probably not necessary, because it will be removed explicitly soon after invocation of
+                // onWebSocketClosed(..) in handler; but just to be sure ...
+                iterator.remove();
+                LOG.debug("Subscriber for {} was removed - web-socket session is not open.", this);
             }
         }
     }