Adapt ModifySubscriptionRpc 61/115761/27
authorSamuel Schneider <samuel.schneider@pantheon.tech>
Thu, 6 Mar 2025 16:19:58 +0000 (17:19 +0100)
committerIvan Hrasko <ivan.hrasko@pantheon.tech>
Tue, 1 Apr 2025 07:54:00 +0000 (07:54 +0000)
Adapt ModifySubscriptionRpc to changes in RestconfStream.
Add possibility to modify filter in RestconfStream.Subscription
and propagate this change in to operational datastore.

Now we provide correct response to invoke of this rpc,
in case of error correct error message, after filtering is finished
correctly update datastore, instead of "Not implemented yet".

JIRA: NETCONF-714
Change-Id: If6487621d5a2ff378be7f9f9089351df75a50ff4
Signed-off-by: Samuel Schneider <samuel.schneider@pantheon.tech>
apps/restconf-subscription/src/main/java/org/opendaylight/restconf/subscription/EstablishSubscriptionRpc.java
apps/restconf-subscription/src/main/java/org/opendaylight/restconf/subscription/ModifySubscriptionRpc.java
apps/restconf-subscription/src/main/java/org/opendaylight/restconf/subscription/SubscriptionUtil.java
plugins/restconf-server-mdsal/src/main/java/org/opendaylight/restconf/server/mdsal/MdsalRestconfStreamRegistry.java
plugins/restconf-server-mdsal/src/main/java/org/opendaylight/restconf/server/mdsal/MdsalRestconfStreamSubscription.java
protocol/restconf-server-spi/src/main/java/org/opendaylight/restconf/server/spi/AbstractRestconfStreamRegistry.java
protocol/restconf-server-spi/src/main/java/org/opendaylight/restconf/server/spi/RestconfStream.java

index 79cdc3527ca731a711101384832c56e8dcbe8743..13899f1b1f2eba9f81ab012715b93731eb8893ea 100644 (file)
@@ -14,25 +14,20 @@ import java.util.Set;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.eclipse.jdt.annotation.NonNullByDefault;
-import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.netconf.databind.RequestException;
 import org.opendaylight.restconf.notifications.mdsal.SubscriptionStateService;
 import org.opendaylight.restconf.server.api.ServerRequest;
 import org.opendaylight.restconf.server.spi.OperationInput;
 import org.opendaylight.restconf.server.spi.RestconfStream;
-import org.opendaylight.restconf.server.spi.RestconfStream.SubscriptionFilter;
 import org.opendaylight.restconf.server.spi.RpcImplementation;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.EncodingUnsupported;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.EstablishSubscription;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.EstablishSubscriptionInput;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.EstablishSubscriptionOutput;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.stream.filter.elements.FilterSpec;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.stream.filter.elements.filter.spec.stream.subtree.filter.StreamSubtreeFilter;
 import org.opendaylight.yangtools.yang.common.ErrorTag;
 import org.opendaylight.yangtools.yang.common.ErrorType;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.AnydataNode;
 import org.opendaylight.yangtools.yang.data.api.schema.ChoiceNode;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.spi.node.ImmutableNodes;
@@ -47,8 +42,6 @@ import org.osgi.service.component.annotations.Reference;
 @Component(service = RpcImplementation.class)
 @NonNullByDefault
 public final class EstablishSubscriptionRpc extends RpcImplementation {
-    private static final NodeIdentifier SUBSCRIPTION_STREAM_FILTER_NAME =
-        NodeIdentifier.create(QName.create(EstablishSubscriptionInput.QNAME, "stream-filter-name").intern());
     private static final NodeIdentifier SUBSCRIPTION_STREAM =
         NodeIdentifier.create(QName.create(EstablishSubscriptionInput.QNAME, "stream").intern());
     private static final NodeIdentifier SUBSCRIPTION_TARGET =
@@ -121,7 +114,7 @@ public final class EstablishSubscriptionRpc extends RpcImplementation {
 
         // check stream filter
         final var streamFilter = (ChoiceNode) target.childByArg(SUBSCRIPTION_STREAM_FILTER);
-        final var filter = streamFilter == null ? null : extractFilter(streamFilter);
+        final var filter = streamFilter == null ? null : SubscriptionUtil.extractFilter(streamFilter);
 
         streamRegistry.establishSubscription(request.transform(subscription -> {
             final var id = subscription.id();
@@ -136,22 +129,4 @@ public final class EstablishSubscriptionRpc extends RpcImplementation {
                 .build();
         }), streamName, encoding, filter);
     }
-
-    private static @Nullable SubscriptionFilter extractFilter(final ChoiceNode streamFilter) {
-        final var filterName = leaf(streamFilter, SUBSCRIPTION_STREAM_FILTER_NAME, String.class);
-        if (filterName != null) {
-            return new SubscriptionFilter.Reference(filterName);
-        }
-        final var filterSpec = (ChoiceNode) streamFilter.childByArg(new NodeIdentifier(FilterSpec.QNAME));
-        if (filterSpec == null) {
-            return null;
-        }
-        final var subtree = (AnydataNode<?>) filterSpec.childByArg(new NodeIdentifier(StreamSubtreeFilter.QNAME));
-        if (subtree != null) {
-            return new SubscriptionFilter.SubtreeDefinition(subtree);
-        }
-        final var xpath = leaf(filterSpec, new NodeIdentifier(QName.create(FilterSpec.QNAME, "stream-xpath-filter")),
-            String.class);
-        return xpath != null ? new SubscriptionFilter.XPathDefinition(xpath) : null;
-    }
 }
index 167b86fc64504f8fa0ddf27db1c332900a7f387d..1b4b99329ee0c87d84494949e709a9bfdbe7904a 100644 (file)
@@ -10,8 +10,10 @@ package org.opendaylight.restconf.subscription;
 import static java.util.Objects.requireNonNull;
 
 import java.net.URI;
+import java.time.Instant;
 import javax.inject.Inject;
 import javax.inject.Singleton;
+import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.opendaylight.netconf.databind.RequestException;
 import org.opendaylight.restconf.notifications.mdsal.SubscriptionStateService;
 import org.opendaylight.restconf.server.api.ServerRequest;
@@ -20,15 +22,14 @@ import org.opendaylight.restconf.server.spi.RestconfStream;
 import org.opendaylight.restconf.server.spi.RpcImplementation;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.ModifySubscription;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.ModifySubscriptionInput;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscriptions.Subscription;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.ModifySubscriptionOutput;
 import org.opendaylight.yangtools.yang.common.ErrorTag;
 import org.opendaylight.yangtools.yang.common.ErrorType;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.Uint32;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.schema.ChoiceNode;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.DataContainerNode;
 import org.opendaylight.yangtools.yang.data.spi.node.ImmutableNodes;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -41,11 +42,10 @@ import org.slf4j.LoggerFactory;
  */
 @Singleton
 @Component(service = RpcImplementation.class)
+@NonNullByDefault
 public final class ModifySubscriptionRpc extends RpcImplementation {
     private static final NodeIdentifier SUBSCRIPTION_ID =
         NodeIdentifier.create(QName.create(ModifySubscriptionInput.QNAME, "id").intern());
-    private static final NodeIdentifier SUBSCRIPTION_STREAM_FILTER_NAME =
-        NodeIdentifier.create(QName.create(ModifySubscriptionInput.QNAME, "stream-filter-name").intern());
     private static final NodeIdentifier SUBSCRIPTION_TARGET =
         NodeIdentifier.create(QName.create(ModifySubscriptionInput.QNAME, "target").intern());
     private static final NodeIdentifier SUBSCRIPTION_STREAM_FILTER =
@@ -72,7 +72,6 @@ public final class ModifySubscriptionRpc extends RpcImplementation {
     public void invoke(final ServerRequest<ContainerNode> request, final URI restconfURI, final OperationInput input) {
         final var body = input.input();
         final Uint32 id;
-        final String streamFilterName;
 
         try {
             id = leaf(body, SUBSCRIPTION_ID, Uint32.class);
@@ -104,79 +103,34 @@ public final class ModifySubscriptionRpc extends RpcImplementation {
             return;
         }
 
-        final var target = (DataContainerNode) body.childByArg(SUBSCRIPTION_TARGET);
-        final var nodeBuilder = ImmutableNodes.newMapEntryBuilder();
-        final var nodeTargetBuilder = ImmutableNodes.newChoiceBuilder().withNodeIdentifier(NodeIdentifier
-            .create(SubscriptionUtil.QNAME_TARGET));
-        final var nodeFilterBuilder = ImmutableNodes.newChoiceBuilder().withNodeIdentifier(NodeIdentifier
-            .create(QName.create(Subscription.QNAME, "stream-filter")));
+        final var target = (ChoiceNode) body.childByArg(SUBSCRIPTION_TARGET);
 
-        nodeBuilder.withNodeIdentifier(NodeIdentifierWithPredicates.of(Subscription.QNAME,
-            SubscriptionUtil.QNAME_ID, id));
-        nodeBuilder.withChild(ImmutableNodes.leafNode(SubscriptionUtil.QNAME_ID, id));
-        if (target != null) {
-            final var streamFilter = (DataContainerNode) target.childByArg(SUBSCRIPTION_STREAM_FILTER);
-            streamFilterName = leaf(streamFilter, SUBSCRIPTION_STREAM_FILTER_NAME, String.class);
-            //  TODO: parse anydata filter, rfc6241? https://www.rfc-editor.org/rfc/rfc8650#name-filter-example
-            //    {@link StreamSubtreeFilter}.
-            if (streamFilterName != null) {
-//                try {
-//                    if (!mdsalService.exist(SubscriptionUtil.FILTERS.node(NodeIdentifierWithPredicates.of(
-//                        StreamFilter.QNAME, SubscriptionUtil.QNAME_STREAM_FILTER_NAME, streamFilterName))).get()) {
-//                        request.completeWith(new RequestException(ErrorType.APPLICATION, ErrorTag.INVALID_VALUE,
-//                            "%s refers to an unknown stream filter", streamFilterName));
-//                        return;
-//                    }
-//                } catch (InterruptedException | ExecutionException e) {
-//                    request.completeWith(new RequestException(ErrorType.APPLICATION, ErrorTag.BAD_ELEMENT, e));
-//                    return;
-//                }
-                nodeFilterBuilder.withChild(ImmutableNodes.leafNode(SubscriptionUtil.QNAME_STREAM_FILTER,
-                    streamFilterName));
-                nodeTargetBuilder.withChild(nodeFilterBuilder.build());
-                nodeBuilder.withChild(nodeTargetBuilder.build());
-            }
+        if (target == null) {
+            request.completeWith(new RequestException(ErrorType.APPLICATION, ErrorTag.MISSING_ELEMENT,
+                "No filter specified"));
+            return;
+        }
+        final var streamFilter = (ChoiceNode) target.childByArg(SUBSCRIPTION_STREAM_FILTER);
+        final var filter = streamFilter == null ? null : SubscriptionUtil.extractFilter(streamFilter);
+        if (filter == null) {
+            request.completeWith(new RequestException(ErrorType.APPLICATION, ErrorTag.MISSING_ELEMENT,
+                "No filter specified"));
+            return;
         }
-//        final var node = nodeBuilder.build();
 
-        request.completeWith(new RequestException(ErrorType.APPLICATION, ErrorTag.OPERATION_NOT_SUPPORTED,
-            "Not implemented yet"));
+        streamRegistry.modifySubscription(request.transform(subscription -> {
+            try {
+                // FIXME: pass correct filter once we extract if from input
+                subscriptionStateService.subscriptionModified(Instant.now(), id, subscription.streamName(),
+                    subscription.encoding(), null, null, null);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new IllegalStateException("Could not send subscription modify notification", e);
+            }
 
-// FIXME: reconcile
-//        mdsalService.mergeSubscription(SubscriptionUtil.SUBSCRIPTIONS.node(node.name()), node)
-//            .addCallback(new FutureCallback<CommitInfo>() {
-//                @Override
-//                public void onSuccess(final CommitInfo result) {
-//                    request.completeWith(ImmutableNodes.newContainerBuilder()
-//                        .withNodeIdentifier(NodeIdentifier.create(ModifySubscriptionOutput.QNAME))
-//                        .build());
-//                    try {
-//                        final var subscription = mdsalService.read(SubscriptionUtil.SUBSCRIPTIONS.node(node.name()))
-//                            .get();
-//                        if (subscription.isEmpty()) {
-//                            LOG.warn("Could not send subscription modify notification: could not read stream name");
-//                            return;
-//                        }
-//                        final var target = (DataContainerNode) ((DataContainerNode) subscription.orElseThrow())
-//                            .childByArg(NodeIdentifier.create(SubscriptionUtil.QNAME_TARGET));
-//                        final var streamName = leaf(target, NodeIdentifier.create(SubscriptionUtil.QNAME_STREAM),
-//                            String.class);
-//                        final var encoding = leaf((DataContainerNode) subscription.orElseThrow(),
-//                            NodeIdentifier.create(SubscriptionUtil.QNAME_ENCODING), QName.class);
-//                        // TODO: pass correct filter once we extract if from input
-//                        subscriptionStateService.subscriptionModified(Instant.now(), id, streamName, encoding, null,
-//                            stopTime, null);
-//                    } catch (InterruptedException | ExecutionException e) {
-//                        LOG.warn("Could not send subscription modify notification", e);
-//                    }
-//                }
-//
-//                @Override
-//                public void onFailure(final Throwable throwable) {
-//                    request.completeWith(new RequestException(ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED,
-//                        // FIXME: why getCause()?
-//                        throwable.getCause()));
-//                }
-//            }, MoreExecutors.directExecutor());
+            return ImmutableNodes.newContainerBuilder()
+                .withNodeIdentifier(NodeIdentifier.create(ModifySubscriptionOutput.QNAME))
+                .build();
+        }), id, filter);
     }
 }
index f7739947e718db59d46f9584f24c8bc0af7e1546..7dbd828b988aa44eb80593a176120fef3c0fbcdb 100644 (file)
@@ -7,26 +7,35 @@
  */
 package org.opendaylight.restconf.subscription;
 
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.restconf.server.spi.RestconfStream.SubscriptionFilter;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.EstablishSubscriptionInput;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.Filters;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.Streams;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.Subscriptions;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.filters.StreamFilter;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.stream.filter.elements.FilterSpec;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.stream.filter.elements.filter.spec.stream.subtree.filter.StreamSubtreeFilter;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.streams.Stream;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscriptions.Subscription;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscriptions.subscription.receivers.Receiver;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.AnydataNode;
+import org.opendaylight.yangtools.yang.data.api.schema.ChoiceNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
 
 public final class SubscriptionUtil {
     public static final YangInstanceIdentifier SUBSCRIPTIONS = YangInstanceIdentifier.of(
-        YangInstanceIdentifier.NodeIdentifier.create(Subscriptions.QNAME),
-        YangInstanceIdentifier.NodeIdentifier.create(Subscription.QNAME));
+        NodeIdentifier.create(Subscriptions.QNAME),
+        NodeIdentifier.create(Subscription.QNAME));
     public static final YangInstanceIdentifier STREAMS = YangInstanceIdentifier.of(
-        YangInstanceIdentifier.NodeIdentifier.create(Streams.QNAME),
-        YangInstanceIdentifier.NodeIdentifier.create(Stream.QNAME));
+        NodeIdentifier.create(Streams.QNAME),
+        NodeIdentifier.create(Stream.QNAME));
     public static final YangInstanceIdentifier FILTERS = YangInstanceIdentifier.of(
-        YangInstanceIdentifier.NodeIdentifier.create(Filters.QNAME),
-        YangInstanceIdentifier.NodeIdentifier.create(StreamFilter.QNAME));
+        NodeIdentifier.create(Filters.QNAME),
+        NodeIdentifier.create(StreamFilter.QNAME));
     public static final QName QNAME_ID = QName.create(Subscription.QNAME, "id");
     public static final QName QNAME_STREAM = QName.create(Subscription.QNAME, "stream");
     public static final QName QNAME_STREAM_FILTER = QName.create(Subscription.QNAME, "stream-filter-name");
@@ -39,8 +48,35 @@ public final class SubscriptionUtil {
     public static final QName QNAME_SENT_EVENT_RECORDS = QName.create(Receiver.QNAME, "sent-event-records");
     public static final QName QNAME_EXCLUDED_EVENT_RECORDS = QName
         .create(Receiver.QNAME, "excluded-event-records");
+    private static final NodeIdentifier SUBSCRIPTION_STREAM_FILTER_NAME =
+        NodeIdentifier.create(QName.create(EstablishSubscriptionInput.QNAME, "stream-filter-name").intern());
 
     private SubscriptionUtil() {
         // hidden on purpose
     }
+
+    static @Nullable SubscriptionFilter extractFilter(final ChoiceNode streamFilter) {
+        if (streamFilter.childByArg(SUBSCRIPTION_STREAM_FILTER_NAME) instanceof LeafNode<?> leafNode) {
+            if (leafNode.body() instanceof String filterName) {
+                return new SubscriptionFilter.Reference(filterName);
+            }
+            throw new IllegalArgumentException("Bad child " + leafNode.prettyTree());
+        }
+        final var filterSpec = (ChoiceNode) streamFilter.childByArg(new NodeIdentifier(FilterSpec.QNAME));
+        if (filterSpec == null) {
+            return null;
+        }
+        final var subtree = (AnydataNode<?>) filterSpec.childByArg(new NodeIdentifier(StreamSubtreeFilter.QNAME));
+        if (subtree != null) {
+            return new SubscriptionFilter.SubtreeDefinition(subtree);
+        }
+        if (filterSpec.childByArg(new NodeIdentifier(QName.create(FilterSpec.QNAME, "stream-xpath-filter")))
+            instanceof LeafNode<?> leafNode) {
+            if (leafNode.body() instanceof String xpath) {
+                return new SubscriptionFilter.XPathDefinition(xpath);
+            }
+            throw new IllegalArgumentException("Bad child " + leafNode.prettyTree());
+        }
+        return null;
+    }
 }
index 75850eede0d5d8260f4602b5b05a9b083c4e7884..7c35aeaf5986810ad03af42c7a6f6b20c1fb5aad 100644 (file)
@@ -26,14 +26,19 @@ import org.opendaylight.restconf.server.spi.ReceiverHolder;
 import org.opendaylight.restconf.server.spi.RestconfStream;
 import org.opendaylight.restconf.subscription.SubscriptionUtil;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.Subscriptions;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.stream.filter.elements.FilterSpec;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.stream.filter.elements.filter.spec.stream.subtree.filter.StreamSubtreeFilter;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscription.policy.modifiable.target.stream.StreamFilter;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscriptions.Subscription;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscriptions.subscription.Receivers;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscriptions.subscription.receivers.Receiver;
+import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.Uint32;
 import org.opendaylight.yangtools.yang.common.Uint64;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
 import org.opendaylight.yangtools.yang.data.spi.node.ImmutableNodes;
 import org.osgi.service.component.annotations.Activate;
@@ -159,4 +164,44 @@ public final class MdsalRestconfStreamRegistry extends AbstractRestconfStreamReg
             return new MdsalRestconfStreamSubscription<>(subscription, dataBroker);
         }, MoreExecutors.directExecutor());
     }
+
+    @Override
+    protected ListenableFuture<RestconfStream.Subscription> modifySubscriptionFilter(
+            final RestconfStream.Subscription subscription, final RestconfStream.SubscriptionFilter filter) {
+        final var id = subscription.id();
+
+        final DataContainerChild filterNode = switch (filter) {
+            case RestconfStream.SubscriptionFilter.Reference(var filterName) ->
+                ImmutableNodes.leafNode(SubscriptionUtil.QNAME_STREAM_FILTER, filterName);
+            case RestconfStream.SubscriptionFilter.SubtreeDefinition(var anydata) ->
+                ImmutableNodes.newChoiceBuilder()
+                    .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(FilterSpec.QNAME))
+                    .withChild(ImmutableNodes.leafNode(StreamSubtreeFilter.QNAME, anydata))
+                    .build();
+            case RestconfStream.SubscriptionFilter.XPathDefinition(final var xpath) ->
+                ImmutableNodes.newChoiceBuilder()
+                    .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(FilterSpec.QNAME))
+                    .withChild(ImmutableNodes.leafNode(QName.create(FilterSpec.QNAME, "stream-xpath-filter"), xpath))
+                    .build();
+        };
+
+        final var tx = dataBroker.newWriteOnlyTransaction();
+        final var nodeId = NodeIdentifierWithPredicates.of(Subscription.QNAME, SubscriptionUtil.QNAME_ID, id);
+        tx.merge(LogicalDatastoreType.OPERATIONAL, SubscriptionUtil.SUBSCRIPTIONS.node(nodeId),
+            ImmutableNodes.newMapEntryBuilder()
+                .withNodeIdentifier(nodeId)
+                .withChild(ImmutableNodes.leafNode(SubscriptionUtil.QNAME_ID, id))
+                .withChild(ImmutableNodes.newChoiceBuilder()
+                    .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(SubscriptionUtil.QNAME_TARGET))
+                    .withChild(ImmutableNodes.newChoiceBuilder()
+                        .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(StreamFilter.QNAME))
+                        .withChild(filterNode)
+                        .build())
+                    .build())
+                .build());
+        return tx.commit().transform(info -> {
+            LOG.debug("Modified subscription {} to operational datastore as of {}", id, info);
+            return new MdsalRestconfStreamSubscription<>(subscription, dataBroker);
+        }, MoreExecutors.directExecutor());
+    }
 }
index 8e012df6343988f918d639a8f386c2674b4c268b..1d13405d725ceb042b955b8138f976135dbcfea0 100644 (file)
@@ -39,7 +39,7 @@ final class MdsalRestconfStreamSubscription<T extends RestconfStream.Subscriptio
     }
 
     @Override
-    protected void terminateImpl(final ServerRequest<Empty> request,final QName terminationReason) {
+    protected void terminateImpl(final ServerRequest<Empty> request, final QName terminationReason) {
         final var id = id();
         LOG.debug("{} terminated with reason {}", id, terminationReason);
 
index 8c9c3276d8a6ea753f3e40e0875fda2fc037f9d5..6f0eea1b11bb4376e1dd6834bd5916dc8d511344 100644 (file)
@@ -238,9 +238,47 @@ public abstract class AbstractRestconfStreamRegistry implements RestconfStream.R
         }, MoreExecutors.directExecutor());
     }
 
+    @Override
+    public void modifySubscription(final ServerRequest<Subscription> request, final Uint32 id,
+            final SubscriptionFilter filter) {
+        final var oldSubscription = lookupSubscription(id);
+        if (oldSubscription == null) {
+            request.completeWith(new RequestException(ErrorType.APPLICATION, ErrorTag.BAD_ELEMENT,
+                "There is no subscription with given ID."));
+            return;
+        }
+
+        final EventStreamFilter filterImpl;
+        try {
+            filterImpl = resolveFilter(filter);
+        } catch (RequestException e) {
+            request.completeWith(e);
+            return;
+        }
+        final var newSubscription = new SubscriptionImpl(id, oldSubscription.encoding(), oldSubscription.streamName(),
+            oldSubscription.receiverName(), filterImpl);
+
+        Futures.addCallback(modifySubscriptionFilter(newSubscription, filter), new FutureCallback<>() {
+            @Override
+            public void onSuccess(final Subscription result) {
+                subscriptions.put(id, result);
+                request.completeWith(result);
+            }
+
+            @Override
+            public void onFailure(final Throwable cause) {
+                request.completeWith(new RequestException(cause));
+            }
+        }, MoreExecutors.directExecutor());
+    }
+
     @NonNullByDefault
     protected abstract ListenableFuture<Subscription> createSubscription(Subscription subscription);
 
+    @NonNullByDefault
+    protected abstract ListenableFuture<Subscription> modifySubscriptionFilter(Subscription subscription,
+        SubscriptionFilter filter);
+
     private @Nullable EventStreamFilter resolveFilter(final @Nullable SubscriptionFilter filter)
             throws RequestException {
         return switch (filter) {
index 10ce4c4358f95a15b3200b6c1bb7dcf385a620d5..83af1f70480a4b2a50f741abceeb3e329130edfa 100644 (file)
@@ -218,6 +218,17 @@ public final class RestconfStream<T> {
         void establishSubscription(ServerRequest<Subscription> request, String streamName, QName encoding,
             @Nullable SubscriptionFilter filter);
 
+        /**
+         * Modify existing RFC8639 subscription to a stream.
+         *
+         * @param request {@link ServerRequest} for this invocation
+         * @param id of subscription
+         * @param filter new filter
+         * @throws NullPointerException if {@code request}, {@code id} or {@code filter} is {@code null}
+         */
+        @NonNullByDefault
+        void modifySubscription(ServerRequest<Subscription> request, Uint32 id, SubscriptionFilter filter);
+
         /**
          * Lookup an existing subscription.
          *