2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
10 import io.netty.channel.Channel;
11 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
12 import java.util.HashSet;
14 import org.opendaylight.yangtools.concepts.ListenerRegistration;
15 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory;
19 * Features of subscribing part of both notifications.
21 abstract class AbstractCommonSubscriber extends AbstractQueryParams implements BaseListenerInterface {
23 private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class);
25 private final Set<Channel> subscribers = new HashSet<>();
26 private volatile ListenerRegistration<?> registration;
29 public final synchronized boolean hasSubscribers() {
30 return !this.subscribers.isEmpty();
34 public final synchronized Set<Channel> getSubscribers() {
35 return new HashSet<>(this.subscribers);
39 public final synchronized void close() throws Exception {
40 if (this.registration != null) {
41 this.registration.close();
42 this.registration = null;
45 this.subscribers.clear();
49 public synchronized void addSubscriber(final Channel subscriber) {
50 if (!subscriber.isActive()) {
51 LOG.debug("Channel is not active between websocket server and subscriber {}", subscriber.remoteAddress());
53 subscribers.add(subscriber);
57 public synchronized void removeSubscriber(final Channel subscriber) {
58 LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
59 subscribers.remove(subscriber);
60 if (!hasSubscribers()) {
61 ListenersBroker.getInstance().removeAndCloseListener(this);
66 public void setRegistration(final ListenerRegistration<?> registration) {
67 this.registration = registration;
71 public boolean isListening() {
72 return this.registration != null;
76 * Post data to subscribed channels.
78 * @param data Data of incoming notifications.
80 synchronized void post(final String data) {
81 for (final Channel subscriber : subscribers) {
82 if (subscriber.isActive()) {
83 LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
84 subscriber.writeAndFlush(new TextWebSocketFrame(data));
86 LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
87 subscribers.remove(subscriber);