*/
package org.opendaylight.restconf.nb.rfc8040;
-import static org.opendaylight.restconf.nb.rfc8040.URLConstants.BASE_PATH;
-import static org.opendaylight.restconf.nb.rfc8040.URLConstants.SSE_SUBPATH;
-import static org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants.DATA_SUBSCRIPTION;
-import static org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants.NOTIFICATION_STREAM;
-
import com.google.common.annotations.Beta;
import javax.servlet.ServletException;
import org.opendaylight.aaa.filterchain.configuration.CustomFilterAdapterConfiguration;
import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.MdsalRestconfServer;
import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfDataStreamServiceImpl;
+import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
import org.opendaylight.restconf.nb.rfc8040.streams.WebSocketInitializer;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
final var restconfBuilder = WebContext.builder()
.name("RFC8040 RESTCONF")
- .contextPath("/" + BASE_PATH)
+ .contextPath("/" + URLConstants.BASE_PATH)
.supportsSessions(false)
.addServlet(ServletDetails.builder()
.addUrlPattern("/*")
.asyncSupported(true)
.build())
.addServlet(ServletDetails.builder()
- .addUrlPattern("/" + SSE_SUBPATH + "/*")
+ .addUrlPattern("/" + URLConstants.SSE_SUBPATH + "/*")
.servlet(servletSupport.createHttpServletBuilder(
new DataStreamApplication(databindProvider,
new RestconfDataStreamServiceImpl(scheduledThreadPool, listenersBroker, streamsConfiguration)))
.asyncSupported(true)
.build())
.addServlet(ServletDetails.builder()
- .addUrlPattern("/" + DATA_SUBSCRIPTION + "/*")
- .addUrlPattern("/" + NOTIFICATION_STREAM + "/*")
+ .addUrlPattern("/" + RestconfStreamsConstants.DATA_SUBSCRIPTION + "/*")
+ .addUrlPattern("/" + RestconfStreamsConstants.NOTIFICATION_STREAM + "/*")
+ .addUrlPattern("/" + RestconfStreamsConstants.DEVICE_NOTIFICATION_STREAM + "/*")
.servlet(new WebSocketInitializer(scheduledThreadPool, listenersBroker, streamsConfiguration))
.build())
.supportsSessions(false)
.addServlet(ServletDetails.builder()
.addUrlPattern("/*")
- .servlet(servletSupport.createHttpServletBuilder(new RootFoundApplication(BASE_PATH)).build())
+ .servlet(servletSupport.createHttpServletBuilder(new RootFoundApplication(URLConstants.BASE_PATH))
+ .build())
.name("Rootfound")
.build());
return RestconfFuture.of(Optional.of(Builders.containerBuilder()
.withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
.withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH,
- baseUrl + notificationListenerAdapter.getStreamName() + "?"
- + RestconfStreamsConstants.NOTIFICATION_TYPE + "=" + RestconfStreamsConstants.DEVICE))
+ baseUrl + notificationListenerAdapter.getStreamName()))
.build()));
}
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.UriInfo;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseEventSink;
import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
-import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
import org.opendaylight.restconf.nb.rfc8040.streams.SSESessionHandler;
import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.BaseListenerInterface;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.yangtools.yang.common.ErrorTag;
import org.opendaylight.yangtools.yang.common.ErrorType;
* Get target data resource.
*
* @param identifier path to target
- * @param uriInfo URI info
*/
@GET
@Path("/{identifier:.+}")
@Produces(MediaType.SERVER_SENT_EVENTS)
- public void getSSE(@Encoded @PathParam("identifier") final String identifier, @Context final UriInfo uriInfo,
- @Context final SseEventSink sink, @Context final Sse sse) {
- final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
- final BaseListenerInterface listener;
- final String notificaionType =
- uriInfo.getQueryParameters().getFirst(RestconfStreamsConstants.NOTIFICATION_TYPE);
- if (notificaionType != null && notificaionType.equals(RestconfStreamsConstants.DEVICE)) {
- listener = listenersBroker.deviceNotificationListenerFor(streamName);
- if (listener == null) {
- LOG.debug("Listener for device path with name {} was not found.", streamName);
- throw new RestconfDocumentedException("Data missing", ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
- }
- } else {
- listener = listenersBroker.listenerFor(streamName);
- if (listener == null) {
- LOG.debug("Listener for stream with name {} was not found.", streamName);
- throw new RestconfDocumentedException("Data missing", ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
- }
+ public void getSSE(@Encoded @PathParam("identifier") final String identifier, @Context final SseEventSink sink,
+ @Context final Sse sse) {
+ final var streamName = ListenersBroker.createStreamNameFromUri(identifier);
+ final var listener = listenersBroker.listenerFor(streamName);
+ if (listener == null) {
+ LOG.debug("Listener for stream with name {} was not found.", streamName);
+ throw new RestconfDocumentedException("Data missing", ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
}
LOG.debug("Listener for stream with name {} has been found, SSE session handler will be created.", streamName);
// FIXME: invert control here: we should call 'listener.addSession()', which in turn should call
// handler.init()/handler.close()
- final SSESessionHandler handler = new SSESessionHandler(executorService, sink, sse, listener,
- maximumFragmentLength, heartbeatInterval);
+ final var handler = new SSESessionHandler(executorService, sink, sse, listener, maximumFragmentLength,
+ heartbeatInterval);
handler.init();
}
}
public static final String DATASTORE_PARAM_NAME = "datastore";
public static final String SCOPE_PARAM_NAME = "scope";
+ // Prefixes for stream names
public static final String DATA_SUBSCRIPTION = "data-change-event-subscription";
public static final String NOTIFICATION_STREAM = "notification-stream";
+ public static final String DEVICE_NOTIFICATION_STREAM = "device-notification-stream";
public static final String STREAMS_PATH = "ietf-restconf-monitoring:restconf-state/streams";
public static final String STREAM_PATH_PART = "/stream=";
public static final String STREAM_PATH = STREAMS_PATH + STREAM_PATH_PART;
public static final String STREAM_ACCESS_PATH_PART = "/access=";
public static final String STREAM_LOCATION_PATH_PART = "/location";
- public static final String NOTIFICATION_TYPE = "notificationType";
- public static final String DEVICE = "device";
private RestconfStreamsConstants() {
// Hidden on purpose
/**
* Get listener for device path.
*
- * @param path name.
- * @return {@link BaseListenerInterface} specified by stream name or {@code null} if listener with specified
- * stream name does not exist.
+ * @param streamName name.
+ * @return {@link DeviceNotificationListenerAdaptor} specified by stream name or {@code null} if listener with
+ * specified stream name does not exist.
* @throws NullPointerException in {@code path} is {@code null}
*/
- public @Nullable BaseListenerInterface deviceNotificationListenerFor(final String path) {
- requireNonNull(path);
+ public @Nullable DeviceNotificationListenerAdaptor deviceNotificationListenerFor(final String streamName) {
+ requireNonNull(streamName);
final long stamp = deviceNotificationListenersLock.readLock();
try {
- return deviceNotificationListeners.get(path);
+ return deviceNotificationListeners.get(streamName);
} finally {
deviceNotificationListenersLock.unlockRead(stamp);
}
return notificationListenerFor(streamName);
} else if (streamName.startsWith(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
return dataChangeListenerFor(streamName);
+ } else if (streamName.startsWith(RestconfStreamsConstants.DEVICE_NOTIFICATION_STREAM)) {
+ return deviceNotificationListenerFor(streamName);
} else {
return null;
}
public DeviceNotificationListenerAdaptor registerDeviceNotificationListener(final String deviceName,
final NotificationOutputType outputType, final EffectiveModelContext refSchemaCtx,
final DOMMountPointService mountPointService, final YangInstanceIdentifier path) {
+ final var sb = new StringBuilder(RestconfStreamsConstants.DEVICE_NOTIFICATION_STREAM).append('/')
+ .append(deviceName);
+
final long stamp = deviceNotificationListenersLock.writeLock();
try {
- return deviceNotificationListeners.computeIfAbsent(deviceName,
- streamName -> new DeviceNotificationListenerAdaptor(deviceName, outputType, refSchemaCtx,
+ return deviceNotificationListeners.computeIfAbsent(sb.toString(),
+ streamName -> new DeviceNotificationListenerAdaptor(streamName, outputType, refSchemaCtx,
mountPointService, path, this));
} finally {
deviceNotificationListenersLock.unlockWrite(stamp);