Bug 4883 - implement query parameter - filter
[netconf.git] / restconf / sal-rest-connector / src / main / java / org / opendaylight / restconf / restful / utils / SubscribeToStreamUtil.java
index c07551c182ed62e323ff455ca94621ed913b60f8..f6d45e56258e22ae4ef6aa37b2cec16335a2d19c 100644 (file)
@@ -7,17 +7,49 @@
  */
 package org.opendaylight.restconf.restful.utils;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import java.net.URI;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriInfo;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
+import org.opendaylight.netconf.sal.restconf.impl.InstanceIdentifierContext;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorTag;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorType;
 import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
+import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter;
+import org.opendaylight.netconf.sal.streams.listeners.Notificator;
 import org.opendaylight.netconf.sal.streams.websockets.WebSocketServer;
+import org.opendaylight.restconf.handlers.DOMDataBrokerHandler;
+import org.opendaylight.restconf.handlers.NotificationServiceHandler;
+import org.opendaylight.restconf.handlers.SchemaContextHandler;
 import org.opendaylight.restconf.utils.RestconfConstants;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
+import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Subscribe to stream util class
@@ -25,6 +57,8 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
  */
 public final class SubscribeToStreamUtil {
 
+    private static final Logger LOG = LoggerFactory.getLogger(SubscribeToStreamUtil.class);
+
     private SubscribeToStreamUtil() {
         throw new UnsupportedOperationException("Util class");
     }
@@ -77,7 +111,7 @@ public final class SubscribeToStreamUtil {
      * @param domDataBroker
      *            - data broker for register data change listener
      */
-    public static void registration(final LogicalDatastoreType ds, final DataChangeScope scope,
+    private static void registration(final LogicalDatastoreType ds, final DataChangeScope scope,
             final ListenerAdapter listener, final DOMDataBroker domDataBroker) {
         if (listener.isListening()) {
             return;
@@ -95,7 +129,7 @@ public final class SubscribeToStreamUtil {
      *
      * @return port
      */
-    public static int prepareNotificationPort() {
+    private static int prepareNotificationPort() {
         int port = RestconfStreamsConstants.NOTIFICATION_PORT;
         try {
             final WebSocketServer webSocketServer = WebSocketServer.getInstance();
@@ -106,4 +140,164 @@ public final class SubscribeToStreamUtil {
         return port;
     }
 
+    /**
+     * Register listeners by streamName in identifier to listen to yang notifications
+     *
+     * @param identifier
+     *            - identifier as stream name
+     * @param uriInfo
+     *            - for getting base URI information
+     * @param start
+     *            - start-time query parameter
+     * @param stop
+     *            - stop-time query parameter
+     * @param notifiServiceHandler
+     *            - DOMNotificationService handler for register listeners
+     * @param filter
+     *            - indicate which subset of all possible events are of interest
+     * @return location for listening
+     */
+    public static URI notifStream(final String identifier, final UriInfo uriInfo, final Date start, final Date stop,
+            final NotificationServiceHandler notifiServiceHandler, final String filter) {
+        final String streamName = Notificator.createStreamNameFromUri(identifier);
+        if (Strings.isNullOrEmpty(streamName)) {
+            throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
+        }
+        final List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName);
+        if ((listeners == null) || listeners.isEmpty()) {
+            throw new RestconfDocumentedException("Stream was not found.", ErrorType.PROTOCOL,
+                    ErrorTag.UNKNOWN_ELEMENT);
+        }
+
+        for (final NotificationListenerAdapter listener : listeners) {
+            registerToListenNotification(listener, notifiServiceHandler);
+            listener.setQueryParams(start, stop, filter);
+        }
+
+        final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
+        int notificationPort = RestconfStreamsConstants.NOTIFICATION_PORT;
+        try {
+            final WebSocketServer webSocketServerInstance = WebSocketServer.getInstance();
+            notificationPort = webSocketServerInstance.getPort();
+        } catch (final NullPointerException e) {
+            WebSocketServer.createInstance(RestconfStreamsConstants.NOTIFICATION_PORT);
+        }
+        final UriBuilder uriToWebsocketServerBuilder = uriBuilder.port(notificationPort).scheme("ws");
+        final URI uriToWebsocketServer = uriToWebsocketServerBuilder.replacePath(streamName).build();
+
+        return uriToWebsocketServer;
+    }
+
+    private static void registerToListenNotification(final NotificationListenerAdapter listener,
+            final NotificationServiceHandler notificationServiceHandler) {
+        if (listener.isListening()) {
+            return;
+        }
+
+        final SchemaPath path = listener.getSchemaPath();
+        final ListenerRegistration<DOMNotificationListener> registration =
+                notificationServiceHandler.get().registerNotificationListener(listener, path);
+
+        listener.setRegistration(registration);
+    }
+
+    /**
+     * Prepare InstanceIdentifierContext for Location leaf
+     *
+     * @param schemaHandler
+     *            - schemaContext handler
+     * @return InstanceIdentifier of Location leaf
+     */
+    public static InstanceIdentifierContext<?> prepareIIDSubsStreamOutput(final SchemaContextHandler schemaHandler) {
+        final QName qnameBase = QName.create("subscribe:to:notification", "2016-10-28", "notifi");
+        final DataSchemaNode location = ((ContainerSchemaNode) schemaHandler.get()
+                .findModuleByNamespaceAndRevision(qnameBase.getNamespace(), qnameBase.getRevision())
+                .getDataChildByName(qnameBase)).getDataChildByName(QName.create(qnameBase, "location"));
+        final List<PathArgument> path = new ArrayList<>();
+        path.add(NodeIdentifier.create(qnameBase));
+        path.add(NodeIdentifier.create(QName.create(qnameBase, "location")));
+
+        return new InstanceIdentifierContext<SchemaNode>(YangInstanceIdentifier.create(path), location, null,
+                schemaHandler.get());
+    }
+
+    /**
+     * Register listener by streamName in identifier to listen to yang notifications
+     *
+     * @param identifier
+     *            - identifier as stream name
+     * @param uriInfo
+     *            - for getting base URI information
+     * @param start
+     *            - start-time query parameter
+     * @param stop
+     *            - stop-time query parameter
+     * @param domDataBrokerHandler
+     *            - DOMDataBroker handler for register listener
+     * @param filter
+     *            - indicate which subset of all possible events are of interest
+     * @return location for listening
+     */
+    public static URI dataSubs(final String identifier, final UriInfo uriInfo, final Date start, final Date stop,
+            final DOMDataBrokerHandler domDataBrokerHandler, final String filter) {
+        final Map<String, String> mapOfValues = SubscribeToStreamUtil.mapValuesFromUri(identifier);
+
+        final LogicalDatastoreType ds = SubscribeToStreamUtil.parseURIEnum(LogicalDatastoreType.class,
+                mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME));
+        if (ds == null) {
+            final String msg = "Stream name doesn't contains datastore value (pattern /datastore=)";
+            LOG.debug(msg);
+            throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
+        }
+
+        final DataChangeScope scope = SubscribeToStreamUtil.parseURIEnum(DataChangeScope.class,
+                mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME));
+        if (scope == null) {
+            final String msg = "Stream name doesn't contains datastore value (pattern /scope=)";
+            LOG.warn(msg);
+            throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
+        }
+
+        final String streamName = Notificator.createStreamNameFromUri(identifier);
+
+        final ListenerAdapter listener = Notificator.getListenerFor(streamName);
+        Preconditions.checkNotNull(listener, "Listener doesn't exist : " + streamName);
+
+        listener.setQueryParams(start, stop, filter);
+
+        SubscribeToStreamUtil.registration(ds, scope, listener, domDataBrokerHandler.get());
+
+        final int port = SubscribeToStreamUtil.prepareNotificationPort();
+
+        final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
+        final UriBuilder uriToWebSocketServer =
+                uriBuilder.port(port).scheme(RestconfStreamsConstants.SCHEMA_SUBSCIBRE_URI);
+        return uriToWebSocketServer.replacePath(streamName).build();
+    }
+
+    public static Date parseDateFromQueryParam(final Entry<String, List<String>> entry) {
+        final DateAndTime event = new DateAndTime(entry.getValue().iterator().next());
+        String numOf_ms = "";
+        final String value = event.getValue();
+        if (value.contains(".")) {
+            numOf_ms = numOf_ms + ".";
+            final int lastChar = value.contains("Z") ? value.indexOf("Z") : (value.contains("+") ? value.indexOf("+")
+                    : (value.contains("-") ? value.indexOf("-") : value.length()));
+            for (int i = 0; i < (lastChar - value.indexOf(".") - 1); i++) {
+                numOf_ms = numOf_ms + "S";
+            }
+        }
+        String zone = "";
+        if (!value.contains("Z")) {
+            zone = zone + "XXX";
+        }
+        final DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss" + numOf_ms + zone);
+
+        try {
+            return dateFormatter.parse(value.contains("Z") ? value.replace('T', ' ').substring(0, value.indexOf("Z"))
+                    : value.replace('T', ' '));
+        } catch (final ParseException e) {
+            throw new RestconfDocumentedException("Cannot parse of value in date: " + value + e);
+        }
+    }
 }