X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=restconf%2Frestconf-nb-rfc8040%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Frestconf%2Fnb%2Frfc8040%2Fstreams%2Flisteners%2FAbstractCommonSubscriber.java;h=1aec614ed05c160f4b161a78f8ead5c44250b7a7;hb=a0128f82bcc2a3ac2f8611049f7c9ab4dfeda985;hp=ea3ad270ff44d58bb33d2664d6dbba53aad78e05;hpb=824baf9fe14e31465c58f3842c4cb0aa88b34757;p=netconf.git diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractCommonSubscriber.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractCommonSubscriber.java index ea3ad270ff..1aec614ed0 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractCommonSubscriber.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractCommonSubscriber.java @@ -7,13 +7,17 @@ */ package org.opendaylight.restconf.nb.rfc8040.streams.listeners; -import com.google.common.eventbus.AsyncEventBus; -import com.google.common.eventbus.EventBus; -import io.netty.channel.Channel; -import io.netty.util.internal.ConcurrentSet; +import static java.util.Objects.requireNonNull; + +import com.google.common.base.Preconditions; +import java.util.HashSet; +import java.util.Iterator; import java.util.Set; -import java.util.concurrent.Executors; -import org.opendaylight.yangtools.concepts.ListenerRegistration; +import java.util.concurrent.ExecutionException; +import org.checkerframework.checker.lock.qual.GuardedBy; +import org.checkerframework.checker.lock.qual.Holding; +import org.opendaylight.restconf.nb.rfc8040.streams.StreamSessionHandler; +import org.opendaylight.yangtools.concepts.Registration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,121 +25,91 @@ import org.slf4j.LoggerFactory; * Features of subscribing part of both notifications. */ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements BaseListenerInterface { - private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class); - private final Set subscribers = new ConcurrentSet<>(); - private final EventBus eventBus; - - @SuppressWarnings("rawtypes") - private EventBusChangeRecorder eventBusChangeRecorder; - @SuppressWarnings("rawtypes") - private ListenerRegistration registration; - - /** - * Creating {@link EventBus}. - */ - protected AbstractCommonSubscriber() { - this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor()); - } + @GuardedBy("this") + private final Set subscribers = new HashSet<>(); + @GuardedBy("this") + private Registration registration; @Override - public final boolean hasSubscribers() { + public final synchronized boolean hasSubscribers() { return !this.subscribers.isEmpty(); } @Override - public final Set getSubscribers() { - return this.subscribers; + public final synchronized Set getSubscribers() { + return new HashSet<>(this.subscribers); } @Override - public final void close() throws Exception { - this.registration.close(); - this.registration = null; - - deleteDataInDS(); - unregister(); - } - - /** - * Creates event of type {@link EventType#REGISTER}, set {@link Channel} - * subscriber to the event and post event into event bus. - * - * @param subscriber - * Channel - */ - public void addSubscriber(final Channel subscriber) { - if (!subscriber.isActive()) { - LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress()); + public final synchronized void close() throws InterruptedException, ExecutionException { + if (this.registration != null) { + this.registration.close(); + this.registration = null; } - final Event event = new Event(EventType.REGISTER); - event.setSubscriber(subscriber); - this.eventBus.post(event); + deleteDataInDS().get(); + this.subscribers.clear(); } - /** - * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel} - * subscriber to the event and posts event into event bus. - * - * @param subscriber subscriber channel - */ - public void removeSubscriber(final Channel subscriber) { - LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress()); - final Event event = new Event(EventType.DEREGISTER); - event.setSubscriber(subscriber); - this.eventBus.post(event); + @Override + public synchronized void addSubscriber(final StreamSessionHandler subscriber) { + final boolean isConnected = subscriber.isConnected(); + Preconditions.checkState(isConnected); + LOG.debug("Subscriber {} is added.", subscriber); + subscribers.add(subscriber); } - /** - * Sets {@link ListenerRegistration} registration. - * - * @param registration - * DOMDataChangeListener registration - */ - @SuppressWarnings("rawtypes") - public void setRegistration(final ListenerRegistration registration) { - this.registration = registration; + @Override + public synchronized void removeSubscriber(final StreamSessionHandler subscriber) { + final boolean isConnected = subscriber.isConnected(); + Preconditions.checkState(isConnected); + LOG.debug("Subscriber {} is removed", subscriber); + subscribers.remove(subscriber); + if (!hasSubscribers()) { + ListenersBroker.getInstance().removeAndCloseListener(this); + } } /** - * Checks if {@link ListenerRegistration} registration exist. + * Sets {@link Registration} registration. * - * @return True if exist, false otherwise. + * @param registration a listener registration registration. */ - public boolean isListening() { - return this.registration != null; + @Holding("this") + final void setRegistration(final Registration registration) { + this.registration = requireNonNull(registration); } /** - * Creating and registering {@link EventBusChangeRecorder} of specific - * listener on {@link EventBus}. + * Checks if {@link Registration} registration exists. * - * @param listener - * specific listener of notifications + * @return {@code true} if exists, {@code false} otherwise. */ - @SuppressWarnings({ "unchecked", "rawtypes" }) - protected void register(final T listener) { - this.eventBusChangeRecorder = new EventBusChangeRecorder(listener); - this.eventBus.register(this.eventBusChangeRecorder); + @Holding("this") + final boolean isListening() { + return registration != null; } /** - * Post event to event bus. + * Post data to subscribed SSE session handlers. * - * @param event - * data of incoming notifications + * @param data Data of incoming notifications. */ - protected void post(final Event event) { - this.eventBus.post(event); - } - - /** - * Removes all subscribers and unregisters event bus change recorder form - * event bus. - */ - protected void unregister() { - this.subscribers.clear(); - this.eventBus.unregister(this.eventBusChangeRecorder); + synchronized void post(final String data) { + final Iterator iterator = subscribers.iterator(); + while (iterator.hasNext()) { + final StreamSessionHandler subscriber = iterator.next(); + final boolean isConnected = subscriber.isConnected(); + if (isConnected) { + subscriber.sendDataMessage(data); + LOG.debug("Data was sent to subscriber {} on connection {}:", this, subscriber); + } else { + // removal is probably not necessary, because it will be removed explicitly soon after invocation of + // onWebSocketClosed(..) in handler; but just to be sure ... + iterator.remove(); + LOG.debug("Subscriber for {} was removed - web-socket session is not open.", this); + } + } } }