"No subscription with given ID."));
return;
}
+
+ // FIXME: DynamicSubscription.delete()
final var state = subscription.state();
if (state != SubscriptionState.ACTIVE && state != SubscriptionState.SUSPENDED) {
request.completeWith(new RequestException(ErrorType.APPLICATION, ErrorTag.BAD_ELEMENT,
"There is no active or suspended subscription with given ID."));
return;
}
-
if (subscription.session() != request.session()) {
request.completeWith(new RequestException(ErrorType.APPLICATION, ErrorTag.BAD_ELEMENT,
"Subscription with given id does not exist on this session"));
return;
}
-
- streamRegistry.updateSubscriptionState(subscription, SubscriptionState.END);
+ subscription.setState(SubscriptionState.END);
subscription.terminate(request.transform(unused -> {
try {
subscriptionStateService.subscriptionTerminated(Instant.now(), id, NoSuchSubscription.QNAME);
"No subscription with given ID."));
return;
}
+
+ // FIXME: DynamicSubscription.kill()
final var state = subscription.state();
if (state != SubscriptionState.ACTIVE && state != SubscriptionState.SUSPENDED) {
request.completeWith(new RequestException(ErrorType.APPLICATION, ErrorTag.BAD_ELEMENT,
"There is no active or suspended subscription with given ID."));
return;
}
-
- streamRegistry.updateSubscriptionState(subscription, SubscriptionState.END);
+ subscription.setState(SubscriptionState.END);
subscription.terminate(request.transform(unused -> {
try {
subscriptionStateService.subscriptionTerminated(Instant.now(), id, NoSuchSubscription.QNAME);
import org.opendaylight.restconf.server.spi.ReceiverHolder;
import org.opendaylight.restconf.server.spi.ReceiverHolder.RecordType;
import org.opendaylight.restconf.server.spi.RestconfStream;
-import org.opendaylight.restconf.server.spi.RestconfStream.Subscription;
import org.opendaylight.restconf.server.spi.RestconfStream.SubscriptionFilter;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscription;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionOutput;
import org.opendaylight.yangtools.yang.common.ErrorTag;
import org.opendaylight.yangtools.yang.common.ErrorType;
import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.Uint32;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.AnydataNode;
}
@Override
- protected ListenableFuture<Subscription> createSubscription(final Subscription subscription) {
+ protected ListenableFuture<SubscriptionControl> createSubscription(final Uint32 subscriptionId,
+ final String streamName, final QName encoding, final String receiverName) {
throw new UnsupportedOperationException();
}
@Override
- protected ListenableFuture<Subscription> modifySubscriptionFilter(final Subscription subscription,
+ protected ListenableFuture<Void> modifySubscriptionFilter(final Uint32 subscriptionId,
final SubscriptionFilter filter) {
throw new UnsupportedOperationException();
}
}
@Override
- protected ListenableFuture<RestconfStream.Subscription> createSubscription(
- final RestconfStream.Subscription subscription) {
- final var id = subscription.id();
- final var receiverName = subscription.receiverName();
+ protected ListenableFuture<SubscriptionControl> createSubscription(final Uint32 id, final String streamName,
+ final QName encoding, final String receiverName) {
final var pathArg = subscriptionArg(id);
final var tx = dataBroker.newWriteOnlyTransaction();
ImmutableNodes.newMapEntryBuilder()
.withNodeIdentifier(pathArg)
.withChild(ImmutableNodes.leafNode(ID_NODEID, id))
- .withChild(ImmutableNodes.leafNode(ENCODING_NODEID, subscription.encoding()))
+ .withChild(ImmutableNodes.leafNode(ENCODING_NODEID, encoding))
.withChild(ImmutableNodes.newChoiceBuilder()
.withNodeIdentifier(TARGET_NODEID)
- .withChild(ImmutableNodes.leafNode(STREAM_NODEID, subscription.streamName()))
+ .withChild(ImmutableNodes.leafNode(STREAM_NODEID, streamName))
// .withChild(ImmutableNodes.newChoiceBuilder()
// .withNodeIdentifier(STREAM_FILTER_NODEID)
// .withChild(ImmutableNodes.leafNode(SubscriptionUtil.QNAME_STREAM_FILTER,
.build());
return tx.commit().transform(info -> {
LOG.debug("Added subscription {} to operational datastore as of {}", id, info);
- return new MdsalRestconfStreamSubscription<>(subscription, dataBroker);
+ return new MdsalSubscriptionControl(dataBroker, id);
}, MoreExecutors.directExecutor());
}
@Override
- protected ListenableFuture<RestconfStream.Subscription> modifySubscriptionFilter(
- final RestconfStream.Subscription subscription, final RestconfStream.SubscriptionFilter filter) {
- final var id = subscription.id();
+ protected ListenableFuture<Void> modifySubscriptionFilter(final Uint32 subscriptionId,
+ final RestconfStream.SubscriptionFilter filter) {
final var filterNode = switch (filter) {
case RestconfStream.SubscriptionFilter.Reference(var filterName) ->
.build();
};
- final var pathArg = subscriptionArg(id);
+ final var pathArg = subscriptionArg(subscriptionId);
final var tx = dataBroker.newWriteOnlyTransaction();
tx.merge(LogicalDatastoreType.OPERATIONAL, subscriptionPath(pathArg),
ImmutableNodes.newMapEntryBuilder()
.withNodeIdentifier(pathArg)
- .withChild(ImmutableNodes.leafNode(ID_NODEID, id))
+ .withChild(ImmutableNodes.leafNode(ID_NODEID, subscriptionId))
.withChild(ImmutableNodes.newChoiceBuilder()
.withNodeIdentifier(TARGET_NODEID)
.withChild(ImmutableNodes.newChoiceBuilder()
.build())
.build());
return tx.commit().transform(info -> {
- LOG.debug("Modified subscription {} to operational datastore as of {}", id, info);
- return new MdsalRestconfStreamSubscription<>(subscription, dataBroker);
+ LOG.debug("Modified subscription {} to operational datastore as of {}", subscriptionId, info);
+ return null;
}, MoreExecutors.directExecutor());
}
return NodeIdentifierWithPredicates.of(Receiver.QNAME, NAME_QNAME, receiverName);
}
+ @NonNullByDefault
+ static YangInstanceIdentifier streamFilterPath(final Uint32 subscriptionId) {
+ return YangInstanceIdentifier.of(SUBSCRIPTIONS_NODEID, SUBSCRIPTION_NODEID, subscriptionArg(subscriptionId),
+ TARGET_NODEID, STREAM_FILTER_NODEID);
+ }
+
@NonNullByDefault
static YangInstanceIdentifier subscriptionPath(final Uint32 subscriptionId) {
return subscriptionPath(subscriptionArg(subscriptionId));
+++ /dev/null
-/*
- * Copyright (c) 2025 PANTHEON.tech, s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.restconf.server.mdsal;
-
-import static java.util.Objects.requireNonNull;
-
-import com.google.common.util.concurrent.MoreExecutors;
-import org.opendaylight.mdsal.common.api.CommitInfo;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.mdsal.common.api.OnCommitFutureCallback;
-import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
-import org.opendaylight.mdsal.dom.api.DOMDataBroker;
-import org.opendaylight.netconf.databind.RequestException;
-import org.opendaylight.restconf.server.api.ServerRequest;
-import org.opendaylight.restconf.server.spi.ForwardingRestconfStreamSubscription;
-import org.opendaylight.restconf.server.spi.RestconfStream;
-import org.opendaylight.yangtools.yang.common.Empty;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.Uint32;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-final class MdsalRestconfStreamSubscription<T extends RestconfStream.Subscription>
- extends ForwardingRestconfStreamSubscription<T> {
- private static final Logger LOG = LoggerFactory.getLogger(MdsalRestconfStreamSubscription.class);
-
- private final DOMDataBroker dataBroker;
-
- MdsalRestconfStreamSubscription(final T delegate, final DOMDataBroker dataBroker) {
- super(delegate);
- this.dataBroker = requireNonNull(dataBroker);
- }
-
- @Override
- public void channelClosed() {
- final var id = id();
- LOG.debug("{} terminated after channel was closed", id);
- removeSubscription(id, null, null);
- }
-
- @Override
- protected void terminateImpl(final ServerRequest<Empty> request, final QName terminationReason) {
- final var id = id();
- LOG.debug("{} terminated with reason {}", id, terminationReason);
- removeSubscription(id, request, terminationReason);
- }
-
- private void removeSubscription(final Uint32 id, final ServerRequest<Empty> request,
- final QName terminationReason) {
- final var tx = dataBroker.newWriteOnlyTransaction();
- tx.delete(LogicalDatastoreType.OPERATIONAL, MdsalRestconfStreamRegistry.subscriptionPath(id));
- tx.commit().addCallback(new OnCommitFutureCallback() {
- @Override
- public void onSuccess(final CommitInfo result) {
- LOG.debug("Removed subscription {} from operational datastore as of {}", id, result);
- if (request != null) {
- delegate.terminate(request.transform(ignored -> Empty.value()), terminationReason);
- } else {
- delegate.channelClosed();
- }
- }
-
- @Override
- public void onFailure(final TransactionCommitFailedException cause) {
- LOG.warn("Failed to remove subscription {} from operational datastore", id, cause);
- if (request != null) {
- request.completeWith(new RequestException(cause));
- }
- }
- }, MoreExecutors.directExecutor());
- }
-}
--- /dev/null
+/*
+ * Copyright (c) 2025 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.restconf.server.mdsal;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataBroker;
+import org.opendaylight.restconf.server.spi.AbstractRestconfStreamRegistry.SubscriptionControl;
+import org.opendaylight.yangtools.yang.common.Uint32;
+
+/**
+ * A {@link SubscriptionControl} updating the MD-SAL datastore.
+ */
+@NonNullByDefault
+final class MdsalSubscriptionControl implements SubscriptionControl {
+ private final DOMDataBroker dataBroker;
+ private final Uint32 subscriptionId;
+
+ MdsalSubscriptionControl(final DOMDataBroker dataBroker, final Uint32 subscriptionId) {
+ this.dataBroker = requireNonNull(dataBroker);
+ this.subscriptionId = requireNonNull(subscriptionId);
+ }
+
+ @Override
+ public FluentFuture<@Nullable Void> terminate() {
+ final var tx = dataBroker.newWriteOnlyTransaction();
+ tx.delete(LogicalDatastoreType.OPERATIONAL, MdsalRestconfStreamRegistry.subscriptionPath(subscriptionId));
+ return mapFuture(tx.commit());
+ }
+
+ private static FluentFuture<@Nullable Void> mapFuture(final FluentFuture<? extends CommitInfo> future) {
+ return future.transform(unused -> null, MoreExecutors.directExecutor());
+ }
+}
boolean test(YangInstanceIdentifier path, ContainerNode body);
}
+ /**
+ * Control interface for the backend of a subscription.
+ */
+ @NonNullByDefault
+ public interface SubscriptionControl {
+ /**
+ * Terminate the subscription.
+ *
+ * @return A {@link ListenableFuture} signalling the result of termination process
+ */
+ ListenableFuture<Void> terminate();
+ }
+
/**
* Internal implementation
* of a <a href="https://www.rfc-editor.org/rfc/rfc8639#section-2.4">dynamic subscription</a>.
*/
private final class DynSubscription extends AbstractRestconfStreamSubscription {
- DynSubscription(final Uint32 id, final QName encoding, final String streamName,
- final TransportSession session, final @Nullable Principal principal,
- final @Nullable EventStreamFilter filter) {
- super(id, encoding, streamName, initialReceiverName(session.description(), principal),
- SubscriptionState.ACTIVE, session, filter);
- }
+ private final SubscriptionControl control;
- // FIXME: Only used by modifySubscription(). We should not be instantiating a new subscription, but rather have
- // a Subscription.modify() method which does the trick. That method should be hidden by default, and
- // exposed in a separate interface:
- // - we can invoke modify() from AbstractRestconfStreamRegistry for configured subscriptions
- // - modify-subscription RPC can gain access via a check for 'DynamicSubscription', properly rejecting
- // attempts to modify non-dynamic subscriptions
- @NonNullByDefault
- DynSubscription(final Subscription prev, final @Nullable EventStreamFilter filter) {
- super(prev.id(), prev.encoding(), prev.streamName(), prev.receiverName(),
- // Note: RFC8639 mandates that an updated subscription is automatically active, but perhaps we want to
- // do that via explicit transition as we are free to immediately suspend, for example -- in which
- // case we do not want to, for example, go SUSPENDED -> ACTIVE -> SUSPENDED.
- SubscriptionState.ACTIVE, prev.session(), filter);
+ private @Nullable EventStreamFilter filter;
+
+ DynSubscription(final Uint32 id, final QName encoding, final String streamName, final String receiverName,
+ final TransportSession session, final SubscriptionControl control,
+ final @Nullable EventStreamFilter filter) {
+ super(id, encoding, streamName, receiverName, SubscriptionState.ACTIVE, session);
+ this.control = requireNonNull(control);
+ this.filter = filter;
}
@Override
protected void terminateImpl(final ServerRequest<Empty> request, final QName reason) {
- subscriptions.remove(id(), this);
- request.completeWith(Empty.value());
+ final var id = id();
+ LOG.debug("Terminating subscription {} reason {}", id, reason);
+
+ Futures.addCallback(control.terminate(), new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ LOG.debug("Subscription {} terminated", id);
+ subscriptions.remove(id, DynSubscription.this);
+ request.completeWith(Empty.value());
+ }
+
+ @Override
+ public void onFailure(final Throwable cause) {
+ LOG.warn("Cannot terminate subscription {}", id, cause);
+ request.completeWith(new RequestException(cause));
+ }
+ }, MoreExecutors.directExecutor());
}
- @Override
- public void channelClosed() {
- subscriptions.remove(id());
+ private void channelClosed() {
+ final var id = id();
+ LOG.debug("Subscription {} terminated due to transport session going down", id);
+
+ Futures.addCallback(control.terminate(), new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ LOG.debug("Subscription {} cleaned up", id);
+ subscriptions.remove(id);
+ }
+
+ @Override
+ public void onFailure(final Throwable cause) {
+ LOG.warn("Subscription {} failed to clean up", id, cause);
+ subscriptions.remove(id);
+ }
+ }, MoreExecutors.directExecutor());
}
- @NonNullByDefault
- private static String initialReceiverName(final TransportSession.Description description,
- final @Nullable Principal principal) {
- final var receiverName = description.toFriendlyString();
- return principal == null ? receiverName : principal.getName() + " via " + receiverName;
+ void controlSessionClosed() {
+ switch (state()) {
+ case END -> {
+ LOG.debug("Subscription id:{} already in END state during attempt to end it", id());
+ terminate(null, null);
+ }
+ default -> {
+ setState(RestconfStream.SubscriptionState.END);
+ channelClosed();
+ }
+ }
+ }
+
+ void setFilter(final EventStreamFilter newFilter) {
+ filter = newFilter;
+ }
+
+ @Override
+ protected @Nullable EventStreamFilter filter() {
+ return filter;
}
}
- // FIXME: merge into DynSubscription
- @NonNullByDefault
- private final class DynSubscriptionResource extends AbstractRegistration {
- private final Uint32 subscriptionId;
+ private static final class DynSubscriptionResource extends AbstractRegistration {
+ private final DynSubscription subscription;
- DynSubscriptionResource(final Uint32 subscriptionId) {
- this.subscriptionId = requireNonNull(subscriptionId);
+ DynSubscriptionResource(final DynSubscription subscription) {
+ this.subscription = requireNonNull(subscription);
}
@Override
protected void removeRegistration() {
- // check if the subscription is still registered or was terminated from elsewhere
- final var subscription = lookupSubscription(subscriptionId);
- if (subscription != null) {
- switch (subscription.state()) {
- case END -> {
- LOG.debug("Subscription id:{} already in END state during attempt to end it", subscriptionId);
- subscription.terminate(null, null);
- }
- default -> {
- subscription.setState(RestconfStream.SubscriptionState.END);
- subscription.channelClosed();
- }
- }
- }
+ subscription.controlSessionClosed();
}
@Override
protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
- return super.addToStringAttributes(toStringHelper.add("subscription", subscriptionId));
+ return super.addToStringAttributes(toStringHelper.add("subscription", subscription.id()));
}
}
*/
private final AtomicInteger prevDynamicId = new AtomicInteger(Integer.MAX_VALUE);
private final ConcurrentMap<String, RestconfStream<?>> streams = new ConcurrentHashMap<>();
- private final ConcurrentMap<Uint32, Subscription> subscriptions = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Uint32, DynSubscription> subscriptions = new ConcurrentHashMap<>();
// FIXME: This is not quite sufficient and should be split into two maps:
// 1. filterSpecs, which is a HashMap<String, ChoiceNode> recording known filter-spec definitions
// access should be guarded by a lock
}
final var id = Uint32.fromIntBits(prevDynamicId.incrementAndGet());
- final var subscription = new DynSubscription(id, encoding, streamName, request.session(), request.principal(),
- filterImpl);
+ final var receiverName = newReceiverName(session.description(), request.principal());
- Futures.addCallback(createSubscription(subscription), new FutureCallback<Subscription>() {
+ Futures.addCallback(createSubscription(id, streamName, encoding, receiverName), new FutureCallback<>() {
@Override
- public void onSuccess(final Subscription result) {
- subscriptions.put(id, result);
- session.registerResource(new DynSubscriptionResource(id));
+ public void onSuccess(final SubscriptionControl result) {
+ final var subscription = new DynSubscription(id, encoding, streamName, receiverName, session, result,
+ filterImpl);
+ subscriptions.put(id, subscription);
+ session.registerResource(new DynSubscriptionResource(subscription));
request.completeWith(id);
}
}, MoreExecutors.directExecutor());
}
+ @NonNullByDefault
+ private static String newReceiverName(final TransportSession.Description description,
+ final @Nullable Principal principal) {
+ final var receiverName = description.toFriendlyString();
+ return principal == null ? receiverName : principal.getName() + " via " + receiverName;
+ }
+
@Override
public void modifySubscription(final ServerRequest<Subscription> request, final Uint32 id,
final SubscriptionFilter filter) {
- final var oldSubscription = lookupSubscription(id);
- if (oldSubscription == null) {
+ final var subscription = subscriptions.get(id);
+ if (subscription == null) {
request.completeWith(new RequestException(ErrorType.APPLICATION, ErrorTag.BAD_ELEMENT,
"There is no subscription with given ID."));
return;
request.completeWith(e);
return;
}
- final var newSubscription = new DynSubscription(oldSubscription, filterImpl);
- Futures.addCallback(modifySubscriptionFilter(newSubscription, filter), new FutureCallback<>() {
+ Futures.addCallback(modifySubscriptionFilter(id, filter), new FutureCallback<>() {
@Override
- public void onSuccess(final Subscription result) {
- subscriptions.put(id, result);
- request.completeWith(result);
+ public void onSuccess(final Void result) {
+ subscription.setFilter(filterImpl);
+ request.completeWith(subscription);
}
@Override
}, MoreExecutors.directExecutor());
}
- @Override
- public void updateSubscriptionState(final Subscription subscription, final SubscriptionState newState) {
- requireNonNull(subscription);
- subscription.setState(newState);
- subscriptions.replace(subscription.id(), subscription);
- }
-
@NonNullByDefault
- protected abstract ListenableFuture<Subscription> createSubscription(Subscription subscription);
+ protected abstract ListenableFuture<SubscriptionControl> createSubscription(Uint32 subscriptionId,
+ String streamName, QName encoding, String receiverName);
@NonNullByDefault
- protected abstract ListenableFuture<Subscription> modifySubscriptionFilter(Subscription subscription,
+ protected abstract ListenableFuture<Void> modifySubscriptionFilter(Uint32 subscriptionId,
SubscriptionFilter filter);
/**
/**
* Abstract base class for {@link RestconfStream.Subscription}s.
*/
-public abstract non-sealed class AbstractRestconfStreamSubscription extends RestconfStream.Subscription {
+public abstract class AbstractRestconfStreamSubscription extends RestconfStream.Subscription {
private final @NonNull Uint32 id;
private final @NonNull QName encoding;
private final @NonNull String streamName;
private final @NonNull String receiverName;
private final @NonNull TransportSession session;
- private final @Nullable EventStreamFilter filter;
private @NonNull SubscriptionState state;
protected AbstractRestconfStreamSubscription(final Uint32 id, final QName encoding, final String streamName,
- final String receiverName, final SubscriptionState state, final TransportSession session,
- final @Nullable EventStreamFilter filter) {
+ final String receiverName, final SubscriptionState state, final TransportSession session) {
this.id = requireNonNull(id);
this.encoding = requireNonNull(encoding);
this.state = requireNonNull(state);
this.session = requireNonNull(session);
this.streamName = requireNonNull(streamName);
this.receiverName = requireNonNull(receiverName);
- this.filter = filter;
}
@Override
return session;
}
- final @Nullable EventStreamFilter filter() {
- return filter;
- }
+ protected abstract @Nullable EventStreamFilter filter();
@Override
protected ToStringHelper addToStringAttributes(final ToStringHelper helper) {
.add("encoding", encoding)
.add("stream", streamName)
.add("receiver", receiverName)
- .add("filter", filter));
+ .add("filter", filter()));
}
}
+++ /dev/null
-/*
- * Copyright (c) 2025 PANTHEON.tech, s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.restconf.server.spi;
-
-import static java.util.Objects.requireNonNull;
-
-import com.google.common.base.MoreObjects.ToStringHelper;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.restconf.server.api.TransportSession;
-import org.opendaylight.restconf.server.spi.RestconfStream.SubscriptionState;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.Uint32;
-
-/**
- * Abstract base class for {@link RestconfStream.Subscription}s which delegate to another subscription.
- */
-public abstract non-sealed class ForwardingRestconfStreamSubscription<T extends RestconfStream.Subscription>
- extends RestconfStream.Subscription {
- protected final @NonNull T delegate;
-
- protected ForwardingRestconfStreamSubscription(final T delegate) {
- this.delegate = requireNonNull(delegate);
- }
-
- @Override
- public final Uint32 id() {
- return delegate.id();
- }
-
- @Override
- public final String receiverName() {
- return delegate.receiverName();
- }
-
- @Override
- public final QName encoding() {
- return delegate.encoding();
- }
-
- @Override
- public final String streamName() {
- return delegate.streamName();
- }
-
- @Override
- public final SubscriptionState state() {
- return delegate.state();
- }
-
- @Override
- public final void setState(final SubscriptionState nextState) {
- delegate.setState(nextState);
- }
-
- @Override
- public final TransportSession session() {
- return delegate.session();
- }
-
- protected final @NonNull T delegate() {
- return delegate;
- }
-
- @Override
- protected ToStringHelper addToStringAttributes(final ToStringHelper helper) {
- return super.addToStringAttributes(helper.add("delegate", delegate));
- }
-}
@NonNullByDefault
void modifySubscription(ServerRequest<Subscription> request, Uint32 id, SubscriptionFilter filter);
- /**
- * Modify state of RFC8639 subscription.
- *
- * @param subscription subscription
- * @param newState new state
- * @throws NullPointerException if {@code subscription} is {@code null}
- */
- @NonNullByDefault
- void updateSubscriptionState(Subscription subscription, SubscriptionState newState);
-
/**
* Lookup an existing subscription.
*
*/
// TODO: a .toOperational() should result in the equivalent MapEntryNode equivalent of a Binding Subscription
@Beta
- public abstract static sealed class Subscription
- permits AbstractRestconfStreamSubscription, ForwardingRestconfStreamSubscription {
+ public abstract static class Subscription {
@SuppressFBWarnings(value = "UWF_UNWRITTEN_FIELD",
justification = "https://github.com/spotbugs/spotbugs/issues/2749")
private volatile QName terminated;
@NonNullByDefault
protected abstract void terminateImpl(ServerRequest<Empty> request, QName reason);
- public abstract void channelClosed();
-
@Override
public final String toString() {
return addToStringAttributes(MoreObjects.toStringHelper(this).omitNullValues()).toString();