ListenersBroker requires DOMDataBroker 40/108840/6
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 3 Nov 2023 11:16:58 +0000 (12:16 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Fri, 3 Nov 2023 20:55:38 +0000 (21:55 +0100)
Managing the contents of the operation data store is the responsibility
of ListenersBroker, not individual streams. We therefore ListenersBroker
should have a reference to it directly and not have it passed through
from each stream.

JIRA: NETCONF-1102
Change-Id: I79523e139648eacb36def92f24253f79af81b021
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/JaxRsNorthbound.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/AbstractStream.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/ListenersBroker.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImplTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/ListenerAdapterTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/ListenersBrokerTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketFactoryTest.java

index 058ac8449ed7dac32b14f17d4f1dc6aaa299d5cc..69a385a0e6037d286e250eef8ffe180eacd35087 100644 (file)
@@ -97,12 +97,12 @@ public final class JaxRsNorthbound implements AutoCloseable {
         final ListenersBroker listenersBroker;
         final HttpServlet streamServlet;
         if (streamsConfiguration.useSSE()) {
-            listenersBroker = new ListenersBroker.ServerSentEvents();
+            listenersBroker = new ListenersBroker.ServerSentEvents(dataBroker);
             streamServlet = servletSupport.createHttpServletBuilder(
                 new ServerSentEventsApplication(scheduledThreadPool, listenersBroker, streamsConfiguration))
                 .build();
         } else {
-            listenersBroker = new ListenersBroker.WebSockets();
+            listenersBroker = new ListenersBroker.WebSockets(dataBroker);
             streamServlet = new WebSocketInitializer(scheduledThreadPool, listenersBroker, streamsConfiguration);
         }
 
index afa79dcae105c0fb5e8f449c09d600d380715302..69eb568f814da9595deafa461db8019521713bea 100644 (file)
@@ -20,7 +20,6 @@ import javax.xml.xpath.XPathExpressionException;
 import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.checkerframework.checker.lock.qual.Holding;
 import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.mdsal.dom.api.DOMDataBroker;
 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
 import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
 import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
@@ -52,7 +51,6 @@ abstract class AbstractStream<T> {
 
     // FIXME: these really should not live here
     protected DatabindProvider databindProvider;
-    private DOMDataBroker dataBroker;
 
     AbstractStream(final String streamName, final NotificationOutputType outputType,
             final EventFormatterFactory<T> formatterFactory, final ListenersBroker listenersBroker) {
@@ -115,7 +113,7 @@ abstract class AbstractStream<T> {
         LOG.debug("Subscriber {} is removed", subscriber);
         if (subscribers.isEmpty()) {
             closeRegistration();
-            listenersBroker.removeStream(dataBroker, this);
+            listenersBroker.removeStream(this);
         }
     }
 
@@ -132,7 +130,7 @@ abstract class AbstractStream<T> {
             it.remove();
         }
 
-        listenersBroker.removeStream(dataBroker, this);
+        listenersBroker.removeStream(this);
     }
 
     @Holding("this")
@@ -233,13 +231,11 @@ abstract class AbstractStream<T> {
     /**
      * Data broker for delete data in DS on close().
      *
-     * @param dataBroker creating new write transaction for delete data on close
      * @param databindProvider for formatting notifications
      */
     @SuppressWarnings("checkstyle:hiddenField")
     // FIXME: this is pure lifecycle nightmare just because ...
-    public synchronized void setCloseVars(final DOMDataBroker dataBroker, final DatabindProvider databindProvider) {
-        this.dataBroker = dataBroker;
+    public synchronized void setCloseVars(final DatabindProvider databindProvider) {
         this.databindProvider = databindProvider;
     }
 
index 1146ad7230d6a1a7076e3743f6566e42ecca32bb..6cb40860c0369517d40a7e11a3948f0cc86f7961 100644 (file)
@@ -71,6 +71,10 @@ public abstract sealed class ListenersBroker {
      * A ListenersBroker working with Server-Sent Events.
      */
     public static final class ServerSentEvents extends ListenersBroker {
+        public ServerSentEvents(final DOMDataBroker dataBroker) {
+            super(dataBroker);
+        }
+
         @Override
         public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
             return uriInfo.getBaseUriBuilder()
@@ -83,6 +87,10 @@ public abstract sealed class ListenersBroker {
      * A ListenersBroker working with WebSockets.
      */
     public static final class WebSockets extends ListenersBroker {
+        public WebSockets(final DOMDataBroker dataBroker) {
+            super(dataBroker);
+        }
+
         @Override
         public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
             final var scheme = switch (uriInfo.getAbsolutePath().getScheme()) {
@@ -209,9 +217,10 @@ public abstract sealed class ListenersBroker {
         NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionOutput.QNAME, "stream-name").intern());
 
     private final ConcurrentMap<String, AbstractStream<?>> streams = new ConcurrentHashMap<>();
+    private final DOMDataBroker dataBroker;
 
-    private ListenersBroker() {
-        // Hidden on purpose
+    private ListenersBroker(final DOMDataBroker dataBroker) {
+        this.dataBroker = requireNonNull(dataBroker);
     }
 
     /**
@@ -252,7 +261,7 @@ public abstract sealed class ListenersBroker {
      *
      * @param stream Stream to remove
      */
-    final void removeStream(final DOMDataBroker dataBroker, final AbstractStream<?> stream) {
+    final void removeStream(final AbstractStream<?> stream) {
         // Defensive check to see if we are still tracking the stream
         final var streamName = stream.getStreamName();
         if (streams.get(streamName) != stream) {
@@ -393,7 +402,7 @@ public abstract sealed class ListenersBroker {
 //
 //        final var dataBroker = handlersHolder.dataBroker();
 //        final var schemaHandler = handlersHolder.databindProvider();
-//        listener.setCloseVars(dataBroker, schemaHandler);
+//        listener.setCloseVars(schemaHandler);
 //        listener.listen(dataBroker);
 //
 //        final var uri = prepareUriByStreamName(uriInfo, streamName);
@@ -480,7 +489,7 @@ public abstract sealed class ListenersBroker {
 //        notificationListenerAdapter.setQueryParams(notificationQueryParams);
 //        notificationListenerAdapter.listen(handlersHolder.notificationService());
 //        final DOMDataBroker dataBroker = handlersHolder.dataBroker();
-//        notificationListenerAdapter.setCloseVars(dataBroker, handlersHolder.databindProvider());
+//        notificationListenerAdapter.setCloseVars(handlersHolder.databindProvider());
 //        final MapEntryNode mapToStreams = RestconfStateStreams.notificationStreamEntry(streamName,
 //            notificationListenerAdapter.qnames(), notificationListenerAdapter.getOutputType(), uri);
 //
index 525cbd92680c1f28d26f03287a569419f17f238a..7990de85346167ea7f7ad7cddaa49647ae323b8e 100644 (file)
@@ -89,7 +89,7 @@ public class RestconfInvokeOperationsServiceImplTest {
     public void setup() {
         server = new MdsalRestconfServer(dataBroker, rpcService, mountPointService);
         invokeOperationsService = new RestconfInvokeOperationsServiceImpl(() -> CONTEXT, server, mountPointService,
-            new ListenersBroker.WebSockets());
+            new ListenersBroker.WebSockets(dataBroker));
     }
 
     @Test
index d4285de7b86a7277215a8f7d6fdc77eedd2fa70b..55fd2b54631ced20366e572ea15beb006375a08a 100644 (file)
@@ -148,10 +148,10 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
 
     private static EffectiveModelContext SCHEMA_CONTEXT;
 
-    private final ListenersBroker listenersBroker = new ListenersBroker.ServerSentEvents();
     private DataBroker dataBroker;
     private DOMDataBroker domDataBroker;
     private DatabindProvider databindProvider;
+    private ListenersBroker listenersBroker;
 
     @BeforeClass
     public static void beforeClass() {
@@ -168,6 +168,7 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
         dataBroker = getDataBroker();
         domDataBroker = getDomBroker();
         databindProvider = () -> DatabindContext.ofModel(SCHEMA_CONTEXT);
+        listenersBroker = new ListenersBroker.ServerSentEvents(domDataBroker);
     }
 
     class ListenerAdapterTester extends ListenerAdapter {
@@ -235,15 +236,15 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
 
     @Test
     public void testJsonNotifsLeaves() throws Exception {
-        ListenerAdapterTester adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey", NotificationOutputType.JSON,
+        var adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey", NotificationOutputType.JSON,
             true, false, false, false, listenersBroker);
-        adapter.setCloseVars(domDataBroker, databindProvider);
+        adapter.setCloseVars(databindProvider);
 
         final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
         final var root = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, PATCH_CONT_YIID);
         changeService.registerDataTreeChangeListener(root, adapter);
 
-        WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
+        var writeTransaction = dataBroker.newWriteOnlyTransaction();
         final var iid = InstanceIdentifier.create(PatchCont.class);
         writeTransaction.put(LogicalDatastoreType.CONFIGURATION, iid, new PatchContBuilder()
             .addAugmentation(new PatchCont1Builder()
@@ -274,15 +275,15 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
 
     @Test
     public void testJsonNotifsChangedLeaves() throws Exception {
-        ListenerAdapterTester adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey", NotificationOutputType.JSON,
+        var adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey", NotificationOutputType.JSON,
                 false, false, true, false, listenersBroker);
-        adapter.setCloseVars(domDataBroker, databindProvider);
+        adapter.setCloseVars(databindProvider);
 
         final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
         final var root = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, PATCH_CONT_YIID);
         changeService.registerDataTreeChangeListener(root, adapter);
 
-        WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
+        var writeTransaction = dataBroker.newWriteOnlyTransaction();
         final var iid = InstanceIdentifier.create(PatchCont.class);
         writeTransaction.put(LogicalDatastoreType.CONFIGURATION, iid, new PatchContBuilder()
             .addAugmentation(new PatchCont1Builder()
@@ -323,7 +324,7 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
     public void testJsonChildNodesOnly() throws Exception {
         final var adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey",
             NotificationOutputType.JSON, false, false, false, true, listenersBroker);
-        adapter.setCloseVars(domDataBroker, databindProvider);
+        adapter.setCloseVars(databindProvider);
 
         final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
         final var root = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, PATCH_CONT_YIID);
@@ -356,14 +357,14 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
 
     @Test
     public void testXmlLeavesOnly() throws Exception {
-        ListenerAdapterTester adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey", NotificationOutputType.XML,
+        var adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey", NotificationOutputType.XML,
             true, false, false, false, listenersBroker);
-        adapter.setCloseVars(domDataBroker, databindProvider);
+        adapter.setCloseVars(databindProvider);
 
         final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
         final var root = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, PATCH_CONT_YIID);
         changeService.registerDataTreeChangeListener(root, adapter);
-        WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
+        var writeTransaction = dataBroker.newWriteOnlyTransaction();
         final var iid = InstanceIdentifier.create(PatchCont.class);
         writeTransaction.put(LogicalDatastoreType.CONFIGURATION, iid, new PatchContBuilder()
             .addAugmentation(new PatchCont1Builder()
@@ -407,9 +408,9 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
 
     @Test
     public void testXmlChangedLeavesOnly() throws Exception {
-        ListenerAdapterTester adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey", NotificationOutputType.XML,
+        var adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey", NotificationOutputType.XML,
                 false, false, true, false, listenersBroker);
-        adapter.setCloseVars(domDataBroker, databindProvider);
+        adapter.setCloseVars(databindProvider);
 
         final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
         final var root = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, PATCH_CONT_YIID);
@@ -468,7 +469,7 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
     public void testXmlChildNodesOnly() throws Exception {
         final var adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey",
             NotificationOutputType.XML, false, false, false, true, listenersBroker);
-        adapter.setCloseVars(domDataBroker, databindProvider);
+        adapter.setCloseVars(databindProvider);
 
         final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
         final var root = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, PATCH_CONT_YIID);
@@ -573,14 +574,14 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
             final String jsonNotifCreate, final String jsonNotifUpdate, final String jsonNotifDelete) throws Exception {
         final var adapter = new ListenerAdapterTester(pathYiid, "Casey",
                 NotificationOutputType.JSON, false, skipData, false, false, listenersBroker);
-        adapter.setCloseVars(domDataBroker, databindProvider);
+        adapter.setCloseVars(databindProvider);
 
         final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
         final var root = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, pathYiid);
         changeService.registerDataTreeChangeListener(root, adapter);
 
-        WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
-        MyList1Builder builder = new MyList1Builder().setMyLeaf11("Jed").setName("Althea");
+        var writeTransaction = dataBroker.newWriteOnlyTransaction();
+        var builder = new MyList1Builder().setMyLeaf11("Jed").setName("Althea");
         final var iid = InstanceIdentifier.create(PatchCont.class)
                 .child(MyList1.class, new MyList1Key("Althea"));
         writeTransaction.put(LogicalDatastoreType.CONFIGURATION, iid, builder.build());
@@ -603,14 +604,14 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
             final String xmlNotifCreate, final String xmlNotifUpdate, final String xmlNotifDelete) throws Exception {
         final var adapter = new ListenerAdapterTester(pathYiid, "Casey", NotificationOutputType.XML,
                 false, skipData, false, false, listenersBroker);
-        adapter.setCloseVars(domDataBroker, databindProvider);
+        adapter.setCloseVars(databindProvider);
 
         final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
         final var root = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, pathYiid);
         changeService.registerDataTreeChangeListener(root, adapter);
 
-        WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
-        MyList1Builder builder = new MyList1Builder().setMyLeaf11("Jed").setName("Althea");
+        var writeTransaction = dataBroker.newWriteOnlyTransaction();
+        var builder = new MyList1Builder().setMyLeaf11("Jed").setName("Althea");
         final var iid = InstanceIdentifier.create(PatchCont.class)
                 .child(MyList1.class, new MyList1Key("Althea"));
         writeTransaction.put(LogicalDatastoreType.CONFIGURATION, iid, builder.build());
index ae3459db2899c3adb6c343d438233df505db84e2..e1280d87e8c89f5f553bac4edf3dbb16060bf97b 100644 (file)
@@ -15,9 +15,12 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.util.UUID;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
+import org.opendaylight.mdsal.dom.api.DOMDataBroker;
 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionOutput;
 import org.opendaylight.yangtools.yang.common.ErrorTag;
@@ -39,7 +42,15 @@ import org.opendaylight.yangtools.yang.test.util.YangParserTestUtils;
 class ListenersBrokerTest {
     private static final EffectiveModelContext SCHEMA_CTX = YangParserTestUtils.parseYangResourceDirectory("/streams");
 
-    private final ListenersBroker listenersBroker = new ListenersBroker.ServerSentEvents();
+    @Mock
+    private DOMDataBroker dataBroker;
+
+    private ListenersBroker listenersBroker;
+
+    @BeforeEach
+    public void before() {
+        listenersBroker = new ListenersBroker.ServerSentEvents(dataBroker);
+    }
 
     @Test
     void createStreamTest() {
index 373bee8ce1e4ce39f0380d4530d7787362d10d19..b836839a21a1720f848aedabd2477e68205e6857 100644 (file)
@@ -22,26 +22,29 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataBroker;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 @ExtendWith(MockitoExtension.class)
 class WebSocketFactoryTest extends AbstractNotificationListenerTest {
-    private final ListenersBroker listenersBroker = new ListenersBroker.ServerSentEvents();
-
     @Mock
     private ScheduledExecutorService execService;
     @Mock
     private ServletUpgradeRequest upgradeRequest;
     @Mock
     private ServletUpgradeResponse upgradeResponse;
+    @Mock
+    private DOMDataBroker dataBroker;
 
+    private ListenersBroker listenersBroker;
     private WebSocketFactory webSocketFactory;
     private String streamName;
 
     @BeforeEach
     void prepareListenersBroker() {
+        listenersBroker = new ListenersBroker.ServerSentEvents(dataBroker);
         webSocketFactory = new WebSocketFactory(execService, listenersBroker, 5000, 2000);
 
         streamName = listenersBroker.createStream(name -> new ListenerAdapter(name, NotificationOutputType.JSON,