From ada370c59bb7fe3fbc5131697b435aa13db96025 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Sun, 10 Sep 2023 16:21:08 +0200 Subject: [PATCH] Capture ListenersBroker instances Rather than having random code call out to ListenersBroker.getInstance(), make sure we get the reference in outer scope, mostly in a field. JIRA: NETCONF-1104 Change-Id: I88b61e53ee36567d52fe297841e6910dea13d2ee Signed-off-by: Robert Varga --- .../rests/services/impl/CreateStreamUtil.java | 6 +-- .../impl/RestconfDataServiceImpl.java | 10 ++--- .../RestconfInvokeOperationsServiceImpl.java | 3 +- .../services/impl/SubscribeToStreamUtil.java | 32 +++++++++++----- .../DeviceNotificationListenerAdaptor.java | 3 +- .../services/impl/CreateStreamUtilTest.java | 9 +++-- ...onfStreamsSubscriptionServiceImplTest.java | 37 ++++++++++--------- .../JsonNotificationListenerTest.java | 8 ++-- .../XmlNotificationListenerTest.java | 8 ++-- 9 files changed, 70 insertions(+), 46 deletions(-) diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/CreateStreamUtil.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/CreateStreamUtil.java index 4715e07c5c..8e5502928b 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/CreateStreamUtil.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/CreateStreamUtil.java @@ -108,7 +108,7 @@ final class CreateStreamUtil { * */ // FIXME: this really should be a normal RPC implementation - static DOMRpcResult createDataChangeNotifiStream(final ContainerNode input, + static DOMRpcResult createDataChangeNotifiStream(final ListenersBroker listenersBroker, final ContainerNode input, final EffectiveModelContext refSchemaCtx) { // parsing out of container with settings and path final YangInstanceIdentifier path = preparePath(input); @@ -123,7 +123,7 @@ final class CreateStreamUtil { final String streamName = streamNameBuilder.toString(); // registration of the listener - ListenersBroker.getInstance().registerDataChangeListener(path, streamName, outputType); + listenersBroker.registerDataChangeListener(path, streamName, outputType); // building of output return new DefaultDOMRpcResult(Builders.containerBuilder() @@ -182,7 +182,7 @@ final class CreateStreamUtil { ErrorTag.OPERATION_FAILED); } - final DeviceNotificationListenerAdaptor notificationListenerAdapter = ListenersBroker.getInstance() + final DeviceNotificationListenerAdaptor notificationListenerAdapter = streamUtil.listenersBroker() .registerDeviceNotificationListener(deviceName, prepareOutputType(input), mountModelContext, mountPointService, mountPoint.getIdentifier()); notificationListenerAdapter.listen(mountNotifService, notificationPaths); diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataServiceImpl.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataServiceImpl.java index 660a837b59..7e085fb191 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataServiceImpl.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataServiceImpl.java @@ -129,6 +129,7 @@ public final class RestconfDataServiceImpl { private final SubscribeToStreamUtil streamUtils; private final DOMActionService actionService; private final DOMDataBroker dataBroker; + private final ListenersBroker listenersBroker = ListenersBroker.getInstance(); public RestconfDataServiceImpl(final DatabindProvider databindProvider, final DOMDataBroker dataBroker, final DOMMountPointService mountPointService, @@ -246,9 +247,9 @@ public final class RestconfDataServiceImpl { final var notifName = notification.argument(); writeNotificationStreamToDatastore(schemaContext, uriInfo, transaction, - createYangNotifiStream(moduleName, notifName, NotificationOutputType.XML)); + createYangNotifiStream(listenersBroker, moduleName, notifName, NotificationOutputType.XML)); writeNotificationStreamToDatastore(schemaContext, uriInfo, transaction, - createYangNotifiStream(moduleName, notifName, NotificationOutputType.JSON)); + createYangNotifiStream(listenersBroker, moduleName, notifName, NotificationOutputType.JSON)); }); } @@ -259,10 +260,9 @@ public final class RestconfDataServiceImpl { } } - private static NotificationListenerAdapter createYangNotifiStream(final String moduleName, final QName notifName, - final NotificationOutputType outputType) { + private static NotificationListenerAdapter createYangNotifiStream(final ListenersBroker listenersBroker, + final String moduleName, final QName notifName, final NotificationOutputType outputType) { final var streamName = createNotificationStreamName(moduleName, notifName.getLocalName(), outputType); - final var listenersBroker = ListenersBroker.getInstance(); final var existing = listenersBroker.notificationListenerFor(streamName); return existing != null ? existing diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImpl.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImpl.java index 4f4543d101..91a3286350 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImpl.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImpl.java @@ -161,7 +161,8 @@ public final class RestconfInvokeOperationsServiceImpl { final var mountPoint = context.getMountPoint(); if (mountPoint == null) { if (CreateDataChangeEventSubscription.QNAME.equals(rpcName)) { - future = Futures.immediateFuture(CreateStreamUtil.createDataChangeNotifiStream(input, schemaContext)); + future = Futures.immediateFuture(CreateStreamUtil.createDataChangeNotifiStream( + streamUtils.listenersBroker(), input, schemaContext)); } else if (SubscribeDeviceNotification.QNAME.equals(rpcName)) { final String baseUrl = streamUtils.prepareUriByStreamName(uriInfo, "").toString(); future = Futures.immediateFuture(CreateStreamUtil.createDeviceNotificationListener(baseUrl, input, diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/SubscribeToStreamUtil.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/SubscribeToStreamUtil.java index 8b98bc4786..640b1fe3c4 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/SubscribeToStreamUtil.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/SubscribeToStreamUtil.java @@ -8,6 +8,7 @@ package org.opendaylight.restconf.nb.rfc8040.rests.services.impl; import static com.google.common.base.Strings.isNullOrEmpty; +import static java.util.Objects.requireNonNull; import com.google.common.base.Splitter; import java.net.URI; @@ -30,7 +31,6 @@ import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsS import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants; import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter; import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker; -import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter; import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec; import org.opendaylight.yangtools.yang.common.ErrorTag; import org.opendaylight.yangtools.yang.common.ErrorType; @@ -47,7 +47,11 @@ abstract class SubscribeToStreamUtil { * Implementation of SubscribeToStreamUtil for Server-sent events. */ private static final class ServerSentEvents extends SubscribeToStreamUtil { - static final ServerSentEvents INSTANCE = new ServerSentEvents(); + static final ServerSentEvents INSTANCE = new ServerSentEvents(ListenersBroker.getInstance()); + + private ServerSentEvents(final ListenersBroker listenersBroker) { + super(listenersBroker); + } @Override public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) { @@ -61,7 +65,11 @@ abstract class SubscribeToStreamUtil { * Implementation of SubscribeToStreamUtil for Web sockets. */ private static final class WebSockets extends SubscribeToStreamUtil { - static final WebSockets INSTANCE = new WebSockets(); + static final WebSockets INSTANCE = new WebSockets(ListenersBroker.getInstance()); + + private WebSockets(final ListenersBroker listenersBroker) { + super(listenersBroker); + } @Override public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) { @@ -84,8 +92,10 @@ abstract class SubscribeToStreamUtil { private static final Logger LOG = LoggerFactory.getLogger(SubscribeToStreamUtil.class); private static final Splitter SLASH_SPLITTER = Splitter.on('/'); - SubscribeToStreamUtil() { - // Hidden on purpose + private final @NonNull ListenersBroker listenersBroker; + + SubscribeToStreamUtil(final ListenersBroker listenersBroker) { + this.listenersBroker = requireNonNull(listenersBroker); } static SubscribeToStreamUtil serverSentEvents() { @@ -96,6 +106,10 @@ abstract class SubscribeToStreamUtil { return WebSockets.INSTANCE; } + public final @NonNull ListenersBroker listenersBroker() { + return listenersBroker; + } + /** * Prepare URL from base name and stream name. * @@ -122,15 +136,13 @@ abstract class SubscribeToStreamUtil { throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE); } - final NotificationListenerAdapter notificationListenerAdapter = ListenersBroker.getInstance() - .notificationListenerFor(streamName); + final var notificationListenerAdapter = listenersBroker.notificationListenerFor(streamName); if (notificationListenerAdapter == null) { throw new RestconfDocumentedException(String.format("Stream with name %s was not found.", streamName), ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT); } - final EffectiveModelContext schemaContext = handlersHolder.getDatabindProvider().currentContext() - .modelContext(); + final var schemaContext = handlersHolder.getDatabindProvider().currentContext().modelContext(); final URI uri = prepareUriByStreamName(uriInfo, streamName); notificationListenerAdapter.setQueryParams(notificationQueryParams); notificationListenerAdapter.listen(handlersHolder.getNotificationServiceHandler()); @@ -176,7 +188,7 @@ abstract class SubscribeToStreamUtil { } final String streamName = ListenersBroker.createStreamNameFromUri(identifier); - final ListenerAdapter listener = ListenersBroker.getInstance().dataChangeListenerFor(streamName); + final ListenerAdapter listener = listenersBroker.dataChangeListenerFor(streamName); if (listener == null) { throw new RestconfDocumentedException("No listener found for stream " + streamName, ErrorType.APPLICATION, ErrorTag.DATA_MISSING); diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/DeviceNotificationListenerAdaptor.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/DeviceNotificationListenerAdaptor.java index d804f12556..c7ede8729d 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/DeviceNotificationListenerAdaptor.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/DeviceNotificationListenerAdaptor.java @@ -33,6 +33,7 @@ public final class DeviceNotificationListenerAdaptor extends AbstractNotificatio private final @NonNull EffectiveModelContext effectiveModel; private final @NonNull DOMMountPointService mountPointService; private final @NonNull YangInstanceIdentifier instanceIdentifier; + private final @NonNull ListenersBroker listenersBroker = ListenersBroker.getInstance(); private ListenerRegistration reg; @@ -84,7 +85,7 @@ public final class DeviceNotificationListenerAdaptor extends AbstractNotificatio } } }); - ListenersBroker.getInstance().removeAndCloseDeviceNotificationListener(this); + listenersBroker.removeAndCloseDeviceNotificationListener(this); resetListenerRegistration(); } } diff --git a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/CreateStreamUtilTest.java b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/CreateStreamUtilTest.java index c7df03d584..153af880c9 100644 --- a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/CreateStreamUtilTest.java +++ b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/CreateStreamUtilTest.java @@ -21,6 +21,7 @@ import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; import org.opendaylight.mdsal.dom.api.DOMRpcResult; import org.opendaylight.restconf.common.errors.RestconfDocumentedException; +import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker; import org.opendaylight.yangtools.yang.common.ErrorTag; import org.opendaylight.yangtools.yang.common.ErrorType; import org.opendaylight.yangtools.yang.common.QName; @@ -41,6 +42,8 @@ import org.opendaylight.yangtools.yang.test.util.YangParserTestUtils; public class CreateStreamUtilTest { private static EffectiveModelContext SCHEMA_CTX; + private final ListenersBroker listenersBroker = ListenersBroker.getInstance(); + @BeforeClass public static void setUp() { SCHEMA_CTX = YangParserTestUtils.parseYangResourceDirectory("/streams"); @@ -48,7 +51,7 @@ public class CreateStreamUtilTest { @Test public void createStreamTest() { - final DOMRpcResult result = CreateStreamUtil.createDataChangeNotifiStream( + final DOMRpcResult result = CreateStreamUtil.createDataChangeNotifiStream(listenersBroker, prepareDomPayload("create-data-change-event-subscription", RpcDefinition::getInput, "toaster", "path"), SCHEMA_CTX); assertEquals(List.of(), result.errors()); @@ -63,7 +66,7 @@ public class CreateStreamUtilTest { final var payload = prepareDomPayload("create-data-change-event-subscription", RpcDefinition::getInput, "String value", "path"); final var errors = assertThrows(RestconfDocumentedException.class, - () -> CreateStreamUtil.createDataChangeNotifiStream(payload, SCHEMA_CTX)).getErrors(); + () -> CreateStreamUtil.createDataChangeNotifiStream(listenersBroker, payload, SCHEMA_CTX)).getErrors(); assertEquals(1, errors.size()); final var error = errors.get(0); assertEquals(ErrorType.APPLICATION, error.getErrorType()); @@ -76,7 +79,7 @@ public class CreateStreamUtilTest { final var payload = prepareDomPayload("create-data-change-event-subscription2", RpcDefinition::getInput, "toaster", "path2"); final var errors = assertThrows(RestconfDocumentedException.class, - () -> CreateStreamUtil.createDataChangeNotifiStream(payload, SCHEMA_CTX)).getErrors(); + () -> CreateStreamUtil.createDataChangeNotifiStream(listenersBroker, payload, SCHEMA_CTX)).getErrors(); assertEquals(1, errors.size()); final var error = errors.get(0); assertEquals(ErrorType.APPLICATION, error.getErrorType()); diff --git a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfStreamsSubscriptionServiceImplTest.java b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfStreamsSubscriptionServiceImplTest.java index ea3df69c64..0579dddabd 100644 --- a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfStreamsSubscriptionServiceImplTest.java +++ b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfStreamsSubscriptionServiceImplTest.java @@ -56,6 +56,8 @@ public class RestconfStreamsSubscriptionServiceImplTest { + "toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE"; private static EffectiveModelContext MODEL_CONTEXT; + // FIXME: NETCONF-1104: this should be non-static and set up for each test separately + private static ListenersBroker LISTENERS_BROKER; @Mock private DOMDataBroker dataBroker; @@ -72,6 +74,22 @@ public class RestconfStreamsSubscriptionServiceImplTest { @BeforeClass public static void beforeClass() { MODEL_CONTEXT = YangParserTestUtils.parseYangResourceDirectory("/notifications"); + + final String name = + "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE"; + final ListenerAdapter adapter = new ListenerAdapter(YangInstanceIdentifier.of( + QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster")), + name, NotificationOutputType.JSON); + LISTENERS_BROKER = ListenersBroker.getInstance(); + LISTENERS_BROKER.setDataChangeListeners(Map.of(name, adapter)); + } + + @AfterClass + public static void afterClass() { + if (LISTENERS_BROKER != null) { + LISTENERS_BROKER.setDataChangeListeners(Map.of()); + LISTENERS_BROKER = null; + } } @Before @@ -96,24 +114,9 @@ public class RestconfStreamsSubscriptionServiceImplTest { configurationSse = new StreamsConfiguration(0, 100, 10, true); } - @BeforeClass - public static void setUpBeforeTest() { - final String name = - "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE"; - final ListenerAdapter adapter = new ListenerAdapter(YangInstanceIdentifier.of( - QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster")), - name, NotificationOutputType.JSON); - ListenersBroker.getInstance().setDataChangeListeners(Map.of(name, adapter)); - } - - @AfterClass - public static void setUpAfterTest() { - ListenersBroker.getInstance().setDataChangeListeners(Map.of()); - } - @Test public void testSubscribeToStreamSSE() { - ListenersBroker.getInstance().registerDataChangeListener( + LISTENERS_BROKER.registerDataChangeListener( IdentifierCodec.deserialize("toaster:toaster/toasterStatus", MODEL_CONTEXT), "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE", NotificationOutputType.XML); @@ -129,7 +132,7 @@ public class RestconfStreamsSubscriptionServiceImplTest { @Test public void testSubscribeToStreamWS() { - ListenersBroker.getInstance().registerDataChangeListener( + LISTENERS_BROKER.registerDataChangeListener( IdentifierCodec.deserialize("toaster:toaster/toasterStatus", MODEL_CONTEXT), "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE", NotificationOutputType.XML); diff --git a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/JsonNotificationListenerTest.java b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/JsonNotificationListenerTest.java index 7d35822df3..e3d63a647c 100644 --- a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/JsonNotificationListenerTest.java +++ b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/JsonNotificationListenerTest.java @@ -34,6 +34,8 @@ import org.slf4j.LoggerFactory; public class JsonNotificationListenerTest extends AbstractNotificationListenerTest { private static final Logger LOG = LoggerFactory.getLogger(JsonNotificationListenerTest.class); + private final ListenersBroker listenersBroker = ListenersBroker.getInstance(); + @Test public void notifi_leafTest() throws Exception { final Absolute schemaPathNotifi = Absolute.of(QName.create(MODULE, "notifi-leaf")); @@ -159,10 +161,10 @@ public class JsonNotificationListenerTest extends AbstractNotificationListenerTe return ImmutableNodes.leafNode(leafQName, "value"); } - private static String prepareJson(final DOMNotification notificationData, final Absolute schemaPathNotifi) + private String prepareJson(final DOMNotification notificationData, final Absolute schemaPathNotifi) throws Exception { - final var notifiAdapter = ListenersBroker.getInstance().registerNotificationListener( - schemaPathNotifi, "json-stream", NotificationOutputType.JSON); + final var notifiAdapter = listenersBroker.registerNotificationListener(schemaPathNotifi, "json-stream", + NotificationOutputType.JSON); return notifiAdapter.formatter().eventData(SCHEMA_CONTEXT, notificationData, Instant.now()).orElseThrow(); } } diff --git a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/XmlNotificationListenerTest.java b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/XmlNotificationListenerTest.java index 1400b78344..e40282e2e4 100644 --- a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/XmlNotificationListenerTest.java +++ b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/XmlNotificationListenerTest.java @@ -30,6 +30,8 @@ import org.xmlunit.assertj.XmlAssert; @RunWith(MockitoJUnitRunner.StrictStubs.class) public class XmlNotificationListenerTest extends AbstractNotificationListenerTest { + private final ListenersBroker listenersBroker = ListenersBroker.getInstance(); + @Test public void notifi_leafTest() throws Exception { final Absolute schemaPathNotifi = Absolute.of(QName.create(MODULE, "notifi-leaf")); @@ -166,10 +168,10 @@ public class XmlNotificationListenerTest extends AbstractNotificationListenerTes return ImmutableNodes.leafNode(leafQName, "value"); } - private static String prepareXmlResult(final DOMNotification notificationData, final Absolute schemaPathNotifi) + private String prepareXmlResult(final DOMNotification notificationData, final Absolute schemaPathNotifi) throws Exception { - final var notifiAdapter = ListenersBroker.getInstance().registerNotificationListener( - schemaPathNotifi, "xml-stream", NotificationOutputType.XML); + final var notifiAdapter = listenersBroker.registerNotificationListener(schemaPathNotifi, "xml-stream", + NotificationOutputType.XML); return notifiAdapter.formatter().eventData(SCHEMA_CONTEXT, notificationData, Instant.now()).orElseThrow(); } } -- 2.36.6