Bug 3959 - support netconf notification 39/41639/18
authorJakub Toth <jatoth@cisco.com>
Fri, 8 Jul 2016 13:44:52 +0000 (15:44 +0200)
committerJakub Toth <jatoth@cisco.com>
Tue, 26 Jul 2016 08:47:32 +0000 (10:47 +0200)
  *added dependency for converting JSON to XML
  *created new notification listener adapter extended
   by DOMNotificationListener
  *added augment for sal-remote to add new param
   for input of create-notification-stream rpc to
   make a choice between JSON or XML output of
   notification (not mandatory - default is XML)
  *added unit tests

Change-Id: I7f5909208dce71e2280fd1bc6dbe49cb7533523a
Signed-off-by: Jakub Toth <jatoth@cisco.com>
restconf/sal-rest-connector/pom.xml
restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/BrokerFacade.java
restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/RestconfImpl.java
restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/RestconfProviderImpl.java
restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/ListenerAdapter.java
restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/NotificationListenerAdapter.java [new file with mode: 0644]
restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Notificator.java
restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/websockets/WebSocketServerHandler.java
restconf/sal-rest-connector/src/main/yang/sal-remote-augment.yang
restconf/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/BrokerFacadeTest.java
restconf/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/RestconfImplTest.java

index 84e5e8e8c43eddeb0b213f9211466c540b832eee..806ee55e9308d524bc7b837bbfea9b08ac9bc570 100644 (file)
       <version>1.2</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.json</groupId>
+      <artifactId>json</artifactId>
+    </dependency>
+
     <!-- Testing Dependencies -->
     <dependency>
       <groupId>org.glassfish.jersey.test-framework.providers</groupId>
index e2097310f2ce485d422941dc05237f665a4a069e..37c90515abab3c33cff7f38ecb2f06a3e881b17b 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.netconf.sal.restconf.impl;
 
 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
+
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -28,6 +29,8 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
@@ -35,6 +38,7 @@ import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession;
 import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorTag;
 import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorType;
 import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
+import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
@@ -54,12 +58,17 @@ public class BrokerFacade {
     private volatile DOMRpcService rpcService;
     private volatile ConsumerSession context;
     private DOMDataBroker domDataBroker;
+    private DOMNotificationService domNotification;
 
     private BrokerFacade() {
     }
 
     public void setRpcService(final DOMRpcService router) {
-        rpcService = router;
+        this.rpcService = router;
+    }
+
+    public void setDomNotificationService(final DOMNotificationService domNotification) {
+        this.domNotification = domNotification;
     }
 
     public void setContext(final ConsumerSession context) {
@@ -71,7 +80,7 @@ public class BrokerFacade {
     }
 
     private void checkPreconditions() {
-        if (context == null || domDataBroker == null) {
+        if ((this.context == null) || (this.domDataBroker == null)) {
             throw new RestconfDocumentedException(Status.SERVICE_UNAVAILABLE);
         }
     }
@@ -79,7 +88,7 @@ public class BrokerFacade {
     // READ configuration
     public NormalizedNode<?, ?> readConfigurationData(final YangInstanceIdentifier path) {
         checkPreconditions();
-        return readDataViaTransaction(domDataBroker.newReadOnlyTransaction(), CONFIGURATION, path);
+        return readDataViaTransaction(this.domDataBroker.newReadOnlyTransaction(), CONFIGURATION, path);
     }
 
     public NormalizedNode<?, ?> readConfigurationData(final DOMMountPoint mountPoint, final YangInstanceIdentifier path) {
@@ -95,7 +104,7 @@ public class BrokerFacade {
     // READ operational
     public NormalizedNode<?, ?> readOperationalData(final YangInstanceIdentifier path) {
         checkPreconditions();
-        return readDataViaTransaction(domDataBroker.newReadOnlyTransaction(), OPERATIONAL, path);
+        return readDataViaTransaction(this.domDataBroker.newReadOnlyTransaction(), OPERATIONAL, path);
     }
 
     public NormalizedNode<?, ?> readOperationalData(final DOMMountPoint mountPoint, final YangInstanceIdentifier path) {
@@ -112,7 +121,7 @@ public class BrokerFacade {
     public CheckedFuture<Void, TransactionCommitFailedException> commitConfigurationDataPut(
             final SchemaContext globalSchema, final YangInstanceIdentifier path, final NormalizedNode<?, ?> payload) {
         checkPreconditions();
-        return putDataViaTransaction(domDataBroker.newReadWriteTransaction(), CONFIGURATION, path, payload, globalSchema);
+        return putDataViaTransaction(this.domDataBroker.newReadWriteTransaction(), CONFIGURATION, path, payload, globalSchema);
     }
 
     public CheckedFuture<Void, TransactionCommitFailedException> commitConfigurationDataPut(
@@ -129,13 +138,13 @@ public class BrokerFacade {
 
     public PATCHStatusContext patchConfigurationDataWithinTransaction(final PATCHContext context,
                                                                       final SchemaContext globalSchema) {
-        final DOMDataReadWriteTransaction patchTransaction = domDataBroker.newReadWriteTransaction();
-        List<PATCHStatusEntity> editCollection = new ArrayList<>();
+        final DOMDataReadWriteTransaction patchTransaction = this.domDataBroker.newReadWriteTransaction();
+        final List<PATCHStatusEntity> editCollection = new ArrayList<>();
         List<RestconfError> editErrors;
-        List<RestconfError> globalErrors = null;
+        final List<RestconfError> globalErrors = null;
         int errorCounter = 0;
 
-        for (PATCHEntity patchEntity : context.getData()) {
+        for (final PATCHEntity patchEntity : context.getData()) {
             final PATCHEditOperation operation = PATCHEditOperation.valueOf(patchEntity.getOperation().toUpperCase());
 
             switch (operation) {
@@ -145,7 +154,7 @@ public class BrokerFacade {
                             postDataWithinTransaction(patchTransaction, CONFIGURATION, patchEntity.getTargetNode(),
                                     patchEntity.getNode(), globalSchema);
                             editCollection.add(new PATCHStatusEntity(patchEntity.getEditId(), true, null));
-                        } catch (RestconfDocumentedException e) {
+                        } catch (final RestconfDocumentedException e) {
                             editErrors = new ArrayList<>();
                             editErrors.addAll(e.getErrors());
                             editCollection.add(new PATCHStatusEntity(patchEntity.getEditId(), false, editErrors));
@@ -159,7 +168,7 @@ public class BrokerFacade {
                             putDataWithinTransaction(patchTransaction, CONFIGURATION, patchEntity
                                     .getTargetNode(), patchEntity.getNode(), globalSchema);
                             editCollection.add(new PATCHStatusEntity(patchEntity.getEditId(), true, null));
-                        } catch (RestconfDocumentedException e) {
+                        } catch (final RestconfDocumentedException e) {
                             editErrors = new ArrayList<>();
                             editErrors.addAll(e.getErrors());
                             editCollection.add(new PATCHStatusEntity(patchEntity.getEditId(), false, editErrors));
@@ -173,7 +182,7 @@ public class BrokerFacade {
                             deleteDataWithinTransaction(patchTransaction, CONFIGURATION, patchEntity
                                     .getTargetNode());
                             editCollection.add(new PATCHStatusEntity(patchEntity.getEditId(), true, null));
-                        } catch (RestconfDocumentedException e) {
+                        } catch (final RestconfDocumentedException e) {
                             editErrors = new ArrayList<>();
                             editErrors.addAll(e.getErrors());
                             editCollection.add(new PATCHStatusEntity(patchEntity.getEditId(), false, editErrors));
@@ -187,7 +196,7 @@ public class BrokerFacade {
                             deleteDataWithinTransaction(patchTransaction, CONFIGURATION, patchEntity
                                     .getTargetNode());
                             editCollection.add(new PATCHStatusEntity(patchEntity.getEditId(), true, null));
-                        } catch (RestconfDocumentedException e) {
+                        } catch (final RestconfDocumentedException e) {
                             LOG.error("Error removing {} by {} operation", patchEntity.getTargetNode().toString(),
                                     patchEntity.getEditId(), e);
                         }
@@ -213,7 +222,7 @@ public class BrokerFacade {
     public CheckedFuture<Void, TransactionCommitFailedException> commitConfigurationDataPost(
             final SchemaContext globalSchema, final YangInstanceIdentifier path, final NormalizedNode<?, ?> payload) {
         checkPreconditions();
-        return postDataViaTransaction(domDataBroker.newReadWriteTransaction(), CONFIGURATION, path, payload, globalSchema);
+        return postDataViaTransaction(this.domDataBroker.newReadWriteTransaction(), CONFIGURATION, path, payload, globalSchema);
     }
 
     public CheckedFuture<Void, TransactionCommitFailedException> commitConfigurationDataPost(
@@ -232,7 +241,7 @@ public class BrokerFacade {
     public CheckedFuture<Void, TransactionCommitFailedException> commitConfigurationDataDelete(
             final YangInstanceIdentifier path) {
         checkPreconditions();
-        return deleteDataViaTransaction(domDataBroker.newWriteOnlyTransaction(), CONFIGURATION, path);
+        return deleteDataViaTransaction(this.domDataBroker.newWriteOnlyTransaction(), CONFIGURATION, path);
     }
 
     public CheckedFuture<Void, TransactionCommitFailedException> commitConfigurationDataDelete(
@@ -249,11 +258,11 @@ public class BrokerFacade {
     // RPC
     public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final SchemaPath type, final NormalizedNode<?, ?> input) {
         checkPreconditions();
-        if (rpcService == null) {
+        if (this.rpcService == null) {
             throw new RestconfDocumentedException(Status.SERVICE_UNAVAILABLE);
         }
         LOG.trace("Invoke RPC {} with input: {}", type, input);
-        return rpcService.invokeRpc(type, input);
+        return this.rpcService.invokeRpc(type, input);
     }
 
     public void registerToListenDataChanges(final LogicalDatastoreType datastore, final DataChangeScope scope,
@@ -265,7 +274,7 @@ public class BrokerFacade {
         }
 
         final YangInstanceIdentifier path = listener.getPath();
-        final ListenerRegistration<DOMDataChangeListener> registration = domDataBroker.registerDataChangeListener(
+        final ListenerRegistration<DOMDataChangeListener> registration = this.domDataBroker.registerDataChangeListener(
                 datastore, path, listener, scope);
 
         listener.setRegistration(registration);
@@ -421,4 +430,18 @@ public class BrokerFacade {
                 ImmutableNodes.fromInstanceId(schemaContext, YangInstanceIdentifier.create(normalizedPathWithoutChildArgs));
         rwTx.merge(store, rootNormalizedPath, parentStructure);
     }
+
+    public void registerToListenNotification(final NotificationListenerAdapter listener) {
+        checkPreconditions();
+
+        if (listener.isListening()) {
+            return;
+        }
+
+        final SchemaPath path = listener.getSchemaPath();
+        final ListenerRegistration<DOMNotificationListener> registration = this.domNotification
+                .registerNotificationListener(listener, path);
+
+        listener.setRegistration(registration);
+    }
 }
index 493dbb99c433f2f1d510f5221f12f25b518d1fae..7226a0abb22b357d4da42ab386cc66f46664296f 100644 (file)
@@ -26,9 +26,12 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -56,6 +59,7 @@ import org.opendaylight.netconf.sal.rest.api.RestconfService;
 import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorTag;
 import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorType;
 import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
+import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter;
 import org.opendaylight.netconf.sal.streams.listeners.Notificator;
 import org.opendaylight.netconf.sal.streams.websockets.WebSocketServer;
 import org.opendaylight.yangtools.yang.common.QName;
@@ -68,6 +72,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -86,6 +91,7 @@ import org.opendaylight.yangtools.yang.model.api.LeafListSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.LeafSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.ListSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaNode;
@@ -134,6 +140,12 @@ public class RestconfImpl implements RestconfService {
 
     private static final YangInstanceIdentifier.AugmentationIdentifier SAL_REMOTE_AUG_IDENTIFIER;
 
+    public static final CharSequence DATA_SUBSCR = "data-change-event-subscription";
+    private static final CharSequence CREATE_DATA_SUBSCR = "create-" + DATA_SUBSCR;
+
+    public static final CharSequence NOTIFICATION_STREAM = "notification-stream";
+    private static final CharSequence CREATE_NOTIFICATION_STREAM = "create-" + NOTIFICATION_STREAM;
+
     static {
         try {
             final Date eventSubscriptionAugRevision = new SimpleDateFormat("yyyy-MM-dd").parse("2014-07-08");
@@ -378,7 +390,15 @@ public class RestconfImpl implements RestconfService {
             response = mountRpcServices.get().invokeRpc(type, payload.getData());
         } else {
             if (namespace.toString().equals(SAL_REMOTE_NAMESPACE)) {
-                response = invokeSalRemoteRpcSubscribeRPC(payload);
+                if (identifier.contains(CREATE_DATA_SUBSCR)) {
+                    response = invokeSalRemoteRpcSubscribeRPC(payload);
+                } else if (identifier.contains(CREATE_NOTIFICATION_STREAM)) {
+                    response = invokeSalRemoteRpcNotifiStrRPC(payload);
+                } else {
+                    final String msg = "Not supported operation";
+                    LOG.warn(msg);
+                    throw new RestconfDocumentedException(msg, ErrorType.RPC, ErrorTag.OPERATION_NOT_SUPPORTED);
+                }
             } else {
                 response = this.broker.invokeRpc(type, payload.getData());
             }
@@ -468,9 +488,10 @@ public class RestconfImpl implements RestconfService {
         }
 
         final YangInstanceIdentifier pathIdentifier = ((YangInstanceIdentifier) pathValue);
-        String streamName = null;
+        String streamName = (String) CREATE_DATA_SUBSCR;
         if (!pathIdentifier.isEmpty()) {
-            final String fullRestconfIdentifier = this.controllerContext.toFullRestconfIdentifier(pathIdentifier, null);
+            final String fullRestconfIdentifier = DATA_SUBSCR
+                    + this.controllerContext.toFullRestconfIdentifier(pathIdentifier, null);
 
             LogicalDatastoreType datastore = parseEnumTypeParameter(value, LogicalDatastoreType.class, DATASTORE_PARAM_NAME);
             datastore = datastore == null ? DEFAULT_DATASTORE : datastore;
@@ -877,6 +898,64 @@ public class RestconfImpl implements RestconfService {
      */
     @Override
     public Response subscribeToStream(final String identifier, final UriInfo uriInfo) {
+        if (identifier.contains(DATA_SUBSCR)) {
+            return dataSubs(identifier, uriInfo);
+        } else if (identifier.contains(NOTIFICATION_STREAM)) {
+            return notifStream(identifier, uriInfo);
+        }
+        final String msg = "Bad type of notification of sal-remote";
+        LOG.warn(msg);
+        throw new RestconfDocumentedException(msg);
+    }
+
+    /**
+     * Register notification listener by stream name
+     *
+     * @param identifier
+     *            - stream name
+     * @param uriInfo
+     *            - uriInfo
+     * @return {@link Response}
+     */
+    private Response notifStream(final String identifier, final UriInfo uriInfo) {
+        final String streamName = Notificator.createStreamNameFromUri(identifier);
+        if (Strings.isNullOrEmpty(streamName)) {
+            throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
+        }
+        final List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName);
+        if ((listeners == null) || listeners.isEmpty()) {
+            throw new RestconfDocumentedException("Stream was not found.", ErrorType.PROTOCOL,
+                    ErrorTag.UNKNOWN_ELEMENT);
+        }
+
+        for (final NotificationListenerAdapter listener : listeners) {
+            this.broker.registerToListenNotification(listener);
+        }
+
+        final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
+        int notificationPort = NOTIFICATION_PORT;
+        try {
+            final WebSocketServer webSocketServerInstance = WebSocketServer.getInstance();
+            notificationPort = webSocketServerInstance.getPort();
+        } catch (final NullPointerException e) {
+            WebSocketServer.createInstance(NOTIFICATION_PORT);
+        }
+        final UriBuilder uriToWebsocketServerBuilder = uriBuilder.port(notificationPort).scheme("ws");
+        final URI uriToWebsocketServer = uriToWebsocketServerBuilder.replacePath(streamName).build();
+
+        return Response.status(Status.OK).location(uriToWebsocketServer).build();
+    }
+
+    /**
+     * Register data change listener by stream name
+     *
+     * @param identifier
+     *            - stream name
+     * @param uriInfo
+     *            - uri info
+     * @return {@link Response}
+     */
+    private Response dataSubs(final String identifier, final UriInfo uriInfo) {
         final String streamName = Notificator.createStreamNameFromUri(identifier);
         if (Strings.isNullOrEmpty(streamName)) {
             throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
@@ -935,7 +1014,7 @@ public class RestconfImpl implements RestconfService {
     /**
      * Load parameter for subscribing to stream from input composite node
      *
-     * @param compNode
+     * @param value
      *            contains value
      * @return enum object if its string value is equal to {@code paramName}. In other cases null.
      */
@@ -1104,4 +1183,69 @@ public class RestconfImpl implements RestconfService {
 
         return streamNodeValues.build();
     }
+
+    /**
+     * Prepare stream for notification
+     *
+     * @param payload
+     *            - contains list of qnames of notifications
+     * @return - checked future object
+     */
+    private CheckedFuture<DOMRpcResult, DOMRpcException> invokeSalRemoteRpcNotifiStrRPC(
+            final NormalizedNodeContext payload) {
+        final ContainerNode data = (ContainerNode) payload.getData();
+        LeafSetNode leafSet = null;
+        String outputType = "XML";
+        for (final DataContainerChild<? extends PathArgument, ?> dataChild : data.getValue()) {
+            if (dataChild instanceof LeafSetNode) {
+                leafSet = (LeafSetNode) dataChild;
+            } else if (dataChild instanceof AugmentationNode) {
+                outputType = (String) (((AugmentationNode) dataChild).getValue()).iterator().next().getValue();
+            }
+        }
+
+        final Collection<LeafSetEntryNode> entryNodes = leafSet.getValue();
+        final List<SchemaPath> paths = new ArrayList<>();
+        String streamName = CREATE_NOTIFICATION_STREAM + "/";
+
+        final Iterator<LeafSetEntryNode> iterator = entryNodes.iterator();
+        while (iterator.hasNext()) {
+            final QName valueQName = QName.create((String) iterator.next().getValue());
+            final Module module = ControllerContext.getInstance()
+                    .findModuleByNamespace(valueQName.getModule().getNamespace());
+            Preconditions.checkNotNull(module, "Module for namespace " + valueQName.getModule().getNamespace()
+                    + " does not exist");
+            NotificationDefinition notifiDef = null;
+            for (final NotificationDefinition notification : module.getNotifications()) {
+                if (notification.getQName().equals(valueQName)) {
+                    notifiDef = notification;
+                    break;
+                }
+            }
+            final String moduleName = module.getName();
+            Preconditions.checkNotNull(notifiDef,
+                    "Notification " + valueQName + "doesn't exist in module " + moduleName);
+            paths.add(notifiDef.getPath());
+            streamName = streamName + moduleName + ":" + valueQName.getLocalName();
+            if (iterator.hasNext()) {
+                streamName = streamName + ",";
+            }
+        }
+
+        final QName rpcQName = payload.getInstanceIdentifierContext().getSchemaNode().getQName();
+        final QName outputQname = QName.create(rpcQName, "output");
+        final QName streamNameQname = QName.create(rpcQName, "notification-stream-identifier");
+
+        final ContainerNode output = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(new NodeIdentifier(outputQname))
+                .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build();
+
+        if (!Notificator.existNotificationListenerFor(streamName)) {
+            Notificator.createNotificationListener(paths, streamName, outputType);
+        }
+
+        final DOMRpcResult defaultDOMRpcResult = new DefaultDOMRpcResult(output);
+
+        return Futures.immediateCheckedFuture(defaultDOMRpcResult);
+    }
 }
index ce188748fbdfe34e7a3ac4a997cd5aa0f6c43541..71006b829284b11a6bc0345ca0176cd456da3ad9 100644 (file)
@@ -20,6 +20,7 @@ import org.opendaylight.controller.config.yang.md.sal.rest.connector.RestConnect
 import org.opendaylight.controller.config.yang.md.sal.rest.connector.Rpcs;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
 import org.opendaylight.controller.sal.core.api.Provider;
@@ -48,16 +49,16 @@ public class RestconfProviderImpl implements Provider, AutoCloseable, RestConnec
         BrokerFacade.getInstance().setContext(session);
         BrokerFacade.getInstance().setDomDataBroker( domDataBroker);
         final SchemaService schemaService = session.getService(SchemaService.class);
-        listenerRegistration = schemaService.registerSchemaContextListener(ControllerContext.getInstance());
+        this.listenerRegistration = schemaService.registerSchemaContextListener(ControllerContext.getInstance());
         BrokerFacade.getInstance().setRpcService(session.getService(DOMRpcService.class));
-
+        BrokerFacade.getInstance().setDomNotificationService(session.getService(DOMNotificationService.class));
 
         ControllerContext.getInstance().setSchemas(schemaService.getGlobalContext());
         ControllerContext.getInstance().setMountService(session.getService(DOMMountPointService.class));
 
-        webSocketServerThread = new Thread(WebSocketServer.createInstance(port.getValue().intValue()));
-        webSocketServerThread.setName("Web socket server on port " + port);
-        webSocketServerThread.start();
+        this.webSocketServerThread = new Thread(WebSocketServer.createInstance(this.port.getValue().intValue()));
+        this.webSocketServerThread.setName("Web socket server on port " + this.port);
+        this.webSocketServerThread.start();
     }
 
     @Override
@@ -68,12 +69,12 @@ public class RestconfProviderImpl implements Provider, AutoCloseable, RestConnec
     @Override
     public void close() {
 
-        if (listenerRegistration != null) {
-            listenerRegistration.close();
+        if (this.listenerRegistration != null) {
+            this.listenerRegistration.close();
         }
 
         WebSocketServer.destroyInstance();
-        webSocketServerThread.interrupt();
+        this.webSocketServerThread.interrupt();
     }
 
     @Override
@@ -81,27 +82,27 @@ public class RestconfProviderImpl implements Provider, AutoCloseable, RestConnec
         final Config config = new Config();
 
         final Get get = new Get();
-        get.setReceivedRequests(stats.getConfigGet());
-        get.setSuccessfulResponses(stats.getSuccessGetConfig());
-        get.setFailedResponses(stats.getFailureGetConfig());
+        get.setReceivedRequests(this.stats.getConfigGet());
+        get.setSuccessfulResponses(this.stats.getSuccessGetConfig());
+        get.setFailedResponses(this.stats.getFailureGetConfig());
         config.setGet(get);
 
         final Post post = new Post();
-        post.setReceivedRequests(stats.getConfigPost());
-        post.setSuccessfulResponses(stats.getSuccessPost());
-        post.setFailedResponses(stats.getFailurePost());
+        post.setReceivedRequests(this.stats.getConfigPost());
+        post.setSuccessfulResponses(this.stats.getSuccessPost());
+        post.setFailedResponses(this.stats.getFailurePost());
         config.setPost(post);
 
         final Put put = new Put();
-        put.setReceivedRequests(stats.getConfigPut());
-        put.setSuccessfulResponses(stats.getSuccessPut());
-        put.setFailedResponses(stats.getFailurePut());
+        put.setReceivedRequests(this.stats.getConfigPut());
+        put.setSuccessfulResponses(this.stats.getSuccessPut());
+        put.setFailedResponses(this.stats.getFailurePut());
         config.setPut(put);
 
         final Delete delete = new Delete();
-        delete.setReceivedRequests(stats.getConfigDelete());
-        delete.setSuccessfulResponses(stats.getSuccessDelete());
-        delete.setFailedResponses(stats.getFailureDelete());
+        delete.setReceivedRequests(this.stats.getConfigDelete());
+        delete.setSuccessfulResponses(this.stats.getSuccessDelete());
+        delete.setFailedResponses(this.stats.getFailureDelete());
         config.setDelete(delete);
 
         return config;
@@ -109,19 +110,19 @@ public class RestconfProviderImpl implements Provider, AutoCloseable, RestConnec
 
     @Override
     public Operational getOperational() {
-        final BigInteger opGet = stats.getOperationalGet();
+        final BigInteger opGet = this.stats.getOperationalGet();
         final Operational operational = new Operational();
         final Get get = new Get();
         get.setReceivedRequests(opGet);
-        get.setSuccessfulResponses(stats.getSuccessGetOperational());
-        get.setFailedResponses(stats.getFailureGetOperational());
+        get.setSuccessfulResponses(this.stats.getSuccessGetOperational());
+        get.setFailedResponses(this.stats.getFailureGetOperational());
         operational.setGet(get);
         return operational;
     }
 
     @Override
     public Rpcs getRpcs() {
-        final BigInteger rpcInvoke = stats.getRpc();
+        final BigInteger rpcInvoke = this.stats.getRpc();
         final Rpcs rpcs = new Rpcs();
         rpcs.setReceivedRequests(rpcInvoke);
         return rpcs;
index 51286725770a045244088329fa893b61c55b7f60..a8651c21638a93477a4f4ad933e141306ecdaf2f 100644 (file)
@@ -77,7 +77,7 @@ public class ListenerAdapter implements DOMDataChangeListener {
     private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
     private static final Pattern RFC3339_PATTERN = Pattern.compile("(\\d\\d)(\\d\\d)$");
 
-    private final SimpleDateFormat rfc3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ");
+    private static final SimpleDateFormat RFC3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ");
 
     private final YangInstanceIdentifier path;
     private ListenerRegistration<DOMDataChangeListener> registration;
@@ -96,12 +96,12 @@ public class ListenerAdapter implements DOMDataChangeListener {
      */
     ListenerAdapter(final YangInstanceIdentifier path, final String streamName) {
         Preconditions.checkNotNull(path);
-        Preconditions.checkArgument(streamName != null && !streamName.isEmpty());
+        Preconditions.checkArgument((streamName != null) && !streamName.isEmpty());
         this.path = path;
         this.streamName = streamName;
-        eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
-        eventBusChangeRecorder = new EventBusChangeRecorder();
-        eventBus.register(eventBusChangeRecorder);
+        this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
+        this.eventBusChangeRecorder = new EventBusChangeRecorder();
+        this.eventBus.register(this.eventBusChangeRecorder);
     }
 
     @Override
@@ -111,7 +111,7 @@ public class ListenerAdapter implements DOMDataChangeListener {
             final String xml = prepareXmlFrom(change);
             final Event event = new Event(EventType.NOTIFY);
             event.setData(xml);
-            eventBus.post(event);
+            this.eventBus.post(event);
         }
     }
 
@@ -123,20 +123,20 @@ public class ListenerAdapter implements DOMDataChangeListener {
         public void recordCustomerChange(final Event event) {
             if (event.getType() == EventType.REGISTER) {
                 final Channel subscriber = event.getSubscriber();
-                if (!subscribers.contains(subscriber)) {
-                    subscribers.add(subscriber);
+                if (!ListenerAdapter.this.subscribers.contains(subscriber)) {
+                    ListenerAdapter.this.subscribers.add(subscriber);
                 }
             } else if (event.getType() == EventType.DEREGISTER) {
-                subscribers.remove(event.getSubscriber());
+                ListenerAdapter.this.subscribers.remove(event.getSubscriber());
                 Notificator.removeListenerIfNoSubscriberExists(ListenerAdapter.this);
             } else if (event.getType() == EventType.NOTIFY) {
-                for (final Channel subscriber : subscribers) {
+                for (final Channel subscriber : ListenerAdapter.this.subscribers) {
                     if (subscriber.isActive()) {
                         LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
                         subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
                     } else {
                         LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
-                        subscribers.remove(subscriber);
+                        ListenerAdapter.this.subscribers.remove(subscriber);
                     }
                 }
             }
@@ -167,7 +167,7 @@ public class ListenerAdapter implements DOMDataChangeListener {
          * @return Channel
          */
         public Channel getSubscriber() {
-            return subscriber;
+            return this.subscriber;
         }
 
         /**
@@ -186,7 +186,7 @@ public class ListenerAdapter implements DOMDataChangeListener {
          * @return String representation of event data.
          */
         public String getData() {
-            return data;
+            return this.data;
         }
 
         /**
@@ -204,7 +204,7 @@ public class ListenerAdapter implements DOMDataChangeListener {
          * @return The type of the event.
          */
         public EventType getType() {
-            return type;
+            return this.type;
         }
     }
 
@@ -267,15 +267,15 @@ public class ListenerAdapter implements DOMDataChangeListener {
      *            Date
      * @return Data specified by RFC3339.
      */
-    private String toRFC3339(final Date d) {
-        return RFC3339_PATTERN.matcher(rfc3339.format(d)).replaceAll("$1:$2");
+    public static String toRFC3339(final Date d) {
+        return RFC3339_PATTERN.matcher(RFC3339.format(d)).replaceAll("$1:$2");
     }
 
     /**
      * Creates {@link Document} document.
      * @return {@link Document} document.
      */
-    private Document createDocument() {
+    public static Document createDocument() {
         final DocumentBuilder bob;
         try {
             bob = DBF.newDocumentBuilder();
@@ -326,7 +326,7 @@ public class ListenerAdapter implements DOMDataChangeListener {
      */
     private void addValuesFromDataToElement(final Document doc, final Set<YangInstanceIdentifier> data, final Element element,
             final Operation operation) {
-        if (data == null || data.isEmpty()) {
+        if ((data == null) || data.isEmpty()) {
             return;
         }
         for (final YangInstanceIdentifier path : data) {
@@ -340,10 +340,10 @@ public class ListenerAdapter implements DOMDataChangeListener {
     private void addCreatedChangedValuesFromDataToElement(final Document doc, final Set<Entry<YangInstanceIdentifier,
                 NormalizedNode<?,?>>> data, final Element element, final Operation operation, final SchemaContext
             schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
-        if (data == null || data.isEmpty()) {
+        if ((data == null) || data.isEmpty()) {
             return;
         }
-        for (Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry : data) {
+        for (final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry : data) {
             if (!ControllerContext.getInstance().isNodeMixin(entry.getKey())) {
                 final Node node = createCreatedChangedDataChangeEventElement(doc, entry, operation, schemaContext,
                         dataSchemaContextTree);
@@ -396,18 +396,19 @@ public class ListenerAdapter implements DOMDataChangeListener {
             final Element dataElement = doc.createElement("data");
             dataElement.appendChild(result);
             dataChangeEventElement.appendChild(dataElement);
-        } catch (IOException e) {
+        } catch (final IOException e) {
             LOG.error("Error in writer ", e);
-        } catch (XMLStreamException e) {
+        } catch (final XMLStreamException e) {
             LOG.error("Error processing stream", e);
         }
 
         return dataChangeEventElement;
     }
 
-    private static DOMResult writeNormalizedNode(final NormalizedNode<?,?> normalized, final
-        YangInstanceIdentifier path, final SchemaContext context, final DataSchemaContextTree dataSchemaContextTree) throws
-            IOException, XMLStreamException {
+    private static DOMResult writeNormalizedNode(final NormalizedNode<?, ?> normalized,
+                                                 final YangInstanceIdentifier path, final SchemaContext context,
+                                                 final DataSchemaContextTree dataSchemaContextTree)
+            throws IOException, XMLStreamException {
         final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
         final Document doc = XmlDocumentUtils.getDocument();
         final DOMResult result = new DOMResult(doc);
@@ -416,7 +417,7 @@ public class ListenerAdapter implements DOMDataChangeListener {
         XMLStreamWriter writer = null;
         final SchemaPath nodePath;
 
-        if (normalized instanceof MapEntryNode || normalized instanceof UnkeyedListEntryNode) {
+        if ((normalized instanceof MapEntryNode) || (normalized instanceof UnkeyedListEntryNode)) {
             nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath();
         } else {
             nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath().getParent();
@@ -542,7 +543,7 @@ public class ListenerAdapter implements DOMDataChangeListener {
      * @return Path pointed to data in data store.
      */
     public YangInstanceIdentifier getPath() {
-        return path;
+        return this.path;
     }
 
     /**
@@ -560,17 +561,17 @@ public class ListenerAdapter implements DOMDataChangeListener {
      * @return The name of the stream.
      */
     public String getStreamName() {
-        return streamName;
+        return this.streamName;
     }
 
     /**
      * Removes all subscribers and unregisters event bus change recorder form event bus.
      */
     public void close() throws Exception {
-        subscribers = new ConcurrentSet<>();
-        registration.close();
-        registration = null;
-        eventBus.unregister(eventBusChangeRecorder);
+        this.subscribers = new ConcurrentSet<>();
+        this.registration.close();
+        this.registration = null;
+        this.eventBus.unregister(this.eventBusChangeRecorder);
     }
 
     /**
@@ -579,7 +580,7 @@ public class ListenerAdapter implements DOMDataChangeListener {
      * @return True if exist, false otherwise.
      */
     public boolean isListening() {
-        return registration == null ? false : true;
+        return this.registration == null ? false : true;
     }
 
     /**
@@ -595,7 +596,7 @@ public class ListenerAdapter implements DOMDataChangeListener {
         }
         final Event event = new Event(EventType.REGISTER);
         event.setSubscriber(subscriber);
-        eventBus.post(event);
+        this.eventBus.post(event);
     }
 
     /**
@@ -608,7 +609,7 @@ public class ListenerAdapter implements DOMDataChangeListener {
         LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
         final Event event = new Event(EventType.DEREGISTER);
         event.setSubscriber(subscriber);
-        eventBus.post(event);
+        this.eventBus.post(event);
     }
 
     /**
@@ -617,7 +618,7 @@ public class ListenerAdapter implements DOMDataChangeListener {
      * @return True if exist at least one {@link Channel} subscriber, false otherwise.
      */
     public boolean hasSubscribers() {
-        return !subscribers.isEmpty();
+        return !this.subscribers.isEmpty();
     }
 
     /**
diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/NotificationListenerAdapter.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/NotificationListenerAdapter.java
new file mode 100644 (file)
index 0000000..42f0996
--- /dev/null
@@ -0,0 +1,390 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.listeners;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.eventbus.AsyncEventBus;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import io.netty.channel.Channel;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import io.netty.util.internal.ConcurrentSet;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.UnsupportedEncodingException;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamWriter;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMResult;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import org.json.JSONObject;
+import org.json.XML;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
+import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
+import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+
+/**
+ * {@link NotificationListenerAdapter} is responsible to track events on
+ * notifications.
+ *
+ */
+public class NotificationListenerAdapter implements DOMNotificationListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NotificationListenerAdapter.class);
+    private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
+
+    private final String streamName;
+    private ListenerRegistration<DOMNotificationListener> registration;
+    private Set<Channel> subscribers = new ConcurrentSet<>();
+    private final EventBus eventBus;
+    private final EventBusChangeRecorder eventBusChangeRecorder;
+
+    private final SchemaPath path;
+    private final String outputType;
+
+    /**
+     * Set path of listener and stream name, register event bus.
+     *
+     * @param path
+     *            - path of notification
+     * @param streamName
+     *            - stream name of listener
+     * @param outputType
+     *            - type of output on notification (JSON, XML)
+     */
+    NotificationListenerAdapter(final SchemaPath path, final String streamName, final String outputType) {
+        this.outputType = outputType;
+        Preconditions.checkArgument((streamName != null) && !streamName.isEmpty());
+        Preconditions.checkArgument(path != null);
+        this.path = path;
+        this.streamName = streamName;
+        this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
+        this.eventBusChangeRecorder = new EventBusChangeRecorder();
+        this.eventBus.register(this.eventBusChangeRecorder);
+    }
+
+    @Override
+    public void onNotification(final DOMNotification notification) {
+        final String xml = prepareXmlFrom(notification);
+        final Event event = new Event(EventType.NOTIFY);
+        if (this.outputType.equals("JSON")) {
+            final JSONObject jsonObject = XML.toJSONObject(xml);
+            event.setData(jsonObject.toString());
+        } else {
+            event.setData(xml);
+        }
+        this.eventBus.post(event);
+    }
+
+    /**
+     * Checks if exists at least one {@link Channel} subscriber.
+     *
+     * @return True if exist at least one {@link Channel} subscriber, false
+     *         otherwise.
+     */
+    public boolean hasSubscribers() {
+        return !this.subscribers.isEmpty();
+    }
+
+    /**
+     * Reset lists, close registration and unregister bus event.
+     */
+    public void close() {
+        this.subscribers = new ConcurrentSet<>();
+        this.registration.close();
+        this.registration = null;
+        this.eventBus.unregister(this.eventBusChangeRecorder);
+    }
+
+    /**
+     * Get stream name of this listener
+     *
+     * @return {@link String}
+     */
+    public String getStreamName() {
+        return this.streamName;
+    }
+
+    /**
+     * Check if is this listener registered.
+     *
+     * @return - true if is registered, otherwise null
+     */
+    public boolean isListening() {
+        return this.registration == null ? false : true;
+    }
+
+    /**
+     * Get schema path of notification
+     *
+     * @return {@link SchemaPath}
+     */
+    public SchemaPath getSchemaPath() {
+        return this.path;
+    }
+
+    /**
+     * Set registration for close after closing connection and check if this
+     * listener is registered
+     *
+     * @param registration
+     *            - registered listener
+     */
+    public void setRegistration(final ListenerRegistration<DOMNotificationListener> registration) {
+        Preconditions.checkNotNull(registration);
+        this.registration = registration;
+    }
+
+    /**
+     * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
+     * subscriber to the event and post event into event bus.
+     *
+     * @param subscriber
+     *            Channel
+     */
+    public void addSubscriber(final Channel subscriber) {
+        if (!subscriber.isActive()) {
+            LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
+        }
+        final Event event = new Event(EventType.REGISTER);
+        event.setSubscriber(subscriber);
+        this.eventBus.post(event);
+    }
+
+    /**
+     * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
+     * subscriber to the event and posts event into event bus.
+     *
+     * @param subscriber
+     */
+    public void removeSubscriber(final Channel subscriber) {
+        LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
+        final Event event = new Event(EventType.DEREGISTER);
+        event.setSubscriber(subscriber);
+        this.eventBus.post(event);
+    }
+
+    private String prepareXmlFrom(final DOMNotification notification) {
+        final SchemaContext schemaContext = ControllerContext.getInstance().getGlobalSchema();
+        final Document doc = ListenerAdapter.createDocument();
+        final Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
+                "notification");
+        doc.appendChild(notificationElement);
+
+        final Element eventTimeElement = doc.createElement("eventTime");
+        eventTimeElement.setTextContent(ListenerAdapter.toRFC3339(new Date()));
+        notificationElement.appendChild(eventTimeElement);
+
+        final Element notificationEventElement = doc.createElementNS(
+                "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "create-notification-stream");
+        addValuesToNotificationEventElement(doc, notificationEventElement, notification, schemaContext);
+        notificationElement.appendChild(notificationEventElement);
+
+        try {
+            final ByteArrayOutputStream out = new ByteArrayOutputStream();
+            final Transformer transformer = FACTORY.newTransformer();
+            transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
+            transformer.setOutputProperty(OutputKeys.METHOD, "xml");
+            transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+            transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
+            transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
+            transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, Charsets.UTF_8)));
+            final byte[] charData = out.toByteArray();
+            return new String(charData, "UTF-8");
+        } catch (TransformerException | UnsupportedEncodingException e) {
+            final String msg = "Error during transformation of Document into String";
+            LOG.error(msg, e);
+            return msg;
+        }
+    }
+
+    private void addValuesToNotificationEventElement(final Document doc, final Element element,
+            final DOMNotification notification, final SchemaContext schemaContext) {
+        if (notification == null) {
+            return;
+        }
+
+        final NormalizedNode<NodeIdentifier, Collection<DataContainerChild<? extends PathArgument, ?>>> body = notification
+                .getBody();
+        try {
+            final DOMResult domResult = writeNormalizedNode(body,
+                    YangInstanceIdentifier.create(body.getIdentifier()), schemaContext);
+            final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
+            final Element dataElement = doc.createElement("notification");
+            dataElement.appendChild(result);
+            element.appendChild(dataElement);
+        } catch (final IOException e) {
+            LOG.error("Error in writer ", e);
+        } catch (final XMLStreamException e) {
+            LOG.error("Error processing stream", e);
+        }
+    }
+
+    private DOMResult writeNormalizedNode(final NormalizedNode<?, ?> normalized, final YangInstanceIdentifier path,
+            final SchemaContext context) throws IOException, XMLStreamException {
+        final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
+        final Document doc = XmlDocumentUtils.getDocument();
+        final DOMResult result = new DOMResult(doc);
+        NormalizedNodeWriter normalizedNodeWriter = null;
+        NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
+        XMLStreamWriter writer = null;
+
+        try {
+            writer = XML_FACTORY.createXMLStreamWriter(result);
+            normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context,
+                    this.getSchemaPath());
+            normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
+
+            normalizedNodeWriter.write(normalized);
+
+            normalizedNodeWriter.flush();
+        } finally {
+            if (normalizedNodeWriter != null) {
+                normalizedNodeWriter.close();
+            }
+            if (normalizedNodeStreamWriter != null) {
+                normalizedNodeStreamWriter.close();
+            }
+            if (writer != null) {
+                writer.close();
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Tracks events of data change by customer.
+     */
+    private final class EventBusChangeRecorder {
+        @Subscribe
+        public void recordCustomerChange(final Event event) {
+            if (event.getType() == EventType.REGISTER) {
+                final Channel subscriber = event.getSubscriber();
+                if (!NotificationListenerAdapter.this.subscribers.contains(subscriber)) {
+                    NotificationListenerAdapter.this.subscribers.add(subscriber);
+                }
+            } else if (event.getType() == EventType.DEREGISTER) {
+                NotificationListenerAdapter.this.subscribers.remove(event.getSubscriber());
+                Notificator.removeNotificationListenerIfNoSubscriberExists(NotificationListenerAdapter.this);
+            } else if (event.getType() == EventType.NOTIFY) {
+                for (final Channel subscriber : NotificationListenerAdapter.this.subscribers) {
+                    if (subscriber.isActive()) {
+                        LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
+                        subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
+                    } else {
+                        LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
+                        NotificationListenerAdapter.this.subscribers.remove(subscriber);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Represents event of specific {@link EventType} type, holds data and
+     * {@link Channel} subscriber.
+     */
+    private final class Event {
+        private final EventType type;
+        private Channel subscriber;
+        private String data;
+
+        /**
+         * Creates new event specified by {@link EventType} type.
+         *
+         * @param type
+         *            EventType
+         */
+        public Event(final EventType type) {
+            this.type = type;
+        }
+
+        /**
+         * Gets the {@link Channel} subscriber.
+         *
+         * @return Channel
+         */
+        public Channel getSubscriber() {
+            return this.subscriber;
+        }
+
+        /**
+         * Sets subscriber for event.
+         *
+         * @param subscriber
+         *            Channel
+         */
+        public void setSubscriber(final Channel subscriber) {
+            this.subscriber = subscriber;
+        }
+
+        /**
+         * Gets event String.
+         *
+         * @return String representation of event data.
+         */
+        public String getData() {
+            return this.data;
+        }
+
+        /**
+         * Sets event data.
+         *
+         * @param data
+         *            String.
+         */
+        public void setData(final String data) {
+            this.data = data;
+        }
+
+        /**
+         * Gets event type.
+         *
+         * @return The type of the event.
+         */
+        public EventType getType() {
+            return this.type;
+        }
+    }
+
+    /**
+     * Type of the event.
+     */
+    private enum EventType {
+        REGISTER, DEREGISTER, NOTIFY;
+    }
+}
index 9537732133e011180f0bf054cead560d06f9b6dc..0bd38652b766634f4b28088a4210d0c005beb2e7 100644 (file)
@@ -7,12 +7,17 @@
  */
 package org.opendaylight.netconf.sal.streams.listeners;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * {@link Notificator} is responsible to create, remove and find
@@ -21,6 +26,9 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 public class Notificator {
 
     private static Map<String, ListenerAdapter> listenersByStreamName = new ConcurrentHashMap<>();
+    private static Map<String, List<NotificationListenerAdapter>> notificationListenersByStreamName = new ConcurrentHashMap<>();
+
+    private static final Logger LOG = LoggerFactory.getLogger(Notificator.class);
     private static final Lock lock = new ReentrantLock();
 
     private Notificator() {
@@ -40,7 +48,7 @@ public class Notificator {
      *            The name of the stream.
      * @return {@link ListenerAdapter} specified by stream name.
      */
-    public static ListenerAdapter getListenerFor(String streamName) {
+    public static ListenerAdapter getListenerFor(final String streamName) {
         return listenersByStreamName.get(streamName);
     }
 
@@ -50,7 +58,7 @@ public class Notificator {
      * @param streamName
      * @return True if the listener exist, false otherwise.
      */
-    public static boolean existListenerFor(String streamName) {
+    public static boolean existListenerFor(final String streamName) {
         return listenersByStreamName.containsKey(streamName);
     }
 
@@ -63,8 +71,8 @@ public class Notificator {
      *            The name of the stream.
      * @return New {@link ListenerAdapter} listener from {@link YangInstanceIdentifier} path and stream name.
      */
-    public static ListenerAdapter createListener(YangInstanceIdentifier path, String streamName) {
-        ListenerAdapter listener = new ListenerAdapter(path, streamName);
+    public static ListenerAdapter createListener(final YangInstanceIdentifier path, final String streamName) {
+        final ListenerAdapter listener = new ListenerAdapter(path, streamName);
         try {
             lock.lock();
             listenersByStreamName.put(streamName, listener);
@@ -82,7 +90,7 @@ public class Notificator {
      *            URI for creation stream name.
      * @return String representation of stream name.
      */
-    public static String createStreamNameFromUri(String uri) {
+    public static String createStreamNameFromUri(final String uri) {
         if (uri == null) {
             return null;
         }
@@ -100,10 +108,11 @@ public class Notificator {
      * Removes all listeners.
      */
     public static void removeAllListeners() {
-        for (ListenerAdapter listener : listenersByStreamName.values()) {
+        for (final ListenerAdapter listener : listenersByStreamName.values()) {
             try {
                 listener.close();
-            } catch (Exception e) {
+            } catch (final Exception e) {
+                LOG.error("Failed to close listener", e);
             }
         }
         try {
@@ -120,7 +129,7 @@ public class Notificator {
      * @param listener
      *            ListenerAdapter
      */
-    public static void removeListenerIfNoSubscriberExists(ListenerAdapter listener) {
+    public static void removeListenerIfNoSubscriberExists(final ListenerAdapter listener) {
         if (!listener.hasSubscribers()) {
             deleteListener(listener);
         }
@@ -132,11 +141,12 @@ public class Notificator {
      * @param listener
      *            ListenerAdapter
      */
-    private static void deleteListener(ListenerAdapter listener) {
+    private static void deleteListener(final ListenerAdapter listener) {
         if (listener != null) {
             try {
                 listener.close();
-            } catch (Exception e) {
+            } catch (final Exception e) {
+                LOG.error("Failed to close listener", e);
             }
             try {
                 lock.lock();
@@ -147,4 +157,57 @@ public class Notificator {
         }
     }
 
+    /**
+     * Check if the listener specified by qnames of request exist.
+     *
+     * @param streamName
+     *            - name of stream
+     * @return True if the listener exist, false otherwise.
+     */
+    public static boolean existNotificationListenerFor(final String streamName) {
+        return notificationListenersByStreamName.containsKey(streamName);
+
+    }
+
+    public static List<NotificationListenerAdapter> createNotificationListener(final List<SchemaPath> paths,
+            final String streamName, final String outputType) {
+        final List<NotificationListenerAdapter> listListeners = new ArrayList<>();
+        for (final SchemaPath path : paths) {
+            final NotificationListenerAdapter listener = new NotificationListenerAdapter(path, streamName, outputType);
+            listListeners.add(listener);
+        }
+        try {
+            lock.lock();
+            notificationListenersByStreamName.put(streamName, listListeners);
+        } finally {
+            lock.unlock();
+        }
+        return listListeners;
+    }
+
+    public static void removeNotificationListenerIfNoSubscriberExists(final NotificationListenerAdapter listener) {
+        if (!listener.hasSubscribers()) {
+            deleteNotificationListener(listener);
+        }
+    }
+
+    private static void deleteNotificationListener(final NotificationListenerAdapter listener) {
+        if (listener != null) {
+            try {
+                listener.close();
+            } catch (final Exception e) {
+                LOG.error("Failed to close listener", e);
+            }
+            try {
+                lock.lock();
+                notificationListenersByStreamName.remove(listener.getStreamName());
+            } finally {
+                lock.unlock();
+            }
+        }
+    }
+
+    public static List<NotificationListenerAdapter> getNotificationListenerFor(final String streamName) {
+        return notificationListenersByStreamName.get(streamName);
+    }
 }
index c2b4edc6214d8f9042eceb7c8e99d936e877c2a3..5af22bfafc7d27bcd565f62ad722242e04c513a9 100644 (file)
@@ -8,9 +8,9 @@
 
 package org.opendaylight.netconf.sal.streams.websockets;
 
+import static io.netty.handler.codec.http.HttpHeaders.Names.HOST;
 import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive;
 import static io.netty.handler.codec.http.HttpHeaders.setContentLength;
-import static io.netty.handler.codec.http.HttpHeaders.Names.HOST;
 import static io.netty.handler.codec.http.HttpMethod.GET;
 import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
 import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
@@ -35,7 +35,10 @@ import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
 import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
 import io.netty.util.CharsetUtil;
 import java.io.IOException;
+import java.util.List;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfImpl;
 import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
+import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter;
 import org.opendaylight.netconf.sal.streams.listeners.Notificator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,24 +83,37 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>
             return;
         }
 
-        String streamName = Notificator.createStreamNameFromUri(req.getUri());
-        ListenerAdapter listener = Notificator.getListenerFor(streamName);
-        if (listener != null) {
-            listener.addSubscriber(ctx.channel());
-            logger.debug("Subscriber successfully registered.");
-        } else {
-            logger.error("Listener for stream with name '{}' was not found.", streamName);
-            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR));
+        final String streamName = Notificator.createStreamNameFromUri(req.getUri());
+        if (streamName.contains(RestconfImpl.DATA_SUBSCR)) {
+            final ListenerAdapter listener = Notificator.getListenerFor(streamName);
+            if (listener != null) {
+                listener.addSubscriber(ctx.channel());
+                logger.debug("Subscriber successfully registered.");
+            } else {
+                logger.error("Listener for stream with name '{}' was not found.", streamName);
+                sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR));
+            }
+        } else if (streamName.contains(RestconfImpl.NOTIFICATION_STREAM)) {
+            final List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName);
+            if (!listeners.isEmpty() && (listeners != null)) {
+                for (final NotificationListenerAdapter listener : listeners) {
+                    listener.addSubscriber(ctx.channel());
+                    logger.debug("Subscriber successfully registered.");
+                }
+            } else {
+                logger.error("Listener for stream with name '{}' was not found.", streamName);
+                sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR));
+            }
         }
 
         // Handshake
-        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(getWebSocketLocation(req),
+        final WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(getWebSocketLocation(req),
                 null, false);
-        handshaker = wsFactory.newHandshaker(req);
-        if (handshaker == null) {
+        this.handshaker = wsFactory.newHandshaker(req);
+        if (this.handshaker == null) {
             WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
         } else {
-            handshaker.handshake(ctx.channel(), req);
+            this.handshaker.handshake(ctx.channel(), req);
         }
 
     }
@@ -116,15 +132,15 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>
             final FullHttpResponse res) {
         // Generate an error page if response getStatus code is not OK (200).
         if (res.getStatus().code() != 200) {
-            ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
+            final ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
             res.content().writeBytes(buf);
             buf.release();
             setContentLength(res, res.content().readableBytes());
         }
 
         // Send the response and close the connection if necessary.
-        ChannelFuture f = ctx.channel().writeAndFlush(res);
-        if (!isKeepAlive(req) || res.getStatus().code() != 200) {
+        final ChannelFuture f = ctx.channel().writeAndFlush(res);
+        if (!isKeepAlive(req) || (res.getStatus().code() != 200)) {
             f.addListener(ChannelFutureListener.CLOSE);
         }
     }
@@ -139,14 +155,23 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>
      */
     private void handleWebSocketFrame(final ChannelHandlerContext ctx, final WebSocketFrame frame) throws IOException {
         if (frame instanceof CloseWebSocketFrame) {
-            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
-            String streamName = Notificator.createStreamNameFromUri(((CloseWebSocketFrame) frame).reasonText());
-            ListenerAdapter listener = Notificator.getListenerFor(streamName);
-            if (listener != null) {
-                listener.removeSubscriber(ctx.channel());
-                logger.debug("Subscriber successfully registered.");
+            this.handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
+            final String streamName = Notificator.createStreamNameFromUri(((CloseWebSocketFrame) frame).reasonText());
+            if (streamName.contains(RestconfImpl.DATA_SUBSCR)) {
+                final ListenerAdapter listener = Notificator.getListenerFor(streamName);
+                if (listener != null) {
+                    listener.removeSubscriber(ctx.channel());
+                    logger.debug("Subscriber successfully registered.");
+                }
+                Notificator.removeListenerIfNoSubscriberExists(listener);
+            } else if (streamName.contains(RestconfImpl.NOTIFICATION_STREAM)) {
+                final List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName);
+                if (!listeners.isEmpty() && (listeners != null)) {
+                    for (final NotificationListenerAdapter listener : listeners) {
+                        listener.removeSubscriber(ctx.channel());
+                    }
+                }
             }
-            Notificator.removeListenerIfNoSubscriberExists(listener);
             return;
         } else if (frame instanceof PingWebSocketFrame) {
             ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
@@ -156,7 +181,7 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>
 
     @Override
     public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
-        if (cause instanceof java.nio.channels.ClosedChannelException == false) {
+        if ((cause instanceof java.nio.channels.ClosedChannelException) == false) {
             // cause.printStackTrace();
         }
         ctx.close();
index cc1d26fb979ddfd708c1651f86eaadf941bcdf94..8f5088f8bd0c8bf346bb9d131d43f4d5042611d6 100644 (file)
@@ -7,7 +7,7 @@ module sal-remote-augment {
     import sal-remote {prefix salrmt; revision-date "2014-01-14";}
 
     description
-        "Added input parameters to rpc create-data-change-event-subscription";
+        "Added input parameters to rpc create-data-change-event-subscription and to create-notification-stream";
 
     revision "2014-07-08" {
     }
@@ -28,4 +28,15 @@ module sal-remote-augment {
         }
     }
 
+    augment "/salrmt:create-notification-stream/salrmt:input" {
+        leaf notification-output-type {
+            type enumeration {
+                enum JSON;
+                enum XML;
+            }
+            default "XML";
+            description "Input parameter which type of output will be parsed on notification";
+        }
+    }
+
 }
index 43b3ca7bec977c28e15f6482d28891dfc5d6756e..9fbf9007e3f90e6cfba0566e969c8ab97770f3e0 100644 (file)
@@ -15,11 +15,13 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import java.util.concurrent.Future;
@@ -39,6 +41,7 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
@@ -48,6 +51,7 @@ import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
 import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
 import org.opendaylight.netconf.sal.restconf.impl.RestconfError;
 import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
+import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter;
 import org.opendaylight.netconf.sal.streams.listeners.Notificator;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.common.QName;
@@ -68,6 +72,9 @@ public class BrokerFacadeTest {
     @Mock
     DOMDataBroker domDataBroker;
 
+    @Mock
+    DOMNotificationService domNotification;
+
     @Mock
     ConsumerSession context;
 
@@ -102,6 +109,7 @@ public class BrokerFacadeTest {
         MockitoAnnotations.initMocks(this);
         // TODO it is started before every test method
         brokerFacade.setDomDataBroker(domDataBroker);
+        brokerFacade.setDomNotificationService(domNotification);
         brokerFacade.setRpcService(mockRpcService);
         brokerFacade.setContext(context);
         when(domDataBroker.newReadOnlyTransaction()).thenReturn(rTransaction);
@@ -109,7 +117,6 @@ public class BrokerFacadeTest {
         when(domDataBroker.newReadWriteTransaction()).thenReturn(rwTransaction);
 
         ControllerContext.getInstance().setSchemas(TestUtils.loadSchemaContext("/full-versions/test-module"));
-
     }
 
     private CheckedFuture<Optional<NormalizedNode<?, ?>>,ReadFailedException> wrapDummyNode(final NormalizedNode<?, ?> dummyNode) {
@@ -267,6 +274,38 @@ public class BrokerFacadeTest {
 
         brokerFacade.registerToListenDataChanges(LogicalDatastoreType.CONFIGURATION, DataChangeScope.BASE, listener);
         verifyNoMoreInteractions(domDataBroker);
+    }
 
+    /**
+     * Create, register, close and remove notification listener.
+     */
+    @Test
+    public void testRegisterToListenNotificationChanges() {
+        // create test notification listener
+        final String identifier = "create-notification-stream/toaster:toastDone";
+        final SchemaPath path = SchemaPath.create(true,
+                QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toastDone"));
+        Notificator.createNotificationListener(Lists.newArrayList(path), identifier, "XML");
+        final NotificationListenerAdapter listener = Notificator.getNotificationListenerFor(identifier).get(0);
+
+        // mock registration
+        final ListenerRegistration<NotificationListenerAdapter> registration = mock(ListenerRegistration.class);
+        when(domNotification.registerNotificationListener(listener, listener.getSchemaPath()))
+                .thenReturn(registration);
+
+        // test to register listener for the first time
+        brokerFacade.registerToListenNotification(listener);
+        assertEquals("Registration was not successful", true, listener.isListening());
+
+        // try to register for the second time
+        brokerFacade.registerToListenNotification(listener);
+        assertEquals("Registration was not successful", true, listener.isListening());
+
+        // registrations should be invoked only once
+        verify(domNotification, times(1)).registerNotificationListener(listener, listener.getSchemaPath());
+
+        // close and remove test notification listener
+        listener.close();
+        Notificator.removeNotificationListenerIfNoSubscriberExists(listener);
     }
 }
index 1c57effe4590a7bd2760907a100d255b687faf82..7c4b350c6cf622a3259dfe43b0d5c8d051991591 100644 (file)
@@ -13,20 +13,26 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+
 import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Futures;
 import java.io.FileNotFoundException;
+import java.net.URI;
 import java.text.ParseException;
 import java.util.Set;
 import javax.ws.rs.core.MultivaluedHashMap;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
 import javax.ws.rs.core.UriInfo;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
@@ -35,10 +41,16 @@ import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
 import org.opendaylight.netconf.sal.restconf.impl.InstanceIdentifierContext;
 import org.opendaylight.netconf.sal.restconf.impl.NormalizedNodeContext;
 import org.opendaylight.netconf.sal.restconf.impl.RestconfImpl;
+import org.opendaylight.netconf.sal.streams.listeners.Notificator;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
@@ -60,9 +72,8 @@ public class RestconfImplTest {
         final Set<Module> allModules = schemaContext.getModules();
         assertNotNull(allModules);
 
-        controllerContext = spy(ControllerContext.getInstance());
+        controllerContext = ControllerContext.getInstance();
         controllerContext.setSchemas(schemaContext);
-
     }
 
     @Before
@@ -105,4 +116,73 @@ public class RestconfImplTest {
         this.restconfImpl.invokeRpc("ietf-netconf", ctx, uriInfo);
         verify(rpcService, times(2)).invokeRpc(any(SchemaPath.class), any(NormalizedNode.class));
     }
+
+    /**
+     * Create notification stream for toaster module
+     */
+    @Test
+    public void createNotificationStreamTest() {
+        final NormalizedNodeContext payload = mock(NormalizedNodeContext.class);
+        final InstanceIdentifierContext iiCtx = mock(InstanceIdentifierContext.class);
+        doReturn(iiCtx).when(payload).getInstanceIdentifierContext();
+
+        final SchemaNode schemaNode = mock(SchemaNode.class,
+                Mockito.withSettings().extraInterfaces(RpcDefinition.class));
+        doReturn(schemaNode).when(iiCtx).getSchemaNode();
+        doReturn(mock(SchemaPath.class)).when(schemaNode).getPath();
+
+        doReturn(QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote",
+                "2014-01-14", "create-notification-stream")).when(schemaNode).getQName();
+        doReturn(null).when(iiCtx).getMountPoint();
+
+        final Set<DataContainerChild<?, ?>> children = Sets.newHashSet();
+        final DataContainerChild<?, ?> child = mock(DataContainerChild.class,
+                Mockito.withSettings().extraInterfaces(LeafSetNode.class));
+
+        final LeafSetEntryNode entryNode = mock(LeafSetEntryNode.class);
+        when(entryNode.getValue()).thenReturn("(http://netconfcentral.org/ns/toaster?revision=2009-11-20)toastDone");
+        when(((LeafSetNode) child).getValue()).thenReturn(Sets.newHashSet(entryNode));
+        children.add(child);
+
+        final NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class,
+                Mockito.withSettings().extraInterfaces(ContainerNode.class));
+        doReturn(normalizedNode).when(payload).getData();
+        doReturn(children).when(normalizedNode).getValue();
+
+        // register notification
+        final NormalizedNodeContext context = this.restconfImpl
+                .invokeRpc("sal-remote:create-notification-stream", payload, null);
+        assertNotNull(context);
+    }
+
+    /**
+     * Subscribe for notification stream of toaster module
+     */
+    @Test
+    public void subscribeToNotificationStreamTest() throws Exception {
+        final String identifier = "create-notification-stream/toaster:toastDone";
+
+        // register test notification stream
+        final SchemaPath path = SchemaPath.create(
+                true, QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toastDone"));
+        Notificator.createNotificationListener(Lists.newArrayList(path), identifier, "XML");
+
+        final UriInfo uriInfo = mock(UriInfo.class);
+        final UriBuilder uriBuilder = mock(UriBuilder.class);
+        when(uriBuilder.port(8181)).thenReturn(uriBuilder);
+        when(uriBuilder.replacePath(identifier)).thenReturn(uriBuilder);
+        when(uriBuilder.build()).thenReturn(new URI(""));
+        when(uriBuilder.scheme("ws")).thenReturn(uriBuilder);
+        when(uriInfo.getAbsolutePathBuilder()).thenReturn(uriBuilder);
+
+        final BrokerFacade brokerFacade = mock(BrokerFacade.class);
+        this.restconfImpl.setBroker(brokerFacade);
+
+        // subscribe to stream and verify response
+        final Response response = this.restconfImpl.subscribeToStream(identifier, uriInfo);
+        assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
+
+        // remove test notification stream
+        Notificator.removeAllListeners();
+    }
 }