*/
package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
-import com.google.common.eventbus.AsyncEventBus;
-import com.google.common.eventbus.EventBus;
import io.netty.channel.Channel;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import java.util.HashSet;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class);
- private final Set<Channel> subscribers = ConcurrentHashMap.newKeySet();
- private final EventBus eventBus;
-
- private EventBusChangeRecorder eventBusChangeRecorder;
-
+ private final Set<Channel> subscribers = new HashSet<>();
private volatile ListenerRegistration<?> registration;
- /**
- * Creating {@link EventBus}.
- */
- AbstractCommonSubscriber() {
- this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
- }
-
@Override
- public final boolean hasSubscribers() {
+ public final synchronized boolean hasSubscribers() {
return !this.subscribers.isEmpty();
}
@Override
- public final Set<Channel> getSubscribers() {
- return this.subscribers;
+ public final synchronized Set<Channel> getSubscribers() {
+ return new HashSet<>(this.subscribers);
}
@Override
- public final void close() throws Exception {
+ public final synchronized void close() throws Exception {
if (this.registration != null) {
this.registration.close();
this.registration = null;
}
deleteDataInDS();
- unregister();
+ this.subscribers.clear();
}
@Override
- public void addSubscriber(final Channel subscriber) {
+ public synchronized void addSubscriber(final Channel subscriber) {
if (!subscriber.isActive()) {
LOG.debug("Channel is not active between websocket server and subscriber {}", subscriber.remoteAddress());
}
- final Event event = new Event(EventType.REGISTER);
- event.setSubscriber(subscriber);
- this.eventBus.post(event);
+ subscribers.add(subscriber);
}
@Override
- public void removeSubscriber(final Channel subscriber) {
+ public synchronized void removeSubscriber(final Channel subscriber) {
LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
- final Event event = new Event(EventType.DEREGISTER);
- event.setSubscriber(subscriber);
- this.eventBus.post(event);
+ subscribers.remove(subscriber);
+ if (!hasSubscribers()) {
+ ListenersBroker.getInstance().removeAndCloseListener(this);
+ }
}
@Override
}
/**
- * Creating and registering {@link EventBusChangeRecorder} of specific
- * listener on {@link EventBus}.
+ * Post data to subscribed channels.
*
- * @param listener Specific listener of notifications.
+ * @param data Data of incoming notifications.
*/
- @SuppressWarnings({"unchecked", "rawtypes"})
- <T extends BaseListenerInterface> void register(final T listener) {
- this.eventBusChangeRecorder = new EventBusChangeRecorder(listener);
- this.eventBus.register(this.eventBusChangeRecorder);
- }
-
- /**
- * Post event to event bus.
- *
- * @param event Data of incoming notifications.
- */
- protected void post(final Event event) {
- this.eventBus.post(event);
- }
-
- /**
- * Removes all subscribers and unregisters event bus change recorder form event bus.
- */
- private void unregister() {
- this.subscribers.clear();
- this.eventBus.unregister(this.eventBusChangeRecorder);
+ synchronized void post(final String data) {
+ for (final Channel subscriber : subscribers) {
+ if (subscriber.isActive()) {
+ LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
+ subscriber.writeAndFlush(new TextWebSocketFrame(data));
+ } else {
+ LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
+ subscribers.remove(subscriber);
+ }
+ }
}
}