Eliminate SubscribeToStreamUtil 05/108805/3
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 1 Nov 2023 16:58:47 +0000 (17:58 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Wed, 1 Nov 2023 17:09:40 +0000 (18:09 +0100)
SubscribeToStreamUtil is tightly coupled to ListenersBroker, as it needs
its services. ListenersBroker in turn will require access to
SubscribeToStreamUtil's understanding of endpoints.

This patch merges the two classes, eliminating SubscribeToStreamUtil in
the process of doing so.

JIRA: NETCONF-1102
Change-Id: I9b33844b594c3cef7272ce7b81539b1fc78ec156
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
13 files changed:
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/JaxRsNorthbound.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/RestconfApplication.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImpl.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfStreamsSubscriptionServiceImpl.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/SubscribeToStreamUtil.java [deleted file]
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenersBroker.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/CreateStreamUtilTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImplTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfStreamsSubscriptionServiceImplTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketFactoryTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/JsonNotificationListenerTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapterTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/XmlNotificationListenerTest.java

index 2042e29d061d8e6d5a623f405c6d6df056bf6139..e97779e4a2b252c9ff71292a92c085736f0333be 100644 (file)
@@ -28,7 +28,6 @@ import org.opendaylight.mdsal.dom.api.DOMSchemaService;
 import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.MdsalRestconfServer;
 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfDataStreamServiceImpl;
-import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.SubscribeToStreamUtil;
 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
 import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
 import org.opendaylight.restconf.nb.rfc8040.streams.WebSocketInitializer;
@@ -78,11 +77,10 @@ public final class JaxRsNorthbound implements AutoCloseable {
             @Reference final DOMMountPointService mountPointService,
             @Reference final DOMNotificationService notificationService, @Reference final DOMRpcService rpcService,
             @Reference final DOMSchemaService schemaService, @Reference final DatabindProvider databindProvider,
-            @Reference final MdsalRestconfServer server, @Reference final ListenersBroker listenersBroker,
-            final Configuration configuration) throws ServletException {
+            @Reference final MdsalRestconfServer server, final Configuration configuration) throws ServletException {
         this(webServer, webContextSecurer, servletSupport, filterAdapterConfiguration, actionService, dataBroker,
             mountPointService, notificationService, rpcService, schemaService, databindProvider, server,
-            listenersBroker, configuration.ping$_$executor$_$name$_$prefix(), configuration.max$_$thread$_$count(),
+            configuration.ping$_$executor$_$name$_$prefix(), configuration.max$_$thread$_$count(),
             new StreamsConfiguration(configuration.maximum$_$fragment$_$length(),
                 configuration.idle$_$timeout(), configuration.heartbeat$_$interval(), configuration.use$_$sse()));
     }
@@ -92,16 +90,15 @@ public final class JaxRsNorthbound implements AutoCloseable {
             final DOMActionService actionService, final DOMDataBroker dataBroker,
             final DOMMountPointService mountPointService, final DOMNotificationService notificationService,
             final DOMRpcService rpcService, final DOMSchemaService schemaService,
-            final DatabindProvider databindProvider, final MdsalRestconfServer server,
-            final ListenersBroker listenersBroker, final String pingNamePrefix, final int pingMaxThreadCount,
-            final StreamsConfiguration streamsConfiguration) throws ServletException {
+            final DatabindProvider databindProvider, final MdsalRestconfServer server, final String pingNamePrefix,
+            final int pingMaxThreadCount, final StreamsConfiguration streamsConfiguration) throws ServletException {
         final var scheduledThreadPool = new ScheduledThreadPoolWrapper(pingMaxThreadCount,
             new NamingThreadPoolFactory(pingNamePrefix));
 
-        final SubscribeToStreamUtil streamUtils;
+        final ListenersBroker listenersBroker;
         final ServletDetails streamServlet;
         if (streamsConfiguration.useSSE()) {
-            streamUtils = SubscribeToStreamUtil.serverSentEvents(listenersBroker);
+            listenersBroker = new ListenersBroker.ServerSentEvents();
             streamServlet = ServletDetails.builder()
                 .addUrlPattern("/" + URLConstants.SSE_SUBPATH + "/*")
                 .servlet(servletSupport.createHttpServletBuilder(
@@ -112,7 +109,7 @@ public final class JaxRsNorthbound implements AutoCloseable {
                 .asyncSupported(true)
                 .build();
         } else {
-            streamUtils = SubscribeToStreamUtil.webSockets(listenersBroker);
+            listenersBroker = new ListenersBroker.WebSockets();
             streamServlet = ServletDetails.builder()
                 .addUrlPattern("/" + RestconfStreamsConstants.DATA_SUBSCRIPTION + "/*")
                 .addUrlPattern("/" + RestconfStreamsConstants.NOTIFICATION_STREAM + "/*")
@@ -129,7 +126,7 @@ public final class JaxRsNorthbound implements AutoCloseable {
                 .addUrlPattern("/*")
                 .servlet(servletSupport.createHttpServletBuilder(
                     new RestconfApplication(databindProvider, server, mountPointService, dataBroker, actionService,
-                        notificationService, schemaService, streamUtils))
+                        notificationService, schemaService, listenersBroker))
                     .build())
                 .asyncSupported(true)
                 .build())
index 8c62f743a9b211aebc4cca1fa29ec235f84d85e6..00d660abd49a0e1f906cd2dd38cadd33a55d9ffe 100644 (file)
@@ -23,7 +23,7 @@ import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfInvokeOp
 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfOperationsServiceImpl;
 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfSchemaServiceImpl;
 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl;
-import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.SubscribeToStreamUtil;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
 
 @Singleton
 public class RestconfApplication extends AbstractRestconfApplication {
@@ -31,12 +31,13 @@ public class RestconfApplication extends AbstractRestconfApplication {
     public RestconfApplication(final DatabindProvider databindProvider, final MdsalRestconfServer server,
             final DOMMountPointService mountPointService, final DOMDataBroker dataBroker,
             final DOMActionService actionService, final DOMNotificationService notificationService,
-            final DOMSchemaService domSchemaService, final SubscribeToStreamUtil streamUtils) {
+            final DOMSchemaService domSchemaService, final ListenersBroker listenersBroker) {
         super(databindProvider, List.of(
             // FIXME: NETCONF:1102: do not instantiate this service
-            new RestconfStreamsSubscriptionServiceImpl(dataBroker, notificationService, databindProvider, streamUtils),
+            new RestconfStreamsSubscriptionServiceImpl(dataBroker, notificationService, databindProvider,
+                listenersBroker),
             new RestconfDataServiceImpl(databindProvider, server, actionService),
-            new RestconfInvokeOperationsServiceImpl(databindProvider, server, mountPointService, streamUtils),
+            new RestconfInvokeOperationsServiceImpl(databindProvider, server, mountPointService, listenersBroker),
             new RestconfOperationsServiceImpl(databindProvider, server),
             new RestconfSchemaServiceImpl(domSchemaService, mountPointService),
             new RestconfImpl(databindProvider)));
index 4df6c98d8d52973d80b2010cee3d94c781c9c2aa..f8d420953c174c70e3209da8c666a8c55f0af7f6 100644 (file)
@@ -35,6 +35,7 @@ import org.opendaylight.restconf.nb.rfc8040.databind.OperationInputBody;
 import org.opendaylight.restconf.nb.rfc8040.databind.XmlOperationInputBody;
 import org.opendaylight.restconf.nb.rfc8040.legacy.InstanceIdentifierContext;
 import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotification;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscription;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateNotificationStream;
@@ -56,15 +57,15 @@ public final class RestconfInvokeOperationsServiceImpl {
     private final MdsalRestconfServer server;
     @Deprecated(forRemoval = true)
     private final DOMMountPointService mountPointService;
-    private final SubscribeToStreamUtil streamUtils;
+    private final ListenersBroker listenersBroker;
 
     public RestconfInvokeOperationsServiceImpl(final DatabindProvider databindProvider,
             final MdsalRestconfServer server, final DOMMountPointService mountPointService,
-            final SubscribeToStreamUtil streamUtils) {
+            final ListenersBroker listenersBroker) {
         this.databindProvider = requireNonNull(databindProvider);
         this.server = requireNonNull(server);
         this.mountPointService = requireNonNull(mountPointService);
-        this.streamUtils = requireNonNull(streamUtils);
+        this.listenersBroker = requireNonNull(listenersBroker);
     }
 
     /**
@@ -159,14 +160,14 @@ public final class RestconfInvokeOperationsServiceImpl {
         if (mountPoint == null) {
             // Hacked-up integration of streams
             if (CreateDataChangeEventSubscription.QNAME.equals(type)) {
-                return CreateStreamUtil.createDataChangeNotifiStream(streamUtils.listenersBroker(), input,
+                return CreateStreamUtil.createDataChangeNotifiStream(listenersBroker, input,
                     localDatabind.modelContext());
             } else if (CreateNotificationStream.QNAME.equals(type)) {
-                return CreateStreamUtil.createNotificationStream(streamUtils.listenersBroker(), input,
+                return CreateStreamUtil.createNotificationStream(listenersBroker, input,
                     localDatabind.modelContext());
             } else if (SubscribeDeviceNotification.QNAME.equals(type)) {
-                return CreateStreamUtil.createDeviceNotificationListener(streamUtils.listenersBroker(), input,
-                    streamUtils.prepareUriByStreamName(uriInfo, "").toString(), mountPointService);
+                return CreateStreamUtil.createDeviceNotificationListener(listenersBroker, input,
+                    listenersBroker.prepareUriByStreamName(uriInfo, "").toString(), mountPointService);
             }
         }
 
index 24132cf3b9e46b25657c2ed4e154e3533b25a5e4..d0bc3a5998d75a52d0d2b32b61c9c73620f53e0b 100644 (file)
@@ -22,6 +22,7 @@ import org.opendaylight.restconf.nb.rfc8040.databind.jaxrs.QueryParams;
 import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
 import org.opendaylight.restconf.nb.rfc8040.rests.services.api.RestconfStreamsSubscriptionService;
 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
 import org.opendaylight.yang.gen.v1.subscribe.to.notification.rev161028.Notifi;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
@@ -39,7 +40,7 @@ public class RestconfStreamsSubscriptionServiceImpl implements RestconfStreamsSu
     private static final QName LOCATION_QNAME = QName.create(Notifi.QNAME, "location").intern();
     private static final NodeIdentifier LOCATION_NODEID = NodeIdentifier.create(LOCATION_QNAME);
 
-    private final SubscribeToStreamUtil streamUtils;
+    private final ListenersBroker listenersBroker;
     private final HandlersHolder handlersHolder;
 
     /**
@@ -48,13 +49,13 @@ public class RestconfStreamsSubscriptionServiceImpl implements RestconfStreamsSu
      * @param dataBroker {@link DOMDataBroker}
      * @param notificationService {@link DOMNotificationService}
      * @param databindProvider a {@link DatabindProvider}
-     * @param streamUtils a {@link SubscribeToStreamUtil}
+     * @param listenersBroker a {@link ListenersBroker}
      */
     public RestconfStreamsSubscriptionServiceImpl(final DOMDataBroker dataBroker,
             final DOMNotificationService notificationService, final DatabindProvider databindProvider,
-            final SubscribeToStreamUtil streamUtils) {
+            final ListenersBroker listenersBroker) {
         handlersHolder = new HandlersHolder(dataBroker, notificationService, databindProvider);
-        this.streamUtils = requireNonNull(streamUtils);
+        this.listenersBroker = requireNonNull(listenersBroker);
     }
 
     @Override
@@ -63,9 +64,9 @@ public class RestconfStreamsSubscriptionServiceImpl implements RestconfStreamsSu
 
         final URI location;
         if (identifier.contains(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
-            location = streamUtils.subscribeToDataStream(identifier, uriInfo, params, handlersHolder);
+            location = listenersBroker.subscribeToDataStream(identifier, uriInfo, params, handlersHolder);
         } else if (identifier.contains(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
-            location = streamUtils.subscribeToYangStream(identifier, uriInfo, params, handlersHolder);
+            location = listenersBroker.subscribeToYangStream(identifier, uriInfo, params, handlersHolder);
         } else {
             final String msg = "Bad type of notification of sal-remote";
             LOG.warn(msg);
diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/SubscribeToStreamUtil.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/SubscribeToStreamUtil.java
deleted file mode 100644 (file)
index babdc4d..0000000
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
-
-import static com.google.common.base.Strings.isNullOrEmpty;
-import static java.util.Objects.requireNonNull;
-
-import com.google.common.base.Splitter;
-import java.net.URI;
-import java.util.concurrent.ExecutionException;
-import javax.ws.rs.core.UriInfo;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.mdsal.dom.api.DOMDataBroker;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
-import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
-import org.opendaylight.restconf.nb.rfc8040.NotificationQueryParams;
-import org.opendaylight.restconf.nb.rfc8040.URLConstants;
-import org.opendaylight.restconf.nb.rfc8040.monitoring.RestconfStateStreams;
-import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
-import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
-import org.opendaylight.yangtools.yang.common.ErrorTag;
-import org.opendaylight.yangtools.yang.common.ErrorType;
-import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Subscribe to stream util class.
- */
-public abstract class SubscribeToStreamUtil {
-    /**
-     * Implementation of SubscribeToStreamUtil for Server-sent events.
-     */
-    private static final class ServerSentEvents extends SubscribeToStreamUtil {
-        ServerSentEvents(final ListenersBroker listenersBroker) {
-            super(listenersBroker);
-        }
-
-        @Override
-        public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
-            return uriInfo.getBaseUriBuilder()
-                .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.SSE_SUBPATH + '/' + streamName)
-                .build();
-        }
-    }
-
-    /**
-     * Implementation of SubscribeToStreamUtil for Web sockets.
-     */
-    private static final class WebSockets extends SubscribeToStreamUtil {
-        WebSockets(final ListenersBroker listenersBroker) {
-            super(listenersBroker);
-        }
-
-        @Override
-        public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
-            final var scheme = switch (uriInfo.getAbsolutePath().getScheme()) {
-                // Secured HTTP goes to Secured WebSockets
-                case "https" -> "wss";
-                // Unsecured HTTP and others go to unsecured WebSockets
-                default -> "ws";
-            };
-
-            return uriInfo.getBaseUriBuilder()
-                .scheme(scheme)
-                .replacePath(URLConstants.BASE_PATH + '/' + streamName)
-                .build();
-        }
-    }
-
-    private static final Logger LOG = LoggerFactory.getLogger(SubscribeToStreamUtil.class);
-    private static final Splitter SLASH_SPLITTER = Splitter.on('/');
-
-    private final @NonNull ListenersBroker listenersBroker;
-
-    private SubscribeToStreamUtil(final ListenersBroker listenersBroker) {
-        this.listenersBroker = requireNonNull(listenersBroker);
-    }
-
-    public static @NonNull SubscribeToStreamUtil serverSentEvents(final ListenersBroker listenersBroker) {
-        return new ServerSentEvents(listenersBroker);
-    }
-
-    public static @NonNull SubscribeToStreamUtil webSockets(final ListenersBroker listenersBroker) {
-        return new WebSockets(listenersBroker);
-    }
-
-    public final @NonNull ListenersBroker listenersBroker() {
-        return listenersBroker;
-    }
-
-    /**
-     * Prepare URL from base name and stream name.
-     *
-     * @param uriInfo base URL information
-     * @param streamName name of stream for create
-     * @return final URL
-     */
-    abstract @NonNull URI prepareUriByStreamName(UriInfo uriInfo, String streamName);
-
-    /**
-     * Register listener by streamName in identifier to listen to yang notifications, and put or delete information
-     * about listener to DS according to ietf-restconf-monitoring.
-     *
-     * @param identifier              Name of the stream.
-     * @param uriInfo                 URI information.
-     * @param notificationQueryParams Query parameters of notification.
-     * @param handlersHolder          Holder of handlers for notifications.
-     * @return Stream location for listening.
-     */
-    final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
-            final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
-        final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
-        if (isNullOrEmpty(streamName)) {
-            throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
-        }
-
-        final var notificationListenerAdapter = listenersBroker.notificationListenerFor(streamName);
-        if (notificationListenerAdapter == null) {
-            throw new RestconfDocumentedException(String.format("Stream with name %s was not found.", streamName),
-                ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT);
-        }
-
-        final URI uri = prepareUriByStreamName(uriInfo, streamName);
-        notificationListenerAdapter.setQueryParams(notificationQueryParams);
-        notificationListenerAdapter.listen(handlersHolder.notificationService());
-        final DOMDataBroker dataBroker = handlersHolder.dataBroker();
-        notificationListenerAdapter.setCloseVars(dataBroker, handlersHolder.databindProvider());
-        final MapEntryNode mapToStreams = RestconfStateStreams.notificationStreamEntry(streamName,
-            notificationListenerAdapter.qnames(), notificationListenerAdapter.getStart(),
-            notificationListenerAdapter.getOutputType(), uri);
-
-        // FIXME: how does this correlate with the transaction notificationListenerAdapter.close() will do?
-        final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
-        writeDataToDS(writeTransaction, mapToStreams);
-        submitData(writeTransaction);
-        return uri;
-    }
-
-    /**
-     * Register listener by streamName in identifier to listen to data change notifications, and put or delete
-     * information about listener to DS according to ietf-restconf-monitoring.
-     *
-     * @param identifier              Identifier as stream name.
-     * @param uriInfo                 Base URI information.
-     * @param notificationQueryParams Query parameters of notification.
-     * @param handlersHolder          Holder of handlers for notifications.
-     * @return Location for listening.
-     */
-    final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
-            final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
-        final var streamName = ListenersBroker.createStreamNameFromUri(identifier);
-        final var listener = listenersBroker.dataChangeListenerFor(streamName);
-        if (listener == null) {
-            throw new RestconfDocumentedException("No listener found for stream " + streamName,
-                ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
-        }
-
-        listener.setQueryParams(notificationQueryParams);
-
-        final var dataBroker = handlersHolder.dataBroker();
-        final var schemaHandler = handlersHolder.databindProvider();
-        listener.setCloseVars(dataBroker, schemaHandler);
-        listener.listen(dataBroker);
-
-        final var uri = prepareUriByStreamName(uriInfo, streamName);
-        final var schemaContext = schemaHandler.currentContext().modelContext();
-        final var serializedPath = IdentifierCodec.serialize(listener.getPath(), schemaContext);
-
-        final var mapToStreams = RestconfStateStreams.dataChangeStreamEntry(listener.getPath(),
-                listener.getStart(), listener.getOutputType(), uri, schemaContext, serializedPath);
-        final var writeTransaction = dataBroker.newWriteOnlyTransaction();
-        writeDataToDS(writeTransaction, mapToStreams);
-        submitData(writeTransaction);
-        return uri;
-    }
-
-    // FIXME: callers are utter duplicates, refactor them
-    private static void writeDataToDS(final DOMDataTreeWriteOperations tx, final MapEntryNode mapToStreams) {
-        // FIXME: use put() here
-        tx.merge(LogicalDatastoreType.OPERATIONAL, RestconfStateStreams.restconfStateStreamPath(mapToStreams.name()),
-            mapToStreams);
-    }
-
-    private static void submitData(final DOMDataTreeWriteTransaction readWriteTransaction) {
-        try {
-            readWriteTransaction.commit().get();
-        } catch (final InterruptedException | ExecutionException e) {
-            throw new RestconfDocumentedException("Problem while putting data to DS.", e);
-        }
-    }
-}
index 0e4295a39187c032ffe6bd8550d50a962696096c..1cf9dec00bfcc4e49ad02dbe82cf1ec4a6ba8c17 100644 (file)
@@ -7,21 +7,29 @@
  */
 package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
 
+import static com.google.common.base.Strings.isNullOrEmpty;
 import static java.util.Objects.requireNonNull;
 
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 import com.google.common.collect.ImmutableSet;
+import java.net.URI;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.locks.StampedLock;
-import javax.inject.Inject;
-import javax.inject.Singleton;
+import javax.ws.rs.core.UriInfo;
+import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataBroker;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
+import org.opendaylight.restconf.nb.rfc8040.NotificationQueryParams;
 import org.opendaylight.restconf.nb.rfc8040.URLConstants;
+import org.opendaylight.restconf.nb.rfc8040.monitoring.RestconfStateStreams;
+import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
@@ -30,11 +38,10 @@ import org.opendaylight.yangtools.yang.common.ErrorTag;
 import org.opendaylight.yangtools.yang.common.ErrorType;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
 import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,9 +52,39 @@ import org.slf4j.LoggerFactory;
 // FIXME: furthermore, this should be tied to ietf-restconf-monitoring, as the Strings used in its maps are stream
 //        names. We essentially need a component which deals with allocation of stream names and their lifecycle and
 //        the contents of /restconf-state/streams.
-@Singleton
-@Component(service = ListenersBroker.class, immediate = true)
-public final class ListenersBroker {
+public abstract sealed class ListenersBroker {
+    /**
+     * A ListenersBroker working with Server-Sent Events.
+     */
+    public static final class ServerSentEvents extends ListenersBroker {
+        @Override
+        public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
+            return uriInfo.getBaseUriBuilder()
+                .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.SSE_SUBPATH + '/' + streamName)
+                .build();
+        }
+    }
+
+    /**
+     * A ListenersBroker working with WebSockets.
+     */
+    public static final class WebSockets extends ListenersBroker {
+        @Override
+        public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
+            final var scheme = switch (uriInfo.getAbsolutePath().getScheme()) {
+                // Secured HTTP goes to Secured WebSockets
+                case "https" -> "wss";
+                // Unsecured HTTP and others go to unsecured WebSockets
+                default -> "ws";
+            };
+
+            return uriInfo.getBaseUriBuilder()
+                .scheme(scheme)
+                .replacePath(URLConstants.BASE_PATH + '/' + streamName)
+                .build();
+        }
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);
 
     private final StampedLock dataChangeListenersLock = new StampedLock();
@@ -57,10 +94,8 @@ public final class ListenersBroker {
     private final BiMap<String, NotificationListenerAdapter> notificationListeners = HashBiMap.create();
     private final BiMap<String, DeviceNotificationListenerAdaptor> deviceNotificationListeners = HashBiMap.create();
 
-    @Inject
-    @Activate
-    public ListenersBroker() {
-
+    private ListenersBroker() {
+        // Hidden on purpose
     }
 
     /**
@@ -71,7 +106,7 @@ public final class ListenersBroker {
      *         does not exist.
      * @throws NullPointerException in {@code streamName} is {@code null}
      */
-    public @Nullable ListenerAdapter dataChangeListenerFor(final String streamName) {
+    public final @Nullable ListenerAdapter dataChangeListenerFor(final String streamName) {
         requireNonNull(streamName);
 
         final long stamp = dataChangeListenersLock.readLock();
@@ -90,7 +125,7 @@ public final class ListenersBroker {
      *         stream name does not exist.
      * @throws NullPointerException in {@code streamName} is {@code null}
      */
-    public @Nullable NotificationListenerAdapter notificationListenerFor(final String streamName) {
+    public final @Nullable NotificationListenerAdapter notificationListenerFor(final String streamName) {
         requireNonNull(streamName);
 
         final long stamp = notificationListenersLock.readLock();
@@ -109,7 +144,7 @@ public final class ListenersBroker {
      *         specified stream name does not exist.
      * @throws NullPointerException in {@code path} is {@code null}
      */
-    public @Nullable DeviceNotificationListenerAdaptor deviceNotificationListenerFor(final String streamName) {
+    public final @Nullable DeviceNotificationListenerAdaptor deviceNotificationListenerFor(final String streamName) {
         requireNonNull(streamName);
 
         final long stamp = deviceNotificationListenersLock.readLock();
@@ -127,7 +162,7 @@ public final class ListenersBroker {
      * @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
      *     or {@link Optional#empty()} if listener with specified stream name doesn't exist.
      */
-    public @Nullable BaseListenerInterface listenerFor(final String streamName) {
+    public final @Nullable BaseListenerInterface listenerFor(final String streamName) {
         if (streamName.startsWith(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
             return notificationListenerFor(streamName);
         } else if (streamName.startsWith(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
@@ -147,7 +182,7 @@ public final class ListenersBroker {
      * @param outputType Specific type of output for notifications - XML or JSON.
      * @return Created or existing data-change listener adapter.
      */
-    public ListenerAdapter registerDataChangeListener(final EffectiveModelContext modelContext,
+    public final ListenerAdapter registerDataChangeListener(final EffectiveModelContext modelContext,
             final LogicalDatastoreType datastore, final YangInstanceIdentifier path, final Scope scope,
             final NotificationOutputType outputType) {
         final var sb = new StringBuilder(RestconfStreamsConstants.DATA_SUBSCRIPTION)
@@ -176,7 +211,7 @@ public final class ListenersBroker {
      * @param outputType Specific type of output for notifications - XML or JSON.
      * @return Created or existing notification listener adapter.
      */
-    public NotificationListenerAdapter registerNotificationListener(final EffectiveModelContext refSchemaCtx,
+    public final NotificationListenerAdapter registerNotificationListener(final EffectiveModelContext refSchemaCtx,
             final ImmutableSet<QName> notifications, final NotificationOutputType outputType) {
         final var sb = new StringBuilder(RestconfStreamsConstants.NOTIFICATION_STREAM).append('/');
         var haveFirst = false;
@@ -222,7 +257,7 @@ public final class ListenersBroker {
      * @param mountPointService Mount point service
      * @return Created or existing device notification listener adapter.
      */
-    public DeviceNotificationListenerAdaptor registerDeviceNotificationListener(final String deviceName,
+    public final DeviceNotificationListenerAdaptor registerDeviceNotificationListener(final String deviceName,
             final NotificationOutputType outputType, final EffectiveModelContext refSchemaCtx,
             final DOMMountPointService mountPointService, final YangInstanceIdentifier path) {
         final var sb = new StringBuilder(RestconfStreamsConstants.DEVICE_NOTIFICATION_STREAM).append('/')
@@ -241,7 +276,7 @@ public final class ListenersBroker {
     /**
      * Removal and closing of all data-change-event and notification listeners.
      */
-    public synchronized void removeAndCloseAllListeners() {
+    public final synchronized void removeAndCloseAllListeners() {
         final long stampNotifications = notificationListenersLock.writeLock();
         final long stampDataChanges = dataChangeListenersLock.writeLock();
         try {
@@ -256,7 +291,7 @@ public final class ListenersBroker {
     /**
      * Closes and removes all data-change listeners.
      */
-    public void removeAndCloseAllDataChangeListeners() {
+    public final void removeAndCloseAllDataChangeListeners() {
         final long stamp = dataChangeListenersLock.writeLock();
         try {
             removeAndCloseAllDataChangeListenersTemplate();
@@ -267,23 +302,22 @@ public final class ListenersBroker {
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void removeAndCloseAllDataChangeListenersTemplate() {
-        dataChangeListeners.values()
-                .forEach(listenerAdapter -> {
-                    try {
-                        listenerAdapter.close();
-                    } catch (final Exception exception) {
-                        LOG.error("Failed to close data-change listener {}.", listenerAdapter, exception);
-                        throw new IllegalStateException(String.format("Failed to close data-change listener %s.",
-                                listenerAdapter), exception);
-                    }
-                });
+        dataChangeListeners.values().forEach(listenerAdapter -> {
+            try {
+                listenerAdapter.close();
+            } catch (Exception e) {
+                LOG.error("Failed to close data-change listener {}.", listenerAdapter, e);
+                throw new IllegalStateException("Failed to close data-change listener %s.".formatted(listenerAdapter),
+                    e);
+            }
+        });
         dataChangeListeners.clear();
     }
 
     /**
      * Closes and removes all notification listeners.
      */
-    public void removeAndCloseAllNotificationListeners() {
+    public final void removeAndCloseAllNotificationListeners() {
         final long stamp = notificationListenersLock.writeLock();
         try {
             removeAndCloseAllNotificationListenersTemplate();
@@ -294,16 +328,15 @@ public final class ListenersBroker {
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void removeAndCloseAllNotificationListenersTemplate() {
-        notificationListeners.values()
-                .forEach(listenerAdapter -> {
-                    try {
-                        listenerAdapter.close();
-                    } catch (final Exception exception) {
-                        LOG.error("Failed to close notification listener {}.", listenerAdapter, exception);
-                        throw new IllegalStateException(String.format("Failed to close notification listener %s.",
-                                listenerAdapter), exception);
-                    }
-                });
+        notificationListeners.values().forEach(listenerAdapter -> {
+            try {
+                listenerAdapter.close();
+            } catch (Exception e) {
+                LOG.error("Failed to close notification listener {}.", listenerAdapter, e);
+                throw new IllegalStateException("Failed to close notification listener %s.".formatted(listenerAdapter),
+                    e);
+            }
+        });
         notificationListeners.clear();
     }
 
@@ -313,11 +346,11 @@ public final class ListenersBroker {
      * @param listener Listener to be closed and removed.
      */
     @SuppressWarnings("checkstyle:IllegalCatch")
-    public void removeAndCloseDataChangeListener(final ListenerAdapter listener) {
+    public final void removeAndCloseDataChangeListener(final ListenerAdapter listener) {
         final long stamp = dataChangeListenersLock.writeLock();
         try {
             removeAndCloseDataChangeListenerTemplate(listener);
-        } catch (final Exception exception) {
+        } catch (Exception exception) {
             LOG.error("Data-change listener {} cannot be closed.", listener, exception);
         } finally {
             dataChangeListenersLock.unlockWrite(stamp);
@@ -335,11 +368,9 @@ public final class ListenersBroker {
             if (dataChangeListeners.inverse().remove(listener) == null) {
                 LOG.warn("There isn't any data-change event stream that would match listener adapter {}.", listener);
             }
-        } catch (final InterruptedException | ExecutionException exception) {
-            LOG.error("Data-change listener {} cannot be closed.", listener, exception);
-            throw new IllegalStateException(String.format(
-                    "Data-change listener %s cannot be closed.",
-                    listener), exception);
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Data-change listener {} cannot be closed.", listener, e);
+            throw new IllegalStateException("Data-change listener %s cannot be closed.".formatted(listener), e);
         }
     }
 
@@ -349,12 +380,12 @@ public final class ListenersBroker {
      * @param listener Listener to be closed and removed.
      */
     @SuppressWarnings("checkstyle:IllegalCatch")
-    public void removeAndCloseNotificationListener(final NotificationListenerAdapter listener) {
+    public final void removeAndCloseNotificationListener(final NotificationListenerAdapter listener) {
         final long stamp = notificationListenersLock.writeLock();
         try {
             removeAndCloseNotificationListenerTemplate(listener);
-        } catch (final Exception exception) {
-            LOG.error("Notification listener {} cannot be closed.", listener, exception);
+        } catch (Exception e) {
+            LOG.error("Notification listener {} cannot be closed.", listener, e);
         } finally {
             notificationListenersLock.unlockWrite(stamp);
         }
@@ -367,7 +398,7 @@ public final class ListenersBroker {
      * @param listener Listener to be closed and removed.
      */
     @SuppressWarnings("checkstyle:IllegalCatch")
-    public void removeAndCloseDeviceNotificationListener(final DeviceNotificationListenerAdaptor listener) {
+    public final void removeAndCloseDeviceNotificationListener(final DeviceNotificationListenerAdaptor listener) {
         final long stamp = deviceNotificationListenersLock.writeLock();
         try {
             requireNonNull(listener);
@@ -387,11 +418,9 @@ public final class ListenersBroker {
             if (notificationListeners.inverse().remove(listener) == null) {
                 LOG.warn("There isn't any notification stream that would match listener adapter {}.", listener);
             }
-        } catch (final InterruptedException | ExecutionException exception) {
-            LOG.error("Notification listener {} cannot be closed.", listener, exception);
-            throw new IllegalStateException(String.format(
-                    "Notification listener %s cannot be closed.", listener),
-                    exception);
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Notification listener {} cannot be closed.", listener, e);
+            throw new IllegalStateException("Notification listener %s cannot be closed.".formatted(listener), e);
         }
     }
 
@@ -400,7 +429,7 @@ public final class ListenersBroker {
      *
      * @param listener Listener to be closed and removed from cache.
      */
-    void removeAndCloseListener(final BaseListenerInterface listener) {
+    final void removeAndCloseListener(final BaseListenerInterface listener) {
         requireNonNull(listener);
         if (listener instanceof ListenerAdapter) {
             removeAndCloseDataChangeListener((ListenerAdapter) listener);
@@ -432,4 +461,105 @@ public final class ListenersBroker {
         }
         return result;
     }
+
+    /**
+     * Prepare URL from base name and stream name.
+     *
+     * @param uriInfo base URL information
+     * @param streamName name of stream for create
+     * @return final URL
+     */
+    public abstract @NonNull URI prepareUriByStreamName(UriInfo uriInfo, String streamName);
+
+    /**
+     * Register listener by streamName in identifier to listen to yang notifications, and put or delete information
+     * about listener to DS according to ietf-restconf-monitoring.
+     *
+     * @param identifier              Name of the stream.
+     * @param uriInfo                 URI information.
+     * @param notificationQueryParams Query parameters of notification.
+     * @param handlersHolder          Holder of handlers for notifications.
+     * @return Stream location for listening.
+     */
+    public final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
+            final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
+        final String streamName = createStreamNameFromUri(identifier);
+        if (isNullOrEmpty(streamName)) {
+            throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
+        }
+
+        final var notificationListenerAdapter = notificationListenerFor(streamName);
+        if (notificationListenerAdapter == null) {
+            throw new RestconfDocumentedException("Stream with name %s was not found.".formatted(streamName),
+                ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT);
+        }
+
+        final URI uri = prepareUriByStreamName(uriInfo, streamName);
+        notificationListenerAdapter.setQueryParams(notificationQueryParams);
+        notificationListenerAdapter.listen(handlersHolder.notificationService());
+        final DOMDataBroker dataBroker = handlersHolder.dataBroker();
+        notificationListenerAdapter.setCloseVars(dataBroker, handlersHolder.databindProvider());
+        final MapEntryNode mapToStreams = RestconfStateStreams.notificationStreamEntry(streamName,
+            notificationListenerAdapter.qnames(), notificationListenerAdapter.getStart(),
+            notificationListenerAdapter.getOutputType(), uri);
+
+        // FIXME: how does this correlate with the transaction notificationListenerAdapter.close() will do?
+        final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
+        writeDataToDS(writeTransaction, mapToStreams);
+        submitData(writeTransaction);
+        return uri;
+    }
+
+    /**
+     * Register listener by streamName in identifier to listen to data change notifications, and put or delete
+     * information about listener to DS according to ietf-restconf-monitoring.
+     *
+     * @param identifier              Identifier as stream name.
+     * @param uriInfo                 Base URI information.
+     * @param notificationQueryParams Query parameters of notification.
+     * @param handlersHolder          Holder of handlers for notifications.
+     * @return Location for listening.
+     */
+    public final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
+            final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
+        final var streamName = createStreamNameFromUri(identifier);
+        final var listener = dataChangeListenerFor(streamName);
+        if (listener == null) {
+            throw new RestconfDocumentedException("No listener found for stream " + streamName,
+                ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
+        }
+
+        listener.setQueryParams(notificationQueryParams);
+
+        final var dataBroker = handlersHolder.dataBroker();
+        final var schemaHandler = handlersHolder.databindProvider();
+        listener.setCloseVars(dataBroker, schemaHandler);
+        listener.listen(dataBroker);
+
+        final var uri = prepareUriByStreamName(uriInfo, streamName);
+        final var schemaContext = schemaHandler.currentContext().modelContext();
+        final var serializedPath = IdentifierCodec.serialize(listener.getPath(), schemaContext);
+
+        final var mapToStreams = RestconfStateStreams.dataChangeStreamEntry(listener.getPath(),
+                listener.getStart(), listener.getOutputType(), uri, schemaContext, serializedPath);
+        final var writeTransaction = dataBroker.newWriteOnlyTransaction();
+        writeDataToDS(writeTransaction, mapToStreams);
+        submitData(writeTransaction);
+        return uri;
+    }
+
+    // FIXME: callers are utter duplicates, refactor them
+    private static void writeDataToDS(final DOMDataTreeWriteOperations tx, final MapEntryNode mapToStreams) {
+        // FIXME: use put() here
+        tx.merge(LogicalDatastoreType.OPERATIONAL, RestconfStateStreams.restconfStateStreamPath(mapToStreams.name()),
+            mapToStreams);
+    }
+
+    private static void submitData(final DOMDataTreeWriteTransaction readWriteTransaction) {
+        try {
+            readWriteTransaction.commit().get();
+        } catch (final InterruptedException | ExecutionException e) {
+            throw new RestconfDocumentedException("Problem while putting data to DS.", e);
+        }
+    }
 }
index c0ad276ce1fb8f7c4bc88720233a904def93101a..eff27b1843902922ebaf57f53cfc8c4ebc3061fa 100644 (file)
@@ -40,7 +40,7 @@ import org.opendaylight.yangtools.yang.test.util.YangParserTestUtils;
 public class CreateStreamUtilTest {
     private static EffectiveModelContext SCHEMA_CTX;
 
-    private final ListenersBroker listenersBroker = new ListenersBroker();
+    private final ListenersBroker listenersBroker = new ListenersBroker.ServerSentEvents();
 
     @BeforeClass
     public static void setUp() {
index 67e50dfcd77e30811660d265d881ec0669b344c6..ee13f9da190f6dfb8ad4050ce5f5bd34d32c12d5 100644 (file)
@@ -89,7 +89,7 @@ public class RestconfInvokeOperationsServiceImplTest {
     public void setup() {
         server = new MdsalRestconfServer(dataBroker, rpcService, mountPointService);
         invokeOperationsService = new RestconfInvokeOperationsServiceImpl(() -> CONTEXT, server, mountPointService,
-            SubscribeToStreamUtil.webSockets(new ListenersBroker()));
+            new ListenersBroker.WebSockets());
     }
 
     @Test
index 7d430b5bb941817e2951a24d537365b72028938a..c327834c2d8e6cf7caf3b2ed5a0f7a2c7325b5b7 100644 (file)
@@ -15,7 +15,6 @@ import static org.mockito.Mockito.mock;
 
 import com.google.common.collect.ImmutableClassToInstanceMap;
 import java.net.URI;
-import java.net.URISyntaxException;
 import javax.ws.rs.core.MultivaluedHashMap;
 import javax.ws.rs.core.UriBuilder;
 import javax.ws.rs.core.UriInfo;
@@ -42,8 +41,6 @@ import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev14070
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.common.ErrorTag;
 import org.opendaylight.yangtools.yang.common.ErrorType;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 @RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class RestconfStreamsSubscriptionServiceImplTest extends AbstractNotificationListenerTest {
@@ -57,19 +54,15 @@ public class RestconfStreamsSubscriptionServiceImplTest extends AbstractNotifica
     @Mock
     private DOMNotificationService notificationService;
 
-    private final ListenersBroker listenersBroker = new ListenersBroker();
     private final DatabindProvider databindProvider = () -> DatabindContext.ofModel(MODEL_CONTEXT);
 
     @Before
-    public void setUp() throws URISyntaxException {
-        listenersBroker.registerDataChangeListener(MODEL_CONTEXT, LogicalDatastoreType.OPERATIONAL,
-            YangInstanceIdentifier.of(QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster")),
-            Scope.ONE, NotificationOutputType.JSON);
-        final DOMDataTreeWriteTransaction wTx = mock(DOMDataTreeWriteTransaction.class);
+    public void setUp() throws Exception {
+        final var wTx = mock(DOMDataTreeWriteTransaction.class);
         doReturn(wTx).when(dataBroker).newWriteOnlyTransaction();
         doReturn(CommitInfo.emptyFluentFuture()).when(wTx).commit();
 
-        DOMDataTreeChangeService dataTreeChangeService = mock(DOMDataTreeChangeService.class);
+        final var dataTreeChangeService = mock(DOMDataTreeChangeService.class);
         doReturn(mock(ListenerRegistration.class)).when(dataTreeChangeService)
                 .registerDataTreeChangeListener(any(), any());
 
@@ -83,11 +76,12 @@ public class RestconfStreamsSubscriptionServiceImplTest extends AbstractNotifica
 
     @Test
     public void testSubscribeToStreamSSE() {
+        final var listenersBroker = new ListenersBroker.ServerSentEvents();
         listenersBroker.registerDataChangeListener(MODEL_CONTEXT, LogicalDatastoreType.OPERATIONAL,
             IdentifierCodec.deserialize("toaster:toaster/toasterStatus", MODEL_CONTEXT), Scope.ONE,
             NotificationOutputType.XML);
         final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
-            notificationService, databindProvider, SubscribeToStreamUtil.serverSentEvents(listenersBroker));
+            notificationService, databindProvider,listenersBroker);
         final var response = streamsSubscriptionService.subscribeToStream(
             "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE", uriInfo);
         assertEquals("http://localhost:8181/" + URLConstants.BASE_PATH + "/" + URLConstants.SSE_SUBPATH
@@ -97,11 +91,12 @@ public class RestconfStreamsSubscriptionServiceImplTest extends AbstractNotifica
 
     @Test
     public void testSubscribeToStreamWS() {
+        final var listenersBroker = new ListenersBroker.WebSockets();
         listenersBroker.registerDataChangeListener(MODEL_CONTEXT, LogicalDatastoreType.OPERATIONAL,
             IdentifierCodec.deserialize("toaster:toaster/toasterStatus", MODEL_CONTEXT), Scope.ONE,
             NotificationOutputType.XML);
         final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
-            notificationService, databindProvider, SubscribeToStreamUtil.webSockets(listenersBroker));
+            notificationService, databindProvider, listenersBroker);
         final var response = streamsSubscriptionService.subscribeToStream(
             "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE", uriInfo);
         assertEquals("ws://localhost:8181/" + URLConstants.BASE_PATH
@@ -111,8 +106,9 @@ public class RestconfStreamsSubscriptionServiceImplTest extends AbstractNotifica
 
     @Test
     public void testSubscribeToStreamMissingDatastoreInPath() {
+        final var listenersBroker = new ListenersBroker.WebSockets();
         final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
-            notificationService, databindProvider, SubscribeToStreamUtil.webSockets(listenersBroker));
+            notificationService, databindProvider, listenersBroker);
         final var errors = assertThrows(RestconfDocumentedException.class,
             () -> streamsSubscriptionService.subscribeToStream("toaster:toaster/toasterStatus/scope=ONE", uriInfo))
             .getErrors();
@@ -125,8 +121,9 @@ public class RestconfStreamsSubscriptionServiceImplTest extends AbstractNotifica
 
     @Test
     public void testSubscribeToStreamMissingScopeInPath() {
+        final var listenersBroker = new ListenersBroker.WebSockets();
         final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
-            notificationService, databindProvider, SubscribeToStreamUtil.webSockets(listenersBroker));
+            notificationService, databindProvider, listenersBroker);
         final var errors = assertThrows(RestconfDocumentedException.class,
             () -> streamsSubscriptionService.subscribeToStream("toaster:toaster/toasterStatus/datastore=OPERATIONAL",
                 uriInfo)).getErrors();
index 2c514172c309ef4c2e43f183b142887119abf1da..cbffe56451f119e98dc758f572604b69972544c5 100644 (file)
@@ -34,7 +34,7 @@ class WebSocketFactoryTest extends AbstractNotificationListenerTest {
     private static final String REGISTERED_STREAM_NAME = "data-change-event-subscription/"
             + "toaster:toaster/datastore=CONFIGURATION/scope=SUBTREE/JSON";
 
-    private final ListenersBroker listenersBroker = new ListenersBroker();
+    private final ListenersBroker listenersBroker = new ListenersBroker.ServerSentEvents();
 
     @Mock
     private ScheduledExecutorService execService;
index 3909a627377e9898f5bc0dcc7bd771283d1e5def..d1f1e278f1a26053237822a7fded1358648beefd 100644 (file)
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
 public class JsonNotificationListenerTest extends AbstractNotificationListenerTest {
     private static final Logger LOG = LoggerFactory.getLogger(JsonNotificationListenerTest.class);
 
-    private final ListenersBroker listenersBroker = new ListenersBroker();
+    private final ListenersBroker listenersBroker = new ListenersBroker.ServerSentEvents();
 
     @Test
     public void notifi_leafTest() throws Exception {
index f2b16dd6c78d7cbe9854fcdfb8cf13636925a1aa..3f073bac727c9280530f13a966e517c6948fe583 100644 (file)
@@ -149,7 +149,7 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
 
     private static EffectiveModelContext SCHEMA_CONTEXT;
 
-    private final ListenersBroker listenersBroker = new ListenersBroker();
+    private final ListenersBroker listenersBroker = new ListenersBroker.ServerSentEvents();
     private DataBroker dataBroker;
     private DOMDataBroker domDataBroker;
     private DatabindProvider databindProvider;
index 632171c6857ab69df0356b0ea706aef019c05892..96febc4079ddedc07a9fe51d5d121daf58c2cfe3 100644 (file)
@@ -31,7 +31,7 @@ import org.xmlunit.assertj.XmlAssert;
 
 @RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class XmlNotificationListenerTest extends AbstractNotificationListenerTest {
-    private final ListenersBroker listenersBroker = new ListenersBroker();
+    private final ListenersBroker listenersBroker = new ListenersBroker.ServerSentEvents();
 
     @Test
     public void notifi_leafTest() throws Exception {