Merge "Option to receive only leaf nodes in websocket notifs"
[netconf.git] / restconf / sal-rest-connector / src / main / java / org / opendaylight / restconf / restful / utils / SubscribeToStreamUtil.java
index 0904b02e461d4ba98b8a609859525f454716b03e..5b8a5255df03dc5ea78b4554486fffeb3f45ea54 100644 (file)
@@ -10,11 +10,13 @@ package org.opendaylight.restconf.restful.utils;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import java.net.URI;
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.format.DateTimeParseException;
+import java.time.temporal.ChronoField;
+import java.time.temporal.TemporalAccessor;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -23,8 +25,11 @@ import javax.ws.rs.core.UriBuilder;
 import javax.ws.rs.core.UriInfo;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
 import org.opendaylight.netconf.sal.restconf.impl.InstanceIdentifierContext;
 import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
@@ -34,18 +39,26 @@ import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
 import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter;
 import org.opendaylight.netconf.sal.streams.listeners.Notificator;
 import org.opendaylight.netconf.sal.streams.websockets.WebSocketServer;
-import org.opendaylight.restconf.handlers.DOMDataBrokerHandler;
+import org.opendaylight.restconf.Rfc8040.MonitoringModule;
 import org.opendaylight.restconf.handlers.NotificationServiceHandler;
 import org.opendaylight.restconf.handlers.SchemaContextHandler;
+import org.opendaylight.restconf.parser.IdentifierCodec;
+import org.opendaylight.restconf.restful.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
+import org.opendaylight.restconf.restful.services.impl.RestconfStreamsSubscriptionServiceImpl.NotificationQueryParams;
 import org.opendaylight.restconf.utils.RestconfConstants;
+import org.opendaylight.restconf.utils.mapping.RestconfMappingNodeUtil;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
+import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 import org.slf4j.Logger;
@@ -58,145 +71,86 @@ import org.slf4j.LoggerFactory;
 public final class SubscribeToStreamUtil {
 
     private static final Logger LOG = LoggerFactory.getLogger(SubscribeToStreamUtil.class);
+    private static final DateTimeFormatter FORMATTER = new DateTimeFormatterBuilder()
+            .appendValue(ChronoField.YEAR, 4).appendLiteral('-')
+            .appendValue(ChronoField.MONTH_OF_YEAR, 2).appendLiteral('-')
+            .appendValue(ChronoField.DAY_OF_MONTH, 2).appendLiteral('T')
+            .appendValue(ChronoField.HOUR_OF_DAY, 2).appendLiteral(':')
+            .appendValue(ChronoField.MINUTE_OF_HOUR, 2).appendLiteral(':')
+            .appendValue(ChronoField.SECOND_OF_MINUTE, 2)
+            .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
+            .appendOffset("+HH:MM", "Z").toFormatter();
 
     private SubscribeToStreamUtil() {
         throw new UnsupportedOperationException("Util class");
     }
 
     /**
-     * Parse enum from URI
-     *
-     * @param clazz
-     *            - enum type
-     * @param value
-     *            - string of enum value
-     * @return enum
-     */
-    public static <T> T parseURIEnum(final Class<T> clazz, final String value) {
-        if ((value == null) || value.equals("")) {
-            return null;
-        }
-        return StreamUtil.resolveEnum(clazz, value);
-    }
-
-    /**
-     * Prepare map of values from URI
-     *
-     * @param identifier
-     *            - URI
-     * @return {@link Map}
-     */
-    public static Map<String, String> mapValuesFromUri(final String identifier) {
-        final HashMap<String, String> result = new HashMap<>();
-        final String[] tokens = identifier.split(String.valueOf(RestconfConstants.SLASH));
-        for (final String token : tokens) {
-            final String[] paramToken = token.split(String.valueOf(RestconfStreamsConstants.EQUAL));
-            if (paramToken.length == 2) {
-                result.put(paramToken[0], paramToken[1]);
-            }
-        }
-        return result;
-    }
-
-    /**
-     * Register data change listener in dom data broker and set it to listener
-     * on stream
-     *
-     * @param ds
-     *            - {@link LogicalDatastoreType}
-     * @param scope
-     *            - {@link DataChangeScope}
-     * @param listener
-     *            - listener on specific stream
-     * @param domDataBroker
-     *            - data broker for register data change listener
-     */
-    private static void registration(final LogicalDatastoreType ds, final DataChangeScope scope,
-            final ListenerAdapter listener, final DOMDataBroker domDataBroker) {
-        if (listener.isListening()) {
-            return;
-        }
-
-        final YangInstanceIdentifier path = listener.getPath();
-        final ListenerRegistration<DOMDataChangeListener> registration = domDataBroker.registerDataChangeListener(ds,
-                path, listener, scope);
-
-        listener.setRegistration(registration);
-    }
-
-    /**
-     * Get port from web socket server. If doesn't exit, create it.
-     *
-     * @return port
-     */
-    private static int prepareNotificationPort() {
-        int port = RestconfStreamsConstants.NOTIFICATION_PORT;
-        try {
-            final WebSocketServer webSocketServer = WebSocketServer.getInstance();
-            port = webSocketServer.getPort();
-        } catch (final NullPointerException e) {
-            WebSocketServer.createInstance(RestconfStreamsConstants.NOTIFICATION_PORT);
-        }
-        return port;
-    }
-
-    /**
-     * Register listeners by streamName in identifier to listen to yang notifications
+     * Register listeners by streamName in identifier to listen to yang
+     * notifications, put or delete info about listener to DS according to
+     * ietf-restconf-monitoring
      *
      * @param identifier
      *            - identifier as stream name
      * @param uriInfo
      *            - for getting base URI information
-     * @param start
-     *            - start-time query parameter
-     * @param stop
-     *            - stop-time query parameter
-     * @param notifiServiceHandler
-     *            - DOMNotificationService handler for register listeners
+     * @param notificationQueryParams
+     *            - query parameters of notification
+     * @param handlersHolder
+     *            - holder of handlers for notifications
      * @return location for listening
      */
-    public static URI notifStream(final String identifier, final UriInfo uriInfo, final Date start, final Date stop,
-            final NotificationServiceHandler notifiServiceHandler) {
+    @SuppressWarnings("rawtypes")
+    public static URI notifYangStream(final String identifier, final UriInfo uriInfo,
+            final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
         final String streamName = Notificator.createStreamNameFromUri(identifier);
         if (Strings.isNullOrEmpty(streamName)) {
             throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
         }
-        final List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName);
+        List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName);
+        if (identifier.contains(RestconfConstants.SLASH + NotificationOutputType.JSON.getName())) {
+            listeners = pickSpecificListenerByOutput(listeners, NotificationOutputType.JSON.getName());
+        } else {
+            listeners = pickSpecificListenerByOutput(listeners, NotificationOutputType.XML.getName());
+        }
         if ((listeners == null) || listeners.isEmpty()) {
             throw new RestconfDocumentedException("Stream was not found.", ErrorType.PROTOCOL,
                     ErrorTag.UNKNOWN_ELEMENT);
         }
 
-        for (final NotificationListenerAdapter listener : listeners) {
-            registerToListenNotification(listener, notifiServiceHandler);
-            listener.setTime(start, stop);
-        }
+        final DOMDataReadWriteTransaction wTx =
+                handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction();
+        final SchemaContext schemaContext = handlersHolder.getSchemaHandler().get();
+        final boolean exist = checkExist(schemaContext, wTx);
 
-        final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
-        int notificationPort = RestconfStreamsConstants.NOTIFICATION_PORT;
-        try {
-            final WebSocketServer webSocketServerInstance = WebSocketServer.getInstance();
-            notificationPort = webSocketServerInstance.getPort();
-        } catch (final NullPointerException e) {
-            WebSocketServer.createInstance(RestconfStreamsConstants.NOTIFICATION_PORT);
+        final URI uri = prepareUriByStreamName(uriInfo, streamName);
+        for (final NotificationListenerAdapter listener : listeners) {
+            registerToListenNotification(listener, handlersHolder.getNotificationServiceHandler());
+            listener.setQueryParams(notificationQueryParams.getStart(), notificationQueryParams.getStop(),
+                    notificationQueryParams.getFilter(), false);
+            listener.setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
+            final NormalizedNode mapToStreams =
+                    RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(listener.getSchemaPath().getLastComponent(),
+                            schemaContext.getNotifications(), notificationQueryParams.getStart(),
+                            listener.getOutputType(), uri, getMonitoringModule(schemaContext), exist);
+            writeDataToDS(schemaContext, listener.getSchemaPath().getLastComponent().getLocalName(), wTx, exist,
+                    mapToStreams);
         }
-        final UriBuilder uriToWebsocketServerBuilder = uriBuilder.port(notificationPort).scheme("ws");
-        final URI uriToWebsocketServer = uriToWebsocketServerBuilder.replacePath(streamName).build();
+        submitData(wTx);
 
-        return uriToWebsocketServer;
+        return uri;
     }
 
-    private static void registerToListenNotification(final NotificationListenerAdapter listener,
-            final NotificationServiceHandler notificationServiceHandler) {
-        if (listener.isListening()) {
-            return;
+    static List<NotificationListenerAdapter>
+            pickSpecificListenerByOutput(final List<NotificationListenerAdapter> listeners, final String outputType) {
+        for (final NotificationListenerAdapter notificationListenerAdapter : listeners) {
+            if (notificationListenerAdapter.getOutputType().equals(outputType)) {
+                final List<NotificationListenerAdapter> list = new ArrayList<>();
+                list.add(notificationListenerAdapter);
+                return list;
+            }
         }
-
-        final SchemaPath path = listener.getSchemaPath();
-        final ListenerRegistration<DOMNotificationListener> registration =
-                notificationServiceHandler.get().registerNotificationListener(listener, path);
-
-        listener.setRegistration(registration);
+        return listeners;
     }
 
     /**
@@ -220,22 +174,23 @@ public final class SubscribeToStreamUtil {
     }
 
     /**
-     * Register listener by streamName in identifier to listen to yang notifications
+     * Register listener by streamName in identifier to listen to data change
+     * notifications, put or delete info about listener to DS according to
+     * ietf-restconf-monitoring
      *
      * @param identifier
      *            - identifier as stream name
      * @param uriInfo
      *            - for getting base URI information
-     * @param start
-     *            - start-time query parameter
-     * @param stop
-     *            - stop-time query parameter
-     * @param domDataBrokerHandler
-     *            - DOMDataBroker handler for register listener
+     * @param notificationQueryParams
+     *            - query parameters of notification
+     * @param handlersHolder
+     *            - holder of handlers for notifications
      * @return location for listening
      */
-    public static URI dataSubs(final String identifier, final UriInfo uriInfo, final Date start, final Date stop,
-            final DOMDataBrokerHandler domDataBrokerHandler) {
+    @SuppressWarnings("rawtypes")
+    public static URI notifiDataStream(final String identifier, final UriInfo uriInfo,
+            final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
         final Map<String, String> mapOfValues = SubscribeToStreamUtil.mapValuesFromUri(identifier);
 
         final LogicalDatastoreType ds = SubscribeToStreamUtil.parseURIEnum(LogicalDatastoreType.class,
@@ -259,41 +214,187 @@ public final class SubscribeToStreamUtil {
         final ListenerAdapter listener = Notificator.getListenerFor(streamName);
         Preconditions.checkNotNull(listener, "Listener doesn't exist : " + streamName);
 
-        listener.setTimer(start, stop);
+        listener.setQueryParams(notificationQueryParams.getStart(), notificationQueryParams.getStop(),
+                notificationQueryParams.getFilter(), false);
+        listener.setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
+
+        registration(ds, scope, listener, handlersHolder.getDomDataBrokerHandler().get());
+
+        final URI uri = prepareUriByStreamName(uriInfo, streamName);
+
+        final DOMDataReadWriteTransaction wTx =
+                handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction();
+        final SchemaContext schemaContext = handlersHolder.getSchemaHandler().get();
+        final boolean exist = checkExist(schemaContext, wTx);
+
+        final NormalizedNode mapToStreams = RestconfMappingNodeUtil
+                .mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.getPath(),
+                        notificationQueryParams.getStart(), listener.getOutputType(), uri,
+                        getMonitoringModule(schemaContext), exist, schemaContext);
+        writeDataToDS(schemaContext, listener.getPath().getLastPathArgument().getNodeType().getLocalName(), wTx, exist,
+                mapToStreams);
+        submitData(wTx);
+        return uri;
+    }
+
+    public static Module getMonitoringModule(final SchemaContext schemaContext) {
+        final Module monitoringModule =
+                schemaContext.findModuleByNamespaceAndRevision(MonitoringModule.URI_MODULE, MonitoringModule.DATE);
+        return monitoringModule;
+    }
+
+    /**
+     * Parse input of query parameters - start-time or stop-time - from
+     * {@link DateAndTime} format to {@link Instant} format
+     *
+     * @param entry
+     *            - start-time or stop-time as string in {@link DateAndTime}
+     *            format
+     * @return parsed {@link Instant} by entry
+     */
+    public static Instant parseDateFromQueryParam(final Entry<String, List<String>> entry) {
+        final DateAndTime event = new DateAndTime(entry.getValue().iterator().next());
+        final String value = event.getValue();
+        final TemporalAccessor p;
+        try {
+            p = FORMATTER.parse(value);
+        } catch (DateTimeParseException e) {
+            throw new RestconfDocumentedException("Cannot parse of value in date: " + value, e);
+        }
+        return Instant.from(p);
+
+    }
+
+    @SuppressWarnings("rawtypes")
+    static void writeDataToDS(final SchemaContext schemaContext, final String name,
+            final DOMDataReadWriteTransaction wTx, final boolean exist, final NormalizedNode mapToStreams) {
+        String pathId = "";
+        if (exist) {
+            pathId = MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + name;
+        } else {
+            pathId = MonitoringModule.PATH_TO_STREAMS;
+        }
+        wTx.merge(LogicalDatastoreType.OPERATIONAL, IdentifierCodec.deserialize(pathId, schemaContext),
+                mapToStreams);
+    }
 
-        SubscribeToStreamUtil.registration(ds, scope, listener, domDataBrokerHandler.get());
+    static void submitData(final DOMDataReadWriteTransaction wTx) {
+        try {
+            wTx.submit().checkedGet();
+        } catch (final TransactionCommitFailedException e) {
+            throw new RestconfDocumentedException("Problem while putting data to DS.", e);
+        }
+    }
+
+    /**
+     * Prepare map of values from URI
+     *
+     * @param identifier
+     *            - URI
+     * @return {@link Map}
+     */
+    public static Map<String, String> mapValuesFromUri(final String identifier) {
+        final HashMap<String, String> result = new HashMap<>();
+        for (final String token : RestconfConstants.SLASH_SPLITTER.split(identifier)) {
+            final String[] paramToken = token.split(String.valueOf(RestconfStreamsConstants.EQUAL));
+            if (paramToken.length == 2) {
+                result.put(paramToken[0], paramToken[1]);
+            }
+        }
+        return result;
+    }
 
+    static URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
         final int port = SubscribeToStreamUtil.prepareNotificationPort();
 
         final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
         final UriBuilder uriToWebSocketServer =
                 uriBuilder.port(port).scheme(RestconfStreamsConstants.SCHEMA_SUBSCIBRE_URI);
-        return uriToWebSocketServer.replacePath(streamName).build();
+        final URI uri = uriToWebSocketServer.replacePath(streamName).build();
+        return uri;
     }
 
-    public static Date parseDateFromQueryParam(final Entry<String, List<String>> entry) {
-        final DateAndTime event = new DateAndTime(entry.getValue().iterator().next());
-        String numOf_ms = "";
-        final String value = event.getValue();
-        if (value.contains(".")) {
-            numOf_ms = numOf_ms + ".";
-            final int lastChar = value.contains("Z") ? value.indexOf("Z") : (value.contains("+") ? value.indexOf("+")
-                    : (value.contains("-") ? value.indexOf("-") : value.length()));
-            for (int i = 0; i < (lastChar - value.indexOf(".") - 1); i++) {
-                numOf_ms = numOf_ms + "S";
-            }
+    /**
+     * Register data change listener in dom data broker and set it to listener
+     * on stream
+     *
+     * @param ds
+     *            - {@link LogicalDatastoreType}
+     * @param scope
+     *            - {@link DataChangeScope}
+     * @param listener
+     *            - listener on specific stream
+     * @param domDataBroker
+     *            - data broker for register data change listener
+     */
+    @SuppressWarnings("deprecation")
+    private static void registration(final LogicalDatastoreType ds, final DataChangeScope scope,
+            final ListenerAdapter listener, final DOMDataBroker domDataBroker) {
+        if (listener.isListening()) {
+            return;
         }
-        String zone = "";
-        if (!value.contains("Z")) {
-            zone = zone + "XXX";
+
+        final YangInstanceIdentifier path = listener.getPath();
+        final ListenerRegistration<DOMDataChangeListener> registration =
+                domDataBroker.registerDataChangeListener(ds, path, listener, scope);
+
+        listener.setRegistration(registration);
+    }
+
+    /**
+     * Get port from web socket server. If doesn't exit, create it.
+     *
+     * @return port
+     */
+    private static int prepareNotificationPort() {
+        int port = RestconfStreamsConstants.NOTIFICATION_PORT;
+        try {
+            final WebSocketServer webSocketServer = WebSocketServer.getInstance();
+            port = webSocketServer.getPort();
+        } catch (final NullPointerException e) {
+            WebSocketServer.createInstance(RestconfStreamsConstants.NOTIFICATION_PORT);
         }
-        final DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss" + numOf_ms + zone);
+        return port;
+    }
 
+    static boolean checkExist(final SchemaContext schemaContext, final DOMDataReadWriteTransaction wTx) {
+        boolean exist;
         try {
-            return dateFormatter.parse(value.contains("Z") ? value.replace('T', ' ').substring(0, value.indexOf("Z"))
-                    : value.replace('T', ' '));
-        } catch (final ParseException e) {
-            throw new RestconfDocumentedException("Cannot parse of value in date: " + value + e);
+            exist = wTx.exists(LogicalDatastoreType.OPERATIONAL,
+                    IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAMS, schemaContext)).checkedGet();
+        } catch (final ReadFailedException e1) {
+            throw new RestconfDocumentedException("Problem while checking data if exists", e1);
         }
+        return exist;
     }
+
+    private static void registerToListenNotification(final NotificationListenerAdapter listener,
+            final NotificationServiceHandler notificationServiceHandler) {
+        if (listener.isListening()) {
+            return;
+        }
+
+        final SchemaPath path = listener.getSchemaPath();
+        final ListenerRegistration<DOMNotificationListener> registration =
+                notificationServiceHandler.get().registerNotificationListener(listener, path);
+
+        listener.setRegistration(registration);
+    }
+
+    /**
+     * Parse enum from URI
+     *
+     * @param clazz
+     *            - enum type
+     * @param value
+     *            - string of enum value
+     * @return enum
+     */
+    private static <T> T parseURIEnum(final Class<T> clazz, final String value) {
+        if ((value == null) || value.equals("")) {
+            return null;
+        }
+        return ResolveEnumUtil.resolveEnum(clazz, value);
+    }
+
 }