*/
package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
-import io.netty.channel.Channel;
-import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import com.google.common.base.Preconditions;
+import java.net.InetSocketAddress;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Optional;
import java.util.Set;
+import org.opendaylight.restconf.nb.rfc8040.streams.websockets.WebSocketSessionHandler;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class);
- private final Set<Channel> subscribers = new HashSet<>();
+ private final Set<WebSocketSessionHandler> subscribers = new HashSet<>();
private volatile ListenerRegistration<?> registration;
@Override
}
@Override
- public final synchronized Set<Channel> getSubscribers() {
+ public final synchronized Set<WebSocketSessionHandler> getSubscribers() {
return new HashSet<>(this.subscribers);
}
}
@Override
- public synchronized void addSubscriber(final Channel subscriber) {
- if (!subscriber.isActive()) {
- LOG.debug("Channel is not active between websocket server and subscriber {}", subscriber.remoteAddress());
- }
+ public synchronized void addSubscriber(final WebSocketSessionHandler subscriber) {
+ final Optional<InetSocketAddress> remoteEndpointAddress = subscriber.getRemoteEndpointAddress();
+ Preconditions.checkState(remoteEndpointAddress.isPresent());
+ LOG.debug("Subscriber {} is added.", remoteEndpointAddress.get());
subscribers.add(subscriber);
}
@Override
- public synchronized void removeSubscriber(final Channel subscriber) {
- LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
+ public synchronized void removeSubscriber(final WebSocketSessionHandler subscriber) {
+ final Optional<InetSocketAddress> remoteEndpointAddress = subscriber.getRemoteEndpointAddress();
+ Preconditions.checkState(remoteEndpointAddress.isPresent());
+ LOG.debug("Subscriber {} is removed.", remoteEndpointAddress.get());
subscribers.remove(subscriber);
if (!hasSubscribers()) {
ListenersBroker.getInstance().removeAndCloseListener(this);
}
/**
- * Post data to subscribed channels.
+ * Post data to subscribed web-socket session handlers.
*
* @param data Data of incoming notifications.
*/
synchronized void post(final String data) {
- for (final Channel subscriber : subscribers) {
- if (subscriber.isActive()) {
- LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
- subscriber.writeAndFlush(new TextWebSocketFrame(data));
+ final Iterator<WebSocketSessionHandler> iterator = subscribers.iterator();
+ while (iterator.hasNext()) {
+ final WebSocketSessionHandler subscriber = iterator.next();
+ final Optional<InetSocketAddress> remoteEndpointAddress = subscriber.getRemoteEndpointAddress();
+ if (remoteEndpointAddress.isPresent()) {
+ subscriber.sendDataMessage(data);
+ LOG.debug("Data was sent to subscriber {} on address {}:", this, remoteEndpointAddress.get());
} else {
- LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
- subscribers.remove(subscriber);
+ // removal is probably not necessary, because it will be removed explicitly soon after invocation of
+ // onWebSocketClosed(..) in handler; but just to be sure ...
+ iterator.remove();
+ LOG.debug("Subscriber for {} was removed - web-socket session is not open.", this);
}
}
}