Improve stream existence check 89/115289/5
authorYaroslav Lastivka <yaroslav.lastivka@pantheon.tech>
Tue, 11 Feb 2025 09:10:04 +0000 (11:10 +0200)
committerYaroslav Lastivka <yaroslav.lastivka@pantheon.tech>
Thu, 20 Feb 2025 12:58:55 +0000 (14:58 +0200)
Replace use of MdsalNotificationService#exist with
RestconfStreamRegistry#lookupStream which works faster
as it cheks the registered stream in a Map.

JIRA: NETCONF-1437
Change-Id: I2d71de7c06fba5623b890f90002d33926e92047c
Signed-off-by: Yaroslav Lastivka <yaroslav.lastivka@pantheon.tech>
apps/restconf-subscription/src/main/java/org/opendaylight/restconf/subscription/EstablishSubscriptionRpc.java
apps/restconf-subscription/src/main/java/org/opendaylight/restconf/subscription/ModifySubscriptionRpc.java
apps/restconf-subscription/src/test/java/org/opendaylight/restconf/subscription/EstablishSubscriptionRpcTest.java
apps/restconf-subscription/src/test/java/org/opendaylight/restconf/subscription/ModifySubscriptionRpcTest.java

index fb925ca94e7c1284c664dc42b5da808cb50a0abc..a7a674c1fd1b0c033954e4aabd9826424a706ee3 100644 (file)
@@ -22,6 +22,7 @@ import org.opendaylight.restconf.notifications.mdsal.SubscriptionStateService;
 import org.opendaylight.restconf.server.api.ServerException;
 import org.opendaylight.restconf.server.api.ServerRequest;
 import org.opendaylight.restconf.server.spi.OperationInput;
+import org.opendaylight.restconf.server.spi.RestconfStream;
 import org.opendaylight.restconf.server.spi.RpcImplementation;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.Encoding;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.EstablishSubscription;
@@ -29,7 +30,6 @@ import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.EstablishSubscriptionOutput;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.SubscriptionId;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.filters.StreamFilter;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.streams.Stream;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscription.policy.dynamic.Stream1Builder;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscription.policy.modifiable.target.StreamBuilder;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscription.policy.modifiable.target.stream.stream.filter.ByReferenceBuilder;
@@ -87,16 +87,19 @@ public final class EstablishSubscriptionRpc extends RpcImplementation {
     private final MdsalNotificationService mdsalService;
     private final SubscriptionStateService subscriptionStateService;
     private final SubscriptionStateMachine stateMachine;
+    private final RestconfStream.Registry streamRegistry;
 
     @Inject
     @Activate
     public EstablishSubscriptionRpc(@Reference final MdsalNotificationService mdsalService,
             @Reference final SubscriptionStateService subscriptionStateService,
-            @Reference final SubscriptionStateMachine stateMachine) {
+            @Reference final SubscriptionStateMachine stateMachine,
+            @Reference final RestconfStream.Registry streamRegistry) {
         super(EstablishSubscription.QNAME);
         this.mdsalService = requireNonNull(mdsalService);
         this.subscriptionStateService = requireNonNull(subscriptionStateService);
         this.stateMachine = requireNonNull(stateMachine);
+        this.streamRegistry = requireNonNull(streamRegistry);
     }
 
     @Override
@@ -139,17 +142,12 @@ public final class EstablishSubscriptionRpc extends RpcImplementation {
                 "No stream specified"));
             return;
         }
-        try {
-            if (!mdsalService.exist(SubscriptionUtil.STREAMS.node(NodeIdentifierWithPredicates.of(Stream.QNAME,
-                    SubscriptionUtil.QNAME_STREAM_NAME, streamName))).get()) {
-                request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.INVALID_VALUE,
-                    "%s refers to an unknown stream", streamName));
-                return;
-            }
-        } catch (InterruptedException | ExecutionException e) {
-            request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.BAD_ELEMENT, e));
+        if (streamRegistry.lookupStream(streamName) == null) {
+            request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.INVALID_VALUE,
+                "%s refers to an unknown stream", streamName));
             return;
         }
+
         final var stream1Builder = new Stream1Builder();
         stream1Builder.setStream(streamName);
         streamBuilder.addAugmentation(stream1Builder.build());
index 539e46d17e8995ac8d1075946b01869c62a4536b..4d6062c0caf6e8d9966cce258b2947a4782f1880 100644 (file)
@@ -22,6 +22,7 @@ import org.opendaylight.restconf.notifications.mdsal.SubscriptionStateService;
 import org.opendaylight.restconf.server.api.ServerException;
 import org.opendaylight.restconf.server.api.ServerRequest;
 import org.opendaylight.restconf.server.spi.OperationInput;
+import org.opendaylight.restconf.server.spi.RestconfStream;
 import org.opendaylight.restconf.server.spi.RpcImplementation;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.ModifySubscription;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.ModifySubscriptionInput;
@@ -66,16 +67,19 @@ public final class ModifySubscriptionRpc extends RpcImplementation {
     private final MdsalNotificationService mdsalService;
     private final SubscriptionStateService subscriptionStateService;
     private final SubscriptionStateMachine stateMachine;
+    private final RestconfStream.Registry streamRegistry;
 
     @Inject
     @Activate
     public ModifySubscriptionRpc(@Reference final MdsalNotificationService mdsalService,
             @Reference final SubscriptionStateService subscriptionStateService,
-            @Reference final SubscriptionStateMachine stateMachine) {
+            @Reference final SubscriptionStateMachine stateMachine,
+            @Reference RestconfStream.Registry streamRegistry) {
         super(ModifySubscription.QNAME);
         this.mdsalService = requireNonNull(mdsalService);
         this.subscriptionStateService = requireNonNull(subscriptionStateService);
         this.stateMachine = requireNonNull(stateMachine);
+        this.streamRegistry = requireNonNull(streamRegistry);
     }
 
     @Override
@@ -168,18 +172,15 @@ public final class ModifySubscriptionRpc extends RpcImplementation {
                         .withNodeIdentifier(NodeIdentifier.create(ModifySubscriptionOutput.QNAME))
                         .build());
                     try {
-                        final var subscription = mdsalService.read(SubscriptionUtil.SUBSCRIPTIONS.node(node.name()))
-                            .get();
-                        if (subscription.isEmpty()) {
+                        final var streamName = leaf(target, NodeIdentifier.create(SubscriptionUtil.QNAME_STREAM),
+                            String.class);
+                        if (streamRegistry.lookupStream(streamName) == null) {
                             LOG.warn("Could not send subscription modify notification: could not read stream name");
                             return;
                         }
-                        final var target = (DataContainerNode) ((DataContainerNode) subscription.orElseThrow())
-                            .childByArg(NodeIdentifier.create(SubscriptionUtil.QNAME_TARGET));
-                        final var streamName = leaf(target, NodeIdentifier.create(SubscriptionUtil.QNAME_STREAM),
-                            String.class);
+
                         subscriptionStateService.subscriptionModified(Instant.now(), id, streamName, "uri", null);
-                    } catch (InterruptedException | ExecutionException e) {
+                    } catch (InterruptedException e) {
                         LOG.warn("Could not send subscription modify notification", e);
                     }
                 }
index e18ca5acca36ccae8081ea88df5acd66ad404ebe..c22a67087886a324a55819630139129649ec67cb 100644 (file)
@@ -32,10 +32,10 @@ import org.opendaylight.restconf.server.api.ServerException;
 import org.opendaylight.restconf.server.api.TransportSession;
 import org.opendaylight.restconf.server.api.testlib.CompletingServerRequest;
 import org.opendaylight.restconf.server.spi.OperationInput;
+import org.opendaylight.restconf.server.spi.RestconfStream;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.EstablishSubscriptionInput;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.EstablishSubscriptionOutput;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.filters.StreamFilter;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.streams.Stream;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscriptions.Subscription;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscriptions.subscription.receivers.Receiver;
 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
@@ -67,6 +67,10 @@ class EstablishSubscriptionRpcTest {
     private TransportSession session;
     @Mock
     private SubscriptionStateMachine stateMachine;
+    @Mock
+    private RestconfStream.Registry streamRegistry;
+    @Mock
+    private RestconfStream restconfStream;
     @Captor
     private ArgumentCaptor<ServerException> response;
 
@@ -75,7 +79,7 @@ class EstablishSubscriptionRpcTest {
     @BeforeEach
     void before() {
         final var mdsalService = new MdsalNotificationService(dataBroker);
-        rpc = new EstablishSubscriptionRpc(mdsalService, subscriptionStateService, stateMachine);
+        rpc = new EstablishSubscriptionRpc(mdsalService, subscriptionStateService, stateMachine, streamRegistry);
     }
 
     @Test
@@ -111,10 +115,7 @@ class EstablishSubscriptionRpcTest {
             .build();
 
         doReturn(writeTx).when(dataBroker).newWriteOnlyTransaction();
-        doReturn(readTx).when(dataBroker).newReadOnlyTransaction();
-        doReturn(FluentFutures.immediateTrueFluentFuture()).when(readTx).exists(LogicalDatastoreType.OPERATIONAL,
-            SubscriptionUtil.STREAMS.node(NodeIdentifierWithPredicates.of(Stream.QNAME,
-            SubscriptionUtil.QNAME_STREAM_NAME, "NETCONF")));
+        doReturn(restconfStream).when(streamRegistry).lookupStream("NETCONF");
         doReturn(CommitInfo.emptyFluentFuture()).when(writeTx).commit();
         doReturn(session).when(request).session();
 
@@ -127,10 +128,7 @@ class EstablishSubscriptionRpcTest {
 
     @Test
     void establishSubscriptionWrongStreamTest() {
-        doReturn(readTx).when(dataBroker).newReadOnlyTransaction();
-        doReturn(FluentFutures.immediateFalseFluentFuture()).when(readTx).exists(LogicalDatastoreType.OPERATIONAL,
-            SubscriptionUtil.STREAMS.node(NodeIdentifierWithPredicates.of(Stream.QNAME,
-                SubscriptionUtil.QNAME_STREAM_NAME, "NETCONF")));
+        doReturn(null).when(streamRegistry).lookupStream("NETCONF");
         doReturn(session).when(request).session();
 
         rpc.invoke(request, RESTCONF_URI, new OperationInput(operationPath, getInput()));
@@ -164,9 +162,7 @@ class EstablishSubscriptionRpcTest {
                 .build())
             .build();
         doReturn(readTx).when(dataBroker).newReadOnlyTransaction();
-        doReturn(FluentFutures.immediateTrueFluentFuture()).when(readTx).exists(LogicalDatastoreType.OPERATIONAL,
-            SubscriptionUtil.STREAMS.node(NodeIdentifierWithPredicates.of(Stream.QNAME,
-                SubscriptionUtil.QNAME_STREAM_NAME, "NETCONF")));
+        doReturn(restconfStream).when(streamRegistry).lookupStream("NETCONF");
         doReturn(FluentFutures.immediateFalseFluentFuture()).when(readTx).exists(LogicalDatastoreType.OPERATIONAL,
             SubscriptionUtil.FILTERS.node(NodeIdentifierWithPredicates.of(StreamFilter.QNAME,
                 SubscriptionUtil.QNAME_STREAM_FILTER_NAME, "filter")));
index 0604ab32d93ba434c930e83db470cfefc1325651..f60442ec0132d011d0a2397424496dc4cd949e39 100644 (file)
@@ -31,6 +31,7 @@ import org.opendaylight.restconf.server.api.ServerException;
 import org.opendaylight.restconf.server.api.TransportSession;
 import org.opendaylight.restconf.server.api.testlib.CompletingServerRequest;
 import org.opendaylight.restconf.server.spi.OperationInput;
+import org.opendaylight.restconf.server.spi.RestconfStream;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.EstablishSubscriptionInput;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.ModifySubscriptionOutput;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscriptions.Subscription;
@@ -62,6 +63,8 @@ class ModifySubscriptionRpcTest {
     private TransportSession session;
     @Mock
     private SubscriptionStateMachine stateMachine;
+    @Mock
+    private RestconfStream.Registry streamRegistry;
     @Captor
     private ArgumentCaptor<ServerException> responseCaptor;
 
@@ -71,7 +74,7 @@ class ModifySubscriptionRpcTest {
     @BeforeEach
     void before() {
         mdsalService = new MdsalNotificationService(dataBroker);
-        rpc = new ModifySubscriptionRpc(mdsalService, subscriptionStateService, stateMachine);
+        rpc = new ModifySubscriptionRpc(mdsalService, subscriptionStateService, stateMachine, streamRegistry);
     }
 
     @Test