Merge changes from topic 'web-sockets'
[netconf.git] / restconf / restconf-nb-rfc8040 / src / main / java / org / opendaylight / restconf / nb / rfc8040 / rests / utils / SubscribeToStreamUtil.java
index b90dd4b145b207a430c47efe1365b50762412b60..eee8e432accb1d6757102e54cb00ce3926831f99 100644 (file)
@@ -21,36 +21,35 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
 import javax.ws.rs.core.UriBuilder;
 import javax.ws.rs.core.UriInfo;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataBroker;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeReadOperations;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
+import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
 import org.opendaylight.restconf.common.context.InstanceIdentifierContext;
 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
 import org.opendaylight.restconf.common.errors.RestconfError.ErrorTag;
 import org.opendaylight.restconf.common.errors.RestconfError.ErrorType;
+import org.opendaylight.restconf.common.util.DataChangeScope;
 import org.opendaylight.restconf.nb.rfc8040.Rfc8040.MonitoringModule;
 import org.opendaylight.restconf.nb.rfc8040.handlers.NotificationServiceHandler;
 import org.opendaylight.restconf.nb.rfc8040.handlers.SchemaContextHandler;
 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.NotificationQueryParams;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.Notificator;
-import org.opendaylight.restconf.nb.rfc8040.streams.websockets.WebSocketServer;
 import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants;
 import org.opendaylight.restconf.nb.rfc8040.utils.mapping.RestconfMappingNodeUtil;
 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
-import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
@@ -66,7 +65,6 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Subscribe to stream util class.
- *
  */
 public final class SubscribeToStreamUtil {
 
@@ -82,219 +80,201 @@ public final class SubscribeToStreamUtil {
             .appendOffset("+HH:MM", "Z").toFormatter();
 
     private SubscribeToStreamUtil() {
-        throw new UnsupportedOperationException("Util class");
+        throw new UnsupportedOperationException("Utility class");
     }
 
     /**
-     * Register listeners by streamName in identifier to listen to yang
-     * notifications, put or delete info about listener to DS according to
-     * ietf-restconf-monitoring.
+     * Register listener by streamName in identifier to listen to yang notifications, and put or delete information
+     * about listener to DS according to ietf-restconf-monitoring.
      *
-     * @param identifier
-     *             identifier as stream name
-     * @param uriInfo
-     *             for getting base URI information
-     * @param notificationQueryParams
-     *             query parameters of notification
-     * @param handlersHolder
-     *             holder of handlers for notifications
-     * @return location for listening
+     * @param identifier              Name of the stream.
+     * @param uriInfo                 URI information.
+     * @param notificationQueryParams Query parameters of notification.
+     * @param handlersHolder          Holder of handlers for notifications.
+     * @return Stream location for listening.
      */
     @SuppressWarnings("rawtypes")
-    public static URI notifYangStream(final String identifier, final UriInfo uriInfo,
+    public static URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
             final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
-        final String streamName = Notificator.createStreamNameFromUri(identifier);
+        final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
         if (Strings.isNullOrEmpty(streamName)) {
             throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
         }
-        List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName);
-        if (identifier.contains(RestconfConstants.SLASH + NotificationOutputType.JSON.getName())) {
-            listeners = pickSpecificListenerByOutput(listeners, NotificationOutputType.JSON.getName());
-        } else {
-            listeners = pickSpecificListenerByOutput(listeners, NotificationOutputType.XML.getName());
-        }
-        if ((listeners == null) || listeners.isEmpty()) {
-            throw new RestconfDocumentedException("Stream was not found.", ErrorType.PROTOCOL,
+        final Optional<NotificationListenerAdapter> notificationListenerAdapter =
+                ListenersBroker.getInstance().getNotificationListenerFor(streamName);
+
+        if (!notificationListenerAdapter.isPresent()) {
+            throw new RestconfDocumentedException(String.format(
+                    "Stream with name %s was not found.", streamName),
+                    ErrorType.PROTOCOL,
                     ErrorTag.UNKNOWN_ELEMENT);
         }
 
-        final DOMDataReadWriteTransaction wTx =
-                handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction();
+        final DOMDataTreeReadWriteTransaction writeTransaction = handlersHolder
+                .getTransactionChainHandler()
+                .get()
+                .newReadWriteTransaction();
         final SchemaContext schemaContext = handlersHolder.getSchemaHandler().get();
-        final boolean exist = checkExist(schemaContext, wTx);
-
+        final boolean exist = checkExist(schemaContext, writeTransaction);
         final URI uri = prepareUriByStreamName(uriInfo, streamName);
-        for (final NotificationListenerAdapter listener : listeners) {
-            registerToListenNotification(listener, handlersHolder.getNotificationServiceHandler());
-            listener.setQueryParams(notificationQueryParams.getStart(), notificationQueryParams.getStop(),
-                    notificationQueryParams.getFilter(), false);
-            listener.setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
-            final NormalizedNode mapToStreams = RestconfMappingNodeUtil
-                    .mapYangNotificationStreamByIetfRestconfMonitoring(listener.getSchemaPath().getLastComponent(),
-                            schemaContext.getNotifications(), notificationQueryParams.getStart(),
-                            listener.getOutputType(), uri, getMonitoringModule(schemaContext), exist);
-            writeDataToDS(schemaContext, listener.getSchemaPath().getLastComponent().getLocalName(), wTx, exist,
-                    mapToStreams);
-        }
-        submitData(wTx);
 
+        registerToListenNotification(
+                notificationListenerAdapter.get(), handlersHolder.getNotificationServiceHandler());
+        notificationListenerAdapter.get().setQueryParams(
+                notificationQueryParams.getStart(),
+                notificationQueryParams.getStop().orElse(null),
+                notificationQueryParams.getFilter().orElse(null),
+                false);
+        notificationListenerAdapter.get().setCloseVars(
+                handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
+        final NormalizedNode mapToStreams = RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(
+                notificationListenerAdapter.get().getSchemaPath().getLastComponent(),
+                schemaContext.getNotifications(), notificationQueryParams.getStart(),
+                notificationListenerAdapter.get().getOutputType(), uri, getMonitoringModule(schemaContext),
+                exist);
+        writeDataToDS(schemaContext,
+                notificationListenerAdapter.get().getSchemaPath().getLastComponent().getLocalName(), writeTransaction,
+                exist, mapToStreams);
+        submitData(writeTransaction);
         return uri;
     }
 
-    static List<NotificationListenerAdapter>
-            pickSpecificListenerByOutput(final List<NotificationListenerAdapter> listeners, final String outputType) {
-        for (final NotificationListenerAdapter notificationListenerAdapter : listeners) {
-            if (notificationListenerAdapter.getOutputType().equals(outputType)) {
-                final List<NotificationListenerAdapter> list = new ArrayList<>();
-                list.add(notificationListenerAdapter);
-                return list;
-            }
-        }
-        return listeners;
-    }
-
     /**
      * Prepare InstanceIdentifierContext for Location leaf.
      *
-     * @param schemaHandler
-     *             schemaContext handler
-     * @return InstanceIdentifier of Location leaf
+     * @param schemaHandler Schema context handler.
+     * @return InstanceIdentifier of Location leaf.
      */
     public static InstanceIdentifierContext<?> prepareIIDSubsStreamOutput(final SchemaContextHandler schemaHandler) {
-        final QName qnameBase = QName.create("subscribe:to:notification", "2016-10-28", "notifi");
-        final DataSchemaNode location = ((ContainerSchemaNode) schemaHandler.get()
-                .findModuleByNamespaceAndRevision(qnameBase.getNamespace(), qnameBase.getRevision())
-                .getDataChildByName(qnameBase)).getDataChildByName(QName.create(qnameBase, "location"));
-        final List<PathArgument> path = new ArrayList<>();
-        path.add(NodeIdentifier.create(qnameBase));
-        path.add(NodeIdentifier.create(QName.create(qnameBase, "location")));
+        final Optional<Module> module = schemaHandler.get()
+                .findModule(RestconfStreamsConstants.NOTIFI_QNAME.getModule());
+        Preconditions.checkState(module.isPresent());
+        final Optional<DataSchemaNode> notify = module.get()
+                .findDataChildByName(RestconfStreamsConstants.NOTIFI_QNAME);
+        Preconditions.checkState(notify.isPresent());
+        final Optional<DataSchemaNode> location = ((ContainerSchemaNode) notify.get())
+                .findDataChildByName(RestconfStreamsConstants.LOCATION_QNAME);
+        Preconditions.checkState(location.isPresent());
 
-        return new InstanceIdentifierContext<SchemaNode>(YangInstanceIdentifier.create(path), location, null,
-                schemaHandler.get());
+        final List<PathArgument> path = new ArrayList<>();
+        path.add(NodeIdentifier.create(RestconfStreamsConstants.NOTIFI_QNAME));
+        path.add(NodeIdentifier.create(RestconfStreamsConstants.LOCATION_QNAME));
+        return new InstanceIdentifierContext<SchemaNode>(YangInstanceIdentifier.create(path), location.get(),
+                null, schemaHandler.get());
     }
 
     /**
-     * Register listener by streamName in identifier to listen to data change
-     * notifications, put or delete info about listener to DS according to
-     * ietf-restconf-monitoring.
+     * Register listener by streamName in identifier to listen to data change notifications, and put or delete
+     * information about listener to DS according to ietf-restconf-monitoring.
      *
-     * @param identifier
-     *             identifier as stream name
-     * @param uriInfo
-     *             for getting base URI information
-     * @param notificationQueryParams
-     *             query parameters of notification
-     * @param handlersHolder
-     *             holder of handlers for notifications
-     * @return location for listening
+     * @param identifier              Identifier as stream name.
+     * @param uriInfo                 Base URI information.
+     * @param notificationQueryParams Query parameters of notification.
+     * @param handlersHolder          Holder of handlers for notifications.
+     * @return Location for listening.
      */
     @SuppressWarnings("rawtypes")
-    public static URI notifiDataStream(final String identifier, final UriInfo uriInfo,
+    public static URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
             final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
-        final Map<String, String> mapOfValues = SubscribeToStreamUtil.mapValuesFromUri(identifier);
-
-        final LogicalDatastoreType ds = SubscribeToStreamUtil.parseURIEnum(LogicalDatastoreType.class,
+        final Map<String, String> mapOfValues = mapValuesFromUri(identifier);
+        final LogicalDatastoreType datastoreType = parseURIEnum(
+                LogicalDatastoreType.class,
                 mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME));
-        if (ds == null) {
-            final String msg = "Stream name doesn't contains datastore value (pattern /datastore=)";
-            LOG.debug(msg);
-            throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
+        if (datastoreType == null) {
+            final String message = "Stream name doesn't contain datastore value (pattern /datastore=)";
+            LOG.debug(message);
+            throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
         }
 
-        final DataChangeScope scope = SubscribeToStreamUtil.parseURIEnum(DataChangeScope.class,
+        final DataChangeScope scope = parseURIEnum(
+                DataChangeScope.class,
                 mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME));
         if (scope == null) {
-            final String msg = "Stream name doesn't contains datastore value (pattern /scope=)";
-            LOG.warn(msg);
-            throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
+            final String message = "Stream name doesn't contains datastore value (pattern /scope=)";
+            LOG.warn(message);
+            throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
         }
 
-        final String streamName = Notificator.createStreamNameFromUri(identifier);
-
-        final ListenerAdapter listener = Notificator.getListenerFor(streamName);
-        Preconditions.checkNotNull(listener, "Listener doesn't exist : " + streamName);
-
-        listener.setQueryParams(notificationQueryParams.getStart(), notificationQueryParams.getStop(),
-                notificationQueryParams.getFilter(), false);
-        listener.setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
+        final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
+        final Optional<ListenerAdapter> listener = ListenersBroker.getInstance().getDataChangeListenerFor(streamName);
+        Preconditions.checkArgument(listener.isPresent(), "Listener doesn't exist : " + streamName);
 
-        registration(ds, scope, listener, handlersHolder.getDomDataBrokerHandler().get());
+        listener.get().setQueryParams(
+                notificationQueryParams.getStart(),
+                notificationQueryParams.getStop().orElse(null),
+                notificationQueryParams.getFilter().orElse(null),
+                false);
+        listener.get().setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
+        registration(datastoreType, listener.get(), handlersHolder.getDomDataBrokerHandler().get());
 
         final URI uri = prepareUriByStreamName(uriInfo, streamName);
-
-        final DOMDataReadWriteTransaction wTx =
-                handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction();
+        final DOMDataTreeReadWriteTransaction writeTransaction
+                = handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction();
         final SchemaContext schemaContext = handlersHolder.getSchemaHandler().get();
-        final boolean exist = checkExist(schemaContext, wTx);
+        final boolean exist = checkExist(schemaContext, writeTransaction);
 
         final NormalizedNode mapToStreams = RestconfMappingNodeUtil
-                .mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.getPath(),
-                        notificationQueryParams.getStart(), listener.getOutputType(), uri,
+                .mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.get().getPath(),
+                        notificationQueryParams.getStart(), listener.get().getOutputType(), uri,
                         getMonitoringModule(schemaContext), exist, schemaContext);
-        writeDataToDS(schemaContext, listener.getPath().getLastPathArgument().getNodeType().getLocalName(), wTx, exist,
-                mapToStreams);
-        submitData(wTx);
+        writeDataToDS(schemaContext, listener.get().getPath().getLastPathArgument().getNodeType().getLocalName(),
+                writeTransaction, exist, mapToStreams);
+        submitData(writeTransaction);
         return uri;
     }
 
-    public static Module getMonitoringModule(final SchemaContext schemaContext) {
-        final Module monitoringModule =
-                schemaContext.findModuleByNamespaceAndRevision(MonitoringModule.URI_MODULE, MonitoringModule.DATE);
-        return monitoringModule;
+    static Module getMonitoringModule(final SchemaContext schemaContext) {
+        return schemaContext.findModule(MonitoringModule.MODULE_QNAME).orElse(null);
     }
 
     /**
-     * Parse input of query parameters - start-time or stop-time - from
-     * {@link DateAndTime} format to {@link Instant} format.
+     * Parse input of query parameters - start-time or stop-time - from {@link DateAndTime} format
+     * to {@link Instant} format.
      *
-     * @param entry
-     *             start-time or stop-time as string in {@link DateAndTime}
-     *            format
-     * @return parsed {@link Instant} by entry
+     * @param entry Start-time or stop-time as string in {@link DateAndTime} format.
+     * @return Parsed {@link Instant} by entry.
      */
     public static Instant parseDateFromQueryParam(final Entry<String, List<String>> entry) {
         final DateAndTime event = new DateAndTime(entry.getValue().iterator().next());
         final String value = event.getValue();
-        final TemporalAccessor p;
+        final TemporalAccessor accessor;
         try {
-            p = FORMATTER.parse(value);
+            accessor = FORMATTER.parse(value);
         } catch (final DateTimeParseException e) {
             throw new RestconfDocumentedException("Cannot parse of value in date: " + value, e);
         }
-        return Instant.from(p);
-
+        return Instant.from(accessor);
     }
 
     @SuppressWarnings("rawtypes")
-    static void writeDataToDS(final SchemaContext schemaContext,
-                              final String name, final DOMDataReadWriteTransaction readWriteTransaction,
-                              final boolean exist, final NormalizedNode mapToStreams) {
-        String pathId = "";
+    static void writeDataToDS(final SchemaContext schemaContext, final String name,
+            final DOMDataTreeReadWriteTransaction readWriteTransaction, final boolean exist,
+            final NormalizedNode mapToStreams) {
+        String pathId;
         if (exist) {
             pathId = MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + name;
         } else {
             pathId = MonitoringModule.PATH_TO_STREAMS;
         }
-        readWriteTransaction.merge(LogicalDatastoreType.OPERATIONAL, IdentifierCodec.deserialize(pathId, schemaContext),
-                mapToStreams);
+        readWriteTransaction.merge(LogicalDatastoreType.OPERATIONAL,
+                IdentifierCodec.deserialize(pathId, schemaContext), mapToStreams);
     }
 
-    static void submitData(final DOMDataReadWriteTransaction readWriteTransaction) {
+    static void submitData(final DOMDataTreeReadWriteTransaction readWriteTransaction) {
         try {
-            readWriteTransaction.submit().checkedGet();
-        } catch (final TransactionCommitFailedException e) {
+            readWriteTransaction.commit().get();
+        } catch (final InterruptedException | ExecutionException e) {
             throw new RestconfDocumentedException("Problem while putting data to DS.", e);
         }
     }
 
     /**
-     * Prepare map of values from URI.
+     * Prepare map of URI parameter-values.
      *
-     * @param identifier
-     *             URI
-     * @return {@link Map}
+     * @param identifier String identification of URI.
+     * @return Map od URI parameters and values.
      */
-    public static Map<String, String> mapValuesFromUri(final String identifier) {
+    private static Map<String, String> mapValuesFromUri(final String identifier) {
         final HashMap<String, String> result = new HashMap<>();
         for (final String token : RestconfConstants.SLASH_SPLITTER.split(identifier)) {
             final String[] paramToken = token.split(String.valueOf(RestconfStreamsConstants.EQUAL));
@@ -306,68 +286,53 @@ public final class SubscribeToStreamUtil {
     }
 
     static URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
-        final int port = SubscribeToStreamUtil.prepareNotificationPort();
-
-        final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
-        final UriBuilder uriToWebSocketServer =
-                uriBuilder.port(port).scheme(RestconfStreamsConstants.SCHEMA_SUBSCIBRE_URI);
-        final URI uri = uriToWebSocketServer.replacePath(streamName).build();
-        return uri;
+        final String scheme = uriInfo.getAbsolutePath().getScheme();
+        final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder();
+        switch (scheme) {
+            case RestconfStreamsConstants.SCHEMA_UPGRADE_SECURED_URI:
+                uriBuilder.scheme(RestconfStreamsConstants.SCHEMA_SUBSCRIBE_SECURED_URI);
+                break;
+            case RestconfStreamsConstants.SCHEMA_UPGRADE_URI:
+            default:
+                uriBuilder.scheme(RestconfStreamsConstants.SCHEMA_SUBSCRIBE_URI);
+        }
+        return uriBuilder.replacePath(streamName).build();
     }
 
     /**
-     * Register data change listener in dom data broker and set it to listener
-     * on stream.
+     * Register data change listener in DOM data broker and set it to listener on stream.
      *
-     * @param ds
-     *             {@link LogicalDatastoreType}
-     * @param scope
-     *             {@link DataChangeScope}
-     * @param listener
-     *             listener on specific stream
-     * @param domDataBroker
-     *             data broker for register data change listener
+     * @param datastore     {@link LogicalDatastoreType}
+     * @param listener      listener on specific stream
+     * @param domDataBroker data broker for register data change listener
      */
-    @SuppressWarnings("deprecation")
-    private static void registration(final LogicalDatastoreType ds, final DataChangeScope scope,
-            final ListenerAdapter listener, final DOMDataBroker domDataBroker) {
+    private static void registration(final LogicalDatastoreType datastore, final ListenerAdapter listener,
+            final DOMDataBroker domDataBroker) {
         if (listener.isListening()) {
             return;
         }
 
-        final YangInstanceIdentifier path = listener.getPath();
-        final ListenerRegistration<DOMDataChangeListener> registration =
-                domDataBroker.registerDataChangeListener(ds, path, listener, scope);
+        final DOMDataTreeChangeService changeService = domDataBroker.getExtensions()
+                .getInstance(DOMDataTreeChangeService.class);
+        if (changeService == null) {
+            throw new UnsupportedOperationException("DOMDataBroker does not support the DOMDataTreeChangeService");
+        }
 
+        final DOMDataTreeIdentifier root = new DOMDataTreeIdentifier(datastore, listener.getPath());
+        final ListenerRegistration<ListenerAdapter> registration =
+                changeService.registerDataTreeChangeListener(root, listener);
         listener.setRegistration(registration);
     }
 
-    /**
-     * Get port from web socket server. If doesn't exit, create it.
-     *
-     * @return port
-     */
-    private static int prepareNotificationPort() {
-        int port = RestconfStreamsConstants.NOTIFICATION_PORT;
-        try {
-            final WebSocketServer webSocketServer = WebSocketServer.getInstance();
-            port = webSocketServer.getPort();
-        } catch (final NullPointerException e) {
-            WebSocketServer.createInstance(RestconfStreamsConstants.NOTIFICATION_PORT);
-        }
-        return port;
-    }
-
     static boolean checkExist(final SchemaContext schemaContext,
-                              final DOMDataReadWriteTransaction readWriteTransaction) {
+                              final DOMDataTreeReadOperations readWriteTransaction) {
         boolean exist;
         try {
-            exist = readWriteTransaction.exists(LogicalDatastoreType.OPERATIONAL,
-                    IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAMS, schemaContext)).checkedGet();
-        } catch (final ReadFailedException e1) {
-            throw new RestconfDocumentedException("Problem while checking data if exists", e1);
+            return readWriteTransaction.exists(LogicalDatastoreType.OPERATIONAL,
+                    IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAMS, schemaContext)).get();
+        } catch (final InterruptedException | ExecutionException exception) {
+            throw new RestconfDocumentedException("Problem while checking data if exists", exception);
         }
-        return exist;
     }
 
     private static void registerToListenNotification(final NotificationListenerAdapter listener,
@@ -379,21 +344,18 @@ public final class SubscribeToStreamUtil {
         final SchemaPath path = listener.getSchemaPath();
         final ListenerRegistration<DOMNotificationListener> registration =
                 notificationServiceHandler.get().registerNotificationListener(listener, path);
-
         listener.setRegistration(registration);
     }
 
     /**
-     * Parse enum from URI.
+     * Parse out enumeration from URI.
      *
-     * @param clazz
-     *             enum type
-     * @param value
-     *             string of enum value
-     * @return enum
+     * @param clazz Target enumeration type.
+     * @param value String representation of enumeration value.
+     * @return Parsed enumeration type.
      */
     private static <T> T parseURIEnum(final Class<T> clazz, final String value) {
-        if ((value == null) || value.equals("")) {
+        if (value == null || value.equals("")) {
             return null;
         }
         return ResolveEnumUtil.resolveEnum(clazz, value);