/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import io.netty.channel.Channel;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
21 * Features of subscribing part of both notifications.
23 abstract class AbstractCommonSubscriber extends AbstractQueryParams implements BaseListenerInterface {
25 private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class);
27 private final Set<Channel> subscribers = ConcurrentHashMap.newKeySet();
28 private final EventBus eventBus;
30 private EventBusChangeRecorder eventBusChangeRecorder;
32 private volatile ListenerRegistration<?> registration;
35 * Creating {@link EventBus}.
37 AbstractCommonSubscriber() {
38 this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
42 public final boolean hasSubscribers() {
43 return !this.subscribers.isEmpty();
47 public final Set<Channel> getSubscribers() {
48 return this.subscribers;
52 public final void close() throws Exception {
53 if (this.registration != null) {
54 this.registration.close();
55 this.registration = null;
62 public void addSubscriber(final Channel subscriber) {
63 if (!subscriber.isActive()) {
64 LOG.debug("Channel is not active between websocket server and subscriber {}", subscriber.remoteAddress());
66 final Event event = new Event(EventType.REGISTER);
67 event.setSubscriber(subscriber);
68 this.eventBus.post(event);
72 public void removeSubscriber(final Channel subscriber) {
73 LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
74 final Event event = new Event(EventType.DEREGISTER);
75 event.setSubscriber(subscriber);
76 this.eventBus.post(event);
80 public void setRegistration(final ListenerRegistration<?> registration) {
81 this.registration = registration;
85 public boolean isListening() {
86 return this.registration != null;
90 * Creating and registering {@link EventBusChangeRecorder} of specific
91 * listener on {@link EventBus}.
93 * @param listener Specific listener of notifications.
95 @SuppressWarnings({"unchecked", "rawtypes"})
96 <T extends BaseListenerInterface> void register(final T listener) {
97 this.eventBusChangeRecorder = new EventBusChangeRecorder(listener);
98 this.eventBus.register(this.eventBusChangeRecorder);
102 * Post event to event bus.
104 * @param event Data of incoming notifications.
106 protected void post(final Event event) {
107 this.eventBus.post(event);
111 * Removes all subscribers and unregisters event bus change recorder form event bus.
113 private void unregister() {
114 this.subscribers.clear();
115 this.eventBus.unregister(this.eventBusChangeRecorder);