Split out SubscriptionControl 92/116492/8
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 7 May 2025 23:25:55 +0000 (01:25 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Fri, 9 May 2025 08:37:56 +0000 (10:37 +0200)
Wrapping and replacing subscriptions during modification incurs churn
which makes the contents of subscription unstable. It also makes for a
less-than-pleasant reasoning experience.

Split out SubscriptionControl, which handles the MD-SAL-specific part of
a subscription, making things much more obvious.

JIRA: NETCONF-1449
Change-Id: Ib95ee26f367051dd355b79a67d9662a8fe9c2a18
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
apps/restconf-subscription/src/main/java/org/opendaylight/restconf/subscription/DeleteSubscriptionRpc.java
apps/restconf-subscription/src/main/java/org/opendaylight/restconf/subscription/KillSubscriptionRpc.java
apps/sal-remote-impl/src/test/java/org/opendaylight/netconf/sal/remote/impl/CreateDataChangeEventSubscriptionRpcTest.java
plugins/restconf-server-mdsal/src/main/java/org/opendaylight/restconf/server/mdsal/MdsalRestconfStreamRegistry.java
plugins/restconf-server-mdsal/src/main/java/org/opendaylight/restconf/server/mdsal/MdsalRestconfStreamSubscription.java [deleted file]
plugins/restconf-server-mdsal/src/main/java/org/opendaylight/restconf/server/mdsal/MdsalSubscriptionControl.java [new file with mode: 0644]
protocol/restconf-server-spi/src/main/java/org/opendaylight/restconf/server/spi/AbstractRestconfStreamRegistry.java
protocol/restconf-server-spi/src/main/java/org/opendaylight/restconf/server/spi/AbstractRestconfStreamSubscription.java
protocol/restconf-server-spi/src/main/java/org/opendaylight/restconf/server/spi/ForwardingRestconfStreamSubscription.java [deleted file]
protocol/restconf-server-spi/src/main/java/org/opendaylight/restconf/server/spi/RestconfStream.java

index ce087d709d59700d56b5eb526fd31f9a309721a8..e23712811734885cca80e2246117816e5d5c4cea 100644 (file)
@@ -83,20 +83,20 @@ public final class DeleteSubscriptionRpc extends RpcImplementation {
                 "No subscription with given ID."));
             return;
         }
+
+        // FIXME: DynamicSubscription.delete()
         final var state = subscription.state();
         if (state != SubscriptionState.ACTIVE && state != SubscriptionState.SUSPENDED) {
             request.completeWith(new RequestException(ErrorType.APPLICATION, ErrorTag.BAD_ELEMENT,
                 "There is no active or suspended subscription with given ID."));
             return;
         }
-
         if (subscription.session() != request.session()) {
             request.completeWith(new RequestException(ErrorType.APPLICATION, ErrorTag.BAD_ELEMENT,
                 "Subscription with given id does not exist on this session"));
             return;
         }
-
-        streamRegistry.updateSubscriptionState(subscription, SubscriptionState.END);
+        subscription.setState(SubscriptionState.END);
         subscription.terminate(request.transform(unused -> {
             try {
                 subscriptionStateService.subscriptionTerminated(Instant.now(), id, NoSuchSubscription.QNAME);
index af748b59181e34da666c6daaad1fb930f5d2155c..8314a755c86dde0beac6e5b981cc617cfaafbbe7 100644 (file)
@@ -85,14 +85,15 @@ public final class KillSubscriptionRpc extends RpcImplementation {
                 "No subscription with given ID."));
             return;
         }
+
+        // FIXME: DynamicSubscription.kill()
         final var state = subscription.state();
         if (state != SubscriptionState.ACTIVE && state != SubscriptionState.SUSPENDED) {
             request.completeWith(new RequestException(ErrorType.APPLICATION, ErrorTag.BAD_ELEMENT,
                 "There is no active or suspended subscription with given ID."));
             return;
         }
-
-        streamRegistry.updateSubscriptionState(subscription, SubscriptionState.END);
+        subscription.setState(SubscriptionState.END);
         subscription.terminate(request.transform(unused -> {
             try {
                 subscriptionStateService.subscriptionTerminated(Instant.now(), id, NoSuchSubscription.QNAME);
index 1a641ca41b8a4121976d40542195c8d44babecab..6b3e8059a27bd3493965cc2658724c180c902ea1 100644 (file)
@@ -43,13 +43,13 @@ import org.opendaylight.restconf.server.spi.OperationInput;
 import org.opendaylight.restconf.server.spi.ReceiverHolder;
 import org.opendaylight.restconf.server.spi.ReceiverHolder.RecordType;
 import org.opendaylight.restconf.server.spi.RestconfStream;
-import org.opendaylight.restconf.server.spi.RestconfStream.Subscription;
 import org.opendaylight.restconf.server.spi.RestconfStream.SubscriptionFilter;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscription;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionOutput;
 import org.opendaylight.yangtools.yang.common.ErrorTag;
 import org.opendaylight.yangtools.yang.common.ErrorType;
 import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.Uint32;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.AnydataNode;
@@ -82,12 +82,13 @@ class CreateDataChangeEventSubscriptionRpcTest {
         }
 
         @Override
-        protected ListenableFuture<Subscription> createSubscription(final Subscription subscription) {
+        protected ListenableFuture<SubscriptionControl> createSubscription(final Uint32 subscriptionId,
+                final String streamName, final QName encoding, final String receiverName) {
             throw new UnsupportedOperationException();
         }
 
         @Override
-        protected ListenableFuture<Subscription> modifySubscriptionFilter(final Subscription subscription,
+        protected ListenableFuture<Void> modifySubscriptionFilter(final Uint32 subscriptionId,
                 final SubscriptionFilter filter) {
             throw new UnsupportedOperationException();
         }
index b44215a8b9c69dba01000d7412cfd9299b313e64..8bc93eeeb2dd3aa023d86f4c15961da224e9a753 100644 (file)
@@ -250,10 +250,8 @@ public final class MdsalRestconfStreamRegistry extends AbstractRestconfStreamReg
     }
 
     @Override
-    protected ListenableFuture<RestconfStream.Subscription> createSubscription(
-            final RestconfStream.Subscription subscription) {
-        final var id = subscription.id();
-        final var receiverName = subscription.receiverName();
+    protected ListenableFuture<SubscriptionControl> createSubscription(final Uint32 id, final String streamName,
+            final QName encoding, final String receiverName) {
         final var pathArg = subscriptionArg(id);
 
         final var tx = dataBroker.newWriteOnlyTransaction();
@@ -261,10 +259,10 @@ public final class MdsalRestconfStreamRegistry extends AbstractRestconfStreamReg
             ImmutableNodes.newMapEntryBuilder()
                 .withNodeIdentifier(pathArg)
                 .withChild(ImmutableNodes.leafNode(ID_NODEID, id))
-                .withChild(ImmutableNodes.leafNode(ENCODING_NODEID, subscription.encoding()))
+                .withChild(ImmutableNodes.leafNode(ENCODING_NODEID, encoding))
                 .withChild(ImmutableNodes.newChoiceBuilder()
                     .withNodeIdentifier(TARGET_NODEID)
-                    .withChild(ImmutableNodes.leafNode(STREAM_NODEID, subscription.streamName()))
+                    .withChild(ImmutableNodes.leafNode(STREAM_NODEID, streamName))
 //                    .withChild(ImmutableNodes.newChoiceBuilder()
 //                        .withNodeIdentifier(STREAM_FILTER_NODEID)
 //                        .withChild(ImmutableNodes.leafNode(SubscriptionUtil.QNAME_STREAM_FILTER,
@@ -287,14 +285,13 @@ public final class MdsalRestconfStreamRegistry extends AbstractRestconfStreamReg
                 .build());
         return tx.commit().transform(info -> {
             LOG.debug("Added subscription {} to operational datastore as of {}", id, info);
-            return new MdsalRestconfStreamSubscription<>(subscription, dataBroker);
+            return new MdsalSubscriptionControl(dataBroker, id);
         }, MoreExecutors.directExecutor());
     }
 
     @Override
-    protected ListenableFuture<RestconfStream.Subscription> modifySubscriptionFilter(
-            final RestconfStream.Subscription subscription, final RestconfStream.SubscriptionFilter filter) {
-        final var id = subscription.id();
+    protected ListenableFuture<Void> modifySubscriptionFilter(final Uint32 subscriptionId,
+            final RestconfStream.SubscriptionFilter filter) {
 
         final var filterNode = switch (filter) {
             case RestconfStream.SubscriptionFilter.Reference(var filterName) ->
@@ -311,12 +308,12 @@ public final class MdsalRestconfStreamRegistry extends AbstractRestconfStreamReg
                     .build();
         };
 
-        final var pathArg = subscriptionArg(id);
+        final var pathArg = subscriptionArg(subscriptionId);
         final var tx = dataBroker.newWriteOnlyTransaction();
         tx.merge(LogicalDatastoreType.OPERATIONAL, subscriptionPath(pathArg),
             ImmutableNodes.newMapEntryBuilder()
                 .withNodeIdentifier(pathArg)
-                .withChild(ImmutableNodes.leafNode(ID_NODEID, id))
+                .withChild(ImmutableNodes.leafNode(ID_NODEID, subscriptionId))
                 .withChild(ImmutableNodes.newChoiceBuilder()
                     .withNodeIdentifier(TARGET_NODEID)
                     .withChild(ImmutableNodes.newChoiceBuilder()
@@ -326,8 +323,8 @@ public final class MdsalRestconfStreamRegistry extends AbstractRestconfStreamReg
                     .build())
                 .build());
         return tx.commit().transform(info -> {
-            LOG.debug("Modified subscription {} to operational datastore as of {}", id, info);
-            return new MdsalRestconfStreamSubscription<>(subscription, dataBroker);
+            LOG.debug("Modified subscription {} to operational datastore as of {}", subscriptionId, info);
+            return null;
         }, MoreExecutors.directExecutor());
     }
 
@@ -359,6 +356,12 @@ public final class MdsalRestconfStreamRegistry extends AbstractRestconfStreamReg
         return NodeIdentifierWithPredicates.of(Receiver.QNAME, NAME_QNAME, receiverName);
     }
 
+    @NonNullByDefault
+    static YangInstanceIdentifier streamFilterPath(final Uint32 subscriptionId) {
+        return YangInstanceIdentifier.of(SUBSCRIPTIONS_NODEID, SUBSCRIPTION_NODEID, subscriptionArg(subscriptionId),
+            TARGET_NODEID, STREAM_FILTER_NODEID);
+    }
+
     @NonNullByDefault
     static YangInstanceIdentifier subscriptionPath(final Uint32 subscriptionId) {
         return subscriptionPath(subscriptionArg(subscriptionId));
diff --git a/plugins/restconf-server-mdsal/src/main/java/org/opendaylight/restconf/server/mdsal/MdsalRestconfStreamSubscription.java b/plugins/restconf-server-mdsal/src/main/java/org/opendaylight/restconf/server/mdsal/MdsalRestconfStreamSubscription.java
deleted file mode 100644 (file)
index 103b759..0000000
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Copyright (c) 2025 PANTHEON.tech, s.r.o. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.restconf.server.mdsal;
-
-import static java.util.Objects.requireNonNull;
-
-import com.google.common.util.concurrent.MoreExecutors;
-import org.opendaylight.mdsal.common.api.CommitInfo;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.mdsal.common.api.OnCommitFutureCallback;
-import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
-import org.opendaylight.mdsal.dom.api.DOMDataBroker;
-import org.opendaylight.netconf.databind.RequestException;
-import org.opendaylight.restconf.server.api.ServerRequest;
-import org.opendaylight.restconf.server.spi.ForwardingRestconfStreamSubscription;
-import org.opendaylight.restconf.server.spi.RestconfStream;
-import org.opendaylight.yangtools.yang.common.Empty;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.Uint32;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-final class MdsalRestconfStreamSubscription<T extends RestconfStream.Subscription>
-        extends ForwardingRestconfStreamSubscription<T> {
-    private static final Logger LOG = LoggerFactory.getLogger(MdsalRestconfStreamSubscription.class);
-
-    private final DOMDataBroker dataBroker;
-
-    MdsalRestconfStreamSubscription(final T delegate, final DOMDataBroker dataBroker) {
-        super(delegate);
-        this.dataBroker = requireNonNull(dataBroker);
-    }
-
-    @Override
-    public void channelClosed() {
-        final var id = id();
-        LOG.debug("{} terminated after channel was closed", id);
-        removeSubscription(id, null, null);
-    }
-
-    @Override
-    protected void terminateImpl(final ServerRequest<Empty> request, final QName terminationReason) {
-        final var id = id();
-        LOG.debug("{} terminated with reason {}", id, terminationReason);
-        removeSubscription(id, request, terminationReason);
-    }
-
-    private void removeSubscription(final Uint32 id, final ServerRequest<Empty> request,
-            final QName terminationReason) {
-        final var tx = dataBroker.newWriteOnlyTransaction();
-        tx.delete(LogicalDatastoreType.OPERATIONAL, MdsalRestconfStreamRegistry.subscriptionPath(id));
-        tx.commit().addCallback(new OnCommitFutureCallback() {
-            @Override
-            public void onSuccess(final CommitInfo result) {
-                LOG.debug("Removed subscription {} from operational datastore as of {}", id, result);
-                if (request != null) {
-                    delegate.terminate(request.transform(ignored -> Empty.value()), terminationReason);
-                } else {
-                    delegate.channelClosed();
-                }
-            }
-
-            @Override
-            public void onFailure(final TransactionCommitFailedException cause) {
-                LOG.warn("Failed to remove subscription {} from operational datastore", id, cause);
-                if (request != null) {
-                    request.completeWith(new RequestException(cause));
-                }
-            }
-        }, MoreExecutors.directExecutor());
-    }
-}
diff --git a/plugins/restconf-server-mdsal/src/main/java/org/opendaylight/restconf/server/mdsal/MdsalSubscriptionControl.java b/plugins/restconf-server-mdsal/src/main/java/org/opendaylight/restconf/server/mdsal/MdsalSubscriptionControl.java
new file mode 100644 (file)
index 0000000..a3c3a63
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2025 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.restconf.server.mdsal;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataBroker;
+import org.opendaylight.restconf.server.spi.AbstractRestconfStreamRegistry.SubscriptionControl;
+import org.opendaylight.yangtools.yang.common.Uint32;
+
+/**
+ * A {@link SubscriptionControl} updating the MD-SAL datastore.
+ */
+@NonNullByDefault
+final class MdsalSubscriptionControl implements SubscriptionControl {
+    private final DOMDataBroker dataBroker;
+    private final Uint32 subscriptionId;
+
+    MdsalSubscriptionControl(final DOMDataBroker dataBroker, final Uint32 subscriptionId) {
+        this.dataBroker = requireNonNull(dataBroker);
+        this.subscriptionId = requireNonNull(subscriptionId);
+    }
+
+    @Override
+    public FluentFuture<@Nullable Void> terminate() {
+        final var tx = dataBroker.newWriteOnlyTransaction();
+        tx.delete(LogicalDatastoreType.OPERATIONAL, MdsalRestconfStreamRegistry.subscriptionPath(subscriptionId));
+        return mapFuture(tx.commit());
+    }
+
+    private static FluentFuture<@Nullable Void> mapFuture(final FluentFuture<? extends CommitInfo> future) {
+        return future.transform(unused -> null, MoreExecutors.directExecutor());
+    }
+}
index 87c8d1eff438a38cca129328854db15aa6db5db9..607ea90c5714fa5a3470fe14a0c2578acbd15d71 100644 (file)
@@ -63,82 +63,114 @@ public abstract class AbstractRestconfStreamRegistry implements RestconfStream.R
         boolean test(YangInstanceIdentifier path, ContainerNode body);
     }
 
+    /**
+     * Control interface for the backend of a subscription.
+     */
+    @NonNullByDefault
+    public interface SubscriptionControl {
+        /**
+         * Terminate the subscription.
+         *
+         * @return A {@link ListenableFuture} signalling the result of termination process
+         */
+        ListenableFuture<Void> terminate();
+    }
+
     /**
      * Internal implementation
      * of a <a href="https://www.rfc-editor.org/rfc/rfc8639#section-2.4">dynamic subscription</a>.
      */
     private final class DynSubscription extends AbstractRestconfStreamSubscription {
-        DynSubscription(final Uint32 id, final QName encoding, final String streamName,
-                final TransportSession session, final @Nullable Principal principal,
-                final @Nullable EventStreamFilter filter) {
-            super(id, encoding, streamName, initialReceiverName(session.description(), principal),
-                SubscriptionState.ACTIVE, session, filter);
-        }
+        private final SubscriptionControl control;
 
-        // FIXME: Only used by modifySubscription(). We should not be instantiating a new subscription, but rather have
-        //        a Subscription.modify() method which does the trick. That method should be hidden by default, and
-        //        exposed in a separate interface:
-        //        - we can invoke modify() from AbstractRestconfStreamRegistry for configured subscriptions
-        //        - modify-subscription RPC can gain access via a check for 'DynamicSubscription', properly rejecting
-        //          attempts to modify non-dynamic subscriptions
-        @NonNullByDefault
-        DynSubscription(final Subscription prev, final @Nullable EventStreamFilter filter) {
-            super(prev.id(), prev.encoding(), prev.streamName(), prev.receiverName(),
-                // Note: RFC8639 mandates that an updated subscription is automatically active, but perhaps we want to
-                //       do that via explicit transition as we are free to immediately suspend, for example -- in which
-                //       case we do not want to, for example, go SUSPENDED -> ACTIVE -> SUSPENDED.
-                SubscriptionState.ACTIVE, prev.session(), filter);
+        private @Nullable EventStreamFilter filter;
+
+        DynSubscription(final Uint32 id, final QName encoding, final String streamName, final String receiverName,
+                final TransportSession session, final SubscriptionControl control,
+                final @Nullable EventStreamFilter filter) {
+            super(id, encoding, streamName, receiverName, SubscriptionState.ACTIVE, session);
+            this.control = requireNonNull(control);
+            this.filter = filter;
         }
 
         @Override
         protected void terminateImpl(final ServerRequest<Empty> request, final QName reason) {
-            subscriptions.remove(id(), this);
-            request.completeWith(Empty.value());
+            final var id = id();
+            LOG.debug("Terminating subscription {} reason {}", id, reason);
+
+            Futures.addCallback(control.terminate(), new FutureCallback<Void>() {
+                @Override
+                public void onSuccess(final Void result) {
+                    LOG.debug("Subscription {} terminated", id);
+                    subscriptions.remove(id, DynSubscription.this);
+                    request.completeWith(Empty.value());
+                }
+
+                @Override
+                public void onFailure(final Throwable cause) {
+                    LOG.warn("Cannot terminate subscription {}", id, cause);
+                    request.completeWith(new RequestException(cause));
+                }
+            }, MoreExecutors.directExecutor());
         }
 
-        @Override
-        public void channelClosed() {
-            subscriptions.remove(id());
+        private void channelClosed() {
+            final var id = id();
+            LOG.debug("Subscription {} terminated due to transport session going down", id);
+
+            Futures.addCallback(control.terminate(), new FutureCallback<Void>() {
+                @Override
+                public void onSuccess(final Void result) {
+                    LOG.debug("Subscription {} cleaned up", id);
+                    subscriptions.remove(id);
+                }
+
+                @Override
+                public void onFailure(final Throwable cause) {
+                    LOG.warn("Subscription {} failed to clean up", id, cause);
+                    subscriptions.remove(id);
+                }
+            }, MoreExecutors.directExecutor());
         }
 
-        @NonNullByDefault
-        private static String initialReceiverName(final TransportSession.Description description,
-                final @Nullable Principal principal) {
-            final var receiverName = description.toFriendlyString();
-            return principal == null ? receiverName : principal.getName() + " via " + receiverName;
+        void controlSessionClosed() {
+            switch (state()) {
+                case END -> {
+                    LOG.debug("Subscription id:{} already in END state during attempt to end it", id());
+                    terminate(null, null);
+                }
+                default -> {
+                    setState(RestconfStream.SubscriptionState.END);
+                    channelClosed();
+                }
+            }
+        }
+
+        void setFilter(final EventStreamFilter newFilter) {
+            filter = newFilter;
+        }
+
+        @Override
+        protected @Nullable EventStreamFilter filter() {
+            return filter;
         }
     }
 
-    // FIXME: merge into DynSubscription
-    @NonNullByDefault
-    private final class DynSubscriptionResource extends AbstractRegistration {
-        private final Uint32 subscriptionId;
+    private static final class DynSubscriptionResource extends AbstractRegistration {
+        private final DynSubscription subscription;
 
-        DynSubscriptionResource(final Uint32 subscriptionId) {
-            this.subscriptionId = requireNonNull(subscriptionId);
+        DynSubscriptionResource(final DynSubscription subscription) {
+            this.subscription = requireNonNull(subscription);
         }
 
         @Override
         protected void removeRegistration() {
-            // check if the subscription is still registered or was terminated from elsewhere
-            final var subscription = lookupSubscription(subscriptionId);
-            if (subscription != null) {
-                switch (subscription.state()) {
-                    case END -> {
-                        LOG.debug("Subscription id:{} already in END state during attempt to end it", subscriptionId);
-                        subscription.terminate(null, null);
-                    }
-                    default -> {
-                        subscription.setState(RestconfStream.SubscriptionState.END);
-                        subscription.channelClosed();
-                    }
-                }
-            }
+            subscription.controlSessionClosed();
         }
 
         @Override
         protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
-            return super.addToStringAttributes(toStringHelper.add("subscription", subscriptionId));
+            return super.addToStringAttributes(toStringHelper.add("subscription", subscription.id()));
         }
     }
 
@@ -164,7 +196,7 @@ public abstract class AbstractRestconfStreamRegistry implements RestconfStream.R
      */
     private final AtomicInteger prevDynamicId = new AtomicInteger(Integer.MAX_VALUE);
     private final ConcurrentMap<String, RestconfStream<?>> streams = new ConcurrentHashMap<>();
-    private final ConcurrentMap<Uint32, Subscription> subscriptions = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Uint32, DynSubscription> subscriptions = new ConcurrentHashMap<>();
     // FIXME: This is not quite sufficient and should be split into two maps:
     //          1. filterSpecs, which is a HashMap<String, ChoiceNode> recording known filter-spec definitions
     //             access should be guarded by a lock
@@ -323,14 +355,15 @@ public abstract class AbstractRestconfStreamRegistry implements RestconfStream.R
         }
 
         final var id = Uint32.fromIntBits(prevDynamicId.incrementAndGet());
-        final var subscription = new DynSubscription(id, encoding, streamName, request.session(), request.principal(),
-            filterImpl);
+        final var receiverName = newReceiverName(session.description(), request.principal());
 
-        Futures.addCallback(createSubscription(subscription), new FutureCallback<Subscription>() {
+        Futures.addCallback(createSubscription(id, streamName, encoding, receiverName), new FutureCallback<>() {
             @Override
-            public void onSuccess(final Subscription result) {
-                subscriptions.put(id, result);
-                session.registerResource(new DynSubscriptionResource(id));
+            public void onSuccess(final SubscriptionControl result) {
+                final var subscription = new DynSubscription(id, encoding, streamName, receiverName, session, result,
+                    filterImpl);
+                subscriptions.put(id, subscription);
+                session.registerResource(new DynSubscriptionResource(subscription));
                 request.completeWith(id);
             }
 
@@ -341,11 +374,18 @@ public abstract class AbstractRestconfStreamRegistry implements RestconfStream.R
         }, MoreExecutors.directExecutor());
     }
 
+    @NonNullByDefault
+    private static String newReceiverName(final TransportSession.Description description,
+            final @Nullable Principal principal) {
+        final var receiverName = description.toFriendlyString();
+        return principal == null ? receiverName : principal.getName() + " via " + receiverName;
+    }
+
     @Override
     public void modifySubscription(final ServerRequest<Subscription> request, final Uint32 id,
             final SubscriptionFilter filter) {
-        final var oldSubscription = lookupSubscription(id);
-        if (oldSubscription == null) {
+        final var subscription = subscriptions.get(id);
+        if (subscription == null) {
             request.completeWith(new RequestException(ErrorType.APPLICATION, ErrorTag.BAD_ELEMENT,
                 "There is no subscription with given ID."));
             return;
@@ -358,13 +398,12 @@ public abstract class AbstractRestconfStreamRegistry implements RestconfStream.R
             request.completeWith(e);
             return;
         }
-        final var newSubscription = new DynSubscription(oldSubscription, filterImpl);
 
-        Futures.addCallback(modifySubscriptionFilter(newSubscription, filter), new FutureCallback<>() {
+        Futures.addCallback(modifySubscriptionFilter(id, filter), new FutureCallback<>() {
             @Override
-            public void onSuccess(final Subscription result) {
-                subscriptions.put(id, result);
-                request.completeWith(result);
+            public void onSuccess(final Void result) {
+                subscription.setFilter(filterImpl);
+                request.completeWith(subscription);
             }
 
             @Override
@@ -374,18 +413,12 @@ public abstract class AbstractRestconfStreamRegistry implements RestconfStream.R
         }, MoreExecutors.directExecutor());
     }
 
-    @Override
-    public void updateSubscriptionState(final Subscription subscription, final SubscriptionState newState) {
-        requireNonNull(subscription);
-        subscription.setState(newState);
-        subscriptions.replace(subscription.id(), subscription);
-    }
-
     @NonNullByDefault
-    protected abstract ListenableFuture<Subscription> createSubscription(Subscription subscription);
+    protected abstract ListenableFuture<SubscriptionControl> createSubscription(Uint32 subscriptionId,
+        String streamName, QName encoding, String receiverName);
 
     @NonNullByDefault
-    protected abstract ListenableFuture<Subscription> modifySubscriptionFilter(Subscription subscription,
+    protected abstract ListenableFuture<Void> modifySubscriptionFilter(Uint32 subscriptionId,
         SubscriptionFilter filter);
 
     /**
index 4bdf0bb63cc1fcbde58e21f2297eb5238f9b336e..68fa24de5694fb8a7e784f872c344448391c061d 100644 (file)
@@ -21,26 +21,23 @@ import org.opendaylight.yangtools.yang.common.Uint32;
 /**
  * Abstract base class for {@link RestconfStream.Subscription}s.
  */
-public abstract non-sealed class AbstractRestconfStreamSubscription extends RestconfStream.Subscription {
+public abstract class AbstractRestconfStreamSubscription extends RestconfStream.Subscription {
     private final @NonNull Uint32 id;
     private final @NonNull QName encoding;
     private final @NonNull String streamName;
     private final @NonNull String receiverName;
     private final @NonNull TransportSession session;
-    private final @Nullable EventStreamFilter filter;
 
     private @NonNull SubscriptionState state;
 
     protected AbstractRestconfStreamSubscription(final Uint32 id, final QName encoding, final String streamName,
-            final String receiverName, final SubscriptionState state, final TransportSession session,
-            final @Nullable EventStreamFilter filter) {
+            final String receiverName, final SubscriptionState state, final TransportSession session) {
         this.id = requireNonNull(id);
         this.encoding = requireNonNull(encoding);
         this.state = requireNonNull(state);
         this.session = requireNonNull(session);
         this.streamName = requireNonNull(streamName);
         this.receiverName = requireNonNull(receiverName);
-        this.filter = filter;
     }
 
     @Override
@@ -82,9 +79,7 @@ public abstract non-sealed class AbstractRestconfStreamSubscription extends Rest
         return session;
     }
 
-    final @Nullable EventStreamFilter filter() {
-        return filter;
-    }
+    protected abstract @Nullable EventStreamFilter filter();
 
     @Override
     protected ToStringHelper addToStringAttributes(final ToStringHelper helper) {
@@ -93,6 +88,6 @@ public abstract non-sealed class AbstractRestconfStreamSubscription extends Rest
             .add("encoding", encoding)
             .add("stream", streamName)
             .add("receiver", receiverName)
-            .add("filter", filter));
+            .add("filter", filter()));
     }
 }
diff --git a/protocol/restconf-server-spi/src/main/java/org/opendaylight/restconf/server/spi/ForwardingRestconfStreamSubscription.java b/protocol/restconf-server-spi/src/main/java/org/opendaylight/restconf/server/spi/ForwardingRestconfStreamSubscription.java
deleted file mode 100644 (file)
index fd14cdd..0000000
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Copyright (c) 2025 PANTHEON.tech, s.r.o. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.restconf.server.spi;
-
-import static java.util.Objects.requireNonNull;
-
-import com.google.common.base.MoreObjects.ToStringHelper;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.restconf.server.api.TransportSession;
-import org.opendaylight.restconf.server.spi.RestconfStream.SubscriptionState;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.Uint32;
-
-/**
- * Abstract base class for {@link RestconfStream.Subscription}s which delegate to another subscription.
- */
-public abstract non-sealed class ForwardingRestconfStreamSubscription<T extends RestconfStream.Subscription>
-        extends RestconfStream.Subscription {
-    protected final @NonNull T delegate;
-
-    protected ForwardingRestconfStreamSubscription(final T delegate) {
-        this.delegate = requireNonNull(delegate);
-    }
-
-    @Override
-    public final Uint32 id() {
-        return delegate.id();
-    }
-
-    @Override
-    public final String receiverName() {
-        return delegate.receiverName();
-    }
-
-    @Override
-    public final QName encoding() {
-        return delegate.encoding();
-    }
-
-    @Override
-    public final String streamName() {
-        return delegate.streamName();
-    }
-
-    @Override
-    public final SubscriptionState state() {
-        return delegate.state();
-    }
-
-    @Override
-    public final void setState(final SubscriptionState nextState) {
-        delegate.setState(nextState);
-    }
-
-    @Override
-    public final TransportSession session() {
-        return delegate.session();
-    }
-
-    protected final @NonNull T delegate() {
-        return delegate;
-    }
-
-    @Override
-    protected ToStringHelper addToStringAttributes(final ToStringHelper helper) {
-        return super.addToStringAttributes(helper.add("delegate", delegate));
-    }
-}
index ef313384331b6d01d5731f109f92b1a33900179f..abed935163d0d41489e0ef2547c69073b57ba3a6 100644 (file)
@@ -220,16 +220,6 @@ public final class RestconfStream<T> {
         @NonNullByDefault
         void modifySubscription(ServerRequest<Subscription> request, Uint32 id, SubscriptionFilter filter);
 
-        /**
-         * Modify state of RFC8639 subscription.
-         *
-         * @param subscription subscription
-         * @param newState new state
-         * @throws NullPointerException if {@code subscription} is {@code null}
-         */
-        @NonNullByDefault
-        void updateSubscriptionState(Subscription subscription, SubscriptionState newState);
-
         /**
          * Lookup an existing subscription.
          *
@@ -258,8 +248,7 @@ public final class RestconfStream<T> {
      */
     // TODO: a .toOperational() should result in the equivalent MapEntryNode equivalent of a Binding Subscription
     @Beta
-    public abstract static sealed class Subscription
-            permits AbstractRestconfStreamSubscription, ForwardingRestconfStreamSubscription {
+    public abstract static class Subscription {
         @SuppressFBWarnings(value = "UWF_UNWRITTEN_FIELD",
             justification = "https://github.com/spotbugs/spotbugs/issues/2749")
         private volatile QName terminated;
@@ -336,8 +325,6 @@ public final class RestconfStream<T> {
         @NonNullByDefault
         protected abstract void terminateImpl(ServerRequest<Empty> request, QName reason);
 
-        public abstract void channelClosed();
-
         @Override
         public final String toString() {
             return addToStringAttributes(MoreObjects.toStringHelper(this).omitNullValues()).toString();