import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
import org.opendaylight.mdsal.dom.api.DOMMountPointService;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
-import org.opendaylight.restconf.nb.rfc8040.NotificationQueryParams;
+import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
import org.opendaylight.restconf.nb.rfc8040.URLConstants;
-import org.opendaylight.restconf.nb.rfc8040.monitoring.RestconfStateStreams;
import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
-import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
@Override
public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
return uriInfo.getBaseUriBuilder()
- .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.SSE_SUBPATH + '/' + streamName)
+ .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH + '/' + streamName)
.build();
}
}
return uriInfo.getBaseUriBuilder()
.scheme(scheme)
- .replacePath(URLConstants.BASE_PATH + '/' + streamName)
+ .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH + '/' + streamName)
.build();
}
}
* @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
* or {@link Optional#empty()} if listener with specified stream name doesn't exist.
*/
- public final @Nullable BaseListenerInterface listenerFor(final String streamName) {
+ public final @Nullable AbstractStream<?> listenerFor(final String streamName) {
if (streamName.startsWith(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
return notificationListenerFor(streamName);
} else if (streamName.startsWith(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
final long stamp = dataChangeListenersLock.writeLock();
try {
return dataChangeListeners.computeIfAbsent(sb.toString(),
- streamName -> new ListenerAdapter(datastore, path, streamName, outputType, this));
+ streamName -> new ListenerAdapter(streamName, outputType, this, datastore, path));
} finally {
dataChangeListenersLock.unlockWrite(stamp);
}
final long stamp = notificationListenersLock.writeLock();
try {
return notificationListeners.computeIfAbsent(sb.toString(),
- streamName -> new NotificationListenerAdapter(notifications, streamName, outputType, this));
+ streamName -> new NotificationListenerAdapter(streamName, outputType, this, notifications));
} finally {
notificationListenersLock.unlockWrite(stamp);
}
final long stamp = deviceNotificationListenersLock.writeLock();
try {
return deviceNotificationListeners.computeIfAbsent(sb.toString(),
- streamName -> new DeviceNotificationListenerAdaptor(streamName, outputType, refSchemaCtx,
- mountPointService, path, this));
+ streamName -> new DeviceNotificationListenerAdaptor(streamName, outputType, this, refSchemaCtx,
+ mountPointService, path));
} finally {
deviceNotificationListenersLock.unlockWrite(stamp);
}
/**
* Removal and closing of general listener (data-change or notification listener).
*
- * @param listener Listener to be closed and removed from cache.
+ * @param stream Stream to be closed and removed from cache.
*/
- final void removeAndCloseListener(final BaseListenerInterface listener) {
- requireNonNull(listener);
- if (listener instanceof ListenerAdapter) {
- removeAndCloseDataChangeListener((ListenerAdapter) listener);
- } else if (listener instanceof NotificationListenerAdapter) {
- removeAndCloseNotificationListener((NotificationListenerAdapter) listener);
+ final void removeAndCloseListener(final AbstractStream<?> stream) {
+ requireNonNull(stream);
+ if (stream instanceof ListenerAdapter dataChange) {
+ removeAndCloseDataChangeListener(dataChange);
+ } else if (stream instanceof NotificationListenerAdapter notification) {
+ removeAndCloseNotificationListener(notification);
}
}
* @param uri URI for creation of stream name.
* @return String representation of stream name.
*/
- public static String createStreamNameFromUri(final String uri) {
+ private static String createStreamNameFromUri(final String uri) {
String result = requireNonNull(uri);
while (true) {
if (result.startsWith(URLConstants.BASE_PATH)) {
* @return Stream location for listening.
*/
public final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
- final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
+ final ReceiveEventsParams notificationQueryParams, final HandlersHolder handlersHolder) {
final String streamName = createStreamNameFromUri(identifier);
if (isNullOrEmpty(streamName)) {
throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
final DOMDataBroker dataBroker = handlersHolder.dataBroker();
notificationListenerAdapter.setCloseVars(dataBroker, handlersHolder.databindProvider());
final MapEntryNode mapToStreams = RestconfStateStreams.notificationStreamEntry(streamName,
- notificationListenerAdapter.qnames(), notificationListenerAdapter.getStart(),
- notificationListenerAdapter.getOutputType(), uri);
+ notificationListenerAdapter.qnames(), notificationListenerAdapter.getOutputType(), uri);
// FIXME: how does this correlate with the transaction notificationListenerAdapter.close() will do?
final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
* @return Location for listening.
*/
public final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
- final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
+ final ReceiveEventsParams notificationQueryParams, final HandlersHolder handlersHolder) {
final var streamName = createStreamNameFromUri(identifier);
final var listener = dataChangeListenerFor(streamName);
if (listener == null) {
final var serializedPath = IdentifierCodec.serialize(listener.getPath(), schemaContext);
final var mapToStreams = RestconfStateStreams.dataChangeStreamEntry(listener.getPath(),
- listener.getStart(), listener.getOutputType(), uri, schemaContext, serializedPath);
+ listener.getOutputType(), uri, schemaContext, serializedPath);
final var writeTransaction = dataBroker.newWriteOnlyTransaction();
writeDataToDS(writeTransaction, mapToStreams);
submitData(writeTransaction);