Turn ListenersBroker into a component 34/107834/17
authorOleksandr Zharov <oleksandr.zharov@pantheon.tech>
Tue, 12 Sep 2023 11:57:54 +0000 (13:57 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Thu, 5 Oct 2023 14:50:08 +0000 (16:50 +0200)
Refactored the ListenersBroker class into a component using
osgi.service.component.annotations.

JIRA: NETCONF-1104
Change-Id: I6ef5fa2ab22792c3d408dfa9db92ae7f07b3c63d
Signed-off-by: Oleksandr Zharov <oleksandr.zharov@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
24 files changed:
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/JaxRsNorthbound.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/RestconfApplication.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/CreateStreamUtil.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataServiceImpl.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataStreamServiceImpl.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImpl.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfStreamsSubscriptionServiceImpl.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/SubscribeToStreamUtil.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketInitializer.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractCommonSubscriber.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractNotificationListenerAdaptor.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/DeviceNotificationListenerAdaptor.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapter.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenersBroker.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/NotificationListenerAdapter.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/CreateStreamUtilTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/Netconf799Test.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataServiceImplTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImplTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfStreamsSubscriptionServiceImplTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketFactoryTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/JsonNotificationListenerTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapterTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/XmlNotificationListenerTest.java

index 901ea211eb695943ff970cec89b378a5db364f02..d3e2676a7951e7b115a9d50241669852e9757a6f 100644 (file)
@@ -35,6 +35,7 @@ import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.MdsalRestconfSer
 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfDataStreamServiceImpl;
 import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
 import org.opendaylight.restconf.nb.rfc8040.streams.WebSocketInitializer;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
 import org.opendaylight.yangtools.concepts.Registration;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -80,12 +81,13 @@ public final class JaxRsNorthbound implements AutoCloseable {
             @Reference final DOMMountPointService mountPointService,
             @Reference final DOMNotificationService notificationService, @Reference final DOMRpcService rpcService,
             @Reference final DOMSchemaService schemaService, @Reference final DatabindProvider databindProvider,
-            @Reference final MdsalRestconfServer server, final Configuration configuration) throws ServletException {
+            @Reference final MdsalRestconfServer server, @Reference final ListenersBroker listenersBroker,
+            final Configuration configuration) throws ServletException {
         this(webServer, webContextSecurer, servletSupport, filterAdapterConfiguration, actionService, dataBroker,
             mountPointService, notificationService, rpcService, schemaService, databindProvider, server,
-            configuration.ping$_$executor$_$name$_$prefix(), configuration.max$_$thread$_$count(),
-            new StreamsConfiguration(configuration.maximum$_$fragment$_$length(), configuration.idle$_$timeout(),
-                configuration.heartbeat$_$interval(), configuration.use$_$sse()));
+            listenersBroker, configuration.ping$_$executor$_$name$_$prefix(), configuration.max$_$thread$_$count(),
+            new StreamsConfiguration(configuration.maximum$_$fragment$_$length(),
+                configuration.idle$_$timeout(), configuration.heartbeat$_$interval(), configuration.use$_$sse()));
     }
 
     public JaxRsNorthbound(final WebServer webServer, final WebContextSecurer webContextSecurer,
@@ -94,7 +96,7 @@ public final class JaxRsNorthbound implements AutoCloseable {
             final DOMMountPointService mountPointService, final DOMNotificationService notificationService,
             final DOMRpcService rpcService, final DOMSchemaService schemaService,
             final DatabindProvider databindProvider, final MdsalRestconfServer server,
-            final String pingNamePrefix, final int pingMaxThreadCount,
+            final ListenersBroker listenersBroker, final String pingNamePrefix, final int pingMaxThreadCount,
             final StreamsConfiguration streamsConfiguration) throws ServletException {
         final var scheduledThreadPool = new ScheduledThreadPoolWrapper(pingMaxThreadCount,
             new NamingThreadPoolFactory(pingNamePrefix));
@@ -107,7 +109,7 @@ public final class JaxRsNorthbound implements AutoCloseable {
                 .addUrlPattern("/*")
                 .servlet(servletSupport.createHttpServletBuilder(
                     new RestconfApplication(databindProvider, server, mountPointService, dataBroker, rpcService,
-                        actionService, notificationService, schemaService, streamsConfiguration))
+                        actionService, notificationService, schemaService, listenersBroker, streamsConfiguration))
                     .build())
                 .asyncSupported(true)
                 .build())
@@ -115,7 +117,7 @@ public final class JaxRsNorthbound implements AutoCloseable {
                 .addUrlPattern("/" + SSE_SUBPATH + "/*")
                 .servlet(servletSupport.createHttpServletBuilder(
                     new DataStreamApplication(databindProvider,
-                        new RestconfDataStreamServiceImpl(scheduledThreadPool, streamsConfiguration)))
+                        new RestconfDataStreamServiceImpl(scheduledThreadPool, listenersBroker, streamsConfiguration)))
                     .build())
                 .name("notificationServlet")
                 .asyncSupported(true)
@@ -123,7 +125,7 @@ public final class JaxRsNorthbound implements AutoCloseable {
             .addServlet(ServletDetails.builder()
                 .addUrlPattern("/" + DATA_SUBSCRIPTION + "/*")
                 .addUrlPattern("/" + NOTIFICATION_STREAM + "/*")
-                .servlet(new WebSocketInitializer(scheduledThreadPool, streamsConfiguration))
+                .servlet(new WebSocketInitializer(scheduledThreadPool, listenersBroker, streamsConfiguration))
                 .build())
 
             // Allows user to add javax.servlet.Filter(s) in front of REST services
index 9b8b88d26239a1b59dcac31f36add7af10df384e..70b22e3a2203cafb4603edb5e7d6a486b93b890e 100644 (file)
@@ -26,6 +26,7 @@ import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfOperatio
 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfSchemaServiceImpl;
 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl;
 import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
 
 @Singleton
 public class RestconfApplication extends AbstractRestconfApplication {
@@ -33,12 +34,14 @@ public class RestconfApplication extends AbstractRestconfApplication {
             final DOMMountPointService mountPointService,
             final RestconfStreamsSubscriptionService streamSubscription, final DOMDataBroker dataBroker,
             final DOMActionService actionService, final DOMNotificationService notificationService,
-            final DOMSchemaService domSchemaService, final StreamsConfiguration configuration) {
+            final DOMSchemaService domSchemaService, final ListenersBroker listenersBroker,
+            final StreamsConfiguration configuration) {
         super(databindProvider, List.of(
             streamSubscription,
             new RestconfDataServiceImpl(databindProvider, server, dataBroker, streamSubscription, actionService,
+                listenersBroker, configuration),
+            new RestconfInvokeOperationsServiceImpl(databindProvider, server, mountPointService, listenersBroker,
                 configuration),
-            new RestconfInvokeOperationsServiceImpl(databindProvider, server, mountPointService, configuration),
             new RestconfOperationsServiceImpl(databindProvider, server),
             new RestconfSchemaServiceImpl(domSchemaService, mountPointService),
             new RestconfImpl(databindProvider)));
@@ -49,10 +52,10 @@ public class RestconfApplication extends AbstractRestconfApplication {
             final DOMMountPointService mountPointService, final DOMDataBroker dataBroker,
             final DOMRpcService rpcService, final DOMActionService actionService,
             final DOMNotificationService notificationService, final DOMSchemaService domSchemaService,
-            final StreamsConfiguration configuration) {
+            final ListenersBroker listenersBroker, final StreamsConfiguration configuration) {
         this(databindProvider, server, mountPointService,
             new RestconfStreamsSubscriptionServiceImpl(dataBroker, notificationService, databindProvider,
-                configuration),
-            dataBroker, actionService, notificationService, domSchemaService, configuration);
+                listenersBroker, configuration),
+            dataBroker, actionService, notificationService, domSchemaService, listenersBroker, configuration);
     }
 }
index d94ea1e47a5c67c5c9ca2a9407f130f1350b9d1e..c61860dc4d0d09e9a7ace13cd73ad3a22ced9120 100644 (file)
@@ -136,13 +136,14 @@ final class CreateStreamUtil {
      *
      * @param baseUrl base Url
      * @param input RPC input
-     * @param streamUtil stream utility
      * @param mountPointService dom mount point service
+     * @param listenersBroker {@link ListenersBroker}
      * @return {@link DOMRpcResult} - Output of RPC - example in JSON
      */
     // FIXME: this should be an RPC invocation
     static ContainerNode createDeviceNotificationListener(final String baseUrl, final ContainerNode input,
-            final SubscribeToStreamUtil streamUtil, final DOMMountPointService mountPointService) {
+            final SubscribeToStreamUtil streamUtil, final DOMMountPointService mountPointService,
+            final ListenersBroker listenersBroker) {
         // parsing out of container with settings and path
         // FIXME: ugly cast
         final YangInstanceIdentifier path =
@@ -182,7 +183,7 @@ final class CreateStreamUtil {
                 ErrorTag.OPERATION_FAILED);
         }
 
-        final DeviceNotificationListenerAdaptor notificationListenerAdapter = streamUtil.listenersBroker()
+        final DeviceNotificationListenerAdaptor notificationListenerAdapter = listenersBroker
             .registerDeviceNotificationListener(deviceName, prepareOutputType(input), mountModelContext,
                 mountPointService, mountPoint.getIdentifier());
         notificationListenerAdapter.listen(mountNotifService, notificationPaths);
index 4d00582d3348af88ea0a7a6af41d23b733132757..651726b019bcfa7cd6fae16ab93704258aa5027c 100644 (file)
@@ -124,18 +124,20 @@ public final class RestconfDataServiceImpl {
     private final MdsalRestconfServer server;
     @Deprecated(forRemoval = true)
     private final DOMDataBroker dataBroker;
-    private final ListenersBroker listenersBroker = ListenersBroker.getInstance();
+    private final ListenersBroker listenersBroker;
 
     public RestconfDataServiceImpl(final DatabindProvider databindProvider, final MdsalRestconfServer server,
             final DOMDataBroker dataBroker, final RestconfStreamsSubscriptionService delegRestconfSubscrService,
-            final DOMActionService actionService, final StreamsConfiguration configuration) {
+            final DOMActionService actionService, final ListenersBroker listenersBroker,
+            final StreamsConfiguration configuration) {
         this.databindProvider = requireNonNull(databindProvider);
         this.server = requireNonNull(server);
         this.dataBroker = requireNonNull(dataBroker);
         this.delegRestconfSubscrService = requireNonNull(delegRestconfSubscrService);
         this.actionService = requireNonNull(actionService);
-        streamUtils = configuration.useSSE() ? SubscribeToStreamUtil.serverSentEvents()
-                : SubscribeToStreamUtil.webSockets();
+        this.listenersBroker = requireNonNull(listenersBroker);
+        streamUtils = configuration.useSSE() ? SubscribeToStreamUtil.serverSentEvents(listenersBroker)
+                : SubscribeToStreamUtil.webSockets(listenersBroker);
     }
 
     /**
@@ -246,9 +248,9 @@ public final class RestconfDataServiceImpl {
                 final var notifName = notification.argument();
 
                 writeNotificationStreamToDatastore(schemaContext, uriInfo, transaction,
-                    createYangNotifiStream(listenersBroker, moduleName, notifName, NotificationOutputType.XML));
+                    createYangNotifiStream(moduleName, notifName, NotificationOutputType.XML));
                 writeNotificationStreamToDatastore(schemaContext, uriInfo, transaction,
-                    createYangNotifiStream(listenersBroker, moduleName, notifName, NotificationOutputType.JSON));
+                    createYangNotifiStream(moduleName, notifName, NotificationOutputType.JSON));
             });
         }
 
@@ -259,8 +261,8 @@ public final class RestconfDataServiceImpl {
         }
     }
 
-    private static NotificationListenerAdapter createYangNotifiStream(final ListenersBroker listenersBroker,
-            final String moduleName, final QName notifName, final NotificationOutputType outputType) {
+    private NotificationListenerAdapter createYangNotifiStream(final String moduleName, final QName notifName,
+            final NotificationOutputType outputType) {
         final var streamName = createNotificationStreamName(moduleName, notifName.getLocalName(), outputType);
 
         final var existing = listenersBroker.notificationListenerFor(streamName);
index da7da6f2c27d888504e1a98cae365bdccea89ee8..46ba9e78e5268036db4629a40f57166308a97c51 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
 
+import static java.util.Objects.requireNonNull;
+
 import java.util.concurrent.ScheduledExecutorService;
 import javax.inject.Inject;
 import javax.inject.Singleton;
@@ -40,15 +42,16 @@ import org.slf4j.LoggerFactory;
 public final class RestconfDataStreamServiceImpl {
     private static final Logger LOG = LoggerFactory.getLogger(RestconfDataStreamServiceImpl.class);
 
-    private final ListenersBroker listenersBroker = ListenersBroker.getInstance();
+    private final ListenersBroker listenersBroker;
     private final ScheduledExecutorService executorService;
     private final int maximumFragmentLength;
     private final int heartbeatInterval;
 
     @Inject
     public RestconfDataStreamServiceImpl(final ScheduledThreadPool scheduledThreadPool,
-            final StreamsConfiguration configuration) {
+            final ListenersBroker listenersBroker, final StreamsConfiguration configuration) {
         executorService = scheduledThreadPool.getExecutor();
+        this.listenersBroker = requireNonNull(listenersBroker);
         heartbeatInterval = configuration.heartbeatInterval();
         maximumFragmentLength = configuration.maximumFragmentLength();
     }
index 0fc3d26c115d91a00c54cb97031880f8e682befa..03fbe8e93bd202817378848ab5c66b2f99491b10 100644 (file)
@@ -39,6 +39,7 @@ import org.opendaylight.restconf.nb.rfc8040.databind.XmlOperationInputBody;
 import org.opendaylight.restconf.nb.rfc8040.legacy.InstanceIdentifierContext;
 import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
 import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotification;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscription;
 import org.opendaylight.yangtools.yang.common.ErrorTag;
@@ -60,15 +61,17 @@ public final class RestconfInvokeOperationsServiceImpl {
     @Deprecated(forRemoval = true)
     private final DOMMountPointService mountPointService;
     private final SubscribeToStreamUtil streamUtils;
+    private final ListenersBroker listenersBroker;
 
     public RestconfInvokeOperationsServiceImpl(final DatabindProvider databindProvider,
             final MdsalRestconfServer server, final DOMMountPointService mountPointService,
-            final StreamsConfiguration configuration) {
+            final ListenersBroker listenersBroker, final StreamsConfiguration configuration) {
         this.databindProvider = requireNonNull(databindProvider);
         this.server = requireNonNull(server);
         this.mountPointService = requireNonNull(mountPointService);
-        streamUtils = configuration.useSSE() ? SubscribeToStreamUtil.serverSentEvents()
-            : SubscribeToStreamUtil.webSockets();
+        this.listenersBroker = requireNonNull(listenersBroker);
+        streamUtils = configuration.useSSE() ? SubscribeToStreamUtil.serverSentEvents(listenersBroker)
+            : SubscribeToStreamUtil.webSockets(listenersBroker);
     }
 
     /**
@@ -177,7 +180,7 @@ public final class RestconfInvokeOperationsServiceImpl {
             } else if (SubscribeDeviceNotification.QNAME.equals(type)) {
                 final var baseUrl = streamUtils.prepareUriByStreamName(uriInfo, "").toString();
                 return RestconfFuture.of(Optional.of(CreateStreamUtil.createDeviceNotificationListener(baseUrl, input,
-                    streamUtils, mountPointService)));
+                    streamUtils, mountPointService, listenersBroker)));
             }
         }
 
index ce5643b3bd1bed50e62ec415235a3e48d3673566..3a192290a375855a97fc126059b3784cf9d1a883 100644 (file)
@@ -20,6 +20,7 @@ import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
 import org.opendaylight.restconf.nb.rfc8040.rests.services.api.RestconfStreamsSubscriptionService;
 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
 import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
 import org.opendaylight.yang.gen.v1.subscribe.to.notification.rev161028.Notifi;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
@@ -50,10 +51,10 @@ public class RestconfStreamsSubscriptionServiceImpl implements RestconfStreamsSu
      */
     public RestconfStreamsSubscriptionServiceImpl(final DOMDataBroker dataBroker,
             final DOMNotificationService notificationService, final DatabindProvider databindProvider,
-            final StreamsConfiguration configuration) {
+            final ListenersBroker listenersBroker, final StreamsConfiguration configuration) {
         handlersHolder = new HandlersHolder(dataBroker, notificationService, databindProvider);
-        streamUtils = configuration.useSSE() ? SubscribeToStreamUtil.serverSentEvents()
-                : SubscribeToStreamUtil.webSockets();
+        streamUtils = configuration.useSSE() ? SubscribeToStreamUtil.serverSentEvents(listenersBroker)
+                : SubscribeToStreamUtil.webSockets(listenersBroker);
     }
 
     @Override
index 640b1fe3c46a331790b3da1a58acdec303530755..ab679b1baff7e22d80e5fda78eeab9feb200eb99 100644 (file)
@@ -47,7 +47,6 @@ abstract class SubscribeToStreamUtil {
      * Implementation of SubscribeToStreamUtil for Server-sent events.
      */
     private static final class ServerSentEvents extends SubscribeToStreamUtil {
-        static final ServerSentEvents INSTANCE = new ServerSentEvents(ListenersBroker.getInstance());
 
         private ServerSentEvents(final ListenersBroker listenersBroker) {
             super(listenersBroker);
@@ -65,7 +64,6 @@ abstract class SubscribeToStreamUtil {
      * Implementation of SubscribeToStreamUtil for Web sockets.
      */
     private static final class WebSockets extends SubscribeToStreamUtil {
-        static final WebSockets INSTANCE = new WebSockets(ListenersBroker.getInstance());
 
         private WebSockets(final ListenersBroker listenersBroker) {
             super(listenersBroker);
@@ -98,12 +96,12 @@ abstract class SubscribeToStreamUtil {
         this.listenersBroker = requireNonNull(listenersBroker);
     }
 
-    static SubscribeToStreamUtil serverSentEvents() {
-        return ServerSentEvents.INSTANCE;
+    static SubscribeToStreamUtil serverSentEvents(final ListenersBroker listenersBroker) {
+        return new ServerSentEvents(listenersBroker);
     }
 
-    static SubscribeToStreamUtil webSockets() {
-        return WebSockets.INSTANCE;
+    static SubscribeToStreamUtil webSockets(final ListenersBroker listenersBroker) {
+        return new WebSockets(listenersBroker);
     }
 
     public final @NonNull ListenersBroker listenersBroker() {
index c4016cc9247e3f5363bdf2b88a8db2a38290b3a4..97a93e585b3fc1b557c5223e927deee90ad68f8e 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.restconf.nb.rfc8040.streams;
 
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.annotations.VisibleForTesting;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.Serial;
@@ -38,6 +40,8 @@ public final class WebSocketInitializer extends WebSocketServlet {
     private final int maximumFragmentLength;
     private final int heartbeatInterval;
     private final int idleTimeoutMillis;
+    @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "Required for session mgmt")
+    private final ListenersBroker listenersBroker;
 
     /**
      * Creation of the web-socket initializer.
@@ -47,8 +51,9 @@ public final class WebSocketInitializer extends WebSocketServlet {
      */
     @Inject
     public WebSocketInitializer(final ScheduledThreadPool scheduledThreadPool,
-            final StreamsConfiguration configuration) {
+            final ListenersBroker listenersBroker, final StreamsConfiguration configuration) {
         executorService = scheduledThreadPool.getExecutor();
+        this.listenersBroker = requireNonNull(listenersBroker);
         maximumFragmentLength = configuration.maximumFragmentLength();
         heartbeatInterval = configuration.heartbeatInterval();
         idleTimeoutMillis = configuration.idleTimeout();
@@ -62,7 +67,8 @@ public final class WebSocketInitializer extends WebSocketServlet {
     @Override
     public void configure(final WebSocketServletFactory factory) {
         factory.getPolicy().setIdleTimeout(idleTimeoutMillis);
-        factory.setCreator(new WebSocketFactory(executorService, maximumFragmentLength, heartbeatInterval));
+        factory.setCreator(new WebSocketFactory(executorService, listenersBroker, maximumFragmentLength,
+            heartbeatInterval));
     }
 
     /**
@@ -73,8 +79,7 @@ public final class WebSocketInitializer extends WebSocketServlet {
         private static final Logger LOG = LoggerFactory.getLogger(WebSocketFactory.class);
 
         private final ScheduledExecutorService executorService;
-        // FIXME: inject this reference
-        private final ListenersBroker listenersBroker = ListenersBroker.getInstance();
+        private final ListenersBroker listenersBroker;
         private final int maximumFragmentLength;
         private final int heartbeatInterval;
 
@@ -86,9 +91,10 @@ public final class WebSocketInitializer extends WebSocketServlet {
          *                              (exceeded message length leads to fragmentation of messages).
          * @param heartbeatInterval     Interval in milliseconds between sending of ping control frames.
          */
-        WebSocketFactory(final ScheduledExecutorService executorService, final int maximumFragmentLength,
-                final int heartbeatInterval) {
+        WebSocketFactory(final ScheduledExecutorService executorService, final ListenersBroker listenersBroker,
+                final int maximumFragmentLength, final int heartbeatInterval) {
             this.executorService = executorService;
+            this.listenersBroker = listenersBroker;
             this.maximumFragmentLength = maximumFragmentLength;
             this.heartbeatInterval = heartbeatInterval;
         }
index ccbd86c395bdcb9605013560970ce74600f6eca1..97711fdd657f951c5811aa919d3ef41014c7d636 100644 (file)
@@ -54,6 +54,7 @@ abstract class AbstractCommonSubscriber<T> extends AbstractNotificationsData imp
     private final EventFormatterFactory<T> formatterFactory;
     private final NotificationOutputType outputType;
     private final String streamName;
+    private final ListenersBroker listenersBroker;
 
     @GuardedBy("this")
     private final Set<StreamSessionHandler> subscribers = new HashSet<>();
@@ -66,12 +67,13 @@ abstract class AbstractCommonSubscriber<T> extends AbstractNotificationsData imp
     private Instant stop = null;
 
     AbstractCommonSubscriber(final String streamName, final NotificationOutputType outputType,
-            final EventFormatterFactory<T> formatterFactory) {
+            final EventFormatterFactory<T> formatterFactory, final ListenersBroker listenersBroker) {
         this.streamName = requireNonNull(streamName);
         checkArgument(!streamName.isEmpty());
 
         this.outputType = requireNonNull(outputType);
         this.formatterFactory = requireNonNull(formatterFactory);
+        this.listenersBroker = requireNonNull(listenersBroker);
         formatter = formatterFactory.emptyFormatter();
     }
 
@@ -118,7 +120,7 @@ abstract class AbstractCommonSubscriber<T> extends AbstractNotificationsData imp
         subscribers.remove(subscriber);
         LOG.debug("Subscriber {} is removed", subscriber);
         if (!hasSubscribers()) {
-            ListenersBroker.getInstance().removeAndCloseListener(this);
+            listenersBroker.removeAndCloseListener(this);
         }
     }
 
index 0ca8f4233401f407828249f2265f1c021911d951..ab829cbf8bfe409e8f8f30694c83290ed263f06a 100644 (file)
@@ -29,8 +29,9 @@ abstract class AbstractNotificationListenerAdaptor extends AbstractCommonSubscri
     private static final NotificationFormatterFactory JSON_FORMATTER_FACTORY =
         JSONNotificationFormatter.createFactory(JSONCodecFactorySupplier.RFC7951);
 
-    AbstractNotificationListenerAdaptor(final String streamName, final NotificationOutputType outputType) {
-        super(streamName, outputType, getFormatterFactory(outputType));
+    AbstractNotificationListenerAdaptor(final String streamName, final NotificationOutputType outputType,
+            final ListenersBroker listenersBroker) {
+        super(streamName, outputType, getFormatterFactory(outputType), listenersBroker);
     }
 
     private static NotificationFormatterFactory getFormatterFactory(final NotificationOutputType outputType) {
index c7ede8729df486cf52cfcb4845c8576524a0e8ee..0f5355e4490e1902c20b62f195363d9405d9b970 100644 (file)
@@ -33,17 +33,18 @@ public final class DeviceNotificationListenerAdaptor extends AbstractNotificatio
     private final @NonNull EffectiveModelContext effectiveModel;
     private final @NonNull DOMMountPointService mountPointService;
     private final @NonNull YangInstanceIdentifier instanceIdentifier;
-    private final @NonNull ListenersBroker listenersBroker = ListenersBroker.getInstance();
+    private final @NonNull ListenersBroker listenersBroker;
 
     private ListenerRegistration<DOMMountPointListener> reg;
 
     public DeviceNotificationListenerAdaptor(final String streamName, final NotificationOutputType outputType,
             final EffectiveModelContext effectiveModel, final DOMMountPointService mountPointService,
-            final YangInstanceIdentifier path) {
-        super(streamName, outputType);
+            final YangInstanceIdentifier path, final ListenersBroker listenersBroker) {
+        super(streamName, outputType, listenersBroker);
         this.effectiveModel = requireNonNull(effectiveModel);
         this.mountPointService = requireNonNull(mountPointService);
         instanceIdentifier = requireNonNull(path);
+        this.listenersBroker = requireNonNull(listenersBroker);
     }
 
     public synchronized void listen(final DOMNotificationService notificationService, final Set<Absolute> paths) {
index 6fee17d65f5bd9561c03677de80b01ae52ef0ba5..b25cf1c37afa0daefa98873b3ef90d5150444801 100644 (file)
@@ -48,8 +48,8 @@ public class ListenerAdapter extends AbstractCommonSubscriber<Collection<DataTre
      */
     @VisibleForTesting
     public ListenerAdapter(final YangInstanceIdentifier path, final String streamName,
-            final NotificationOutputType outputType) {
-        super(streamName, outputType, getFormatterFactory(outputType));
+            final NotificationOutputType outputType, final ListenersBroker listenersBroker) {
+        super(streamName, outputType, getFormatterFactory(outputType), listenersBroker);
         this.path = requireNonNull(path);
     }
 
index 288adbf67907d70b852ab12c975153d5c04b11b9..af6d5f7e6a2b011a4e93609c4fe9f26eb4b43a04 100644 (file)
@@ -9,13 +9,13 @@ package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
 
 import static java.util.Objects.requireNonNull;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
-import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.locks.StampedLock;
+import javax.inject.Inject;
+import javax.inject.Singleton;
 import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
 import org.opendaylight.restconf.nb.rfc8040.URLConstants;
@@ -25,6 +25,8 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,17 +34,12 @@ import org.slf4j.LoggerFactory;
  * This singleton class is responsible for creation, removal and searching for {@link ListenerAdapter} or
  * {@link NotificationListenerAdapter} listeners.
  */
-// FIXME: NETCONF-1104: this should be a component
 // FIXME: furthermore, this should be tied to ietf-restconf-monitoring, as the Strings used in its maps are stream
 //        names. We essentially need a component which deals with allocation of stream names and their lifecycle and
 //        the contents of /restconf-state/streams.
+@Singleton
+@Component(service = ListenersBroker.class, immediate = true)
 public final class ListenersBroker {
-    // FIXME: NETCONF-1104: remove this class
-    @Deprecated(since = "7.0.0")
-    private static final class Holder {
-        static final ListenersBroker INSTANCE = new ListenersBroker();
-    }
-
     private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);
 
     private final StampedLock dataChangeListenersLock = new StampedLock();
@@ -52,19 +49,10 @@ public final class ListenersBroker {
     private final BiMap<String, NotificationListenerAdapter> notificationListeners = HashBiMap.create();
     private final BiMap<String, DeviceNotificationListenerAdaptor> deviceNotificationListeners = HashBiMap.create();
 
-    private ListenersBroker() {
-        // FIXME: NETCONF-1104: this constructor should be a public thing
-    }
+    @Inject
+    @Activate
+    public ListenersBroker() {
 
-    /**
-     * Creation of the singleton listeners broker.
-     *
-     * @return Reusable instance of {@link ListenersBroker}.
-     */
-    // FIXME: NETCONF-1104: remove this method
-    @Deprecated(since = "7.0.0")
-    public static ListenersBroker getInstance() {
-        return Holder.INSTANCE;
     }
 
     /**
@@ -159,7 +147,7 @@ public final class ListenersBroker {
         final long stamp = dataChangeListenersLock.writeLock();
         try {
             return dataChangeListeners.computeIfAbsent(streamName,
-                stream -> new ListenerAdapter(path, stream, outputType));
+                stream -> new ListenerAdapter(path, stream, outputType, this));
         } finally {
             dataChangeListenersLock.unlockWrite(stamp);
         }
@@ -183,7 +171,7 @@ public final class ListenersBroker {
         final long stamp = notificationListenersLock.writeLock();
         try {
             return notificationListeners.computeIfAbsent(streamName,
-                stream -> new NotificationListenerAdapter(schemaPath, stream, outputType));
+                stream -> new NotificationListenerAdapter(schemaPath, stream, outputType, this));
         } finally {
             notificationListenersLock.unlockWrite(stamp);
         }
@@ -207,7 +195,7 @@ public final class ListenersBroker {
         try {
             return deviceNotificationListeners.computeIfAbsent(streamName,
                 stream -> new DeviceNotificationListenerAdaptor(streamName, outputType, refSchemaCtx,
-                    mountPointService, path));
+                    mountPointService, path, this));
         } finally {
             deviceNotificationListenersLock.unlockWrite(stamp);
         }
@@ -407,15 +395,4 @@ public final class ListenersBroker {
         }
         return result;
     }
-
-    @VisibleForTesting
-    public synchronized void setDataChangeListeners(final Map<String, ListenerAdapter> listenerAdapterCollection) {
-        final long stamp = dataChangeListenersLock.writeLock();
-        try {
-            dataChangeListeners.clear();
-            dataChangeListeners.putAll(listenerAdapterCollection);
-        } finally {
-            dataChangeListenersLock.unlockWrite(stamp);
-        }
-    }
 }
\ No newline at end of file
index dc61e868fa9e0fde5a8a7f505e9fd66900cee6ea..0bec0dfa7275b8de9b96ac91ccd66c818a3ed32a 100644 (file)
@@ -28,8 +28,9 @@ public final class NotificationListenerAdapter extends AbstractNotificationListe
      * @param streamName Name of the stream.
      * @param outputType Type of output on notification (JSON or XML).
      */
-    NotificationListenerAdapter(final Absolute path, final String streamName, final NotificationOutputType outputType) {
-        super(streamName, outputType);
+    NotificationListenerAdapter(final Absolute path, final String streamName, final NotificationOutputType outputType,
+            final ListenersBroker listenersBroker) {
+        super(streamName, outputType, listenersBroker);
         this.path = requireNonNull(path);
     }
 
index 804e0149d98441bba353f82eb3cadb469f4e8e95..6dae3dfa49840f216ced4314dbcf62bfa8a90d05 100644 (file)
@@ -40,7 +40,7 @@ import org.opendaylight.yangtools.yang.test.util.YangParserTestUtils;
 public class CreateStreamUtilTest {
     private static EffectiveModelContext SCHEMA_CTX;
 
-    private final ListenersBroker listenersBroker = ListenersBroker.getInstance();
+    private final ListenersBroker listenersBroker = new ListenersBroker();
 
     @BeforeClass
     public static void setUp() {
index 8c0c252b5fc673386faf2da62ad356ec4941d5a5..5a39d2315f35299ea719182d5d0584eb431b8c27 100644 (file)
@@ -27,6 +27,7 @@ import org.opendaylight.restconf.nb.rfc8040.AbstractInstanceIdentifierTest;
 import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext;
 import org.opendaylight.restconf.nb.rfc8040.rests.services.api.RestconfStreamsSubscriptionService;
 import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
@@ -55,7 +56,7 @@ public class Netconf799Test extends AbstractInstanceIdentifierTest {
 
         final var dataService = new RestconfDataServiceImpl(() -> DatabindContext.ofModel(IID_SCHEMA),
             new MdsalRestconfServer(dataBroker, rpcService, mountPointService), dataBroker, restconfStreamSubService,
-            actionService, new StreamsConfiguration(0, 1, 0, false));
+            actionService, new ListenersBroker(), new StreamsConfiguration(0, 1, 0, false));
 
         final var response = dataService.postDataJSON("instance-identifier-module:cont/cont1/reset",
             stringInputStream("""
index fdd288311092c063ec871722cd43bcfd53af27af..d2204b465c14b489835c7c9cd6336f28576737e8 100644 (file)
@@ -62,6 +62,7 @@ import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext;
 import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
 import org.opendaylight.restconf.nb.rfc8040.rests.services.api.RestconfStreamsSubscriptionService;
 import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.patch.rev170222.yang.patch.yang.patch.Edit.Operation;
 import org.opendaylight.yangtools.yang.common.ErrorTag;
 import org.opendaylight.yangtools.yang.common.ErrorType;
@@ -91,8 +92,6 @@ public class RestconfDataServiceImplTest extends AbstractJukeboxTest {
             .withChild(Builders.mapBuilder().withNodeIdentifier(PLAYLIST_NID).build())
             .build();
 
-    private RestconfDataServiceImpl dataService;
-
     @Mock
     private UriInfo uriInfo;
     @Mock
@@ -118,6 +117,8 @@ public class RestconfDataServiceImplTest extends AbstractJukeboxTest {
     @Mock
     private AsyncResponse asyncResponse;
 
+    private RestconfDataServiceImpl dataService;
+
     @Before
     public void setUp() throws Exception {
         doReturn(Set.of()).when(queryParamenters).entrySet();
@@ -131,7 +132,7 @@ public class RestconfDataServiceImplTest extends AbstractJukeboxTest {
 
         dataService = new RestconfDataServiceImpl(() -> DatabindContext.ofModel(JUKEBOX_SCHEMA),
             new MdsalRestconfServer(dataBroker, rpcService, mountPointService), dataBroker, delegRestconfSubscrService,
-            actionService, new StreamsConfiguration(0, 1, 0, false));
+            actionService, new ListenersBroker(), new StreamsConfiguration(0, 1, 0, false));
         doReturn(Optional.of(mountPoint)).when(mountPointService)
                 .getMountPoint(any(YangInstanceIdentifier.class));
         doReturn(Optional.of(FixedDOMSchemaService.of(JUKEBOX_SCHEMA))).when(mountPoint)
index ba9af579faacffae79a68b92e64e65cbd745ef96..5d160aaf02bc8f74ae86cbfb95ba9208a3bf4381 100644 (file)
@@ -45,6 +45,7 @@ import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
 import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext;
 import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
 import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
 import org.opendaylight.yangtools.yang.common.ErrorTag;
 import org.opendaylight.yangtools.yang.common.ErrorType;
 import org.opendaylight.yangtools.yang.common.QName;
@@ -89,7 +90,7 @@ public class RestconfInvokeOperationsServiceImplTest {
     public void setup() {
         server = new MdsalRestconfServer(dataBroker, rpcService, mountPointService);
         invokeOperationsService = new RestconfInvokeOperationsServiceImpl(() -> CONTEXT, server, mountPointService,
-            new StreamsConfiguration(0, 1, 0, false));
+            new ListenersBroker(), new StreamsConfiguration(0, 1, 0, false));
     }
 
     @Test
index f84269061713b605455b2ab9965d101f95daee17..c9fb933c8697fec38a61ffd3d0d9079ec9a548fe 100644 (file)
@@ -16,11 +16,9 @@ import static org.mockito.Mockito.mock;
 import com.google.common.collect.ImmutableClassToInstanceMap;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.Map;
 import javax.ws.rs.core.MultivaluedHashMap;
 import javax.ws.rs.core.UriBuilder;
 import javax.ws.rs.core.UriInfo;
-import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -37,7 +35,6 @@ import org.opendaylight.restconf.nb.rfc8040.URLConstants;
 import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext;
 import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
 import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
@@ -55,8 +52,6 @@ public class RestconfStreamsSubscriptionServiceImplTest {
             + "toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE";
 
     private static EffectiveModelContext MODEL_CONTEXT;
-    // FIXME: NETCONF-1104: this should be non-static and set up for each test separately
-    private static ListenersBroker LISTENERS_BROKER;
 
     @Mock
     private DOMDataBroker dataBroker;
@@ -65,34 +60,22 @@ public class RestconfStreamsSubscriptionServiceImplTest {
     @Mock
     private DOMNotificationService notificationService;
 
+    private final ListenersBroker listenersBroker = new ListenersBroker();
     private StreamsConfiguration configurationWs;
     private StreamsConfiguration configurationSse;
-
     private DatabindProvider databindProvider;
 
     @BeforeClass
     public static void beforeClass() {
         MODEL_CONTEXT = YangParserTestUtils.parseYangResourceDirectory("/notifications");
-
-        final String name =
-            "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE";
-        final ListenerAdapter adapter = new ListenerAdapter(YangInstanceIdentifier.of(
-            QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster")),
-            name, NotificationOutputType.JSON);
-        LISTENERS_BROKER = ListenersBroker.getInstance();
-        LISTENERS_BROKER.setDataChangeListeners(Map.of(name, adapter));
-    }
-
-    @AfterClass
-    public static void afterClass() {
-        if (LISTENERS_BROKER != null) {
-            LISTENERS_BROKER.setDataChangeListeners(Map.of());
-            LISTENERS_BROKER = null;
-        }
     }
 
     @Before
     public void setUp() throws URISyntaxException {
+        final var name = "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE";
+        listenersBroker.registerDataChangeListener(
+            YangInstanceIdentifier.of(QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster")),
+            name, NotificationOutputType.JSON);
         final DOMDataTreeWriteTransaction wTx = mock(DOMDataTreeWriteTransaction.class);
         doReturn(wTx).when(dataBroker).newWriteOnlyTransaction();
         doReturn(CommitInfo.emptyFluentFuture()).when(wTx).commit();
@@ -115,12 +98,12 @@ public class RestconfStreamsSubscriptionServiceImplTest {
 
     @Test
     public void testSubscribeToStreamSSE() {
-        LISTENERS_BROKER.registerDataChangeListener(
+        listenersBroker.registerDataChangeListener(
                 IdentifierCodec.deserialize("toaster:toaster/toasterStatus", MODEL_CONTEXT),
                 "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
                 NotificationOutputType.XML);
         final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
-            notificationService, databindProvider, configurationSse);
+            notificationService, databindProvider, listenersBroker, configurationSse);
         final var response = streamsSubscriptionService.subscribeToStream(
             "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE", uriInfo);
         assertEquals("http://localhost:8181/" + URLConstants.BASE_PATH + "/" + URLConstants.SSE_SUBPATH
@@ -130,12 +113,12 @@ public class RestconfStreamsSubscriptionServiceImplTest {
 
     @Test
     public void testSubscribeToStreamWS() {
-        LISTENERS_BROKER.registerDataChangeListener(
+        listenersBroker.registerDataChangeListener(
                 IdentifierCodec.deserialize("toaster:toaster/toasterStatus", MODEL_CONTEXT),
                 "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
                 NotificationOutputType.XML);
         final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
-            notificationService, databindProvider, configurationWs);
+            notificationService, databindProvider, listenersBroker, configurationWs);
         final var response = streamsSubscriptionService.subscribeToStream(
             "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE", uriInfo);
         assertEquals("ws://localhost:8181/" + URLConstants.BASE_PATH
@@ -147,7 +130,7 @@ public class RestconfStreamsSubscriptionServiceImplTest {
     public void testSubscribeToStreamMissingDatastoreInPath() {
         final RestconfStreamsSubscriptionServiceImpl streamsSubscriptionService =
                 new RestconfStreamsSubscriptionServiceImpl(dataBroker, notificationService, databindProvider,
-                        configurationWs);
+                    listenersBroker, configurationWs);
         final var errors = assertThrows(RestconfDocumentedException.class,
             () -> streamsSubscriptionService.subscribeToStream("toaster:toaster/toasterStatus/scope=ONE", uriInfo))
             .getErrors();
@@ -162,7 +145,7 @@ public class RestconfStreamsSubscriptionServiceImplTest {
     public void testSubscribeToStreamMissingScopeInPath() {
         final RestconfStreamsSubscriptionServiceImpl streamsSubscriptionService =
                 new RestconfStreamsSubscriptionServiceImpl(dataBroker, notificationService, databindProvider,
-                        configurationWs);
+                    listenersBroker, configurationWs);
         final var errors = assertThrows(RestconfDocumentedException.class,
             () -> streamsSubscriptionService.subscribeToStream("toaster:toaster/toasterStatus/datastore=OPERATIONAL",
                 uriInfo)).getErrors();
index 5f1153c49244698ed985962d1fbc3955eb0b1a5e..0dc747beb18ecc8a0ef199cc9841a8c6ef830ec0 100644 (file)
@@ -18,7 +18,7 @@ import java.net.URI;
 import java.util.concurrent.ScheduledExecutorService;
 import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
 import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.restconf.nb.rfc8040.streams.WebSocketInitializer.WebSocketFactory;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
@@ -27,19 +27,20 @@ import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 public class WebSocketFactoryTest {
-
     private static final String REGISTERED_STREAM_NAME = "data-change-event-subscription/"
             + "toaster:toaster/datastore=CONFIGURATION/scope=SUBTREE";
     private static final YangInstanceIdentifier TOASTER_YIID = YangInstanceIdentifier.builder()
             .node(QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster"))
             .build();
 
+    private final ListenersBroker listenersBroker = new ListenersBroker();
+
     private final WebSocketFactory webSocketFactory = new WebSocketFactory(mock(ScheduledExecutorService.class),
-            5000, 2000);
+        listenersBroker, 5000, 2000);
 
-    @BeforeClass
-    public static void prepareListenersBroker() {
-        ListenersBroker.getInstance().registerDataChangeListener(TOASTER_YIID, REGISTERED_STREAM_NAME,
+    @Before
+    public void prepareListenersBroker() {
+        listenersBroker.registerDataChangeListener(TOASTER_YIID, REGISTERED_STREAM_NAME,
                 NotificationOutputTypeGrouping.NotificationOutputType.JSON);
     }
 
index e3d63a647ce9cb2a10cae8c74601646a18e562e3..8dbd7b7594901d9c339b0347dbf57735eec6433f 100644 (file)
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
 public class JsonNotificationListenerTest extends AbstractNotificationListenerTest {
     private static final Logger LOG = LoggerFactory.getLogger(JsonNotificationListenerTest.class);
 
-    private final ListenersBroker listenersBroker = ListenersBroker.getInstance();
+    private final ListenersBroker listenersBroker = new ListenersBroker();
 
     @Test
     public void notifi_leafTest() throws Exception {
index 1d46377ee656e21dd7c85ec60eb02fbb15ec72e8..f706b32240c015013b33790066635dc4127448b6 100644 (file)
@@ -149,6 +149,7 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
 
     private static EffectiveModelContext SCHEMA_CONTEXT;
 
+    private final ListenersBroker listenersBroker = new ListenersBroker();
     private DataBroker dataBroker;
     private DOMDataBroker domDataBroker;
     private DatabindProvider databindProvider;
@@ -177,8 +178,9 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
 
         ListenerAdapterTester(final YangInstanceIdentifier path, final String streamName,
                 final NotificationOutputType outputType, final boolean leafNodesOnly,
-                final boolean skipNotificationData, final boolean changedLeafNodesOnly, final boolean childNodesOnly) {
-            super(path, streamName, outputType);
+                final boolean skipNotificationData, final boolean changedLeafNodesOnly, final boolean childNodesOnly,
+                final ListenersBroker listenersBroker) {
+            super(path, streamName, outputType, listenersBroker);
             setQueryParams(NotificationQueryParams.of(StartTimeParam.forUriValue("1970-01-01T00:00:00Z"), null, null,
                 leafNodesOnly ? LeafNodesOnlyParam.of(true) : null,
                 skipNotificationData ? SkipNotificationDataParam.of(true) : null,
@@ -235,7 +237,7 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
     @Test
     public void testJsonNotifsLeaves() throws Exception {
         ListenerAdapterTester adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey", NotificationOutputType.JSON,
-            true, false, false, false);
+            true, false, false, false, listenersBroker);
         adapter.setCloseVars(domDataBroker, databindProvider);
 
         final DOMDataTreeChangeService changeService = domDataBroker.getExtensions()
@@ -276,7 +278,7 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
     @Test
     public void testJsonNotifsChangedLeaves() throws Exception {
         ListenerAdapterTester adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey", NotificationOutputType.JSON,
-                false, false, true, false);
+                false, false, true, false, listenersBroker);
         adapter.setCloseVars(domDataBroker, databindProvider);
 
         final DOMDataTreeChangeService changeService = domDataBroker.getExtensions()
@@ -325,7 +327,7 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
     @Test
     public void testJsonChildNodesOnly() throws Exception {
         final var adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey",
-            NotificationOutputType.JSON, false, false, false, true);
+            NotificationOutputType.JSON, false, false, false, true, listenersBroker);
         adapter.setCloseVars(domDataBroker, databindProvider);
 
         final var changeService = domDataBroker.getExtensions()
@@ -361,7 +363,7 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
     @Test
     public void testXmlLeavesOnly() throws Exception {
         ListenerAdapterTester adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey", NotificationOutputType.XML,
-            true, false, false, false);
+            true, false, false, false, listenersBroker);
         adapter.setCloseVars(domDataBroker, databindProvider);
 
         DOMDataTreeChangeService changeService = domDataBroker.getExtensions()
@@ -413,7 +415,7 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
     @Test
     public void testXmlChangedLeavesOnly() throws Exception {
         ListenerAdapterTester adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey", NotificationOutputType.XML,
-                false, false, true, false);
+                false, false, true, false, listenersBroker);
         adapter.setCloseVars(domDataBroker, databindProvider);
 
         DOMDataTreeChangeService changeService = domDataBroker.getExtensions()
@@ -473,7 +475,7 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
     @Test
     public void testXmlChildNodesOnly() throws Exception {
         final var adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey",
-            NotificationOutputType.XML, false, false, false, true);
+            NotificationOutputType.XML, false, false, false, true, listenersBroker);
         adapter.setCloseVars(domDataBroker, databindProvider);
 
         final var changeService = domDataBroker.getExtensions()
@@ -579,7 +581,7 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
     private void jsonNotifications(final YangInstanceIdentifier pathYiid, final boolean skipData,
             final String jsonNotifCreate, final String jsonNotifUpdate, final String jsonNotifDelete) throws Exception {
         final var adapter = new ListenerAdapterTester(pathYiid, "Casey",
-                NotificationOutputType.JSON, false, skipData, false, false);
+                NotificationOutputType.JSON, false, skipData, false, false, listenersBroker);
         adapter.setCloseVars(domDataBroker, databindProvider);
 
         final var changeService = domDataBroker.getExtensions()
@@ -610,7 +612,7 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
     private void xmlNotifications(final YangInstanceIdentifier pathYiid, final boolean skipData,
             final String xmlNotifCreate, final String xmlNotifUpdate, final String xmlNotifDelete) throws Exception {
         final var adapter = new ListenerAdapterTester(pathYiid, "Casey", NotificationOutputType.XML,
-                false, skipData, false, false);
+                false, skipData, false, false, listenersBroker);
         adapter.setCloseVars(domDataBroker, databindProvider);
 
         final var changeService = domDataBroker.getExtensions()
index e40282e2e404e03395400aa7f7fc72ef8ef83644..15afe969bfcd90395fb6ce721b9f5bee080d1d6a 100644 (file)
@@ -30,7 +30,7 @@ import org.xmlunit.assertj.XmlAssert;
 
 @RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class XmlNotificationListenerTest extends AbstractNotificationListenerTest {
-    private final ListenersBroker listenersBroker = ListenersBroker.getInstance();
+    private final ListenersBroker listenersBroker = new ListenersBroker();
 
     @Test
     public void notifi_leafTest() throws Exception {