import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
import org.eclipse.jdt.annotation.NonNull;
/**
return false;
}
- public final void addCallback(final RestconfCallback<? super V> callback) {
+ /**
+ * Add a callback to invoke when this future completes, or immediately if it is already complete.
+ *
+ * @param callback Callback to invoke
+ * @return This future
+ * @throws NullPointerException if {@code callback} is {@code null}
+ */
+ public final @NonNull RestconfFuture<V> addCallback(final RestconfCallback<? super V> callback) {
Futures.addCallback(this, callback, MoreExecutors.directExecutor());
+ return this;
+ }
+
+ /**
+ * Transform the result of this future using the specified function.
+ *
+ * @param <T> Resulting type
+ * @param function Function to apply
+ * @return Transformed future
+ * @throws NullPointerException if {@code function} is {@code null}
+ */
+ public final <T> @NonNull RestconfFuture<T> transform(final Function<@NonNull V, @NonNull T> function) {
+ final var fun = requireNonNull(function);
+ final var ret = new RestconfFuture<T>();
+ addCallback(new RestconfCallback<>() {
+ @Override
+ public void onSuccess(final V result) {
+ ret.set(requireNonNull(fun.apply(result)));
+ }
+
+ @Override
+ protected void onFailure(final RestconfDocumentedException failure) {
+ ret.setException(failure);
+ }
+ });
+ return ret;
}
/**
if (mountPoint == null) {
// Hacked-up integration of streams
if (CreateDataChangeEventSubscription.QNAME.equals(type)) {
- return listenersBroker.createDataChangeNotifiStream(databindProvider, input,
+ return listenersBroker.createDataChangeNotifiStream(databindProvider, uriInfo, input,
localDatabind.modelContext());
} else if (CreateNotificationStream.QNAME.equals(type)) {
- return listenersBroker.createNotificationStream(databindProvider, input, localDatabind.modelContext());
+ return listenersBroker.createNotificationStream(databindProvider, uriInfo, input,
+ localDatabind.modelContext());
} else if (SubscribeDeviceNotification.QNAME.equals(type)) {
- return listenersBroker.createDeviceNotificationStream(input,
- listenersBroker.prepareUriByStreamName(uriInfo, "").toString(), mountPointService);
+ return listenersBroker.createDeviceNotificationStream(uriInfo, input, localDatabind.modelContext(),
+ mountPointService);
}
}
import static java.util.Objects.requireNonNull;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.MoreExecutors;
-import java.net.URI;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.opendaylight.mdsal.dom.api.DOMSchemaService;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
import org.opendaylight.restconf.common.errors.RestconfFuture;
+import org.opendaylight.restconf.common.errors.SettableRestconfFuture;
import org.opendaylight.restconf.nb.rfc8040.URLConstants;
import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
+import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.RestconfState;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.Streams;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.Stream;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.stream.Access;
import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionInput;
import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
}
@Override
- public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
+ public String baseStreamLocation(final UriInfo uriInfo) {
return uriInfo.getBaseUriBuilder()
- .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH + '/' + streamName)
- .build();
+ .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH)
+ .build()
+ .toString();
}
}
}
@Override
- public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
+ public String baseStreamLocation(final UriInfo uriInfo) {
final var scheme = switch (uriInfo.getAbsolutePath().getScheme()) {
// Secured HTTP goes to Secured WebSockets
case "https" -> "wss";
return uriInfo.getBaseUriBuilder()
.scheme(scheme)
- .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH + '/' + streamName)
- .build();
+ .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH)
+ .build()
+ .toString();
}
}
@NonNull T createStream(@NonNull String name);
}
- /**
- * Holder of all handlers for notifications.
- */
- // FIXME: why do we even need this class?!
- private record HandlersHolder(
- @NonNull DOMDataBroker dataBroker,
- @NonNull DOMNotificationService notificationService,
- @NonNull DatabindProvider databindProvider) {
-
- HandlersHolder {
- requireNonNull(dataBroker);
- requireNonNull(notificationService);
- requireNonNull(databindProvider);
- }
- }
-
-// private static final QName LOCATION_QNAME = QName.create(Notifi.QNAME, "location").intern();
-// private static final NodeIdentifier LOCATION_NODEID = NodeIdentifier.create(LOCATION_QNAME);
-//
-// private final ListenersBroker listenersBroker;
-// private final HandlersHolder handlersHolder;
-//
-// // FIXME: NETCONF:1102: do not instantiate this service
-// new RestconfStreamsSubscriptionServiceImpl(dataBroker, notificationService, databindProvider,
-// listenersBroker),
-//
-// /**
-// * Initialize holder of handlers with holders as parameters.
-// *
-// * @param dataBroker {@link DOMDataBroker}
-// * @param notificationService {@link DOMNotificationService}
-// * @param databindProvider a {@link DatabindProvider}
-// * @param listenersBroker a {@link ListenersBroker}
-// */
-// public RestconfStreamsSubscriptionServiceImpl(final DOMDataBroker dataBroker,
-// final DOMNotificationService notificationService, final DatabindProvider databindProvider,
-// final ListenersBroker listenersBroker) {
-// handlersHolder = new HandlersHolder(dataBroker, notificationService, databindProvider);
-// this.listenersBroker = requireNonNull(listenersBroker);
-// }
-//
-// @Override
-// public Response subscribeToStream(final String identifier, final UriInfo uriInfo) {
-// final var params = QueryParams.newReceiveEventsParams(uriInfo);
-//
-// final URI location;
-// if (identifier.contains(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
-// location = listenersBroker.subscribeToDataStream(identifier, uriInfo, params, handlersHolder);
-// } else if (identifier.contains(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
-// location = listenersBroker.subscribeToYangStream(identifier, uriInfo, params, handlersHolder);
-// } else {
-// final String msg = "Bad type of notification of sal-remote";
-// LOG.warn(msg);
-// throw new RestconfDocumentedException(msg);
-// }
-//
-// return Response.ok()
-// .location(location)
-// .entity(new NormalizedNodePayload(
-// Inference.ofDataTreePath(handlersHolder.databindProvider().currentContext().modelContext(),
-// Notifi.QNAME, LOCATION_QNAME),
-// ImmutableNodes.leafNode(LOCATION_NODEID, location.toString())))
-// .build();
-// }
-
private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);
private static final YangInstanceIdentifier RESTCONF_STATE_STREAMS = YangInstanceIdentifier.of(
NodeIdentifier.create(RestconfState.QNAME),
NodeIdentifier.create(Streams.QNAME),
NodeIdentifier.create(Stream.QNAME));
- private static final QName DATASTORE_QNAME =
- QName.create(CreateDataChangeEventSubscriptionInput1.QNAME, "datastore").intern();
- private static final QName OUTPUT_TYPE_QNAME =
- QName.create(NotificationOutputTypeGrouping.QNAME, "notification-output-type").intern();
- private static final QName DEVICE_NOTIFICATION_PATH_QNAME =
- QName.create(SubscribeDeviceNotificationInput.QNAME, "path").intern();
- private static final QName DEVICE_NOTIFICATION_STREAM_PATH =
- QName.create(DEVICE_NOTIFICATION_PATH_QNAME, "stream-path").intern();
- private static final NodeIdentifier DATASTORE_NODEID = NodeIdentifier.create(DATASTORE_QNAME);
- private static final NodeIdentifier OUTPUT_TYPE_NODEID = NodeIdentifier.create(OUTPUT_TYPE_QNAME);
+ @VisibleForTesting
+ static final QName NAME_QNAME = QName.create(Stream.QNAME, "name").intern();
+ @VisibleForTesting
+ static final QName DESCRIPTION_QNAME = QName.create(Stream.QNAME, "description").intern();
+ @VisibleForTesting
+ static final QName ENCODING_QNAME = QName.create(Stream.QNAME, "encoding").intern();
+ @VisibleForTesting
+ static final QName LOCATION_QNAME = QName.create(Stream.QNAME, "location").intern();
+
+ private static final NodeIdentifier DATASTORE_NODEID = NodeIdentifier.create(
+ QName.create(CreateDataChangeEventSubscriptionInput1.QNAME, "datastore").intern());
+ @Deprecated(forRemoval = true)
+ private static final NodeIdentifier OUTPUT_TYPE_NODEID = NodeIdentifier.create(
+ QName.create(NotificationOutputTypeGrouping.QNAME, "notification-output-type").intern());
private static final NodeIdentifier DEVICE_NOTIFICATION_PATH_NODEID =
- NodeIdentifier.create(DEVICE_NOTIFICATION_PATH_QNAME);
+ NodeIdentifier.create(QName.create(SubscribeDeviceNotificationInput.QNAME, "path").intern());
+ private static final NodeIdentifier DEVICE_NOTIFICATION_STREAM_PATH_NODEID =
+ NodeIdentifier.create(QName.create(SubscribeDeviceNotificationInput.QNAME, "stream-path").intern());
+
private static final NodeIdentifier SAL_REMOTE_OUTPUT_NODEID =
NodeIdentifier.create(CreateDataChangeEventSubscriptionOutput.QNAME);
private static final NodeIdentifier NOTIFICATIONS =
/**
* Create a {@link RestconfStream} with a unique name. This method will atomically generate a stream name, create
- * the corresponding instance and register it
+ * the corresponding instance and register it.
*
* @param <T> Stream type
+ * @param baseStreamLocation base streams location
* @param factory Factory for creating the actual stream instance
* @return A {@link RestconfStream} instance
* @throws NullPointerException if {@code factory} is {@code null}
*/
- public final <T extends RestconfStream<?>> @NonNull T createStream(final StreamFactory<T> factory) {
+ final <T extends RestconfStream<?>> @NonNull RestconfFuture<T> createStream(final String description,
+ final String baseStreamLocation, final StreamFactory<T> factory) {
String name;
T stream;
do {
stream = factory.createStream(name);
} while (streams.putIfAbsent(name, stream) != null);
- return stream;
+ // final captures for use with FutureCallback
+ final var streamName = name;
+ final var finalStream = stream;
+
+ // Now issue a put operation
+ final var ret = new SettableRestconfFuture<T>();
+ final var tx = dataBroker.newWriteOnlyTransaction();
+
+ tx.put(LogicalDatastoreType.OPERATIONAL, restconfStateStreamPath(streamName),
+ streamEntry(streamName, description, baseStreamLocation + '/' + streamName, ""));
+ tx.commit().addCallback(new FutureCallback<CommitInfo>() {
+ @Override
+ public void onSuccess(final CommitInfo result) {
+ LOG.debug("Stream {} added", streamName);
+ ret.set(finalStream);
+ }
+
+ @Override
+ public void onFailure(final Throwable cause) {
+ LOG.debug("Failed to add stream {}", streamName, cause);
+ streams.remove(streamName, finalStream);
+ ret.setFailure(new RestconfDocumentedException("Failed to allocate stream " + streamName, cause));
+ }
+ }, MoreExecutors.directExecutor());
+ return ret;
}
/**
}
private static @NonNull YangInstanceIdentifier restconfStateStreamPath(final String streamName) {
- return RESTCONF_STATE_STREAMS
- .node(NodeIdentifierWithPredicates.of(Stream.QNAME, RestconfStateStreams.NAME_QNAME, streamName));
+ return RESTCONF_STATE_STREAMS.node(NodeIdentifierWithPredicates.of(Stream.QNAME, NAME_QNAME, streamName));
}
/**
- * Creates string representation of stream name from URI. Removes slash from URI in start and end positions,
- * and optionally {@link URLConstants#BASE_PATH} prefix.
- *
- * @param uri URI for creation of stream name.
- * @return String representation of stream name.
- */
-// private static String createStreamNameFromUri(final String uri) {
-// String result = requireNonNull(uri);
-// while (true) {
-// if (result.startsWith(URLConstants.BASE_PATH)) {
-// result = result.substring(URLConstants.BASE_PATH.length());
-// } else if (result.startsWith("/")) {
-// result = result.substring(1);
-// } else {
-// break;
-// }
-// }
-// if (result.endsWith("/")) {
-// result = result.substring(0, result.length() - 1);
-// }
-// return result;
-// }
-
- /**
- * Prepare URL from base name and stream name.
+ * Return the base location URL of the streams service based on request URI.
*
- * @param uriInfo base URL information
- * @param streamName name of stream for create
- * @return final URL
+ * @param uriInfo request URL information
+ * @return location URL
*/
- public abstract @NonNull URI prepareUriByStreamName(UriInfo uriInfo, String streamName);
-
- // FIXME: callers are utter duplicates, refactor them
-// private static void writeDataToDS(final DOMDataTreeWriteOperations tx, final MapEntryNode mapToStreams) {
-// // FIXME: use put() here
-// tx.merge(LogicalDatastoreType.OPERATIONAL, RestconfStateStreams.restconfStateStreamPath(mapToStreams.name()),
-// mapToStreams);
-// }
-//
-// private static void submitData(final DOMDataTreeWriteTransaction readWriteTransaction) {
-// try {
-// readWriteTransaction.commit().get();
-// } catch (final InterruptedException | ExecutionException e) {
-// throw new RestconfDocumentedException("Problem while putting data to DS.", e);
-// }
-// }
+ public abstract @NonNull String baseStreamLocation(UriInfo uriInfo);
/**
* Create data-change-event stream with POST operation via RPC.
* "input": {
* "path": "/toaster:toaster/toaster:toasterStatus",
* "sal-remote-augment:datastore": "OPERATIONAL",
- * "sal-remote-augment:scope": "ONE"
* }
* }
* }
*/
// FIXME: this really should be a normal RPC implementation
public final RestconfFuture<Optional<ContainerNode>> createDataChangeNotifiStream(
- final DatabindProvider databindProvider, final ContainerNode input,
+ final DatabindProvider databindProvider, final UriInfo uriInfo, final ContainerNode input,
final EffectiveModelContext modelContext) {
final var datastoreName = extractStringLeaf(input, DATASTORE_NODEID);
final var datastore = datastoreName != null ? LogicalDatastoreType.valueOf(datastoreName)
: LogicalDatastoreType.CONFIGURATION;
final var path = preparePath(input);
+
final var outputType = prepareOutputType(input);
- final var adapter = createStream(name -> new DataTreeChangeStream(this, name, outputType, databindProvider,
- datastore, path));
-
- // building of output
- return RestconfFuture.of(Optional.of(Builders.containerBuilder()
- .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
- .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.name()))
- .build()));
+ return createStream(
+ "Events occuring in " + datastore + " datastore under /" + IdentifierCodec.serialize(path, modelContext),
+ baseStreamLocation(uriInfo),
+ name -> new DataTreeChangeStream(this, name, outputType, databindProvider, datastore, path))
+ .transform(stream -> Optional.of(Builders.containerBuilder()
+ .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
+ .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, stream.name()))
+ .build()));
}
+// FIXME: NETCONF-1102: this part needs to be invoked from subscriber
// /**
// * Register listener by streamName in identifier to listen to data change notifications, and put or delete
// * information about listener to DS according to ietf-restconf-monitoring.
// }
//
// listener.setQueryParams(notificationQueryParams);
-//
-// final var dataBroker = handlersHolder.dataBroker();
-// final var schemaHandler = handlersHolder.databindProvider();
-// listener.setCloseVars(schemaHandler);
// listener.listen(dataBroker);
-//
-// final var uri = prepareUriByStreamName(uriInfo, streamName);
-// final var schemaContext = schemaHandler.currentContext().modelContext();
-// final var serializedPath = IdentifierCodec.serialize(listener.getPath(), schemaContext);
-//
-// final var mapToStreams = RestconfStateStreams.dataChangeStreamEntry(listener.getPath(),
-// listener.getOutputType(), uri, schemaContext, serializedPath);
-// final var writeTransaction = dataBroker.newWriteOnlyTransaction();
-// writeDataToDS(writeTransaction, mapToStreams);
-// submitData(writeTransaction);
-// return uri;
// }
// FIXME: this really should be a normal RPC implementation
public final RestconfFuture<Optional<ContainerNode>> createNotificationStream(
- final DatabindProvider databindProvider, final ContainerNode input,
+ final DatabindProvider databindProvider, final UriInfo uriInfo, final ContainerNode input,
final EffectiveModelContext modelContext) {
final var qnames = ((LeafSetNode<String>) input.getChildByArg(NOTIFICATIONS)).body().stream()
.map(LeafSetEntryNode::body)
.sorted()
.collect(ImmutableSet.toImmutableSet());
+ final var description = new StringBuilder("YANG notifications matching any of {");
+ var haveFirst = false;
for (var qname : qnames) {
- if (modelContext.findNotification(qname).isEmpty()) {
- throw new RestconfDocumentedException(qname + " refers to an unknown notification",
+ final var module = modelContext.findModuleStatement(qname.getModule())
+ .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown module",
+ ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
+ final var stmt = module.findSchemaTreeNode(qname)
+ .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown notification",
+ ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
+ if (!(stmt instanceof NotificationEffectiveStatement)) {
+ throw new RestconfDocumentedException(qname + " refers to a non-notification",
ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
}
- }
-// FIXME: use this block to create a stream description
-// final var module = refSchemaCtx.findModuleStatement(qname.getModule())
-// .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown module",
-// ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
-// final var stmt = module.findSchemaTreeNode(qname)
-// .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an notification",
-// ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
-// if (!(stmt instanceof NotificationEffectiveStatement)) {
-// throw new RestconfDocumentedException(qname + " refers to a non-notification",
-// ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
-// }
-//
-// if (haveFirst) {
-// sb.append(',');
-// } else {
-// haveFirst = true;
-// }
-// sb.append(module.argument().getLocalName()).append(':').append(qname.getLocalName());
+ if (haveFirst) {
+ description.append(",\n");
+ } else {
+ haveFirst = true;
+ }
+ description.append("\n ")
+ .append(module.argument().getLocalName()).append(':').append(qname.getLocalName());
+ }
+ description.append("\n}");
// registration of the listener
final var outputType = prepareOutputType(input);
- final var adapter = createStream(name -> new NotificationStream(this, name, outputType,
- databindProvider, qnames));
-
- return RestconfFuture.of(Optional.of(Builders.containerBuilder()
- .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
- .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.name()))
- .build()));
+ return createStream(description.toString(), baseStreamLocation(uriInfo),
+ name -> new NotificationStream(this, name, outputType, databindProvider, qnames))
+ .transform(stream -> Optional.of(Builders.containerBuilder()
+ .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
+ .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, stream.name()))
+ .build()));
}
- /**
- * Register listener by streamName in identifier to listen to yang notifications, and put or delete information
- * about listener to DS according to ietf-restconf-monitoring.
- *
- * @param identifier Name of the stream.
- * @param uriInfo URI information.
- * @param notificationQueryParams Query parameters of notification.
- * @param handlersHolder Holder of handlers for notifications.
- * @return Stream location for listening.
- */
-// public final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
-// final ReceiveEventsParams notificationQueryParams, final HandlersHolder handlersHolder) {
-// final String streamName = createStreamNameFromUri(identifier);
-// if (isNullOrEmpty(streamName)) {
-// throw new RestconfDocumentedException("Stream name is empty", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
-// }
-//
-// final var notificationListenerAdapter = notificationListenerFor(streamName);
-// if (notificationListenerAdapter == null) {
-// throw new RestconfDocumentedException("Stream with name %s was not found".formatted(streamName),
-// ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT);
-// }
-//
-// final URI uri = prepareUriByStreamName(uriInfo, streamName);
-// notificationListenerAdapter.setQueryParams(notificationQueryParams);
-// notificationListenerAdapter.listen(handlersHolder.notificationService());
-// final DOMDataBroker dataBroker = handlersHolder.dataBroker();
-// notificationListenerAdapter.setCloseVars(handlersHolder.databindProvider());
-// final MapEntryNode mapToStreams = RestconfStateStreams.notificationStreamEntry(streamName,
-// notificationListenerAdapter.qnames(), notificationListenerAdapter.getOutputType(), uri);
-//
-// // FIXME: how does this correlate with the transaction notificationListenerAdapter.close() will do?
-// final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
-// writeDataToDS(writeTransaction, mapToStreams);
-// submitData(writeTransaction);
-// return uri;
-// }
-
/**
* Create device notification stream.
*
- * @param baseUrl base Url
* @param input RPC input
* @param mountPointService dom mount point service
* @return {@link DOMRpcResult} - Output of RPC - example in JSON
*/
// FIXME: this should be an RPC invocation
- public final RestconfFuture<Optional<ContainerNode>> createDeviceNotificationStream(final ContainerNode input,
- final String baseUrl, final DOMMountPointService mountPointService) {
+ public final RestconfFuture<Optional<ContainerNode>> createDeviceNotificationStream(final UriInfo uriInfo,
+ final ContainerNode input, final EffectiveModelContext modelContext,
+ final DOMMountPointService mountPointService) {
// parsing out of container with settings and path
// FIXME: ugly cast
final var path = (YangInstanceIdentifier) input.findChildByArg(DEVICE_NOTIFICATION_PATH_NODEID)
ErrorTag.OPERATION_FAILED);
}
-// FIXME: use this for description?
-// final String deviceName = listId.values().iterator().next().toString();
-
+ final var baseStreamsUri = baseStreamLocation(uriInfo);
final var outputType = prepareOutputType(input);
- final var notificationListenerAdapter = createStream(
+ return createStream(
+ "All YANG notifications occuring on mount point /" + IdentifierCodec.serialize(path, modelContext),
+ baseStreamsUri,
streamName -> new DeviceNotificationStream(this, streamName, outputType, mountModelContext,
- mountPointService, mountPoint.getIdentifier()));
- notificationListenerAdapter.listen(mountNotifService, notificationPaths);
-
- return RestconfFuture.of(Optional.of(Builders.containerBuilder()
- .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
- .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH,
- baseUrl + notificationListenerAdapter.name()))
- .build()));
+ mountPointService, mountPoint.getIdentifier()))
+ .transform(stream -> {
+ stream.listen(mountNotifService, notificationPaths);
+ return Optional.of(Builders.containerBuilder()
+ .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
+ .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH_NODEID,
+ baseStreamsUri + '/' + stream.name()))
+ .build());
+ });
}
/**
* @param data Container with stream settings (RPC create-stream).
* @return Parsed {@link NotificationOutputType}.
*/
+ @Deprecated(forRemoval = true)
private static NotificationOutputType prepareOutputType(final ContainerNode data) {
final String outputName = extractStringLeaf(data, OUTPUT_TYPE_NODEID);
return outputName != null ? NotificationOutputType.valueOf(outputName) : NotificationOutputType.XML;
return data.childByArg(childName) instanceof LeafNode<?> leafNode && leafNode.body() instanceof String str
? str : null;
}
+
+ @VisibleForTesting
+ static @NonNull MapEntryNode streamEntry(final String name, final String description, final String location,
+ final String outputType) {
+ return Builders.mapEntryBuilder()
+ .withNodeIdentifier(NodeIdentifierWithPredicates.of(Stream.QNAME, NAME_QNAME, name))
+ .withChild(ImmutableNodes.leafNode(NAME_QNAME, name))
+ .withChild(ImmutableNodes.leafNode(DESCRIPTION_QNAME, description))
+ .withChild(createAccessList(outputType, location))
+ .build();
+ }
+
+ private static MapNode createAccessList(final String outputType, final String location) {
+ return Builders.mapBuilder()
+ .withNodeIdentifier(new NodeIdentifier(Access.QNAME))
+ .withChild(Builders.mapEntryBuilder()
+ .withNodeIdentifier(NodeIdentifierWithPredicates.of(Access.QNAME, ENCODING_QNAME, outputType))
+ .withChild(ImmutableNodes.leafNode(ENCODING_QNAME, outputType))
+ .withChild(ImmutableNodes.leafNode(LOCATION_QNAME, location))
+ .build())
+ .build();
+ }
}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.restconf.nb.rfc8040.streams;
-
-import static org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.$YangModuleInfoImpl.qnameOf;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.net.URI;
-import java.util.Set;
-import java.util.stream.Collectors;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.Stream;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.stream.Access;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
-import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.util.DataSchemaContext;
-import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree;
-import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
-import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
-
-/**
- * Utilities for creating the content of {@code /ietf-restconf-monitoring:restconf-state/streams}.
- */
-public final class RestconfStateStreams {
- @VisibleForTesting
- static final QName DESCRIPTION_QNAME = qnameOf("description");
- @VisibleForTesting
- static final QName ENCODING_QNAME = qnameOf("encoding");
- @VisibleForTesting
- static final QName LOCATION_QNAME = qnameOf("location");
- static final QName NAME_QNAME = qnameOf("name");
-
- private RestconfStateStreams() {
- // Hidden on purpose
- }
-
- /**
- * Map data of YANG notification stream to a {@link MapEntryNode} according to {@code ietf-restconf-monitoring}.
- *
- * @param streamName stream name
- * @param qnames Notification QNames to listen on
- * @param outputType output type of notification
- * @param uri location of registered listener for sending data of notification
- * @return mapped data of notification - map entry node if parent exists,
- * container streams with list and map entry node if not
- */
- public static MapEntryNode notificationStreamEntry(final String streamName, final Set<QName> qnames,
- final String outputType, final URI uri) {
- return Builders.mapEntryBuilder()
- .withNodeIdentifier(NodeIdentifierWithPredicates.of(Stream.QNAME, NAME_QNAME, streamName))
- .withChild(ImmutableNodes.leafNode(NAME_QNAME, streamName))
- .withChild(ImmutableNodes.leafNode(DESCRIPTION_QNAME, qnames.stream()
- .map(QName::toString)
- .collect(Collectors.joining(","))))
- .withChild(createAccessList(outputType, uri))
- .build();
- }
-
- /**
- * Map data of data change notification to normalized node according to ietf-restconf-monitoring.
- *
- * @param path path of data to listen on
- * @param outputType output type of notification
- * @param uri location of registered listener for sending data of notification
- * @param schemaContext schemaContext for parsing instance identifier to get schema node of data
- * @return mapped data of notification - map entry node if parent exists,
- * container streams with list and map entry node if not
- */
- public static MapEntryNode dataChangeStreamEntry(final YangInstanceIdentifier path,
- final String outputType, final URI uri, final EffectiveModelContext schemaContext,
- final String streamName) {
- final var streamEntry = Builders.mapEntryBuilder()
- .withNodeIdentifier(NodeIdentifierWithPredicates.of(Stream.QNAME, NAME_QNAME, streamName))
- .withChild(ImmutableNodes.leafNode(NAME_QNAME, streamName));
-
- DataSchemaContextTree.from(schemaContext).findChild(path)
- .map(DataSchemaContext::dataSchemaNode)
- .flatMap(DataSchemaNode::getDescription)
- .ifPresent(desc -> streamEntry.withChild(ImmutableNodes.leafNode(DESCRIPTION_QNAME, desc)));
-
- return streamEntry
- .withChild(createAccessList(outputType, uri))
- .build();
- }
-
- private static MapNode createAccessList(final String outputType, final URI uriToWebsocketServer) {
- return Builders.mapBuilder()
- .withNodeIdentifier(new NodeIdentifier(Access.QNAME))
- .withChild(Builders.mapEntryBuilder()
- .withNodeIdentifier(NodeIdentifierWithPredicates.of(Access.QNAME, ENCODING_QNAME, outputType))
- .withChild(ImmutableNodes.leafNode(ENCODING_QNAME, outputType))
- .withChild(ImmutableNodes.leafNode(LOCATION_QNAME, uriToWebsocketServer.toString()))
- .build())
- .build();
- }
-}
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import java.net.URI;
import java.util.UUID;
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriInfo;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataBroker;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext;
import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.stream.Access;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionOutput;
import org.opendaylight.yangtools.yang.common.ErrorTag;
import org.opendaylight.yangtools.yang.common.ErrorType;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.ContainerLike;
@Mock
private DOMDataBroker dataBroker;
+ @Mock
+ private DOMDataTreeWriteTransaction tx;
+ @Mock
+ private UriInfo uriInfo;
+ @Mock
+ private UriBuilder uriBuilder;
+ @Captor
+ private ArgumentCaptor<String> uriCaptor;
+ @Captor
+ private ArgumentCaptor<YangInstanceIdentifier> pathCaptor;
+ @Captor
+ private ArgumentCaptor<NormalizedNode> dataCaptor;
private ListenersBroker listenersBroker;
private DatabindProvider databindProvider;
@Test
void createStreamTest() {
+ doReturn(tx).when(dataBroker).newWriteOnlyTransaction();
+ doNothing().when(tx).put(eq(LogicalDatastoreType.OPERATIONAL), pathCaptor.capture(), dataCaptor.capture());
+ doReturn(CommitInfo.emptyFluentFuture()).when(tx).commit();
+
+ doReturn(uriBuilder).when(uriInfo).getBaseUriBuilder();
+ doReturn(uriBuilder).when(uriBuilder).replacePath(uriCaptor.capture());
+ doAnswer(inv -> new URI(uriCaptor.getValue())).when(uriBuilder).build();
+
final var output = assertInstanceOf(ContainerNode.class, listenersBroker.createDataChangeNotifiStream(
- databindProvider, prepareDomPayload("create-data-change-event-subscription", "toaster", "path"),
+ databindProvider, uriInfo, prepareDomPayload("create-data-change-event-subscription", "toaster", "path"),
SCHEMA_CTX).getOrThrow().orElse(null));
assertEquals(new NodeIdentifier(CreateDataChangeEventSubscriptionOutput.QNAME), output.name());
assertEquals(45, name.length());
assertThat(name, startsWith("urn:uuid:"));
assertNotNull(UUID.fromString(name.substring(9)));
+
+ final var rcStream = QName.create("urn:ietf:params:xml:ns:yang:ietf-restconf-monitoring", "2017-01-26",
+ "stream");
+ final var rcName = QName.create(rcStream, "name");
+ final var streamId = NodeIdentifierWithPredicates.of(rcStream, rcName, name);
+ final var rcEncoding = QName.create(rcStream, "encoding");
+
+ assertEquals(YangInstanceIdentifier.of(
+ new NodeIdentifier(QName.create(rcStream, "restconf-state")),
+ new NodeIdentifier(QName.create(rcStream, "streams")),
+ new NodeIdentifier(rcStream),
+ streamId), pathCaptor.getValue());
+ assertEquals(Builders.mapEntryBuilder()
+ .withNodeIdentifier(streamId)
+ .withChild(ImmutableNodes.leafNode(rcName, name))
+ .withChild(ImmutableNodes.leafNode(QName.create(rcStream, "description"),
+ "Events occuring in CONFIGURATION datastore under /toaster:toaster"))
+ .withChild(Builders.mapBuilder()
+ .withNodeIdentifier(new NodeIdentifier(Access.QNAME))
+ .withChild(Builders.mapEntryBuilder()
+ .withNodeIdentifier(NodeIdentifierWithPredicates.of(Access.QNAME, rcEncoding, ""))
+ .withChild(ImmutableNodes.leafNode(rcEncoding, ""))
+ .withChild(ImmutableNodes.leafNode(QName.create(rcStream, "location"),
+ "rests/streams/" + name))
+ .build())
+ .build())
+ .build(), dataCaptor.getValue());
}
@Test
void createStreamWrongValueTest() {
final var payload = prepareDomPayload("create-data-change-event-subscription", "String value", "path");
final var errors = assertThrows(RestconfDocumentedException.class,
- () -> listenersBroker.createDataChangeNotifiStream(databindProvider, payload, SCHEMA_CTX)).getErrors();
+ () -> listenersBroker.createDataChangeNotifiStream(databindProvider, uriInfo, payload, SCHEMA_CTX))
+ .getErrors();
assertEquals(1, errors.size());
final var error = errors.get(0);
assertEquals(ErrorType.APPLICATION, error.getErrorType());
void createStreamWrongInputRpcTest() {
final var payload = prepareDomPayload("create-data-change-event-subscription2", "toaster", "path2");
final var errors = assertThrows(RestconfDocumentedException.class,
- () -> listenersBroker.createDataChangeNotifiStream(databindProvider, payload, SCHEMA_CTX)).getErrors();
+ () -> listenersBroker.createDataChangeNotifiStream(databindProvider, uriInfo, payload, SCHEMA_CTX))
+ .getErrors();
assertEquals(1, errors.size());
final var error = errors.get(0);
assertEquals(ErrorType.APPLICATION, error.getErrorType());
*/
package org.opendaylight.restconf.nb.rfc8040.streams;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
-import java.net.URI;
import java.util.HashMap;
import java.util.Map;
-import java.util.Set;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import org.opendaylight.restconf.nb.rfc8040.utils.parser.ParserIdentifier;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.library.rev190104.module.list.Module;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.library.rev190104.module.list.module.Deviation;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.Revision;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
/**
* Unit tests for {@link RestconfStateStreams}.
*/
-public class RestconfStateStreamsTest {
+class RestconfStateStreamsTest {
private static final Logger LOG = LoggerFactory.getLogger(RestconfStateStreamsTest.class);
- private static EffectiveModelContext schemaContext;
- private static EffectiveModelContext schemaContextMonitoring;
-
- @BeforeClass
- public static void loadTestSchemaContextAndModules() throws Exception {
- // FIXME: assemble these from dependencies
- schemaContext = YangParserTestUtils.parseYangResourceDirectory("/modules/restconf-module-testing");
- schemaContextMonitoring = YangParserTestUtils.parseYangResourceDirectory("/modules");
- }
+ // FIXME: assemble these from dependencies
+ private static EffectiveModelContext schemaContext =
+ YangParserTestUtils.parseYangResourceDirectory("/modules/restconf-module-testing");
+ private static EffectiveModelContext schemaContextMonitoring =
+ YangParserTestUtils.parseYangResourceDirectory("/modules");
@Test
- public void toStreamEntryNodeTest() throws Exception {
- final YangInstanceIdentifier path = ParserIdentifier.toInstanceIdentifier(
+ void toStreamEntryNodeTest() throws Exception {
+ final var path = ParserIdentifier.toInstanceIdentifier(
"nested-module:depth1-cont/depth2-leaf1", schemaContextMonitoring, null).getInstanceIdentifier();
- final String outputType = "XML";
- final URI uri = new URI("uri");
- final String streamName = "/nested-module:depth1-cont/depth2-leaf1";
+ final var outputType = "XML";
+ final var uri = "uri";
+ final var streamName = "/nested-module:depth1-cont/depth2-leaf1";
assertMappedData(prepareMap(streamName, uri, outputType),
- RestconfStateStreams.dataChangeStreamEntry(path, outputType, uri, schemaContextMonitoring, streamName));
+ ListenersBroker.streamEntry(streamName, "description", "location", outputType));
}
@Test
- public void toStreamEntryNodeNotifiTest() throws Exception {
- final String outputType = "JSON";
- final URI uri = new URI("uri");
-
- final var map = prepareMap("notifi", uri, outputType);
- map.put(RestconfStateStreams.DESCRIPTION_QNAME, "(urn:nested:module?revision=2014-06-03)notifi");
+ void toStreamEntryNodeNotifiTest() throws Exception {
+ final var outputType = "JSON";
+ final var uri = "uri";
- assertMappedData(map, RestconfStateStreams.notificationStreamEntry("notifi",
- Set.of(QName.create("urn:nested:module", "2014-06-03", "notifi")), outputType, uri));
+ assertMappedData(prepareMap("notifi", uri, outputType),
+ ListenersBroker.streamEntry("notifi", "description", "location", outputType));
}
- private static Map<QName, Object> prepareMap(final String name, final URI uri, final String outputType) {
- final var map = new HashMap<QName, Object>();
- map.put(RestconfStateStreams.NAME_QNAME, name);
- map.put(RestconfStateStreams.LOCATION_QNAME, uri.toString());
- map.put(RestconfStateStreams.ENCODING_QNAME, outputType);
- return map;
+ private static Map<QName, Object> prepareMap(final String name, final String uri, final String outputType) {
+ return Map.of(
+ ListenersBroker.NAME_QNAME, name,
+ ListenersBroker.LOCATION_QNAME, uri,
+ ListenersBroker.ENCODING_QNAME, outputType,
+ ListenersBroker.DESCRIPTION_QNAME, "description");
}
private static void assertMappedData(final Map<QName, Object> map, final MapEntryNode mappedData) {
* @param containerNode
* modules
*/
+ // FIXME: what is this supposed to verify?
private static void verifyDeviations(final ContainerNode containerNode) {
int deviationsFound = 0;
for (var child : containerNode.body()) {
* @param containerNode
* modules
*/
+ // FIXME: what is this supposed to verify?
private static void verifyLoadedModules(final ContainerNode containerNode) {
- final Map<String, String> loadedModules = new HashMap<>();
+ final var loadedModules = new HashMap<String, String>();
for (var child : containerNode.body()) {
if (child instanceof LeafNode) {
}
final var expectedModules = schemaContext.getModules();
- assertEquals("Number of loaded modules is not as expected", expectedModules.size(), loadedModules.size());
+ assertEquals(expectedModules.size(), loadedModules.size());
for (var m : expectedModules) {
final String name = m.getName();
final String revision = loadedModules.get(name);
assertNotNull("Expected module not found", revision);
- assertEquals("Incorrect revision of loaded module", Revision.ofNullable(revision), m.getRevision());
+ assertEquals(Revision.ofNullable(revision), m.getRevision());
loadedModules.remove(name);
}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.restconf.nb.rfc8040.streams;
-
-//import static org.junit.Assert.assertEquals;
-//import static org.junit.Assert.assertThrows;
-//import static org.mockito.ArgumentMatchers.any;
-//import static org.mockito.Mockito.doReturn;
-//import static org.mockito.Mockito.mock;
-//
-//import com.google.common.collect.ImmutableClassToInstanceMap;
-//import java.net.URI;
-//import javax.ws.rs.core.MultivaluedHashMap;
-//import javax.ws.rs.core.UriBuilder;
-//import javax.ws.rs.core.UriInfo;
-//import org.junit.Before;
-//import org.junit.Test;
-//import org.junit.runner.RunWith;
-//import org.mockito.Mock;
-//import org.mockito.junit.MockitoJUnitRunner;
-//import org.opendaylight.mdsal.common.api.CommitInfo;
-//import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-//import org.opendaylight.mdsal.dom.api.DOMDataBroker;
-//import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
-//import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
-//import org.opendaylight.mdsal.dom.api.DOMNotificationService;
-//import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
-//import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext;
-//import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
-//import org.opendaylight.restconf.nb.rfc8040.streams.ListenersBroker;
-//import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
-//import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708
-// .CreateDataChangeEventSubscriptionInput1.Scope;
-//import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping
-// .NotificationOutputType;
-//import org.opendaylight.yangtools.concepts.ListenerRegistration;
-//import org.opendaylight.yangtools.yang.common.ErrorTag;
-//import org.opendaylight.yangtools.yang.common.ErrorType;
-
-//@RunWith(MockitoJUnitRunner.StrictStubs.class)
-public class RestconfStreamsSubscriptionServiceImplTest extends AbstractNotificationListenerTest {
-// private static final String URI = "/rests/data/ietf-restconf-monitoring:restconf-state/streams/stream/"
-// + "toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE";
-//
-// @Mock
-// private DOMDataBroker dataBroker;
-// @Mock
-// private UriInfo uriInfo;
-// @Mock
-// private DOMNotificationService notificationService;
-//
-// private final DatabindProvider databindProvider = () -> DatabindContext.ofModel(MODEL_CONTEXT);
-//
-// @Before
-// public void setUp() throws Exception {
-// final var wTx = mock(DOMDataTreeWriteTransaction.class);
-// doReturn(wTx).when(dataBroker).newWriteOnlyTransaction();
-// doReturn(CommitInfo.emptyFluentFuture()).when(wTx).commit();
-//
-// final var dataTreeChangeService = mock(DOMDataTreeChangeService.class);
-// doReturn(mock(ListenerRegistration.class)).when(dataTreeChangeService)
-// .registerDataTreeChangeListener(any(), any());
-//
-// doReturn(ImmutableClassToInstanceMap.of(DOMDataTreeChangeService.class, dataTreeChangeService))
-// .when(dataBroker).getExtensions();
-//
-// doReturn(new MultivaluedHashMap<>()).when(uriInfo).getQueryParameters();
-// doReturn(UriBuilder.fromUri("http://localhost:8181")).when(uriInfo).getBaseUriBuilder();
-// doReturn(new URI("http://127.0.0.1/" + URI)).when(uriInfo).getAbsolutePath();
-// }
-//
-// @Test
-// public void testSubscribeToStreamSSE() {
-// final var listenersBroker = new ListenersBroker.ServerSentEvents();
-// listenersBroker.registerDataChangeListener(MODEL_CONTEXT, LogicalDatastoreType.OPERATIONAL,
-// IdentifierCodec.deserialize("toaster:toaster/toasterStatus", MODEL_CONTEXT), Scope.ONE,
-// NotificationOutputType.XML);
-// final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
-// notificationService, databindProvider,listenersBroker);
-// final var response = streamsSubscriptionService.subscribeToStream(
-// "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE", uriInfo);
-// assertEquals("http://localhost:8181/rests/streams"
-// + "/data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
-// response.getLocation().toString());
-// }
-//
-// @Test
-// public void testSubscribeToStreamWS() {
-// final var listenersBroker = new ListenersBroker.WebSockets();
-// listenersBroker.registerDataChangeListener(MODEL_CONTEXT, LogicalDatastoreType.OPERATIONAL,
-// IdentifierCodec.deserialize("toaster:toaster/toasterStatus", MODEL_CONTEXT), Scope.ONE,
-// NotificationOutputType.XML);
-// final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
-// notificationService, databindProvider, listenersBroker);
-// final var response = streamsSubscriptionService.subscribeToStream(
-// "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE", uriInfo);
-// assertEquals("ws://localhost:8181/rests/streams"
-// + "/data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
-// response.getLocation().toString());
-// }
-//
-// @Test
-// public void testSubscribeToStreamMissingDatastoreInPath() {
-// final var listenersBroker = new ListenersBroker.WebSockets();
-// final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
-// notificationService, databindProvider, listenersBroker);
-// final var errors = assertThrows(RestconfDocumentedException.class,
-// () -> streamsSubscriptionService.subscribeToStream("toaster:toaster/toasterStatus/scope=ONE", uriInfo))
-// .getErrors();
-// assertEquals(1, errors.size());
-// final var error = errors.get(0);
-// assertEquals(ErrorType.APPLICATION, error.getErrorType());
-// assertEquals(ErrorTag.OPERATION_FAILED, error.getErrorTag());
-// assertEquals("Bad type of notification of sal-remote", error.getErrorMessage());
-// }
-//
-// @Test
-// public void testSubscribeToStreamMissingScopeInPath() {
-// final var listenersBroker = new ListenersBroker.WebSockets();
-// final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
-// notificationService, databindProvider, listenersBroker);
-// final var errors = assertThrows(RestconfDocumentedException.class,
-// () -> streamsSubscriptionService.subscribeToStream("toaster:toaster/toasterStatus/datastore=OPERATIONAL",
-// uriInfo)).getErrors();
-// assertEquals(1, errors.size());
-// final var error = errors.get(0);
-// assertEquals(ErrorType.APPLICATION, error.getErrorType());
-// assertEquals(ErrorTag.OPERATION_FAILED, error.getErrorTag());
-// assertEquals("Bad type of notification of sal-remote", error.getErrorMessage());
-// }
-}
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
+import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataBroker;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev231103.NotificationOutputTypeGrouping.NotificationOutputType;
import org.opendaylight.yangtools.yang.common.QName;
@Mock
private DOMDataBroker dataBroker;
@Mock
+ private DOMDataTreeWriteTransaction tx;
+ @Mock
private DatabindProvider databindProvider;
private ListenersBroker listenersBroker;
@BeforeEach
void prepareListenersBroker() {
+ doReturn(tx).when(dataBroker).newWriteOnlyTransaction();
+ doReturn(CommitInfo.emptyFluentFuture()).when(tx).commit();
+
listenersBroker = new ListenersBroker.ServerSentEvents(dataBroker);
webSocketFactory = new WebSocketFactory(execService, listenersBroker, 5000, 2000);
- streamName = listenersBroker.createStream(name -> new DataTreeChangeStream(listenersBroker, name,
- NotificationOutputType.JSON, databindProvider, LogicalDatastoreType.CONFIGURATION,
- YangInstanceIdentifier.of(QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster"))))
+ streamName = listenersBroker.createStream("description", "streams",
+ name -> new DataTreeChangeStream(listenersBroker, name, NotificationOutputType.JSON, databindProvider,
+ LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.of(
+ QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster"))))
+ .getOrThrow()
.name();
}