X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=restconf%2Frestconf-nb-rfc8040%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Frestconf%2Fnb%2Frfc8040%2Frests%2Futils%2FSubscribeToStreamUtil.java;h=eee8e432accb1d6757102e54cb00ce3926831f99;hb=6ae0d858188cd66f00133d2ad6eb13e15824a81e;hp=527e27deefc4643ae0037e0153da38968e56cfdc;hpb=b7537d2482ffb05582749ce80a7ca44c64a5ad6c;p=netconf.git diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/SubscribeToStreamUtil.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/SubscribeToStreamUtil.java index 527e27deef..eee8e432ac 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/SubscribeToStreamUtil.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/SubscribeToStreamUtil.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.concurrent.ExecutionException; import javax.ws.rs.core.UriBuilder; import javax.ws.rs.core.UriInfo; @@ -28,7 +29,7 @@ import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.dom.api.DOMDataBroker; import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction; +import org.opendaylight.mdsal.dom.api.DOMDataTreeReadOperations; import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction; import org.opendaylight.mdsal.dom.api.DOMNotificationListener; import org.opendaylight.restconf.common.context.InstanceIdentifierContext; @@ -42,16 +43,13 @@ import org.opendaylight.restconf.nb.rfc8040.handlers.SchemaContextHandler; import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder; import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.NotificationQueryParams; import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter; +import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker; import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter; -import org.opendaylight.restconf.nb.rfc8040.streams.listeners.Notificator; -import org.opendaylight.restconf.nb.rfc8040.streams.websockets.WebSocketServer; import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants; import org.opendaylight.restconf.nb.rfc8040.utils.mapping.RestconfMappingNodeUtil; import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime; -import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; @@ -82,199 +80,184 @@ public final class SubscribeToStreamUtil { .appendOffset("+HH:MM", "Z").toFormatter(); private SubscribeToStreamUtil() { - throw new UnsupportedOperationException("Util class"); + throw new UnsupportedOperationException("Utility class"); } /** - * Register listeners by streamName in identifier to listen to yang - * notifications, put or delete info about listener to DS according to - * ietf-restconf-monitoring. + * Register listener by streamName in identifier to listen to yang notifications, and put or delete information + * about listener to DS according to ietf-restconf-monitoring. * - * @param identifier - * identifier as stream name - * @param uriInfo - * for getting base URI information - * @param notificationQueryParams - * query parameters of notification - * @param handlersHolder - * holder of handlers for notifications - * @return location for listening + * @param identifier Name of the stream. + * @param uriInfo URI information. + * @param notificationQueryParams Query parameters of notification. + * @param handlersHolder Holder of handlers for notifications. + * @return Stream location for listening. */ @SuppressWarnings("rawtypes") - public static URI notifYangStream(final String identifier, final UriInfo uriInfo, + public static URI subscribeToYangStream(final String identifier, final UriInfo uriInfo, final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) { - final String streamName = Notificator.createStreamNameFromUri(identifier); + final String streamName = ListenersBroker.createStreamNameFromUri(identifier); if (Strings.isNullOrEmpty(streamName)) { throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE); } - List listeners = Notificator.getNotificationListenerFor(streamName); - if (identifier.contains(RestconfConstants.SLASH + NotificationOutputType.JSON.getName())) { - listeners = pickSpecificListenerByOutput(listeners, NotificationOutputType.JSON.getName()); - } else { - listeners = pickSpecificListenerByOutput(listeners, NotificationOutputType.XML.getName()); - } - if (listeners.isEmpty()) { - throw new RestconfDocumentedException("Stream was not found.", ErrorType.PROTOCOL, + final Optional notificationListenerAdapter = + ListenersBroker.getInstance().getNotificationListenerFor(streamName); + + if (!notificationListenerAdapter.isPresent()) { + throw new RestconfDocumentedException(String.format( + "Stream with name %s was not found.", streamName), + ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT); } - final DOMDataTreeReadWriteTransaction wTx = - handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction(); + final DOMDataTreeReadWriteTransaction writeTransaction = handlersHolder + .getTransactionChainHandler() + .get() + .newReadWriteTransaction(); final SchemaContext schemaContext = handlersHolder.getSchemaHandler().get(); - final boolean exist = checkExist(schemaContext, wTx); - + final boolean exist = checkExist(schemaContext, writeTransaction); final URI uri = prepareUriByStreamName(uriInfo, streamName); - for (final NotificationListenerAdapter listener : listeners) { - registerToListenNotification(listener, handlersHolder.getNotificationServiceHandler()); - listener.setQueryParams(notificationQueryParams.getStart(), notificationQueryParams.getStop(), - notificationQueryParams.getFilter(), false); - listener.setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler()); - final NormalizedNode mapToStreams = RestconfMappingNodeUtil - .mapYangNotificationStreamByIetfRestconfMonitoring(listener.getSchemaPath().getLastComponent(), - schemaContext.getNotifications(), notificationQueryParams.getStart(), - listener.getOutputType(), uri, getMonitoringModule(schemaContext), exist); - writeDataToDS(schemaContext, listener.getSchemaPath().getLastComponent().getLocalName(), wTx, exist, - mapToStreams); - } - submitData(wTx); + registerToListenNotification( + notificationListenerAdapter.get(), handlersHolder.getNotificationServiceHandler()); + notificationListenerAdapter.get().setQueryParams( + notificationQueryParams.getStart(), + notificationQueryParams.getStop().orElse(null), + notificationQueryParams.getFilter().orElse(null), + false); + notificationListenerAdapter.get().setCloseVars( + handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler()); + final NormalizedNode mapToStreams = RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring( + notificationListenerAdapter.get().getSchemaPath().getLastComponent(), + schemaContext.getNotifications(), notificationQueryParams.getStart(), + notificationListenerAdapter.get().getOutputType(), uri, getMonitoringModule(schemaContext), + exist); + writeDataToDS(schemaContext, + notificationListenerAdapter.get().getSchemaPath().getLastComponent().getLocalName(), writeTransaction, + exist, mapToStreams); + submitData(writeTransaction); return uri; } - static List - pickSpecificListenerByOutput(final List listeners, final String outputType) { - for (final NotificationListenerAdapter notificationListenerAdapter : listeners) { - if (notificationListenerAdapter.getOutputType().equals(outputType)) { - final List list = new ArrayList<>(); - list.add(notificationListenerAdapter); - return list; - } - } - return listeners; - } - /** * Prepare InstanceIdentifierContext for Location leaf. * - * @param schemaHandler - * schemaContext handler - * @return InstanceIdentifier of Location leaf + * @param schemaHandler Schema context handler. + * @return InstanceIdentifier of Location leaf. */ public static InstanceIdentifierContext prepareIIDSubsStreamOutput(final SchemaContextHandler schemaHandler) { - final QName qnameBase = QName.create("subscribe:to:notification", "2016-10-28", "notifi"); - final DataSchemaNode location = ((ContainerSchemaNode) schemaHandler.get() - .findModule(qnameBase.getModule()).get() - .getDataChildByName(qnameBase)).getDataChildByName(QName.create(qnameBase, "location")); - final List path = new ArrayList<>(); - path.add(NodeIdentifier.create(qnameBase)); - path.add(NodeIdentifier.create(QName.create(qnameBase, "location"))); + final Optional module = schemaHandler.get() + .findModule(RestconfStreamsConstants.NOTIFI_QNAME.getModule()); + Preconditions.checkState(module.isPresent()); + final Optional notify = module.get() + .findDataChildByName(RestconfStreamsConstants.NOTIFI_QNAME); + Preconditions.checkState(notify.isPresent()); + final Optional location = ((ContainerSchemaNode) notify.get()) + .findDataChildByName(RestconfStreamsConstants.LOCATION_QNAME); + Preconditions.checkState(location.isPresent()); - return new InstanceIdentifierContext(YangInstanceIdentifier.create(path), location, null, - schemaHandler.get()); + final List path = new ArrayList<>(); + path.add(NodeIdentifier.create(RestconfStreamsConstants.NOTIFI_QNAME)); + path.add(NodeIdentifier.create(RestconfStreamsConstants.LOCATION_QNAME)); + return new InstanceIdentifierContext(YangInstanceIdentifier.create(path), location.get(), + null, schemaHandler.get()); } /** - * Register listener by streamName in identifier to listen to data change - * notifications, put or delete info about listener to DS according to - * ietf-restconf-monitoring. + * Register listener by streamName in identifier to listen to data change notifications, and put or delete + * information about listener to DS according to ietf-restconf-monitoring. * - * @param identifier - * identifier as stream name - * @param uriInfo - * for getting base URI information - * @param notificationQueryParams - * query parameters of notification - * @param handlersHolder - * holder of handlers for notifications - * @return location for listening + * @param identifier Identifier as stream name. + * @param uriInfo Base URI information. + * @param notificationQueryParams Query parameters of notification. + * @param handlersHolder Holder of handlers for notifications. + * @return Location for listening. */ @SuppressWarnings("rawtypes") - public static URI notifiDataStream(final String identifier, final UriInfo uriInfo, + public static URI subscribeToDataStream(final String identifier, final UriInfo uriInfo, final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) { - final Map mapOfValues = SubscribeToStreamUtil.mapValuesFromUri(identifier); - - final LogicalDatastoreType ds = SubscribeToStreamUtil.parseURIEnum(LogicalDatastoreType.class, + final Map mapOfValues = mapValuesFromUri(identifier); + final LogicalDatastoreType datastoreType = parseURIEnum( + LogicalDatastoreType.class, mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME)); - if (ds == null) { - final String msg = "Stream name doesn't contains datastore value (pattern /datastore=)"; - LOG.debug(msg); - throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE); + if (datastoreType == null) { + final String message = "Stream name doesn't contain datastore value (pattern /datastore=)"; + LOG.debug(message); + throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE); } - final DataChangeScope scope = SubscribeToStreamUtil.parseURIEnum(DataChangeScope.class, + final DataChangeScope scope = parseURIEnum( + DataChangeScope.class, mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME)); if (scope == null) { - final String msg = "Stream name doesn't contains datastore value (pattern /scope=)"; - LOG.warn(msg); - throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE); + final String message = "Stream name doesn't contains datastore value (pattern /scope=)"; + LOG.warn(message); + throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE); } - final String streamName = Notificator.createStreamNameFromUri(identifier); - - final ListenerAdapter listener = Notificator.getListenerFor(streamName); - Preconditions.checkNotNull(listener, "Listener doesn't exist : " + streamName); - - listener.setQueryParams(notificationQueryParams.getStart(), notificationQueryParams.getStop(), - notificationQueryParams.getFilter(), false); - listener.setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler()); + final String streamName = ListenersBroker.createStreamNameFromUri(identifier); + final Optional listener = ListenersBroker.getInstance().getDataChangeListenerFor(streamName); + Preconditions.checkArgument(listener.isPresent(), "Listener doesn't exist : " + streamName); - registration(ds, scope, listener, handlersHolder.getDomDataBrokerHandler().get()); + listener.get().setQueryParams( + notificationQueryParams.getStart(), + notificationQueryParams.getStop().orElse(null), + notificationQueryParams.getFilter().orElse(null), + false); + listener.get().setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler()); + registration(datastoreType, listener.get(), handlersHolder.getDomDataBrokerHandler().get()); final URI uri = prepareUriByStreamName(uriInfo, streamName); - - final DOMDataTreeReadWriteTransaction wTx = - handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction(); + final DOMDataTreeReadWriteTransaction writeTransaction + = handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction(); final SchemaContext schemaContext = handlersHolder.getSchemaHandler().get(); - final boolean exist = checkExist(schemaContext, wTx); + final boolean exist = checkExist(schemaContext, writeTransaction); final NormalizedNode mapToStreams = RestconfMappingNodeUtil - .mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.getPath(), - notificationQueryParams.getStart(), listener.getOutputType(), uri, + .mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.get().getPath(), + notificationQueryParams.getStart(), listener.get().getOutputType(), uri, getMonitoringModule(schemaContext), exist, schemaContext); - writeDataToDS(schemaContext, listener.getPath().getLastPathArgument().getNodeType().getLocalName(), wTx, exist, - mapToStreams); - submitData(wTx); + writeDataToDS(schemaContext, listener.get().getPath().getLastPathArgument().getNodeType().getLocalName(), + writeTransaction, exist, mapToStreams); + submitData(writeTransaction); return uri; } - public static Module getMonitoringModule(final SchemaContext schemaContext) { + static Module getMonitoringModule(final SchemaContext schemaContext) { return schemaContext.findModule(MonitoringModule.MODULE_QNAME).orElse(null); } /** - * Parse input of query parameters - start-time or stop-time - from - * {@link DateAndTime} format to {@link Instant} format. + * Parse input of query parameters - start-time or stop-time - from {@link DateAndTime} format + * to {@link Instant} format. * - * @param entry - * start-time or stop-time as string in {@link DateAndTime} - * format - * @return parsed {@link Instant} by entry + * @param entry Start-time or stop-time as string in {@link DateAndTime} format. + * @return Parsed {@link Instant} by entry. */ public static Instant parseDateFromQueryParam(final Entry> entry) { final DateAndTime event = new DateAndTime(entry.getValue().iterator().next()); final String value = event.getValue(); - final TemporalAccessor p; + final TemporalAccessor accessor; try { - p = FORMATTER.parse(value); + accessor = FORMATTER.parse(value); } catch (final DateTimeParseException e) { throw new RestconfDocumentedException("Cannot parse of value in date: " + value, e); } - return Instant.from(p); - + return Instant.from(accessor); } @SuppressWarnings("rawtypes") - static void writeDataToDS(final SchemaContext schemaContext, - final String name, final DOMDataTreeReadWriteTransaction readWriteTransaction, - final boolean exist, final NormalizedNode mapToStreams) { - String pathId = ""; + static void writeDataToDS(final SchemaContext schemaContext, final String name, + final DOMDataTreeReadWriteTransaction readWriteTransaction, final boolean exist, + final NormalizedNode mapToStreams) { + String pathId; if (exist) { pathId = MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + name; } else { pathId = MonitoringModule.PATH_TO_STREAMS; } - readWriteTransaction.merge(LogicalDatastoreType.OPERATIONAL, IdentifierCodec.deserialize(pathId, schemaContext), - mapToStreams); + readWriteTransaction.merge(LogicalDatastoreType.OPERATIONAL, + IdentifierCodec.deserialize(pathId, schemaContext), mapToStreams); } static void submitData(final DOMDataTreeReadWriteTransaction readWriteTransaction) { @@ -286,13 +269,12 @@ public final class SubscribeToStreamUtil { } /** - * Prepare map of values from URI. + * Prepare map of URI parameter-values. * - * @param identifier - * URI - * @return {@link Map} + * @param identifier String identification of URI. + * @return Map od URI parameters and values. */ - public static Map mapValuesFromUri(final String identifier) { + private static Map mapValuesFromUri(final String identifier) { final HashMap result = new HashMap<>(); for (final String token : RestconfConstants.SLASH_SPLITTER.split(identifier)) { final String[] paramToken = token.split(String.valueOf(RestconfStreamsConstants.EQUAL)); @@ -304,28 +286,28 @@ public final class SubscribeToStreamUtil { } static URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) { + final String scheme = uriInfo.getAbsolutePath().getScheme(); final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder(); - - prepareNotificationPort(uriInfo.getBaseUri().getPort()); - uriBuilder.scheme(RestconfStreamsConstants.SCHEMA_SUBSCIBRE_URI); + switch (scheme) { + case RestconfStreamsConstants.SCHEMA_UPGRADE_SECURED_URI: + uriBuilder.scheme(RestconfStreamsConstants.SCHEMA_SUBSCRIBE_SECURED_URI); + break; + case RestconfStreamsConstants.SCHEMA_UPGRADE_URI: + default: + uriBuilder.scheme(RestconfStreamsConstants.SCHEMA_SUBSCRIBE_URI); + } return uriBuilder.replacePath(streamName).build(); } /** - * Register data change listener in dom data broker and set it to listener - * on stream. + * Register data change listener in DOM data broker and set it to listener on stream. * - * @param ds - * {@link LogicalDatastoreType} - * @param scope - * {@link DataChangeScope} - * @param listener - * listener on specific stream - * @param domDataBroker - * data broker for register data change listener + * @param datastore {@link LogicalDatastoreType} + * @param listener listener on specific stream + * @param domDataBroker data broker for register data change listener */ - private static void registration(final LogicalDatastoreType ds, final DataChangeScope scope, - final ListenerAdapter listener, final DOMDataBroker domDataBroker) { + private static void registration(final LogicalDatastoreType datastore, final ListenerAdapter listener, + final DOMDataBroker domDataBroker) { if (listener.isListening()) { return; } @@ -336,37 +318,21 @@ public final class SubscribeToStreamUtil { throw new UnsupportedOperationException("DOMDataBroker does not support the DOMDataTreeChangeService"); } - final DOMDataTreeIdentifier root = new DOMDataTreeIdentifier(ds, listener.getPath()); + final DOMDataTreeIdentifier root = new DOMDataTreeIdentifier(datastore, listener.getPath()); final ListenerRegistration registration = - changeService.registerDataTreeChangeListener(root, listener); - + changeService.registerDataTreeChangeListener(root, listener); listener.setRegistration(registration); } - /** - * Get port from web socket server. If doesn't exit, create it. - * - * @param port - * - port - */ - private static void prepareNotificationPort(final int port) { - try { - WebSocketServer.getInstance(); - } catch (final NullPointerException e) { - WebSocketServer.createInstance(port); - } - } - static boolean checkExist(final SchemaContext schemaContext, - final DOMDataTreeReadTransaction readWriteTransaction) { + final DOMDataTreeReadOperations readWriteTransaction) { boolean exist; try { - exist = readWriteTransaction.exists(LogicalDatastoreType.OPERATIONAL, + return readWriteTransaction.exists(LogicalDatastoreType.OPERATIONAL, IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAMS, schemaContext)).get(); - } catch (final InterruptedException | ExecutionException e) { - throw new RestconfDocumentedException("Problem while checking data if exists", e); + } catch (final InterruptedException | ExecutionException exception) { + throw new RestconfDocumentedException("Problem while checking data if exists", exception); } - return exist; } private static void registerToListenNotification(final NotificationListenerAdapter listener, @@ -378,18 +344,15 @@ public final class SubscribeToStreamUtil { final SchemaPath path = listener.getSchemaPath(); final ListenerRegistration registration = notificationServiceHandler.get().registerNotificationListener(listener, path); - listener.setRegistration(registration); } /** - * Parse enum from URI. + * Parse out enumeration from URI. * - * @param clazz - * enum type - * @param value - * string of enum value - * @return enum + * @param clazz Target enumeration type. + * @param value String representation of enumeration value. + * @return Parsed enumeration type. */ private static T parseURIEnum(final Class clazz, final String value) { if (value == null || value.equals("")) {