*/
package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.base.Preconditions;
-import java.net.InetSocketAddress;
import java.util.HashSet;
-import java.util.Optional;
+import java.util.Iterator;
import java.util.Set;
-import org.opendaylight.restconf.nb.rfc8040.streams.websockets.WebSocketSessionHandler;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import java.util.concurrent.ExecutionException;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.checkerframework.checker.lock.qual.Holding;
+import org.opendaylight.restconf.nb.rfc8040.streams.StreamSessionHandler;
+import org.opendaylight.yangtools.concepts.Registration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* Features of subscribing part of both notifications.
*/
abstract class AbstractCommonSubscriber extends AbstractQueryParams implements BaseListenerInterface {
-
private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class);
- private final Set<WebSocketSessionHandler> subscribers = new HashSet<>();
- private volatile ListenerRegistration<?> registration;
+ @GuardedBy("this")
+ private final Set<StreamSessionHandler> subscribers = new HashSet<>();
+ @GuardedBy("this")
+ private Registration registration;
@Override
public final synchronized boolean hasSubscribers() {
}
@Override
- public final synchronized Set<WebSocketSessionHandler> getSubscribers() {
+ public final synchronized Set<StreamSessionHandler> getSubscribers() {
return new HashSet<>(this.subscribers);
}
@Override
- public final synchronized void close() throws Exception {
+ public final synchronized void close() throws InterruptedException, ExecutionException {
if (this.registration != null) {
this.registration.close();
this.registration = null;
}
- deleteDataInDS();
+ deleteDataInDS().get();
this.subscribers.clear();
}
@Override
- public synchronized void addSubscriber(final WebSocketSessionHandler subscriber) {
- final Optional<InetSocketAddress> remoteEndpointAddress = subscriber.getRemoteEndpointAddress();
- Preconditions.checkState(remoteEndpointAddress.isPresent());
- LOG.debug("Subscriber {} is added.", remoteEndpointAddress.get());
+ public synchronized void addSubscriber(final StreamSessionHandler subscriber) {
+ final boolean isConnected = subscriber.isConnected();
+ Preconditions.checkState(isConnected);
+ LOG.debug("Subscriber {} is added.", subscriber);
subscribers.add(subscriber);
}
@Override
- public synchronized void removeSubscriber(final WebSocketSessionHandler subscriber) {
- final Optional<InetSocketAddress> remoteEndpointAddress = subscriber.getRemoteEndpointAddress();
- Preconditions.checkState(remoteEndpointAddress.isPresent());
- LOG.debug("Subscriber {} is removed.", remoteEndpointAddress.get());
+ public synchronized void removeSubscriber(final StreamSessionHandler subscriber) {
+ final boolean isConnected = subscriber.isConnected();
+ Preconditions.checkState(isConnected);
+ LOG.debug("Subscriber {} is removed", subscriber);
subscribers.remove(subscriber);
if (!hasSubscribers()) {
ListenersBroker.getInstance().removeAndCloseListener(this);
}
}
- @Override
- public void setRegistration(final ListenerRegistration<?> registration) {
- this.registration = registration;
+ /**
+ * Sets {@link Registration} registration.
+ *
+ * @param registration a listener registration registration.
+ */
+ @Holding("this")
+ final void setRegistration(final Registration registration) {
+ this.registration = requireNonNull(registration);
}
- @Override
- public boolean isListening() {
- return this.registration != null;
+ /**
+ * Checks if {@link Registration} registration exists.
+ *
+ * @return {@code true} if exists, {@code false} otherwise.
+ */
+ @Holding("this")
+ final boolean isListening() {
+ return registration != null;
}
/**
- * Post data to subscribed web-socket session handlers.
+ * Post data to subscribed SSE session handlers.
*
* @param data Data of incoming notifications.
*/
synchronized void post(final String data) {
- for (final WebSocketSessionHandler subscriber : subscribers) {
- final Optional<InetSocketAddress> remoteEndpointAddress = subscriber.getRemoteEndpointAddress();
- if (remoteEndpointAddress.isPresent()) {
+ final Iterator<StreamSessionHandler> iterator = subscribers.iterator();
+ while (iterator.hasNext()) {
+ final StreamSessionHandler subscriber = iterator.next();
+ final boolean isConnected = subscriber.isConnected();
+ if (isConnected) {
subscriber.sendDataMessage(data);
- LOG.debug("Data was sent to subscriber {} on address {}:", this, remoteEndpointAddress.get());
+ LOG.debug("Data was sent to subscriber {} on connection {}:", this, subscriber);
} else {
// removal is probably not necessary, because it will be removed explicitly soon after invocation of
// onWebSocketClosed(..) in handler; but just to be sure ...
- subscribers.remove(subscriber);
+ iterator.remove();
LOG.debug("Subscriber for {} was removed - web-socket session is not open.", this);
}
}
}
-}
\ No newline at end of file
+}