description
"Revised version with the following changes:
- datastore parameter is documented to default to CONFIGURATION
- - scope parameter to create-data-change-event-subscription is obsolete";
+ - scope parameter to create-data-change-event-subscription is obsolete
+ - notification-output-type parameter is obsolete";
}
revision 2014-07-08 {
grouping notification-output-type-grouping {
leaf notification-output-type {
+ status obsolete;
description "Input parameter which type of output will be parsed on notification";
type enumeration {
enum JSON;
final ListenersBroker listenersBroker;
final HttpServlet streamServlet;
if (streamsConfiguration.useSSE()) {
- listenersBroker = new ListenersBroker.ServerSentEvents(dataBroker);
+ listenersBroker = new ListenersBroker.ServerSentEvents(dataBroker, notificationService, mountPointService);
streamServlet = servletSupport.createHttpServletBuilder(
new ServerSentEventsApplication(scheduledThreadPool, listenersBroker, streamsConfiguration))
.build();
} else {
- listenersBroker = new ListenersBroker.WebSockets(dataBroker);
+ listenersBroker = new ListenersBroker.WebSockets(dataBroker, notificationService, mountPointService);
streamServlet = new WebSocketInitializer(scheduledThreadPool, listenersBroker, streamsConfiguration);
}
*/
package org.opendaylight.restconf.nb.rfc8040;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.base.MoreObjects;
+import java.util.Map;
+import java.util.function.Function;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.restconf.api.query.ChangedLeafNodesOnlyParam;
import org.opendaylight.restconf.api.query.ChildNodesOnlyParam;
import org.opendaylight.restconf.api.query.FilterParam;
}
}
+ /**
+ * Return {@link ReceiveEventsParams} for specified query parameters.
+ * @param queryParameters Parameters and their values
+ * @return A {@link ReceiveEventsParams}
+ */
+ public static @NonNull ReceiveEventsParams ofQueryParameters(final Map<String, String> queryParameters) {
+ StartTimeParam startTime = null;
+ StopTimeParam stopTime = null;
+ FilterParam filter = null;
+ LeafNodesOnlyParam leafNodesOnly = null;
+ SkipNotificationDataParam skipNotificationData = null;
+ ChangedLeafNodesOnlyParam changedLeafNodesOnly = null;
+ ChildNodesOnlyParam childNodesOnly = null;
+
+ for (var entry : queryParameters.entrySet()) {
+ final var paramName = entry.getKey();
+ final var paramValue = entry.getValue();
+
+ switch (paramName) {
+ case FilterParam.uriName:
+ filter = optionalParam(FilterParam::forUriValue, paramName, paramValue);
+ break;
+ case StartTimeParam.uriName:
+ startTime = optionalParam(StartTimeParam::forUriValue, paramName, paramValue);
+ break;
+ case StopTimeParam.uriName:
+ stopTime = optionalParam(StopTimeParam::forUriValue, paramName, paramValue);
+ break;
+ case LeafNodesOnlyParam.uriName:
+ leafNodesOnly = optionalParam(LeafNodesOnlyParam::forUriValue, paramName, paramValue);
+ break;
+ case SkipNotificationDataParam.uriName:
+ skipNotificationData = optionalParam(SkipNotificationDataParam::forUriValue, paramName,
+ paramValue);
+ break;
+ case ChangedLeafNodesOnlyParam.uriName:
+ changedLeafNodesOnly = optionalParam(ChangedLeafNodesOnlyParam::forUriValue, paramName,
+ paramValue);
+ break;
+ case ChildNodesOnlyParam.uriName:
+ childNodesOnly = optionalParam(ChildNodesOnlyParam::forUriValue, paramName, paramValue);
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid parameter: " + paramName);
+ }
+ }
+
+ return new ReceiveEventsParams(startTime, stopTime, filter, leafNodesOnly, skipNotificationData,
+ changedLeafNodesOnly, childNodesOnly);
+ }
+
@Override
public String toString() {
final var helper = MoreObjects.toStringHelper(this);
}
return helper.toString();
}
+
+ private static <T> @Nullable T optionalParam(final Function<String, @NonNull T> factory, final String name,
+ final String value) {
+ try {
+ return factory.apply(requireNonNull(value));
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Invalid " + name + " value: " + value, e);
+ }
+ }
}
singletons = Set.of(
new RestconfDocumentedExceptionMapper(databindProvider),
new RestconfDataServiceImpl(databindProvider, server, actionService),
- new RestconfInvokeOperationsServiceImpl(databindProvider, server, mountPointService, listenersBroker),
+ new RestconfInvokeOperationsServiceImpl(databindProvider, server, listenersBroker),
new RestconfOperationsServiceImpl(databindProvider, server),
new RestconfSchemaServiceImpl(domSchemaService, mountPointService),
new RestconfImpl(databindProvider));
import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.function.Function;
import javax.ws.rs.core.UriInfo;
import org.opendaylight.restconf.common.errors.RestconfError;
import org.opendaylight.restconf.nb.rfc8040.Insert;
import org.opendaylight.restconf.nb.rfc8040.ReadDataParams;
-import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
import org.opendaylight.restconf.nb.rfc8040.legacy.InstanceIdentifierContext;
import org.opendaylight.restconf.nb.rfc8040.legacy.QueryParameters;
import org.opendaylight.restconf.nb.rfc8040.utils.parser.NetconfFieldsTranslator;
// Utility class
}
- public static @NonNull ReceiveEventsParams newReceiveEventsParams(final UriInfo uriInfo) {
- StartTimeParam startTime = null;
- StopTimeParam stopTime = null;
- FilterParam filter = null;
- LeafNodesOnlyParam leafNodesOnly = null;
- SkipNotificationDataParam skipNotificationData = null;
- ChangedLeafNodesOnlyParam changedLeafNodesOnly = null;
- ChildNodesOnlyParam childNodesOnly = null;
-
- for (Entry<String, List<String>> entry : uriInfo.getQueryParameters().entrySet()) {
- final String paramName = entry.getKey();
- final List<String> paramValues = entry.getValue();
-
- try {
- switch (paramName) {
- case FilterParam.uriName:
- filter = optionalParam(FilterParam::forUriValue, paramName, paramValues);
- break;
- case StartTimeParam.uriName:
- startTime = optionalParam(StartTimeParam::forUriValue, paramName, paramValues);
- break;
- case StopTimeParam.uriName:
- stopTime = optionalParam(StopTimeParam::forUriValue, paramName, paramValues);
- break;
- case LeafNodesOnlyParam.uriName:
- leafNodesOnly = optionalParam(LeafNodesOnlyParam::forUriValue, paramName, paramValues);
- break;
- case SkipNotificationDataParam.uriName:
- skipNotificationData = optionalParam(SkipNotificationDataParam::forUriValue, paramName,
- paramValues);
- break;
- case ChangedLeafNodesOnlyParam.uriName:
- changedLeafNodesOnly = optionalParam(ChangedLeafNodesOnlyParam::forUriValue, paramName,
- paramValues);
- break;
- case ChildNodesOnlyParam.uriName:
- childNodesOnly = optionalParam(ChildNodesOnlyParam::forUriValue, paramName, paramValues);
- break;
- default:
- throw unhandledParam("notification", paramName);
- }
- } catch (IllegalArgumentException e) {
- throw new RestconfDocumentedException("Invalid " + paramName + " value: " + e.getMessage(),
- ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE, e);
- }
- }
-
- try {
- return new ReceiveEventsParams(startTime, stopTime, filter, leafNodesOnly, skipNotificationData,
- changedLeafNodesOnly, childNodesOnly);
- } catch (IllegalArgumentException e) {
- throw new RestconfDocumentedException("Invalid query parameters: " + e.getMessage(), e);
- }
- }
-
public static QueryParameters newQueryParameters(final ReadDataParams params,
final InstanceIdentifierContext identifier) {
final var fields = params.fields();
WithDefaultsParam withDefaults = null;
PrettyPrintParam prettyPrint = null;
- for (Entry<String, List<String>> entry : uriInfo.getQueryParameters().entrySet()) {
- final String paramName = entry.getKey();
- final List<String> paramValues = entry.getValue();
+ for (var entry : uriInfo.getQueryParameters().entrySet()) {
+ final var paramName = entry.getKey();
+ final var paramValues = entry.getValue();
try {
switch (paramName) {
import static java.util.Objects.requireNonNull;
+import com.google.common.collect.ImmutableMap;
+import java.io.UnsupportedEncodingException;
import java.util.concurrent.ScheduledExecutorService;
+import javax.ws.rs.BadRequestException;
import javax.ws.rs.GET;
+import javax.ws.rs.NotFoundException;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
-import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.UriInfo;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseEventSink;
+import javax.xml.xpath.XPathExpressionException;
import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
+import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
import org.opendaylight.restconf.nb.rfc8040.streams.ListenersBroker;
+import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName;
import org.opendaylight.restconf.nb.rfc8040.streams.SSESessionHandler;
import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
import org.slf4j.Logger;
* @param streamName path to target
*/
@GET
- @Path("/{streamName:.+}")
+ @Path("/{encodingName:[a-zA-Z]+}/{streamName:.+}")
@Produces(MediaType.SERVER_SENT_EVENTS)
- public void getSSE(@PathParam("streamName") final String streamName, @Context final SseEventSink sink,
- @Context final Sse sse) {
- final var listener = listenersBroker.getStream(streamName);
- if (listener == null) {
+ public void getSSE(@PathParam("encodingName") final EncodingName encodingName,
+ @PathParam("streamName") final String streamName, @Context final UriInfo uriInfo,
+ @Context final SseEventSink sink, @Context final Sse sse) {
+ final var stream = listenersBroker.getStream(streamName);
+ if (stream == null) {
LOG.debug("Listener for stream with name {} was not found.", streamName);
- throw new WebApplicationException("No such stream: " + streamName, Status.NOT_FOUND);
+ throw new NotFoundException("No such stream: " + streamName);
+ }
+
+ final var queryParameters = ImmutableMap.<String, String>builder();
+ for (var entry : uriInfo.getQueryParameters().entrySet()) {
+ final var values = entry.getValue();
+ switch (values.size()) {
+ case 0:
+ // No-op
+ break;
+ case 1:
+ queryParameters.put(entry.getKey(), values.get(0));
+ break;
+ default:
+ throw new BadRequestException(
+ "Parameter " + entry.getKey() + " can appear at most once in request URI");
+ }
+ }
+
+ final ReceiveEventsParams params;
+ try {
+ params = ReceiveEventsParams.ofQueryParameters(queryParameters.build());
+ } catch (IllegalArgumentException e) {
+ throw new BadRequestException(e.getMessage(), e);
}
LOG.debug("Listener for stream with name {} has been found, SSE session handler will be created.", streamName);
// FIXME: invert control here: we should call 'listener.addSession()', which in turn should call
// handler.init()/handler.close()
- final var handler = new SSESessionHandler(executorService, sink, sse, listener, maximumFragmentLength,
- heartbeatInterval);
- handler.init();
+ final var handler = new SSESessionHandler(executorService, sink, sse, stream, encodingName, params,
+ maximumFragmentLength, heartbeatInterval);
+
+ try {
+ handler.init();
+ } catch (UnsupportedEncodingException e) {
+ throw new NotFoundException("Unsupported encoding " + encodingName.name(), e);
+ } catch (IllegalArgumentException | XPathExpressionException e) {
+ throw new BadRequestException(e.getMessage(), e);
+ }
}
}
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
-import org.opendaylight.mdsal.dom.api.DOMMountPointService;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
import org.opendaylight.restconf.common.errors.RestconfFuture;
import org.opendaylight.restconf.nb.rfc8040.MediaTypes;
private final DatabindProvider databindProvider;
private final MdsalRestconfServer server;
- @Deprecated(forRemoval = true)
- private final DOMMountPointService mountPointService;
private final ListenersBroker listenersBroker;
public RestconfInvokeOperationsServiceImpl(final DatabindProvider databindProvider,
- final MdsalRestconfServer server, final DOMMountPointService mountPointService,
- final ListenersBroker listenersBroker) {
+ final MdsalRestconfServer server, final ListenersBroker listenersBroker) {
this.databindProvider = requireNonNull(databindProvider);
this.server = requireNonNull(server);
- this.mountPointService = requireNonNull(mountPointService);
this.listenersBroker = requireNonNull(listenersBroker);
}
final var type = reqPath.getSchemaNode().getQName();
final var mountPoint = reqPath.getMountPoint();
if (mountPoint == null) {
+ final var baseURI = uriInfo.getBaseUri();
// Hacked-up integration of streams
if (CreateDataChangeEventSubscription.QNAME.equals(type)) {
- return listenersBroker.createDataChangeNotifiStream(databindProvider, uriInfo, input,
+ return listenersBroker.createDataChangeNotifiStream(databindProvider, baseURI, input,
localDatabind.modelContext());
} else if (CreateNotificationStream.QNAME.equals(type)) {
- return listenersBroker.createNotificationStream(databindProvider, uriInfo, input,
+ return listenersBroker.createNotificationStream(databindProvider, baseURI, input,
localDatabind.modelContext());
} else if (SubscribeDeviceNotification.QNAME.equals(type)) {
- return listenersBroker.createDeviceNotificationStream(uriInfo, input, localDatabind.modelContext(),
- mountPointService);
+ return listenersBroker.createDeviceNotificationStream(baseURI, input, localDatabind.modelContext());
}
}
--- /dev/null
+/*
+ * Copyright (c) 2022 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.restconf.nb.rfc8040.streams;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.collect.ImmutableMap;
+import java.time.Instant;
+import org.opendaylight.mdsal.dom.api.DOMEvent;
+import org.opendaylight.mdsal.dom.api.DOMNotification;
+import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
+import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName;
+import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.Sink;
+import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.Source;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContextProvider;
+
+/**
+ * Abstract base class for functionality shared between {@link NotificationSource} and
+ * {@link DeviceNotificationSource}.
+ */
+abstract class AbstractNotificationSource extends Source<DOMNotification> {
+ static final class Listener implements DOMNotificationListener {
+ private final Sink<DOMNotification> sink;
+ private final EffectiveModelContextProvider modelContext;
+
+ Listener(final Sink<DOMNotification> sink, final EffectiveModelContextProvider modelContext) {
+ this.sink = requireNonNull(sink);
+ this.modelContext = requireNonNull(modelContext);
+ }
+
+ @Override
+ public void onNotification(final DOMNotification notification) {
+ sink.publish(modelContext.getEffectiveModelContext(), notification,
+ notification instanceof DOMEvent domEvent ? domEvent.getEventInstant() : Instant.now());
+ }
+ }
+
+ private static final ImmutableMap<EncodingName, NotificationFormatterFactory> ENCODINGS = ImmutableMap.of(
+ EncodingName.RFC8040_JSON, JSONNotificationFormatter.FACTORY,
+ EncodingName.RFC8040_XML, XMLNotificationFormatter.FACTORY);
+
+ AbstractNotificationSource() {
+ super(ENCODINGS);
+ }
+}
+++ /dev/null
-/*
- * Copyright (c) 2022 PANTHEON.tech, s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.restconf.nb.rfc8040.streams;
-
-import com.google.common.collect.ImmutableMap;
-import java.time.Instant;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.mdsal.dom.api.DOMEvent;
-import org.opendaylight.mdsal.dom.api.DOMNotification;
-import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
-import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev231103.NotificationOutputTypeGrouping.NotificationOutputType;
-import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
-
-/**
- * Abstract base class for functionality shared between {@link NotificationStream} and
- * {@link DeviceNotificationStream}.
- */
-abstract class AbstractNotificationStream extends RestconfStream<DOMNotification> implements DOMNotificationListener {
- private static final ImmutableMap<EncodingName, NotificationFormatterFactory> ENCODINGS = ImmutableMap.of(
- EncodingName.RFC8040_JSON, JSONNotificationFormatter.FACTORY,
- EncodingName.RFC8040_XML, XMLNotificationFormatter.FACTORY);
-
- AbstractNotificationStream(final ListenersBroker listenersBroker, final String name,
- final NotificationOutputType outputType) {
- super(listenersBroker, name, ENCODINGS, outputType);
- }
-
- @Override
- @SuppressWarnings("checkstyle:IllegalCatch")
- public final void onNotification(final DOMNotification notification) {
- sendDataMessage(effectiveModel(), notification,
- notification instanceof DOMEvent domEvent ? domEvent.getEventInstant() : Instant.now());
- }
-
- abstract @NonNull EffectiveModelContext effectiveModel();
-}
import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
-import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev231103.NotificationOutputTypeGrouping.NotificationOutputType;
+import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName;
+import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.Sink;
+import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.Source;
+import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
/**
* A {@link RestconfStream} reporting changes on a particular data tree.
*/
-public final class DataTreeChangeStream extends RestconfStream<List<DataTreeCandidate>>
- implements ClusteredDOMDataTreeChangeListener {
+public final class DataTreeChangeSource extends Source<List<DataTreeCandidate>> {
private static final ImmutableMap<EncodingName, DataTreeCandidateFormatterFactory> ENCODINGS = ImmutableMap.of(
EncodingName.RFC8040_JSON, JSONDataTreeCandidateFormatter.FACTORY,
EncodingName.RFC8040_XML, XMLDataTreeCandidateFormatter.FACTORY);
- private final DatabindProvider databindProvider;
+ private final @NonNull DOMDataTreeChangeService changeService;
+ private final @NonNull DatabindProvider databindProvider;
private final @NonNull LogicalDatastoreType datastore;
private final @NonNull YangInstanceIdentifier path;
- DataTreeChangeStream(final ListenersBroker listenersBroker, final String name,
- final NotificationOutputType outputType, final DatabindProvider databindProvider,
+ DataTreeChangeSource(final DatabindProvider databindProvider, final DOMDataBroker dataBroker,
final LogicalDatastoreType datastore, final YangInstanceIdentifier path) {
- super(listenersBroker, name, ENCODINGS, outputType);
+ super(ENCODINGS);
this.databindProvider = requireNonNull(databindProvider);
this.datastore = requireNonNull(datastore);
this.path = requireNonNull(path);
- }
- @Override
- public void onInitialData() {
- // No-op
+ final var dtcs = dataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
+ if (dtcs == null) {
+ throw new UnsupportedOperationException("DOMDataBroker does not support the DOMDataTreeChangeService");
+ }
+ changeService = dtcs;
}
@Override
- @SuppressWarnings("checkstyle:IllegalCatch")
- public void onDataTreeChanged(final List<DataTreeCandidate> dataTreeCandidates) {
- sendDataMessage(databindProvider.currentContext().modelContext(), dataTreeCandidates, Instant.now());
- }
+ protected Registration start(final Sink<List<DataTreeCandidate>> sink) {
+ return changeService.registerDataTreeChangeListener(new DOMDataTreeIdentifier(datastore, path),
+ new ClusteredDOMDataTreeChangeListener() {
+ @Override
+ public void onDataTreeChanged(final List<DataTreeCandidate> changes) {
+ // FIXME: format one change at a time?
+ sink.publish(databindProvider.currentContext().modelContext(), changes, Instant.now());
+ }
- /**
- * Get path pointed to data in data store.
- *
- * @return Path pointed to data in data store.
- */
- public YangInstanceIdentifier getPath() {
- return path;
- }
-
- /**
- * Register data change listener in DOM data broker and set it to listener on stream.
- *
- * @param domDataBroker data broker for register data change listener
- */
- public synchronized void listen(final DOMDataBroker domDataBroker) {
- if (!isListening()) {
- final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
- if (changeService == null) {
- throw new UnsupportedOperationException("DOMDataBroker does not support the DOMDataTreeChangeService");
- }
-
- setRegistration(changeService.registerDataTreeChangeListener(
- new DOMDataTreeIdentifier(datastore, path), this));
- }
+ @Override
+ public void onInitialData() {
+ // No-op
+ }
+ });
}
@Override
- ToStringHelper addToStringAttributes(final ToStringHelper helper) {
+ protected ToStringHelper addToStringAttributes(final ToStringHelper helper) {
return super.addToStringAttributes(helper.add("path", path));
}
}
--- /dev/null
+/*
+ * Copyright (c) 2022 Opendaylight, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.restconf.nb.rfc8040.streams;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.concurrent.atomic.AtomicReference;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.dom.api.DOMMountPointListener;
+import org.opendaylight.mdsal.dom.api.DOMMountPointService;
+import org.opendaylight.mdsal.dom.api.DOMNotification;
+import org.opendaylight.mdsal.dom.api.DOMNotificationService;
+import org.opendaylight.mdsal.dom.api.DOMSchemaService;
+import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.Sink;
+import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement;
+import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link RestconfStream} reporting YANG notifications coming from a mounted device.
+ */
+public final class DeviceNotificationSource extends AbstractNotificationSource implements DOMMountPointListener {
+ private static final Logger LOG = LoggerFactory.getLogger(DeviceNotificationSource.class);
+
+ private final AtomicReference<Runnable> onRemoved = new AtomicReference<>();
+ private final DOMMountPointService mountPointService;
+ private final YangInstanceIdentifier devicePath;
+
+ DeviceNotificationSource(final DOMMountPointService mountPointService, final YangInstanceIdentifier devicePath) {
+ this.mountPointService = requireNonNull(mountPointService);
+ this.devicePath = requireNonNull(devicePath);
+ }
+
+ @Override
+ public void onMountPointCreated(final YangInstanceIdentifier path) {
+ // No-op
+ }
+
+ @Override
+ public void onMountPointRemoved(final YangInstanceIdentifier path) {
+ if (devicePath.equals(path)) {
+ // The mount point went away, run cleanup
+ cleanup();
+ }
+ }
+
+ @Override
+ protected Registration start(final Sink<DOMNotification> sink) {
+ final var optMount = mountPointService.getMountPoint(devicePath);
+ if (optMount.isEmpty()) {
+ LOG.info("Mount point {} not present, terminating", devicePath);
+ return endOfStream(sink);
+ }
+
+ final var mount = optMount.orElseThrow();
+ final var optSchema = mount.getService(DOMSchemaService.class);
+ if (optSchema.isEmpty()) {
+ LOG.info("Mount point {} does not have a DOMSchemaService, terminating", devicePath);
+ return endOfStream(sink);
+ }
+
+ final var optNotification = mount.getService(DOMNotificationService.class);
+ if (optNotification.isEmpty()) {
+ LOG.info("Mount point {} does not have a DOMNotificationService, terminating", devicePath);
+ return endOfStream(sink);
+ }
+
+ // Find all notifications
+ final var modelContext = optSchema.orElseThrow().getGlobalContext();
+ final var paths = modelContext.getModuleStatements().values().stream()
+ .flatMap(module -> module.streamEffectiveSubstatements(NotificationEffectiveStatement.class))
+ .map(notification -> Absolute.of(notification.argument()))
+ .collect(ImmutableSet.toImmutableSet());
+ if (paths.isEmpty()) {
+ LOG.info("Mount point {} does not advertize any YANG notifications, terminating", devicePath);
+ return endOfStream(sink);
+ }
+
+ final var notifReg = optNotification.orElseThrow().registerNotificationListener(
+ new Listener(sink, () -> modelContext), paths);
+
+ // Notifications are running now.
+ // If we get removed we need to close those. But since we are running lockless and we need to set up
+ // the listener, which will own its cleanup.
+ final Runnable closeNotif = () -> {
+ notifReg.close();
+ sink.endOfStream();
+ };
+ onRemoved.set(closeNotif);
+
+ // onMountPointRemoved() may be invoked asynchronously before this method returns.
+ // Therefore we perform a CAS replacement routine of the close routine:
+ // - if it succeeds onRemoved's Runnable covers all required cleanup
+ // - if it does not, it means state has already been cleaned up by onMountPointRemoved()
+ final var mountReg = mountPointService.registerProvisionListener(this);
+ final Runnable closeMount = () -> {
+ notifReg.close();
+ sink.endOfStream();
+ mountReg.close();
+ };
+ if (onRemoved.compareAndSet(closeNotif, closeMount)) {
+ // All set, cleanup() will handle the rest
+ return this::cleanup;
+ }
+
+ // Already removed, bail out, but do not signal endOfStream()
+ mountReg.close();
+ return () -> {
+ // No-op
+ };
+ }
+
+ private static @NonNull Registration endOfStream(final Sink<DOMNotification> sink) {
+ // Something went wrong: signal end of stream and return a no-op registration
+ sink.endOfStream();
+ return () -> {
+ // No-op
+ };
+ }
+
+ private void cleanup() {
+ final var runnable = onRemoved.getAndSet(null);
+ if (runnable != null) {
+ runnable.run();
+ }
+ }
+}
+++ /dev/null
-/*
- * Copyright (c) 2022 Opendaylight, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.restconf.nb.rfc8040.streams;
-
-import static java.util.Objects.requireNonNull;
-
-import java.util.Set;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.mdsal.dom.api.DOMMountPointListener;
-import org.opendaylight.mdsal.dom.api.DOMMountPointService;
-import org.opendaylight.mdsal.dom.api.DOMNotificationService;
-import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev231103.NotificationOutputTypeGrouping.NotificationOutputType;
-import org.opendaylight.yangtools.concepts.Registration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
-import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
-
-/**
- * A {@link RestconfStream} reporting YANG notifications coming from a mounted device.
- */
-public final class DeviceNotificationStream extends AbstractNotificationStream implements DOMMountPointListener {
- private final @NonNull EffectiveModelContext effectiveModel;
- private final @NonNull DOMMountPointService mountPointService;
- private final @NonNull YangInstanceIdentifier instanceIdentifier;
-
- private Registration reg;
-
- DeviceNotificationStream(final ListenersBroker listenersBroker, final String name,
- final NotificationOutputType outputType, final EffectiveModelContext effectiveModel,
- final DOMMountPointService mountPointService, final YangInstanceIdentifier instanceIdentifier) {
- super(listenersBroker, name, outputType);
- this.effectiveModel = requireNonNull(effectiveModel);
- this.mountPointService = requireNonNull(mountPointService);
- this.instanceIdentifier = requireNonNull(instanceIdentifier);
- }
-
- public synchronized void listen(final DOMNotificationService notificationService, final Set<Absolute> paths) {
- if (!isListening()) {
- setRegistration(notificationService.registerNotificationListener(this, paths));
- reg = mountPointService.registerProvisionListener(this);
- }
- }
-
- private synchronized void resetListenerRegistration() {
- if (reg != null) {
- reg.close();
- reg = null;
- }
- }
-
- @Override
- EffectiveModelContext effectiveModel() {
- return effectiveModel;
- }
-
- @Override
- public void onMountPointCreated(final YangInstanceIdentifier path) {
- // No-op
- }
-
- @Override
- public void onMountPointRemoved(final YangInstanceIdentifier path) {
- if (instanceIdentifier.equals(path)) {
- resetListenerRegistration();
- endOfStream();
- }
- }
-}
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-abstract class EventFormatter<T> implements Immutable {
+public abstract class EventFormatter<T> implements Immutable {
private static final XPathFactory XPF = XPathFactory.newInstance();
// FIXME: NETCONF-369: XPath operates without namespace context, therefore we need an namespace-unaware builder.
import javax.xml.xpath.XPathExpressionException;
import org.eclipse.jdt.annotation.NonNull;
-abstract class EventFormatterFactory<T> {
+public abstract class EventFormatterFactory<T> {
private final @NonNull EventFormatter<T> emptyFormatter;
- EventFormatterFactory(final EventFormatter<T> emptyFormatter) {
+ protected EventFormatterFactory(final EventFormatter<T> emptyFormatter) {
this.emptyFormatter = requireNonNull(emptyFormatter);
}
- final @NonNull EventFormatter<T> emptyFormatter() {
- return emptyFormatter;
- }
-
- final @NonNull EventFormatter<T> getFormatter(final @NonNull TextParameters textParamaters) {
+ public final @NonNull EventFormatter<T> getFormatter(final @NonNull TextParameters textParamaters) {
return textParamaters.equals(TextParameters.EMPTY) ? emptyFormatter : newFormatter(textParamaters);
}
- abstract @NonNull EventFormatter<T> getFormatter(@NonNull TextParameters textParamaters, String xpathFilter)
+ public abstract @NonNull EventFormatter<T> getFormatter(@NonNull TextParameters textParamaters, String xpathFilter)
throws XPathExpressionException;
- abstract @NonNull EventFormatter<T> newFormatter(@NonNull TextParameters textParamaters);
+ public abstract @NonNull EventFormatter<T> newFormatter(@NonNull TextParameters textParamaters);
}
static final DataTreeCandidateFormatterFactory FACTORY = new DataTreeCandidateFormatterFactory(EMPTY) {
@Override
- DataTreeCandidateFormatter newFormatter(final TextParameters textParams) {
+ public DataTreeCandidateFormatter newFormatter(final TextParameters textParams) {
return new JSONDataTreeCandidateFormatter(textParams);
}
@Override
- DataTreeCandidateFormatter getFormatter(final TextParameters textParams, final String xpathFilter)
+ public DataTreeCandidateFormatter getFormatter(final TextParameters textParams, final String xpathFilter)
throws XPathExpressionException {
return new JSONDataTreeCandidateFormatter(textParams, xpathFilter);
}
static final NotificationFormatterFactory FACTORY = new NotificationFormatterFactory(EMPTY) {
@Override
- JSONNotificationFormatter getFormatter(final TextParameters textParams, final String xpathFilter)
+ public JSONNotificationFormatter getFormatter(final TextParameters textParams, final String xpathFilter)
throws XPathExpressionException {
return new JSONNotificationFormatter(textParams, xpathFilter);
}
@Override
- JSONNotificationFormatter newFormatter(final TextParameters textParams) {
+ public JSONNotificationFormatter newFormatter(final TextParameters textParams) {
return new JSONNotificationFormatter(textParams);
}
};
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.MoreExecutors;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.Optional;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import javax.ws.rs.core.UriInfo;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataBroker;
-import org.opendaylight.mdsal.dom.api.DOMMountPoint;
import org.opendaylight.mdsal.dom.api.DOMMountPointService;
import org.opendaylight.mdsal.dom.api.DOMNotificationService;
import org.opendaylight.mdsal.dom.api.DOMRpcResult;
-import org.opendaylight.mdsal.dom.api.DOMSchemaService;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
import org.opendaylight.restconf.common.errors.RestconfFuture;
import org.opendaylight.restconf.common.errors.SettableRestconfFuture;
import org.opendaylight.restconf.nb.rfc8040.URLConstants;
import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
+import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName;
+import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.Source;
import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.RestconfState;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.Streams;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateNotificationStreamInput;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev231103.CreateDataChangeEventSubscriptionInput1;
-import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev231103.NotificationOutputTypeGrouping;
-import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev231103.NotificationOutputTypeGrouping.NotificationOutputType;
import org.opendaylight.yangtools.yang.common.ErrorTag;
import org.opendaylight.yangtools.yang.common.ErrorType;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement;
-import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * This singleton class is responsible for creation, removal and searching for {@link DataTreeChangeStream} or
- * {@link NotificationStream} listeners.
+ * This singleton class is responsible for creation, removal and searching for {@link DataTreeChangeSource} or
+ * {@link NotificationSource} listeners.
*/
// FIXME: furthermore, this should be tied to ietf-restconf-monitoring, as the Strings used in its maps are stream
// names. We essentially need a component which deals with allocation of stream names and their lifecycle and
* A ListenersBroker working with Server-Sent Events.
*/
public static final class ServerSentEvents extends ListenersBroker {
- public ServerSentEvents(final DOMDataBroker dataBroker) {
- super(dataBroker);
- }
-
- @Override
- public String baseStreamLocation(final UriInfo uriInfo) {
- return uriInfo.getBaseUriBuilder()
- .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH)
- .build()
- .toString();
+ public ServerSentEvents(final DOMDataBroker dataBroker, final DOMNotificationService notificationService,
+ final DOMMountPointService mountPointService) {
+ super(dataBroker, notificationService, mountPointService);
}
}
* A ListenersBroker working with WebSockets.
*/
public static final class WebSockets extends ListenersBroker {
- public WebSockets(final DOMDataBroker dataBroker) {
- super(dataBroker);
+ public WebSockets(final DOMDataBroker dataBroker, final DOMNotificationService notificationService,
+ final DOMMountPointService mountPointService) {
+ super(dataBroker, notificationService, mountPointService);
}
@Override
- public String baseStreamLocation(final UriInfo uriInfo) {
- final var scheme = switch (uriInfo.getAbsolutePath().getScheme()) {
+ String streamsScheme(final URI baseURI) {
+ return switch (super.streamsScheme(baseURI)) {
// Secured HTTP goes to Secured WebSockets
case "https" -> "wss";
// Unsecured HTTP and others go to unsecured WebSockets
default -> "ws";
};
-
- return uriInfo.getBaseUriBuilder()
- .scheme(scheme)
- .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH)
- .build()
- .toString();
}
}
- /**
- * Factory interface for creating instances of {@link RestconfStream}.
- *
- * @param <T> {@link RestconfStream} type
- */
- @FunctionalInterface
- public interface StreamFactory<T extends RestconfStream<?>> {
- /**
- * Create a stream with the supplied name.
- *
- * @param name Stream name
- * @return An {@link RestconfStream}
- */
- @NonNull T createStream(@NonNull String name);
- }
-
private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);
private static final YangInstanceIdentifier RESTCONF_STATE_STREAMS = YangInstanceIdentifier.of(
NodeIdentifier.create(RestconfState.QNAME),
private static final NodeIdentifier DATASTORE_NODEID = NodeIdentifier.create(
QName.create(CreateDataChangeEventSubscriptionInput1.QNAME, "datastore").intern());
- @Deprecated(forRemoval = true)
- private static final NodeIdentifier OUTPUT_TYPE_NODEID = NodeIdentifier.create(
- QName.create(NotificationOutputTypeGrouping.QNAME, "notification-output-type").intern());
private static final NodeIdentifier DEVICE_NOTIFICATION_PATH_NODEID =
NodeIdentifier.create(QName.create(SubscribeDeviceNotificationInput.QNAME, "path").intern());
private static final NodeIdentifier DEVICE_NOTIFICATION_STREAM_PATH_NODEID =
private final ConcurrentMap<String, RestconfStream<?>> streams = new ConcurrentHashMap<>();
private final DOMDataBroker dataBroker;
+ @Deprecated(forRemoval = true)
+ private final DOMMountPointService mountPointService;
+ @Deprecated(forRemoval = true)
+ private final DOMNotificationService notificationService;
- private ListenersBroker(final DOMDataBroker dataBroker) {
+ private ListenersBroker(final DOMDataBroker dataBroker, final DOMNotificationService notificationService,
+ final DOMMountPointService mountPointService) {
this.dataBroker = requireNonNull(dataBroker);
+ this.notificationService = requireNonNull(notificationService);
+ this.mountPointService = requireNonNull(mountPointService);
}
/**
* @return A {@link RestconfStream} instance
* @throws NullPointerException if {@code factory} is {@code null}
*/
- final <T extends RestconfStream<?>> @NonNull RestconfFuture<T> createStream(final String description,
- final String baseStreamLocation, final StreamFactory<T> factory) {
- String name;
- T stream;
- do {
- // Use Type 4 (random) UUID. While we could just use it as a plain string, be nice to observers and anchor
- // it into UUID URN namespace as defined by RFC4122
- name = "urn:uuid:" + UUID.randomUUID().toString();
- stream = factory.createStream(name);
- } while (streams.putIfAbsent(name, stream) != null);
-
- // final captures for use with FutureCallback
- final var streamName = name;
- final var finalStream = stream;
+ final <T> @NonNull RestconfFuture<RestconfStream<T>> createStream(final String description,
+ final String baseStreamLocation, final Source<T> source) {
+ final var stream = allocateStream(source);
+ final var name = stream.name();
// Now issue a put operation
- final var ret = new SettableRestconfFuture<T>();
+ final var ret = new SettableRestconfFuture<RestconfStream<T>>();
final var tx = dataBroker.newWriteOnlyTransaction();
-
- tx.put(LogicalDatastoreType.OPERATIONAL, restconfStateStreamPath(streamName),
- streamEntry(streamName, description, baseStreamLocation + '/' + streamName, ""));
+ tx.put(LogicalDatastoreType.OPERATIONAL, restconfStateStreamPath(name),
+ streamEntry(name, description, baseStreamLocation, stream.encodings()));
tx.commit().addCallback(new FutureCallback<CommitInfo>() {
@Override
public void onSuccess(final CommitInfo result) {
- LOG.debug("Stream {} added", streamName);
- ret.set(finalStream);
+ LOG.debug("Stream {} added", name);
+ ret.set(stream);
}
@Override
public void onFailure(final Throwable cause) {
- LOG.debug("Failed to add stream {}", streamName, cause);
- streams.remove(streamName, finalStream);
- ret.setFailure(new RestconfDocumentedException("Failed to allocate stream " + streamName, cause));
+ LOG.debug("Failed to add stream {}", name, cause);
+ streams.remove(name, stream);
+ ret.setFailure(new RestconfDocumentedException("Failed to allocate stream " + name, cause));
}
}, MoreExecutors.directExecutor());
return ret;
}
+ private <T> @NonNull RestconfStream<T> allocateStream(final Source<T> source) {
+ String name;
+ RestconfStream<T> stream;
+ do {
+ // Use Type 4 (random) UUID. While we could just use it as a plain string, be nice to observers and anchor
+ // it into UUID URN namespace as defined by RFC4122
+ name = "urn:uuid:" + UUID.randomUUID().toString();
+ stream = new RestconfStream<>(this, source, name);
+ } while (streams.putIfAbsent(name, stream) != null);
+
+ return stream;
+ }
+
/**
* Remove a particular stream and remove its entry from operational datastore.
*
/**
* Return the base location URL of the streams service based on request URI.
*
- * @param uriInfo request URL information
- * @return location URL
+ * @param baseURI request base URI
+ * @throws IllegalArgumentException if the result would have been malformed
*/
- public abstract @NonNull String baseStreamLocation(UriInfo uriInfo);
+ public final @NonNull String baseStreamLocation(final URI baseURI) {
+ try {
+ return new URI(streamsScheme(baseURI), baseURI.getRawUserInfo(), baseURI.getHost(), baseURI.getPort(),
+ URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH, null, null)
+ .toString();
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException("Cannot derive streams location", e);
+ }
+ }
+
+ String streamsScheme(final URI baseURI) {
+ return baseURI.getScheme();
+ }
/**
* Create data-change-event stream with POST operation via RPC.
*/
// FIXME: this really should be a normal RPC implementation
public final RestconfFuture<Optional<ContainerNode>> createDataChangeNotifiStream(
- final DatabindProvider databindProvider, final UriInfo uriInfo, final ContainerNode input,
+ final DatabindProvider databindProvider, final URI baseURI, final ContainerNode input,
final EffectiveModelContext modelContext) {
final var datastoreName = extractStringLeaf(input, DATASTORE_NODEID);
final var datastore = datastoreName != null ? LogicalDatastoreType.valueOf(datastoreName)
: LogicalDatastoreType.CONFIGURATION;
final var path = preparePath(input);
- final var outputType = prepareOutputType(input);
return createStream(
"Events occuring in " + datastore + " datastore under /" + IdentifierCodec.serialize(path, modelContext),
- baseStreamLocation(uriInfo),
- name -> new DataTreeChangeStream(this, name, outputType, databindProvider, datastore, path))
+ baseStreamLocation(baseURI), new DataTreeChangeSource(databindProvider, dataBroker, datastore, path))
.transform(stream -> Optional.of(Builders.containerBuilder()
.withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
.withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, stream.name()))
.build()));
}
-// FIXME: NETCONF-1102: this part needs to be invoked from subscriber
-// /**
-// * Register listener by streamName in identifier to listen to data change notifications, and put or delete
-// * information about listener to DS according to ietf-restconf-monitoring.
-// *
-// * @param identifier Identifier as stream name.
-// * @param uriInfo Base URI information.
-// * @param notificationQueryParams Query parameters of notification.
-// * @param handlersHolder Holder of handlers for notifications.
-// * @return Location for listening.
-// */
-// public final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
-// final ReceiveEventsParams notificationQueryParams, final HandlersHolder handlersHolder) {
-// final var streamName = createStreamNameFromUri(identifier);
-// final var listener = dataChangeListenerFor(streamName);
-// if (listener == null) {
-// throw new RestconfDocumentedException("No listener found for stream " + streamName,
-// ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
-// }
-//
-// listener.setQueryParams(notificationQueryParams);
-// listener.listen(dataBroker);
-// }
-
// FIXME: this really should be a normal RPC implementation
public final RestconfFuture<Optional<ContainerNode>> createNotificationStream(
- final DatabindProvider databindProvider, final UriInfo uriInfo, final ContainerNode input,
+ final DatabindProvider databindProvider, final URI baseURI, final ContainerNode input,
final EffectiveModelContext modelContext) {
final var qnames = ((LeafSetNode<String>) input.getChildByArg(NOTIFICATIONS)).body().stream()
.map(LeafSetEntryNode::body)
}
description.append("\n}");
- // registration of the listener
- final var outputType = prepareOutputType(input);
- return createStream(description.toString(), baseStreamLocation(uriInfo),
- name -> new NotificationStream(this, name, outputType, databindProvider, qnames))
+ return createStream(description.toString(), baseStreamLocation(baseURI),
+ new NotificationSource(databindProvider, notificationService, qnames))
.transform(stream -> Optional.of(Builders.containerBuilder()
.withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
.withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, stream.name()))
* Create device notification stream.
*
* @param input RPC input
- * @param mountPointService dom mount point service
* @return {@link DOMRpcResult} - Output of RPC - example in JSON
*/
// FIXME: this should be an RPC invocation
- public final RestconfFuture<Optional<ContainerNode>> createDeviceNotificationStream(final UriInfo uriInfo,
- final ContainerNode input, final EffectiveModelContext modelContext,
- final DOMMountPointService mountPointService) {
+ public final RestconfFuture<Optional<ContainerNode>> createDeviceNotificationStream(final URI baseURI,
+ final ContainerNode input, final EffectiveModelContext modelContext) {
// parsing out of container with settings and path
// FIXME: ugly cast
final var path = (YangInstanceIdentifier) input.findChildByArg(DEVICE_NOTIFICATION_PATH_NODEID)
ErrorTag.INVALID_VALUE);
}
- final DOMMountPoint mountPoint = mountPointService.getMountPoint(path)
- .orElseThrow(() -> new RestconfDocumentedException("Mount point not available", ErrorType.APPLICATION,
- ErrorTag.OPERATION_FAILED));
-
- final DOMNotificationService mountNotifService = mountPoint.getService(DOMNotificationService.class)
- .orElseThrow(() -> new RestconfDocumentedException("Mount point does not support notifications",
- ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED));
-
- final var mountModelContext = mountPoint.getService(DOMSchemaService.class)
- .orElseThrow(() -> new RestconfDocumentedException("Mount point schema not available",
- ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED))
- .getGlobalContext();
- final var notificationPaths = mountModelContext.getModuleStatements().values().stream()
- .flatMap(module -> module.streamEffectiveSubstatements(NotificationEffectiveStatement.class))
- .map(notification -> Absolute.of(notification.argument()))
- .collect(ImmutableSet.toImmutableSet());
- if (notificationPaths.isEmpty()) {
- throw new RestconfDocumentedException("Device does not support notification", ErrorType.APPLICATION,
- ErrorTag.OPERATION_FAILED);
- }
-
- final var baseStreamsUri = baseStreamLocation(uriInfo);
- final var outputType = prepareOutputType(input);
+ final var baseStreamsUri = baseStreamLocation(baseURI);
return createStream(
"All YANG notifications occuring on mount point /" + IdentifierCodec.serialize(path, modelContext),
baseStreamsUri,
- streamName -> new DeviceNotificationStream(this, streamName, outputType, mountModelContext,
- mountPointService, mountPoint.getIdentifier()))
- .transform(stream -> {
- stream.listen(mountNotifService, notificationPaths);
- return Optional.of(Builders.containerBuilder()
- .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
- .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH_NODEID,
- baseStreamsUri + '/' + stream.name()))
- .build());
- });
- }
-
- /**
- * Prepare {@link NotificationOutputType}.
- *
- * @param data Container with stream settings (RPC create-stream).
- * @return Parsed {@link NotificationOutputType}.
- */
- @Deprecated(forRemoval = true)
- private static NotificationOutputType prepareOutputType(final ContainerNode data) {
- final String outputName = extractStringLeaf(data, OUTPUT_TYPE_NODEID);
- return outputName != null ? NotificationOutputType.valueOf(outputName) : NotificationOutputType.XML;
+ new DeviceNotificationSource(mountPointService, path))
+ .transform(stream -> Optional.of(Builders.containerBuilder()
+ .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
+ .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH_NODEID,
+ baseStreamsUri + '/' + stream.name()))
+ .build()));
}
/**
}
@VisibleForTesting
- static @NonNull MapEntryNode streamEntry(final String name, final String description, final String location,
- final String outputType) {
+ static @NonNull MapEntryNode streamEntry(final String name, final String description,
+ final String baseStreamLocation, final Set<EncodingName> encodings) {
+ final var accessBuilder = Builders.mapBuilder().withNodeIdentifier(new NodeIdentifier(Access.QNAME));
+ for (var encoding : encodings) {
+ final var encodingName = encoding.name();
+ accessBuilder.withChild(Builders.mapEntryBuilder()
+ .withNodeIdentifier(NodeIdentifierWithPredicates.of(Access.QNAME, ENCODING_QNAME, encodingName))
+ .withChild(ImmutableNodes.leafNode(ENCODING_QNAME, encodingName))
+ .withChild(ImmutableNodes.leafNode(LOCATION_QNAME,
+ baseStreamLocation + '/' + encodingName + '/' + name))
+ .build());
+ }
+
return Builders.mapEntryBuilder()
.withNodeIdentifier(NodeIdentifierWithPredicates.of(Stream.QNAME, NAME_QNAME, name))
.withChild(ImmutableNodes.leafNode(NAME_QNAME, name))
.withChild(ImmutableNodes.leafNode(DESCRIPTION_QNAME, description))
- .withChild(createAccessList(outputType, location))
- .build();
- }
-
- private static MapNode createAccessList(final String outputType, final String location) {
- return Builders.mapBuilder()
- .withNodeIdentifier(new NodeIdentifier(Access.QNAME))
- .withChild(Builders.mapEntryBuilder()
- .withNodeIdentifier(NodeIdentifierWithPredicates.of(Access.QNAME, ENCODING_QNAME, outputType))
- .withChild(ImmutableNodes.leafNode(ENCODING_QNAME, outputType))
- .withChild(ImmutableNodes.leafNode(LOCATION_QNAME, location))
- .build())
+ .withChild(accessBuilder.build())
.build();
}
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.restconf.nb.rfc8040.streams;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.MoreObjects.ToStringHelper;
+import com.google.common.collect.ImmutableSet;
+import org.opendaylight.mdsal.dom.api.DOMNotification;
+import org.opendaylight.mdsal.dom.api.DOMNotificationService;
+import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
+import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.Sink;
+import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.Source;
+import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
+
+/**
+ * A {@link Source} reporting YANG notifications.
+ */
+public final class NotificationSource extends AbstractNotificationSource {
+ private final DatabindProvider databindProvider;
+ private final DOMNotificationService notificationService;
+ private final ImmutableSet<QName> qnames;
+
+ NotificationSource(final DatabindProvider databindProvider, final DOMNotificationService notificationService,
+ final ImmutableSet<QName> qnames) {
+ this.databindProvider = requireNonNull(databindProvider);
+ this.notificationService = requireNonNull(notificationService);
+ this.qnames = requireNonNull(qnames);
+ }
+
+ /**
+ * Return notification QNames.
+ *
+ * @return The YANG notification {@link QName}s this listener is bound to
+ */
+ public ImmutableSet<QName> qnames() {
+ return qnames;
+ }
+
+ @Override
+ protected Registration start(final Sink<DOMNotification> sink) {
+ return notificationService.registerNotificationListener(
+ new Listener(sink, () -> databindProvider.currentContext().modelContext()),
+ qnames.stream().map(Absolute::of).toList());
+ }
+
+ @Override
+ protected ToStringHelper addToStringAttributes(final ToStringHelper helper) {
+ return super.addToStringAttributes(helper.add("qnames", qnames));
+ }
+}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.restconf.nb.rfc8040.streams;
-
-import static java.util.Objects.requireNonNull;
-
-import com.google.common.base.MoreObjects.ToStringHelper;
-import com.google.common.collect.ImmutableSet;
-import org.opendaylight.mdsal.dom.api.DOMNotificationService;
-import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
-import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev231103.NotificationOutputTypeGrouping.NotificationOutputType;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
-import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
-
-/**
- * A {@link RestconfStream} reporting YANG notifications.
- */
-public final class NotificationStream extends AbstractNotificationStream {
- private final DatabindProvider databindProvider;
- private final ImmutableSet<QName> paths;
-
- NotificationStream(final ListenersBroker listenersBroker, final String name,
- final NotificationOutputType outputType, final DatabindProvider databindProvider,
- final ImmutableSet<QName> paths) {
- super(listenersBroker, name, outputType);
- this.databindProvider = requireNonNull(databindProvider);
- this.paths = requireNonNull(paths);
- }
-
- @Override
- EffectiveModelContext effectiveModel() {
- return databindProvider.currentContext().modelContext();
- }
-
- /**
- * Return notification QNames.
- *
- * @return The YANG notification {@link QName}s this listener is bound to
- */
- public ImmutableSet<QName> qnames() {
- return paths;
- }
-
- public synchronized void listen(final DOMNotificationService notificationService) {
- if (!isListening()) {
- setRegistration(notificationService.registerNotificationListener(this,
- paths.stream().map(Absolute::of).toList()));
- }
- }
-
- @Override
- ToStringHelper addToStringAttributes(final ToStringHelper helper) {
- return super.addToStringAttributes(helper.add("paths", paths));
- }
-}
import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.collect.ImmutableMap;
+import java.io.UnsupportedEncodingException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.time.Instant;
import java.util.Set;
import java.util.regex.Pattern;
import javax.xml.xpath.XPathExpressionException;
-import org.checkerframework.checker.lock.qual.Holding;
+import org.checkerframework.checker.lock.qual.GuardedBy;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
-import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.stream.Access;
-import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev231103.NotificationOutputTypeGrouping.NotificationOutputType;
import org.opendaylight.yangtools.concepts.Registration;
-import org.opendaylight.yangtools.yang.common.ErrorTag;
-import org.opendaylight.yangtools.yang.common.ErrorType;
import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
*
* @param <T> Type of processed events
*/
-public abstract class RestconfStream<T> {
+public final class RestconfStream<T> {
/**
* An opinionated view on what values we can produce for {@link Access#getEncoding()}. The name can only be composed
* of one or more characters matching {@code [a-zA-Z]}.
}
}
+ /**
+ * A sink of events for a {@link RestconfStream}.
+ */
+ public interface Sink<T> {
+ /**
+ * Publish a set of events generated from input data.
+ *
+ * @param modelContext An {@link EffectiveModelContext} used to format the input
+ * @param input Input data
+ * @param now Current time
+ * @throws NullPointerException if any argument is {@code null}
+ */
+ void publish(EffectiveModelContext modelContext, T input, Instant now);
+
+ /**
+ * Called when the stream has reached its end.
+ */
+ void endOfStream();
+ }
+
+ /**
+ * A source of events for a {@link RestconfStream}.
+ */
+ public abstract static class Source<T> {
+ // ImmutableMap because it retains iteration order
+ final @NonNull ImmutableMap<EncodingName, ? extends EventFormatterFactory<T>> encodings;
+
+ protected Source(final ImmutableMap<EncodingName, ? extends EventFormatterFactory<T>> encodings) {
+ if (encodings.isEmpty()) {
+ throw new IllegalArgumentException("A source must support at least one encoding");
+ }
+ this.encodings = encodings;
+ }
+
+ protected abstract @NonNull Registration start(Sink<T> sink);
+
+ @Override
+ public final String toString() {
+ return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
+ }
+
+ protected ToStringHelper addToStringAttributes(final ToStringHelper helper) {
+ return helper.add("encodings", encodings.keySet());
+ }
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(RestconfStream.class);
private static final VarHandle SUBSCRIBERS;
}
}
- // ImmutableMap because it retains iteration order
- private final @NonNull ImmutableMap<EncodingName, ? extends EventFormatterFactory<T>> encodings;
+ private final @NonNull Sink<T> sink = new Sink<>() {
+ @Override
+ public void publish(final EffectiveModelContext modelContext, final T input, final Instant now) {
+ final var local = acquireSubscribers();
+ if (local != null) {
+ local.publish(modelContext, input, now);
+ } else {
+ LOG.debug("Ignoring publish() on terminated stream {}", RestconfStream.this);
+ }
+ }
+
+ @Override
+ public void endOfStream() {
+ // Atomic assertion we are ending plus guarded cleanup
+ final var local = (Subscribers<T>) SUBSCRIBERS.getAndSetRelease(RestconfStream.this, null);
+ if (local != null) {
+ terminate();
+ local.endOfStream();
+ }
+ }
+ };
private final @NonNull ListenersBroker listenersBroker;
+ private final @NonNull Source<T> source;
private final @NonNull String name;
// Accessed via SUBSCRIBERS, 'null' indicates we have been shut down
@SuppressWarnings("unused")
private volatile Subscribers<T> subscribers = Subscribers.empty();
+ @GuardedBy("this")
private Registration registration;
- // FIXME: NETCONF-1102: this should be tied to a subscriber
- private final EventFormatterFactory<T> formatterFactory;
- private final NotificationOutputType outputType;
- private @NonNull EventFormatter<T> formatter;
-
- protected RestconfStream(final ListenersBroker listenersBroker, final String name,
- final ImmutableMap<EncodingName, ? extends EventFormatterFactory<T>> encodings,
- final NotificationOutputType outputType) {
+ RestconfStream(final ListenersBroker listenersBroker, final Source<T> source, final String name) {
this.listenersBroker = requireNonNull(listenersBroker);
+ this.source = requireNonNull(source);
this.name = requireNonNull(name);
- if (encodings.isEmpty()) {
- throw new IllegalArgumentException("Stream '" + name + "' must support at least one encoding");
- }
- this.encodings = encodings;
-
- final var encodingName = switch (outputType) {
- case JSON -> EncodingName.RFC8040_JSON;
- case XML -> EncodingName.RFC8040_XML;
- };
- this.outputType = outputType;
- formatterFactory = formatterFactory(encodingName);
- formatter = formatterFactory.emptyFormatter();
}
/**
*
* @return Stream name.
*/
- public final @NonNull String name() {
+ public @NonNull String name() {
return name;
}
* @return Supported encodings.
*/
@SuppressWarnings("null")
- final @NonNull Set<EncodingName> encodings() {
- return encodings.keySet();
- }
-
- /**
- * Return the {@link EventFormatterFactory} for an encoding.
- *
- * @param encoding An {@link EncodingName}
- * @return The {@link EventFormatterFactory} for the selected encoding
- * @throws NullPointerException if {@code encoding} is {@code null}
- * @throws IllegalAccessError if {@code encoding} is not supported
- */
- final @NonNull EventFormatterFactory<T> formatterFactory(final EncodingName encoding) {
- final var factory = encodings.get(requireNonNull(encoding));
- if (factory == null) {
- throw new IllegalArgumentException("Stream '" + name + "' does not support " + encoding);
- }
- return factory;
+ @NonNull Set<EncodingName> encodings() {
+ return source.encodings.keySet();
}
/**
* Registers {@link StreamSessionHandler} subscriber.
*
* @param handler SSE or WS session handler.
+ * @param encoding Requested event stream encoding
+ * @param params Reception parameters
* @return A new {@link Registration}, or {@code null} if the subscriber cannot be added
- * @throws NullPointerException if {@code handler} is {@code null}
+ * @throws NullPointerException if any argument is {@code null}
+ * @throws UnsupportedEncodingException if {@code encoding} is not supported
+ * @throws XPathExpressionException if requested filter is not valid
+ * @throws InvalidArgumentException if the parameters are not supported
*/
- @Nullable Registration addSubscriber(final StreamSessionHandler handler) {
+ @Nullable Registration addSubscriber(final StreamSessionHandler handler, final EncodingName encoding,
+ final ReceiveEventsParams params) throws UnsupportedEncodingException, XPathExpressionException {
+ final var factory = source.encodings.get(requireNonNull(encoding));
+ if (factory == null) {
+ throw new UnsupportedEncodingException("Stream '" + name + "' does not support " + encoding);
+ }
+
+ final var startTime = params.startTime();
+ if (startTime != null) {
+ throw new IllegalArgumentException("Stream " + name + " does not support replay");
+ }
+
+ final var leafNodes = params.leafNodesOnly();
+ final var skipData = params.skipNotificationData();
+ final var changedLeafNodes = params.changedLeafNodesOnly();
+ final var childNodes = params.childNodesOnly();
+
+ final var textParams = new TextParameters(
+ leafNodes != null && leafNodes.value(),
+ skipData != null && skipData.value(),
+ changedLeafNodes != null && changedLeafNodes.value(),
+ childNodes != null && childNodes.value());
+
+ final var filter = params.filter();
+ final var filterValue = filter == null ? null : filter.paramValue();
+ final var formatter = filterValue == null || filterValue.isEmpty() ? factory.getFormatter(textParams)
+ : factory.getFormatter(textParams, filterValue);
+
+
// Lockless add of a subscriber. If we observe a null this stream is dead before the new subscriber could be
// added.
final var toAdd = new Subscriber<>(this, handler, formatter);
final var witness = (Subscribers<T>) SUBSCRIBERS.compareAndExchangeRelease(this, observed, next);
if (witness == observed) {
LOG.debug("Subscriber {} is added.", handler);
+ if (observed instanceof Subscribers.Empty) {
+ // We have became non-empty, start the source
+ startSource();
+ }
return toAdd;
}
// We have raced: retry the operation
observed = witness;
}
+
return null;
}
return (Subscribers<T>) SUBSCRIBERS.getAcquire(this);
}
- /**
- * Signal the end-of-stream condition to subscribers, shut down this stream and initiate its removal from global
- * state.
- */
- final void endOfStream() {
- // Atomic assertion we are ending plus locked clean up
- final var local = (Subscribers<T>) SUBSCRIBERS.getAndSetRelease(this, null);
- if (local != null) {
- terminate();
- local.endOfStream();
- }
- }
-
- /**
- * Post data to subscribed SSE session handlers.
- *
- * @param modelContext An {@link EffectiveModelContext} used to format the input
- * @param input Input data
- * @param now Current time
- * @throws NullPointerException if any argument is {@code null}
- */
- void sendDataMessage(final EffectiveModelContext modelContext, final T input, final Instant now) {
- final var local = acquireSubscribers();
- if (local != null) {
- local.publish(modelContext, input, now);
- } else {
- LOG.debug("Ignoring sendDataMessage() on terminated stream {}", this);
+ private void startSource() {
+ // We have not started the stream yet, make sure that happens. This is a bit more involved, as the source may
+ // immediately issue endOfStream(), which in turn invokes terminate(). But at that point start() has not return
+ // and therefore registration is still null -- and thus we need to see if we are still on-line.
+ final var reg = source.start(sink);
+ synchronized (this) {
+ if (acquireSubscribers() == null) {
+ reg.close();
+ } else {
+ registration = reg;
+ }
}
}
private void terminate() {
- if (registration != null) {
- registration.close();
- registration = null;
- }
- listenersBroker.removeStream(this);
- }
-
- /**
- * Set query parameters for listener.
- *
- * @param params NotificationQueryParams to use.
- */
- public final void setQueryParams(final ReceiveEventsParams params) {
- final var startTime = params.startTime();
- if (startTime != null) {
- throw new RestconfDocumentedException("Stream " + name + " does not support replay",
- ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
- }
-
- final var leafNodes = params.leafNodesOnly();
- final var skipData = params.skipNotificationData();
- final var changedLeafNodes = params.changedLeafNodesOnly();
- final var childNodes = params.childNodesOnly();
-
- final var textParams = new TextParameters(
- leafNodes != null && leafNodes.value(),
- skipData != null && skipData.value(),
- changedLeafNodes != null && changedLeafNodes.value(),
- childNodes != null && childNodes.value());
-
- final var filter = params.filter();
- final var filterValue = filter == null ? null : filter.paramValue();
-
- final EventFormatter<T> newFormatter;
- if (filterValue != null && !filterValue.isEmpty()) {
- try {
- newFormatter = formatterFactory.getFormatter(textParams, filterValue);
- } catch (XPathExpressionException e) {
- throw new IllegalArgumentException("Failed to get filter", e);
+ synchronized (this) {
+ if (registration != null) {
+ registration.close();
+ registration = null;
}
- } else {
- newFormatter = formatterFactory.getFormatter(textParams);
}
-
- // Single assign
- formatter = newFormatter;
- }
-
- final @NonNull EventFormatter<T> formatter() {
- return formatter;
- }
-
- /**
- * Sets {@link Registration} registration.
- *
- * @param registration a listener registration registration.
- */
- @Holding("this")
- final void setRegistration(final Registration registration) {
- this.registration = requireNonNull(registration);
- }
-
- /**
- * Checks if {@link Registration} registration exists.
- *
- * @return {@code true} if exists, {@code false} otherwise.
- */
- @Holding("this")
- final boolean isListening() {
- return registration != null;
+ listenersBroker.removeStream(this);
}
@Override
- public final String toString() {
- return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
- }
-
- ToStringHelper addToStringAttributes(final ToStringHelper helper) {
- return helper.add("name", name).add("output-type", outputType.getName());
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("name", name).add("source", source).toString();
}
}
*/
package org.opendaylight.restconf.nb.rfc8040.streams;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CharMatcher;
import com.google.common.base.Strings;
+import java.io.UnsupportedEncodingException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseEventSink;
+import javax.xml.xpath.XPathExpressionException;
+import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
+import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName;
import org.opendaylight.yangtools.concepts.Registration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final ScheduledExecutorService executorService;
private final RestconfStream<?> stream;
- private final int maximumFragmentLength;
- private final int heartbeatInterval;
+ private final EncodingName encoding;
+ private final ReceiveEventsParams params;
private final SseEventSink sink;
private final Sse sse;
+ private final int maximumFragmentLength;
+ private final int heartbeatInterval;
private ScheduledFuture<?> pingProcess;
private Registration subscriber;
* @param executorService Executor that is used for periodical sending of SSE ping messages to keep session up even
* if the notifications doesn't flow from server to clients or clients don't implement ping-pong
* service.
- * @param listener YANG notification or data-change event listener to which client on this SSE session subscribes
- * to.
+ * @param stream YANG notification or data-change event listener to which client on this SSE session subscribes to.
* @param maximumFragmentLength Maximum fragment length in number of Unicode code units (characters). If this
* parameter is set to 0, the maximum fragment length is disabled and messages up to 64 KB can be sent
* (exceeded notification length ends in error). If the parameter is set to non-zero positive value,
* session up. Ping control frames are disabled if this parameter is set to 0.
*/
public SSESessionHandler(final ScheduledExecutorService executorService, final SseEventSink sink, final Sse sse,
- final RestconfStream<?> listener, final int maximumFragmentLength, final int heartbeatInterval) {
- this.executorService = executorService;
- this.sse = sse;
- this.sink = sink;
- stream = listener;
+ final RestconfStream<?> stream, final EncodingName encoding, final ReceiveEventsParams params,
+ final int maximumFragmentLength, final int heartbeatInterval) {
+ this.executorService = requireNonNull(executorService);
+ this.sse = requireNonNull(sse);
+ this.sink = requireNonNull(sink);
+ this.stream = requireNonNull(stream);
+ this.encoding = requireNonNull(encoding);
+ this.params = requireNonNull(params);
this.maximumFragmentLength = maximumFragmentLength;
this.heartbeatInterval = heartbeatInterval;
}
/**
* Initialization of SSE connection. SSE session handler is registered at data-change-event / YANG notification
* listener and the heartbeat ping process is started if it is enabled.
+ *
+ * @throws UnsupportedEncodingException if the subscriber cannot be instantiated
+ * @throws XPathExpressionException if the subscriber cannot be instantiated
+ * @throws IllegalArgumentException if the subscriber cannot be instantiated
*/
- public synchronized boolean init() {
- final var local = stream.addSubscriber(this);
+ public synchronized boolean init() throws UnsupportedEncodingException, XPathExpressionException {
+ final var local = stream.addSubscriber(this, encoding, params);
if (local == null) {
return false;
}
}
}
- @Override
- public synchronized boolean isConnected() {
- return !sink.isClosed();
- }
-
/**
* Sending of string message to outbound Server-Sent Events channel {@link SseEventSink}. SSE is automatically split
* to fragments with new line character. If the maximum fragment length is set to non-zero positive value and input
* Interface for session handler that is responsible for sending of data over established session.
*/
public interface StreamSessionHandler {
- /**
- * Identification of created session.
- */
- boolean isConnected();
-
/**
* Interface for sending String message through one of implementation.
*
* @param <T> event type
*/
abstract sealed class Subscribers<T> {
- private static final class Empty<T> extends Subscribers<T> {
- static final @NonNull Empty<?> INSTANCE = new Empty<>();
+ static final class Empty<T> extends Subscribers<T> {
+ private static final @NonNull Empty<?> INSTANCE = new Empty<>();
+
+ private Empty() {
+ // Hidden on purpose
+ }
@Override
Subscribers<T> add(final Subscriber<T> toAdd) {
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
import org.opendaylight.restconf.nb.rfc8040.URLConstants;
+import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Override
public Object createWebSocket(final ServletUpgradeRequest req, final ServletUpgradeResponse resp) {
final var path = req.getRequestURI().getPath();
- if (path.startsWith(STREAMS_PREFIX)) {
- final var streamName = path.substring(STREAMS_PREFIX.length());
- final var listener = listenersBroker.getStream(streamName);
- if (listener != null) {
- LOG.debug("Listener for stream with name {} has been found, web-socket session handler will be created",
- streamName);
- resp.setSuccess(true);
- resp.setStatusCode(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
- // note: every web-socket manages PING process individually because this approach scales better than
- // sending PING frames at once over all web-socket sessions
- return new WebSocketSessionHandler(executorService, listener, maximumFragmentLength, heartbeatInterval);
- }
+ if (!path.startsWith(STREAMS_PREFIX)) {
+ LOG.debug("Request path '{}' does not start with '{}'", path, STREAMS_PREFIX);
+ return notFound(resp);
+ }
+ final var stripped = path.substring(STREAMS_PREFIX.length());
+ final int slash = stripped.indexOf('/');
+ if (slash < 0) {
+ LOG.debug("Request path '{}' does not contain encoding", path);
+ return notFound(resp);
+ }
+ if (slash == 0) {
+ LOG.debug("Request path '{}' contains empty encoding", path);
+ return notFound(resp);
+ }
+ final var streamName = stripped.substring(slash + 1);
+ final var stream = listenersBroker.getStream(streamName);
+ if (stream == null) {
LOG.debug("Listener for stream with name {} was not found.", streamName);
- } else {
- LOG.debug("Request path '{}' does not start with '{}'", path, STREAMS_PREFIX);
+ return notFound(resp);
}
+
+ LOG.debug("Listener for stream with name {} has been found, web-socket session handler will be created",
+ streamName);
+ resp.setSuccess(true);
+ resp.setStatusCode(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
+ // note: every web-socket manages PING process individually because this approach scales better than
+ // sending PING frames at once over all web-socket sessions
+ return new WebSocketSessionHandler(executorService, stream, new EncodingName(stripped.substring(0, slash)),
+ null, maximumFragmentLength, heartbeatInterval);
+ }
+
+ private static Object notFound(final ServletUpgradeResponse resp) {
resp.setSuccess(false);
resp.setStatusCode(HttpServletResponse.SC_NOT_FOUND);
return null;
import com.google.common.base.Strings;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import javax.xml.xpath.XPathExpressionException;
+import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.jetty.websocket.api.CloseException;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
+import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
+import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName;
import org.opendaylight.yangtools.concepts.Registration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final ScheduledExecutorService executorService;
private final RestconfStream<?> stream;
+ private final EncodingName encodingName;
+ private final ReceiveEventsParams params;
private final int maximumFragmentLength;
private final int heartbeatInterval;
* to keep session up. Ping control frames are disabled if this parameter is set to 0.
*/
WebSocketSessionHandler(final ScheduledExecutorService executorService, final RestconfStream<?> stream,
+ final EncodingName encodingName, final @Nullable ReceiveEventsParams params,
final int maximumFragmentLength, final int heartbeatInterval) {
this.executorService = requireNonNull(executorService);
this.stream = requireNonNull(stream);
+ this.encodingName = requireNonNull(encodingName);
+ // FIXME: NETCONF-1102: require params
+ this.params = params;
this.maximumFragmentLength = maximumFragmentLength;
this.heartbeatInterval = heartbeatInterval;
}
public synchronized void onWebSocketConnected(final Session webSocketSession) {
if (session == null || !session.isOpen()) {
session = webSocketSession;
- subscriber = stream.addSubscriber(this);
+ try {
+ subscriber = stream.addSubscriber(this, encodingName, params);
+ } catch (IllegalArgumentException | XPathExpressionException | UnsupportedEncodingException e) {
+ LOG.info("Closing web-socket session {}", webSocketSession, e);
+ webSocketSession.close(404, "Unsupported encoding " + encodingName);
+ session = null;
+ return;
+ }
LOG.debug("A new web-socket session {} has been successfully registered.", webSocketSession);
if (heartbeatInterval != 0) {
}
return parts;
}
-
- @Override
- public synchronized boolean isConnected() {
- return session != null && session.isOpen();
- }
}
\ No newline at end of file
static final DataTreeCandidateFormatterFactory FACTORY = new DataTreeCandidateFormatterFactory(EMPTY) {
@Override
- XMLDataTreeCandidateFormatter getFormatter(final TextParameters textParams, final String xpathFilter)
+ public XMLDataTreeCandidateFormatter getFormatter(final TextParameters textParams, final String xpathFilter)
throws XPathExpressionException {
return new XMLDataTreeCandidateFormatter(textParams, xpathFilter);
}
@Override
- XMLDataTreeCandidateFormatter newFormatter(final TextParameters textParams) {
+ public XMLDataTreeCandidateFormatter newFormatter(final TextParameters textParams) {
return new XMLDataTreeCandidateFormatter(textParams);
}
};
static final XMLNotificationFormatter EMPTY = new XMLNotificationFormatter(TextParameters.EMPTY);
static final NotificationFormatterFactory FACTORY = new NotificationFormatterFactory(EMPTY) {
@Override
- XMLNotificationFormatter newFormatter(final TextParameters textParams) {
+ public XMLNotificationFormatter newFormatter(final TextParameters textParams) {
return new XMLNotificationFormatter(textParams);
}
@Override
- XMLNotificationFormatter getFormatter(final TextParameters textParams, final String xpathFilter)
+ public XMLNotificationFormatter getFormatter(final TextParameters textParams, final String xpathFilter)
throws XPathExpressionException {
return new XMLNotificationFormatter(textParams, xpathFilter);
}
import org.opendaylight.restconf.api.query.RestconfQueryParam;
import org.opendaylight.restconf.api.query.WithDefaultsParam;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
+import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
import org.opendaylight.restconf.nb.rfc8040.legacy.InstanceIdentifierContext;
import org.opendaylight.restconf.nb.rfc8040.legacy.QueryParameters;
import org.opendaylight.yangtools.yang.common.ErrorTag;
*/
@Test
public void checkParametersTypesNegativeTest() {
- assertUnknownParam(QueryParams::newReceiveEventsParams);
+ assertInvalidIAE(ReceiveEventsParams::ofQueryParameters);
assertUnknownParam(QueryParams::newReadDataParams);
assertUnknownParam(uriInfo -> QueryParams.parseInsert(mock(EffectiveModelContext.class), uriInfo));
- assertInvalidParam(QueryParams::newReceiveEventsParams, ContentParam.ALL);
+ assertInvalidIAE(ReceiveEventsParams::ofQueryParameters, ContentParam.ALL);
assertInvalidParam(QueryParams::newReadDataParams, InsertParam.LAST);
assertInvalidParam(
uriInfo -> QueryParams.parseInsert(mock(EffectiveModelContext.class), uriInfo),
assertParamsThrows(ErrorTag.MALFORMED_MESSAGE, paramsMethod, params);
}
+ private static void assertInvalidIAE(final Function<Map<String, String>, ?> paramsMethod,
+ final RestconfQueryParam<?> param) {
+ final var ex = assertThrows(IllegalArgumentException.class,
+ () -> paramsMethod.apply(Map.of(param.paramName(), "odl-test-value")));
+ assertEquals("Invalid parameter: " + param.paramName(), ex.getMessage());
+ }
+
+ private static void assertInvalidIAE(final Function<Map<String, String>, ?> paramsMethod) {
+ final var ex = assertThrows(IllegalArgumentException.class,
+ () -> paramsMethod.apply(Map.of("odl-unknown-param", "odl-test-value")));
+ assertEquals("Invalid parameter: odl-unknown-param", ex.getMessage());
+ }
+
private static void assertUnknownParam(final Function<UriInfo, ?> paramsMethod) {
final var params = new MultivaluedHashMap<String, String>();
params.putSingle("odl-unknown-param", "odl-test-value");
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.opendaylight.mdsal.dom.api.DOMDataBroker;
import org.opendaylight.mdsal.dom.api.DOMMountPoint;
import org.opendaylight.mdsal.dom.api.DOMMountPointService;
+import org.opendaylight.mdsal.dom.api.DOMNotificationService;
import org.opendaylight.mdsal.dom.api.DOMRpcImplementationNotAvailableException;
import org.opendaylight.mdsal.dom.api.DOMRpcResult;
import org.opendaylight.mdsal.dom.api.DOMRpcService;
.withChild(ImmutableNodes.leafNode(QName.create(RPC, "content"), "operation result"))
.build();
- private static DatabindContext CONTEXT;
+ private static final DatabindContext CONTEXT =
+ DatabindContext.ofModel(YangParserTestUtils.parseYangResourceDirectory("/invoke-rpc"));
@Mock
private DOMDataBroker dataBroker;
private DOMMountPoint mountPoint;
@Mock
private DOMMountPointService mountPointService;
+ @Mock
+ private DOMNotificationService notificationService;
private RestconfInvokeOperationsServiceImpl invokeOperationsService;
private MdsalRestconfServer server;
- @BeforeClass
- public static void beforeClass() {
- CONTEXT = DatabindContext.ofModel(YangParserTestUtils.parseYangResourceDirectory("/invoke-rpc"));
- }
-
@Before
public void setup() {
server = new MdsalRestconfServer(dataBroker, rpcService, mountPointService);
- invokeOperationsService = new RestconfInvokeOperationsServiceImpl(() -> CONTEXT, server, mountPointService,
- new ListenersBroker.WebSockets(dataBroker));
+ invokeOperationsService = new RestconfInvokeOperationsServiceImpl(() -> CONTEXT, server,
+ new ListenersBroker.WebSockets(dataBroker, notificationService, mountPointService));
}
@Test
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import org.opendaylight.mdsal.binding.dom.adapter.test.AbstractConcurrentDataBrokerTest;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataBroker;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMMountPointService;
+import org.opendaylight.mdsal.dom.api.DOMNotificationService;
import org.opendaylight.restconf.api.query.ChangedLeafNodesOnlyParam;
import org.opendaylight.restconf.api.query.ChildNodesOnlyParam;
import org.opendaylight.restconf.api.query.LeafNodesOnlyParam;
import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext;
import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
+import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName;
import org.opendaylight.yang.gen.v1.augment.instance.identifier.patch.module.rev220218.PatchCont1Builder;
import org.opendaylight.yang.gen.v1.augment.instance.identifier.patch.module.rev220218.patch.cont.patch.choice1.PatchCase1Builder;
import org.opendaylight.yang.gen.v1.augment.instance.identifier.patch.module.rev220218.patch.cont.patch.choice2.PatchCase11Builder;
private CountDownLatch notificationLatch = new CountDownLatch(1);
private volatile String lastNotification;
- @Override
- public boolean isConnected() {
- return true;
- }
-
@Override
public void endOfStream() {
// No-op
dataBroker = getDataBroker();
domDataBroker = getDomBroker();
databindProvider = () -> DatabindContext.ofModel(SCHEMA_CONTEXT);
- listenersBroker = new ListenersBroker.ServerSentEvents(domDataBroker);
+ listenersBroker = new ListenersBroker.ServerSentEvents(domDataBroker, mock(DOMNotificationService.class),
+ mock(DOMMountPointService.class));
}
- DataTreeChangeStream createStream(final YangInstanceIdentifier path, final String streamName,
- final NotificationOutputType outputType, final boolean leafNodesOnly,
- final boolean skipNotificationData, final boolean changedLeafNodesOnly, final boolean childNodesOnly) {
- final var ret = new DataTreeChangeStream(listenersBroker, streamName, outputType, databindProvider,
- LogicalDatastoreType.CONFIGURATION, path);
- ret.setQueryParams(new ReceiveEventsParams(null, null, null,
- leafNodesOnly ? LeafNodesOnlyParam.of(true) : null,
- skipNotificationData ? SkipNotificationDataParam.of(true) : null,
- changedLeafNodesOnly ? ChangedLeafNodesOnlyParam.of(true) : null,
- childNodesOnly ? ChildNodesOnlyParam.of(true) : null));
- return ret;
+ TestHandler createHandler(final YangInstanceIdentifier path, final String streamName,
+ final NotificationOutputType outputType, final boolean leafNodesOnly, final boolean skipNotificationData,
+ final boolean changedLeafNodesOnly, final boolean childNodesOnly) throws Exception {
+ final var stream = listenersBroker.createStream("test", "baseURI",
+ new DataTreeChangeSource(databindProvider, domDataBroker, LogicalDatastoreType.CONFIGURATION, path))
+ .getOrThrow();
+ final var handler = new TestHandler();
+ stream.addSubscriber(handler,
+ switch (outputType) {
+ case JSON -> EncodingName.RFC8040_JSON;
+ case XML -> EncodingName.RFC8040_XML;
+ },
+ new ReceiveEventsParams(null, null, null,
+ leafNodesOnly ? LeafNodesOnlyParam.of(true) : null,
+ skipNotificationData ? SkipNotificationDataParam.of(true) : null,
+ changedLeafNodesOnly ? ChangedLeafNodesOnlyParam.of(true) : null,
+ childNodesOnly ? ChildNodesOnlyParam.of(true) : null));
+ return handler;
}
@Test
public void testJsonNotifsLeaves() throws Exception {
- final var adapter = createStream(PATCH_CONT_YIID, "Casey", NotificationOutputType.JSON,
+ final var handler = createHandler(PATCH_CONT_YIID, "Casey", NotificationOutputType.JSON,
true, false, false, false);
- final var handler = new TestHandler();
- adapter.addSubscriber(handler);
-
- final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
- final var root = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, PATCH_CONT_YIID);
- changeService.registerDataTreeChangeListener(root, adapter);
var writeTransaction = dataBroker.newWriteOnlyTransaction();
final var iid = InstanceIdentifier.create(PatchCont.class);
@Test
public void testJsonNotifsChangedLeaves() throws Exception {
- final var adapter = createStream(PATCH_CONT_YIID, "Casey", NotificationOutputType.JSON, false, false, true,
+ final var handler = createHandler(PATCH_CONT_YIID, "Casey", NotificationOutputType.JSON, false, false, true,
false);
- final var handler = new TestHandler();
- adapter.addSubscriber(handler);
-
- final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
- final var root = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, PATCH_CONT_YIID);
- changeService.registerDataTreeChangeListener(root, adapter);
var writeTransaction = dataBroker.newWriteOnlyTransaction();
final var iid = InstanceIdentifier.create(PatchCont.class);
@Test
public void testJsonChildNodesOnly() throws Exception {
- final var adapter = createStream(PATCH_CONT_YIID, "Casey", NotificationOutputType.JSON, false, false, false,
+ final var handler = createHandler(PATCH_CONT_YIID, "Casey", NotificationOutputType.JSON, false, false, false,
true);
- final var handler = new TestHandler();
- adapter.addSubscriber(handler);
-
- final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
- final var root = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, PATCH_CONT_YIID);
- changeService.registerDataTreeChangeListener(root, adapter);
final var iid = InstanceIdentifier.create(PatchCont.class).child(MyList1.class, new MyList1Key("Althea"));
var writeTransaction = dataBroker.newWriteOnlyTransaction();
@Test
public void testXmlLeavesOnly() throws Exception {
- final var adapter = createStream(PATCH_CONT_YIID, "Casey", NotificationOutputType.XML, true, false, false,
+ final var handler = createHandler(PATCH_CONT_YIID, "Casey", NotificationOutputType.XML, true, false, false,
false);
- final var handler = new TestHandler();
- adapter.addSubscriber(handler);
- final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
- final var root = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, PATCH_CONT_YIID);
- changeService.registerDataTreeChangeListener(root, adapter);
var writeTransaction = dataBroker.newWriteOnlyTransaction();
final var iid = InstanceIdentifier.create(PatchCont.class);
writeTransaction.put(LogicalDatastoreType.CONFIGURATION, iid, new PatchContBuilder()
@Test
public void testXmlChangedLeavesOnly() throws Exception {
- final var adapter = createStream(PATCH_CONT_YIID, "Casey", NotificationOutputType.XML, false, false, true,
+ final var handler = createHandler(PATCH_CONT_YIID, "Casey", NotificationOutputType.XML, false, false, true,
false);
- final var handler = new TestHandler();
- adapter.addSubscriber(handler);
- final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
- final var root = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, PATCH_CONT_YIID);
- changeService.registerDataTreeChangeListener(root, adapter);
var writeTransaction = dataBroker.newWriteOnlyTransaction();
final var iid = InstanceIdentifier.create(PatchCont.class);
writeTransaction.put(LogicalDatastoreType.CONFIGURATION, iid, new PatchContBuilder()
@Test
public void testXmlChildNodesOnly() throws Exception {
- final var adapter = createStream(PATCH_CONT_YIID, "Casey", NotificationOutputType.XML, false, false, false,
+ final var handler = createHandler(PATCH_CONT_YIID, "Casey", NotificationOutputType.XML, false, false, false,
true);
- final var handler = new TestHandler();
- adapter.addSubscriber(handler);
-
- final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
- final var root = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, PATCH_CONT_YIID);
- changeService.registerDataTreeChangeListener(root, adapter);
final var iid = InstanceIdentifier.create(PatchCont.class).child(MyList1.class, new MyList1Key("Althea"));
var writeTransaction = dataBroker.newWriteOnlyTransaction();
private void jsonNotifications(final YangInstanceIdentifier pathYiid, final boolean skipData,
final String jsonNotifCreate, final String jsonNotifUpdate, final String jsonNotifDelete) throws Exception {
- final var stream = createStream(pathYiid, "Casey", NotificationOutputType.JSON, false, skipData, false, false);
- final var handler = new TestHandler();
- stream.addSubscriber(handler);
-
- final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
- final var root = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, pathYiid);
- changeService.registerDataTreeChangeListener(root, stream);
+ final var handler = createHandler(pathYiid, "Casey", NotificationOutputType.JSON, false, skipData, false,
+ false);
var writeTransaction = dataBroker.newWriteOnlyTransaction();
var builder = new MyList1Builder().setMyLeaf11("Jed").setName("Althea");
private void xmlNotifications(final YangInstanceIdentifier pathYiid, final boolean skipData,
final String xmlNotifCreate, final String xmlNotifUpdate, final String xmlNotifDelete) throws Exception {
- final var adapter = createStream(pathYiid, "Casey", NotificationOutputType.XML, false, skipData, false, false);
- final var handler = new TestHandler();
- adapter.addSubscriber(handler);
-
- final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
- final var root = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, pathYiid);
- changeService.registerDataTreeChangeListener(root, adapter);
+ final var handler = createHandler(pathYiid, "Casey", NotificationOutputType.XML, false, skipData, false, false);
var writeTransaction = dataBroker.newWriteOnlyTransaction();
var builder = new MyList1Builder().setMyLeaf11("Jed").setName("Althea");
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
+import com.google.common.collect.ImmutableClassToInstanceMap;
import java.net.URI;
import java.util.UUID;
-import javax.ws.rs.core.UriBuilder;
-import javax.ws.rs.core.UriInfo;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataBroker;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
+import org.opendaylight.mdsal.dom.api.DOMMountPointService;
+import org.opendaylight.mdsal.dom.api.DOMNotificationService;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext;
import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
@ExtendWith(MockitoExtension.class)
class ListenersBrokerTest {
private static final EffectiveModelContext SCHEMA_CTX = YangParserTestUtils.parseYangResourceDirectory("/streams");
+ private static final URI BASE_URI = URI.create("baseURI");
@Mock
private DOMDataBroker dataBroker;
@Mock
- private DOMDataTreeWriteTransaction tx;
+ private DOMDataTreeChangeService treeChange;
@Mock
- private UriInfo uriInfo;
+ private DOMMountPointService mountPointService;
@Mock
- private UriBuilder uriBuilder;
- @Captor
- private ArgumentCaptor<String> uriCaptor;
+ private DOMNotificationService notificationService;
+ @Mock
+ private DOMDataTreeWriteTransaction tx;
@Captor
private ArgumentCaptor<YangInstanceIdentifier> pathCaptor;
@Captor
@BeforeEach
public void before() {
- listenersBroker = new ListenersBroker.ServerSentEvents(dataBroker);
+ listenersBroker = new ListenersBroker.ServerSentEvents(dataBroker, notificationService, mountPointService);
databindProvider = () -> DatabindContext.ofModel(SCHEMA_CTX);
}
@Test
void createStreamTest() {
+ doReturn(ImmutableClassToInstanceMap.of(DOMDataTreeChangeService.class, treeChange))
+ .when(dataBroker).getExtensions();
+
doReturn(tx).when(dataBroker).newWriteOnlyTransaction();
doNothing().when(tx).put(eq(LogicalDatastoreType.OPERATIONAL), pathCaptor.capture(), dataCaptor.capture());
doReturn(CommitInfo.emptyFluentFuture()).when(tx).commit();
- doReturn(uriBuilder).when(uriInfo).getBaseUriBuilder();
- doReturn(uriBuilder).when(uriBuilder).replacePath(uriCaptor.capture());
- doAnswer(inv -> new URI(uriCaptor.getValue())).when(uriBuilder).build();
-
- final var output = assertInstanceOf(ContainerNode.class, listenersBroker.createDataChangeNotifiStream(
- databindProvider, uriInfo, prepareDomPayload("create-data-change-event-subscription", "toaster", "path"),
- SCHEMA_CTX).getOrThrow().orElse(null));
+ final var output = assertInstanceOf(ContainerNode.class,
+ listenersBroker.createDataChangeNotifiStream(databindProvider, BASE_URI,
+ prepareDomPayload("create-data-change-event-subscription", "toaster", "path"), SCHEMA_CTX)
+ .getOrThrow()
+ .orElse(null));
assertEquals(new NodeIdentifier(CreateDataChangeEventSubscriptionOutput.QNAME), output.name());
assertEquals(1, output.size());
final var rcName = QName.create(rcStream, "name");
final var streamId = NodeIdentifierWithPredicates.of(rcStream, rcName, name);
final var rcEncoding = QName.create(rcStream, "encoding");
+ final var rcLocation = QName.create(rcStream, "location");
assertEquals(YangInstanceIdentifier.of(
new NodeIdentifier(QName.create(rcStream, "restconf-state")),
.withChild(Builders.mapBuilder()
.withNodeIdentifier(new NodeIdentifier(Access.QNAME))
.withChild(Builders.mapEntryBuilder()
- .withNodeIdentifier(NodeIdentifierWithPredicates.of(Access.QNAME, rcEncoding, ""))
- .withChild(ImmutableNodes.leafNode(rcEncoding, ""))
- .withChild(ImmutableNodes.leafNode(QName.create(rcStream, "location"),
- "rests/streams/" + name))
+ .withNodeIdentifier(NodeIdentifierWithPredicates.of(Access.QNAME, rcEncoding, "json"))
+ .withChild(ImmutableNodes.leafNode(rcEncoding, "json"))
+ .withChild(ImmutableNodes.leafNode(rcLocation, "rests/streams/json/" + name))
+ .build())
+ .withChild(Builders.mapEntryBuilder()
+ .withNodeIdentifier(NodeIdentifierWithPredicates.of(Access.QNAME, rcEncoding, "xml"))
+ .withChild(ImmutableNodes.leafNode(rcEncoding, "xml"))
+ .withChild(ImmutableNodes.leafNode(rcLocation, "rests/streams/xml/" + name))
.build())
.build())
- .build(), dataCaptor.getValue());
+ .build().prettyTree().toString(), dataCaptor.getValue().prettyTree().toString());
}
@Test
void createStreamWrongValueTest() {
final var payload = prepareDomPayload("create-data-change-event-subscription", "String value", "path");
final var errors = assertThrows(RestconfDocumentedException.class,
- () -> listenersBroker.createDataChangeNotifiStream(databindProvider, uriInfo, payload, SCHEMA_CTX))
+ () -> listenersBroker.createDataChangeNotifiStream(databindProvider, BASE_URI, payload, SCHEMA_CTX))
.getErrors();
assertEquals(1, errors.size());
final var error = errors.get(0);
void createStreamWrongInputRpcTest() {
final var payload = prepareDomPayload("create-data-change-event-subscription2", "toaster", "path2");
final var errors = assertThrows(RestconfDocumentedException.class,
- () -> listenersBroker.createDataChangeNotifiStream(databindProvider, uriInfo, payload, SCHEMA_CTX))
+ () -> listenersBroker.createDataChangeNotifiStream(databindProvider, BASE_URI, payload, SCHEMA_CTX))
.getErrors();
assertEquals(1, errors.size());
final var error = errors.get(0);
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import org.junit.jupiter.api.Test;
-import org.opendaylight.restconf.nb.rfc8040.utils.parser.ParserIdentifier;
+import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.library.rev190104.module.list.Module;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.library.rev190104.module.list.module.Deviation;
import org.opendaylight.yangtools.yang.common.QName;
*/
class RestconfStateStreamsTest {
private static final Logger LOG = LoggerFactory.getLogger(RestconfStateStreamsTest.class);
-
- // FIXME: assemble these from dependencies
- private static EffectiveModelContext schemaContext =
+ private static final EffectiveModelContext CONTEXT =
+ // TODO: assemble these from dependencies?
YangParserTestUtils.parseYangResourceDirectory("/modules/restconf-module-testing");
- private static EffectiveModelContext schemaContextMonitoring =
- YangParserTestUtils.parseYangResourceDirectory("/modules");
@Test
void toStreamEntryNodeTest() throws Exception {
- final var path = ParserIdentifier.toInstanceIdentifier(
- "nested-module:depth1-cont/depth2-leaf1", schemaContextMonitoring, null).getInstanceIdentifier();
final var outputType = "XML";
final var uri = "uri";
final var streamName = "/nested-module:depth1-cont/depth2-leaf1";
assertMappedData(prepareMap(streamName, uri, outputType),
- ListenersBroker.streamEntry(streamName, "description", "location", outputType));
+ ListenersBroker.streamEntry(streamName, "description", "location", Set.of(new EncodingName(outputType))));
}
@Test
final var uri = "uri";
assertMappedData(prepareMap("notifi", uri, outputType),
- ListenersBroker.streamEntry("notifi", "description", "location", outputType));
+ ListenersBroker.streamEntry("notifi", "description", "location", Set.of(new EncodingName(outputType))));
}
private static Map<QName, Object> prepareMap(final String name, final String uri, final String outputType) {
}
}
- final var expectedModules = schemaContext.getModules();
+ final var expectedModules = CONTEXT.getModules();
assertEquals(expectedModules.size(), loadedModules.size());
for (var m : expectedModules) {
final String name = m.getName();
*/
package org.opendaylight.restconf.nb.rfc8040.streams;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
-import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseEventSink;
import org.glassfish.jersey.media.sse.OutboundEvent;
-import org.junit.Test;
-import org.junit.runner.RunWith;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
+import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName;
import org.opendaylight.yangtools.concepts.Registration;
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
-public class SSESessionHandlerTest {
+@ExtendWith(MockitoExtension.class)
+class SSESessionHandlerTest {
@Mock
private ScheduledExecutorService executorService;
@Mock
- private RestconfStream<?> listener;
+ private RestconfStream<?> stream;
@Mock
private ScheduledFuture<?> pingFuture;
@Mock
@Mock
private Registration reg;
- private SSESessionHandler setup(final int maxFragmentSize, final int heartbeatInterval) {
+ private SSESessionHandler setup(final int maxFragmentSize, final int heartbeatInterval) throws Exception {
+ final var sseSessionHandler = new SSESessionHandler(executorService, eventSink, sse, stream,
+ EncodingName.RFC8040_XML, new ReceiveEventsParams(null, null, null, null, null, null, null),
+ maxFragmentSize, heartbeatInterval);
+ doReturn(reg).when(stream).addSubscriber(eq(sseSessionHandler), any(), any());
+ return sseSessionHandler;
+ }
+
+ private void setupEvent() {
doAnswer(inv -> new OutboundEvent.Builder().data(String.class, inv.getArgument(0, String.class)).build())
- .when(sse).newEvent(any());
+ .when(sse).newEvent(any());
+ }
- final var sseSessionHandler = new SSESessionHandler(executorService, eventSink, sse, listener, maxFragmentSize,
- heartbeatInterval);
- doReturn(reg).when(listener).addSubscriber(sseSessionHandler);
+ private void setupPing(final long maxFragmentSize, final long heartbeatInterval) {
doReturn(pingFuture).when(executorService)
- .scheduleWithFixedDelay(any(Runnable.class), eq((long) heartbeatInterval), eq((long) heartbeatInterval),
+ .scheduleWithFixedDelay(any(Runnable.class), eq(heartbeatInterval), eq(heartbeatInterval),
eq(TimeUnit.MILLISECONDS));
- return sseSessionHandler;
}
@Test
- public void onSSEConnectedWithEnabledPing() {
+ void onSSEConnectedWithEnabledPing() throws Exception {
final int heartbeatInterval = 1000;
final var sseSessionHandler = setup(1000, heartbeatInterval);
}
@Test
- public void onSSEConnectedWithDisabledPing() {
+ void onSSEConnectedWithDisabledPing() throws Exception {
final int heartbeatInterval = 0;
final var sseSessionHandler = setup(1000, heartbeatInterval);
sseSessionHandler.init();
- verify(listener).addSubscriber(sseSessionHandler);
verifyNoMoreInteractions(executorService);
}
@Test
- public void onSSEClosedWithOpenSession() {
+ void onSSEClosedWithOpenSession() throws Exception {
final var sseSessionHandler = setup(200, 10000);
sseSessionHandler.init();
- verify(listener).addSubscriber(sseSessionHandler);
sseSessionHandler.close();
verify(reg).close();
}
@Test
- public void onSSECloseWithEnabledPingAndLivingSession() throws IOException {
+ void onSSECloseWithEnabledPingAndLivingSession() throws Exception {
final var sseSessionHandler = setup(150, 8000);
+ setupPing(150, 8000);
sseSessionHandler.init();
doReturn(false).when(pingFuture).isCancelled();
doReturn(false).when(pingFuture).isDone();
}
@Test
- public void onSSECloseWithEnabledPingAndDeadSession() {
+ void onSSECloseWithEnabledPingAndDeadSession() throws Exception {
final var sseSessionHandler = setup(150, 8000);
+ setupPing(150, 8000);
sseSessionHandler.init();
sseSessionHandler.close();
}
@Test
- public void onSSECloseWithDisabledPingAndDeadSession() {
+ void onSSECloseWithDisabledPingAndDeadSession() throws Exception {
final var sseSessionHandler = setup(150, 8000);
sseSessionHandler.init();
- doReturn(true).when(pingFuture).isDone();
sseSessionHandler.close();
verify(reg).close();
}
@Test
- public void sendDataMessageWithDisabledFragmentation() throws IOException {
+ void sendDataMessageWithDisabledFragmentation() throws Exception {
final var sseSessionHandler = setup(0, 0);
doReturn(false).when(eventSink).isClosed();
+ setupEvent();
sseSessionHandler.init();
final String testMessage = generateRandomStringOfLength(100);
sseSessionHandler.sendDataMessage(testMessage);
}
@Test
- public void sendDataMessageWithDisabledFragAndDeadSession() throws IOException {
- final SSESessionHandler sseSessionHandler = setup(0, 0);
+ void sendDataMessageWithDisabledFragAndDeadSession() throws Exception {
+ final var sseSessionHandler = setup(0, 0);
doReturn(true).when(eventSink).isClosed();
sseSessionHandler.init();
}
@Test
- public void sendDataMessageWithEnabledFragAndSmallMessage() throws IOException {
- final SSESessionHandler sseSessionHandler = setup(100, 0);
+ void sendDataMessageWithEnabledFragAndSmallMessage() throws Exception {
+ final var sseSessionHandler = setup(100, 0);
doReturn(false).when(eventSink).isClosed();
+ setupEvent();
sseSessionHandler.init();
// in both cases, fragmentation should not be applied
sseSessionHandler.sendDataMessage(testMessage1);
sseSessionHandler.sendDataMessage(testMessage2);
- ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
+ final var cap = ArgumentCaptor.forClass(OutboundEvent.class);
// without fragmentation there will be 2 write calls
verify(eventSink, times(2)).send(cap.capture());
OutboundEvent event1 = cap.getAllValues().get(0);
}
@Test
- public void sendDataMessageWithZeroLength() {
- final SSESessionHandler sseSessionHandler = setup(100, 0);
+ void sendDataMessageWithZeroLength() throws Exception {
+ final var sseSessionHandler = setup(100, 0);
sseSessionHandler.init();
sseSessionHandler.sendDataMessage("");
}
@Test
- public void sendDataMessageWithEnabledFragAndLargeMessage1() throws IOException {
- final SSESessionHandler sseSessionHandler = setup(100, 0);
+ void sendDataMessageWithEnabledFragAndLargeMessage1() throws Exception {
+ final var sseSessionHandler = setup(100, 0);
doReturn(false).when(eventSink).isClosed();
+ setupEvent();
sseSessionHandler.init();
// there should be 10 fragments of length 100 characters
final String testMessage = generateRandomStringOfLength(1000);
sseSessionHandler.sendDataMessage(testMessage);
- ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
+ final var cap = ArgumentCaptor.forClass(OutboundEvent.class);
// SSE automatically send fragmented packet ended with new line character due to eventOutput
// have only 1 write call
verify(eventSink, times(1)).send(cap.capture());
}
private static String generateRandomStringOfLength(final int length) {
- final String alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvxyz";
- final StringBuilder sb = new StringBuilder(length);
+ final var alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvxyz";
+ final var sb = new StringBuilder(length);
for (int i = 0; i < length; i++) {
int index = (int) (alphabet.length() * Math.random());
sb.append(alphabet.charAt(index));
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.verify;
+import com.google.common.collect.ImmutableClassToInstanceMap;
import java.net.URI;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataBroker;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
+import org.opendaylight.mdsal.dom.api.DOMMountPointService;
+import org.opendaylight.mdsal.dom.api.DOMNotificationService;
import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
-import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev231103.NotificationOutputTypeGrouping.NotificationOutputType;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@ExtendWith(MockitoExtension.class)
class WebSocketFactoryTest extends AbstractNotificationListenerTest {
+ private static final QName TOASTER = QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster");
+
@Mock
private ScheduledExecutorService execService;
@Mock
@Mock
private DOMDataBroker dataBroker;
@Mock
+ private DOMDataTreeChangeService changeService;
+ @Mock
private DOMDataTreeWriteTransaction tx;
@Mock
private DatabindProvider databindProvider;
+ @Mock
+ private DOMMountPointService mountPointService;
+ @Mock
+ private DOMNotificationService notificationService;
private ListenersBroker listenersBroker;
private WebSocketFactory webSocketFactory;
@BeforeEach
void prepareListenersBroker() {
+ doReturn(ImmutableClassToInstanceMap.of(DOMDataTreeChangeService.class, changeService)).when(dataBroker)
+ .getExtensions();
doReturn(tx).when(dataBroker).newWriteOnlyTransaction();
doReturn(CommitInfo.emptyFluentFuture()).when(tx).commit();
- listenersBroker = new ListenersBroker.ServerSentEvents(dataBroker);
+ listenersBroker = new ListenersBroker.ServerSentEvents(dataBroker, notificationService, mountPointService);
webSocketFactory = new WebSocketFactory(execService, listenersBroker, 5000, 2000);
streamName = listenersBroker.createStream("description", "streams",
- name -> new DataTreeChangeStream(listenersBroker, name, NotificationOutputType.JSON, databindProvider,
- LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.of(
- QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster"))))
+ new DataTreeChangeSource(databindProvider, dataBroker, LogicalDatastoreType.CONFIGURATION,
+ YangInstanceIdentifier.of(TOASTER)))
.getOrThrow()
.name();
}
@Test
void createWebSocketSuccessfully() {
- doReturn(URI.create("https://localhost:8181/rests/streams/" + streamName))
+ doReturn(URI.create("https://localhost:8181/rests/streams/xml/" + streamName))
.when(upgradeRequest).getRequestURI();
assertInstanceOf(WebSocketSessionHandler.class,
@Test
void createWebSocketUnsuccessfully() {
- doReturn(URI.create("https://localhost:8181/rests/streams/" + streamName + "/toasterStatus"))
+ doReturn(URI.create("https://localhost:8181/rests/streams/xml/" + streamName + "/toasterStatus"))
.when(upgradeRequest).getRequestURI();
assertNull(webSocketFactory.createWebSocket(upgradeRequest, upgradeResponse));
*/
package org.opendaylight.restconf.nb.rfc8040.streams;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import java.io.IOException;
-import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName;
import org.opendaylight.yangtools.concepts.Registration;
-public class WebSocketSessionHandlerTest {
- private static final class WebSocketTestSessionState {
- private final RestconfStream<?> listener;
- private final ScheduledExecutorService executorService;
+@ExtendWith(MockitoExtension.class)
+class WebSocketSessionHandlerTest {
+ private final class WebSocketTestSessionState {
private final WebSocketSessionHandler webSocketSessionHandler;
private final int heartbeatInterval;
private final int maxFragmentSize;
- private final ScheduledFuture pingFuture;
WebSocketTestSessionState(final int maxFragmentSize, final int heartbeatInterval) {
- listener = mock(RestconfStream.class);
- executorService = mock(ScheduledExecutorService.class);
this.heartbeatInterval = heartbeatInterval;
this.maxFragmentSize = maxFragmentSize;
- webSocketSessionHandler = new WebSocketSessionHandler(executorService, listener, maxFragmentSize,
- heartbeatInterval);
- pingFuture = mock(ScheduledFuture.class);
- when(executorService.scheduleWithFixedDelay(any(Runnable.class), eq((long) heartbeatInterval),
- eq((long) heartbeatInterval), eq(TimeUnit.MILLISECONDS))).thenReturn(pingFuture);
+ webSocketSessionHandler = new WebSocketSessionHandler(executorService, stream,
+ ENCODING, null, maxFragmentSize, heartbeatInterval);
+
+ if (heartbeatInterval != 0) {
+ doReturn(pingFuture).when(executorService).scheduleWithFixedDelay(any(Runnable.class),
+ eq((long) heartbeatInterval), eq((long) heartbeatInterval), eq(TimeUnit.MILLISECONDS));
+ }
}
}
+ static final EncodingName ENCODING = new EncodingName("encoding");
+
+ @Mock
+ private RestconfStream<?> stream;
+ @Mock
+ private ScheduledExecutorService executorService;
+ @Mock
+ private ScheduledFuture pingFuture;
+ @Mock
+ private Session session;
+
@Test
- public void onWebSocketConnectedWithEnabledPing() {
+ void onWebSocketConnectedWithEnabledPing() throws Exception {
final int heartbeatInterval = 1000;
- final WebSocketTestSessionState webSocketTestSessionState = new WebSocketTestSessionState(
- 1000, heartbeatInterval);
- final Session session = mock(Session.class);
+ final var webSocketTestSessionState = new WebSocketTestSessionState(1000, heartbeatInterval);
webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
- verify(webSocketTestSessionState.listener).addSubscriber(
- webSocketTestSessionState.webSocketSessionHandler);
- verify(webSocketTestSessionState.executorService).scheduleWithFixedDelay(any(Runnable.class),
+ verify(stream).addSubscriber(webSocketTestSessionState.webSocketSessionHandler, ENCODING, null);
+ verify(executorService).scheduleWithFixedDelay(any(Runnable.class),
eq((long) webSocketTestSessionState.heartbeatInterval),
eq((long) webSocketTestSessionState.heartbeatInterval), eq(TimeUnit.MILLISECONDS));
}
@Test
- public void onWebSocketConnectedWithDisabledPing() {
+ void onWebSocketConnectedWithDisabledPing() throws Exception {
final int heartbeatInterval = 0;
- final WebSocketTestSessionState webSocketTestSessionState = new WebSocketTestSessionState(
- 1000, heartbeatInterval);
- final Session session = mock(Session.class);
+ final var webSocketTestSessionState = new WebSocketTestSessionState(1000, heartbeatInterval);
webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
- verify(webSocketTestSessionState.listener).addSubscriber(
- webSocketTestSessionState.webSocketSessionHandler);
- verifyNoMoreInteractions(webSocketTestSessionState.executorService);
+ verify(stream).addSubscriber(webSocketTestSessionState.webSocketSessionHandler, ENCODING, null);
+ verifyNoMoreInteractions(executorService);
}
@Test
- public void onWebSocketConnectedWithAlreadyOpenSession() {
+ void onWebSocketConnectedWithAlreadyOpenSession() throws Exception {
final var webSocketTestSessionState = new WebSocketTestSessionState(150, 8000);
- final var session = mock(Session.class);
when(session.isOpen()).thenReturn(true);
webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
- verify(webSocketTestSessionState.listener).addSubscriber(any());
+ verify(stream).addSubscriber(any(), any(), any());
}
@Test
- public void onWebSocketClosedWithOpenSession() {
+ void onWebSocketClosedWithOpenSession() throws Exception {
final var webSocketTestSessionState = new WebSocketTestSessionState(200, 10000);
- final var session = mock(Session.class);
final var reg = mock(Registration.class);
- doReturn(reg).when(webSocketTestSessionState.listener)
- .addSubscriber(webSocketTestSessionState.webSocketSessionHandler);
+ doReturn(reg).when(stream).addSubscriber(webSocketTestSessionState.webSocketSessionHandler, ENCODING, null);
webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
- verify(webSocketTestSessionState.listener).addSubscriber(webSocketTestSessionState.webSocketSessionHandler);
webSocketTestSessionState.webSocketSessionHandler.onWebSocketClosed(200, "Simulated close");
verify(reg).close();
}
@Test
- public void onWebSocketClosedWithNotInitialisedSession() {
- final WebSocketTestSessionState webSocketTestSessionState = new WebSocketTestSessionState(300, 12000);
+ void onWebSocketClosedWithNotInitialisedSession() {
+ final var webSocketTestSessionState = new WebSocketTestSessionState(0, 0);
webSocketTestSessionState.webSocketSessionHandler.onWebSocketClosed(500, "Simulated close");
- verifyNoMoreInteractions(webSocketTestSessionState.listener);
+ verifyNoMoreInteractions(stream);
}
@Test
- public void onWebSocketErrorWithEnabledPingAndLivingSession() {
+ void onWebSocketErrorWithEnabledPingAndLivingSession() throws Exception {
final var webSocketTestSessionState = new WebSocketTestSessionState(150, 8000);
- final var session = mock(Session.class);
final var reg = mock(Registration.class);
when(session.isOpen()).thenReturn(true);
- when(webSocketTestSessionState.listener.addSubscriber(webSocketTestSessionState.webSocketSessionHandler))
+ when(stream.addSubscriber(webSocketTestSessionState.webSocketSessionHandler, ENCODING, null))
.thenReturn(reg);
webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
- when(webSocketTestSessionState.pingFuture.isCancelled()).thenReturn(false);
- when(webSocketTestSessionState.pingFuture.isDone()).thenReturn(false);
+ when(pingFuture.isCancelled()).thenReturn(false);
+ when(pingFuture.isDone()).thenReturn(false);
final var sampleError = new IllegalStateException("Simulated error");
doNothing().when(reg).close();
webSocketTestSessionState.webSocketSessionHandler.onWebSocketError(sampleError);
verify(reg).close();
verify(session).close();
- verify(webSocketTestSessionState.pingFuture).cancel(anyBoolean());
+ verify(pingFuture).cancel(anyBoolean());
}
@Test
- public void onWebSocketErrorWithEnabledPingAndDeadSession() {
+ void onWebSocketErrorWithEnabledPingAndDeadSession() throws Exception {
final var webSocketTestSessionState = new WebSocketTestSessionState(150, 8000);
- final var session = mock(Session.class);
final var reg = mock(Registration.class);
when(session.isOpen()).thenReturn(false);
- when(webSocketTestSessionState.listener.addSubscriber(webSocketTestSessionState.webSocketSessionHandler))
+ when(stream.addSubscriber(webSocketTestSessionState.webSocketSessionHandler, ENCODING, null))
.thenReturn(reg);
webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
webSocketTestSessionState.webSocketSessionHandler.onWebSocketError(sampleError);
verify(reg).close();
verify(session, never()).close();
- verify(webSocketTestSessionState.pingFuture).cancel(anyBoolean());
+ verify(pingFuture).cancel(anyBoolean());
}
@Test
- public void onWebSocketErrorWithDisabledPingAndDeadSession() {
+ void onWebSocketErrorWithDisabledPingAndDeadSession() throws Exception {
final var webSocketTestSessionState = new WebSocketTestSessionState(150, 8000);
- final var session = mock(Session.class);
final var reg = mock(Registration.class);
when(session.isOpen()).thenReturn(false);
- when(webSocketTestSessionState.listener.addSubscriber(webSocketTestSessionState.webSocketSessionHandler))
+ when(stream.addSubscriber(webSocketTestSessionState.webSocketSessionHandler, ENCODING, null))
.thenReturn(reg);
webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
- when(webSocketTestSessionState.pingFuture.isCancelled()).thenReturn(false);
- when(webSocketTestSessionState.pingFuture.isDone()).thenReturn(true);
+ when(pingFuture.isDone()).thenReturn(true);
final var sampleError = new IllegalStateException("Simulated error");
webSocketTestSessionState.webSocketSessionHandler.onWebSocketError(sampleError);
verify(reg).close();
verify(session, never()).close();
- verify(webSocketTestSessionState.pingFuture, never()).cancel(anyBoolean());
+ verify(pingFuture, never()).cancel(anyBoolean());
}
@Test
- public void sendDataMessageWithDisabledFragmentation() throws IOException {
+ void sendDataMessageWithDisabledFragmentation() throws Exception {
final var webSocketTestSessionState = new WebSocketTestSessionState(0, 0);
- final var session = mock(Session.class);
final var remoteEndpoint = mock(RemoteEndpoint.class);
when(session.isOpen()).thenReturn(true);
when(session.getRemote()).thenReturn(remoteEndpoint);
}
@Test
- public void sendDataMessageWithDisabledFragAndDeadSession() {
- final WebSocketTestSessionState webSocketTestSessionState = new WebSocketTestSessionState(0, 0);
- final Session session = mock(Session.class);
- final RemoteEndpoint remoteEndpoint = mock(RemoteEndpoint.class);
+ void sendDataMessageWithDisabledFragAndDeadSession() {
+ final var webSocketTestSessionState = new WebSocketTestSessionState(0, 0);
+ final var remoteEndpoint = mock(RemoteEndpoint.class);
when(session.isOpen()).thenReturn(false);
- when(session.getRemote()).thenReturn(remoteEndpoint);
webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
final String testMessage = generateRandomStringOfLength(11);
}
@Test
- public void sendDataMessageWithEnabledFragAndSmallMessage() throws IOException {
- final WebSocketTestSessionState webSocketTestSessionState = new WebSocketTestSessionState(100, 0);
- final Session session = mock(Session.class);
- final RemoteEndpoint remoteEndpoint = mock(RemoteEndpoint.class);
+ void sendDataMessageWithEnabledFragAndSmallMessage() throws Exception {
+ final var webSocketTestSessionState = new WebSocketTestSessionState(100, 0);
+ final var remoteEndpoint = mock(RemoteEndpoint.class);
when(session.isOpen()).thenReturn(true);
when(session.getRemote()).thenReturn(remoteEndpoint);
webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
}
@Test
- public void sendDataMessageWithZeroLength() {
- final WebSocketTestSessionState webSocketTestSessionState = new WebSocketTestSessionState(100, 0);
- final Session session = mock(Session.class);
- final RemoteEndpoint remoteEndpoint = mock(RemoteEndpoint.class);
- when(session.isOpen()).thenReturn(true);
- when(session.getRemote()).thenReturn(remoteEndpoint);
+ void sendDataMessageWithZeroLength() {
+ final var webSocketTestSessionState = new WebSocketTestSessionState(100, 0);
+ final var remoteEndpoint = mock(RemoteEndpoint.class);
webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
webSocketTestSessionState.webSocketSessionHandler.sendDataMessage("");
}
@Test
- public void sendDataMessageWithEnabledFragAndLargeMessage1() throws IOException {
- final WebSocketTestSessionState webSocketTestSessionState = new WebSocketTestSessionState(100, 0);
- final Session session = mock(Session.class);
- final RemoteEndpoint remoteEndpoint = mock(RemoteEndpoint.class);
+ void sendDataMessageWithEnabledFragAndLargeMessage1() throws Exception {
+ final var webSocketTestSessionState = new WebSocketTestSessionState(100, 0);
+ final var remoteEndpoint = mock(RemoteEndpoint.class);
when(session.isOpen()).thenReturn(true);
when(session.getRemote()).thenReturn(remoteEndpoint);
webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
// there should be 10 fragments of length 100 characters
final String testMessage = generateRandomStringOfLength(1000);
webSocketTestSessionState.webSocketSessionHandler.sendDataMessage(testMessage);
- final ArgumentCaptor<String> messageCaptor = ArgumentCaptor.forClass(String.class);
- final ArgumentCaptor<Boolean> isLastCaptor = ArgumentCaptor.forClass(Boolean.class);
+ final var messageCaptor = ArgumentCaptor.forClass(String.class);
+ final var isLastCaptor = ArgumentCaptor.forClass(Boolean.class);
verify(remoteEndpoint, times(10)).sendPartialString(
messageCaptor.capture(), isLastCaptor.capture());
- final List<String> allMessages = messageCaptor.getAllValues();
- final List<Boolean> isLastFlags = isLastCaptor.getAllValues();
+ final var allMessages = messageCaptor.getAllValues();
+ final var isLastFlags = isLastCaptor.getAllValues();
assertTrue(allMessages.stream().allMatch(s -> s.length() == webSocketTestSessionState.maxFragmentSize));
assertTrue(isLastFlags.subList(0, 9).stream().noneMatch(isLast -> isLast));
assertTrue(isLastFlags.get(9));
}
@Test
- public void sendDataMessageWithEnabledFragAndLargeMessage2() throws IOException {
- final WebSocketTestSessionState webSocketTestSessionState = new WebSocketTestSessionState(100, 0);
- final Session session = mock(Session.class);
- final RemoteEndpoint remoteEndpoint = mock(RemoteEndpoint.class);
+ void sendDataMessageWithEnabledFragAndLargeMessage2() throws Exception {
+ final var webSocketTestSessionState = new WebSocketTestSessionState(100, 0);
+ final var remoteEndpoint = mock(RemoteEndpoint.class);
when(session.isOpen()).thenReturn(true);
when(session.getRemote()).thenReturn(remoteEndpoint);
webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
// there should be 10 fragments, the last fragment should be the shortest one
final String testMessage = generateRandomStringOfLength(950);
webSocketTestSessionState.webSocketSessionHandler.sendDataMessage(testMessage);
- final ArgumentCaptor<String> messageCaptor = ArgumentCaptor.forClass(String.class);
- final ArgumentCaptor<Boolean> isLastCaptor = ArgumentCaptor.forClass(Boolean.class);
+ final var messageCaptor = ArgumentCaptor.forClass(String.class);
+ final var isLastCaptor = ArgumentCaptor.forClass(Boolean.class);
verify(remoteEndpoint, times(10)).sendPartialString(
messageCaptor.capture(), isLastCaptor.capture());
- final List<String> allMessages = messageCaptor.getAllValues();
- final List<Boolean> isLastFlags = isLastCaptor.getAllValues();
+ final var allMessages = messageCaptor.getAllValues();
+ final var isLastFlags = isLastCaptor.getAllValues();
assertTrue(allMessages.subList(0, 9).stream().allMatch(s ->
s.length() == webSocketTestSessionState.maxFragmentSize));
assertEquals(50, allMessages.get(9).length());
private static String generateRandomStringOfLength(final int length) {
final String alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvxyz";
- final StringBuilder sb = new StringBuilder(length);
+ final var sb = new StringBuilder(length);
for (int i = 0; i < length; i++) {
int index = (int) (alphabet.length() * Math.random());
sb.append(alphabet.charAt(index));