import org.opendaylight.restconf.server.api.ServerException;
import org.opendaylight.restconf.server.api.ServerRequest;
import org.opendaylight.restconf.server.spi.OperationInput;
+import org.opendaylight.restconf.server.spi.RestconfStream;
import org.opendaylight.restconf.server.spi.RpcImplementation;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.Encoding;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.EstablishSubscription;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.EstablishSubscriptionOutput;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.SubscriptionId;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.filters.StreamFilter;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.streams.Stream;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscription.policy.dynamic.Stream1Builder;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscription.policy.modifiable.target.StreamBuilder;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscription.policy.modifiable.target.stream.stream.filter.ByReferenceBuilder;
private final MdsalNotificationService mdsalService;
private final SubscriptionStateService subscriptionStateService;
private final SubscriptionStateMachine stateMachine;
+ private final RestconfStream.Registry streamRegistry;
@Inject
@Activate
public EstablishSubscriptionRpc(@Reference final MdsalNotificationService mdsalService,
@Reference final SubscriptionStateService subscriptionStateService,
- @Reference final SubscriptionStateMachine stateMachine) {
+ @Reference final SubscriptionStateMachine stateMachine,
+ @Reference final RestconfStream.Registry streamRegistry) {
super(EstablishSubscription.QNAME);
this.mdsalService = requireNonNull(mdsalService);
this.subscriptionStateService = requireNonNull(subscriptionStateService);
this.stateMachine = requireNonNull(stateMachine);
+ this.streamRegistry = requireNonNull(streamRegistry);
}
@Override
"No stream specified"));
return;
}
- try {
- if (!mdsalService.exist(SubscriptionUtil.STREAMS.node(NodeIdentifierWithPredicates.of(Stream.QNAME,
- SubscriptionUtil.QNAME_STREAM_NAME, streamName))).get()) {
- request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.INVALID_VALUE,
- "%s refers to an unknown stream", streamName));
- return;
- }
- } catch (InterruptedException | ExecutionException e) {
- request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.BAD_ELEMENT, e));
+ if (streamRegistry.lookupStream(streamName) == null) {
+ request.completeWith(new ServerException(ErrorType.APPLICATION, ErrorTag.INVALID_VALUE,
+ "%s refers to an unknown stream", streamName));
return;
}
+
final var stream1Builder = new Stream1Builder();
stream1Builder.setStream(streamName);
streamBuilder.addAugmentation(stream1Builder.build());
import org.opendaylight.restconf.server.api.ServerException;
import org.opendaylight.restconf.server.api.ServerRequest;
import org.opendaylight.restconf.server.spi.OperationInput;
+import org.opendaylight.restconf.server.spi.RestconfStream;
import org.opendaylight.restconf.server.spi.RpcImplementation;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.ModifySubscription;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.ModifySubscriptionInput;
private final MdsalNotificationService mdsalService;
private final SubscriptionStateService subscriptionStateService;
private final SubscriptionStateMachine stateMachine;
+ private final RestconfStream.Registry streamRegistry;
@Inject
@Activate
public ModifySubscriptionRpc(@Reference final MdsalNotificationService mdsalService,
@Reference final SubscriptionStateService subscriptionStateService,
- @Reference final SubscriptionStateMachine stateMachine) {
+ @Reference final SubscriptionStateMachine stateMachine,
+ @Reference RestconfStream.Registry streamRegistry) {
super(ModifySubscription.QNAME);
this.mdsalService = requireNonNull(mdsalService);
this.subscriptionStateService = requireNonNull(subscriptionStateService);
this.stateMachine = requireNonNull(stateMachine);
+ this.streamRegistry = requireNonNull(streamRegistry);
}
@Override
.withNodeIdentifier(NodeIdentifier.create(ModifySubscriptionOutput.QNAME))
.build());
try {
- final var subscription = mdsalService.read(SubscriptionUtil.SUBSCRIPTIONS.node(node.name()))
- .get();
- if (subscription.isEmpty()) {
+ final var streamName = leaf(target, NodeIdentifier.create(SubscriptionUtil.QNAME_STREAM),
+ String.class);
+ if (streamRegistry.lookupStream(streamName) == null) {
LOG.warn("Could not send subscription modify notification: could not read stream name");
return;
}
- final var target = (DataContainerNode) ((DataContainerNode) subscription.orElseThrow())
- .childByArg(NodeIdentifier.create(SubscriptionUtil.QNAME_TARGET));
- final var streamName = leaf(target, NodeIdentifier.create(SubscriptionUtil.QNAME_STREAM),
- String.class);
+
subscriptionStateService.subscriptionModified(Instant.now(), id, streamName, "uri", null);
- } catch (InterruptedException | ExecutionException e) {
+ } catch (InterruptedException e) {
LOG.warn("Could not send subscription modify notification", e);
}
}
import org.opendaylight.restconf.server.api.TransportSession;
import org.opendaylight.restconf.server.api.testlib.CompletingServerRequest;
import org.opendaylight.restconf.server.spi.OperationInput;
+import org.opendaylight.restconf.server.spi.RestconfStream;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.EstablishSubscriptionInput;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.EstablishSubscriptionOutput;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.filters.StreamFilter;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.streams.Stream;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscriptions.Subscription;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscriptions.subscription.receivers.Receiver;
import org.opendaylight.yangtools.util.concurrent.FluentFutures;
private TransportSession session;
@Mock
private SubscriptionStateMachine stateMachine;
+ @Mock
+ private RestconfStream.Registry streamRegistry;
+ @Mock
+ private RestconfStream restconfStream;
@Captor
private ArgumentCaptor<ServerException> response;
@BeforeEach
void before() {
final var mdsalService = new MdsalNotificationService(dataBroker);
- rpc = new EstablishSubscriptionRpc(mdsalService, subscriptionStateService, stateMachine);
+ rpc = new EstablishSubscriptionRpc(mdsalService, subscriptionStateService, stateMachine, streamRegistry);
}
@Test
.build();
doReturn(writeTx).when(dataBroker).newWriteOnlyTransaction();
- doReturn(readTx).when(dataBroker).newReadOnlyTransaction();
- doReturn(FluentFutures.immediateTrueFluentFuture()).when(readTx).exists(LogicalDatastoreType.OPERATIONAL,
- SubscriptionUtil.STREAMS.node(NodeIdentifierWithPredicates.of(Stream.QNAME,
- SubscriptionUtil.QNAME_STREAM_NAME, "NETCONF")));
+ doReturn(restconfStream).when(streamRegistry).lookupStream("NETCONF");
doReturn(CommitInfo.emptyFluentFuture()).when(writeTx).commit();
doReturn(session).when(request).session();
@Test
void establishSubscriptionWrongStreamTest() {
- doReturn(readTx).when(dataBroker).newReadOnlyTransaction();
- doReturn(FluentFutures.immediateFalseFluentFuture()).when(readTx).exists(LogicalDatastoreType.OPERATIONAL,
- SubscriptionUtil.STREAMS.node(NodeIdentifierWithPredicates.of(Stream.QNAME,
- SubscriptionUtil.QNAME_STREAM_NAME, "NETCONF")));
+ doReturn(null).when(streamRegistry).lookupStream("NETCONF");
doReturn(session).when(request).session();
rpc.invoke(request, RESTCONF_URI, new OperationInput(operationPath, getInput()));
.build())
.build();
doReturn(readTx).when(dataBroker).newReadOnlyTransaction();
- doReturn(FluentFutures.immediateTrueFluentFuture()).when(readTx).exists(LogicalDatastoreType.OPERATIONAL,
- SubscriptionUtil.STREAMS.node(NodeIdentifierWithPredicates.of(Stream.QNAME,
- SubscriptionUtil.QNAME_STREAM_NAME, "NETCONF")));
+ doReturn(restconfStream).when(streamRegistry).lookupStream("NETCONF");
doReturn(FluentFutures.immediateFalseFluentFuture()).when(readTx).exists(LogicalDatastoreType.OPERATIONAL,
SubscriptionUtil.FILTERS.node(NodeIdentifierWithPredicates.of(StreamFilter.QNAME,
SubscriptionUtil.QNAME_STREAM_FILTER_NAME, "filter")));
import org.opendaylight.restconf.server.api.TransportSession;
import org.opendaylight.restconf.server.api.testlib.CompletingServerRequest;
import org.opendaylight.restconf.server.spi.OperationInput;
+import org.opendaylight.restconf.server.spi.RestconfStream;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.EstablishSubscriptionInput;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.ModifySubscriptionOutput;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.subscribed.notifications.rev190909.subscriptions.Subscription;
private TransportSession session;
@Mock
private SubscriptionStateMachine stateMachine;
+ @Mock
+ private RestconfStream.Registry streamRegistry;
@Captor
private ArgumentCaptor<ServerException> responseCaptor;
@BeforeEach
void before() {
mdsalService = new MdsalNotificationService(dataBroker);
- rpc = new ModifySubscriptionRpc(mdsalService, subscriptionStateService, stateMachine);
+ rpc = new ModifySubscriptionRpc(mdsalService, subscriptionStateService, stateMachine, streamRegistry);
}
@Test