X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=restconf%2Fsal-rest-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Frestconf%2Frestful%2Futils%2FSubscribeToStreamUtil.java;h=5b8a5255df03dc5ea78b4554486fffeb3f45ea54;hb=6c7dc98ecbf088276bdff9b9fadb0e1236711f8a;hp=0904b02e461d4ba98b8a609859525f454716b03e;hpb=49a2bd9c0c10ea3356aba72284a5f2ab4661966e;p=netconf.git diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/utils/SubscribeToStreamUtil.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/utils/SubscribeToStreamUtil.java index 0904b02e46..5b8a5255df 100644 --- a/restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/utils/SubscribeToStreamUtil.java +++ b/restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/utils/SubscribeToStreamUtil.java @@ -10,11 +10,13 @@ package org.opendaylight.restconf.restful.utils; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import java.net.URI; -import java.text.DateFormat; -import java.text.ParseException; -import java.text.SimpleDateFormat; +import java.time.Instant; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.time.format.DateTimeParseException; +import java.time.temporal.ChronoField; +import java.time.temporal.TemporalAccessor; import java.util.ArrayList; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -23,8 +25,11 @@ import javax.ws.rs.core.UriBuilder; import javax.ws.rs.core.UriInfo; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener; import org.opendaylight.netconf.sal.restconf.impl.InstanceIdentifierContext; import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException; @@ -34,18 +39,26 @@ import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter; import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter; import org.opendaylight.netconf.sal.streams.listeners.Notificator; import org.opendaylight.netconf.sal.streams.websockets.WebSocketServer; -import org.opendaylight.restconf.handlers.DOMDataBrokerHandler; +import org.opendaylight.restconf.Rfc8040.MonitoringModule; import org.opendaylight.restconf.handlers.NotificationServiceHandler; import org.opendaylight.restconf.handlers.SchemaContextHandler; +import org.opendaylight.restconf.parser.IdentifierCodec; +import org.opendaylight.restconf.restful.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder; +import org.opendaylight.restconf.restful.services.impl.RestconfStreamsSubscriptionServiceImpl.NotificationQueryParams; import org.opendaylight.restconf.utils.RestconfConstants; +import org.opendaylight.restconf.utils.mapping.RestconfMappingNodeUtil; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime; +import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode; import org.opendaylight.yangtools.yang.model.api.DataSchemaNode; +import org.opendaylight.yangtools.yang.model.api.Module; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.opendaylight.yangtools.yang.model.api.SchemaNode; import org.opendaylight.yangtools.yang.model.api.SchemaPath; import org.slf4j.Logger; @@ -58,145 +71,86 @@ import org.slf4j.LoggerFactory; public final class SubscribeToStreamUtil { private static final Logger LOG = LoggerFactory.getLogger(SubscribeToStreamUtil.class); + private static final DateTimeFormatter FORMATTER = new DateTimeFormatterBuilder() + .appendValue(ChronoField.YEAR, 4).appendLiteral('-') + .appendValue(ChronoField.MONTH_OF_YEAR, 2).appendLiteral('-') + .appendValue(ChronoField.DAY_OF_MONTH, 2).appendLiteral('T') + .appendValue(ChronoField.HOUR_OF_DAY, 2).appendLiteral(':') + .appendValue(ChronoField.MINUTE_OF_HOUR, 2).appendLiteral(':') + .appendValue(ChronoField.SECOND_OF_MINUTE, 2) + .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true) + .appendOffset("+HH:MM", "Z").toFormatter(); private SubscribeToStreamUtil() { throw new UnsupportedOperationException("Util class"); } /** - * Parse enum from URI - * - * @param clazz - * - enum type - * @param value - * - string of enum value - * @return enum - */ - public static T parseURIEnum(final Class clazz, final String value) { - if ((value == null) || value.equals("")) { - return null; - } - return StreamUtil.resolveEnum(clazz, value); - } - - /** - * Prepare map of values from URI - * - * @param identifier - * - URI - * @return {@link Map} - */ - public static Map mapValuesFromUri(final String identifier) { - final HashMap result = new HashMap<>(); - final String[] tokens = identifier.split(String.valueOf(RestconfConstants.SLASH)); - for (final String token : tokens) { - final String[] paramToken = token.split(String.valueOf(RestconfStreamsConstants.EQUAL)); - if (paramToken.length == 2) { - result.put(paramToken[0], paramToken[1]); - } - } - return result; - } - - /** - * Register data change listener in dom data broker and set it to listener - * on stream - * - * @param ds - * - {@link LogicalDatastoreType} - * @param scope - * - {@link DataChangeScope} - * @param listener - * - listener on specific stream - * @param domDataBroker - * - data broker for register data change listener - */ - private static void registration(final LogicalDatastoreType ds, final DataChangeScope scope, - final ListenerAdapter listener, final DOMDataBroker domDataBroker) { - if (listener.isListening()) { - return; - } - - final YangInstanceIdentifier path = listener.getPath(); - final ListenerRegistration registration = domDataBroker.registerDataChangeListener(ds, - path, listener, scope); - - listener.setRegistration(registration); - } - - /** - * Get port from web socket server. If doesn't exit, create it. - * - * @return port - */ - private static int prepareNotificationPort() { - int port = RestconfStreamsConstants.NOTIFICATION_PORT; - try { - final WebSocketServer webSocketServer = WebSocketServer.getInstance(); - port = webSocketServer.getPort(); - } catch (final NullPointerException e) { - WebSocketServer.createInstance(RestconfStreamsConstants.NOTIFICATION_PORT); - } - return port; - } - - /** - * Register listeners by streamName in identifier to listen to yang notifications + * Register listeners by streamName in identifier to listen to yang + * notifications, put or delete info about listener to DS according to + * ietf-restconf-monitoring * * @param identifier * - identifier as stream name * @param uriInfo * - for getting base URI information - * @param start - * - start-time query parameter - * @param stop - * - stop-time query parameter - * @param notifiServiceHandler - * - DOMNotificationService handler for register listeners + * @param notificationQueryParams + * - query parameters of notification + * @param handlersHolder + * - holder of handlers for notifications * @return location for listening */ - public static URI notifStream(final String identifier, final UriInfo uriInfo, final Date start, final Date stop, - final NotificationServiceHandler notifiServiceHandler) { + @SuppressWarnings("rawtypes") + public static URI notifYangStream(final String identifier, final UriInfo uriInfo, + final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) { final String streamName = Notificator.createStreamNameFromUri(identifier); if (Strings.isNullOrEmpty(streamName)) { throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE); } - final List listeners = Notificator.getNotificationListenerFor(streamName); + List listeners = Notificator.getNotificationListenerFor(streamName); + if (identifier.contains(RestconfConstants.SLASH + NotificationOutputType.JSON.getName())) { + listeners = pickSpecificListenerByOutput(listeners, NotificationOutputType.JSON.getName()); + } else { + listeners = pickSpecificListenerByOutput(listeners, NotificationOutputType.XML.getName()); + } if ((listeners == null) || listeners.isEmpty()) { throw new RestconfDocumentedException("Stream was not found.", ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT); } - for (final NotificationListenerAdapter listener : listeners) { - registerToListenNotification(listener, notifiServiceHandler); - listener.setTime(start, stop); - } + final DOMDataReadWriteTransaction wTx = + handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction(); + final SchemaContext schemaContext = handlersHolder.getSchemaHandler().get(); + final boolean exist = checkExist(schemaContext, wTx); - final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder(); - int notificationPort = RestconfStreamsConstants.NOTIFICATION_PORT; - try { - final WebSocketServer webSocketServerInstance = WebSocketServer.getInstance(); - notificationPort = webSocketServerInstance.getPort(); - } catch (final NullPointerException e) { - WebSocketServer.createInstance(RestconfStreamsConstants.NOTIFICATION_PORT); + final URI uri = prepareUriByStreamName(uriInfo, streamName); + for (final NotificationListenerAdapter listener : listeners) { + registerToListenNotification(listener, handlersHolder.getNotificationServiceHandler()); + listener.setQueryParams(notificationQueryParams.getStart(), notificationQueryParams.getStop(), + notificationQueryParams.getFilter(), false); + listener.setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler()); + final NormalizedNode mapToStreams = + RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(listener.getSchemaPath().getLastComponent(), + schemaContext.getNotifications(), notificationQueryParams.getStart(), + listener.getOutputType(), uri, getMonitoringModule(schemaContext), exist); + writeDataToDS(schemaContext, listener.getSchemaPath().getLastComponent().getLocalName(), wTx, exist, + mapToStreams); } - final UriBuilder uriToWebsocketServerBuilder = uriBuilder.port(notificationPort).scheme("ws"); - final URI uriToWebsocketServer = uriToWebsocketServerBuilder.replacePath(streamName).build(); + submitData(wTx); - return uriToWebsocketServer; + return uri; } - private static void registerToListenNotification(final NotificationListenerAdapter listener, - final NotificationServiceHandler notificationServiceHandler) { - if (listener.isListening()) { - return; + static List + pickSpecificListenerByOutput(final List listeners, final String outputType) { + for (final NotificationListenerAdapter notificationListenerAdapter : listeners) { + if (notificationListenerAdapter.getOutputType().equals(outputType)) { + final List list = new ArrayList<>(); + list.add(notificationListenerAdapter); + return list; + } } - - final SchemaPath path = listener.getSchemaPath(); - final ListenerRegistration registration = - notificationServiceHandler.get().registerNotificationListener(listener, path); - - listener.setRegistration(registration); + return listeners; } /** @@ -220,22 +174,23 @@ public final class SubscribeToStreamUtil { } /** - * Register listener by streamName in identifier to listen to yang notifications + * Register listener by streamName in identifier to listen to data change + * notifications, put or delete info about listener to DS according to + * ietf-restconf-monitoring * * @param identifier * - identifier as stream name * @param uriInfo * - for getting base URI information - * @param start - * - start-time query parameter - * @param stop - * - stop-time query parameter - * @param domDataBrokerHandler - * - DOMDataBroker handler for register listener + * @param notificationQueryParams + * - query parameters of notification + * @param handlersHolder + * - holder of handlers for notifications * @return location for listening */ - public static URI dataSubs(final String identifier, final UriInfo uriInfo, final Date start, final Date stop, - final DOMDataBrokerHandler domDataBrokerHandler) { + @SuppressWarnings("rawtypes") + public static URI notifiDataStream(final String identifier, final UriInfo uriInfo, + final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) { final Map mapOfValues = SubscribeToStreamUtil.mapValuesFromUri(identifier); final LogicalDatastoreType ds = SubscribeToStreamUtil.parseURIEnum(LogicalDatastoreType.class, @@ -259,41 +214,187 @@ public final class SubscribeToStreamUtil { final ListenerAdapter listener = Notificator.getListenerFor(streamName); Preconditions.checkNotNull(listener, "Listener doesn't exist : " + streamName); - listener.setTimer(start, stop); + listener.setQueryParams(notificationQueryParams.getStart(), notificationQueryParams.getStop(), + notificationQueryParams.getFilter(), false); + listener.setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler()); + + registration(ds, scope, listener, handlersHolder.getDomDataBrokerHandler().get()); + + final URI uri = prepareUriByStreamName(uriInfo, streamName); + + final DOMDataReadWriteTransaction wTx = + handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction(); + final SchemaContext schemaContext = handlersHolder.getSchemaHandler().get(); + final boolean exist = checkExist(schemaContext, wTx); + + final NormalizedNode mapToStreams = RestconfMappingNodeUtil + .mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.getPath(), + notificationQueryParams.getStart(), listener.getOutputType(), uri, + getMonitoringModule(schemaContext), exist, schemaContext); + writeDataToDS(schemaContext, listener.getPath().getLastPathArgument().getNodeType().getLocalName(), wTx, exist, + mapToStreams); + submitData(wTx); + return uri; + } + + public static Module getMonitoringModule(final SchemaContext schemaContext) { + final Module monitoringModule = + schemaContext.findModuleByNamespaceAndRevision(MonitoringModule.URI_MODULE, MonitoringModule.DATE); + return monitoringModule; + } + + /** + * Parse input of query parameters - start-time or stop-time - from + * {@link DateAndTime} format to {@link Instant} format + * + * @param entry + * - start-time or stop-time as string in {@link DateAndTime} + * format + * @return parsed {@link Instant} by entry + */ + public static Instant parseDateFromQueryParam(final Entry> entry) { + final DateAndTime event = new DateAndTime(entry.getValue().iterator().next()); + final String value = event.getValue(); + final TemporalAccessor p; + try { + p = FORMATTER.parse(value); + } catch (DateTimeParseException e) { + throw new RestconfDocumentedException("Cannot parse of value in date: " + value, e); + } + return Instant.from(p); + + } + + @SuppressWarnings("rawtypes") + static void writeDataToDS(final SchemaContext schemaContext, final String name, + final DOMDataReadWriteTransaction wTx, final boolean exist, final NormalizedNode mapToStreams) { + String pathId = ""; + if (exist) { + pathId = MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + name; + } else { + pathId = MonitoringModule.PATH_TO_STREAMS; + } + wTx.merge(LogicalDatastoreType.OPERATIONAL, IdentifierCodec.deserialize(pathId, schemaContext), + mapToStreams); + } - SubscribeToStreamUtil.registration(ds, scope, listener, domDataBrokerHandler.get()); + static void submitData(final DOMDataReadWriteTransaction wTx) { + try { + wTx.submit().checkedGet(); + } catch (final TransactionCommitFailedException e) { + throw new RestconfDocumentedException("Problem while putting data to DS.", e); + } + } + + /** + * Prepare map of values from URI + * + * @param identifier + * - URI + * @return {@link Map} + */ + public static Map mapValuesFromUri(final String identifier) { + final HashMap result = new HashMap<>(); + for (final String token : RestconfConstants.SLASH_SPLITTER.split(identifier)) { + final String[] paramToken = token.split(String.valueOf(RestconfStreamsConstants.EQUAL)); + if (paramToken.length == 2) { + result.put(paramToken[0], paramToken[1]); + } + } + return result; + } + static URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) { final int port = SubscribeToStreamUtil.prepareNotificationPort(); final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder(); final UriBuilder uriToWebSocketServer = uriBuilder.port(port).scheme(RestconfStreamsConstants.SCHEMA_SUBSCIBRE_URI); - return uriToWebSocketServer.replacePath(streamName).build(); + final URI uri = uriToWebSocketServer.replacePath(streamName).build(); + return uri; } - public static Date parseDateFromQueryParam(final Entry> entry) { - final DateAndTime event = new DateAndTime(entry.getValue().iterator().next()); - String numOf_ms = ""; - final String value = event.getValue(); - if (value.contains(".")) { - numOf_ms = numOf_ms + "."; - final int lastChar = value.contains("Z") ? value.indexOf("Z") : (value.contains("+") ? value.indexOf("+") - : (value.contains("-") ? value.indexOf("-") : value.length())); - for (int i = 0; i < (lastChar - value.indexOf(".") - 1); i++) { - numOf_ms = numOf_ms + "S"; - } + /** + * Register data change listener in dom data broker and set it to listener + * on stream + * + * @param ds + * - {@link LogicalDatastoreType} + * @param scope + * - {@link DataChangeScope} + * @param listener + * - listener on specific stream + * @param domDataBroker + * - data broker for register data change listener + */ + @SuppressWarnings("deprecation") + private static void registration(final LogicalDatastoreType ds, final DataChangeScope scope, + final ListenerAdapter listener, final DOMDataBroker domDataBroker) { + if (listener.isListening()) { + return; } - String zone = ""; - if (!value.contains("Z")) { - zone = zone + "XXX"; + + final YangInstanceIdentifier path = listener.getPath(); + final ListenerRegistration registration = + domDataBroker.registerDataChangeListener(ds, path, listener, scope); + + listener.setRegistration(registration); + } + + /** + * Get port from web socket server. If doesn't exit, create it. + * + * @return port + */ + private static int prepareNotificationPort() { + int port = RestconfStreamsConstants.NOTIFICATION_PORT; + try { + final WebSocketServer webSocketServer = WebSocketServer.getInstance(); + port = webSocketServer.getPort(); + } catch (final NullPointerException e) { + WebSocketServer.createInstance(RestconfStreamsConstants.NOTIFICATION_PORT); } - final DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss" + numOf_ms + zone); + return port; + } + static boolean checkExist(final SchemaContext schemaContext, final DOMDataReadWriteTransaction wTx) { + boolean exist; try { - return dateFormatter.parse(value.contains("Z") ? value.replace('T', ' ').substring(0, value.indexOf("Z")) - : value.replace('T', ' ')); - } catch (final ParseException e) { - throw new RestconfDocumentedException("Cannot parse of value in date: " + value + e); + exist = wTx.exists(LogicalDatastoreType.OPERATIONAL, + IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAMS, schemaContext)).checkedGet(); + } catch (final ReadFailedException e1) { + throw new RestconfDocumentedException("Problem while checking data if exists", e1); } + return exist; } + + private static void registerToListenNotification(final NotificationListenerAdapter listener, + final NotificationServiceHandler notificationServiceHandler) { + if (listener.isListening()) { + return; + } + + final SchemaPath path = listener.getSchemaPath(); + final ListenerRegistration registration = + notificationServiceHandler.get().registerNotificationListener(listener, path); + + listener.setRegistration(registration); + } + + /** + * Parse enum from URI + * + * @param clazz + * - enum type + * @param value + * - string of enum value + * @return enum + */ + private static T parseURIEnum(final Class clazz, final String value) { + if ((value == null) || value.equals("")) { + return null; + } + return ResolveEnumUtil.resolveEnum(clazz, value); + } + }