Improve subscriber tracking 31/108831/12
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 3 Nov 2023 07:06:15 +0000 (08:06 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Wed, 8 Nov 2023 10:14:05 +0000 (11:14 +0100)
Create a dedicated immutable Subscribers object, which tracks handlers
and their corresponding formatters.

Subscribers are structured, so multiple subscribers with the same
formatter with reuse the same formatted message.

Each RestconfStream starts with Subscribers.empty(), indicating a first
subscriber is required. When we lose the last subscriber, Subscribers
will become null and we trigger stream removal.

This improves on the HashSet tracking in terms of memory footprint, as
we typically will have 0 or 1 subscribers.

The newly introduced indirection through Subscriber allows the
subscriber to dictate the actual formatter which we will use in upcoming
patches.

JIRA: NETCONF-1102
Change-Id: I75e581ed7615b883e65ea60285cd4c1ba09d109e
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/AbstractNotificationStream.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/DataTreeChangeStream.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/RestconfStream.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/SSESessionHandler.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/Subscriber.java [new file with mode: 0644]
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/Subscribers.java [new file with mode: 0644]
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketSessionHandler.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/DataTreeChangeStreamTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/SSESessionHandlerTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketSessionHandlerTest.java

index b9e5eeeab32d8167d7813fa01de93171826e68ca..320dee9346b851a1414a24a41d13d375abdf786f 100644 (file)
@@ -15,15 +15,12 @@ import org.opendaylight.mdsal.dom.api.DOMNotification;
 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev231103.NotificationOutputTypeGrouping.NotificationOutputType;
 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Abstract base class for functionality shared between {@link NotificationStream} and
  * {@link DeviceNotificationStream}.
  */
 abstract class AbstractNotificationStream extends RestconfStream<DOMNotification> implements DOMNotificationListener {
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractNotificationStream.class);
     private static final ImmutableMap<EncodingName, NotificationFormatterFactory> ENCODINGS = ImmutableMap.of(
         EncodingName.RFC8040_JSON, JSONNotificationFormatter.FACTORY,
         EncodingName.RFC8040_XML, XMLNotificationFormatter.FACTORY);
@@ -36,17 +33,8 @@ abstract class AbstractNotificationStream extends RestconfStream<DOMNotification
     @Override
     @SuppressWarnings("checkstyle:IllegalCatch")
     public final void onNotification(final DOMNotification notification) {
-        final var eventInstant = notification instanceof DOMEvent domEvent ? domEvent.getEventInstant() : Instant.now();
-        final String data;
-        try {
-            data = formatter().eventData(effectiveModel(), notification, eventInstant);
-        } catch (Exception e) {
-            LOG.error("Failed to process notification {}", notification, e);
-            return;
-        }
-        if (data != null) {
-            post(data);
-        }
+        sendDataMessage(effectiveModel(), notification,
+            notification instanceof DOMEvent domEvent ? domEvent.getEventInstant() : Instant.now());
     }
 
     abstract @NonNull EffectiveModelContext effectiveModel();
index d6c19177a071614780bd227a5d06bf9b3d705f8d..f2225ebc5f2f4e04190ed52a3d99252e657a3c2e 100644 (file)
@@ -13,7 +13,6 @@ import com.google.common.base.MoreObjects.ToStringHelper;
 import com.google.common.collect.ImmutableMap;
 import java.time.Instant;
 import java.util.List;
-import java.util.stream.Collectors;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.ClusteredDOMDataTreeChangeListener;
@@ -24,15 +23,12 @@ import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev231103.NotificationOutputTypeGrouping.NotificationOutputType;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * A {@link RestconfStream} reporting changes on a particular data tree.
  */
-public class DataTreeChangeStream extends RestconfStream<List<DataTreeCandidate>>
+public final class DataTreeChangeStream extends RestconfStream<List<DataTreeCandidate>>
         implements ClusteredDOMDataTreeChangeListener {
-    private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeStream.class);
     private static final ImmutableMap<EncodingName, DataTreeCandidateFormatterFactory> ENCODINGS = ImmutableMap.of(
         EncodingName.RFC8040_JSON, JSONDataTreeCandidateFormatter.FACTORY,
         EncodingName.RFC8040_XML, XMLDataTreeCandidateFormatter.FACTORY);
@@ -58,18 +54,7 @@ public class DataTreeChangeStream extends RestconfStream<List<DataTreeCandidate>
     @Override
     @SuppressWarnings("checkstyle:IllegalCatch")
     public void onDataTreeChanged(final List<DataTreeCandidate> dataTreeCandidates) {
-        final var now = Instant.now();
-        final String data;
-        try {
-            data = formatter().eventData(databindProvider.currentContext().modelContext(), dataTreeCandidates, now);
-        } catch (final Exception e) {
-            LOG.error("Failed to process notification {}",
-                    dataTreeCandidates.stream().map(Object::toString).collect(Collectors.joining(",")), e);
-            return;
-        }
-        if (data != null) {
-            post(data);
-        }
+        sendDataMessage(databindProvider.currentContext().modelContext(), dataTreeCandidates, Instant.now());
     }
 
     /**
@@ -86,7 +71,7 @@ public class DataTreeChangeStream extends RestconfStream<List<DataTreeCandidate>
      *
      * @param domDataBroker data broker for register data change listener
      */
-    public final synchronized void listen(final DOMDataBroker domDataBroker) {
+    public synchronized void listen(final DOMDataBroker domDataBroker) {
         if (!isListening()) {
             final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
             if (changeService == null) {
index 94a2ff5edde27e2a5246d4d99be7162b4e6e78e4..f639ea70a42eeec94850271ce5cff0ddbad49b43 100644 (file)
@@ -12,13 +12,15 @@ import static java.util.Objects.requireNonNull;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.MoreObjects.ToStringHelper;
 import com.google.common.collect.ImmutableMap;
-import java.util.HashSet;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
+import java.time.Instant;
 import java.util.Set;
 import java.util.regex.Pattern;
 import javax.xml.xpath.XPathExpressionException;
-import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.checkerframework.checker.lock.qual.Holding;
 import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
 import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.stream.Access;
@@ -26,6 +28,7 @@ import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev23110
 import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.common.ErrorTag;
 import org.opendaylight.yangtools.yang.common.ErrorType;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,15 +65,25 @@ public abstract class RestconfStream<T> {
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(RestconfStream.class);
+    private static final VarHandle SUBSCRIBERS;
+
+    static {
+        try {
+            SUBSCRIBERS = MethodHandles.lookup().findVarHandle(RestconfStream.class, "subscribers", Subscribers.class);
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
 
     // ImmutableMap because it retains iteration order
     private final @NonNull ImmutableMap<EncodingName, ? extends EventFormatterFactory<T>> encodings;
     private final @NonNull ListenersBroker listenersBroker;
     private final @NonNull String name;
 
-    @GuardedBy("this")
-    private final Set<StreamSessionHandler> subscribers = new HashSet<>();
-    @GuardedBy("this")
+    // Accessed via SUBSCRIBERS, 'null' indicates we have been shut down
+    @SuppressWarnings("unused")
+    private volatile Subscribers<T> subscribers = Subscribers.empty();
+
     private Registration registration;
 
     // FIXME: NETCONF-1102: this should be tied to a subscriber
@@ -136,53 +149,96 @@ public abstract class RestconfStream<T> {
     /**
      * Registers {@link StreamSessionHandler} subscriber.
      *
-     * @param subscriber SSE or WS session handler.
+     * @param handler SSE or WS session handler.
+     * @return A new {@link Registration}, or {@code null} if the subscriber cannot be added
+     * @throws NullPointerException if {@code handler} is {@code null}
      */
-    synchronized void addSubscriber(final StreamSessionHandler subscriber) {
-        if (!subscriber.isConnected()) {
-            throw new IllegalStateException(subscriber + " is not connected");
+    @Nullable Registration addSubscriber(final StreamSessionHandler handler) {
+        // Lockless add of a subscriber. If we observe a null this stream is dead before the new subscriber could be
+        // added.
+        final var toAdd = new Subscriber<>(this, handler, formatter);
+        var observed = acquireSubscribers();
+        while (observed != null) {
+            final var next = observed.add(toAdd);
+            final var witness = (Subscribers<T>) SUBSCRIBERS.compareAndExchangeRelease(this, observed, next);
+            if (witness == observed) {
+                LOG.debug("Subscriber {} is added.", handler);
+                return toAdd;
+            }
+
+            // We have raced: retry the operation
+            observed = witness;
         }
-        LOG.debug("Subscriber {} is added.", subscriber);
-        subscribers.add(subscriber);
+        return null;
     }
 
     /**
-     * Removes {@link StreamSessionHandler} subscriber. If this was the last subscriber also shut down this stream and
-     * initiate its removal from global state.
+     * Removes a {@link Subscriber}. If this was the last subscriber also shut down this stream and initiate its removal
+     * from global state.
      *
-     * @param subscriber SSE or WS session handler.
+     * @param subscriber The {@link Subscriber} to remove
+     * @throws NullPointerException if {@code subscriber} is {@code null}
      */
-    synchronized void removeSubscriber(final StreamSessionHandler subscriber) {
-        subscribers.remove(subscriber);
-        LOG.debug("Subscriber {} is removed", subscriber);
-        if (subscribers.isEmpty()) {
-            closeRegistration();
-            listenersBroker.removeStream(this);
+    void removeSubscriber(final Subscriber<T> subscriber) {
+        final var toRemove = requireNonNull(subscriber);
+        var observed = acquireSubscribers();
+        while (observed != null) {
+            final var next = observed.remove(toRemove);
+            final var witness = (Subscribers<T>) SUBSCRIBERS.compareAndExchangeRelease(this, observed, next);
+            if (witness == observed) {
+                LOG.debug("Subscriber {} is removed", subscriber);
+                if (next == null) {
+                    // We have lost the last subscriber, terminate.
+                    terminate();
+                }
+                return;
+            }
+
+            // We have raced: retry the operation
+            observed = witness;
         }
     }
 
+    private Subscribers<T> acquireSubscribers() {
+        return (Subscribers<T>) SUBSCRIBERS.getAcquire(this);
+    }
+
     /**
      * Signal the end-of-stream condition to subscribers, shut down this stream and initiate its removal from global
      * state.
      */
-    final synchronized void endOfStream() {
-        closeRegistration();
-
-        final var it = subscribers.iterator();
-        while (it.hasNext()) {
-            it.next().endOfStream();
-            it.remove();
+    final void endOfStream() {
+        // Atomic assertion we are ending plus locked clean up
+        final var local = (Subscribers<T>) SUBSCRIBERS.getAndSetRelease(this, null);
+        if (local != null) {
+            terminate();
+            local.endOfStream();
         }
+    }
 
-        listenersBroker.removeStream(this);
+    /**
+     * Post data to subscribed SSE session handlers.
+     *
+     * @param modelContext An {@link EffectiveModelContext} used to format the input
+     * @param input Input data
+     * @param now Current time
+     * @throws NullPointerException if any argument is {@code null}
+     */
+    void sendDataMessage(final EffectiveModelContext modelContext, final T input, final Instant now) {
+        final var local = acquireSubscribers();
+        if (local != null) {
+            local.publish(modelContext, input, now);
+        } else {
+            LOG.debug("Ignoring sendDataMessage() on terminated stream {}", this);
+        }
     }
 
-    @Holding("this")
-    private void closeRegistration() {
+    private void terminate() {
         if (registration != null) {
             registration.close();
             registration = null;
         }
+        listenersBroker.removeStream(this);
     }
 
     /**
@@ -250,27 +306,6 @@ public abstract class RestconfStream<T> {
         return registration != null;
     }
 
-    /**
-     * Post data to subscribed SSE session handlers.
-     *
-     * @param data Data of incoming notifications.
-     */
-    synchronized void post(final String data) {
-        final var iterator = subscribers.iterator();
-        while (iterator.hasNext()) {
-            final var subscriber = iterator.next();
-            if (subscriber.isConnected()) {
-                subscriber.sendDataMessage(data);
-                LOG.debug("Data was sent to subscriber {} on connection {}:", this, subscriber);
-            } else {
-                // removal is probably not necessary, because it will be removed explicitly soon after invocation of
-                // onWebSocketClosed(..) in handler; but just to be sure ...
-                iterator.remove();
-                LOG.debug("Subscriber for {} was removed - web-socket session is not open.", this);
-            }
-        }
-    }
-
     @Override
     public final String toString() {
         return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
index dfec4a1fb3fe16ad8d77259286af25b6a615fb41..2af6a34aae9834f1c093ac5084349224d5535952 100644 (file)
@@ -15,6 +15,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import javax.ws.rs.sse.Sse;
 import javax.ws.rs.sse.SseEventSink;
+import org.opendaylight.yangtools.concepts.Registration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,14 +28,14 @@ public final class SSESessionHandler implements StreamSessionHandler {
     private static final CharMatcher CR_OR_LF = CharMatcher.anyOf("\r\n");
 
     private final ScheduledExecutorService executorService;
-    // FIXME: this really should include subscription details like formatter etc.
-    private final RestconfStream<?> listener;
+    private final RestconfStream<?> stream;
     private final int maximumFragmentLength;
     private final int heartbeatInterval;
     private final SseEventSink sink;
     private final Sse sse;
 
     private ScheduledFuture<?> pingProcess;
+    private Registration subscriber;
 
     /**
      * Creation of the new server-sent events session handler.
@@ -57,7 +58,7 @@ public final class SSESessionHandler implements StreamSessionHandler {
         this.executorService = executorService;
         this.sse = sse;
         this.sink = sink;
-        this.listener = listener;
+        stream = listener;
         this.maximumFragmentLength = maximumFragmentLength;
         this.heartbeatInterval = heartbeatInterval;
     }
@@ -66,12 +67,18 @@ public final class SSESessionHandler implements StreamSessionHandler {
      * Initialization of SSE connection. SSE session handler is registered at data-change-event / YANG notification
      * listener and the heartbeat ping process is started if it is enabled.
      */
-    public synchronized void init() {
-        listener.addSubscriber(this);
+    public synchronized boolean init() {
+        final var local = stream.addSubscriber(this);
+        if (local == null) {
+            return false;
+        }
+
+        subscriber = local;
         if (heartbeatInterval != 0) {
             pingProcess = executorService.scheduleWithFixedDelay(this::sendPingMessage, heartbeatInterval,
-                    heartbeatInterval, TimeUnit.MILLISECONDS);
+                heartbeatInterval, TimeUnit.MILLISECONDS);
         }
+        return true;
     }
 
     /**
@@ -79,8 +86,11 @@ public final class SSESessionHandler implements StreamSessionHandler {
      */
     @VisibleForTesting
     synchronized void close() {
-        listener.removeSubscriber(this);
-        stopPingProcess();
+        final var local = subscriber;
+        if (local != null) {
+            local.close();
+            stopPingProcess();
+        }
     }
 
     @Override
diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/Subscriber.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/Subscriber.java
new file mode 100644 (file)
index 0000000..eb38035
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.restconf.nb.rfc8040.streams;
+
+import static java.util.Objects.requireNonNull;
+
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.yangtools.concepts.AbstractRegistration;
+
+/**
+ * A single subscriber to an {@link AbstractStream}.
+ */
+final class Subscriber<T> extends AbstractRegistration {
+    private final @NonNull RestconfStream<T> stream;
+    private final @NonNull StreamSessionHandler handler;
+    private final @NonNull EventFormatter<T> formatter;
+
+    Subscriber(final RestconfStream<T> stream, final StreamSessionHandler handler, final EventFormatter<T> formatter) {
+        this.stream = requireNonNull(stream);
+        this.handler = requireNonNull(handler);
+        this.formatter = requireNonNull(formatter);
+    }
+
+    @NonNull EventFormatter<T> formatter() {
+        return formatter;
+    }
+
+    @NonNull StreamSessionHandler handler() {
+        return handler;
+    }
+
+    @Override
+    protected void removeRegistration() {
+        stream.removeSubscriber(this);
+    }
+}
diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/Subscribers.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/Subscribers.java
new file mode 100644 (file)
index 0000000..32f0a8e
--- /dev/null
@@ -0,0 +1,190 @@
+/*
+ * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.restconf.nb.rfc8040.streams;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.VerifyException;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.ListMultimap;
+import java.time.Instant;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A set of subscribers attached to an {@link AbstractStream}. This is an immutable structure, which can be updated
+ * through a copy-on-writer process driven by {@link #add(Subscriber)} and {@link #remove(Subscriber)}.
+ *
+ * @param <T> event type
+ */
+abstract sealed class Subscribers<T> {
+    private static final class Empty<T> extends Subscribers<T> {
+        static final @NonNull Empty<?> INSTANCE = new Empty<>();
+
+        @Override
+        Subscribers<T> add(final Subscriber<T> toAdd) {
+            return new Single<>(toAdd);
+        }
+
+        @Override
+        Subscribers<T> remove(final Subscriber<?> toRemove) {
+            return this;
+        }
+
+        @Override
+        void endOfStream() {
+            // No-op
+        }
+
+        @Override
+        void publish(final EffectiveModelContext modelContext, final T input, final Instant now) {
+            // No-op
+        }
+    }
+
+    private static final class Single<T> extends Subscribers<T> {
+        private final Subscriber<T> subscriber;
+
+        Single(final Subscriber<T> subscriber) {
+            this.subscriber = requireNonNull(subscriber);
+        }
+
+        @Override
+        Subscribers<T> add(final Subscriber<T> toAdd) {
+            return new Multiple<>(ImmutableListMultimap.of(
+                subscriber.formatter(), subscriber,
+                toAdd.formatter(), toAdd));
+        }
+
+        @Override
+        Subscribers<T> remove(final Subscriber<?> toRemove) {
+            return toRemove.equals(subscriber) ? null : this;
+        }
+
+        @Override
+        void endOfStream() {
+            subscriber.handler().endOfStream();
+        }
+
+        @Override
+        void publish(final EffectiveModelContext modelContext, final T input, final Instant now) {
+            final var formatted = format(subscriber.formatter(), modelContext, input, now);
+            if (formatted != null) {
+                subscriber.handler().sendDataMessage(formatted);
+            }
+        }
+    }
+
+    private static final class Multiple<T> extends Subscribers<T> {
+        private final ImmutableListMultimap<EventFormatter<T>, Subscriber<T>> subscribers;
+
+        Multiple(final ListMultimap<EventFormatter<T>, Subscriber<T>> subscribers) {
+            this.subscribers = ImmutableListMultimap.copyOf(subscribers);
+        }
+
+        @Override
+        Subscribers<T> add(final Subscriber<T> toAdd) {
+            final var newSubscribers = ArrayListMultimap.create(subscribers);
+            newSubscribers.put(toAdd.formatter(), toAdd);
+            return new Multiple<>(newSubscribers);
+        }
+
+        @Override
+        Subscribers<T> remove(final Subscriber<?> toRemove) {
+            final var newSubscribers = ArrayListMultimap.create(subscribers);
+            return newSubscribers.remove(toRemove.formatter(), toRemove) ? switch (newSubscribers.size()) {
+                case 0 -> throw new VerifyException("Unexpected empty subscribers");
+                case 1 -> new Single<>(newSubscribers.values().iterator().next());
+                default -> new Multiple<>(newSubscribers);
+            } : this;
+        }
+
+        @Override
+        void endOfStream() {
+            subscribers.forEach((formatter, subscriber) -> subscriber.handler().endOfStream());
+        }
+
+        @Override
+        void publish(final EffectiveModelContext modelContext, final T input, final Instant now) {
+            for (var entry : subscribers.asMap().entrySet()) {
+                final var formatted = format(entry.getKey(), modelContext, input, now);
+                if (formatted != null) {
+                    for (var subscriber : entry.getValue()) {
+                        subscriber.handler().sendDataMessage(formatted);
+                    }
+                }
+            }
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(Subscribers.class);
+
+    private Subscribers() {
+        // Hidden on purpose
+    }
+
+    /**
+     * Return an An empty set of subscribers. This is the initial state of {@link RestconfStream}, waiting for the first
+     * subscriber to appear.
+     *
+     * @param <T> event type
+     * @return An empty {@link Subscribers} file
+     */
+    @SuppressWarnings("unchecked")
+    static <T> @NonNull Subscribers<T> empty() {
+        return (Subscribers<T>) Empty.INSTANCE;
+    }
+
+    /**
+     * Add a new subscriber to this file.
+     *
+     * @param toAdd subscriber to add
+     * @return A new {@link Subscribers} file
+     * @throws NullPointerException if {@code toAdd} is {@code null}
+     */
+    abstract @NonNull Subscribers<T> add(Subscriber<T> toAdd);
+
+    /**
+     * Remove a subscriber to this file.
+     *
+     * @param toRemove subscriber to add
+     * @return A new {@link Subscribers} file, or {@code null} if this file was not empty and it became empty
+     * @throws NullPointerException if {@code toRemove} is {@code null}
+     */
+    abstract @Nullable Subscribers<T> remove(Subscriber<?> toRemove);
+
+    /**
+     * Signal end-of-stream to all subscribers.
+     */
+    abstract void endOfStream();
+
+    /**
+     * Publish an event to all {@link Subscriber}s in this file.
+     *
+     * @param modelContext An {@link EffectiveModelContext} used to format the input
+     * @param input Input data
+     * @param now Current time
+     * @throws NullPointerException if any argument is {@code null}
+     */
+    abstract void publish(EffectiveModelContext modelContext, T input, Instant now);
+
+    @SuppressWarnings("checkstyle:illegalCatch")
+    private static <T> @Nullable String format(final EventFormatter<T> formatter,
+            final EffectiveModelContext modelContext, final T input, final Instant now) {
+        try {
+            return formatter.eventData(modelContext, input, now);
+        } catch (Exception e) {
+            LOG.warn("Failed to format", e);
+            return null;
+        }
+    }
+}
index e7b5b2a48c2619b1bae53a1be8b0419a258643b7..58e470c92153562a351e013d5d59baed10ef0214 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.restconf.nb.rfc8040.streams;
 
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.base.Strings;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -25,6 +27,7 @@ import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
 import org.eclipse.jetty.websocket.api.annotations.WebSocket;
+import org.opendaylight.yangtools.concepts.Registration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,12 +41,12 @@ public final class WebSocketSessionHandler implements StreamSessionHandler {
     private static final byte[] PING_PAYLOAD = "ping".getBytes(Charset.defaultCharset());
 
     private final ScheduledExecutorService executorService;
-    // FIXME: this really should include formatter etc.
-    private final RestconfStream<?> listener;
+    private final RestconfStream<?> stream;
     private final int maximumFragmentLength;
     private final int heartbeatInterval;
 
     private Session session;
+    private Registration subscriber;
     private ScheduledFuture<?> pingProcess;
 
     /**
@@ -52,7 +55,7 @@ public final class WebSocketSessionHandler implements StreamSessionHandler {
      * @param executorService       Executor that is used for periodical sending of web-socket ping messages to keep
      *                              session up even if the notifications doesn't flow from server to clients or clients
      *                              don't implement ping-pong service.
-     * @param listener              YANG notification or data-change event listener to which client on this web-socket
+     * @param stream                YANG notification or data-change event listener to which client on this web-socket
      *                              session subscribes to.
      * @param maximumFragmentLength Maximum fragment length in number of Unicode code units (characters).
      *                              If this parameter is set to 0, the maximum fragment length is disabled and messages
@@ -62,10 +65,10 @@ public final class WebSocketSessionHandler implements StreamSessionHandler {
      * @param heartbeatInterval     Interval in milliseconds of sending of ping control frames to remote endpoint
      *                              to keep session up. Ping control frames are disabled if this parameter is set to 0.
      */
-    WebSocketSessionHandler(final ScheduledExecutorService executorService, final RestconfStream<?> listener,
+    WebSocketSessionHandler(final ScheduledExecutorService executorService, final RestconfStream<?> stream,
             final int maximumFragmentLength, final int heartbeatInterval) {
-        this.executorService = executorService;
-        this.listener = listener;
+        this.executorService = requireNonNull(executorService);
+        this.stream = requireNonNull(stream);
         this.maximumFragmentLength = maximumFragmentLength;
         this.heartbeatInterval = heartbeatInterval;
     }
@@ -82,7 +85,8 @@ public final class WebSocketSessionHandler implements StreamSessionHandler {
     public synchronized void onWebSocketConnected(final Session webSocketSession) {
         if (session == null || !session.isOpen()) {
             session = webSocketSession;
-            listener.addSubscriber(this);
+            subscriber = stream.addSubscriber(this);
+
             LOG.debug("A new web-socket session {} has been successfully registered.", webSocketSession);
             if (heartbeatInterval != 0) {
                 // sending of PING frame can be long if there is an error on web-socket - from this reason
@@ -106,9 +110,12 @@ public final class WebSocketSessionHandler implements StreamSessionHandler {
         // note: there is not guarantee that Session.isOpen() returns true - it is better to not check it here
         // using 'session != null && session.isOpen()'
         if (session != null) {
-            LOG.debug("Web-socket session has been closed with status code {} and reason message: {}.",
-                    statusCode, reason);
-            listener.removeSubscriber(this);
+            LOG.debug("Web-socket session has been closed with status code {} and reason message: {}.", statusCode,
+                reason);
+            if (subscriber != null) {
+                subscriber.close();
+                subscriber = null;
+            }
             stopPingProcess();
         }
     }
@@ -130,7 +137,10 @@ public final class WebSocketSessionHandler implements StreamSessionHandler {
         }
         if (session != null) {
             LOG.info("Trying to close web-socket session {} gracefully after error.", session);
-            listener.removeSubscriber(this);
+            if (subscriber != null) {
+                subscriber.close();
+                subscriber = null;
+            }
             if (session.isOpen()) {
                 session.close();
             }
index c80f6387b7ed2741f1a35380d5f8dbf52059953c..0e20b653eec85f4f46b34547141a6ed67d00062c 100644 (file)
@@ -61,6 +61,62 @@ import org.slf4j.LoggerFactory;
 import org.xmlunit.assertj.XmlAssert;
 
 public class DataTreeChangeStreamTest extends AbstractConcurrentDataBrokerTest {
+    private static final class TestHandler implements StreamSessionHandler {
+        private CountDownLatch notificationLatch = new CountDownLatch(1);
+        private volatile String lastNotification;
+
+        @Override
+        public boolean isConnected() {
+            return true;
+        }
+
+        @Override
+        public void endOfStream() {
+            // No-op
+        }
+
+        @Override
+        public void sendDataMessage(final String data) {
+            lastNotification = data;
+            notificationLatch.countDown();
+        }
+
+        void assertGot(final String json) throws Exception {
+            // FIXME: use awaitility
+            if (!Uninterruptibles.awaitUninterruptibly(notificationLatch, 500, TimeUnit.SECONDS)) {
+                fail("Timed out waiting for notification for: " + json);
+            }
+
+            LOG.info("lastNotification: {}", lastNotification);
+            final String withFakeDate = withFakeDate(lastNotification);
+            LOG.info("Comparing: \n{}\n{}", json, withFakeDate);
+
+            JSONAssert.assertEquals(json, withFakeDate, false);
+            lastNotification = null;
+            notificationLatch = new CountDownLatch(1);
+        }
+
+        void assertXmlSimilar(final String xml) throws Exception {
+            awaitUntilNotification(xml);
+
+            LOG.info("lastNotification: {}", lastNotification);
+            final String withFakeDate = withFakeXmlDate(lastNotification);
+            LOG.info("Comparing: \n{}\n{}", xml, withFakeDate);
+
+            XmlAssert.assertThat(xml).and(withFakeDate).ignoreWhitespace().ignoreChildNodesOrder().areSimilar();
+            lastNotification = null;
+            notificationLatch = new CountDownLatch(1);
+        }
+
+        String awaitUntilNotification(final String xml) {
+            // FIXME: use awaitility
+            if (!Uninterruptibles.awaitUninterruptibly(notificationLatch, 500, TimeUnit.SECONDS)) {
+                fail("Timed out waiting for notification for: " + xml);
+            }
+            return lastNotification;
+        }
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeStreamTest.class);
 
     private static final String JSON_NOTIF_LEAVES_CREATE = "/listener-adapter-test/notif-leaves-create.json";
@@ -170,72 +226,25 @@ public class DataTreeChangeStreamTest extends AbstractConcurrentDataBrokerTest {
         listenersBroker = new ListenersBroker.ServerSentEvents(domDataBroker);
     }
 
-    class TestDataTreeChangeStream extends DataTreeChangeStream {
-        private CountDownLatch notificationLatch = new CountDownLatch(1);
-        private volatile String lastNotification;
-
-        TestDataTreeChangeStream(final YangInstanceIdentifier path, final String streamName,
+    DataTreeChangeStream createStream(final YangInstanceIdentifier path, final String streamName,
                 final NotificationOutputType outputType, final boolean leafNodesOnly,
-                final boolean skipNotificationData, final boolean changedLeafNodesOnly, final boolean childNodesOnly,
-                final ListenersBroker listenersBroker) {
-            super(listenersBroker, streamName, outputType, databindProvider, LogicalDatastoreType.CONFIGURATION, path);
-            setQueryParams(new ReceiveEventsParams(null, null, null,
-                leafNodesOnly ? LeafNodesOnlyParam.of(true) : null,
-                skipNotificationData ? SkipNotificationDataParam.of(true) : null,
-                changedLeafNodesOnly ? ChangedLeafNodesOnlyParam.of(true) : null,
-                childNodesOnly ? ChildNodesOnlyParam.of(true) : null));
-        }
-
-        @Override
-        protected void post(final String data) {
-            lastNotification = data;
-            notificationLatch.countDown();
-        }
-
-        public void assertGot(final String json) throws JSONException {
-            // FIXME: use awaitility
-            if (!Uninterruptibles.awaitUninterruptibly(notificationLatch, 500, TimeUnit.SECONDS)) {
-                fail("Timed out waiting for notification for: " + json);
-            }
-
-            LOG.info("lastNotification: {}", lastNotification);
-            final String withFakeDate = withFakeDate(lastNotification);
-            LOG.info("Comparing: \n{}\n{}", json, withFakeDate);
-
-            JSONAssert.assertEquals(json, withFakeDate, false);
-            lastNotification = null;
-            notificationLatch = new CountDownLatch(1);
-        }
-
-        public void assertXmlSimilar(final String xml) {
-            awaitUntilNotification(xml);
-
-            LOG.info("lastNotification: {}", lastNotification);
-            final String withFakeDate = withFakeXmlDate(lastNotification);
-            LOG.info("Comparing: \n{}\n{}", xml, withFakeDate);
-
-            XmlAssert.assertThat(xml).and(withFakeDate).ignoreWhitespace().ignoreChildNodesOrder().areSimilar();
-            lastNotification = null;
-            notificationLatch = new CountDownLatch(1);
-        }
-
-        public String awaitUntilNotification(final String xml) {
-            // FIXME: use awaitility
-            if (!Uninterruptibles.awaitUninterruptibly(notificationLatch, 500, TimeUnit.SECONDS)) {
-                fail("Timed out waiting for notification for: " + xml);
-            }
-            return lastNotification;
-        }
-
-        public void resetLatch() {
-            notificationLatch = new CountDownLatch(1);
-        }
+                final boolean skipNotificationData, final boolean changedLeafNodesOnly, final boolean childNodesOnly) {
+        final var ret = new DataTreeChangeStream(listenersBroker, streamName, outputType, databindProvider,
+            LogicalDatastoreType.CONFIGURATION, path);
+        ret.setQueryParams(new ReceiveEventsParams(null, null, null,
+            leafNodesOnly ? LeafNodesOnlyParam.of(true) : null,
+            skipNotificationData ? SkipNotificationDataParam.of(true) : null,
+            changedLeafNodesOnly ? ChangedLeafNodesOnlyParam.of(true) : null,
+            childNodesOnly ? ChildNodesOnlyParam.of(true) : null));
+        return ret;
     }
 
     @Test
     public void testJsonNotifsLeaves() throws Exception {
-        var adapter = new TestDataTreeChangeStream(PATCH_CONT_YIID, "Casey", NotificationOutputType.JSON,
-            true, false, false, false, listenersBroker);
+        final var adapter = createStream(PATCH_CONT_YIID, "Casey", NotificationOutputType.JSON,
+            true, false, false, false);
+        final var handler = new TestHandler();
+        adapter.addSubscriber(handler);
 
         final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
         final var root = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, PATCH_CONT_YIID);
@@ -251,7 +260,7 @@ public class DataTreeChangeStreamTest extends AbstractConcurrentDataBrokerTest {
             .setMyList1(BindingMap.of(new MyList1Builder().setMyLeaf11("Jed").setName("Althea").build()))
             .build());
         writeTransaction.commit();
-        adapter.assertGot(getNotifJson(JSON_NOTIF_LEAVES_CREATE));
+        handler.assertGot(getNotifJson(JSON_NOTIF_LEAVES_CREATE));
 
         writeTransaction = dataBroker.newWriteOnlyTransaction();
         writeTransaction.merge(LogicalDatastoreType.CONFIGURATION, iid, new PatchContBuilder()
@@ -262,18 +271,20 @@ public class DataTreeChangeStreamTest extends AbstractConcurrentDataBrokerTest {
             .setMyList1(BindingMap.of(new MyList1Builder().setMyLeaf12("Bertha").setName("Althea").build()))
             .build());
         writeTransaction.commit();
-        adapter.assertGot(getNotifJson(JSON_NOTIF_LEAVES_UPDATE));
+        handler.assertGot(getNotifJson(JSON_NOTIF_LEAVES_UPDATE));
 
         writeTransaction = dataBroker.newWriteOnlyTransaction();
         writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, iid);
         writeTransaction.commit();
-        adapter.assertGot(getNotifJson(JSON_NOTIF_LEAVES_DELETE));
+        handler.assertGot(getNotifJson(JSON_NOTIF_LEAVES_DELETE));
     }
 
     @Test
     public void testJsonNotifsChangedLeaves() throws Exception {
-        var adapter = new TestDataTreeChangeStream(PATCH_CONT_YIID, "Casey", NotificationOutputType.JSON,
-                false, false, true, false, listenersBroker);
+        final var adapter = createStream(PATCH_CONT_YIID, "Casey", NotificationOutputType.JSON, false, false, true,
+            false);
+        final var handler = new TestHandler();
+        adapter.addSubscriber(handler);
 
         final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
         final var root = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, PATCH_CONT_YIID);
@@ -293,7 +304,7 @@ public class DataTreeChangeStreamTest extends AbstractConcurrentDataBrokerTest {
             .setMyList1(BindingMap.of(new MyList1Builder().setMyLeaf11("Jed").setName("Althea").build()))
             .build());
         writeTransaction.commit();
-        adapter.assertGot(getNotifJson(JSON_NOTIF_CHANGED_LEAVES_CREATE));
+        handler.assertGot(getNotifJson(JSON_NOTIF_CHANGED_LEAVES_CREATE));
 
         writeTransaction = dataBroker.newWriteOnlyTransaction();
         writeTransaction.merge(LogicalDatastoreType.CONFIGURATION, iid, new PatchContBuilder()
@@ -308,18 +319,20 @@ public class DataTreeChangeStreamTest extends AbstractConcurrentDataBrokerTest {
             .setMyList1(BindingMap.of(new MyList1Builder().setMyLeaf12("Bertha").setName("Althea").build()))
             .build());
         writeTransaction.commit();
-        adapter.assertGot(getNotifJson(JSON_NOTIF_CHANGED_LEAVES_UPDATE));
+        handler.assertGot(getNotifJson(JSON_NOTIF_CHANGED_LEAVES_UPDATE));
 
         writeTransaction = dataBroker.newWriteOnlyTransaction();
         writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, iid);
         writeTransaction.commit();
-        adapter.assertGot(getNotifJson(JSON_NOTIF_CHANGED_LEAVES_DELETE));
+        handler.assertGot(getNotifJson(JSON_NOTIF_CHANGED_LEAVES_DELETE));
     }
 
     @Test
     public void testJsonChildNodesOnly() throws Exception {
-        final var adapter = new TestDataTreeChangeStream(PATCH_CONT_YIID, "Casey",
-            NotificationOutputType.JSON, false, false, false, true, listenersBroker);
+        final var adapter = createStream(PATCH_CONT_YIID, "Casey", NotificationOutputType.JSON, false, false, false,
+            true);
+        final var handler = new TestHandler();
+        adapter.addSubscriber(handler);
 
         final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
         final var root = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, PATCH_CONT_YIID);
@@ -330,30 +343,32 @@ public class DataTreeChangeStreamTest extends AbstractConcurrentDataBrokerTest {
         writeTransaction.put(LogicalDatastoreType.CONFIGURATION, iid,
             new MyList1Builder().setMyLeaf11("Jed").setName("Althea").build());
         writeTransaction.commit();
-        adapter.assertGot(getNotifJson(JSON_NOTIF_CHILD_NODES_ONLY_CREATE));
+        handler.assertGot(getNotifJson(JSON_NOTIF_CHILD_NODES_ONLY_CREATE));
 
         writeTransaction = dataBroker.newWriteOnlyTransaction();
         writeTransaction.put(LogicalDatastoreType.CONFIGURATION, iid,
             new MyList1Builder().setMyLeaf11("Bertha").setName("Althea").build());
         writeTransaction.commit();
-        adapter.assertGot(getNotifJson(JSON_NOTIF_CHILD_NODES_ONLY_UPDATE1));
+        handler.assertGot(getNotifJson(JSON_NOTIF_CHILD_NODES_ONLY_UPDATE1));
 
         writeTransaction = dataBroker.newWriteOnlyTransaction();
         writeTransaction.merge(LogicalDatastoreType.CONFIGURATION, iid,
             new MyList1Builder().setMyLeaf11("Jed").setName("Althea").build());
         writeTransaction.commit();
-        adapter.assertGot(getNotifJson(JSON_NOTIF_CHILD_NODES_ONLY_UPDATE2));
+        handler.assertGot(getNotifJson(JSON_NOTIF_CHILD_NODES_ONLY_UPDATE2));
 
         writeTransaction = dataBroker.newWriteOnlyTransaction();
         writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, iid);
         writeTransaction.commit();
-        adapter.assertGot(getNotifJson(JSON_NOTIF_CHILD_NODES_ONLY_DELETE));
+        handler.assertGot(getNotifJson(JSON_NOTIF_CHILD_NODES_ONLY_DELETE));
     }
 
     @Test
     public void testXmlLeavesOnly() throws Exception {
-        var adapter = new TestDataTreeChangeStream(PATCH_CONT_YIID, "Casey", NotificationOutputType.XML,
-            true, false, false, false, listenersBroker);
+        final var adapter = createStream(PATCH_CONT_YIID, "Casey", NotificationOutputType.XML, true, false, false,
+            false);
+        final var handler = new TestHandler();
+        adapter.addSubscriber(handler);
 
         final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
         final var root = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, PATCH_CONT_YIID);
@@ -368,7 +383,7 @@ public class DataTreeChangeStreamTest extends AbstractConcurrentDataBrokerTest {
             .setMyList1(BindingMap.of(new MyList1Builder().setMyLeaf11("Jed").setName("Althea").build()))
             .build());
         writeTransaction.commit();
-        adapter.assertXmlSimilar(getResultXml(XML_NOTIF_LEAVES_CREATE));
+        handler.assertXmlSimilar(getResultXml(XML_NOTIF_LEAVES_CREATE));
 
         writeTransaction = dataBroker.newWriteOnlyTransaction();
         writeTransaction.merge(LogicalDatastoreType.CONFIGURATION, iid, new PatchContBuilder()
@@ -379,14 +394,14 @@ public class DataTreeChangeStreamTest extends AbstractConcurrentDataBrokerTest {
             .setMyList1(BindingMap.of(new MyList1Builder().setMyLeaf12("Bertha").setName("Althea").build()))
             .build());
         writeTransaction.commit();
-        adapter.assertXmlSimilar(getResultXml(XML_NOTIF_LEAVES_UPDATE));
+        handler.assertXmlSimilar(getResultXml(XML_NOTIF_LEAVES_UPDATE));
 
         writeTransaction = dataBroker.newWriteOnlyTransaction();
         writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, iid);
         writeTransaction.commit();
 
         // xmlunit cannot compare deeper children it seems out of the box so just check the iid encoding
-        final String notification = adapter.awaitUntilNotification("");
+        final String notification = handler.awaitUntilNotification("");
         assertThat(notification, allOf(
             containsString("<path xmlns:a=\"instance:identifier:patch:module\">/a:patch-cont"
                 + "/a:my-list1[a:name='Althea']/a:my-leaf11</path>"),
@@ -402,8 +417,10 @@ public class DataTreeChangeStreamTest extends AbstractConcurrentDataBrokerTest {
 
     @Test
     public void testXmlChangedLeavesOnly() throws Exception {
-        var adapter = new TestDataTreeChangeStream(PATCH_CONT_YIID, "Casey", NotificationOutputType.XML,
-                false, false, true, false, listenersBroker);
+        final var adapter = createStream(PATCH_CONT_YIID, "Casey", NotificationOutputType.XML, false, false, true,
+            false);
+        final var handler = new TestHandler();
+        adapter.addSubscriber(handler);
 
         final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
         final var root = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, PATCH_CONT_YIID);
@@ -422,7 +439,7 @@ public class DataTreeChangeStreamTest extends AbstractConcurrentDataBrokerTest {
             .setMyList1(BindingMap.of(new MyList1Builder().setMyLeaf11("Jed").setName("Althea").build()))
             .build());
         writeTransaction.commit();
-        adapter.assertXmlSimilar(getResultXml(XML_NOTIF_CHANGED_LEAVES_CREATE));
+        handler.assertXmlSimilar(getResultXml(XML_NOTIF_CHANGED_LEAVES_CREATE));
 
         writeTransaction = dataBroker.newWriteOnlyTransaction();
         writeTransaction.merge(LogicalDatastoreType.CONFIGURATION, iid, new PatchContBuilder()
@@ -437,14 +454,14 @@ public class DataTreeChangeStreamTest extends AbstractConcurrentDataBrokerTest {
             .setMyList1(BindingMap.of(new MyList1Builder().setMyLeaf12("Bertha").setName("Althea").build()))
             .build());
         writeTransaction.commit();
-        adapter.assertXmlSimilar(getResultXml(XML_NOTIF_CHANGED_LEAVES_UPDATE));
+        handler.assertXmlSimilar(getResultXml(XML_NOTIF_CHANGED_LEAVES_UPDATE));
 
         writeTransaction = dataBroker.newWriteOnlyTransaction();
         writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, iid);
         writeTransaction.commit();
 
         // xmlunit cannot compare deeper children it seems out of the box so just check the iid encoding
-        final String notification = adapter.awaitUntilNotification("");
+        final String notification = handler.awaitUntilNotification("");
         assertThat(notification, allOf(
             containsString("<path xmlns:a=\"instance:identifier:patch:module\">/a:patch-cont"
                 + "/a:my-list1[a:name='Althea']/a:my-leaf11</path>"),
@@ -460,8 +477,10 @@ public class DataTreeChangeStreamTest extends AbstractConcurrentDataBrokerTest {
 
     @Test
     public void testXmlChildNodesOnly() throws Exception {
-        final var adapter = new TestDataTreeChangeStream(PATCH_CONT_YIID, "Casey",
-            NotificationOutputType.XML, false, false, false, true, listenersBroker);
+        final var adapter = createStream(PATCH_CONT_YIID, "Casey", NotificationOutputType.XML, false, false, false,
+            true);
+        final var handler = new TestHandler();
+        adapter.addSubscriber(handler);
 
         final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
         final var root = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, PATCH_CONT_YIID);
@@ -472,24 +491,24 @@ public class DataTreeChangeStreamTest extends AbstractConcurrentDataBrokerTest {
         writeTransaction.put(LogicalDatastoreType.CONFIGURATION, iid,
             new MyList1Builder().setMyLeaf11("Jed").setName("Althea").build());
         writeTransaction.commit();
-        adapter.assertXmlSimilar(getResultXml(XML_NOTIF_CHILD_NODES_ONLY_CREATE));
+        handler.assertXmlSimilar(getResultXml(XML_NOTIF_CHILD_NODES_ONLY_CREATE));
 
         writeTransaction = dataBroker.newWriteOnlyTransaction();
         writeTransaction.put(LogicalDatastoreType.CONFIGURATION, iid,
             new MyList1Builder().setMyLeaf11("Bertha").setName("Althea").build());
         writeTransaction.commit();
-        adapter.assertXmlSimilar(getResultXml(XML_NOTIF_CHILD_NODES_ONLY_UPDATE1));
+        handler.assertXmlSimilar(getResultXml(XML_NOTIF_CHILD_NODES_ONLY_UPDATE1));
 
         writeTransaction = dataBroker.newWriteOnlyTransaction();
         writeTransaction.merge(LogicalDatastoreType.CONFIGURATION, iid,
             new MyList1Builder().setMyLeaf11("Jed").setName("Althea").build());
         writeTransaction.commit();
-        adapter.assertXmlSimilar(getResultXml(XML_NOTIF_CHILD_NODES_ONLY_UPDATE2));
+        handler.assertXmlSimilar(getResultXml(XML_NOTIF_CHILD_NODES_ONLY_UPDATE2));
 
         writeTransaction = dataBroker.newWriteOnlyTransaction();
         writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, iid);
         writeTransaction.commit();
-        adapter.assertXmlSimilar(getResultXml(XML_NOTIF_CHILD_NODES_ONLY_DELETE));
+        handler.assertXmlSimilar(getResultXml(XML_NOTIF_CHILD_NODES_ONLY_DELETE));
     }
 
     @Test
@@ -564,12 +583,13 @@ public class DataTreeChangeStreamTest extends AbstractConcurrentDataBrokerTest {
 
     private void jsonNotifications(final YangInstanceIdentifier pathYiid, final boolean skipData,
             final String jsonNotifCreate, final String jsonNotifUpdate, final String jsonNotifDelete) throws Exception {
-        final var adapter = new TestDataTreeChangeStream(pathYiid, "Casey",
-                NotificationOutputType.JSON, false, skipData, false, false, listenersBroker);
+        final var stream = createStream(pathYiid, "Casey", NotificationOutputType.JSON, false, skipData, false, false);
+        final var handler = new TestHandler();
+        stream.addSubscriber(handler);
 
         final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
         final var root = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, pathYiid);
-        changeService.registerDataTreeChangeListener(root, adapter);
+        changeService.registerDataTreeChangeListener(root, stream);
 
         var writeTransaction = dataBroker.newWriteOnlyTransaction();
         var builder = new MyList1Builder().setMyLeaf11("Jed").setName("Althea");
@@ -577,24 +597,25 @@ public class DataTreeChangeStreamTest extends AbstractConcurrentDataBrokerTest {
                 .child(MyList1.class, new MyList1Key("Althea"));
         writeTransaction.put(LogicalDatastoreType.CONFIGURATION, iid, builder.build());
         writeTransaction.commit();
-        adapter.assertGot(getNotifJson(jsonNotifCreate));
+        handler.assertGot(getNotifJson(jsonNotifCreate));
 
         writeTransaction = dataBroker.newWriteOnlyTransaction();
         builder = new MyList1Builder().withKey(new MyList1Key("Althea")).setMyLeaf12("Bertha");
         writeTransaction.merge(LogicalDatastoreType.CONFIGURATION, iid, builder.build());
         writeTransaction.commit();
-        adapter.assertGot(getNotifJson(jsonNotifUpdate));
+        handler.assertGot(getNotifJson(jsonNotifUpdate));
 
         writeTransaction = dataBroker.newWriteOnlyTransaction();
         writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, iid);
         writeTransaction.commit();
-        adapter.assertGot(getNotifJson(jsonNotifDelete));
+        handler.assertGot(getNotifJson(jsonNotifDelete));
     }
 
     private void xmlNotifications(final YangInstanceIdentifier pathYiid, final boolean skipData,
             final String xmlNotifCreate, final String xmlNotifUpdate, final String xmlNotifDelete) throws Exception {
-        final var adapter = new TestDataTreeChangeStream(pathYiid, "Casey", NotificationOutputType.XML,
-                false, skipData, false, false, listenersBroker);
+        final var adapter = createStream(pathYiid, "Casey", NotificationOutputType.XML, false, skipData, false, false);
+        final var handler = new TestHandler();
+        adapter.addSubscriber(handler);
 
         final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
         final var root = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, pathYiid);
@@ -606,17 +627,17 @@ public class DataTreeChangeStreamTest extends AbstractConcurrentDataBrokerTest {
                 .child(MyList1.class, new MyList1Key("Althea"));
         writeTransaction.put(LogicalDatastoreType.CONFIGURATION, iid, builder.build());
         writeTransaction.commit();
-        adapter.assertXmlSimilar(getResultXml(xmlNotifCreate));
+        handler.assertXmlSimilar(getResultXml(xmlNotifCreate));
 
         writeTransaction = dataBroker.newWriteOnlyTransaction();
         builder = new MyList1Builder().withKey(new MyList1Key("Althea")).setMyLeaf12("Bertha");
         writeTransaction.merge(LogicalDatastoreType.CONFIGURATION, iid, builder.build());
         writeTransaction.commit();
-        adapter.assertXmlSimilar(getResultXml(xmlNotifUpdate));
+        handler.assertXmlSimilar(getResultXml(xmlNotifUpdate));
 
         writeTransaction = dataBroker.newWriteOnlyTransaction();
         writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, iid);
         writeTransaction.commit();
-        adapter.assertXmlSimilar(getResultXml(xmlNotifDelete));
+        handler.assertXmlSimilar(getResultXml(xmlNotifDelete));
     }
 }
index abe7f9e53b3921fdcdcce804621526ddd257045a..7f6efeede33da1556ca646feeccebc6c6bba67b5 100644 (file)
@@ -31,6 +31,7 @@ import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
+import org.opendaylight.yangtools.concepts.Registration;
 
 @RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class SSESessionHandlerTest {
@@ -44,13 +45,16 @@ public class SSESessionHandlerTest {
     private SseEventSink eventSink;
     @Mock
     private Sse sse;
+    @Mock
+    private Registration reg;
 
     private SSESessionHandler setup(final int maxFragmentSize, final int heartbeatInterval) {
         doAnswer(inv -> new OutboundEvent.Builder().data(String.class, inv.getArgument(0, String.class)).build())
             .when(sse).newEvent(any());
 
-        final SSESessionHandler sseSessionHandler = new SSESessionHandler(executorService, eventSink, sse, listener,
-            maxFragmentSize, heartbeatInterval);
+        final var sseSessionHandler = new SSESessionHandler(executorService, eventSink, sse, listener, maxFragmentSize,
+            heartbeatInterval);
+        doReturn(reg).when(listener).addSubscriber(sseSessionHandler);
         doReturn(pingFuture).when(executorService)
             .scheduleWithFixedDelay(any(Runnable.class), eq((long) heartbeatInterval), eq((long) heartbeatInterval),
                 eq(TimeUnit.MILLISECONDS));
@@ -60,10 +64,9 @@ public class SSESessionHandlerTest {
     @Test
     public void onSSEConnectedWithEnabledPing() {
         final int heartbeatInterval = 1000;
-        final SSESessionHandler sseSessionHandler = setup(1000, heartbeatInterval);
+        final var sseSessionHandler = setup(1000, heartbeatInterval);
 
         sseSessionHandler.init();
-        verify(listener).addSubscriber(sseSessionHandler);
         verify(executorService).scheduleWithFixedDelay(any(Runnable.class), eq((long) heartbeatInterval),
                 eq((long) heartbeatInterval), eq(TimeUnit.MILLISECONDS));
     }
@@ -71,7 +74,7 @@ public class SSESessionHandlerTest {
     @Test
     public void onSSEConnectedWithDisabledPing() {
         final int heartbeatInterval = 0;
-        final SSESessionHandler sseSessionHandler = setup(1000, heartbeatInterval);
+        final var sseSessionHandler = setup(1000, heartbeatInterval);
 
         sseSessionHandler.init();
         verify(listener).addSubscriber(sseSessionHandler);
@@ -80,59 +83,59 @@ public class SSESessionHandlerTest {
 
     @Test
     public void onSSEClosedWithOpenSession() {
-        final SSESessionHandler sseSessionHandler = setup(200, 10000);
+        final var sseSessionHandler = setup(200, 10000);
 
         sseSessionHandler.init();
         verify(listener).addSubscriber(sseSessionHandler);
 
         sseSessionHandler.close();
-        verify(listener).removeSubscriber(sseSessionHandler);
+        verify(reg).close();
     }
 
     @Test
     public void onSSECloseWithEnabledPingAndLivingSession() throws IOException {
-        final SSESessionHandler sseSessionHandler = setup(150, 8000);
+        final var sseSessionHandler = setup(150, 8000);
         sseSessionHandler.init();
         doReturn(false).when(pingFuture).isCancelled();
         doReturn(false).when(pingFuture).isDone();
 
         sseSessionHandler.close();
-        verify(listener).removeSubscriber(sseSessionHandler);
+        verify(reg).close();
         verify(pingFuture).cancel(anyBoolean());
     }
 
     @Test
     public void onSSECloseWithEnabledPingAndDeadSession() {
-        final SSESessionHandler sseSessionHandler = setup(150, 8000);
+        final var sseSessionHandler = setup(150, 8000);
         sseSessionHandler.init();
 
         sseSessionHandler.close();
-        verify(listener).removeSubscriber(sseSessionHandler);
+        verify(reg).close();
         verify(pingFuture).cancel(anyBoolean());
     }
 
     @Test
     public void onSSECloseWithDisabledPingAndDeadSession() {
-        final SSESessionHandler sseSessionHandler = setup(150, 8000);
+        final var sseSessionHandler = setup(150, 8000);
         sseSessionHandler.init();
         doReturn(true).when(pingFuture).isDone();
 
         sseSessionHandler.close();
-        verify(listener).removeSubscriber(sseSessionHandler);
+        verify(reg).close();
         verify(pingFuture, never()).cancel(anyBoolean());
     }
 
     @Test
     public void sendDataMessageWithDisabledFragmentation() throws IOException {
-        final SSESessionHandler sseSessionHandler = setup(0, 0);
+        final var sseSessionHandler = setup(0, 0);
         doReturn(false).when(eventSink).isClosed();
         sseSessionHandler.init();
         final String testMessage = generateRandomStringOfLength(100);
         sseSessionHandler.sendDataMessage(testMessage);
 
-        ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
+        final var cap = ArgumentCaptor.forClass(OutboundEvent.class);
         verify(eventSink, times(1)).send(cap.capture());
-        OutboundEvent event = cap.getAllValues().get(0);
+        final var event = cap.getAllValues().get(0);
         assertNotNull(event);
         assertEquals(event.getData(), testMessage);
     }
@@ -145,8 +148,7 @@ public class SSESessionHandlerTest {
 
         final String testMessage = generateRandomStringOfLength(11);
         sseSessionHandler.sendDataMessage(testMessage);
-        ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
-        verify(eventSink, times(0)).send(cap.capture());
+        verify(eventSink, times(0)).send(any());
     }
 
     @Test
index 449658fcebf7c9b897170557859fb7d32191640f..d22e65cef218f7f8e678b3a63eee49b8bc7b0780 100644 (file)
@@ -13,6 +13,8 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
@@ -29,6 +31,7 @@ import org.eclipse.jetty.websocket.api.RemoteEndpoint;
 import org.eclipse.jetty.websocket.api.Session;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.opendaylight.yangtools.concepts.Registration;
 
 public class WebSocketSessionHandlerTest {
     private static final class WebSocketTestSessionState {
@@ -82,27 +85,28 @@ public class WebSocketSessionHandlerTest {
 
     @Test
     public void onWebSocketConnectedWithAlreadyOpenSession() {
-        final WebSocketTestSessionState webSocketTestSessionState = new WebSocketTestSessionState(150, 8000);
-        final Session session = mock(Session.class);
+        final var webSocketTestSessionState = new WebSocketTestSessionState(150, 8000);
+        final var session = mock(Session.class);
         when(session.isOpen()).thenReturn(true);
 
         webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
         webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
-        verify(webSocketTestSessionState.listener, times(1)).addSubscriber(any());
+        verify(webSocketTestSessionState.listener).addSubscriber(any());
     }
 
     @Test
     public void onWebSocketClosedWithOpenSession() {
-        final WebSocketTestSessionState webSocketTestSessionState = new WebSocketTestSessionState(200, 10000);
-        final Session session = mock(Session.class);
+        final var webSocketTestSessionState = new WebSocketTestSessionState(200, 10000);
+        final var session = mock(Session.class);
+        final var reg = mock(Registration.class);
 
+        doReturn(reg).when(webSocketTestSessionState.listener)
+            .addSubscriber(webSocketTestSessionState.webSocketSessionHandler);
         webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
-        verify(webSocketTestSessionState.listener).addSubscriber(
-                webSocketTestSessionState.webSocketSessionHandler);
+        verify(webSocketTestSessionState.listener).addSubscriber(webSocketTestSessionState.webSocketSessionHandler);
 
         webSocketTestSessionState.webSocketSessionHandler.onWebSocketClosed(200, "Simulated close");
-        verify(webSocketTestSessionState.listener).removeSubscriber(
-                webSocketTestSessionState.webSocketSessionHandler);
+        verify(reg).close();
     }
 
     @Test
@@ -114,58 +118,68 @@ public class WebSocketSessionHandlerTest {
 
     @Test
     public void onWebSocketErrorWithEnabledPingAndLivingSession() {
-        final WebSocketTestSessionState webSocketTestSessionState = new WebSocketTestSessionState(150, 8000);
-        final Session session = mock(Session.class);
+        final var webSocketTestSessionState = new WebSocketTestSessionState(150, 8000);
+        final var session = mock(Session.class);
+        final var reg = mock(Registration.class);
+
         when(session.isOpen()).thenReturn(true);
-        final Throwable sampleError = new IllegalStateException("Simulated error");
+        when(webSocketTestSessionState.listener.addSubscriber(webSocketTestSessionState.webSocketSessionHandler))
+            .thenReturn(reg);
         webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
         when(webSocketTestSessionState.pingFuture.isCancelled()).thenReturn(false);
         when(webSocketTestSessionState.pingFuture.isDone()).thenReturn(false);
 
+        final var sampleError = new IllegalStateException("Simulated error");
+        doNothing().when(reg).close();
         webSocketTestSessionState.webSocketSessionHandler.onWebSocketError(sampleError);
-        verify(webSocketTestSessionState.listener).removeSubscriber(
-                webSocketTestSessionState.webSocketSessionHandler);
+        verify(reg).close();
         verify(session).close();
         verify(webSocketTestSessionState.pingFuture).cancel(anyBoolean());
     }
 
     @Test
     public void onWebSocketErrorWithEnabledPingAndDeadSession() {
-        final WebSocketTestSessionState webSocketTestSessionState = new WebSocketTestSessionState(150, 8000);
-        final Session session = mock(Session.class);
+        final var webSocketTestSessionState = new WebSocketTestSessionState(150, 8000);
+        final var session = mock(Session.class);
+        final var reg = mock(Registration.class);
+
         when(session.isOpen()).thenReturn(false);
-        final Throwable sampleError = new IllegalStateException("Simulated error");
+        when(webSocketTestSessionState.listener.addSubscriber(webSocketTestSessionState.webSocketSessionHandler))
+            .thenReturn(reg);
         webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
 
+        final var sampleError = new IllegalStateException("Simulated error");
         webSocketTestSessionState.webSocketSessionHandler.onWebSocketError(sampleError);
-        verify(webSocketTestSessionState.listener).removeSubscriber(
-                webSocketTestSessionState.webSocketSessionHandler);
+        verify(reg).close();
         verify(session, never()).close();
         verify(webSocketTestSessionState.pingFuture).cancel(anyBoolean());
     }
 
     @Test
     public void onWebSocketErrorWithDisabledPingAndDeadSession() {
-        final WebSocketTestSessionState webSocketTestSessionState = new WebSocketTestSessionState(150, 8000);
-        final Session session = mock(Session.class);
+        final var webSocketTestSessionState = new WebSocketTestSessionState(150, 8000);
+        final var session = mock(Session.class);
+        final var reg = mock(Registration.class);
+
         when(session.isOpen()).thenReturn(false);
-        final Throwable sampleError = new IllegalStateException("Simulated error");
+        when(webSocketTestSessionState.listener.addSubscriber(webSocketTestSessionState.webSocketSessionHandler))
+            .thenReturn(reg);
         webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
         when(webSocketTestSessionState.pingFuture.isCancelled()).thenReturn(false);
         when(webSocketTestSessionState.pingFuture.isDone()).thenReturn(true);
 
+        final var sampleError = new IllegalStateException("Simulated error");
         webSocketTestSessionState.webSocketSessionHandler.onWebSocketError(sampleError);
-        verify(webSocketTestSessionState.listener).removeSubscriber(
-                webSocketTestSessionState.webSocketSessionHandler);
+        verify(reg).close();
         verify(session, never()).close();
         verify(webSocketTestSessionState.pingFuture, never()).cancel(anyBoolean());
     }
 
     @Test
     public void sendDataMessageWithDisabledFragmentation() throws IOException {
-        final WebSocketTestSessionState webSocketTestSessionState = new WebSocketTestSessionState(0, 0);
-        final Session session = mock(Session.class);
-        final RemoteEndpoint remoteEndpoint = mock(RemoteEndpoint.class);
+        final var webSocketTestSessionState = new WebSocketTestSessionState(0, 0);
+        final var session = mock(Session.class);
+        final var remoteEndpoint = mock(RemoteEndpoint.class);
         when(session.isOpen()).thenReturn(true);
         when(session.getRemote()).thenReturn(remoteEndpoint);
         webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);