Bug 3959 - support netconf notification
[netconf.git] / restconf / sal-rest-connector / src / main / java / org / opendaylight / netconf / sal / restconf / impl / RestconfImpl.java
index 9a58a1c1626ce87a6a42c365bb6a4c6b4101af9d..7226a0abb22b357d4da42ab386cc66f46664296f 100644 (file)
@@ -11,7 +11,6 @@ package org.opendaylight.netconf.sal.restconf.impl;
 import com.google.common.base.CharMatcher;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.base.Splitter;
 import com.google.common.base.Strings;
@@ -28,9 +27,11 @@ import java.net.URISyntaxException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -58,6 +59,7 @@ import org.opendaylight.netconf.sal.rest.api.RestconfService;
 import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorTag;
 import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorType;
 import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
+import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter;
 import org.opendaylight.netconf.sal.streams.listeners.Notificator;
 import org.opendaylight.netconf.sal.streams.websockets.WebSocketServer;
 import org.opendaylight.yangtools.yang.common.QName;
@@ -69,8 +71,8 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgum
 import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
-import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -89,16 +91,11 @@ import org.opendaylight.yangtools.yang.model.api.LeafListSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.LeafSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.ListSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
-import org.opendaylight.yangtools.yang.model.util.EmptyType;
-import org.opendaylight.yangtools.yang.parser.builder.api.GroupingBuilder;
-import org.opendaylight.yangtools.yang.parser.builder.impl.ContainerSchemaNodeBuilder;
-import org.opendaylight.yangtools.yang.parser.builder.impl.LeafSchemaNodeBuilder;
-import org.opendaylight.yangtools.yang.parser.builder.impl.ModuleBuilder;
-import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -143,6 +140,12 @@ public class RestconfImpl implements RestconfService {
 
     private static final YangInstanceIdentifier.AugmentationIdentifier SAL_REMOTE_AUG_IDENTIFIER;
 
+    public static final CharSequence DATA_SUBSCR = "data-change-event-subscription";
+    private static final CharSequence CREATE_DATA_SUBSCR = "create-" + DATA_SUBSCR;
+
+    public static final CharSequence NOTIFICATION_STREAM = "notification-stream";
+    private static final CharSequence CREATE_NOTIFICATION_STREAM = "create-" + NOTIFICATION_STREAM;
+
     static {
         try {
             final Date eventSubscriptionAugRevision = new SimpleDateFormat("yyyy-MM-dd").parse("2014-07-08");
@@ -178,13 +181,13 @@ public class RestconfImpl implements RestconfService {
 
     @Override
     public NormalizedNodeContext getModules(final UriInfo uriInfo) {
-        final Set<Module> allModules = controllerContext.getAllModules();
+        final Set<Module> allModules = this.controllerContext.getAllModules();
         final MapNode allModuleMap = makeModuleMapNode(allModules);
 
-        final SchemaContext schemaContext = controllerContext.getGlobalSchema();
+        final SchemaContext schemaContext = this.controllerContext.getGlobalSchema();
 
         final Module restconfModule = getRestconfModule();
-        final DataSchemaNode modulesSchemaNode = controllerContext.getRestconfModuleRestConfSchemaNode(
+        final DataSchemaNode modulesSchemaNode = this.controllerContext.getRestconfModuleRestConfSchemaNode(
                 restconfModule, Draft02.RestConfModule.MODULES_CONTAINER_SCHEMA_NODE);
         Preconditions.checkState(modulesSchemaNode instanceof ContainerSchemaNode);
 
@@ -210,13 +213,13 @@ public class RestconfImpl implements RestconfService {
             throw new RestconfDocumentedException(errMsg, ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
         }
 
-        final InstanceIdentifierContext<?> mountPointIdentifier = controllerContext.toMountPointIdentifier(identifier);
+        final InstanceIdentifierContext<?> mountPointIdentifier = this.controllerContext.toMountPointIdentifier(identifier);
         final DOMMountPoint mountPoint = mountPointIdentifier.getMountPoint();
-        final Set<Module> modules = controllerContext.getAllModules(mountPoint);
+        final Set<Module> modules = this.controllerContext.getAllModules(mountPoint);
         final MapNode mountPointModulesMap = makeModuleMapNode(modules);
 
         final Module restconfModule = getRestconfModule();
-        final DataSchemaNode modulesSchemaNode = controllerContext.getRestconfModuleRestConfSchemaNode(
+        final DataSchemaNode modulesSchemaNode = this.controllerContext.getRestconfModuleRestConfSchemaNode(
                 restconfModule, Draft02.RestConfModule.MODULES_CONTAINER_SCHEMA_NODE);
         Preconditions.checkState(modulesSchemaNode instanceof ContainerSchemaNode);
 
@@ -225,7 +228,7 @@ public class RestconfImpl implements RestconfService {
         moduleContainerBuilder.withChild(mountPointModulesMap);
 
         return new NormalizedNodeContext(new InstanceIdentifierContext<>(null, modulesSchemaNode,
-                mountPoint, controllerContext.getGlobalSchema()), moduleContainerBuilder.build(),
+                mountPoint, this.controllerContext.getGlobalSchema()), moduleContainerBuilder.build(),
                 QueryParametersParser.parseWriterParameters(uriInfo));
     }
 
@@ -237,13 +240,13 @@ public class RestconfImpl implements RestconfService {
         DOMMountPoint mountPoint = null;
         final SchemaContext schemaContext;
         if (identifier.contains(ControllerContext.MOUNT)) {
-            final InstanceIdentifierContext<?> mountPointIdentifier = controllerContext.toMountPointIdentifier(identifier);
+            final InstanceIdentifierContext<?> mountPointIdentifier = this.controllerContext.toMountPointIdentifier(identifier);
             mountPoint = mountPointIdentifier.getMountPoint();
-            module = controllerContext.findModuleByNameAndRevision(mountPoint, moduleNameAndRevision);
+            module = this.controllerContext.findModuleByNameAndRevision(mountPoint, moduleNameAndRevision);
             schemaContext = mountPoint.getSchemaContext();
         } else {
-            module = controllerContext.findModuleByNameAndRevision(moduleNameAndRevision);
-            schemaContext = controllerContext.getGlobalSchema();
+            module = this.controllerContext.findModuleByNameAndRevision(moduleNameAndRevision);
+            schemaContext = this.controllerContext.getGlobalSchema();
         }
 
         if (module == null) {
@@ -257,7 +260,7 @@ public class RestconfImpl implements RestconfService {
         final Set<Module> modules = Collections.singleton(module);
         final MapNode moduleMap = makeModuleMapNode(modules);
 
-        final DataSchemaNode moduleSchemaNode = controllerContext.getRestconfModuleRestConfSchemaNode(
+        final DataSchemaNode moduleSchemaNode = this.controllerContext.getRestconfModuleRestConfSchemaNode(
                 restconfModule, Draft02.RestConfModule.MODULE_LIST_SCHEMA_NODE);
         Preconditions.checkState(moduleSchemaNode instanceof ListSchemaNode);
 
@@ -267,10 +270,10 @@ public class RestconfImpl implements RestconfService {
 
     @Override
     public NormalizedNodeContext getAvailableStreams(final UriInfo uriInfo) {
-        final SchemaContext schemaContext = controllerContext.getGlobalSchema();
+        final SchemaContext schemaContext = this.controllerContext.getGlobalSchema();
         final Set<String> availableStreams = Notificator.getStreamNames();
         final Module restconfModule = getRestconfModule();
-        final DataSchemaNode streamSchemaNode = controllerContext.getRestconfModuleRestConfSchemaNode(restconfModule,
+        final DataSchemaNode streamSchemaNode = this.controllerContext.getRestconfModuleRestConfSchemaNode(restconfModule,
                 Draft02.RestConfModule.STREAM_LIST_SCHEMA_NODE);
         Preconditions.checkState(streamSchemaNode instanceof ListSchemaNode);
 
@@ -281,7 +284,7 @@ public class RestconfImpl implements RestconfService {
             listStreamsBuilder.withChild(toStreamEntryNode(streamName, streamSchemaNode));
         }
 
-        final DataSchemaNode streamsContainerSchemaNode = controllerContext.getRestconfModuleRestConfSchemaNode(
+        final DataSchemaNode streamsContainerSchemaNode = this.controllerContext.getRestconfModuleRestConfSchemaNode(
                 restconfModule, Draft02.RestConfModule.STREAMS_CONTAINER_SCHEMA_NODE);
         Preconditions.checkState(streamsContainerSchemaNode instanceof ContainerSchemaNode);
 
@@ -296,7 +299,7 @@ public class RestconfImpl implements RestconfService {
 
     @Override
     public NormalizedNodeContext getOperations(final UriInfo uriInfo) {
-        final Set<Module> allModules = controllerContext.getAllModules();
+        final Set<Module> allModules = this.controllerContext.getAllModules();
         return operationsFromModulesToNormalizedContext(allModules, null);
     }
 
@@ -305,9 +308,9 @@ public class RestconfImpl implements RestconfService {
         Set<Module> modules = null;
         DOMMountPoint mountPoint = null;
         if (identifier.contains(ControllerContext.MOUNT)) {
-            final InstanceIdentifierContext<?> mountPointIdentifier = controllerContext.toMountPointIdentifier(identifier);
+            final InstanceIdentifierContext<?> mountPointIdentifier = this.controllerContext.toMountPointIdentifier(identifier);
             mountPoint = mountPointIdentifier.getMountPoint();
-            modules = controllerContext.getAllModules(mountPoint);
+            modules = this.controllerContext.getAllModules(mountPoint);
 
         } else {
             final String errMsg = "URI has bad format. If operations behind mount point should be showed, URI has to end with ";
@@ -318,71 +321,13 @@ public class RestconfImpl implements RestconfService {
         return operationsFromModulesToNormalizedContext(modules, mountPoint);
     }
 
-    private static final Predicate<GroupingBuilder> GROUPING_FILTER = new Predicate<GroupingBuilder>() {
-        @Override
-        public boolean apply(final GroupingBuilder g) {
-            return Draft02.RestConfModule.RESTCONF_GROUPING_SCHEMA_NODE.equals(g.getQName().getLocalName());
-        }
-    };
-
     private NormalizedNodeContext operationsFromModulesToNormalizedContext(final Set<Module> modules,
             final DOMMountPoint mountPoint) {
-
-        final Module restconfModule = getRestconfModule();
-        final ModuleBuilder restConfModuleBuilder = new ModuleBuilder(restconfModule);
-        final Set<GroupingBuilder> gropingBuilders = restConfModuleBuilder.getGroupingBuilders();
-        final Iterable<GroupingBuilder> filteredGroups = Iterables.filter(gropingBuilders, GROUPING_FILTER);
-        final GroupingBuilder restconfGroupingBuilder = Iterables.getFirst(filteredGroups, null);
-        final ContainerSchemaNodeBuilder restContainerSchemaNodeBuilder = (ContainerSchemaNodeBuilder) restconfGroupingBuilder
-                .getDataChildByName(Draft02.RestConfModule.RESTCONF_CONTAINER_SCHEMA_NODE);
-        final ContainerSchemaNodeBuilder containerSchemaNodeBuilder = (ContainerSchemaNodeBuilder) restContainerSchemaNodeBuilder
-                .getDataChildByName(Draft02.RestConfModule.OPERATIONS_CONTAINER_SCHEMA_NODE);
-
-        final ContainerSchemaNodeBuilder fakeOperationsSchemaNodeBuilder = containerSchemaNodeBuilder;
-        final SchemaPath fakeSchemaPath = fakeOperationsSchemaNodeBuilder.getPath().createChild(QName.create("dummy"));
-
-        final List<LeafNode<Object>> operationsAsData = new ArrayList<>();
-
-        for (final Module module : modules) {
-            final Set<RpcDefinition> rpcs = module.getRpcs();
-            for (final RpcDefinition rpc : rpcs) {
-                final QName rpcQName = rpc.getQName();
-                final String name = module.getName();
-
-                final QName qName = QName.create(restconfModule.getQNameModule(), rpcQName.getLocalName());
-                final LeafSchemaNodeBuilder leafSchemaNodeBuilder = new LeafSchemaNodeBuilder(name, 0, qName, fakeSchemaPath);
-                final LeafSchemaNodeBuilder fakeRpcSchemaNodeBuilder = leafSchemaNodeBuilder;
-                fakeRpcSchemaNodeBuilder.setAugmenting(true);
-
-                final EmptyType instance = EmptyType.getInstance();
-                fakeRpcSchemaNodeBuilder.setType(instance);
-                final LeafSchemaNode fakeRpcSchemaNode = fakeRpcSchemaNodeBuilder.build();
-                fakeOperationsSchemaNodeBuilder.addChildNode(fakeRpcSchemaNode);
-
-                final LeafNode<Object> leaf = Builders.leafBuilder(fakeRpcSchemaNode).build();
-                operationsAsData.add(leaf);
-            }
-        }
-
-        final ContainerSchemaNode operContainerSchemaNode = fakeOperationsSchemaNodeBuilder.build();
-        final DataContainerNodeAttrBuilder<NodeIdentifier, ContainerNode> operContainerNode = Builders.containerBuilder(operContainerSchemaNode);
-
-        for (final LeafNode<Object> oper : operationsAsData) {
-            operContainerNode.withChild(oper);
-        }
-
-        final Set<Module> fakeRpcModules = Collections.singleton(restConfModuleBuilder.build());
-
-        final YangParserImpl yangParser = new YangParserImpl();
-        final SchemaContext fakeSchemaCx = yangParser.resolveSchemaContext(fakeRpcModules);
-
-        final InstanceIdentifierContext<?> fakeIICx = new InstanceIdentifierContext<>(null, operContainerSchemaNode, mountPoint, fakeSchemaCx);
-
-        return new NormalizedNodeContext(fakeIICx, operContainerNode.build());
+        throw new UnsupportedOperationException();
     }
 
     private Module getRestconfModule() {
-        final Module restconfModule = controllerContext.getRestconfModule();
+        final Module restconfModule = this.controllerContext.getRestconfModule();
         if (restconfModule == null) {
             LOG.debug("ietf-restconf module was not found.");
             throw new RestconfDocumentedException("ietf-restconf module was not found.", ErrorType.APPLICATION,
@@ -445,18 +390,26 @@ public class RestconfImpl implements RestconfService {
             response = mountRpcServices.get().invokeRpc(type, payload.getData());
         } else {
             if (namespace.toString().equals(SAL_REMOTE_NAMESPACE)) {
-                response = invokeSalRemoteRpcSubscribeRPC(payload);
+                if (identifier.contains(CREATE_DATA_SUBSCR)) {
+                    response = invokeSalRemoteRpcSubscribeRPC(payload);
+                } else if (identifier.contains(CREATE_NOTIFICATION_STREAM)) {
+                    response = invokeSalRemoteRpcNotifiStrRPC(payload);
+                } else {
+                    final String msg = "Not supported operation";
+                    LOG.warn(msg);
+                    throw new RestconfDocumentedException(msg, ErrorType.RPC, ErrorTag.OPERATION_NOT_SUPPORTED);
+                }
             } else {
-                response = broker.invokeRpc(type, payload.getData());
+                response = this.broker.invokeRpc(type, payload.getData());
             }
-            schemaContext = controllerContext.getGlobalSchema();
+            schemaContext = this.controllerContext.getGlobalSchema();
         }
 
         final DOMRpcResult result = checkRpcResponse(response);
 
         RpcDefinition resultNodeSchema = null;
         final NormalizedNode<?, ?> resultData = result.getResult();
-        if (result != null && result.getResult() != null) {
+        if ((result != null) && (result.getResult() != null)) {
             resultNodeSchema = (RpcDefinition) payload.getInstanceIdentifierContext().getSchemaNode();
         }
 
@@ -471,7 +424,7 @@ public class RestconfImpl implements RestconfService {
         }
         try {
             final DOMRpcResult retValue = response.get();
-            if (retValue.getErrors() == null || retValue.getErrors().isEmpty()) {
+            if ((retValue.getErrors() == null) || retValue.getErrors().isEmpty()) {
                 return retValue;
             }
             LOG.debug("RpcError message", retValue.getErrors());
@@ -506,10 +459,10 @@ public class RestconfImpl implements RestconfService {
     }
 
     private static void validateInput(final SchemaNode inputSchema, final NormalizedNodeContext payload) {
-        if (inputSchema != null && payload.getData() == null) {
+        if ((inputSchema != null) && (payload.getData() == null)) {
             // expected a non null payload
             throw new RestconfDocumentedException("Input is required.", ErrorType.PROTOCOL, ErrorTag.MALFORMED_MESSAGE);
-        } else if (inputSchema == null && payload.getData() != null) {
+        } else if ((inputSchema == null) && (payload.getData() != null)) {
             // did not expect any input
             throw new RestconfDocumentedException("No input expected.", ErrorType.PROTOCOL, ErrorTag.MALFORMED_MESSAGE);
         }
@@ -535,9 +488,10 @@ public class RestconfImpl implements RestconfService {
         }
 
         final YangInstanceIdentifier pathIdentifier = ((YangInstanceIdentifier) pathValue);
-        String streamName = null;
+        String streamName = (String) CREATE_DATA_SUBSCR;
         if (!pathIdentifier.isEmpty()) {
-            final String fullRestconfIdentifier = controllerContext.toFullRestconfIdentifier(pathIdentifier, null);
+            final String fullRestconfIdentifier = DATA_SUBSCR
+                    + this.controllerContext.toFullRestconfIdentifier(pathIdentifier, null);
 
             LogicalDatastoreType datastore = parseEnumTypeParameter(value, LogicalDatastoreType.class, DATASTORE_PARAM_NAME);
             datastore = datastore == null ? DEFAULT_DATASTORE : datastore;
@@ -572,7 +526,7 @@ public class RestconfImpl implements RestconfService {
 
     @Override
     public NormalizedNodeContext invokeRpc(final String identifier, final String noPayload, final UriInfo uriInfo) {
-        if (noPayload != null && !CharMatcher.WHITESPACE.matchesAllOf(noPayload)) {
+        if ((noPayload != null) && !CharMatcher.WHITESPACE.matchesAllOf(noPayload)) {
             throw new RestconfDocumentedException("Content must be empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
         }
 
@@ -581,7 +535,7 @@ public class RestconfImpl implements RestconfService {
         final SchemaContext schemaContext;
         if (identifier.contains(ControllerContext.MOUNT)) {
             // mounted RPC call - look up mount instance.
-            final InstanceIdentifierContext<?> mountPointId = controllerContext.toMountPointIdentifier(identifier);
+            final InstanceIdentifierContext<?> mountPointId = this.controllerContext.toMountPointIdentifier(identifier);
             mountPoint = mountPointId.getMountPoint();
             schemaContext = mountPoint.getSchemaContext();
             final int startOfRemoteRpcName = identifier.lastIndexOf(ControllerContext.MOUNT)
@@ -596,14 +550,14 @@ public class RestconfImpl implements RestconfService {
             throw new RestconfDocumentedException(slashErrorMsg, ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
         } else {
             identifierEncoded = identifier;
-            schemaContext = controllerContext.getGlobalSchema();
+            schemaContext = this.controllerContext.getGlobalSchema();
         }
 
-        final String identifierDecoded = controllerContext.urlPathArgDecode(identifierEncoded);
+        final String identifierDecoded = this.controllerContext.urlPathArgDecode(identifierEncoded);
 
         RpcDefinition rpc = null;
         if (mountPoint == null) {
-            rpc = controllerContext.getRpcDefinition(identifierDecoded, null);
+            rpc = this.controllerContext.getRpcDefinition(identifierDecoded, null);
         } else {
             rpc = findRpc(mountPoint.getSchemaContext(), identifierDecoded);
         }
@@ -627,7 +581,7 @@ public class RestconfImpl implements RestconfService {
             }
             response = mountRpcServices.get().invokeRpc(rpc.getPath(), null);
         } else {
-            response = broker.invokeRpc(rpc.getPath(), null);
+            response = this.broker.invokeRpc(rpc.getPath(), null);
         }
 
         final DOMRpcResult result = checkRpcResponse(response);
@@ -657,14 +611,14 @@ public class RestconfImpl implements RestconfService {
 
     @Override
     public NormalizedNodeContext readConfigurationData(final String identifier, final UriInfo uriInfo) {
-        final InstanceIdentifierContext<?> iiWithData = controllerContext.toInstanceIdentifier(identifier);
+        final InstanceIdentifierContext<?> iiWithData = this.controllerContext.toInstanceIdentifier(identifier);
         final DOMMountPoint mountPoint = iiWithData.getMountPoint();
         NormalizedNode<?, ?> data = null;
         final YangInstanceIdentifier normalizedII = iiWithData.getInstanceIdentifier();
         if (mountPoint != null) {
-            data = broker.readConfigurationData(mountPoint, normalizedII);
+            data = this.broker.readConfigurationData(mountPoint, normalizedII);
         } else {
-            data = broker.readConfigurationData(normalizedII);
+            data = this.broker.readConfigurationData(normalizedII);
         }
         if(data == null) {
             final String errMsg = "Request could not be completed because the relevant data model content does not exist ";
@@ -676,14 +630,14 @@ public class RestconfImpl implements RestconfService {
 
     @Override
     public NormalizedNodeContext readOperationalData(final String identifier, final UriInfo uriInfo) {
-        final InstanceIdentifierContext<?> iiWithData = controllerContext.toInstanceIdentifier(identifier);
+        final InstanceIdentifierContext<?> iiWithData = this.controllerContext.toInstanceIdentifier(identifier);
         final DOMMountPoint mountPoint = iiWithData.getMountPoint();
         NormalizedNode<?, ?> data = null;
         final YangInstanceIdentifier normalizedII = iiWithData.getInstanceIdentifier();
         if (mountPoint != null) {
-            data = broker.readOperationalData(mountPoint, normalizedII);
+            data = this.broker.readOperationalData(mountPoint, normalizedII);
         } else {
-            data = broker.readOperationalData(normalizedII);
+            data = this.broker.readOperationalData(normalizedII);
         }
         if(data == null) {
             final String errMsg = "Request could not be completed because the relevant data model content does not exist ";
@@ -722,9 +676,9 @@ public class RestconfImpl implements RestconfService {
         while(true) {
             try {
                 if (mountPoint != null) {
-                    broker.commitConfigurationDataPut(mountPoint, normalizedII, payload.getData()).checkedGet();
+                    this.broker.commitConfigurationDataPut(mountPoint, normalizedII, payload.getData()).checkedGet();
                 } else {
-                    broker.commitConfigurationDataPut(controllerContext.getGlobalSchema(), normalizedII, payload.getData()).checkedGet();
+                    this.broker.commitConfigurationDataPut(this.controllerContext.getGlobalSchema(), normalizedII, payload.getData()).checkedGet();
                 }
 
                 break;
@@ -740,7 +694,7 @@ public class RestconfImpl implements RestconfService {
                     LOG.debug("Update ConfigDataStore fail " + identifier, e);
                     throw new RestconfDocumentedException(e.getMessage(), e, e.getErrorList());
                 }
-            } catch (Exception e) {
+            } catch (final Exception e) {
                 final String errMsg = "Error updating data ";
                 LOG.debug(errMsg + identifier, e);
                 throw new RestconfDocumentedException(errMsg, e);
@@ -788,7 +742,7 @@ public class RestconfImpl implements RestconfService {
         final NormalizedNode<?, ?> data = payload.getData();
         if (schemaNode instanceof ListSchemaNode) {
             final List<QName> keyDefinitions = ((ListSchemaNode) schemaNode).getKeyDefinition();
-            if (lastPathArgument instanceof NodeIdentifierWithPredicates && data instanceof MapEntryNode) {
+            if ((lastPathArgument instanceof NodeIdentifierWithPredicates) && (data instanceof MapEntryNode)) {
                 final Map<QName, Object> uriKeyValues = ((NodeIdentifierWithPredicates) lastPathArgument).getKeyValues();
                 isEqualUriAndPayloadKeyValues(uriKeyValues, (MapEntryNode) data, keyDefinitions);
             }
@@ -870,9 +824,9 @@ public class RestconfImpl implements RestconfService {
         final YangInstanceIdentifier normalizedII = iiWithData.getInstanceIdentifier();
         try {
             if (mountPoint != null) {
-                broker.commitConfigurationDataPost(mountPoint, normalizedII, payload.getData()).checkedGet();
+                this.broker.commitConfigurationDataPost(mountPoint, normalizedII, payload.getData()).checkedGet();
             } else {
-                broker.commitConfigurationDataPost(controllerContext.getGlobalSchema(), normalizedII, payload.getData()).checkedGet();
+                this.broker.commitConfigurationDataPost(this.controllerContext.getGlobalSchema(), normalizedII, payload.getData()).checkedGet();
             }
         } catch(final RestconfDocumentedException e) {
             throw e;
@@ -900,7 +854,7 @@ public class RestconfImpl implements RestconfService {
         final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder();
         uriBuilder.path("config");
         try {
-            uriBuilder.path(controllerContext.toFullRestconfIdentifier(normalizedII, mountPoint));
+            uriBuilder.path(this.controllerContext.toFullRestconfIdentifier(normalizedII, mountPoint));
         } catch (final Exception e) {
             LOG.info("Location for instance identifier" + normalizedII + "wasn't created", e);
             return null;
@@ -910,15 +864,15 @@ public class RestconfImpl implements RestconfService {
 
     @Override
     public Response deleteConfigurationData(final String identifier) {
-        final InstanceIdentifierContext<?> iiWithData = controllerContext.toInstanceIdentifier(identifier);
+        final InstanceIdentifierContext<?> iiWithData = this.controllerContext.toInstanceIdentifier(identifier);
         final DOMMountPoint mountPoint = iiWithData.getMountPoint();
         final YangInstanceIdentifier normalizedII = iiWithData.getInstanceIdentifier();
 
         try {
             if (mountPoint != null) {
-                broker.commitConfigurationDataDelete(mountPoint, normalizedII);
+                this.broker.commitConfigurationDataDelete(mountPoint, normalizedII);
             } else {
-                broker.commitConfigurationDataDelete(normalizedII).get();
+                this.broker.commitConfigurationDataDelete(normalizedII).get();
             }
         } catch (final Exception e) {
             final Optional<Throwable> searchedException = Iterables.tryFind(Throwables.getCausalChain(e),
@@ -944,6 +898,64 @@ public class RestconfImpl implements RestconfService {
      */
     @Override
     public Response subscribeToStream(final String identifier, final UriInfo uriInfo) {
+        if (identifier.contains(DATA_SUBSCR)) {
+            return dataSubs(identifier, uriInfo);
+        } else if (identifier.contains(NOTIFICATION_STREAM)) {
+            return notifStream(identifier, uriInfo);
+        }
+        final String msg = "Bad type of notification of sal-remote";
+        LOG.warn(msg);
+        throw new RestconfDocumentedException(msg);
+    }
+
+    /**
+     * Register notification listener by stream name
+     *
+     * @param identifier
+     *            - stream name
+     * @param uriInfo
+     *            - uriInfo
+     * @return {@link Response}
+     */
+    private Response notifStream(final String identifier, final UriInfo uriInfo) {
+        final String streamName = Notificator.createStreamNameFromUri(identifier);
+        if (Strings.isNullOrEmpty(streamName)) {
+            throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
+        }
+        final List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName);
+        if ((listeners == null) || listeners.isEmpty()) {
+            throw new RestconfDocumentedException("Stream was not found.", ErrorType.PROTOCOL,
+                    ErrorTag.UNKNOWN_ELEMENT);
+        }
+
+        for (final NotificationListenerAdapter listener : listeners) {
+            this.broker.registerToListenNotification(listener);
+        }
+
+        final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
+        int notificationPort = NOTIFICATION_PORT;
+        try {
+            final WebSocketServer webSocketServerInstance = WebSocketServer.getInstance();
+            notificationPort = webSocketServerInstance.getPort();
+        } catch (final NullPointerException e) {
+            WebSocketServer.createInstance(NOTIFICATION_PORT);
+        }
+        final UriBuilder uriToWebsocketServerBuilder = uriBuilder.port(notificationPort).scheme("ws");
+        final URI uriToWebsocketServer = uriToWebsocketServerBuilder.replacePath(streamName).build();
+
+        return Response.status(Status.OK).location(uriToWebsocketServer).build();
+    }
+
+    /**
+     * Register data change listener by stream name
+     *
+     * @param identifier
+     *            - stream name
+     * @param uriInfo
+     *            - uri info
+     * @return {@link Response}
+     */
+    private Response dataSubs(final String identifier, final UriInfo uriInfo) {
         final String streamName = Notificator.createStreamNameFromUri(identifier);
         if (Strings.isNullOrEmpty(streamName)) {
             throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
@@ -967,7 +979,7 @@ public class RestconfImpl implements RestconfService {
                     ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
         }
 
-        broker.registerToListenDataChanges(datastore, scope, listener);
+        this.broker.registerToListenDataChanges(datastore, scope, listener);
 
         final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
         int notificationPort = NOTIFICATION_PORT;
@@ -984,25 +996,25 @@ public class RestconfImpl implements RestconfService {
     }
 
     @Override
-    public PATCHStatusContext patchConfigurationData(String identifier, PATCHContext context, UriInfo uriInfo) {
+    public PATCHStatusContext patchConfigurationData(final String identifier, final PATCHContext context, final UriInfo uriInfo) {
         if (context == null) {
             throw new RestconfDocumentedException("Input is required.", ErrorType.PROTOCOL, ErrorTag.MALFORMED_MESSAGE);
         }
-        return broker.patchConfigurationDataWithinTransaction(context, controllerContext.getGlobalSchema());
+        return this.broker.patchConfigurationDataWithinTransaction(context, this.controllerContext.getGlobalSchema());
     }
 
     @Override
-    public PATCHStatusContext patchConfigurationData(PATCHContext context, @Context UriInfo uriInfo) {
+    public PATCHStatusContext patchConfigurationData(final PATCHContext context, @Context final UriInfo uriInfo) {
         if (context == null) {
             throw new RestconfDocumentedException("Input is required.", ErrorType.PROTOCOL, ErrorTag.MALFORMED_MESSAGE);
         }
-        return broker.patchConfigurationDataWithinTransaction(context, controllerContext.getGlobalSchema());
+        return this.broker.patchConfigurationDataWithinTransaction(context, this.controllerContext.getGlobalSchema());
     }
 
     /**
      * Load parameter for subscribing to stream from input composite node
      *
-     * @param compNode
+     * @param value
      *            contains value
      * @return enum object if its string value is equal to {@code paramName}. In other cases null.
      */
@@ -1070,7 +1082,7 @@ public class RestconfImpl implements RestconfService {
     private MapNode makeModuleMapNode(final Set<Module> modules) {
         Preconditions.checkNotNull(modules);
         final Module restconfModule = getRestconfModule();
-        final DataSchemaNode moduleSchemaNode = controllerContext.getRestconfModuleRestConfSchemaNode(
+        final DataSchemaNode moduleSchemaNode = this.controllerContext.getRestconfModuleRestConfSchemaNode(
                 restconfModule, Draft02.RestConfModule.MODULE_LIST_SCHEMA_NODE);
         Preconditions.checkState(moduleSchemaNode instanceof ListSchemaNode);
 
@@ -1171,4 +1183,69 @@ public class RestconfImpl implements RestconfService {
 
         return streamNodeValues.build();
     }
+
+    /**
+     * Prepare stream for notification
+     *
+     * @param payload
+     *            - contains list of qnames of notifications
+     * @return - checked future object
+     */
+    private CheckedFuture<DOMRpcResult, DOMRpcException> invokeSalRemoteRpcNotifiStrRPC(
+            final NormalizedNodeContext payload) {
+        final ContainerNode data = (ContainerNode) payload.getData();
+        LeafSetNode leafSet = null;
+        String outputType = "XML";
+        for (final DataContainerChild<? extends PathArgument, ?> dataChild : data.getValue()) {
+            if (dataChild instanceof LeafSetNode) {
+                leafSet = (LeafSetNode) dataChild;
+            } else if (dataChild instanceof AugmentationNode) {
+                outputType = (String) (((AugmentationNode) dataChild).getValue()).iterator().next().getValue();
+            }
+        }
+
+        final Collection<LeafSetEntryNode> entryNodes = leafSet.getValue();
+        final List<SchemaPath> paths = new ArrayList<>();
+        String streamName = CREATE_NOTIFICATION_STREAM + "/";
+
+        final Iterator<LeafSetEntryNode> iterator = entryNodes.iterator();
+        while (iterator.hasNext()) {
+            final QName valueQName = QName.create((String) iterator.next().getValue());
+            final Module module = ControllerContext.getInstance()
+                    .findModuleByNamespace(valueQName.getModule().getNamespace());
+            Preconditions.checkNotNull(module, "Module for namespace " + valueQName.getModule().getNamespace()
+                    + " does not exist");
+            NotificationDefinition notifiDef = null;
+            for (final NotificationDefinition notification : module.getNotifications()) {
+                if (notification.getQName().equals(valueQName)) {
+                    notifiDef = notification;
+                    break;
+                }
+            }
+            final String moduleName = module.getName();
+            Preconditions.checkNotNull(notifiDef,
+                    "Notification " + valueQName + "doesn't exist in module " + moduleName);
+            paths.add(notifiDef.getPath());
+            streamName = streamName + moduleName + ":" + valueQName.getLocalName();
+            if (iterator.hasNext()) {
+                streamName = streamName + ",";
+            }
+        }
+
+        final QName rpcQName = payload.getInstanceIdentifierContext().getSchemaNode().getQName();
+        final QName outputQname = QName.create(rpcQName, "output");
+        final QName streamNameQname = QName.create(rpcQName, "notification-stream-identifier");
+
+        final ContainerNode output = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(new NodeIdentifier(outputQname))
+                .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build();
+
+        if (!Notificator.existNotificationListenerFor(streamName)) {
+            Notificator.createNotificationListener(paths, streamName, outputType);
+        }
+
+        final DOMRpcResult defaultDOMRpcResult = new DefaultDOMRpcResult(output);
+
+        return Futures.immediateCheckedFuture(defaultDOMRpcResult);
+    }
 }