Adapt ModifySubscriptionRpc to changes in RestconfStream.
Add possibility to modify filter in RestconfStream.Subscription
and propagate this change in to operational datastore.
Now we provide correct response to invoke of this rpc,
in case of error correct error message, after filtering is finished
correctly update datastore, instead of "Not implemented yet".
JIRA: NETCONF-714
Change-Id: If6487621d5a2ff378be7f9f9089351df75a50ff4
Signed-off-by: Samuel Schneider <samuel.schneider@pantheon.tech>
import javax.inject.Inject;
import javax.inject.Singleton;
import org.eclipse.jdt.annotation.NonNullByDefault;
-import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.netconf.databind.RequestException;
import org.opendaylight.restconf.notifications.mdsal.SubscriptionStateService;
import org.opendaylight.restconf.server.api.ServerRequest;
import org.opendaylight.restconf.server.spi.OperationInput;
import org.opendaylight.restconf.server.spi.RestconfStream;
-import org.opendaylight.restconf.server.spi.RestconfStream.SubscriptionFilter;
import org.opendaylight.restconf.server.spi.RpcImplementation;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.EncodingUnsupported;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.EstablishSubscription;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.EstablishSubscriptionInput;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.EstablishSubscriptionOutput;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.stream.filter.elements.FilterSpec;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.stream.filter.elements.filter.spec.stream.subtree.filter.StreamSubtreeFilter;
import org.opendaylight.yangtools.yang.common.ErrorTag;
import org.opendaylight.yangtools.yang.common.ErrorType;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.AnydataNode;
import org.opendaylight.yangtools.yang.data.api.schema.ChoiceNode;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.spi.node.ImmutableNodes;
@Component(service = RpcImplementation.class)
@NonNullByDefault
public final class EstablishSubscriptionRpc extends RpcImplementation {
- private static final NodeIdentifier SUBSCRIPTION_STREAM_FILTER_NAME =
- NodeIdentifier.create(QName.create(EstablishSubscriptionInput.QNAME, "stream-filter-name").intern());
private static final NodeIdentifier SUBSCRIPTION_STREAM =
NodeIdentifier.create(QName.create(EstablishSubscriptionInput.QNAME, "stream").intern());
private static final NodeIdentifier SUBSCRIPTION_TARGET =
// check stream filter
final var streamFilter = (ChoiceNode) target.childByArg(SUBSCRIPTION_STREAM_FILTER);
- final var filter = streamFilter == null ? null : extractFilter(streamFilter);
+ final var filter = streamFilter == null ? null : SubscriptionUtil.extractFilter(streamFilter);
streamRegistry.establishSubscription(request.transform(subscription -> {
final var id = subscription.id();
.build();
}), streamName, encoding, filter);
}
-
- private static @Nullable SubscriptionFilter extractFilter(final ChoiceNode streamFilter) {
- final var filterName = leaf(streamFilter, SUBSCRIPTION_STREAM_FILTER_NAME, String.class);
- if (filterName != null) {
- return new SubscriptionFilter.Reference(filterName);
- }
- final var filterSpec = (ChoiceNode) streamFilter.childByArg(new NodeIdentifier(FilterSpec.QNAME));
- if (filterSpec == null) {
- return null;
- }
- final var subtree = (AnydataNode<?>) filterSpec.childByArg(new NodeIdentifier(StreamSubtreeFilter.QNAME));
- if (subtree != null) {
- return new SubscriptionFilter.SubtreeDefinition(subtree);
- }
- final var xpath = leaf(filterSpec, new NodeIdentifier(QName.create(FilterSpec.QNAME, "stream-xpath-filter")),
- String.class);
- return xpath != null ? new SubscriptionFilter.XPathDefinition(xpath) : null;
- }
}
import static java.util.Objects.requireNonNull;
import java.net.URI;
+import java.time.Instant;
import javax.inject.Inject;
import javax.inject.Singleton;
+import org.eclipse.jdt.annotation.NonNullByDefault;
import org.opendaylight.netconf.databind.RequestException;
import org.opendaylight.restconf.notifications.mdsal.SubscriptionStateService;
import org.opendaylight.restconf.server.api.ServerRequest;
import org.opendaylight.restconf.server.spi.RpcImplementation;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.ModifySubscription;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.ModifySubscriptionInput;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscriptions.Subscription;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.ModifySubscriptionOutput;
import org.opendaylight.yangtools.yang.common.ErrorTag;
import org.opendaylight.yangtools.yang.common.ErrorType;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.Uint32;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.schema.ChoiceNode;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.DataContainerNode;
import org.opendaylight.yangtools.yang.data.spi.node.ImmutableNodes;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
*/
@Singleton
@Component(service = RpcImplementation.class)
+@NonNullByDefault
public final class ModifySubscriptionRpc extends RpcImplementation {
private static final NodeIdentifier SUBSCRIPTION_ID =
NodeIdentifier.create(QName.create(ModifySubscriptionInput.QNAME, "id").intern());
- private static final NodeIdentifier SUBSCRIPTION_STREAM_FILTER_NAME =
- NodeIdentifier.create(QName.create(ModifySubscriptionInput.QNAME, "stream-filter-name").intern());
private static final NodeIdentifier SUBSCRIPTION_TARGET =
NodeIdentifier.create(QName.create(ModifySubscriptionInput.QNAME, "target").intern());
private static final NodeIdentifier SUBSCRIPTION_STREAM_FILTER =
public void invoke(final ServerRequest<ContainerNode> request, final URI restconfURI, final OperationInput input) {
final var body = input.input();
final Uint32 id;
- final String streamFilterName;
try {
id = leaf(body, SUBSCRIPTION_ID, Uint32.class);
return;
}
- final var target = (DataContainerNode) body.childByArg(SUBSCRIPTION_TARGET);
- final var nodeBuilder = ImmutableNodes.newMapEntryBuilder();
- final var nodeTargetBuilder = ImmutableNodes.newChoiceBuilder().withNodeIdentifier(NodeIdentifier
- .create(SubscriptionUtil.QNAME_TARGET));
- final var nodeFilterBuilder = ImmutableNodes.newChoiceBuilder().withNodeIdentifier(NodeIdentifier
- .create(QName.create(Subscription.QNAME, "stream-filter")));
+ final var target = (ChoiceNode) body.childByArg(SUBSCRIPTION_TARGET);
- nodeBuilder.withNodeIdentifier(NodeIdentifierWithPredicates.of(Subscription.QNAME,
- SubscriptionUtil.QNAME_ID, id));
- nodeBuilder.withChild(ImmutableNodes.leafNode(SubscriptionUtil.QNAME_ID, id));
- if (target != null) {
- final var streamFilter = (DataContainerNode) target.childByArg(SUBSCRIPTION_STREAM_FILTER);
- streamFilterName = leaf(streamFilter, SUBSCRIPTION_STREAM_FILTER_NAME, String.class);
- // TODO: parse anydata filter, rfc6241? https://www.rfc-editor.org/rfc/rfc8650#name-filter-example
- // {@link StreamSubtreeFilter}.
- if (streamFilterName != null) {
-// try {
-// if (!mdsalService.exist(SubscriptionUtil.FILTERS.node(NodeIdentifierWithPredicates.of(
-// StreamFilter.QNAME, SubscriptionUtil.QNAME_STREAM_FILTER_NAME, streamFilterName))).get()) {
-// request.completeWith(new RequestException(ErrorType.APPLICATION, ErrorTag.INVALID_VALUE,
-// "%s refers to an unknown stream filter", streamFilterName));
-// return;
-// }
-// } catch (InterruptedException | ExecutionException e) {
-// request.completeWith(new RequestException(ErrorType.APPLICATION, ErrorTag.BAD_ELEMENT, e));
-// return;
-// }
- nodeFilterBuilder.withChild(ImmutableNodes.leafNode(SubscriptionUtil.QNAME_STREAM_FILTER,
- streamFilterName));
- nodeTargetBuilder.withChild(nodeFilterBuilder.build());
- nodeBuilder.withChild(nodeTargetBuilder.build());
- }
+ if (target == null) {
+ request.completeWith(new RequestException(ErrorType.APPLICATION, ErrorTag.MISSING_ELEMENT,
+ "No filter specified"));
+ return;
+ }
+ final var streamFilter = (ChoiceNode) target.childByArg(SUBSCRIPTION_STREAM_FILTER);
+ final var filter = streamFilter == null ? null : SubscriptionUtil.extractFilter(streamFilter);
+ if (filter == null) {
+ request.completeWith(new RequestException(ErrorType.APPLICATION, ErrorTag.MISSING_ELEMENT,
+ "No filter specified"));
+ return;
}
-// final var node = nodeBuilder.build();
- request.completeWith(new RequestException(ErrorType.APPLICATION, ErrorTag.OPERATION_NOT_SUPPORTED,
- "Not implemented yet"));
+ streamRegistry.modifySubscription(request.transform(subscription -> {
+ try {
+ // FIXME: pass correct filter once we extract if from input
+ subscriptionStateService.subscriptionModified(Instant.now(), id, subscription.streamName(),
+ subscription.encoding(), null, null, null);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException("Could not send subscription modify notification", e);
+ }
-// FIXME: reconcile
-// mdsalService.mergeSubscription(SubscriptionUtil.SUBSCRIPTIONS.node(node.name()), node)
-// .addCallback(new FutureCallback<CommitInfo>() {
-// @Override
-// public void onSuccess(final CommitInfo result) {
-// request.completeWith(ImmutableNodes.newContainerBuilder()
-// .withNodeIdentifier(NodeIdentifier.create(ModifySubscriptionOutput.QNAME))
-// .build());
-// try {
-// final var subscription = mdsalService.read(SubscriptionUtil.SUBSCRIPTIONS.node(node.name()))
-// .get();
-// if (subscription.isEmpty()) {
-// LOG.warn("Could not send subscription modify notification: could not read stream name");
-// return;
-// }
-// final var target = (DataContainerNode) ((DataContainerNode) subscription.orElseThrow())
-// .childByArg(NodeIdentifier.create(SubscriptionUtil.QNAME_TARGET));
-// final var streamName = leaf(target, NodeIdentifier.create(SubscriptionUtil.QNAME_STREAM),
-// String.class);
-// final var encoding = leaf((DataContainerNode) subscription.orElseThrow(),
-// NodeIdentifier.create(SubscriptionUtil.QNAME_ENCODING), QName.class);
-// // TODO: pass correct filter once we extract if from input
-// subscriptionStateService.subscriptionModified(Instant.now(), id, streamName, encoding, null,
-// stopTime, null);
-// } catch (InterruptedException | ExecutionException e) {
-// LOG.warn("Could not send subscription modify notification", e);
-// }
-// }
-//
-// @Override
-// public void onFailure(final Throwable throwable) {
-// request.completeWith(new RequestException(ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED,
-// // FIXME: why getCause()?
-// throwable.getCause()));
-// }
-// }, MoreExecutors.directExecutor());
+ return ImmutableNodes.newContainerBuilder()
+ .withNodeIdentifier(NodeIdentifier.create(ModifySubscriptionOutput.QNAME))
+ .build();
+ }), id, filter);
}
}
*/
package org.opendaylight.restconf.subscription;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.restconf.server.spi.RestconfStream.SubscriptionFilter;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.EstablishSubscriptionInput;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.Filters;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.Streams;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.Subscriptions;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.filters.StreamFilter;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.stream.filter.elements.FilterSpec;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.stream.filter.elements.filter.spec.stream.subtree.filter.StreamSubtreeFilter;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.streams.Stream;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscriptions.Subscription;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscriptions.subscription.receivers.Receiver;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.AnydataNode;
+import org.opendaylight.yangtools.yang.data.api.schema.ChoiceNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
public final class SubscriptionUtil {
public static final YangInstanceIdentifier SUBSCRIPTIONS = YangInstanceIdentifier.of(
- YangInstanceIdentifier.NodeIdentifier.create(Subscriptions.QNAME),
- YangInstanceIdentifier.NodeIdentifier.create(Subscription.QNAME));
+ NodeIdentifier.create(Subscriptions.QNAME),
+ NodeIdentifier.create(Subscription.QNAME));
public static final YangInstanceIdentifier STREAMS = YangInstanceIdentifier.of(
- YangInstanceIdentifier.NodeIdentifier.create(Streams.QNAME),
- YangInstanceIdentifier.NodeIdentifier.create(Stream.QNAME));
+ NodeIdentifier.create(Streams.QNAME),
+ NodeIdentifier.create(Stream.QNAME));
public static final YangInstanceIdentifier FILTERS = YangInstanceIdentifier.of(
- YangInstanceIdentifier.NodeIdentifier.create(Filters.QNAME),
- YangInstanceIdentifier.NodeIdentifier.create(StreamFilter.QNAME));
+ NodeIdentifier.create(Filters.QNAME),
+ NodeIdentifier.create(StreamFilter.QNAME));
public static final QName QNAME_ID = QName.create(Subscription.QNAME, "id");
public static final QName QNAME_STREAM = QName.create(Subscription.QNAME, "stream");
public static final QName QNAME_STREAM_FILTER = QName.create(Subscription.QNAME, "stream-filter-name");
public static final QName QNAME_SENT_EVENT_RECORDS = QName.create(Receiver.QNAME, "sent-event-records");
public static final QName QNAME_EXCLUDED_EVENT_RECORDS = QName
.create(Receiver.QNAME, "excluded-event-records");
+ private static final NodeIdentifier SUBSCRIPTION_STREAM_FILTER_NAME =
+ NodeIdentifier.create(QName.create(EstablishSubscriptionInput.QNAME, "stream-filter-name").intern());
private SubscriptionUtil() {
// hidden on purpose
}
+
+ static @Nullable SubscriptionFilter extractFilter(final ChoiceNode streamFilter) {
+ if (streamFilter.childByArg(SUBSCRIPTION_STREAM_FILTER_NAME) instanceof LeafNode<?> leafNode) {
+ if (leafNode.body() instanceof String filterName) {
+ return new SubscriptionFilter.Reference(filterName);
+ }
+ throw new IllegalArgumentException("Bad child " + leafNode.prettyTree());
+ }
+ final var filterSpec = (ChoiceNode) streamFilter.childByArg(new NodeIdentifier(FilterSpec.QNAME));
+ if (filterSpec == null) {
+ return null;
+ }
+ final var subtree = (AnydataNode<?>) filterSpec.childByArg(new NodeIdentifier(StreamSubtreeFilter.QNAME));
+ if (subtree != null) {
+ return new SubscriptionFilter.SubtreeDefinition(subtree);
+ }
+ if (filterSpec.childByArg(new NodeIdentifier(QName.create(FilterSpec.QNAME, "stream-xpath-filter")))
+ instanceof LeafNode<?> leafNode) {
+ if (leafNode.body() instanceof String xpath) {
+ return new SubscriptionFilter.XPathDefinition(xpath);
+ }
+ throw new IllegalArgumentException("Bad child " + leafNode.prettyTree());
+ }
+ return null;
+ }
}
import org.opendaylight.restconf.server.spi.RestconfStream;
import org.opendaylight.restconf.subscription.SubscriptionUtil;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.Subscriptions;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.stream.filter.elements.FilterSpec;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.stream.filter.elements.filter.spec.stream.subtree.filter.StreamSubtreeFilter;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscription.policy.modifiable.target.stream.StreamFilter;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscriptions.Subscription;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscriptions.subscription.Receivers;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscriptions.subscription.receivers.Receiver;
+import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.Uint32;
import org.opendaylight.yangtools.yang.common.Uint64;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
import org.opendaylight.yangtools.yang.data.spi.node.ImmutableNodes;
import org.osgi.service.component.annotations.Activate;
return new MdsalRestconfStreamSubscription<>(subscription, dataBroker);
}, MoreExecutors.directExecutor());
}
+
+ @Override
+ protected ListenableFuture<RestconfStream.Subscription> modifySubscriptionFilter(
+ final RestconfStream.Subscription subscription, final RestconfStream.SubscriptionFilter filter) {
+ final var id = subscription.id();
+
+ final DataContainerChild filterNode = switch (filter) {
+ case RestconfStream.SubscriptionFilter.Reference(var filterName) ->
+ ImmutableNodes.leafNode(SubscriptionUtil.QNAME_STREAM_FILTER, filterName);
+ case RestconfStream.SubscriptionFilter.SubtreeDefinition(var anydata) ->
+ ImmutableNodes.newChoiceBuilder()
+ .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(FilterSpec.QNAME))
+ .withChild(ImmutableNodes.leafNode(StreamSubtreeFilter.QNAME, anydata))
+ .build();
+ case RestconfStream.SubscriptionFilter.XPathDefinition(final var xpath) ->
+ ImmutableNodes.newChoiceBuilder()
+ .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(FilterSpec.QNAME))
+ .withChild(ImmutableNodes.leafNode(QName.create(FilterSpec.QNAME, "stream-xpath-filter"), xpath))
+ .build();
+ };
+
+ final var tx = dataBroker.newWriteOnlyTransaction();
+ final var nodeId = NodeIdentifierWithPredicates.of(Subscription.QNAME, SubscriptionUtil.QNAME_ID, id);
+ tx.merge(LogicalDatastoreType.OPERATIONAL, SubscriptionUtil.SUBSCRIPTIONS.node(nodeId),
+ ImmutableNodes.newMapEntryBuilder()
+ .withNodeIdentifier(nodeId)
+ .withChild(ImmutableNodes.leafNode(SubscriptionUtil.QNAME_ID, id))
+ .withChild(ImmutableNodes.newChoiceBuilder()
+ .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(SubscriptionUtil.QNAME_TARGET))
+ .withChild(ImmutableNodes.newChoiceBuilder()
+ .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(StreamFilter.QNAME))
+ .withChild(filterNode)
+ .build())
+ .build())
+ .build());
+ return tx.commit().transform(info -> {
+ LOG.debug("Modified subscription {} to operational datastore as of {}", id, info);
+ return new MdsalRestconfStreamSubscription<>(subscription, dataBroker);
+ }, MoreExecutors.directExecutor());
+ }
}
}
@Override
- protected void terminateImpl(final ServerRequest<Empty> request,final QName terminationReason) {
+ protected void terminateImpl(final ServerRequest<Empty> request, final QName terminationReason) {
final var id = id();
LOG.debug("{} terminated with reason {}", id, terminationReason);
}, MoreExecutors.directExecutor());
}
+ @Override
+ public void modifySubscription(final ServerRequest<Subscription> request, final Uint32 id,
+ final SubscriptionFilter filter) {
+ final var oldSubscription = lookupSubscription(id);
+ if (oldSubscription == null) {
+ request.completeWith(new RequestException(ErrorType.APPLICATION, ErrorTag.BAD_ELEMENT,
+ "There is no subscription with given ID."));
+ return;
+ }
+
+ final EventStreamFilter filterImpl;
+ try {
+ filterImpl = resolveFilter(filter);
+ } catch (RequestException e) {
+ request.completeWith(e);
+ return;
+ }
+ final var newSubscription = new SubscriptionImpl(id, oldSubscription.encoding(), oldSubscription.streamName(),
+ oldSubscription.receiverName(), filterImpl);
+
+ Futures.addCallback(modifySubscriptionFilter(newSubscription, filter), new FutureCallback<>() {
+ @Override
+ public void onSuccess(final Subscription result) {
+ subscriptions.put(id, result);
+ request.completeWith(result);
+ }
+
+ @Override
+ public void onFailure(final Throwable cause) {
+ request.completeWith(new RequestException(cause));
+ }
+ }, MoreExecutors.directExecutor());
+ }
+
@NonNullByDefault
protected abstract ListenableFuture<Subscription> createSubscription(Subscription subscription);
+ @NonNullByDefault
+ protected abstract ListenableFuture<Subscription> modifySubscriptionFilter(Subscription subscription,
+ SubscriptionFilter filter);
+
private @Nullable EventStreamFilter resolveFilter(final @Nullable SubscriptionFilter filter)
throws RequestException {
return switch (filter) {
void establishSubscription(ServerRequest<Subscription> request, String streamName, QName encoding,
@Nullable SubscriptionFilter filter);
+ /**
+ * Modify existing RFC8639 subscription to a stream.
+ *
+ * @param request {@link ServerRequest} for this invocation
+ * @param id of subscription
+ * @param filter new filter
+ * @throws NullPointerException if {@code request}, {@code id} or {@code filter} is {@code null}
+ */
+ @NonNullByDefault
+ void modifySubscription(ServerRequest<Subscription> request, Uint32 id, SubscriptionFilter filter);
+
/**
* Lookup an existing subscription.
*