2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.sal.streams.listeners;
10 import com.google.common.eventbus.AsyncEventBus;
11 import com.google.common.eventbus.EventBus;
12 import io.netty.channel.Channel;
13 import io.netty.util.internal.ConcurrentSet;
15 import java.util.concurrent.Executors;
16 import org.opendaylight.yangtools.concepts.ListenerRegistration;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
/** Common subscriber-handling features shared by both kinds of notifications. */
23 abstract class AbstractCommonSubscriber extends AbstractQueryParams implements BaseListenerInterface {
25 private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class);
27 private final Set<Channel> subscribers = new ConcurrentSet<>();
28 private final EventBus eventBus;
30 @SuppressWarnings("rawtypes")
31 private EventBusChangeRecorder eventBusChangeRecorder;
32 @SuppressWarnings("rawtypes")
33 private ListenerRegistration registration;
/**
 * Initializes the {@link EventBus} of this subscriber: an
 * {@link AsyncEventBus} backed by an executor obtained from
 * {@link Executors#newSingleThreadExecutor()}.
 *
 * <p>NOTE(review): no shutdown of that executor is visible in this class -
 * confirm that its thread is released elsewhere.
 */
protected AbstractCommonSubscriber() {
    eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
}
43 public final boolean hasSubscribers() {
44 return !this.subscribers.isEmpty();
48 public final Set<Channel> getSubscribers() {
49 return this.subscribers;
53 public final void close() throws Exception {
54 this.registration.close();
55 this.registration = null;
62 * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
63 * subscriber to the event and post event into event bus.
68 public void addSubscriber(final Channel subscriber) {
69 if (!subscriber.isActive()) {
70 LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
72 final Event event = new Event(EventType.REGISTER);
73 event.setSubscriber(subscriber);
74 this.eventBus.post(event);
78 * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
79 * subscriber to the event and posts event into event bus.
81 * @param subscriber subscriber channel
83 public void removeSubscriber(final Channel subscriber) {
84 LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
85 final Event event = new Event(EventType.DEREGISTER);
86 event.setSubscriber(subscriber);
87 this.eventBus.post(event);
91 * Sets {@link ListenerRegistration} registration.
94 * DOMDataChangeListener registration
96 @SuppressWarnings("rawtypes")
97 public void setRegistration(final ListenerRegistration registration) {
98 this.registration = registration;
102 * Checks if {@link ListenerRegistration} registration exist.
104 * @return True if exist, false otherwise.
106 public boolean isListening() {
107 return this.registration != null;
111 * Creating and registering {@link EventBusChangeRecorder} of specific
112 * listener on {@link EventBus}.
115 * specific listener of notifications
117 @SuppressWarnings({ "unchecked", "rawtypes" })
118 protected <T extends BaseListenerInterface> void register(final T listener) {
119 this.eventBusChangeRecorder = new EventBusChangeRecorder(listener);
120 this.eventBus.register(this.eventBusChangeRecorder);
124 * Post event to event bus.
127 * data of incoming notifications
129 protected void post(final Event event) {
130 this.eventBus.post(event);
134 * Removes all subscribers and unregisters event bus change recorder form
137 protected void unregister() {
138 this.subscribers.clear();
139 this.eventBus.unregister(this.eventBusChangeRecorder);