X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=restconf%2Frestconf-nb-rfc8040%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Frestconf%2Fnb%2Frfc8040%2Frests%2Fservices%2Fimpl%2FSubscribeToStreamUtil.java;h=ff02c54e42f6d9cf444715a64e70fc4c3f53b36a;hb=7d5c865cbf9d0f0b979937ca7f5cebfe19f4983e;hp=9f1207f454640ea8835ad81fc1b7464ef8c180e1;hpb=1982b130c5b7b4342c1b9bb107b717f44ce4b7af;p=netconf.git diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/SubscribeToStreamUtil.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/SubscribeToStreamUtil.java index 9f1207f454..ff02c54e42 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/SubscribeToStreamUtil.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/SubscribeToStreamUtil.java @@ -21,16 +21,15 @@ import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.dom.api.DOMDataBroker; import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.api.DOMDataTreeReadOperations; import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction; import org.opendaylight.mdsal.dom.api.DOMNotificationListener; +import org.opendaylight.mdsal.dom.api.DOMNotificationService; import org.opendaylight.mdsal.dom.api.DOMTransactionChain; import org.opendaylight.restconf.common.errors.RestconfDocumentedException; import org.opendaylight.restconf.common.errors.RestconfError.ErrorTag; import org.opendaylight.restconf.common.errors.RestconfError.ErrorType; import org.opendaylight.restconf.common.util.DataChangeScope; import org.opendaylight.restconf.nb.rfc8040.Rfc8040.MonitoringModule; -import org.opendaylight.restconf.nb.rfc8040.handlers.NotificationServiceHandler; import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder; import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.NotificationQueryParams; import org.opendaylight.restconf.nb.rfc8040.rests.utils.ResolveEnumUtil; @@ -42,11 +41,9 @@ import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants; import org.opendaylight.restconf.nb.rfc8040.utils.mapping.RestconfMappingNodeUtil; import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; -import org.opendaylight.yangtools.yang.model.api.Module; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import org.opendaylight.yangtools.yang.model.api.SchemaPath; +import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,7 +52,7 @@ import org.slf4j.LoggerFactory; */ abstract class SubscribeToStreamUtil { /** - * Implementation of {@link UrlResolver} for Server-sent events. + * Implementation of SubscribeToStreamUtil for Server-sent events. */ private static final class ServerSentEvents extends SubscribeToStreamUtil { static final ServerSentEvents INSTANCE = new ServerSentEvents(); @@ -69,7 +66,7 @@ abstract class SubscribeToStreamUtil { } /** - * Implementation of {@link UrlResolver} for Web sockets. + * Implementation of SubscribeToStreamUtil for Web sockets. */ private static final class WebSockets extends SubscribeToStreamUtil { static final WebSockets INSTANCE = new WebSockets(); @@ -124,7 +121,6 @@ abstract class SubscribeToStreamUtil { * @param uriInfo URI information. * @param notificationQueryParams Query parameters of notification. * @param handlersHolder Holder of handlers for notifications. - * @param urlResolver Resolver for proper implementation. Possibilities is WS or SSE. * @return Stream location for listening. */ final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo, @@ -136,7 +132,7 @@ abstract class SubscribeToStreamUtil { final Optional notificationListenerAdapter = ListenersBroker.getInstance().getNotificationListenerFor(streamName); - if (!notificationListenerAdapter.isPresent()) { + if (notificationListenerAdapter.isEmpty()) { throw new RestconfDocumentedException(String.format( "Stream with name %s was not found.", streamName), ErrorType.PROTOCOL, @@ -145,8 +141,7 @@ abstract class SubscribeToStreamUtil { final DOMTransactionChain transactionChain = handlersHolder.getTransactionChainHandler().get(); final DOMDataTreeReadWriteTransaction writeTransaction = transactionChain.newReadWriteTransaction(); - final SchemaContext schemaContext = handlersHolder.getSchemaHandler().get(); - final boolean exist = checkExist(schemaContext, writeTransaction); + final EffectiveModelContext schemaContext = handlersHolder.getSchemaHandler().get(); final URI uri = prepareUriByStreamName(uriInfo, streamName); registerToListenNotification( @@ -158,14 +153,13 @@ abstract class SubscribeToStreamUtil { false, notificationQueryParams.isSkipNotificationData()); notificationListenerAdapter.get().setCloseVars( handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler()); - final NormalizedNode mapToStreams = - RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring( - notificationListenerAdapter.get().getSchemaPath().getLastComponent(), + final MapEntryNode mapToStreams = RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring( + notificationListenerAdapter.get().getSchemaPath().lastNodeIdentifier(), schemaContext.getNotifications(), notificationQueryParams.getStart(), - notificationListenerAdapter.get().getOutputType(), uri, getMonitoringModule(schemaContext), exist); + notificationListenerAdapter.get().getOutputType(), uri); writeDataToDS(schemaContext, - notificationListenerAdapter.get().getSchemaPath().getLastComponent().getLocalName(), writeTransaction, - exist, mapToStreams); + notificationListenerAdapter.get().getSchemaPath().lastNodeIdentifier().getLocalName(), writeTransaction, + mapToStreams); submitData(writeTransaction); transactionChain.close(); return uri; @@ -218,34 +212,23 @@ abstract class SubscribeToStreamUtil { final DOMTransactionChain transactionChain = handlersHolder.getTransactionChainHandler().get(); final DOMDataTreeReadWriteTransaction writeTransaction = transactionChain.newReadWriteTransaction(); final EffectiveModelContext schemaContext = handlersHolder.getSchemaHandler().get(); - final boolean exist = checkExist(schemaContext, writeTransaction); + final String serializedPath = IdentifierCodec.serialize(listener.get().getPath(), schemaContext); - final NormalizedNode mapToStreams = RestconfMappingNodeUtil - .mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.get().getPath(), - notificationQueryParams.getStart(), listener.get().getOutputType(), uri, - getMonitoringModule(schemaContext), exist, schemaContext); - writeDataToDS(schemaContext, listener.get().getPath().getLastPathArgument().getNodeType().getLocalName(), - writeTransaction, exist, mapToStreams); + final MapEntryNode mapToStreams = + RestconfMappingNodeUtil.mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.get().getPath(), + notificationQueryParams.getStart(), listener.get().getOutputType(), uri, schemaContext, serializedPath); + writeDataToDS(schemaContext, serializedPath, writeTransaction, mapToStreams); submitData(writeTransaction); transactionChain.close(); return uri; } - static Module getMonitoringModule(final SchemaContext schemaContext) { - return schemaContext.findModule(MonitoringModule.MODULE_QNAME).orElse(null); - } - - private static void writeDataToDS(final SchemaContext schemaContext, final String name, - final DOMDataTreeReadWriteTransaction readWriteTransaction, final boolean exist, - final NormalizedNode mapToStreams) { - String pathId; - if (exist) { - pathId = MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + name; - } else { - pathId = MonitoringModule.PATH_TO_STREAMS; - } + private static void writeDataToDS(final EffectiveModelContext schemaContext, final String name, + final DOMDataTreeReadWriteTransaction readWriteTransaction, final MapEntryNode mapToStreams) { readWriteTransaction.merge(LogicalDatastoreType.OPERATIONAL, - IdentifierCodec.deserialize(pathId, schemaContext), mapToStreams); + // FIXME: do not use IdentifierCodec here + IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + name, schemaContext), + mapToStreams); } private static void submitData(final DOMDataTreeReadWriteTransaction readWriteTransaction) { @@ -298,25 +281,16 @@ abstract class SubscribeToStreamUtil { listener.setRegistration(registration); } - private static boolean checkExist(final SchemaContext schemaContext, - final DOMDataTreeReadOperations readWriteTransaction) { - try { - return readWriteTransaction.exists(LogicalDatastoreType.OPERATIONAL, - IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAMS, schemaContext)).get(); - } catch (final InterruptedException | ExecutionException exception) { - throw new RestconfDocumentedException("Problem while checking data if exists", exception); - } - } - + // FIXME: this method should be in NotificationListenerAdapter private static void registerToListenNotification(final NotificationListenerAdapter listener, - final NotificationServiceHandler notificationServiceHandler) { + final DOMNotificationService notificationService) { if (listener.isListening()) { return; } - final SchemaPath path = listener.getSchemaPath(); + final Absolute path = listener.getSchemaPath(); final ListenerRegistration registration = - notificationServiceHandler.get().registerNotificationListener(listener, path); + notificationService.registerNotificationListener(listener, path); listener.setRegistration(registration); }