Bug 5679 - implement ietf-restconf-monitoring - cleanup
[netconf.git] / restconf / sal-rest-connector / src / main / java / org / opendaylight / restconf / restful / utils / SubscribeToStreamUtil.java
index f387941b67b2a15612c7c78ba5a2dd207a2db5bf..d8df4d20c8d5ec62f2e31db9a7f038caf63645b6 100644 (file)
@@ -38,11 +38,11 @@ import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapte
 import org.opendaylight.netconf.sal.streams.listeners.Notificator;
 import org.opendaylight.netconf.sal.streams.websockets.WebSocketServer;
 import org.opendaylight.restconf.Draft18.MonitoringModule;
-import org.opendaylight.restconf.handlers.DOMDataBrokerHandler;
 import org.opendaylight.restconf.handlers.NotificationServiceHandler;
 import org.opendaylight.restconf.handlers.SchemaContextHandler;
-import org.opendaylight.restconf.handlers.TransactionChainHandler;
 import org.opendaylight.restconf.parser.IdentifierCodec;
+import org.opendaylight.restconf.restful.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
+import org.opendaylight.restconf.restful.services.impl.RestconfStreamsSubscriptionServiceImpl.NotificationQueryParams;
 import org.opendaylight.restconf.utils.RestconfConstants;
 import org.opendaylight.restconf.utils.mapping.RestconfMappingNodeUtil;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
@@ -55,6 +55,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 import org.slf4j.Logger;
@@ -72,83 +73,6 @@ public final class SubscribeToStreamUtil {
         throw new UnsupportedOperationException("Util class");
     }
 
-    /**
-     * Parse enum from URI
-     *
-     * @param clazz
-     *            - enum type
-     * @param value
-     *            - string of enum value
-     * @return enum
-     */
-    private static <T> T parseURIEnum(final Class<T> clazz, final String value) {
-        if ((value == null) || value.equals("")) {
-            return null;
-        }
-        return ResolveEnumUtil.resolveEnum(clazz, value);
-    }
-
-    /**
-     * Prepare map of values from URI
-     *
-     * @param identifier
-     *            - URI
-     * @return {@link Map}
-     */
-    public static Map<String, String> mapValuesFromUri(final String identifier) {
-        final HashMap<String, String> result = new HashMap<>();
-        final String[] tokens = identifier.split(String.valueOf(RestconfConstants.SLASH));
-        for (final String token : tokens) {
-            final String[] paramToken = token.split(String.valueOf(RestconfStreamsConstants.EQUAL));
-            if (paramToken.length == 2) {
-                result.put(paramToken[0], paramToken[1]);
-            }
-        }
-        return result;
-    }
-
-    /**
-     * Register data change listener in dom data broker and set it to listener
-     * on stream
-     *
-     * @param ds
-     *            - {@link LogicalDatastoreType}
-     * @param scope
-     *            - {@link DataChangeScope}
-     * @param listener
-     *            - listener on specific stream
-     * @param domDataBroker
-     *            - data broker for register data change listener
-     */
-    private static void registration(final LogicalDatastoreType ds, final DataChangeScope scope,
-            final ListenerAdapter listener, final DOMDataBroker domDataBroker) {
-        if (listener.isListening()) {
-            return;
-        }
-
-        final YangInstanceIdentifier path = listener.getPath();
-        final ListenerRegistration<DOMDataChangeListener> registration = domDataBroker.registerDataChangeListener(ds,
-                path, listener, scope);
-
-        listener.setRegistration(registration);
-    }
-
-    /**
-     * Get port from web socket server. If doesn't exit, create it.
-     *
-     * @return port
-     */
-    private static int prepareNotificationPort() {
-        int port = RestconfStreamsConstants.NOTIFICATION_PORT;
-        try {
-            final WebSocketServer webSocketServer = WebSocketServer.getInstance();
-            port = webSocketServer.getPort();
-        } catch (final NullPointerException e) {
-            WebSocketServer.createInstance(RestconfStreamsConstants.NOTIFICATION_PORT);
-        }
-        return port;
-    }
-
     /**
      * Register listeners by streamName in identifier to listen to yang
      * notifications, put or delete info about listener to DS according to
@@ -158,24 +82,15 @@ public final class SubscribeToStreamUtil {
      *            - identifier as stream name
      * @param uriInfo
      *            - for getting base URI information
-     * @param start
-     *            - start-time query parameter
-     * @param stop
-     *            - stop-time query parameter
-     * @param notifiServiceHandler
-     *            - DOMNotificationService handler for register listeners
-     * @param filter
-     *            - indicate which subset of all possible events are of interest
-     * @param transactionChainHandler
-     *            - to put new data about stream to DS and delete after close
-     *            listener
-     * @param schemaHandler
-     *            - for getting schema context
+     * @param notificationQueryParams
+     *            - query parameters of notification
+     * @param handlersHolder
+     *            - holder of handlers for notifications
      * @return location for listening
      */
-    public static URI notifYangStream(final String identifier, final UriInfo uriInfo, Date start, final Date stop,
-            final NotificationServiceHandler notifiServiceHandler, final String filter,
-            final TransactionChainHandler transactionChainHandler, final SchemaContextHandler schemaHandler) {
+    @SuppressWarnings("rawtypes")
+    public static URI notifYangStream(final String identifier, final UriInfo uriInfo,
+            final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
         final String streamName = Notificator.createStreamNameFromUri(identifier);
         if (Strings.isNullOrEmpty(streamName)) {
             throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
@@ -186,63 +101,27 @@ public final class SubscribeToStreamUtil {
                     ErrorTag.UNKNOWN_ELEMENT);
         }
 
-        final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
-        int notificationPort = RestconfStreamsConstants.NOTIFICATION_PORT;
-        try {
-            final WebSocketServer webSocketServerInstance = WebSocketServer.getInstance();
-            notificationPort = webSocketServerInstance.getPort();
-        } catch (final NullPointerException e) {
-            WebSocketServer.createInstance(RestconfStreamsConstants.NOTIFICATION_PORT);
-        }
-        final UriBuilder uriToWebsocketServerBuilder = uriBuilder.port(notificationPort).scheme("ws");
-        final URI uriToWebsocketServer = uriToWebsocketServerBuilder.replacePath(streamName).build();
-
-        final DOMDataReadWriteTransaction wTx = transactionChainHandler.get().newReadWriteTransaction();
-        final boolean exist = checkExist(schemaHandler, wTx);
-        final Module monitoringModule = schemaHandler.get()
-                .findModuleByNamespaceAndRevision(MonitoringModule.URI_MODULE, MonitoringModule.DATE);
-        if (start == null) {
-            start = new Date();
-        }
+        final DOMDataReadWriteTransaction wTx =
+                handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction();
+        final SchemaContext schemaContext = handlersHolder.getSchemaHandler().get();
+        final boolean exist = checkExist(schemaContext, wTx);
+
+        final URI uri = prepareUriByStreamName(uriInfo, streamName);
         for (final NotificationListenerAdapter listener : listeners) {
-            registerToListenNotification(listener, notifiServiceHandler);
-            listener.setQueryParams(start, stop, filter);
-            listener.setCloseVars(transactionChainHandler, schemaHandler);
+            registerToListenNotification(listener, handlersHolder.getNotificationServiceHandler());
+            listener.setQueryParams(notificationQueryParams.getStart(), notificationQueryParams.getStop(),
+                    notificationQueryParams.getFilter());
+            listener.setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
             final NormalizedNode mapToStreams =
                     RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(listener.getSchemaPath().getLastComponent(),
-                    schemaHandler.get().getNotifications(), start, listener.getOutputType(),
-                            uriToWebsocketServer, monitoringModule, exist);
-            writeDataToDS(schemaHandler, listener.getSchemaPath().getLastComponent().getLocalName(), wTx, exist, mapToStreams);
+                            schemaContext.getNotifications(), notificationQueryParams.getStart(),
+                            listener.getOutputType(), uri, getMonitoringModule(schemaContext), exist);
+            writeDataToDS(schemaContext, listener.getSchemaPath().getLastComponent().getLocalName(), wTx, exist,
+                    mapToStreams);
         }
         submitData(wTx);
 
-        return uriToWebsocketServer;
-    }
-
-    private static boolean checkExist(final SchemaContextHandler schemaHandler, final DOMDataReadWriteTransaction wTx) {
-        boolean exist;
-        try {
-            exist = wTx
-                    .exists(LogicalDatastoreType.OPERATIONAL, IdentifierCodec
-                            .deserialize(MonitoringModule.PATH_TO_STREAMS, schemaHandler.get()))
-                    .checkedGet();
-        } catch (final ReadFailedException e1) {
-            throw new RestconfDocumentedException("Problem while checking data if exists", e1);
-        }
-        return exist;
-    }
-
-    private static void registerToListenNotification(final NotificationListenerAdapter listener,
-            final NotificationServiceHandler notificationServiceHandler) {
-        if (listener.isListening()) {
-            return;
-        }
-
-        final SchemaPath path = listener.getSchemaPath();
-        final ListenerRegistration<DOMNotificationListener> registration =
-                notificationServiceHandler.get().registerNotificationListener(listener, path);
-
-        listener.setRegistration(registration);
+        return uri;
     }
 
     /**
@@ -274,24 +153,15 @@ public final class SubscribeToStreamUtil {
      *            - identifier as stream name
      * @param uriInfo
      *            - for getting base URI information
-     * @param start
-     *            - start-time query parameter
-     * @param stop
-     *            - stop-time query parameter
-     * @param domDataBrokerHandler
-     *            - DOMDataBroker handler for register listener
-     * @param filter
-     *            - indicate which subset of all possible events are of interest
-     * @param schemaHandler
-     *            - for getting schema context
-     * @param transactionChainHandler
-     *            - to put new data about stream to DS and delete after close
-     *            listener
+     * @param notificationQueryParams
+     *            - query parameters of notification
+     * @param handlersHolder
+     *            - holder of handlers for notifications
      * @return location for listening
      */
-    public static URI notifiDataStream(final String identifier, final UriInfo uriInfo, Date start, final Date stop,
-            final DOMDataBrokerHandler domDataBrokerHandler, final String filter,
-            final TransactionChainHandler transactionChainHandler, final SchemaContextHandler schemaHandler) {
+    @SuppressWarnings("rawtypes")
+    public static URI notifiDataStream(final String identifier, final UriInfo uriInfo,
+            final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
         final Map<String, String> mapOfValues = SubscribeToStreamUtil.mapValuesFromUri(identifier);
 
         final LogicalDatastoreType ds = SubscribeToStreamUtil.parseURIEnum(LogicalDatastoreType.class,
@@ -315,53 +185,33 @@ public final class SubscribeToStreamUtil {
         final ListenerAdapter listener = Notificator.getListenerFor(streamName);
         Preconditions.checkNotNull(listener, "Listener doesn't exist : " + streamName);
 
-        if (start == null) {
-            start = new Date();
-        }
+        listener.setQueryParams(notificationQueryParams.getStart(), notificationQueryParams.getStop(),
+                notificationQueryParams.getFilter());
+        listener.setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
 
-        listener.setQueryParams(start, stop, filter);
-        listener.setCloseVars(transactionChainHandler, schemaHandler);
+        registration(ds, scope, listener, handlersHolder.getDomDataBrokerHandler().get());
 
-        SubscribeToStreamUtil.registration(ds, scope, listener, domDataBrokerHandler.get());
+        final URI uri = prepareUriByStreamName(uriInfo, streamName);
 
-        final int port = SubscribeToStreamUtil.prepareNotificationPort();
+        final DOMDataReadWriteTransaction wTx =
+                handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction();
+        final SchemaContext schemaContext = handlersHolder.getSchemaHandler().get();
+        final boolean exist = checkExist(schemaContext, wTx);
 
-        final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
-        final UriBuilder uriToWebSocketServer =
-                uriBuilder.port(port).scheme(RestconfStreamsConstants.SCHEMA_SUBSCIBRE_URI);
-        final URI uri = uriToWebSocketServer.replacePath(streamName).build();
-
-        final Module monitoringModule = schemaHandler.get()
-                .findModuleByNamespaceAndRevision(MonitoringModule.URI_MODULE, MonitoringModule.DATE);
-        final DOMDataReadWriteTransaction wTx = transactionChainHandler.get().newReadWriteTransaction();
-        final boolean exist = checkExist(schemaHandler, wTx);
-
-        final NormalizedNode mapToStreams = RestconfMappingNodeUtil.mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.getPath(), start,
-                listener.getOutputType(), uri, monitoringModule, exist, schemaHandler.get());
-        writeDataToDS(schemaHandler, listener.getPath().getLastPathArgument().getNodeType().getLocalName(), wTx, exist,
+        final NormalizedNode mapToStreams = RestconfMappingNodeUtil
+                .mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.getPath(),
+                        notificationQueryParams.getStart(), listener.getOutputType(), uri,
+                        getMonitoringModule(schemaContext), exist, schemaContext);
+        writeDataToDS(schemaContext, listener.getPath().getLastPathArgument().getNodeType().getLocalName(), wTx, exist,
                 mapToStreams);
         submitData(wTx);
         return uri;
     }
 
-    private static void writeDataToDS(final SchemaContextHandler schemaHandler, final String name,
-            final DOMDataReadWriteTransaction wTx, final boolean exist, final NormalizedNode mapToStreams) {
-        String pathId = "";
-        if (exist) {
-            pathId = MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + name;
-        } else {
-            pathId = MonitoringModule.PATH_TO_STREAMS;
-        }
-        wTx.merge(LogicalDatastoreType.OPERATIONAL, IdentifierCodec.deserialize(pathId, schemaHandler.get()),
-                mapToStreams);
-    }
-
-    private static void submitData(final DOMDataReadWriteTransaction wTx) {
-        try {
-            wTx.submit().checkedGet();
-        } catch (final TransactionCommitFailedException e) {
-            throw new RestconfDocumentedException("Problem while putting data to DS.", e);
-        }
+    public static Module getMonitoringModule(final SchemaContext schemaContext) {
+        final Module monitoringModule =
+                schemaContext.findModuleByNamespaceAndRevision(MonitoringModule.URI_MODULE, MonitoringModule.DATE);
+        return monitoringModule;
     }
 
     /**
@@ -398,4 +248,138 @@ public final class SubscribeToStreamUtil {
             throw new RestconfDocumentedException("Cannot parse of value in date: " + value + e);
         }
     }
+
+    @SuppressWarnings("rawtypes")
+    private static void writeDataToDS(final SchemaContext schemaContext, final String name,
+            final DOMDataReadWriteTransaction wTx, final boolean exist, final NormalizedNode mapToStreams) {
+        String pathId = "";
+        if (exist) {
+            pathId = MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + name;
+        } else {
+            pathId = MonitoringModule.PATH_TO_STREAMS;
+        }
+        wTx.merge(LogicalDatastoreType.OPERATIONAL, IdentifierCodec.deserialize(pathId, schemaContext),
+                mapToStreams);
+    }
+
+    private static void submitData(final DOMDataReadWriteTransaction wTx) {
+        try {
+            wTx.submit().checkedGet();
+        } catch (final TransactionCommitFailedException e) {
+            throw new RestconfDocumentedException("Problem while putting data to DS.", e);
+        }
+    }
+
+    /**
+     * Prepare map of values from URI
+     *
+     * @param identifier
+     *            - URI
+     * @return {@link Map}
+     */
+    public static Map<String, String> mapValuesFromUri(final String identifier) {
+        final HashMap<String, String> result = new HashMap<>();
+        final String[] tokens = identifier.split(String.valueOf(RestconfConstants.SLASH));
+        for (final String token : tokens) {
+            final String[] paramToken = token.split(String.valueOf(RestconfStreamsConstants.EQUAL));
+            if (paramToken.length == 2) {
+                result.put(paramToken[0], paramToken[1]);
+            }
+        }
+        return result;
+    }
+
+    private static URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
+        final int port = SubscribeToStreamUtil.prepareNotificationPort();
+
+        final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
+        final UriBuilder uriToWebSocketServer =
+                uriBuilder.port(port).scheme(RestconfStreamsConstants.SCHEMA_SUBSCIBRE_URI);
+        final URI uri = uriToWebSocketServer.replacePath(streamName).build();
+        return uri;
+    }
+
+    /**
+     * Register data change listener in dom data broker and set it to listener
+     * on stream
+     *
+     * @param ds
+     *            - {@link LogicalDatastoreType}
+     * @param scope
+     *            - {@link DataChangeScope}
+     * @param listener
+     *            - listener on specific stream
+     * @param domDataBroker
+     *            - data broker for register data change listener
+     */
+    @SuppressWarnings("deprecation")
+    private static void registration(final LogicalDatastoreType ds, final DataChangeScope scope,
+            final ListenerAdapter listener, final DOMDataBroker domDataBroker) {
+        if (listener.isListening()) {
+            return;
+        }
+
+        final YangInstanceIdentifier path = listener.getPath();
+        final ListenerRegistration<DOMDataChangeListener> registration =
+                domDataBroker.registerDataChangeListener(ds, path, listener, scope);
+
+        listener.setRegistration(registration);
+    }
+
+    /**
+     * Get port from web socket server. If doesn't exit, create it.
+     *
+     * @return port
+     */
+    private static int prepareNotificationPort() {
+        int port = RestconfStreamsConstants.NOTIFICATION_PORT;
+        try {
+            final WebSocketServer webSocketServer = WebSocketServer.getInstance();
+            port = webSocketServer.getPort();
+        } catch (final NullPointerException e) {
+            WebSocketServer.createInstance(RestconfStreamsConstants.NOTIFICATION_PORT);
+        }
+        return port;
+    }
+
+    private static boolean checkExist(final SchemaContext schemaContext, final DOMDataReadWriteTransaction wTx) {
+        boolean exist;
+        try {
+            exist = wTx.exists(LogicalDatastoreType.OPERATIONAL,
+                    IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAMS, schemaContext)).checkedGet();
+        } catch (final ReadFailedException e1) {
+            throw new RestconfDocumentedException("Problem while checking data if exists", e1);
+        }
+        return exist;
+    }
+
+    private static void registerToListenNotification(final NotificationListenerAdapter listener,
+            final NotificationServiceHandler notificationServiceHandler) {
+        if (listener.isListening()) {
+            return;
+        }
+
+        final SchemaPath path = listener.getSchemaPath();
+        final ListenerRegistration<DOMNotificationListener> registration =
+                notificationServiceHandler.get().registerNotificationListener(listener, path);
+
+        listener.setRegistration(registration);
+    }
+
+    /**
+     * Parse enum from URI
+     *
+     * @param clazz
+     *            - enum type
+     * @param value
+     *            - string of enum value
+     * @return enum
+     */
+    private static <T> T parseURIEnum(final Class<T> clazz, final String value) {
+        if ((value == null) || value.equals("")) {
+            return null;
+        }
+        return ResolveEnumUtil.resolveEnum(clazz, value);
+    }
+
 }