2 * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.subscription;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.MoreExecutors;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.atomic.AtomicInteger;
17 import javax.inject.Inject;
18 import javax.inject.Singleton;
19 import org.opendaylight.mdsal.common.api.CommitInfo;
20 import org.opendaylight.restconf.notifications.mdsal.MdsalNotificationService;
21 import org.opendaylight.restconf.notifications.mdsal.SubscriptionStateService;
22 import org.opendaylight.restconf.server.api.ServerException;
23 import org.opendaylight.restconf.server.api.ServerRequest;
24 import org.opendaylight.restconf.server.spi.OperationInput;
25 import org.opendaylight.restconf.server.spi.RpcImplementation;
26 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.Encoding;
27 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.EstablishSubscription;
28 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.EstablishSubscriptionInput;
29 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.EstablishSubscriptionOutput;
30 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.SubscriptionId;
31 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.filters.StreamFilter;
32 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.streams.Stream;
33 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscription.policy.dynamic.Stream1Builder;
34 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscription.policy.modifiable.target.StreamBuilder;
35 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscription.policy.modifiable.target.stream.stream.filter.ByReferenceBuilder;
36 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscriptions.Subscription;
37 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscriptions.SubscriptionBuilder;
38 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscriptions.subscription.Receivers;
39 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscriptions.subscription.receivers.Receiver;
40 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
41 import org.opendaylight.yangtools.yang.common.ErrorTag;
42 import org.opendaylight.yangtools.yang.common.ErrorType;
43 import org.opendaylight.yangtools.yang.common.QName;
44 import org.opendaylight.yangtools.yang.common.Uint32;
45 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
46 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
47 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
48 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerNode;
49 import org.opendaylight.yangtools.yang.data.spi.node.ImmutableNodes;
50 import org.osgi.service.component.annotations.Activate;
51 import org.osgi.service.component.annotations.Component;
52 import org.osgi.service.component.annotations.Reference;
55 * RESTCONF implementation of {@link EstablishSubscription}.
58 @Component(service = RpcImplementation.class)
59 public final class EstablishSubscriptionRpc extends RpcImplementation {
60 private static final NodeIdentifier SUBSCRIPTION_STREAM_FILTER_NAME =
61 NodeIdentifier.create(QName.create(EstablishSubscriptionInput.QNAME, "stream-filter-name").intern());
62 private static final NodeIdentifier SUBSCRIPTION_STREAM =
63 NodeIdentifier.create(QName.create(EstablishSubscriptionInput.QNAME, "stream").intern());
64 private static final NodeIdentifier SUBSCRIPTION_TARGET =
65 NodeIdentifier.create(QName.create(EstablishSubscriptionInput.QNAME, "target").intern());
66 private static final NodeIdentifier SUBSCRIPTION_STREAM_FILTER =
67 NodeIdentifier.create(QName.create(EstablishSubscriptionInput.QNAME, "stream-filter").intern());
68 private static final NodeIdentifier SUBSCRIPTION_STOP_TIME =
69 NodeIdentifier.create(QName.create(EstablishSubscriptionInput.QNAME, "stop-time").intern());
70 private static final NodeIdentifier SUBSCRIPTION_ENCODING =
71 NodeIdentifier.create(QName.create(EstablishSubscriptionInput.QNAME, "encoding").intern());
72 private static final NodeIdentifier ESTABLISH_SUBSCRIPTION_OUTPUT =
73 NodeIdentifier.create(EstablishSubscriptionOutput.QNAME);
74 private static final NodeIdentifier OUTPUT_ID =
75 NodeIdentifier.create(QName.create(EstablishSubscriptionOutput.QNAME, "id").intern());
77 // As per https://www.rfc-editor.org/rfc/rfc8639.html#section-6
79 // A best practice is to use the lower half of the "id"
80 // object's integer space when that "id" is assigned by an external
81 // entity (such as with a configured subscription). This leaves the
82 // upper half of the subscription integer space available to be
83 // dynamically assigned by the publisher.
84 // FIXME: NETCONF-714: this should live in an actual service, not here, so that we can safely wrap without trampling
85 // on an existing subscription.
86 private final AtomicInteger subscriptionIdCounter = new AtomicInteger(Integer.MAX_VALUE);
87 private final MdsalNotificationService mdsalService;
88 private final SubscriptionStateService subscriptionStateService;
89 private final SubscriptionStateMachine stateMachine;
93 public EstablishSubscriptionRpc(@Reference final MdsalNotificationService mdsalService,
94 @Reference final SubscriptionStateService subscriptionStateService,
95 @Reference final SubscriptionStateMachine stateMachine) {
96 super(EstablishSubscription.QNAME);
97 this.mdsalService = requireNonNull(mdsalService);
98 this.subscriptionStateService = requireNonNull(subscriptionStateService);
99 this.stateMachine = requireNonNull(stateMachine);
103 public void invoke(final ServerRequest<ContainerNode> request, final URI restconfURI, final OperationInput input) {
104 final var session = request.session();
105 if (session == null) {
106 request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.OPERATION_NOT_SUPPORTED,
107 "This end point does not support dynamic subscriptions."));
111 final var body = input.input();
112 final var target = (DataContainerNode) body.childByArg(SUBSCRIPTION_TARGET);
113 if (target == null) {
114 // means there is no stream information present
115 request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.MISSING_ELEMENT,
116 "No stream specified"));
120 final var id = Uint32.fromIntBits(subscriptionIdCounter.incrementAndGet());
122 final var subscriptionBuilder = new SubscriptionBuilder();
123 subscriptionBuilder.setId(new SubscriptionId(id));
124 final var streamBuilder = new StreamBuilder();
126 final var nodeBuilder = ImmutableNodes.newMapEntryBuilder()
127 .withNodeIdentifier(NodeIdentifierWithPredicates.of(Subscription.QNAME, SubscriptionUtil.QNAME_ID, id))
128 .withChild(ImmutableNodes.leafNode(SubscriptionUtil.QNAME_ID, id));
129 final var nodeTargetBuilder = ImmutableNodes.newChoiceBuilder().withNodeIdentifier(NodeIdentifier
130 .create(SubscriptionUtil.QNAME_TARGET));
132 final var principal = request.principal();
133 nodeBuilder.withChild(generateReceivers(principal == null ? "unknown" : principal.getName()));
136 final var streamName = leaf(target, SUBSCRIPTION_STREAM, String.class);
137 if (streamName == null) {
138 request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.MISSING_ELEMENT,
139 "No stream specified"));
143 if (!mdsalService.exist(SubscriptionUtil.STREAMS.node(NodeIdentifierWithPredicates.of(Stream.QNAME,
144 SubscriptionUtil.QNAME_STREAM_NAME, streamName))).get()) {
145 request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.INVALID_VALUE,
146 "%s refers to an unknown stream", streamName));
149 } catch (InterruptedException | ExecutionException e) {
150 request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.BAD_ELEMENT, e));
153 final var stream1Builder = new Stream1Builder();
154 stream1Builder.setStream(streamName);
155 streamBuilder.addAugmentation(stream1Builder.build());
156 nodeTargetBuilder.withChild(ImmutableNodes.leafNode(SubscriptionUtil.QNAME_STREAM, streamName));
158 // check stream filter
159 final var streamFilter = (DataContainerNode) target.childByArg(SUBSCRIPTION_STREAM_FILTER);
160 if (streamFilter != null) {
161 final var streamFilterName = leaf(streamFilter, SUBSCRIPTION_STREAM_FILTER_NAME, String.class);
162 final var nodeFilterBuilder = ImmutableNodes.newChoiceBuilder().withNodeIdentifier(NodeIdentifier
163 .create(StreamFilter.QNAME));
164 if (streamFilterName != null) {
166 if (!mdsalService.exist(SubscriptionUtil.FILTERS.node(NodeIdentifierWithPredicates.of(
167 StreamFilter.QNAME, SubscriptionUtil.QNAME_STREAM_FILTER_NAME, streamFilterName))).get()) {
168 request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.INVALID_VALUE,
169 "%s refers to an unknown stream filter", streamFilterName));
172 } catch (InterruptedException | ExecutionException e) {
173 request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.BAD_ELEMENT, e));
176 final var byReferenceBuilder = new ByReferenceBuilder();
177 byReferenceBuilder.setStreamFilterName(streamFilterName);
178 streamBuilder.setStreamFilter(byReferenceBuilder.build());
179 nodeFilterBuilder.withChild(ImmutableNodes.leafNode(SubscriptionUtil.QNAME_STREAM_FILTER,
181 nodeTargetBuilder.withChild(nodeFilterBuilder.build());
183 // TODO: parse anydata filter, rfc6241? https://www.rfc-editor.org/rfc/rfc8650#name-filter-example
184 // {@link StreamSubtreeFilter}.
186 nodeBuilder.withChild(nodeTargetBuilder.build());
188 final DateAndTime stopTime;
190 stopTime = leaf(body, SUBSCRIPTION_STOP_TIME, DateAndTime.class);
191 } catch (IllegalArgumentException e) {
192 request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.BAD_ELEMENT, e));
195 if (stopTime != null) {
196 subscriptionBuilder.setStopTime(stopTime);
197 nodeBuilder.withChild(ImmutableNodes.leafNode(SubscriptionUtil.QNAME_STOP_TIME, stopTime));
200 final var encoding = leaf(body, SUBSCRIPTION_ENCODING, Encoding.class);
201 if (encoding != null) {
202 subscriptionBuilder.setEncoding(encoding);
203 nodeBuilder.withChild(ImmutableNodes.leafNode(SubscriptionUtil.QNAME_ENCODING, encoding));
206 final var subscription = new SubscriptionHolder(subscriptionBuilder.build(), mdsalService,
207 subscriptionStateService, stateMachine);
208 session.registerResource(subscription);
209 final var node = nodeBuilder.build();
210 stateMachine.registerSubscription(session, id);
212 mdsalService.writeSubscription(SubscriptionUtil.SUBSCRIPTIONS.node(node.name()), node)
213 .addCallback(new FutureCallback<CommitInfo>() {
215 public void onSuccess(final CommitInfo result) {
216 stateMachine.moveTo(id, SubscriptionState.ACTIVE);
217 request.completeWith(ImmutableNodes.newContainerBuilder()
218 .withNodeIdentifier(ESTABLISH_SUBSCRIPTION_OUTPUT)
219 .withChild(ImmutableNodes.leafNode(OUTPUT_ID, id))
224 public void onFailure(final Throwable throwable) {
225 request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED,
226 throwable.getCause().getMessage()));
228 }, MoreExecutors.directExecutor());
231 private static ContainerNode generateReceivers(final String receiver) {
232 return ImmutableNodes.newContainerBuilder().withNodeIdentifier(NodeIdentifier
233 .create(Receivers.QNAME))
234 .withChild(ImmutableNodes.newSystemMapBuilder()
235 .withNodeIdentifier(NodeIdentifier.create(Receiver.QNAME))
236 .withChild(ImmutableNodes.newMapEntryBuilder()
237 .withNodeIdentifier(NodeIdentifierWithPredicates.of(Subscription.QNAME,
238 SubscriptionUtil.QNAME_RECEIVER_NAME, receiver))
239 .withChild(ImmutableNodes.leafNode(SubscriptionUtil.QNAME_RECEIVER_NAME, receiver))
240 .withChild(ImmutableNodes.leafNode(SubscriptionUtil.QNAME_RECEIVER_STATE,
241 Receiver.State.Active.getName()))