Clean up stream class names 47/108847/4
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 3 Nov 2023 21:28:24 +0000 (22:28 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Fri, 3 Nov 2023 22:03:33 +0000 (23:03 +0100)
ListenerAdapter, AbstractStream et al. have rather unfortunate names.

Fix this by renaming:
- AbstractStream to RestconfStream
- AbstractNotificationListenerAdaptor to AbstractNotificationStream
- DeviceNotificationListenerAdaptor to DeviceNotificatioNStream
- ListenerAdapter to DataTreeChangeStream
- NotificationListenerAdapter to NotificationStream

JIRA: NETCONF-1102
Change-Id: Ib42d89344079c9815a628d9353abea15c67a72d5
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
16 files changed:
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/AbstractNotificationStream.java [moved from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/AbstractNotificationListenerAdaptor.java with 74% similarity]
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/DataTreeCandidateFormatter.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/DataTreeCandidateFormatterFactory.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/DataTreeChangeStream.java [moved from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/ListenerAdapter.java with 80% similarity]
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/DeviceNotificationStream.java [moved from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/DeviceNotificationListenerAdaptor.java with 83% similarity]
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/JSONDataTreeCandidateFormatter.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/ListenersBroker.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/NotificationStream.java [moved from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/NotificationListenerAdapter.java with 76% similarity]
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/RestconfStream.java [moved from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/AbstractStream.java with 84% similarity]
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/SSESessionHandler.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketSessionHandler.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/XMLDataTreeCandidateFormatter.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/DataTreeChangeStreamTest.java [moved from restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/ListenerAdapterTest.java with 96% similarity]
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/SSESessionHandlerTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketFactoryTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketSessionHandlerTest.java

@@ -18,23 +18,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Abstract base class for functionality shared between {@link NotificationListenerAdapter} and
- * {@link DeviceNotificationListenerAdaptor}.
+ * Abstract base class for functionality shared between {@link NotificationStream} and
+ * {@link DeviceNotificationStream}.
  */
-abstract class AbstractNotificationListenerAdaptor extends AbstractStream<DOMNotification>
-        implements DOMNotificationListener {
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractNotificationListenerAdaptor.class);
+abstract class AbstractNotificationStream extends RestconfStream<DOMNotification> implements DOMNotificationListener {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractNotificationStream.class);
 
-    AbstractNotificationListenerAdaptor(final ListenersBroker listenersBroker, final String streamName,
+    AbstractNotificationStream(final ListenersBroker listenersBroker, final String name,
             final NotificationOutputType outputType) {
-        super(listenersBroker, streamName, outputType, getFormatterFactory(outputType));
-    }
-
-    private static NotificationFormatterFactory getFormatterFactory(final NotificationOutputType outputType) {
-        return switch (outputType) {
+        super(listenersBroker, name, outputType, switch (outputType) {
             case JSON -> JSONNotificationFormatter.FACTORY;
             case XML -> XMLNotificationFormatter.FACTORY;
-        };
+        });
     }
 
     @Override
index 1c8e4825fd8ae523410ad2c28f0236f10803cef4..80e69eef43b64a615d9b5be47445c3ab22840d25 100644 (file)
@@ -13,7 +13,7 @@ import static org.opendaylight.restconf.nb.rfc8040.streams.NotificationFormatter
 import static org.opendaylight.restconf.nb.rfc8040.streams.NotificationFormatter.XML_OUTPUT_FACTORY;
 
 import java.io.IOException;
-import java.util.Collection;
+import java.util.List;
 import javax.xml.stream.XMLStreamException;
 import javax.xml.stream.XMLStreamWriter;
 import javax.xml.transform.dom.DOMResult;
@@ -29,7 +29,7 @@ import org.w3c.dom.Element;
 /**
  * Base formatter for DataTreeCandidates which only handles exporting to a document for filter checking purpose.
  */
-abstract class DataTreeCandidateFormatter extends EventFormatter<Collection<DataTreeCandidate>> {
+abstract class DataTreeCandidateFormatter extends EventFormatter<List<DataTreeCandidate>> {
     DataTreeCandidateFormatter(final TextParameters textParams) {
         super(textParams);
     }
@@ -41,7 +41,7 @@ abstract class DataTreeCandidateFormatter extends EventFormatter<Collection<Data
 
     @Override
     final void fillDocument(final Document doc, final EffectiveModelContext schemaContext,
-            final Collection<DataTreeCandidate> input) throws IOException {
+            final List<DataTreeCandidate> input) throws IOException {
         final Element notificationElement = NotificationFormatter.createNotificationElement(doc);
         final Element notificationEventElement = doc.createElementNS(
             SAL_REMOTE_NAMESPACE, DATA_CHANGED_NOTIFICATION_ELEMENT);
index eb0aca0234586c0e424e3acdb007db7bd4c99639..451a2e4f831f96111f56c6bf3fdcf0aa2fd3a5f3 100644 (file)
@@ -7,10 +7,10 @@
  */
 package org.opendaylight.restconf.nb.rfc8040.streams;
 
-import java.util.Collection;
+import java.util.List;
 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
 
-abstract class DataTreeCandidateFormatterFactory extends EventFormatterFactory<Collection<DataTreeCandidate>> {
+abstract class DataTreeCandidateFormatterFactory extends EventFormatterFactory<List<DataTreeCandidate>> {
     DataTreeCandidateFormatterFactory(final DataTreeCandidateFormatter emptyFormatter) {
         super(emptyFormatter);
     }
similarity index 80%
rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/ListenerAdapter.java
rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/DataTreeChangeStream.java
index 3bf2f2d7e48eedd050b9d1b5ee6c657a92e594fb..fb37374b0086fca086997bf272b556cf75e8e125 100644 (file)
@@ -11,7 +11,6 @@ import static java.util.Objects.requireNonNull;
 
 import com.google.common.base.MoreObjects.ToStringHelper;
 import java.time.Instant;
-import java.util.Collection;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.eclipse.jdt.annotation.NonNull;
@@ -28,39 +27,28 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source.
+ * A {@link RestconfStream} reporting changes on a particular data tree.
  */
-public class ListenerAdapter extends AbstractStream<Collection<DataTreeCandidate>>
+public class DataTreeChangeStream extends RestconfStream<List<DataTreeCandidate>>
         implements ClusteredDOMDataTreeChangeListener {
-    private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class);
+    private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeStream.class);
 
     private final DatabindProvider databindProvider;
     private final @NonNull LogicalDatastoreType datastore;
     private final @NonNull YangInstanceIdentifier path;
 
-    /**
-     * Creates new {@link ListenerAdapter} listener specified by path and stream name and register for subscribing.
-     *
-     * @param path       Path to data in data store.
-     * @param streamName The name of the stream.
-     * @param outputType Type of output on notification (JSON, XML).
-     */
-    ListenerAdapter(final ListenersBroker listenersBroker, final String streamName,
+    DataTreeChangeStream(final ListenersBroker listenersBroker, final String name,
             final NotificationOutputType outputType, final DatabindProvider databindProvider,
             final LogicalDatastoreType datastore, final YangInstanceIdentifier path) {
-        super(listenersBroker, streamName, outputType, getFormatterFactory(outputType));
+        super(listenersBroker, name, outputType, switch (outputType) {
+            case JSON -> JSONDataTreeCandidateFormatter.FACTORY;
+            case XML -> XMLDataTreeCandidateFormatter.FACTORY;
+        });
         this.databindProvider = requireNonNull(databindProvider);
         this.datastore = requireNonNull(datastore);
         this.path = requireNonNull(path);
     }
 
-    private static DataTreeCandidateFormatterFactory getFormatterFactory(final NotificationOutputType outputType) {
-        return switch (outputType) {
-            case JSON -> JSONDataTreeCandidateFormatter.FACTORY;
-            case XML -> XMLDataTreeCandidateFormatter.FACTORY;
-        };
-    }
-
     @Override
     public void onInitialData() {
         // No-op
@@ -15,26 +15,25 @@ import org.opendaylight.mdsal.dom.api.DOMMountPointListener;
 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
 
 /**
- * {@link DeviceNotificationListenerAdaptor} is responsible to track events on notifications.
+ * A {@link RestconfStream} reporting YANG notifications coming from a mounted device.
  */
-public final class DeviceNotificationListenerAdaptor extends AbstractNotificationListenerAdaptor
-        implements DOMMountPointListener {
+public final class DeviceNotificationStream extends AbstractNotificationStream implements DOMMountPointListener {
     private final @NonNull EffectiveModelContext effectiveModel;
     private final @NonNull DOMMountPointService mountPointService;
     private final @NonNull YangInstanceIdentifier instanceIdentifier;
 
-    private ListenerRegistration<DOMMountPointListener> reg;
+    private Registration reg;
 
-    DeviceNotificationListenerAdaptor(final ListenersBroker listenersBroker, final String streamName,
+    DeviceNotificationStream(final ListenersBroker listenersBroker, final String name,
             final NotificationOutputType outputType, final EffectiveModelContext effectiveModel,
             final DOMMountPointService mountPointService, final YangInstanceIdentifier instanceIdentifier) {
-        super(listenersBroker, streamName, outputType);
+        super(listenersBroker, name, outputType);
         this.effectiveModel = requireNonNull(effectiveModel);
         this.mountPointService = requireNonNull(mountPointService);
         this.instanceIdentifier = requireNonNull(instanceIdentifier);
index 2129d52045f010dc10c43b852e4ed9fd4ccce819..1ceecb58c1142d32ffc249325d7342a9b5be4bdb 100644 (file)
@@ -11,7 +11,7 @@ import com.google.gson.stream.JsonWriter;
 import java.io.IOException;
 import java.io.StringWriter;
 import java.time.Instant;
-import java.util.Collection;
+import java.util.List;
 import javax.xml.xpath.XPathExpressionException;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.$YangModuleInfoImpl;
@@ -51,7 +51,7 @@ public final class JSONDataTreeCandidateFormatter extends DataTreeCandidateForma
 
     @Override
     String createText(final TextParameters params, final EffectiveModelContext schemaContext,
-            final Collection<DataTreeCandidate> input, final Instant now) throws IOException {
+            final List<DataTreeCandidate> input, final Instant now) throws IOException {
         try (var writer = new StringWriter()) {
             boolean nonEmpty = false;
             try (var jsonWriter = new JsonWriter(writer)) {
index f61b35163d04aa8cf2a975b572818f1424d70f4c..3bf7921bb0d04b3ccc309fd63660185b75213011 100644 (file)
@@ -60,8 +60,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This singleton class is responsible for creation, removal and searching for {@link ListenerAdapter} or
- * {@link NotificationListenerAdapter} listeners.
+ * This singleton class is responsible for creation, removal and searching for {@link DataTreeChangeStream} or
+ * {@link NotificationStream} listeners.
  */
 // FIXME: furthermore, this should be tied to ietf-restconf-monitoring, as the Strings used in its maps are stream
 //        names. We essentially need a component which deals with allocation of stream names and their lifecycle and
@@ -108,17 +108,17 @@ public abstract sealed class ListenersBroker {
     }
 
     /**
-     * Factory interface for creating instances of {@link AbstractStream}.
+     * Factory interface for creating instances of {@link RestconfStream}.
      *
-     * @param <T> {@link AbstractStream} type
+     * @param <T> {@link RestconfStream} type
      */
     @FunctionalInterface
-    public interface StreamFactory<T extends AbstractStream<?>> {
+    public interface StreamFactory<T extends RestconfStream<?>> {
         /**
          * Create a stream with the supplied name.
          *
          * @param name Stream name
-         * @return An {@link AbstractStream}
+         * @return An {@link RestconfStream}
          */
         @NonNull T createStream(@NonNull String name);
     }
@@ -141,11 +141,6 @@ public abstract sealed class ListenersBroker {
 
 //    private static final QName LOCATION_QNAME = QName.create(Notifi.QNAME, "location").intern();
 //    private static final NodeIdentifier LOCATION_NODEID = NodeIdentifier.create(LOCATION_QNAME);
-//    private static final String STREAMS_PATH = "ietf-restconf-monitoring:restconf-state/streams";
-//    private static final String STREAM_PATH_PART = "/stream=";
-//    private static final String STREAM_PATH = STREAMS_PATH + STREAM_PATH_PART;
-//    private static final String STREAM_ACCESS_PATH_PART = "/access=";
-//    private static final String STREAM_LOCATION_PATH_PART = "/location";
 //
 //    private final ListenersBroker listenersBroker;
 //    private final HandlersHolder handlersHolder;
@@ -216,7 +211,7 @@ public abstract sealed class ListenersBroker {
     private static final NodeIdentifier STREAM_NAME_NODEID =
         NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionOutput.QNAME, "stream-name").intern());
 
-    private final ConcurrentMap<String, AbstractStream<?>> streams = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, RestconfStream<?>> streams = new ConcurrentHashMap<>();
     private final DOMDataBroker dataBroker;
 
     private ListenersBroker(final DOMDataBroker dataBroker) {
@@ -224,26 +219,26 @@ public abstract sealed class ListenersBroker {
     }
 
     /**
-     * Get an {@link AbstractStream} by its name.
+     * Get a {@link RestconfStream} by its name.
      *
      * @param streamName Stream name.
-     * @return An {@link AbstractStream}, or {@code null} if the stream with specified name does not exist.
+     * @return A {@link RestconfStream}, or {@code null} if the stream with specified name does not exist.
      * @throws NullPointerException if {@code streamName} is {@code null}
      */
-    public final @Nullable AbstractStream<?> getStream(final String streamName) {
+    public final @Nullable RestconfStream<?> getStream(final String streamName) {
         return streams.get(streamName);
     }
 
     /**
-     * Create an {@link AbstractStream} with a unique name. This method will atomically generate a stream name, create
+     * Create a {@link RestconfStream} with a unique name. This method will atomically generate a stream name, create
      * the corresponding instance and register it
      *
      * @param <T> Stream type
      * @param factory Factory for creating the actual stream instance
-     * @return An {@link AbstractStream} instance
+     * @return A {@link RestconfStream} instance
      * @throws NullPointerException if {@code factory} is {@code null}
      */
-    public final <T extends AbstractStream<?>> @NonNull T createStream(final StreamFactory<T> factory) {
+    public final <T extends RestconfStream<?>> @NonNull T createStream(final StreamFactory<T> factory) {
         String name;
         T stream;
         do {
@@ -261,9 +256,9 @@ public abstract sealed class ListenersBroker {
      *
      * @param stream Stream to remove
      */
-    final void removeStream(final AbstractStream<?> stream) {
+    final void removeStream(final RestconfStream<?> stream) {
         // Defensive check to see if we are still tracking the stream
-        final var streamName = stream.getStreamName();
+        final var streamName = stream.name();
         if (streams.get(streamName) != stream) {
             LOG.warn("Stream {} does not match expected instance {}, skipping datastore update", streamName, stream);
             return;
@@ -371,13 +366,13 @@ public abstract sealed class ListenersBroker {
             : LogicalDatastoreType.CONFIGURATION;
         final var path = preparePath(input);
         final var outputType = prepareOutputType(input);
-        final var adapter = createStream(name -> new ListenerAdapter(this, name, outputType, databindProvider,
+        final var adapter = createStream(name -> new DataTreeChangeStream(this, name, outputType, databindProvider,
             datastore, path));
 
         // building of output
         return RestconfFuture.of(Optional.of(Builders.containerBuilder()
             .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
-            .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.getStreamName()))
+            .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.name()))
             .build()));
     }
 
@@ -457,12 +452,12 @@ public abstract sealed class ListenersBroker {
 
         // registration of the listener
         final var outputType = prepareOutputType(input);
-        final var adapter = createStream(name -> new NotificationListenerAdapter(this, name, outputType,
+        final var adapter = createStream(name -> new NotificationStream(this, name, outputType,
             databindProvider, qnames));
 
         return RestconfFuture.of(Optional.of(Builders.containerBuilder()
             .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
-            .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.getStreamName()))
+            .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.name()))
             .build()));
     }
 
@@ -557,14 +552,14 @@ public abstract sealed class ListenersBroker {
 
         final var outputType = prepareOutputType(input);
         final var notificationListenerAdapter = createStream(
-            streamName -> new DeviceNotificationListenerAdaptor(this, streamName, outputType, mountModelContext,
+            streamName -> new DeviceNotificationStream(this, streamName, outputType, mountModelContext,
                 mountPointService, mountPoint.getIdentifier()));
         notificationListenerAdapter.listen(mountNotifService, notificationPaths);
 
         return RestconfFuture.of(Optional.of(Builders.containerBuilder()
             .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
             .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH,
-                baseUrl + notificationListenerAdapter.getStreamName()))
+                baseUrl + notificationListenerAdapter.name()))
             .build()));
     }
 
similarity index 76%
rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/NotificationListenerAdapter.java
rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/NotificationStream.java
index 7df4ba589229283f2f111fac26bb4b2f0ffcb656..6dd968f8df095e3edf466d01bffe7516ef98b36a 100644 (file)
@@ -19,24 +19,16 @@ import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
 
 /**
- * {@link NotificationListenerAdapter} is responsible to track events on notifications.
+ * A {@link RestconfStream} reporting YANG notifications.
  */
-public final class NotificationListenerAdapter extends AbstractNotificationListenerAdaptor {
+public final class NotificationStream extends AbstractNotificationStream {
     private final DatabindProvider databindProvider;
     private final ImmutableSet<QName> paths;
 
-    /**
-     * Set path of listener and stream name.
-     *
-     * @param paths      Top-level  Schema path of YANG notification.
-     * @param streamName Name of the stream.
-     * @param outputType Type of output on notification (JSON or XML).
-     * @param listenersBroker Associated {@link ListenersBroker}
-     */
-    NotificationListenerAdapter(final ListenersBroker listenersBroker, final String streamName,
+    NotificationStream(final ListenersBroker listenersBroker, final String name,
             final NotificationOutputType outputType, final DatabindProvider databindProvider,
             final ImmutableSet<QName> paths) {
-        super(listenersBroker, streamName, outputType);
+        super(listenersBroker, name, outputType);
         this.databindProvider = requireNonNull(databindProvider);
         this.paths = requireNonNull(paths);
     }
similarity index 84%
rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/AbstractStream.java
rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/RestconfStream.java
index 14862eec173d23613c2a95601e69b998c9456bfe..6de20796cd0ce24492b52b25033e70dd9d1f6a43 100644 (file)
@@ -7,8 +7,6 @@
  */
 package org.opendaylight.restconf.nb.rfc8040.streams;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
 import static java.util.Objects.requireNonNull;
 
 import com.google.common.base.MoreObjects;
@@ -31,13 +29,11 @@ import org.slf4j.LoggerFactory;
 /**
  * Base superclass for all stream types.
  */
-abstract class AbstractStream<T> {
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractStream.class);
+abstract class RestconfStream<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(RestconfStream.class);
 
-    private final EventFormatterFactory<T> formatterFactory;
-    private final NotificationOutputType outputType;
-    private final String streamName;
-    protected final @NonNull ListenersBroker listenersBroker;
+    private final @NonNull ListenersBroker listenersBroker;
+    private final @NonNull String name;
 
     @GuardedBy("this")
     private final Set<StreamSessionHandler> subscribers = new HashSet<>();
@@ -45,14 +41,14 @@ abstract class AbstractStream<T> {
     private Registration registration;
 
     // FIXME: NETCONF-1102: this should be tied to a subscriber
+    private final EventFormatterFactory<T> formatterFactory;
+    private final NotificationOutputType outputType;
     private @NonNull EventFormatter<T> formatter;
 
-    AbstractStream(final ListenersBroker listenersBroker, final String streamName,
-            final NotificationOutputType outputType, final EventFormatterFactory<T> formatterFactory) {
+    RestconfStream(final ListenersBroker listenersBroker, final String name, final NotificationOutputType outputType,
+            final EventFormatterFactory<T> formatterFactory) {
         this.listenersBroker = requireNonNull(listenersBroker);
-        this.streamName = requireNonNull(streamName);
-        checkArgument(!streamName.isEmpty());
-
+        this.name = requireNonNull(name);
         this.outputType = requireNonNull(outputType);
         this.formatterFactory = requireNonNull(formatterFactory);
         formatter = formatterFactory.emptyFormatter();
@@ -63,8 +59,8 @@ abstract class AbstractStream<T> {
      *
      * @return Stream name.
      */
-    public final String getStreamName() {
-        return streamName;
+    public final @NonNull String name() {
+        return name;
     }
 
     /**
@@ -76,23 +72,15 @@ abstract class AbstractStream<T> {
         return outputType.getName();
     }
 
-    /**
-     * Checks if exists at least one {@link StreamSessionHandler} subscriber.
-     *
-     * @return {@code true} if exist at least one {@link StreamSessionHandler} subscriber, {@code false} otherwise.
-     */
-    final synchronized boolean hasSubscribers() {
-        return !subscribers.isEmpty();
-    }
-
     /**
      * Registers {@link StreamSessionHandler} subscriber.
      *
      * @param subscriber SSE or WS session handler.
      */
     synchronized void addSubscriber(final StreamSessionHandler subscriber) {
-        final boolean isConnected = subscriber.isConnected();
-        checkState(isConnected);
+        if (!subscriber.isConnected()) {
+            throw new IllegalStateException(subscriber + " is not connected");
+        }
         LOG.debug("Subscriber {} is added.", subscriber);
         subscribers.add(subscriber);
     }
@@ -144,7 +132,7 @@ abstract class AbstractStream<T> {
     public final void setQueryParams(final ReceiveEventsParams params) {
         final var startTime = params.startTime();
         if (startTime != null) {
-            throw new RestconfDocumentedException("Stream " + streamName + " does not support replay",
+            throw new RestconfDocumentedException("Stream " + name + " does not support replay",
                 ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
         }
 
@@ -228,6 +216,6 @@ abstract class AbstractStream<T> {
     }
 
     ToStringHelper addToStringAttributes(final ToStringHelper helper) {
-        return helper.add("stream-name", streamName).add("output-type", getOutputType());
+        return helper.add("name", name).add("output-type", getOutputType());
     }
 }
index d1c85ce7fc58555390334ec3b462b47780ede3e7..dfec4a1fb3fe16ad8d77259286af25b6a615fb41 100644 (file)
@@ -28,7 +28,7 @@ public final class SSESessionHandler implements StreamSessionHandler {
 
     private final ScheduledExecutorService executorService;
     // FIXME: this really should include subscription details like formatter etc.
-    private final AbstractStream<?> listener;
+    private final RestconfStream<?> listener;
     private final int maximumFragmentLength;
     private final int heartbeatInterval;
     private final SseEventSink sink;
@@ -53,7 +53,7 @@ public final class SSESessionHandler implements StreamSessionHandler {
      *            session up. Ping control frames are disabled if this parameter is set to 0.
      */
     public SSESessionHandler(final ScheduledExecutorService executorService, final SseEventSink sink, final Sse sse,
-            final AbstractStream<?> listener, final int maximumFragmentLength, final int heartbeatInterval) {
+            final RestconfStream<?> listener, final int maximumFragmentLength, final int heartbeatInterval) {
         this.executorService = executorService;
         this.sse = sse;
         this.sink = sink;
index 6ce653141e6601eb950594a0a52bb9cc7bc92956..e7b5b2a48c2619b1bae53a1be8b0419a258643b7 100644 (file)
@@ -39,7 +39,7 @@ public final class WebSocketSessionHandler implements StreamSessionHandler {
 
     private final ScheduledExecutorService executorService;
     // FIXME: this really should include formatter etc.
-    private final AbstractStream<?> listener;
+    private final RestconfStream<?> listener;
     private final int maximumFragmentLength;
     private final int heartbeatInterval;
 
@@ -62,7 +62,7 @@ public final class WebSocketSessionHandler implements StreamSessionHandler {
      * @param heartbeatInterval     Interval in milliseconds of sending of ping control frames to remote endpoint
      *                              to keep session up. Ping control frames are disabled if this parameter is set to 0.
      */
-    WebSocketSessionHandler(final ScheduledExecutorService executorService, final AbstractStream<?> listener,
+    WebSocketSessionHandler(final ScheduledExecutorService executorService, final RestconfStream<?> listener,
             final int maximumFragmentLength, final int heartbeatInterval) {
         this.executorService = executorService;
         this.listener = listener;
index a72427d02b2296504ce959f672f151c66a31bc9f..06a30dc5dc34015a68305a141f98426f540b99ea 100644 (file)
@@ -13,7 +13,7 @@ import static org.opendaylight.restconf.nb.rfc8040.streams.NotificationFormatter
 import java.io.IOException;
 import java.io.StringWriter;
 import java.time.Instant;
-import java.util.Collection;
+import java.util.List;
 import javax.xml.XMLConstants;
 import javax.xml.stream.XMLStreamException;
 import javax.xml.xpath.XPathExpressionException;
@@ -47,7 +47,7 @@ public final class XMLDataTreeCandidateFormatter extends DataTreeCandidateFormat
 
     @Override
     String createText(final TextParameters params, final EffectiveModelContext schemaContext,
-            final Collection<DataTreeCandidate> input, final Instant now) throws Exception {
+            final List<DataTreeCandidate> input, final Instant now) throws Exception {
         final var writer = new StringWriter();
         boolean nonEmpty = false;
         try {
similarity index 96%
rename from restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/ListenerAdapterTest.java
rename to restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/DataTreeChangeStreamTest.java
index 33a85977529480dd42464186a0721603ab0f0908..9d96255b772642fe2ab217cf66769c3085c2fd10 100644 (file)
@@ -60,8 +60,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.xmlunit.assertj.XmlAssert;
 
-public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
-    private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapterTest.class);
+public class DataTreeChangeStreamTest extends AbstractConcurrentDataBrokerTest {
+    private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeStreamTest.class);
 
     private static final String JSON_NOTIF_LEAVES_CREATE = "/listener-adapter-test/notif-leaves-create.json";
     private static final String JSON_NOTIF_LEAVES_UPDATE =  "/listener-adapter-test/notif-leaves-update.json";
@@ -170,12 +170,11 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
         listenersBroker = new ListenersBroker.ServerSentEvents(domDataBroker);
     }
 
-    class ListenerAdapterTester extends ListenerAdapter {
-
-        private volatile String lastNotification;
+    class TestDataTreeChangeStream extends DataTreeChangeStream {
         private CountDownLatch notificationLatch = new CountDownLatch(1);
+        private volatile String lastNotification;
 
-        ListenerAdapterTester(final YangInstanceIdentifier path, final String streamName,
+        TestDataTreeChangeStream(final YangInstanceIdentifier path, final String streamName,
                 final NotificationOutputType outputType, final boolean leafNodesOnly,
                 final boolean skipNotificationData, final boolean changedLeafNodesOnly, final boolean childNodesOnly,
                 final ListenersBroker listenersBroker) {
@@ -235,7 +234,7 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
 
     @Test
     public void testJsonNotifsLeaves() throws Exception {
-        var adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey", NotificationOutputType.JSON,
+        var adapter = new TestDataTreeChangeStream(PATCH_CONT_YIID, "Casey", NotificationOutputType.JSON,
             true, false, false, false, listenersBroker);
 
         final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
@@ -273,7 +272,7 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
 
     @Test
     public void testJsonNotifsChangedLeaves() throws Exception {
-        var adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey", NotificationOutputType.JSON,
+        var adapter = new TestDataTreeChangeStream(PATCH_CONT_YIID, "Casey", NotificationOutputType.JSON,
                 false, false, true, false, listenersBroker);
 
         final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
@@ -319,7 +318,7 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
 
     @Test
     public void testJsonChildNodesOnly() throws Exception {
-        final var adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey",
+        final var adapter = new TestDataTreeChangeStream(PATCH_CONT_YIID, "Casey",
             NotificationOutputType.JSON, false, false, false, true, listenersBroker);
 
         final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
@@ -353,7 +352,7 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
 
     @Test
     public void testXmlLeavesOnly() throws Exception {
-        var adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey", NotificationOutputType.XML,
+        var adapter = new TestDataTreeChangeStream(PATCH_CONT_YIID, "Casey", NotificationOutputType.XML,
             true, false, false, false, listenersBroker);
 
         final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
@@ -403,7 +402,7 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
 
     @Test
     public void testXmlChangedLeavesOnly() throws Exception {
-        var adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey", NotificationOutputType.XML,
+        var adapter = new TestDataTreeChangeStream(PATCH_CONT_YIID, "Casey", NotificationOutputType.XML,
                 false, false, true, false, listenersBroker);
 
         final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
@@ -461,7 +460,7 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
 
     @Test
     public void testXmlChildNodesOnly() throws Exception {
-        final var adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey",
+        final var adapter = new TestDataTreeChangeStream(PATCH_CONT_YIID, "Casey",
             NotificationOutputType.XML, false, false, false, true, listenersBroker);
 
         final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
@@ -565,7 +564,7 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
 
     private void jsonNotifications(final YangInstanceIdentifier pathYiid, final boolean skipData,
             final String jsonNotifCreate, final String jsonNotifUpdate, final String jsonNotifDelete) throws Exception {
-        final var adapter = new ListenerAdapterTester(pathYiid, "Casey",
+        final var adapter = new TestDataTreeChangeStream(pathYiid, "Casey",
                 NotificationOutputType.JSON, false, skipData, false, false, listenersBroker);
 
         final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
@@ -594,7 +593,7 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
 
     private void xmlNotifications(final YangInstanceIdentifier pathYiid, final boolean skipData,
             final String xmlNotifCreate, final String xmlNotifUpdate, final String xmlNotifDelete) throws Exception {
-        final var adapter = new ListenerAdapterTester(pathYiid, "Casey", NotificationOutputType.XML,
+        final var adapter = new TestDataTreeChangeStream(pathYiid, "Casey", NotificationOutputType.XML,
                 false, skipData, false, false, listenersBroker);
 
         final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
index a802f7a1cca369d6abaf85b151a2ffcbecb70544..abe7f9e53b3921fdcdcce804621526ddd257045a 100644 (file)
@@ -37,7 +37,7 @@ public class SSESessionHandlerTest {
     @Mock
     private ScheduledExecutorService executorService;
     @Mock
-    private AbstractStream<?> listener;
+    private RestconfStream<?> listener;
     @Mock
     private ScheduledFuture<?> pingFuture;
     @Mock
index c02729ed59f13489087dcae50b875bca0986a69f..7909193edd21523c814e480b00209be7f6dd8cf6 100644 (file)
@@ -50,10 +50,10 @@ class WebSocketFactoryTest extends AbstractNotificationListenerTest {
         listenersBroker = new ListenersBroker.ServerSentEvents(dataBroker);
         webSocketFactory = new WebSocketFactory(execService, listenersBroker, 5000, 2000);
 
-        streamName = listenersBroker.createStream(name -> new ListenerAdapter(listenersBroker, name,
+        streamName = listenersBroker.createStream(name -> new DataTreeChangeStream(listenersBroker, name,
             NotificationOutputType.JSON, databindProvider, LogicalDatastoreType.CONFIGURATION,
             YangInstanceIdentifier.of(QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster"))))
-            .getStreamName();
+            .name();
     }
 
     @Test
index 631ef9c46c9bae9928f953984408af5607ae3bc5..449658fcebf7c9b897170557859fb7d32191640f 100644 (file)
@@ -32,7 +32,7 @@ import org.mockito.ArgumentCaptor;
 
 public class WebSocketSessionHandlerTest {
     private static final class WebSocketTestSessionState {
-        private final AbstractStream<?> listener;
+        private final RestconfStream<?> listener;
         private final ScheduledExecutorService executorService;
         private final WebSocketSessionHandler webSocketSessionHandler;
         private final int heartbeatInterval;
@@ -40,7 +40,7 @@ public class WebSocketSessionHandlerTest {
         private final ScheduledFuture pingFuture;
 
         WebSocketTestSessionState(final int maxFragmentSize, final int heartbeatInterval) {
-            listener = mock(AbstractStream.class);
+            listener = mock(RestconfStream.class);
             executorService = mock(ScheduledExecutorService.class);
             this.heartbeatInterval = heartbeatInterval;
             this.maxFragmentSize = maxFragmentSize;