2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
10 import com.google.common.base.Preconditions;
11 import java.net.InetSocketAddress;
12 import java.util.HashSet;
13 import java.util.Iterator;
14 import java.util.Optional;
16 import org.opendaylight.restconf.nb.rfc8040.streams.websockets.WebSocketSessionHandler;
17 import org.opendaylight.yangtools.concepts.ListenerRegistration;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
22 * Features of subscribing part of both notifications.
24 abstract class AbstractCommonSubscriber extends AbstractQueryParams implements BaseListenerInterface {
// Logger for this class, obtained from SLF4J's LoggerFactory for AbstractCommonSubscriber.class.
26 private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class);
// Web-socket session handlers currently subscribed; in the code visible here it is only touched from
// synchronized methods.
28 private final Set<WebSocketSessionHandler> subscribers = new HashSet<>();
// Current listener registration, or null when none is set. Declared volatile; setRegistration(..) and
// isListening() access it without holding the instance monitor.
29 private volatile ListenerRegistration<?> registration;
32 public final synchronized boolean hasSubscribers() {
33 return !this.subscribers.isEmpty();
37 public final synchronized Set<WebSocketSessionHandler> getSubscribers() {
38 return new HashSet<>(this.subscribers);
42 public final synchronized void close() throws Exception {
43 if (this.registration != null) {
44 this.registration.close();
45 this.registration = null;
48 this.subscribers.clear();
52 public synchronized void addSubscriber(final WebSocketSessionHandler subscriber) {
53 final Optional<InetSocketAddress> remoteEndpointAddress = subscriber.getRemoteEndpointAddress();
54 Preconditions.checkState(remoteEndpointAddress.isPresent());
55 LOG.debug("Subscriber {} is added.", remoteEndpointAddress.get());
56 subscribers.add(subscriber);
60 public synchronized void removeSubscriber(final WebSocketSessionHandler subscriber) {
61 final Optional<InetSocketAddress> remoteEndpointAddress = subscriber.getRemoteEndpointAddress();
62 Preconditions.checkState(remoteEndpointAddress.isPresent());
63 LOG.debug("Subscriber {} is removed.", remoteEndpointAddress.get());
64 subscribers.remove(subscriber);
65 if (!hasSubscribers()) {
66 ListenersBroker.getInstance().removeAndCloseListener(this);
71 public void setRegistration(final ListenerRegistration<?> registration) {
72 this.registration = registration;
76 public boolean isListening() {
77 return this.registration != null;
81 * Post data to subscribed web-socket session handlers.
83 * @param data Data of incoming notifications.
85 synchronized void post(final String data) {
86 final Iterator<WebSocketSessionHandler> iterator = subscribers.iterator();
87 while (iterator.hasNext()) {
88 final WebSocketSessionHandler subscriber = iterator.next();
89 final Optional<InetSocketAddress> remoteEndpointAddress = subscriber.getRemoteEndpointAddress();
90 if (remoteEndpointAddress.isPresent()) {
91 subscriber.sendDataMessage(data);
92 LOG.debug("Data was sent to subscriber {} on address {}:", this, remoteEndpointAddress.get());
94 // removal is probably not necessary, because it will be removed explicitly soon after invocation of
95 // onWebSocketClosed(..) in handler; but just to be sure ...
97 LOG.debug("Subscriber for {} was removed - web-socket session is not open.", this);