fb925ca94e7c1284c664dc42b5da808cb50a0abc
[netconf.git] /
1 /*
2  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.subscription;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import java.net.URI;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.atomic.AtomicInteger;
17 import javax.inject.Inject;
18 import javax.inject.Singleton;
19 import org.opendaylight.mdsal.common.api.CommitInfo;
20 import org.opendaylight.restconf.notifications.mdsal.MdsalNotificationService;
21 import org.opendaylight.restconf.notifications.mdsal.SubscriptionStateService;
22 import org.opendaylight.restconf.server.api.ServerException;
23 import org.opendaylight.restconf.server.api.ServerRequest;
24 import org.opendaylight.restconf.server.spi.OperationInput;
25 import org.opendaylight.restconf.server.spi.RpcImplementation;
26 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.Encoding;
27 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.EstablishSubscription;
28 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.EstablishSubscriptionInput;
29 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.EstablishSubscriptionOutput;
30 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.SubscriptionId;
31 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.filters.StreamFilter;
32 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.streams.Stream;
33 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscription.policy.dynamic.Stream1Builder;
34 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscription.policy.modifiable.target.StreamBuilder;
35 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscription.policy.modifiable.target.stream.stream.filter.ByReferenceBuilder;
36 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscriptions.Subscription;
37 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscriptions.SubscriptionBuilder;
38 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscriptions.subscription.Receivers;
39 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscriptions.subscription.receivers.Receiver;
40 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
41 import org.opendaylight.yangtools.yang.common.ErrorTag;
42 import org.opendaylight.yangtools.yang.common.ErrorType;
43 import org.opendaylight.yangtools.yang.common.QName;
44 import org.opendaylight.yangtools.yang.common.Uint32;
45 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
46 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
47 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
48 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerNode;
49 import org.opendaylight.yangtools.yang.data.spi.node.ImmutableNodes;
50 import org.osgi.service.component.annotations.Activate;
51 import org.osgi.service.component.annotations.Component;
52 import org.osgi.service.component.annotations.Reference;
53
54 /**
55  * RESTCONF implementation of {@link EstablishSubscription}.
56  */
57 @Singleton
58 @Component(service = RpcImplementation.class)
59 public final class EstablishSubscriptionRpc extends RpcImplementation {
60     private static final NodeIdentifier SUBSCRIPTION_STREAM_FILTER_NAME =
61         NodeIdentifier.create(QName.create(EstablishSubscriptionInput.QNAME, "stream-filter-name").intern());
62     private static final NodeIdentifier SUBSCRIPTION_STREAM =
63         NodeIdentifier.create(QName.create(EstablishSubscriptionInput.QNAME, "stream").intern());
64     private static final NodeIdentifier SUBSCRIPTION_TARGET =
65             NodeIdentifier.create(QName.create(EstablishSubscriptionInput.QNAME, "target").intern());
66     private static final NodeIdentifier SUBSCRIPTION_STREAM_FILTER =
67         NodeIdentifier.create(QName.create(EstablishSubscriptionInput.QNAME, "stream-filter").intern());
68     private static final NodeIdentifier SUBSCRIPTION_STOP_TIME =
69         NodeIdentifier.create(QName.create(EstablishSubscriptionInput.QNAME, "stop-time").intern());
70     private static final NodeIdentifier SUBSCRIPTION_ENCODING =
71         NodeIdentifier.create(QName.create(EstablishSubscriptionInput.QNAME, "encoding").intern());
72     private static final NodeIdentifier ESTABLISH_SUBSCRIPTION_OUTPUT =
73         NodeIdentifier.create(EstablishSubscriptionOutput.QNAME);
74     private static final NodeIdentifier OUTPUT_ID =
75         NodeIdentifier.create(QName.create(EstablishSubscriptionOutput.QNAME, "id").intern());
76
77     // As per https://www.rfc-editor.org/rfc/rfc8639.html#section-6
78     //
79     //    A best practice is to use the lower half of the "id"
80     //    object's integer space when that "id" is assigned by an external
81     //    entity (such as with a configured subscription).  This leaves the
82     //    upper half of the subscription integer space available to be
83     //    dynamically assigned by the publisher.
84     // FIXME: NETCONF-714: this should live in an actual service, not here, so that we can safely wrap without trampling
85     //                     on an existing subscription.
86     private final AtomicInteger subscriptionIdCounter = new AtomicInteger(Integer.MAX_VALUE);
87     private final MdsalNotificationService mdsalService;
88     private final SubscriptionStateService subscriptionStateService;
89     private final SubscriptionStateMachine stateMachine;
90
91     @Inject
92     @Activate
93     public EstablishSubscriptionRpc(@Reference final MdsalNotificationService mdsalService,
94             @Reference final SubscriptionStateService subscriptionStateService,
95             @Reference final SubscriptionStateMachine stateMachine) {
96         super(EstablishSubscription.QNAME);
97         this.mdsalService = requireNonNull(mdsalService);
98         this.subscriptionStateService = requireNonNull(subscriptionStateService);
99         this.stateMachine = requireNonNull(stateMachine);
100     }
101
102     @Override
103     public void invoke(final ServerRequest<ContainerNode> request, final URI restconfURI, final OperationInput input) {
104         final var session = request.session();
105         if (session == null) {
106             request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.OPERATION_NOT_SUPPORTED,
107                 "This end point does not support dynamic subscriptions."));
108             return;
109         }
110
111         final var body = input.input();
112         final var target = (DataContainerNode) body.childByArg(SUBSCRIPTION_TARGET);
113         if (target == null) {
114             // means there is no stream information present
115             request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.MISSING_ELEMENT,
116                 "No stream specified"));
117             return;
118         }
119
120         final var id = Uint32.fromIntBits(subscriptionIdCounter.incrementAndGet());
121
122         final var subscriptionBuilder = new SubscriptionBuilder();
123         subscriptionBuilder.setId(new SubscriptionId(id));
124         final var streamBuilder = new StreamBuilder();
125
126         final var nodeBuilder = ImmutableNodes.newMapEntryBuilder()
127             .withNodeIdentifier(NodeIdentifierWithPredicates.of(Subscription.QNAME, SubscriptionUtil.QNAME_ID, id))
128             .withChild(ImmutableNodes.leafNode(SubscriptionUtil.QNAME_ID, id));
129         final var nodeTargetBuilder = ImmutableNodes.newChoiceBuilder().withNodeIdentifier(NodeIdentifier
130             .create(SubscriptionUtil.QNAME_TARGET));
131
132         final var principal = request.principal();
133         nodeBuilder.withChild(generateReceivers(principal == null ? "unknown" : principal.getName()));
134
135         // check stream name
136         final var streamName = leaf(target, SUBSCRIPTION_STREAM, String.class);
137         if (streamName == null) {
138             request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.MISSING_ELEMENT,
139                 "No stream specified"));
140             return;
141         }
142         try {
143             if (!mdsalService.exist(SubscriptionUtil.STREAMS.node(NodeIdentifierWithPredicates.of(Stream.QNAME,
144                     SubscriptionUtil.QNAME_STREAM_NAME, streamName))).get()) {
145                 request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.INVALID_VALUE,
146                     "%s refers to an unknown stream", streamName));
147                 return;
148             }
149         } catch (InterruptedException | ExecutionException e) {
150             request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.BAD_ELEMENT, e));
151             return;
152         }
153         final var stream1Builder = new Stream1Builder();
154         stream1Builder.setStream(streamName);
155         streamBuilder.addAugmentation(stream1Builder.build());
156         nodeTargetBuilder.withChild(ImmutableNodes.leafNode(SubscriptionUtil.QNAME_STREAM, streamName));
157
158         // check stream filter
159         final var streamFilter = (DataContainerNode) target.childByArg(SUBSCRIPTION_STREAM_FILTER);
160         if (streamFilter != null) {
161             final var streamFilterName = leaf(streamFilter, SUBSCRIPTION_STREAM_FILTER_NAME, String.class);
162             final var nodeFilterBuilder = ImmutableNodes.newChoiceBuilder().withNodeIdentifier(NodeIdentifier
163                 .create(StreamFilter.QNAME));
164             if (streamFilterName != null) {
165                 try {
166                     if (!mdsalService.exist(SubscriptionUtil.FILTERS.node(NodeIdentifierWithPredicates.of(
167                             StreamFilter.QNAME, SubscriptionUtil.QNAME_STREAM_FILTER_NAME, streamFilterName))).get()) {
168                         request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.INVALID_VALUE,
169                             "%s refers to an unknown stream filter", streamFilterName));
170                         return;
171                     }
172                 } catch (InterruptedException | ExecutionException e) {
173                     request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.BAD_ELEMENT, e));
174                     return;
175                 }
176                 final var byReferenceBuilder = new ByReferenceBuilder();
177                 byReferenceBuilder.setStreamFilterName(streamFilterName);
178                 streamBuilder.setStreamFilter(byReferenceBuilder.build());
179                 nodeFilterBuilder.withChild(ImmutableNodes.leafNode(SubscriptionUtil.QNAME_STREAM_FILTER,
180                     streamFilterName));
181                 nodeTargetBuilder.withChild(nodeFilterBuilder.build());
182             }
183             //  TODO: parse anydata filter, rfc6241? https://www.rfc-editor.org/rfc/rfc8650#name-filter-example
184             //    {@link StreamSubtreeFilter}.
185         }
186         nodeBuilder.withChild(nodeTargetBuilder.build());
187
188         final DateAndTime stopTime;
189         try {
190             stopTime = leaf(body, SUBSCRIPTION_STOP_TIME, DateAndTime.class);
191         } catch (IllegalArgumentException e) {
192             request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.BAD_ELEMENT, e));
193             return;
194         }
195         if (stopTime != null) {
196             subscriptionBuilder.setStopTime(stopTime);
197             nodeBuilder.withChild(ImmutableNodes.leafNode(SubscriptionUtil.QNAME_STOP_TIME, stopTime));
198         }
199
200         final var encoding = leaf(body, SUBSCRIPTION_ENCODING, Encoding.class);
201         if (encoding != null) {
202             subscriptionBuilder.setEncoding(encoding);
203             nodeBuilder.withChild(ImmutableNodes.leafNode(SubscriptionUtil.QNAME_ENCODING, encoding));
204         }
205
206         final var subscription = new SubscriptionHolder(subscriptionBuilder.build(), mdsalService,
207             subscriptionStateService, stateMachine);
208         session.registerResource(subscription);
209         final var node = nodeBuilder.build();
210         stateMachine.registerSubscription(session, id);
211
212         mdsalService.writeSubscription(SubscriptionUtil.SUBSCRIPTIONS.node(node.name()), node)
213             .addCallback(new FutureCallback<CommitInfo>() {
214                 @Override
215                 public void onSuccess(final CommitInfo result) {
216                     stateMachine.moveTo(id, SubscriptionState.ACTIVE);
217                     request.completeWith(ImmutableNodes.newContainerBuilder()
218                         .withNodeIdentifier(ESTABLISH_SUBSCRIPTION_OUTPUT)
219                         .withChild(ImmutableNodes.leafNode(OUTPUT_ID, id))
220                         .build());
221                 }
222
223                 @Override
224                 public void onFailure(final Throwable throwable) {
225                     request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED,
226                         throwable.getCause().getMessage()));
227                 }
228             }, MoreExecutors.directExecutor());
229     }
230
231     private static ContainerNode generateReceivers(final String receiver) {
232         return ImmutableNodes.newContainerBuilder().withNodeIdentifier(NodeIdentifier
233             .create(Receivers.QNAME))
234                 .withChild(ImmutableNodes.newSystemMapBuilder()
235                         .withNodeIdentifier(NodeIdentifier.create(Receiver.QNAME))
236                     .withChild(ImmutableNodes.newMapEntryBuilder()
237                     .withNodeIdentifier(NodeIdentifierWithPredicates.of(Subscription.QNAME,
238                         SubscriptionUtil.QNAME_RECEIVER_NAME, receiver))
239                     .withChild(ImmutableNodes.leafNode(SubscriptionUtil.QNAME_RECEIVER_NAME, receiver))
240                     .withChild(ImmutableNodes.leafNode(SubscriptionUtil.QNAME_RECEIVER_STATE,
241                         Receiver.State.Active.getName()))
242                     .build())
243                 .build())
244             .build();
245     }
246 }