Capture ListenersBroker instances 82/107782/2
authorRobert Varga <robert.varga@pantheon.tech>
Sun, 10 Sep 2023 14:21:08 +0000 (16:21 +0200)
committerRobert Varga <nite@hq.sk>
Sun, 10 Sep 2023 15:10:49 +0000 (15:10 +0000)
Rather than having random code call out to
ListenersBroker.getInstance(), make sure we get the reference in outer
scope, mostly in a field.

JIRA: NETCONF-1104
Change-Id: I88b61e53ee36567d52fe297841e6910dea13d2ee
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/CreateStreamUtil.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataServiceImpl.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImpl.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/SubscribeToStreamUtil.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/DeviceNotificationListenerAdaptor.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/CreateStreamUtilTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfStreamsSubscriptionServiceImplTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/JsonNotificationListenerTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/XmlNotificationListenerTest.java

index 4715e07c5c573c21d58e5b303e02032300a6d7de..8e5502928b97bb28c576edbb620ae38d7bf8c42e 100644 (file)
@@ -108,7 +108,7 @@ final class CreateStreamUtil {
      *     </pre>
      */
     // FIXME: this really should be a normal RPC implementation
-    static DOMRpcResult createDataChangeNotifiStream(final ContainerNode input,
+    static DOMRpcResult createDataChangeNotifiStream(final ListenersBroker listenersBroker, final ContainerNode input,
             final EffectiveModelContext refSchemaCtx) {
         // parsing out of container with settings and path
         final YangInstanceIdentifier path = preparePath(input);
@@ -123,7 +123,7 @@ final class CreateStreamUtil {
         final String streamName = streamNameBuilder.toString();
 
         // registration of the listener
-        ListenersBroker.getInstance().registerDataChangeListener(path, streamName, outputType);
+        listenersBroker.registerDataChangeListener(path, streamName, outputType);
 
         // building of output
         return new DefaultDOMRpcResult(Builders.containerBuilder()
@@ -182,7 +182,7 @@ final class CreateStreamUtil {
                 ErrorTag.OPERATION_FAILED);
         }
 
-        final DeviceNotificationListenerAdaptor notificationListenerAdapter = ListenersBroker.getInstance()
+        final DeviceNotificationListenerAdaptor notificationListenerAdapter = streamUtil.listenersBroker()
             .registerDeviceNotificationListener(deviceName, prepareOutputType(input), mountModelContext,
                 mountPointService, mountPoint.getIdentifier());
         notificationListenerAdapter.listen(mountNotifService, notificationPaths);
index 660a837b59daae4861ffd2affd2fa54e6608a79a..7e085fb191c0143877b2dc0f801e35dcd454013e 100644 (file)
@@ -129,6 +129,7 @@ public final class RestconfDataServiceImpl {
     private final SubscribeToStreamUtil streamUtils;
     private final DOMActionService actionService;
     private final DOMDataBroker dataBroker;
+    private final ListenersBroker listenersBroker = ListenersBroker.getInstance();
 
     public RestconfDataServiceImpl(final DatabindProvider databindProvider,
             final DOMDataBroker dataBroker, final DOMMountPointService  mountPointService,
@@ -246,9 +247,9 @@ public final class RestconfDataServiceImpl {
                 final var notifName = notification.argument();
 
                 writeNotificationStreamToDatastore(schemaContext, uriInfo, transaction,
-                    createYangNotifiStream(moduleName, notifName, NotificationOutputType.XML));
+                    createYangNotifiStream(listenersBroker, moduleName, notifName, NotificationOutputType.XML));
                 writeNotificationStreamToDatastore(schemaContext, uriInfo, transaction,
-                    createYangNotifiStream(moduleName, notifName, NotificationOutputType.JSON));
+                    createYangNotifiStream(listenersBroker, moduleName, notifName, NotificationOutputType.JSON));
             });
         }
 
@@ -259,10 +260,9 @@ public final class RestconfDataServiceImpl {
         }
     }
 
-    private static NotificationListenerAdapter createYangNotifiStream(final String moduleName, final QName notifName,
-            final NotificationOutputType outputType) {
+    private static NotificationListenerAdapter createYangNotifiStream(final ListenersBroker listenersBroker,
+            final String moduleName, final QName notifName, final NotificationOutputType outputType) {
         final var streamName = createNotificationStreamName(moduleName, notifName.getLocalName(), outputType);
-        final var listenersBroker = ListenersBroker.getInstance();
 
         final var existing = listenersBroker.notificationListenerFor(streamName);
         return existing != null ? existing
index 4f4543d101c6f353121f4497431fa6d7e561da79..91a3286350d066b653bffcf8cd60aa8b3769ea77 100644 (file)
@@ -161,7 +161,8 @@ public final class RestconfInvokeOperationsServiceImpl {
         final var mountPoint = context.getMountPoint();
         if (mountPoint == null) {
             if (CreateDataChangeEventSubscription.QNAME.equals(rpcName)) {
-                future = Futures.immediateFuture(CreateStreamUtil.createDataChangeNotifiStream(input, schemaContext));
+                future = Futures.immediateFuture(CreateStreamUtil.createDataChangeNotifiStream(
+                    streamUtils.listenersBroker(), input, schemaContext));
             } else if (SubscribeDeviceNotification.QNAME.equals(rpcName)) {
                 final String baseUrl = streamUtils.prepareUriByStreamName(uriInfo, "").toString();
                 future = Futures.immediateFuture(CreateStreamUtil.createDeviceNotificationListener(baseUrl, input,
index 8b98bc4786b093951ecf7c968b3d5fcbbd3955fc..640b1fe3c46a331790b3da1a58acdec303530755 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
 
 import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.Objects.requireNonNull;
 
 import com.google.common.base.Splitter;
 import java.net.URI;
@@ -30,7 +31,6 @@ import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsS
 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
 import org.opendaylight.yangtools.yang.common.ErrorTag;
 import org.opendaylight.yangtools.yang.common.ErrorType;
@@ -47,7 +47,11 @@ abstract class SubscribeToStreamUtil {
      * Implementation of SubscribeToStreamUtil for Server-sent events.
      */
     private static final class ServerSentEvents extends SubscribeToStreamUtil {
-        static final ServerSentEvents INSTANCE = new ServerSentEvents();
+        static final ServerSentEvents INSTANCE = new ServerSentEvents(ListenersBroker.getInstance());
+
+        private ServerSentEvents(final ListenersBroker listenersBroker) {
+            super(listenersBroker);
+        }
 
         @Override
         public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
@@ -61,7 +65,11 @@ abstract class SubscribeToStreamUtil {
      * Implementation of SubscribeToStreamUtil for Web sockets.
      */
     private static final class WebSockets extends SubscribeToStreamUtil {
-        static final WebSockets INSTANCE = new WebSockets();
+        static final WebSockets INSTANCE = new WebSockets(ListenersBroker.getInstance());
+
+        private WebSockets(final ListenersBroker listenersBroker) {
+            super(listenersBroker);
+        }
 
         @Override
         public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
@@ -84,8 +92,10 @@ abstract class SubscribeToStreamUtil {
     private static final Logger LOG = LoggerFactory.getLogger(SubscribeToStreamUtil.class);
     private static final Splitter SLASH_SPLITTER = Splitter.on('/');
 
-    SubscribeToStreamUtil() {
-        // Hidden on purpose
+    private final @NonNull ListenersBroker listenersBroker;
+
+    SubscribeToStreamUtil(final ListenersBroker listenersBroker) {
+        this.listenersBroker = requireNonNull(listenersBroker);
     }
 
     static SubscribeToStreamUtil serverSentEvents() {
@@ -96,6 +106,10 @@ abstract class SubscribeToStreamUtil {
         return WebSockets.INSTANCE;
     }
 
+    public final @NonNull ListenersBroker listenersBroker() {
+        return listenersBroker;
+    }
+
     /**
      * Prepare URL from base name and stream name.
      *
@@ -122,15 +136,13 @@ abstract class SubscribeToStreamUtil {
             throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
         }
 
-        final NotificationListenerAdapter notificationListenerAdapter = ListenersBroker.getInstance()
-            .notificationListenerFor(streamName);
+        final var notificationListenerAdapter = listenersBroker.notificationListenerFor(streamName);
         if (notificationListenerAdapter == null) {
             throw new RestconfDocumentedException(String.format("Stream with name %s was not found.", streamName),
                 ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT);
         }
 
-        final EffectiveModelContext schemaContext = handlersHolder.getDatabindProvider().currentContext()
-            .modelContext();
+        final var schemaContext = handlersHolder.getDatabindProvider().currentContext().modelContext();
         final URI uri = prepareUriByStreamName(uriInfo, streamName);
         notificationListenerAdapter.setQueryParams(notificationQueryParams);
         notificationListenerAdapter.listen(handlersHolder.getNotificationServiceHandler());
@@ -176,7 +188,7 @@ abstract class SubscribeToStreamUtil {
         }
 
         final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
-        final ListenerAdapter listener = ListenersBroker.getInstance().dataChangeListenerFor(streamName);
+        final ListenerAdapter listener = listenersBroker.dataChangeListenerFor(streamName);
         if (listener == null) {
             throw new RestconfDocumentedException("No listener found for stream " + streamName,
                 ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
index d804f125564fda3f0495d6f70f85301df0ff43d7..c7ede8729df486cf52cfcb4845c8576524a0e8ee 100644 (file)
@@ -33,6 +33,7 @@ public final class DeviceNotificationListenerAdaptor extends AbstractNotificatio
     private final @NonNull EffectiveModelContext effectiveModel;
     private final @NonNull DOMMountPointService mountPointService;
     private final @NonNull YangInstanceIdentifier instanceIdentifier;
+    private final @NonNull ListenersBroker listenersBroker = ListenersBroker.getInstance();
 
     private ListenerRegistration<DOMMountPointListener> reg;
 
@@ -84,7 +85,7 @@ public final class DeviceNotificationListenerAdaptor extends AbstractNotificatio
                     }
                 }
             });
-            ListenersBroker.getInstance().removeAndCloseDeviceNotificationListener(this);
+            listenersBroker.removeAndCloseDeviceNotificationListener(this);
             resetListenerRegistration();
         }
     }
index c7df03d5849a828b37f108eff743de6e17dcd364..153af880c955f7fb991e59b5b6928af9d8b9ba2a 100644 (file)
@@ -21,6 +21,7 @@ import org.junit.runner.RunWith;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
 import org.opendaylight.yangtools.yang.common.ErrorTag;
 import org.opendaylight.yangtools.yang.common.ErrorType;
 import org.opendaylight.yangtools.yang.common.QName;
@@ -41,6 +42,8 @@ import org.opendaylight.yangtools.yang.test.util.YangParserTestUtils;
 public class CreateStreamUtilTest {
     private static EffectiveModelContext SCHEMA_CTX;
 
+    private final ListenersBroker listenersBroker = ListenersBroker.getInstance();
+
     @BeforeClass
     public static void setUp() {
         SCHEMA_CTX = YangParserTestUtils.parseYangResourceDirectory("/streams");
@@ -48,7 +51,7 @@ public class CreateStreamUtilTest {
 
     @Test
     public void createStreamTest() {
-        final DOMRpcResult result = CreateStreamUtil.createDataChangeNotifiStream(
+        final DOMRpcResult result = CreateStreamUtil.createDataChangeNotifiStream(listenersBroker,
             prepareDomPayload("create-data-change-event-subscription", RpcDefinition::getInput, "toaster", "path"),
             SCHEMA_CTX);
         assertEquals(List.of(), result.errors());
@@ -63,7 +66,7 @@ public class CreateStreamUtilTest {
         final var payload = prepareDomPayload("create-data-change-event-subscription", RpcDefinition::getInput,
             "String value", "path");
         final var errors = assertThrows(RestconfDocumentedException.class,
-            () -> CreateStreamUtil.createDataChangeNotifiStream(payload, SCHEMA_CTX)).getErrors();
+            () -> CreateStreamUtil.createDataChangeNotifiStream(listenersBroker, payload, SCHEMA_CTX)).getErrors();
         assertEquals(1, errors.size());
         final var error = errors.get(0);
         assertEquals(ErrorType.APPLICATION, error.getErrorType());
@@ -76,7 +79,7 @@ public class CreateStreamUtilTest {
         final var payload = prepareDomPayload("create-data-change-event-subscription2", RpcDefinition::getInput,
             "toaster", "path2");
         final var errors = assertThrows(RestconfDocumentedException.class,
-            () -> CreateStreamUtil.createDataChangeNotifiStream(payload, SCHEMA_CTX)).getErrors();
+            () -> CreateStreamUtil.createDataChangeNotifiStream(listenersBroker, payload, SCHEMA_CTX)).getErrors();
         assertEquals(1, errors.size());
         final var error = errors.get(0);
         assertEquals(ErrorType.APPLICATION, error.getErrorType());
index ea3df69c6461953775d79d802b5d62418265cc1d..0579dddabdd5757feb2c57b6d5a782df41643f6f 100644 (file)
@@ -56,6 +56,8 @@ public class RestconfStreamsSubscriptionServiceImplTest {
             + "toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE";
 
     private static EffectiveModelContext MODEL_CONTEXT;
+    // FIXME: NETCONF-1104: this should be non-static and set up for each test separately
+    private static ListenersBroker LISTENERS_BROKER;
 
     @Mock
     private DOMDataBroker dataBroker;
@@ -72,6 +74,22 @@ public class RestconfStreamsSubscriptionServiceImplTest {
     @BeforeClass
     public static void beforeClass() {
         MODEL_CONTEXT = YangParserTestUtils.parseYangResourceDirectory("/notifications");
+
+        final String name =
+            "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE";
+        final ListenerAdapter adapter = new ListenerAdapter(YangInstanceIdentifier.of(
+            QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster")),
+            name, NotificationOutputType.JSON);
+        LISTENERS_BROKER = ListenersBroker.getInstance();
+        LISTENERS_BROKER.setDataChangeListeners(Map.of(name, adapter));
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        if (LISTENERS_BROKER != null) {
+            LISTENERS_BROKER.setDataChangeListeners(Map.of());
+            LISTENERS_BROKER = null;
+        }
     }
 
     @Before
@@ -96,24 +114,9 @@ public class RestconfStreamsSubscriptionServiceImplTest {
         configurationSse = new StreamsConfiguration(0, 100, 10, true);
     }
 
-    @BeforeClass
-    public static void setUpBeforeTest() {
-        final String name =
-            "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE";
-        final ListenerAdapter adapter = new ListenerAdapter(YangInstanceIdentifier.of(
-            QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster")),
-            name, NotificationOutputType.JSON);
-        ListenersBroker.getInstance().setDataChangeListeners(Map.of(name, adapter));
-    }
-
-    @AfterClass
-    public static void setUpAfterTest() {
-        ListenersBroker.getInstance().setDataChangeListeners(Map.of());
-    }
-
     @Test
     public void testSubscribeToStreamSSE() {
-        ListenersBroker.getInstance().registerDataChangeListener(
+        LISTENERS_BROKER.registerDataChangeListener(
                 IdentifierCodec.deserialize("toaster:toaster/toasterStatus", MODEL_CONTEXT),
                 "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
                 NotificationOutputType.XML);
@@ -129,7 +132,7 @@ public class RestconfStreamsSubscriptionServiceImplTest {
 
     @Test
     public void testSubscribeToStreamWS() {
-        ListenersBroker.getInstance().registerDataChangeListener(
+        LISTENERS_BROKER.registerDataChangeListener(
                 IdentifierCodec.deserialize("toaster:toaster/toasterStatus", MODEL_CONTEXT),
                 "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
                 NotificationOutputType.XML);
index 7d35822df3dfababae58e41cb379f90c97ffb8e1..e3d63a647ce9cb2a10cae8c74601646a18e562e3 100644 (file)
@@ -34,6 +34,8 @@ import org.slf4j.LoggerFactory;
 public class JsonNotificationListenerTest extends AbstractNotificationListenerTest {
     private static final Logger LOG = LoggerFactory.getLogger(JsonNotificationListenerTest.class);
 
+    private final ListenersBroker listenersBroker = ListenersBroker.getInstance();
+
     @Test
     public void notifi_leafTest() throws Exception {
         final Absolute schemaPathNotifi = Absolute.of(QName.create(MODULE, "notifi-leaf"));
@@ -159,10 +161,10 @@ public class JsonNotificationListenerTest extends AbstractNotificationListenerTe
         return ImmutableNodes.leafNode(leafQName, "value");
     }
 
-    private static String prepareJson(final DOMNotification notificationData, final Absolute schemaPathNotifi)
+    private String prepareJson(final DOMNotification notificationData, final Absolute schemaPathNotifi)
             throws Exception {
-        final var notifiAdapter = ListenersBroker.getInstance().registerNotificationListener(
-                schemaPathNotifi, "json-stream", NotificationOutputType.JSON);
+        final var notifiAdapter = listenersBroker.registerNotificationListener(schemaPathNotifi, "json-stream",
+            NotificationOutputType.JSON);
         return notifiAdapter.formatter().eventData(SCHEMA_CONTEXT, notificationData, Instant.now()).orElseThrow();
     }
 }
index 1400b783440afe3e217743973bdffe5788d4262a..e40282e2e404e03395400aa7f7fc72ef8ef83644 100644 (file)
@@ -30,6 +30,8 @@ import org.xmlunit.assertj.XmlAssert;
 
 @RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class XmlNotificationListenerTest extends AbstractNotificationListenerTest {
+    private final ListenersBroker listenersBroker = ListenersBroker.getInstance();
+
     @Test
     public void notifi_leafTest() throws Exception {
         final Absolute schemaPathNotifi = Absolute.of(QName.create(MODULE, "notifi-leaf"));
@@ -166,10 +168,10 @@ public class XmlNotificationListenerTest extends AbstractNotificationListenerTes
         return ImmutableNodes.leafNode(leafQName, "value");
     }
 
-    private static String prepareXmlResult(final DOMNotification notificationData, final Absolute schemaPathNotifi)
+    private String prepareXmlResult(final DOMNotification notificationData, final Absolute schemaPathNotifi)
             throws Exception {
-        final var notifiAdapter = ListenersBroker.getInstance().registerNotificationListener(
-                schemaPathNotifi, "xml-stream", NotificationOutputType.XML);
+        final var notifiAdapter = listenersBroker.registerNotificationListener(schemaPathNotifi, "xml-stream",
+            NotificationOutputType.XML);
         return notifiAdapter.formatter().eventData(SCHEMA_CONTEXT, notificationData, Instant.now()).orElseThrow();
     }
 }