X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=restconf%2Frestconf-nb-rfc8040%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Frestconf%2Fnb%2Frfc8040%2Frests%2Fservices%2Fimpl%2FSubscribeToStreamUtil.java;h=2019aff332720709ecd41264b09e136b4e768b74;hb=9173c204d5f54e5758df0ee346fb7e3937e37825;hp=b71de0454b009fb6abdb3f84b178c92ae306cdbb;hpb=407283022a89dfb7661164b6d872fa653fd7179c;p=netconf.git diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/SubscribeToStreamUtil.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/SubscribeToStreamUtil.java index b71de0454b..2019aff332 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/SubscribeToStreamUtil.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/SubscribeToStreamUtil.java @@ -7,33 +7,24 @@ */ package org.opendaylight.restconf.nb.rfc8040.rests.services.impl; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; +import static com.google.common.base.Strings.isNullOrEmpty; + import java.net.URI; import java.util.HashMap; import java.util.Map; -import java.util.Optional; import java.util.concurrent.ExecutionException; import javax.ws.rs.core.UriBuilder; import javax.ws.rs.core.UriInfo; import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.dom.api.DOMDataBroker; -import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.api.DOMDataTreeReadOperations; -import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction; -import org.opendaylight.mdsal.dom.api.DOMNotificationListener; -import org.opendaylight.mdsal.dom.api.DOMTransactionChain; +import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations; +import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; import org.opendaylight.restconf.common.errors.RestconfDocumentedException; -import org.opendaylight.restconf.common.errors.RestconfError.ErrorTag; -import org.opendaylight.restconf.common.errors.RestconfError.ErrorType; -import org.opendaylight.restconf.common.util.DataChangeScope; -import org.opendaylight.restconf.nb.rfc8040.Rfc8040.MonitoringModule; -import org.opendaylight.restconf.nb.rfc8040.handlers.NotificationServiceHandler; +import org.opendaylight.restconf.nb.rfc8040.NotificationQueryParams; +import org.opendaylight.restconf.nb.rfc8040.Rfc8040; +import org.opendaylight.restconf.nb.rfc8040.handlers.SchemaContextHandler; import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder; -import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.NotificationQueryParams; -import org.opendaylight.restconf.nb.rfc8040.rests.utils.ResolveEnumUtil; import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants; import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter; import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker; @@ -41,12 +32,10 @@ import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListen import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants; import org.opendaylight.restconf.nb.rfc8040.utils.mapping.RestconfMappingNodeUtil; import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec; -import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.common.ErrorTag; +import org.opendaylight.yangtools.yang.common.ErrorType; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; -import org.opendaylight.yangtools.yang.model.api.Module; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import org.opendaylight.yangtools.yang.model.api.SchemaPath; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,7 +44,7 @@ import org.slf4j.LoggerFactory; */ abstract class SubscribeToStreamUtil { /** - * Implementation of {@link UrlResolver} for Server-sent events. + * Implementation of SubscribeToStreamUtil for Server-sent events. */ private static final class ServerSentEvents extends SubscribeToStreamUtil { static final ServerSentEvents INSTANCE = new ServerSentEvents(); @@ -69,7 +58,7 @@ abstract class SubscribeToStreamUtil { } /** - * Implementation of {@link UrlResolver} for Web sockets. + * Implementation of SubscribeToStreamUtil for Web sockets. */ private static final class WebSockets extends SubscribeToStreamUtil { static final WebSockets INSTANCE = new WebSockets(); @@ -124,50 +113,36 @@ abstract class SubscribeToStreamUtil { * @param uriInfo URI information. * @param notificationQueryParams Query parameters of notification. * @param handlersHolder Holder of handlers for notifications. - * @param urlResolver Resolver for proper implementation. Possibilities is WS or SSE. * @return Stream location for listening. */ final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo, final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) { final String streamName = ListenersBroker.createStreamNameFromUri(identifier); - if (Strings.isNullOrEmpty(streamName)) { + if (isNullOrEmpty(streamName)) { throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE); } - final Optional notificationListenerAdapter = - ListenersBroker.getInstance().getNotificationListenerFor(streamName); - - if (!notificationListenerAdapter.isPresent()) { - throw new RestconfDocumentedException(String.format( - "Stream with name %s was not found.", streamName), - ErrorType.PROTOCOL, - ErrorTag.UNKNOWN_ELEMENT); - } - final DOMTransactionChain transactionChain = handlersHolder.getTransactionChainHandler().get(); - final DOMDataTreeReadWriteTransaction writeTransaction = transactionChain.newReadWriteTransaction(); - final SchemaContext schemaContext = handlersHolder.getSchemaHandler().get(); - final boolean exist = checkExist(schemaContext, writeTransaction); + final NotificationListenerAdapter notificationListenerAdapter = ListenersBroker.getInstance() + .getNotificationListenerFor(streamName) + .orElseThrow(() -> new RestconfDocumentedException( + String.format("Stream with name %s was not found.", streamName), + ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT)); + final EffectiveModelContext schemaContext = handlersHolder.getSchemaHandler().get(); final URI uri = prepareUriByStreamName(uriInfo, streamName); - registerToListenNotification( - notificationListenerAdapter.get(), handlersHolder.getNotificationServiceHandler()); - notificationListenerAdapter.get().setQueryParams( - notificationQueryParams.getStart(), - notificationQueryParams.getStop().orElse(null), - notificationQueryParams.getFilter().orElse(null), - false); - notificationListenerAdapter.get().setCloseVars( - handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler()); - final NormalizedNode mapToStreams = - RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring( - notificationListenerAdapter.get().getSchemaPath().getLastComponent(), - schemaContext.getNotifications(), notificationQueryParams.getStart(), - notificationListenerAdapter.get().getOutputType(), uri, getMonitoringModule(schemaContext), exist); - writeDataToDS(schemaContext, - notificationListenerAdapter.get().getSchemaPath().getLastComponent().getLocalName(), writeTransaction, - exist, mapToStreams); + notificationListenerAdapter.setQueryParams(notificationQueryParams); + notificationListenerAdapter.listen(handlersHolder.getNotificationServiceHandler()); + final DOMDataBroker dataBroker = handlersHolder.getDataBroker(); + notificationListenerAdapter.setCloseVars(dataBroker, handlersHolder.getSchemaHandler()); + final MapEntryNode mapToStreams = RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring( + notificationListenerAdapter.getSchemaPath().lastNodeIdentifier(), + schemaContext.getNotifications(), notificationListenerAdapter.getStart(), + notificationListenerAdapter.getOutputType(), uri); + + // FIXME: how does this correlate with the transaction notificationListenerAdapter.close() will do? + final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction(); + writeDataToDS(writeTransaction, mapToStreams); submitData(writeTransaction); - transactionChain.close(); return uri; } @@ -184,71 +159,53 @@ abstract class SubscribeToStreamUtil { final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo, final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) { final Map mapOfValues = mapValuesFromUri(identifier); - final LogicalDatastoreType datastoreType = parseURIEnum( - LogicalDatastoreType.class, - mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME)); - if (datastoreType == null) { - final String message = "Stream name doesn't contain datastore value (pattern /datastore=)"; + + final String datastoreParam = mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME); + if (isNullOrEmpty(datastoreParam)) { + final String message = "Stream name does not contain datastore value (pattern /datastore=)"; LOG.debug(message); throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE); } - final DataChangeScope scope = parseURIEnum( - DataChangeScope.class, - mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME)); - if (scope == null) { - final String message = "Stream name doesn't contains datastore value (pattern /scope=)"; + // FIXME: this is kept only for compatibility, we are not using this parameter + if (isNullOrEmpty(mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME))) { + final String message = "Stream name does not contain scope value (pattern /scope=)"; LOG.warn(message); throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE); } final String streamName = ListenersBroker.createStreamNameFromUri(identifier); - final Optional listener = ListenersBroker.getInstance().getDataChangeListenerFor(streamName); - Preconditions.checkArgument(listener.isPresent(), "Listener doesn't exist : " + streamName); + final ListenerAdapter listener = ListenersBroker.getInstance().getDataChangeListenerFor(streamName) + .orElseThrow(() -> new RestconfDocumentedException("No listener found for stream " + streamName, + ErrorType.APPLICATION, ErrorTag.DATA_MISSING)); + listener.setQueryParams(notificationQueryParams); - listener.get().setQueryParams( - notificationQueryParams.getStart(), - notificationQueryParams.getStop().orElse(null), - notificationQueryParams.getFilter().orElse(null), - false); - listener.get().setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler()); - registration(datastoreType, listener.get(), handlersHolder.getDomDataBrokerHandler().get()); + final DOMDataBroker dataBroker = handlersHolder.getDataBroker(); + final SchemaContextHandler schemaHandler = handlersHolder.getSchemaHandler(); + listener.setCloseVars(dataBroker, schemaHandler); + listener.listen(dataBroker, LogicalDatastoreType.valueOf(datastoreParam)); final URI uri = prepareUriByStreamName(uriInfo, streamName); - final DOMTransactionChain transactionChain = handlersHolder.getTransactionChainHandler().get(); - final DOMDataTreeReadWriteTransaction writeTransaction = transactionChain.newReadWriteTransaction(); - final EffectiveModelContext schemaContext = handlersHolder.getSchemaHandler().get(); - final boolean exist = checkExist(schemaContext, writeTransaction); - - final NormalizedNode mapToStreams = RestconfMappingNodeUtil - .mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.get().getPath(), - notificationQueryParams.getStart(), listener.get().getOutputType(), uri, - getMonitoringModule(schemaContext), exist, schemaContext); - writeDataToDS(schemaContext, listener.get().getPath().getLastPathArgument().getNodeType().getLocalName(), - writeTransaction, exist, mapToStreams); + final EffectiveModelContext schemaContext = schemaHandler.get(); + final String serializedPath = IdentifierCodec.serialize(listener.getPath(), schemaContext); + + final MapEntryNode mapToStreams = + RestconfMappingNodeUtil.mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.getPath(), + listener.getStart(), listener.getOutputType(), uri, schemaContext, serializedPath); + final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction(); + writeDataToDS(writeTransaction, mapToStreams); submitData(writeTransaction); - transactionChain.close(); return uri; } - static Module getMonitoringModule(final SchemaContext schemaContext) { - return schemaContext.findModule(MonitoringModule.MODULE_QNAME).orElse(null); + // FIXME: callers are utter duplicates, refactor them + private static void writeDataToDS(final DOMDataTreeWriteOperations tx, final MapEntryNode mapToStreams) { + // FIXME: use put() here + tx.merge(LogicalDatastoreType.OPERATIONAL, Rfc8040.restconfStateStreamPath(mapToStreams.getIdentifier()), + mapToStreams); } - private static void writeDataToDS(final SchemaContext schemaContext, final String name, - final DOMDataTreeReadWriteTransaction readWriteTransaction, final boolean exist, - final NormalizedNode mapToStreams) { - String pathId; - if (exist) { - pathId = MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + name; - } else { - pathId = MonitoringModule.PATH_TO_STREAMS; - } - readWriteTransaction.merge(LogicalDatastoreType.OPERATIONAL, - IdentifierCodec.deserialize(pathId, schemaContext), mapToStreams); - } - - private static void submitData(final DOMDataTreeReadWriteTransaction readWriteTransaction) { + private static void submitData(final DOMDataTreeWriteTransaction readWriteTransaction) { try { readWriteTransaction.commit().get(); } catch (final InterruptedException | ExecutionException e) { @@ -272,65 +229,4 @@ abstract class SubscribeToStreamUtil { } return result; } - - /** - * Register data change listener in DOM data broker and set it to listener on stream. - * - * @param datastore {@link LogicalDatastoreType} - * @param listener listener on specific stream - * @param domDataBroker data broker for register data change listener - */ - private static void registration(final LogicalDatastoreType datastore, final ListenerAdapter listener, - final DOMDataBroker domDataBroker) { - if (listener.isListening()) { - return; - } - - final DOMDataTreeChangeService changeService = domDataBroker.getExtensions() - .getInstance(DOMDataTreeChangeService.class); - if (changeService == null) { - throw new UnsupportedOperationException("DOMDataBroker does not support the DOMDataTreeChangeService"); - } - - final DOMDataTreeIdentifier root = new DOMDataTreeIdentifier(datastore, listener.getPath()); - final ListenerRegistration registration = - changeService.registerDataTreeChangeListener(root, listener); - listener.setRegistration(registration); - } - - private static boolean checkExist(final SchemaContext schemaContext, - final DOMDataTreeReadOperations readWriteTransaction) { - try { - return readWriteTransaction.exists(LogicalDatastoreType.OPERATIONAL, - IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAMS, schemaContext)).get(); - } catch (final InterruptedException | ExecutionException exception) { - throw new RestconfDocumentedException("Problem while checking data if exists", exception); - } - } - - private static void registerToListenNotification(final NotificationListenerAdapter listener, - final NotificationServiceHandler notificationServiceHandler) { - if (listener.isListening()) { - return; - } - - final SchemaPath path = listener.getSchemaPath(); - final ListenerRegistration registration = - notificationServiceHandler.get().registerNotificationListener(listener, path); - listener.setRegistration(registration); - } - - /** - * Parse out enumeration from URI. - * - * @param clazz Target enumeration type. - * @param value String representation of enumeration value. - * @return Parsed enumeration type. - */ - private static T parseURIEnum(final Class clazz, final String value) { - if (value == null || value.equals("")) { - return null; - } - return ResolveEnumUtil.resolveEnum(clazz, value); - } }