import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.MdsalRestconfServer;
import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfDataStreamServiceImpl;
-import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.SubscribeToStreamUtil;
import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
import org.opendaylight.restconf.nb.rfc8040.streams.WebSocketInitializer;
@Reference final DOMMountPointService mountPointService,
@Reference final DOMNotificationService notificationService, @Reference final DOMRpcService rpcService,
@Reference final DOMSchemaService schemaService, @Reference final DatabindProvider databindProvider,
- @Reference final MdsalRestconfServer server, @Reference final ListenersBroker listenersBroker,
- final Configuration configuration) throws ServletException {
+ @Reference final MdsalRestconfServer server, final Configuration configuration) throws ServletException {
this(webServer, webContextSecurer, servletSupport, filterAdapterConfiguration, actionService, dataBroker,
mountPointService, notificationService, rpcService, schemaService, databindProvider, server,
- listenersBroker, configuration.ping$_$executor$_$name$_$prefix(), configuration.max$_$thread$_$count(),
+ configuration.ping$_$executor$_$name$_$prefix(), configuration.max$_$thread$_$count(),
new StreamsConfiguration(configuration.maximum$_$fragment$_$length(),
configuration.idle$_$timeout(), configuration.heartbeat$_$interval(), configuration.use$_$sse()));
}
final DOMActionService actionService, final DOMDataBroker dataBroker,
final DOMMountPointService mountPointService, final DOMNotificationService notificationService,
final DOMRpcService rpcService, final DOMSchemaService schemaService,
- final DatabindProvider databindProvider, final MdsalRestconfServer server,
- final ListenersBroker listenersBroker, final String pingNamePrefix, final int pingMaxThreadCount,
- final StreamsConfiguration streamsConfiguration) throws ServletException {
+ final DatabindProvider databindProvider, final MdsalRestconfServer server, final String pingNamePrefix,
+ final int pingMaxThreadCount, final StreamsConfiguration streamsConfiguration) throws ServletException {
final var scheduledThreadPool = new ScheduledThreadPoolWrapper(pingMaxThreadCount,
new NamingThreadPoolFactory(pingNamePrefix));
- final SubscribeToStreamUtil streamUtils;
+ final ListenersBroker listenersBroker;
final ServletDetails streamServlet;
if (streamsConfiguration.useSSE()) {
- streamUtils = SubscribeToStreamUtil.serverSentEvents(listenersBroker);
+ listenersBroker = new ListenersBroker.ServerSentEvents();
streamServlet = ServletDetails.builder()
.addUrlPattern("/" + URLConstants.SSE_SUBPATH + "/*")
.servlet(servletSupport.createHttpServletBuilder(
.asyncSupported(true)
.build();
} else {
- streamUtils = SubscribeToStreamUtil.webSockets(listenersBroker);
+ listenersBroker = new ListenersBroker.WebSockets();
streamServlet = ServletDetails.builder()
.addUrlPattern("/" + RestconfStreamsConstants.DATA_SUBSCRIPTION + "/*")
.addUrlPattern("/" + RestconfStreamsConstants.NOTIFICATION_STREAM + "/*")
.addUrlPattern("/*")
.servlet(servletSupport.createHttpServletBuilder(
new RestconfApplication(databindProvider, server, mountPointService, dataBroker, actionService,
- notificationService, schemaService, streamUtils))
+ notificationService, schemaService, listenersBroker))
.build())
.asyncSupported(true)
.build())
import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfOperationsServiceImpl;
import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfSchemaServiceImpl;
import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl;
-import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.SubscribeToStreamUtil;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
@Singleton
public class RestconfApplication extends AbstractRestconfApplication {
public RestconfApplication(final DatabindProvider databindProvider, final MdsalRestconfServer server,
final DOMMountPointService mountPointService, final DOMDataBroker dataBroker,
final DOMActionService actionService, final DOMNotificationService notificationService,
- final DOMSchemaService domSchemaService, final SubscribeToStreamUtil streamUtils) {
+ final DOMSchemaService domSchemaService, final ListenersBroker listenersBroker) {
super(databindProvider, List.of(
// FIXME: NETCONF:1102: do not instantiate this service
- new RestconfStreamsSubscriptionServiceImpl(dataBroker, notificationService, databindProvider, streamUtils),
+ new RestconfStreamsSubscriptionServiceImpl(dataBroker, notificationService, databindProvider,
+ listenersBroker),
new RestconfDataServiceImpl(databindProvider, server, actionService),
- new RestconfInvokeOperationsServiceImpl(databindProvider, server, mountPointService, streamUtils),
+ new RestconfInvokeOperationsServiceImpl(databindProvider, server, mountPointService, listenersBroker),
new RestconfOperationsServiceImpl(databindProvider, server),
new RestconfSchemaServiceImpl(domSchemaService, mountPointService),
new RestconfImpl(databindProvider)));
import org.opendaylight.restconf.nb.rfc8040.databind.XmlOperationInputBody;
import org.opendaylight.restconf.nb.rfc8040.legacy.InstanceIdentifierContext;
import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotification;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscription;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateNotificationStream;
private final MdsalRestconfServer server;
@Deprecated(forRemoval = true)
private final DOMMountPointService mountPointService;
- private final SubscribeToStreamUtil streamUtils;
+ private final ListenersBroker listenersBroker;
public RestconfInvokeOperationsServiceImpl(final DatabindProvider databindProvider,
final MdsalRestconfServer server, final DOMMountPointService mountPointService,
- final SubscribeToStreamUtil streamUtils) {
+ final ListenersBroker listenersBroker) {
this.databindProvider = requireNonNull(databindProvider);
this.server = requireNonNull(server);
this.mountPointService = requireNonNull(mountPointService);
- this.streamUtils = requireNonNull(streamUtils);
+ this.listenersBroker = requireNonNull(listenersBroker);
}
/**
if (mountPoint == null) {
// Hacked-up integration of streams
if (CreateDataChangeEventSubscription.QNAME.equals(type)) {
- return CreateStreamUtil.createDataChangeNotifiStream(streamUtils.listenersBroker(), input,
+ return CreateStreamUtil.createDataChangeNotifiStream(listenersBroker, input,
localDatabind.modelContext());
} else if (CreateNotificationStream.QNAME.equals(type)) {
- return CreateStreamUtil.createNotificationStream(streamUtils.listenersBroker(), input,
+ return CreateStreamUtil.createNotificationStream(listenersBroker, input,
localDatabind.modelContext());
} else if (SubscribeDeviceNotification.QNAME.equals(type)) {
- return CreateStreamUtil.createDeviceNotificationListener(streamUtils.listenersBroker(), input,
- streamUtils.prepareUriByStreamName(uriInfo, "").toString(), mountPointService);
+ return CreateStreamUtil.createDeviceNotificationListener(listenersBroker, input,
+ listenersBroker.prepareUriByStreamName(uriInfo, "").toString(), mountPointService);
}
}
import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
import org.opendaylight.restconf.nb.rfc8040.rests.services.api.RestconfStreamsSubscriptionService;
import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.yang.gen.v1.subscribe.to.notification.rev161028.Notifi;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
private static final QName LOCATION_QNAME = QName.create(Notifi.QNAME, "location").intern();
private static final NodeIdentifier LOCATION_NODEID = NodeIdentifier.create(LOCATION_QNAME);
- private final SubscribeToStreamUtil streamUtils;
+ private final ListenersBroker listenersBroker;
private final HandlersHolder handlersHolder;
/**
* @param dataBroker {@link DOMDataBroker}
* @param notificationService {@link DOMNotificationService}
* @param databindProvider a {@link DatabindProvider}
- * @param streamUtils a {@link SubscribeToStreamUtil}
+ * @param listenersBroker a {@link ListenersBroker}
*/
public RestconfStreamsSubscriptionServiceImpl(final DOMDataBroker dataBroker,
final DOMNotificationService notificationService, final DatabindProvider databindProvider,
- final SubscribeToStreamUtil streamUtils) {
+ final ListenersBroker listenersBroker) {
handlersHolder = new HandlersHolder(dataBroker, notificationService, databindProvider);
- this.streamUtils = requireNonNull(streamUtils);
+ this.listenersBroker = requireNonNull(listenersBroker);
}
@Override
final URI location;
if (identifier.contains(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
- location = streamUtils.subscribeToDataStream(identifier, uriInfo, params, handlersHolder);
+ location = listenersBroker.subscribeToDataStream(identifier, uriInfo, params, handlersHolder);
} else if (identifier.contains(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
- location = streamUtils.subscribeToYangStream(identifier, uriInfo, params, handlersHolder);
+ location = listenersBroker.subscribeToYangStream(identifier, uriInfo, params, handlersHolder);
} else {
final String msg = "Bad type of notification of sal-remote";
LOG.warn(msg);
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
-
-import static com.google.common.base.Strings.isNullOrEmpty;
-import static java.util.Objects.requireNonNull;
-
-import com.google.common.base.Splitter;
-import java.net.URI;
-import java.util.concurrent.ExecutionException;
-import javax.ws.rs.core.UriInfo;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.mdsal.dom.api.DOMDataBroker;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
-import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
-import org.opendaylight.restconf.nb.rfc8040.NotificationQueryParams;
-import org.opendaylight.restconf.nb.rfc8040.URLConstants;
-import org.opendaylight.restconf.nb.rfc8040.monitoring.RestconfStateStreams;
-import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
-import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
-import org.opendaylight.yangtools.yang.common.ErrorTag;
-import org.opendaylight.yangtools.yang.common.ErrorType;
-import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Subscribe to stream util class.
- */
-public abstract class SubscribeToStreamUtil {
- /**
- * Implementation of SubscribeToStreamUtil for Server-sent events.
- */
- private static final class ServerSentEvents extends SubscribeToStreamUtil {
- ServerSentEvents(final ListenersBroker listenersBroker) {
- super(listenersBroker);
- }
-
- @Override
- public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
- return uriInfo.getBaseUriBuilder()
- .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.SSE_SUBPATH + '/' + streamName)
- .build();
- }
- }
-
- /**
- * Implementation of SubscribeToStreamUtil for Web sockets.
- */
- private static final class WebSockets extends SubscribeToStreamUtil {
- WebSockets(final ListenersBroker listenersBroker) {
- super(listenersBroker);
- }
-
- @Override
- public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
- final var scheme = switch (uriInfo.getAbsolutePath().getScheme()) {
- // Secured HTTP goes to Secured WebSockets
- case "https" -> "wss";
- // Unsecured HTTP and others go to unsecured WebSockets
- default -> "ws";
- };
-
- return uriInfo.getBaseUriBuilder()
- .scheme(scheme)
- .replacePath(URLConstants.BASE_PATH + '/' + streamName)
- .build();
- }
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(SubscribeToStreamUtil.class);
- private static final Splitter SLASH_SPLITTER = Splitter.on('/');
-
- private final @NonNull ListenersBroker listenersBroker;
-
- private SubscribeToStreamUtil(final ListenersBroker listenersBroker) {
- this.listenersBroker = requireNonNull(listenersBroker);
- }
-
- public static @NonNull SubscribeToStreamUtil serverSentEvents(final ListenersBroker listenersBroker) {
- return new ServerSentEvents(listenersBroker);
- }
-
- public static @NonNull SubscribeToStreamUtil webSockets(final ListenersBroker listenersBroker) {
- return new WebSockets(listenersBroker);
- }
-
- public final @NonNull ListenersBroker listenersBroker() {
- return listenersBroker;
- }
-
- /**
- * Prepare URL from base name and stream name.
- *
- * @param uriInfo base URL information
- * @param streamName name of stream for create
- * @return final URL
- */
- abstract @NonNull URI prepareUriByStreamName(UriInfo uriInfo, String streamName);
-
- /**
- * Register listener by streamName in identifier to listen to yang notifications, and put or delete information
- * about listener to DS according to ietf-restconf-monitoring.
- *
- * @param identifier Name of the stream.
- * @param uriInfo URI information.
- * @param notificationQueryParams Query parameters of notification.
- * @param handlersHolder Holder of handlers for notifications.
- * @return Stream location for listening.
- */
- final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
- final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
- final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
- if (isNullOrEmpty(streamName)) {
- throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
- }
-
- final var notificationListenerAdapter = listenersBroker.notificationListenerFor(streamName);
- if (notificationListenerAdapter == null) {
- throw new RestconfDocumentedException(String.format("Stream with name %s was not found.", streamName),
- ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT);
- }
-
- final URI uri = prepareUriByStreamName(uriInfo, streamName);
- notificationListenerAdapter.setQueryParams(notificationQueryParams);
- notificationListenerAdapter.listen(handlersHolder.notificationService());
- final DOMDataBroker dataBroker = handlersHolder.dataBroker();
- notificationListenerAdapter.setCloseVars(dataBroker, handlersHolder.databindProvider());
- final MapEntryNode mapToStreams = RestconfStateStreams.notificationStreamEntry(streamName,
- notificationListenerAdapter.qnames(), notificationListenerAdapter.getStart(),
- notificationListenerAdapter.getOutputType(), uri);
-
- // FIXME: how does this correlate with the transaction notificationListenerAdapter.close() will do?
- final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
- writeDataToDS(writeTransaction, mapToStreams);
- submitData(writeTransaction);
- return uri;
- }
-
- /**
- * Register listener by streamName in identifier to listen to data change notifications, and put or delete
- * information about listener to DS according to ietf-restconf-monitoring.
- *
- * @param identifier Identifier as stream name.
- * @param uriInfo Base URI information.
- * @param notificationQueryParams Query parameters of notification.
- * @param handlersHolder Holder of handlers for notifications.
- * @return Location for listening.
- */
- final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
- final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
- final var streamName = ListenersBroker.createStreamNameFromUri(identifier);
- final var listener = listenersBroker.dataChangeListenerFor(streamName);
- if (listener == null) {
- throw new RestconfDocumentedException("No listener found for stream " + streamName,
- ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
- }
-
- listener.setQueryParams(notificationQueryParams);
-
- final var dataBroker = handlersHolder.dataBroker();
- final var schemaHandler = handlersHolder.databindProvider();
- listener.setCloseVars(dataBroker, schemaHandler);
- listener.listen(dataBroker);
-
- final var uri = prepareUriByStreamName(uriInfo, streamName);
- final var schemaContext = schemaHandler.currentContext().modelContext();
- final var serializedPath = IdentifierCodec.serialize(listener.getPath(), schemaContext);
-
- final var mapToStreams = RestconfStateStreams.dataChangeStreamEntry(listener.getPath(),
- listener.getStart(), listener.getOutputType(), uri, schemaContext, serializedPath);
- final var writeTransaction = dataBroker.newWriteOnlyTransaction();
- writeDataToDS(writeTransaction, mapToStreams);
- submitData(writeTransaction);
- return uri;
- }
-
- // FIXME: callers are utter duplicates, refactor them
- private static void writeDataToDS(final DOMDataTreeWriteOperations tx, final MapEntryNode mapToStreams) {
- // FIXME: use put() here
- tx.merge(LogicalDatastoreType.OPERATIONAL, RestconfStateStreams.restconfStateStreamPath(mapToStreams.name()),
- mapToStreams);
- }
-
- private static void submitData(final DOMDataTreeWriteTransaction readWriteTransaction) {
- try {
- readWriteTransaction.commit().get();
- } catch (final InterruptedException | ExecutionException e) {
- throw new RestconfDocumentedException("Problem while putting data to DS.", e);
- }
- }
-}
*/
package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
+import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.Objects.requireNonNull;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableSet;
+import java.net.URI;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.StampedLock;
-import javax.inject.Inject;
-import javax.inject.Singleton;
+import javax.ws.rs.core.UriInfo;
+import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataBroker;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
import org.opendaylight.mdsal.dom.api.DOMMountPointService;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
+import org.opendaylight.restconf.nb.rfc8040.NotificationQueryParams;
import org.opendaylight.restconf.nb.rfc8040.URLConstants;
+import org.opendaylight.restconf.nb.rfc8040.monitoring.RestconfStateStreams;
+import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
import org.opendaylight.yangtools.yang.common.ErrorType;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// FIXME: furthermore, this should be tied to ietf-restconf-monitoring, as the Strings used in its maps are stream
// names. We essentially need a component which deals with allocation of stream names and their lifecycle and
// the contents of /restconf-state/streams.
-@Singleton
-@Component(service = ListenersBroker.class, immediate = true)
-public final class ListenersBroker {
+public abstract sealed class ListenersBroker {
+ /**
+ * A ListenersBroker working with Server-Sent Events.
+ */
+ public static final class ServerSentEvents extends ListenersBroker {
+ @Override
+ public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
+ return uriInfo.getBaseUriBuilder()
+ .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.SSE_SUBPATH + '/' + streamName)
+ .build();
+ }
+ }
+
+ /**
+ * A ListenersBroker working with WebSockets.
+ */
+ public static final class WebSockets extends ListenersBroker {
+ @Override
+ public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
+ final var scheme = switch (uriInfo.getAbsolutePath().getScheme()) {
+ // Secured HTTP goes to Secured WebSockets
+ case "https" -> "wss";
+ // Unsecured HTTP and others go to unsecured WebSockets
+ default -> "ws";
+ };
+
+ return uriInfo.getBaseUriBuilder()
+ .scheme(scheme)
+ .replacePath(URLConstants.BASE_PATH + '/' + streamName)
+ .build();
+ }
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);
private final StampedLock dataChangeListenersLock = new StampedLock();
private final BiMap<String, NotificationListenerAdapter> notificationListeners = HashBiMap.create();
private final BiMap<String, DeviceNotificationListenerAdaptor> deviceNotificationListeners = HashBiMap.create();
- @Inject
- @Activate
- public ListenersBroker() {
-
+ private ListenersBroker() {
+ // Hidden on purpose
}
/**
* does not exist.
* @throws NullPointerException in {@code streamName} is {@code null}
*/
- public @Nullable ListenerAdapter dataChangeListenerFor(final String streamName) {
+ public final @Nullable ListenerAdapter dataChangeListenerFor(final String streamName) {
requireNonNull(streamName);
final long stamp = dataChangeListenersLock.readLock();
* stream name does not exist.
* @throws NullPointerException in {@code streamName} is {@code null}
*/
- public @Nullable NotificationListenerAdapter notificationListenerFor(final String streamName) {
+ public final @Nullable NotificationListenerAdapter notificationListenerFor(final String streamName) {
requireNonNull(streamName);
final long stamp = notificationListenersLock.readLock();
* specified stream name does not exist.
* @throws NullPointerException in {@code path} is {@code null}
*/
- public @Nullable DeviceNotificationListenerAdaptor deviceNotificationListenerFor(final String streamName) {
+ public final @Nullable DeviceNotificationListenerAdaptor deviceNotificationListenerFor(final String streamName) {
requireNonNull(streamName);
final long stamp = deviceNotificationListenersLock.readLock();
* @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
* or {@link Optional#empty()} if listener with specified stream name doesn't exist.
*/
- public @Nullable BaseListenerInterface listenerFor(final String streamName) {
+ public final @Nullable BaseListenerInterface listenerFor(final String streamName) {
if (streamName.startsWith(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
return notificationListenerFor(streamName);
} else if (streamName.startsWith(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
* @param outputType Specific type of output for notifications - XML or JSON.
* @return Created or existing data-change listener adapter.
*/
- public ListenerAdapter registerDataChangeListener(final EffectiveModelContext modelContext,
+ public final ListenerAdapter registerDataChangeListener(final EffectiveModelContext modelContext,
final LogicalDatastoreType datastore, final YangInstanceIdentifier path, final Scope scope,
final NotificationOutputType outputType) {
final var sb = new StringBuilder(RestconfStreamsConstants.DATA_SUBSCRIPTION)
* @param outputType Specific type of output for notifications - XML or JSON.
* @return Created or existing notification listener adapter.
*/
- public NotificationListenerAdapter registerNotificationListener(final EffectiveModelContext refSchemaCtx,
+ public final NotificationListenerAdapter registerNotificationListener(final EffectiveModelContext refSchemaCtx,
final ImmutableSet<QName> notifications, final NotificationOutputType outputType) {
final var sb = new StringBuilder(RestconfStreamsConstants.NOTIFICATION_STREAM).append('/');
var haveFirst = false;
* @param mountPointService Mount point service
* @return Created or existing device notification listener adapter.
*/
- public DeviceNotificationListenerAdaptor registerDeviceNotificationListener(final String deviceName,
+ public final DeviceNotificationListenerAdaptor registerDeviceNotificationListener(final String deviceName,
final NotificationOutputType outputType, final EffectiveModelContext refSchemaCtx,
final DOMMountPointService mountPointService, final YangInstanceIdentifier path) {
final var sb = new StringBuilder(RestconfStreamsConstants.DEVICE_NOTIFICATION_STREAM).append('/')
/**
* Removal and closing of all data-change-event and notification listeners.
*/
- public synchronized void removeAndCloseAllListeners() {
+ public final synchronized void removeAndCloseAllListeners() {
final long stampNotifications = notificationListenersLock.writeLock();
final long stampDataChanges = dataChangeListenersLock.writeLock();
try {
/**
* Closes and removes all data-change listeners.
*/
- public void removeAndCloseAllDataChangeListeners() {
+ public final void removeAndCloseAllDataChangeListeners() {
final long stamp = dataChangeListenersLock.writeLock();
try {
removeAndCloseAllDataChangeListenersTemplate();
@SuppressWarnings("checkstyle:IllegalCatch")
private void removeAndCloseAllDataChangeListenersTemplate() {
- dataChangeListeners.values()
- .forEach(listenerAdapter -> {
- try {
- listenerAdapter.close();
- } catch (final Exception exception) {
- LOG.error("Failed to close data-change listener {}.", listenerAdapter, exception);
- throw new IllegalStateException(String.format("Failed to close data-change listener %s.",
- listenerAdapter), exception);
- }
- });
+ dataChangeListeners.values().forEach(listenerAdapter -> {
+ try {
+ listenerAdapter.close();
+ } catch (Exception e) {
+ LOG.error("Failed to close data-change listener {}.", listenerAdapter, e);
+ throw new IllegalStateException("Failed to close data-change listener %s.".formatted(listenerAdapter),
+ e);
+ }
+ });
dataChangeListeners.clear();
}
/**
* Closes and removes all notification listeners.
*/
- public void removeAndCloseAllNotificationListeners() {
+ public final void removeAndCloseAllNotificationListeners() {
final long stamp = notificationListenersLock.writeLock();
try {
removeAndCloseAllNotificationListenersTemplate();
@SuppressWarnings("checkstyle:IllegalCatch")
private void removeAndCloseAllNotificationListenersTemplate() {
- notificationListeners.values()
- .forEach(listenerAdapter -> {
- try {
- listenerAdapter.close();
- } catch (final Exception exception) {
- LOG.error("Failed to close notification listener {}.", listenerAdapter, exception);
- throw new IllegalStateException(String.format("Failed to close notification listener %s.",
- listenerAdapter), exception);
- }
- });
+ notificationListeners.values().forEach(listenerAdapter -> {
+ try {
+ listenerAdapter.close();
+ } catch (Exception e) {
+ LOG.error("Failed to close notification listener {}.", listenerAdapter, e);
+ throw new IllegalStateException("Failed to close notification listener %s.".formatted(listenerAdapter),
+ e);
+ }
+ });
notificationListeners.clear();
}
* @param listener Listener to be closed and removed.
*/
@SuppressWarnings("checkstyle:IllegalCatch")
- public void removeAndCloseDataChangeListener(final ListenerAdapter listener) {
+ public final void removeAndCloseDataChangeListener(final ListenerAdapter listener) {
final long stamp = dataChangeListenersLock.writeLock();
try {
removeAndCloseDataChangeListenerTemplate(listener);
- } catch (final Exception exception) {
+ } catch (Exception exception) {
LOG.error("Data-change listener {} cannot be closed.", listener, exception);
} finally {
dataChangeListenersLock.unlockWrite(stamp);
if (dataChangeListeners.inverse().remove(listener) == null) {
LOG.warn("There isn't any data-change event stream that would match listener adapter {}.", listener);
}
- } catch (final InterruptedException | ExecutionException exception) {
- LOG.error("Data-change listener {} cannot be closed.", listener, exception);
- throw new IllegalStateException(String.format(
- "Data-change listener %s cannot be closed.",
- listener), exception);
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Data-change listener {} cannot be closed.", listener, e);
+ throw new IllegalStateException("Data-change listener %s cannot be closed.".formatted(listener), e);
}
}
* @param listener Listener to be closed and removed.
*/
@SuppressWarnings("checkstyle:IllegalCatch")
- public void removeAndCloseNotificationListener(final NotificationListenerAdapter listener) {
+ public final void removeAndCloseNotificationListener(final NotificationListenerAdapter listener) {
final long stamp = notificationListenersLock.writeLock();
try {
removeAndCloseNotificationListenerTemplate(listener);
- } catch (final Exception exception) {
- LOG.error("Notification listener {} cannot be closed.", listener, exception);
+ } catch (Exception e) {
+ LOG.error("Notification listener {} cannot be closed.", listener, e);
} finally {
notificationListenersLock.unlockWrite(stamp);
}
* @param listener Listener to be closed and removed.
*/
@SuppressWarnings("checkstyle:IllegalCatch")
- public void removeAndCloseDeviceNotificationListener(final DeviceNotificationListenerAdaptor listener) {
+ public final void removeAndCloseDeviceNotificationListener(final DeviceNotificationListenerAdaptor listener) {
final long stamp = deviceNotificationListenersLock.writeLock();
try {
requireNonNull(listener);
if (notificationListeners.inverse().remove(listener) == null) {
LOG.warn("There isn't any notification stream that would match listener adapter {}.", listener);
}
- } catch (final InterruptedException | ExecutionException exception) {
- LOG.error("Notification listener {} cannot be closed.", listener, exception);
- throw new IllegalStateException(String.format(
- "Notification listener %s cannot be closed.", listener),
- exception);
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Notification listener {} cannot be closed.", listener, e);
+ throw new IllegalStateException("Notification listener %s cannot be closed.".formatted(listener), e);
}
}
*
* @param listener Listener to be closed and removed from cache.
*/
- void removeAndCloseListener(final BaseListenerInterface listener) {
+ final void removeAndCloseListener(final BaseListenerInterface listener) {
requireNonNull(listener);
if (listener instanceof ListenerAdapter) {
removeAndCloseDataChangeListener((ListenerAdapter) listener);
}
return result;
}
+
+ /**
+ * Prepare URL from base name and stream name.
+ *
+ * @param uriInfo base URL information
+ * @param streamName name of stream for create
+ * @return final URL
+ */
+ public abstract @NonNull URI prepareUriByStreamName(UriInfo uriInfo, String streamName);
+
+ /**
+ * Register listener by streamName in identifier to listen to yang notifications, and put or delete information
+ * about listener to DS according to ietf-restconf-monitoring.
+ *
+ * @param identifier Name of the stream.
+ * @param uriInfo URI information.
+ * @param notificationQueryParams Query parameters of notification.
+ * @param handlersHolder Holder of handlers for notifications.
+ * @return Stream location for listening.
+ */
+ public final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
+ final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
+ final String streamName = createStreamNameFromUri(identifier);
+ if (isNullOrEmpty(streamName)) {
+ throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
+ }
+
+ final var notificationListenerAdapter = notificationListenerFor(streamName);
+ if (notificationListenerAdapter == null) {
+ throw new RestconfDocumentedException("Stream with name %s was not found.".formatted(streamName),
+ ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT);
+ }
+
+ final URI uri = prepareUriByStreamName(uriInfo, streamName);
+ notificationListenerAdapter.setQueryParams(notificationQueryParams);
+ notificationListenerAdapter.listen(handlersHolder.notificationService());
+ final DOMDataBroker dataBroker = handlersHolder.dataBroker();
+ notificationListenerAdapter.setCloseVars(dataBroker, handlersHolder.databindProvider());
+ final MapEntryNode mapToStreams = RestconfStateStreams.notificationStreamEntry(streamName,
+ notificationListenerAdapter.qnames(), notificationListenerAdapter.getStart(),
+ notificationListenerAdapter.getOutputType(), uri);
+
+ // FIXME: how does this correlate with the transaction notificationListenerAdapter.close() will do?
+ final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
+ writeDataToDS(writeTransaction, mapToStreams);
+ submitData(writeTransaction);
+ return uri;
+ }
+
+ /**
+ * Register listener by streamName in identifier to listen to data change notifications, and put or delete
+ * information about listener to DS according to ietf-restconf-monitoring.
+ *
+ * @param identifier Identifier as stream name.
+ * @param uriInfo Base URI information.
+ * @param notificationQueryParams Query parameters of notification.
+ * @param handlersHolder Holder of handlers for notifications.
+ * @return Location for listening.
+ */
+ public final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
+ final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
+ final var streamName = createStreamNameFromUri(identifier);
+ final var listener = dataChangeListenerFor(streamName);
+ if (listener == null) {
+ throw new RestconfDocumentedException("No listener found for stream " + streamName,
+ ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
+ }
+
+ listener.setQueryParams(notificationQueryParams);
+
+ final var dataBroker = handlersHolder.dataBroker();
+ final var schemaHandler = handlersHolder.databindProvider();
+ listener.setCloseVars(dataBroker, schemaHandler);
+ listener.listen(dataBroker);
+
+ final var uri = prepareUriByStreamName(uriInfo, streamName);
+ final var schemaContext = schemaHandler.currentContext().modelContext();
+ final var serializedPath = IdentifierCodec.serialize(listener.getPath(), schemaContext);
+
+ final var mapToStreams = RestconfStateStreams.dataChangeStreamEntry(listener.getPath(),
+ listener.getStart(), listener.getOutputType(), uri, schemaContext, serializedPath);
+ final var writeTransaction = dataBroker.newWriteOnlyTransaction();
+ writeDataToDS(writeTransaction, mapToStreams);
+ submitData(writeTransaction);
+ return uri;
+ }
+
+ // FIXME: callers are utter duplicates, refactor them
+ private static void writeDataToDS(final DOMDataTreeWriteOperations tx, final MapEntryNode mapToStreams) {
+ // FIXME: use put() here
+ tx.merge(LogicalDatastoreType.OPERATIONAL, RestconfStateStreams.restconfStateStreamPath(mapToStreams.name()),
+ mapToStreams);
+ }
+
+ private static void submitData(final DOMDataTreeWriteTransaction readWriteTransaction) {
+ try {
+ readWriteTransaction.commit().get();
+ } catch (final InterruptedException | ExecutionException e) {
+ throw new RestconfDocumentedException("Problem while putting data to DS.", e);
+ }
+ }
}
public class CreateStreamUtilTest {
private static EffectiveModelContext SCHEMA_CTX;
- private final ListenersBroker listenersBroker = new ListenersBroker();
+ private final ListenersBroker listenersBroker = new ListenersBroker.ServerSentEvents();
@BeforeClass
public static void setUp() {
public void setup() {
server = new MdsalRestconfServer(dataBroker, rpcService, mountPointService);
invokeOperationsService = new RestconfInvokeOperationsServiceImpl(() -> CONTEXT, server, mountPointService,
- SubscribeToStreamUtil.webSockets(new ListenersBroker()));
+ new ListenersBroker.WebSockets());
}
@Test
import com.google.common.collect.ImmutableClassToInstanceMap;
import java.net.URI;
-import java.net.URISyntaxException;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.common.ErrorTag;
import org.opendaylight.yangtools.yang.common.ErrorType;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class RestconfStreamsSubscriptionServiceImplTest extends AbstractNotificationListenerTest {
@Mock
private DOMNotificationService notificationService;
- private final ListenersBroker listenersBroker = new ListenersBroker();
private final DatabindProvider databindProvider = () -> DatabindContext.ofModel(MODEL_CONTEXT);
@Before
- public void setUp() throws URISyntaxException {
- listenersBroker.registerDataChangeListener(MODEL_CONTEXT, LogicalDatastoreType.OPERATIONAL,
- YangInstanceIdentifier.of(QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster")),
- Scope.ONE, NotificationOutputType.JSON);
- final DOMDataTreeWriteTransaction wTx = mock(DOMDataTreeWriteTransaction.class);
+ public void setUp() throws Exception {
+ final var wTx = mock(DOMDataTreeWriteTransaction.class);
doReturn(wTx).when(dataBroker).newWriteOnlyTransaction();
doReturn(CommitInfo.emptyFluentFuture()).when(wTx).commit();
- DOMDataTreeChangeService dataTreeChangeService = mock(DOMDataTreeChangeService.class);
+ final var dataTreeChangeService = mock(DOMDataTreeChangeService.class);
doReturn(mock(ListenerRegistration.class)).when(dataTreeChangeService)
.registerDataTreeChangeListener(any(), any());
@Test
public void testSubscribeToStreamSSE() {
+ final var listenersBroker = new ListenersBroker.ServerSentEvents();
listenersBroker.registerDataChangeListener(MODEL_CONTEXT, LogicalDatastoreType.OPERATIONAL,
IdentifierCodec.deserialize("toaster:toaster/toasterStatus", MODEL_CONTEXT), Scope.ONE,
NotificationOutputType.XML);
final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
- notificationService, databindProvider, SubscribeToStreamUtil.serverSentEvents(listenersBroker));
+ notificationService, databindProvider,listenersBroker);
final var response = streamsSubscriptionService.subscribeToStream(
"data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE", uriInfo);
assertEquals("http://localhost:8181/" + URLConstants.BASE_PATH + "/" + URLConstants.SSE_SUBPATH
@Test
public void testSubscribeToStreamWS() {
+ final var listenersBroker = new ListenersBroker.WebSockets();
listenersBroker.registerDataChangeListener(MODEL_CONTEXT, LogicalDatastoreType.OPERATIONAL,
IdentifierCodec.deserialize("toaster:toaster/toasterStatus", MODEL_CONTEXT), Scope.ONE,
NotificationOutputType.XML);
final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
- notificationService, databindProvider, SubscribeToStreamUtil.webSockets(listenersBroker));
+ notificationService, databindProvider, listenersBroker);
final var response = streamsSubscriptionService.subscribeToStream(
"data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE", uriInfo);
assertEquals("ws://localhost:8181/" + URLConstants.BASE_PATH
@Test
public void testSubscribeToStreamMissingDatastoreInPath() {
+ final var listenersBroker = new ListenersBroker.WebSockets();
final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
- notificationService, databindProvider, SubscribeToStreamUtil.webSockets(listenersBroker));
+ notificationService, databindProvider, listenersBroker);
final var errors = assertThrows(RestconfDocumentedException.class,
() -> streamsSubscriptionService.subscribeToStream("toaster:toaster/toasterStatus/scope=ONE", uriInfo))
.getErrors();
@Test
public void testSubscribeToStreamMissingScopeInPath() {
+ final var listenersBroker = new ListenersBroker.WebSockets();
final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
- notificationService, databindProvider, SubscribeToStreamUtil.webSockets(listenersBroker));
+ notificationService, databindProvider, listenersBroker);
final var errors = assertThrows(RestconfDocumentedException.class,
() -> streamsSubscriptionService.subscribeToStream("toaster:toaster/toasterStatus/datastore=OPERATIONAL",
uriInfo)).getErrors();
private static final String REGISTERED_STREAM_NAME = "data-change-event-subscription/"
+ "toaster:toaster/datastore=CONFIGURATION/scope=SUBTREE/JSON";
- private final ListenersBroker listenersBroker = new ListenersBroker();
+ private final ListenersBroker listenersBroker = new ListenersBroker.ServerSentEvents();
@Mock
private ScheduledExecutorService execService;
public class JsonNotificationListenerTest extends AbstractNotificationListenerTest {
private static final Logger LOG = LoggerFactory.getLogger(JsonNotificationListenerTest.class);
- private final ListenersBroker listenersBroker = new ListenersBroker();
+ private final ListenersBroker listenersBroker = new ListenersBroker.ServerSentEvents();
@Test
public void notifi_leafTest() throws Exception {
private static EffectiveModelContext SCHEMA_CONTEXT;
- private final ListenersBroker listenersBroker = new ListenersBroker();
+ private final ListenersBroker listenersBroker = new ListenersBroker.ServerSentEvents();
private DataBroker dataBroker;
private DOMDataBroker domDataBroker;
private DatabindProvider databindProvider;
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class XmlNotificationListenerTest extends AbstractNotificationListenerTest {
- private final ListenersBroker listenersBroker = new ListenersBroker();
+ private final ListenersBroker listenersBroker = new ListenersBroker.ServerSentEvents();
@Test
public void notifi_leafTest() throws Exception {