Bug 6949 / Bug 6950 - Implementation of start-time and stop-time 41/48141/6
authorJakub Toth <jatoth@cisco.com>
Fri, 14 Oct 2016 08:43:36 +0000 (10:43 +0200)
committerJakub Toth <jatoth@cisco.com>
Sat, 12 Nov 2016 01:41:57 +0000 (01:41 +0000)
query parameters

  * added and fixed tests
  * added yang notifications to latest restconf draft impl

Change-Id: Ie860b568c45eab7325c4a3b6284a75541b5433db
Signed-off-by: Jakub Toth <jatoth@cisco.com>
17 files changed:
restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/RestconfImpl.java
restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/ListenerAdapter.java
restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/NotificationListenerAdapter.java
restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/RestConnectorProvider.java
restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/common/wrapper/services/ServicesWrapperImpl.java
restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/handlers/NotificationServiceHandler.java [new file with mode: 0644]
restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/services/api/RestconfStreamsSubscriptionService.java
restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/services/impl/RestconfInvokeOperationsServiceImpl.java
restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/services/impl/RestconfStreamsSubscriptionServiceImpl.java
restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/utils/CreateStreamUtil.java
restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/utils/RestconfStreamsConstants.java
restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/utils/SubscribeToStreamUtil.java
restconf/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/RestconfImplNotificationSubscribingTest.java [new file with mode: 0644]
restconf/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/RestconfImplTest.java
restconf/sal-rest-connector/src/test/java/org/opendaylight/restconf/restful/services/impl/RestconfStreamsSubscriptionServiceImplTest.java
restconf/sal-rest-connector/src/test/java/org/opendaylight/restconf/restful/utils/CreateStreamUtilTest.java
restconf/sal-rest-connector/src/test/resources/notifications/subscribe-to-notification.yang [new file with mode: 0644]

index 7a5d521331fdc4c531edd1d3a3920640145d0f07..a7e2161702ed88ab184c6d1f3bd5b8a087cef264 100644 (file)
@@ -22,6 +22,7 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.text.DateFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -33,6 +34,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CountDownLatch;
@@ -62,6 +64,7 @@ import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
 import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter;
 import org.opendaylight.netconf.sal.streams.listeners.Notificator;
 import org.opendaylight.netconf.sal.streams.websockets.WebSocketServer;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.QNameModule;
@@ -1046,11 +1049,41 @@ public class RestconfImpl implements RestconfService {
      */
     @Override
     public NormalizedNodeContext subscribeToStream(final String identifier, final UriInfo uriInfo) {
+        boolean startTime_used = false;
+        boolean stopTime_used = false;
+        Date start = null;
+        Date stop = null;
+
+        for (final Entry<String, List<String>> entry : uriInfo.getQueryParameters().entrySet()) {
+            switch (entry.getKey()) {
+                case "start-time":
+                    if (!startTime_used) {
+                        startTime_used = true;
+                        start = parseDateFromQueryParam(entry);
+                    } else {
+                        throw new RestconfDocumentedException("Start-time parameter can be used only once.");
+                    }
+                    break;
+                case "stop-time":
+                    if (!stopTime_used) {
+                        stopTime_used = true;
+                        stop = parseDateFromQueryParam(entry);
+                    } else {
+                        throw new RestconfDocumentedException("Stop-time parameter can be used only once.");
+                    }
+                    break;
+                default:
+                    throw new RestconfDocumentedException("Bad parameter used with notifications: " + entry.getKey());
+            }
+        }
+        if(!startTime_used && stopTime_used){
+            throw new RestconfDocumentedException("Stop-time parameter has to be used with start-time parameter.");
+        }
         URI response = null;
         if (identifier.contains(DATA_SUBSCR)) {
-            response = dataSubs(identifier, uriInfo);
+            response = dataSubs(identifier, uriInfo, start, stop);
         } else if (identifier.contains(NOTIFICATION_STREAM)) {
-            response = notifStream(identifier, uriInfo);
+            response = notifStream(identifier, uriInfo, start, stop);
         }
 
         if(response != null){
@@ -1073,6 +1106,32 @@ public class RestconfImpl implements RestconfService {
         throw new RestconfDocumentedException(msg);
     }
 
+    private Date parseDateFromQueryParam(final Entry<String, List<String>> entry) {
+        final DateAndTime event = new DateAndTime(entry.getValue().iterator().next());
+        String numOf_ms = "";
+        final String value = event.getValue();
+        if (value.contains(".")) {
+            numOf_ms = numOf_ms + ".";
+            final int lastChar = value.contains("Z") ? value.indexOf("Z") : (value.contains("+") ? value.indexOf("+")
+                    : (value.contains("-") ? value.indexOf("-") : value.length()));
+            for (int i = 0; i < (lastChar - value.indexOf(".") - 1); i++) {
+                numOf_ms = numOf_ms + "S";
+            }
+        }
+        String zone = "";
+        if (!value.contains("Z")) {
+            zone = zone + "XXX";
+        }
+        final DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss" + numOf_ms + zone);
+
+        try {
+            return dateFormatter.parse(value.contains("Z") ? value.replace('T', ' ').substring(0, value.indexOf("Z"))
+                    : value.replace('T', ' '));
+        } catch (final ParseException e) {
+            throw new RestconfDocumentedException("Cannot parse of value in date: " + value + e);
+        }
+    }
+
     /**
      * @return {@link InstanceIdentifierContext} of location leaf for
      *         notification
@@ -1098,9 +1157,13 @@ public class RestconfImpl implements RestconfService {
      *            - stream name
      * @param uriInfo
      *            - uriInfo
+     * @param stop
+     *            - stop-time of getting notification
+     * @param start
+     *            - start-time of getting notification
      * @return {@link Response}
      */
-    private URI notifStream(final String identifier, final UriInfo uriInfo) {
+    private URI notifStream(final String identifier, final UriInfo uriInfo, final Date start, final Date stop) {
         final String streamName = Notificator.createStreamNameFromUri(identifier);
         if (Strings.isNullOrEmpty(streamName)) {
             throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
@@ -1113,6 +1176,7 @@ public class RestconfImpl implements RestconfService {
 
         for (final NotificationListenerAdapter listener : listeners) {
             this.broker.registerToListenNotification(listener);
+            listener.setTime(start, stop);
         }
 
         final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
@@ -1136,9 +1200,13 @@ public class RestconfImpl implements RestconfService {
      *            - stream name
      * @param uriInfo
      *            - uri info
+     * @param stop
+     *            - start-time of getting notification
+     * @param start
+     *            - stop-time of getting notification
      * @return {@link Response}
      */
-    private URI dataSubs(final String identifier, final UriInfo uriInfo) {
+    private URI dataSubs(final String identifier, final UriInfo uriInfo, final Date start, final Date stop) {
         final String streamName = Notificator.createStreamNameFromUri(identifier);
         if (Strings.isNullOrEmpty(streamName)) {
             throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
@@ -1148,6 +1216,7 @@ public class RestconfImpl implements RestconfService {
         if (listener == null) {
             throw new RestconfDocumentedException("Stream was not found.", ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT);
         }
+        listener.setTimer(start, stop);
 
         final Map<String, String> paramToValues = resolveValuesFromUri(identifier);
         final LogicalDatastoreType datastore = parserURIEnumParameter(LogicalDatastoreType.class,
index b2cdc052af0387b23ae44f57aa7cb852d41ccb62..0f784d11b4837e1db0b488cceb92b51587fdcac5 100644 (file)
@@ -47,6 +47,7 @@ import org.json.XML;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
 import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.common.QName;
@@ -89,6 +90,8 @@ public class ListenerAdapter implements DOMDataChangeListener {
     private final EventBus eventBus;
     private final EventBusChangeRecorder eventBusChangeRecorder;
     private final NotificationOutputType outputType;
+    private Date start = null;
+    private Date stop = null;
 
     /**
      * Creates new {@link ListenerAdapter} listener specified by path and stream
@@ -115,6 +118,29 @@ public class ListenerAdapter implements DOMDataChangeListener {
 
     @Override
     public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+        final Date now = new Date();
+        if (this.stop != null) {
+            if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) {
+                prepareAndPostData(change);
+            }
+            if (this.stop.compareTo(now) < 0) {
+                try {
+                    this.close();
+                } catch (final Exception e) {
+                    throw new RestconfDocumentedException("Problem with unregister listener." + e);
+                }
+            }
+        } else if (this.start != null) {
+            if (this.start.compareTo(now) < 0) {
+                this.start = null;
+                prepareAndPostData(change);
+            }
+        } else {
+            prepareAndPostData(change);
+        }
+    }
+
+    private void prepareAndPostData(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
         if (!change.getCreatedData().isEmpty() || !change.getUpdatedData().isEmpty()
                 || !change.getRemovedPaths().isEmpty()) {
             final String xml = prepareXmlFrom(change);
@@ -668,4 +694,17 @@ public class ListenerAdapter implements DOMDataChangeListener {
         }
     }
 
+    /**
+     * Set query parameters for listener
+     *
+     * @param start
+     *            - start-time of getting notification
+     * @param stop
+     *            - stop-time of getting notification
+     */
+    public void setTimer(final Date start, final Date stop) {
+        this.start = start;
+        this.stop = stop;
+    }
+
 }
index 093f529acf8c3eb6880c4e9870f96a9dd4df8a7c..303bfb30b5a0519dcce3ebfdb41a903380f35445 100644 (file)
@@ -38,6 +38,7 @@ import org.json.XML;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
 import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
@@ -74,6 +75,8 @@ public class NotificationListenerAdapter implements DOMNotificationListener {
 
     private final SchemaPath path;
     private final String outputType;
+    private Date start = null;
+    private Date stop = null;
 
     /**
      * Set path of listener and stream name, register event bus.
@@ -98,6 +101,32 @@ public class NotificationListenerAdapter implements DOMNotificationListener {
 
     @Override
     public void onNotification(final DOMNotification notification) {
+        final Date now = new Date();
+        if (this.stop != null) {
+            if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) {
+                prepareAndPostData(notification);
+            }
+            if (this.stop.compareTo(now) < 0) {
+                try {
+                    this.close();
+                } catch (final Exception e) {
+                    throw new RestconfDocumentedException("Problem with unregister listener." + e);
+                }
+            }
+        } else if (this.start != null) {
+            if (this.start.compareTo(now) < 0) {
+                this.start = null;
+                prepareAndPostData(notification);
+            }
+        } else {
+            prepareAndPostData(notification);
+        }
+    }
+
+    /**
+     * @param notification
+     */
+    private void prepareAndPostData(final DOMNotification notification) {
         final String xml = prepareXmlFrom(notification);
         final Event event = new Event(EventType.NOTIFY);
         if (this.outputType.equals("JSON")) {
@@ -386,4 +415,17 @@ public class NotificationListenerAdapter implements DOMNotificationListener {
     private enum EventType {
         REGISTER, DEREGISTER, NOTIFY
     }
+
+    /**
+     * Set query parameters for listener
+     *
+     * @param start
+     *            - start-time of getting notification
+     * @param stop
+     *            - stop-time of getting notification
+     */
+    public void setTime(final Date start, final Date stop) {
+        this.start = start;
+        this.stop = stop;
+    }
 }
index 846320049d4f2bbb46e47698f8cd6967e395d7f3..6f5964a626f2feaca1b839e4a1a9c336bb686764 100644 (file)
@@ -16,6 +16,7 @@ import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
 import org.opendaylight.controller.sal.core.api.Provider;
@@ -25,6 +26,7 @@ import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
 import org.opendaylight.restconf.common.wrapper.services.ServicesWrapperImpl;
 import org.opendaylight.restconf.handlers.DOMDataBrokerHandler;
 import org.opendaylight.restconf.handlers.DOMMountPointServiceHandler;
+import org.opendaylight.restconf.handlers.NotificationServiceHandler;
 import org.opendaylight.restconf.handlers.RpcServiceHandler;
 import org.opendaylight.restconf.handlers.SchemaContextHandler;
 import org.opendaylight.restconf.handlers.TransactionChainHandler;
@@ -82,8 +84,13 @@ public class RestConnectorProvider implements Provider, RestConnector, AutoClose
         final DOMRpcService rpcService = session.getService(DOMRpcService.class);
         final RpcServiceHandler rpcServiceHandler = new RpcServiceHandler(rpcService);
 
+        final DOMNotificationService notificationService = session.getService(DOMNotificationService.class);
+        final NotificationServiceHandler notificationServiceHandler =
+                new NotificationServiceHandler(notificationService);
+
         wrapperServices.setHandlers(schemaCtxHandler, RestConnectorProvider.mountPointServiceHandler,
-                RestConnectorProvider.transactionChainHandler, brokerHandler, rpcServiceHandler);
+                RestConnectorProvider.transactionChainHandler, brokerHandler, rpcServiceHandler,
+                notificationServiceHandler);
     }
 
     /**
index febb30802bc851141f6e33e7c6bcc1602ddf9902..0211174b19dbeffce02af0ff458cebec00f4df47 100644 (file)
@@ -16,6 +16,7 @@ import org.opendaylight.netconf.sal.restconf.impl.PATCHContext;
 import org.opendaylight.netconf.sal.restconf.impl.PATCHStatusContext;
 import org.opendaylight.restconf.handlers.DOMDataBrokerHandler;
 import org.opendaylight.restconf.handlers.DOMMountPointServiceHandler;
+import org.opendaylight.restconf.handlers.NotificationServiceHandler;
 import org.opendaylight.restconf.handlers.RpcServiceHandler;
 import org.opendaylight.restconf.handlers.SchemaContextHandler;
 import org.opendaylight.restconf.handlers.TransactionChainHandler;
@@ -148,7 +149,7 @@ public class ServicesWrapperImpl implements BaseServicesWrapper, TransactionServ
     }
 
     @Override
-    public Response subscribeToStream(final String identifier, final UriInfo uriInfo) {
+    public NormalizedNodeContext subscribeToStream(final String identifier, final UriInfo uriInfo) {
         return this.delegRestconfSubscrService.subscribeToStream(identifier, uriInfo);
     }
 
@@ -156,7 +157,7 @@ public class ServicesWrapperImpl implements BaseServicesWrapper, TransactionServ
                             final DOMMountPointServiceHandler domMountPointServiceHandler,
                             final TransactionChainHandler transactionChainHandler,
                             final DOMDataBrokerHandler domDataBrokerHandler,
-                            final RpcServiceHandler rpcServiceHandler) {
+            final RpcServiceHandler rpcServiceHandler, final NotificationServiceHandler notificationServiceHandler) {
         this.delegRestModService = new RestconfModulesServiceImpl(schemaCtxHandler, domMountPointServiceHandler);
         this.delegRestOpsService = new RestconfOperationsServiceImpl(schemaCtxHandler, domMountPointServiceHandler);
         this.delegRestSchService = new RestconfSchemaServiceImpl(schemaCtxHandler, domMountPointServiceHandler);
@@ -165,6 +166,8 @@ public class ServicesWrapperImpl implements BaseServicesWrapper, TransactionServ
                 domMountPointServiceHandler);
         this.delegRestconfInvokeOpsService = new RestconfInvokeOperationsServiceImpl(rpcServiceHandler,
                 schemaCtxHandler);
-        this.delegRestconfSubscrService = new RestconfStreamsSubscriptionServiceImpl(domDataBrokerHandler);
+        this.delegRestconfSubscrService =
+                new RestconfStreamsSubscriptionServiceImpl(domDataBrokerHandler, notificationServiceHandler,
+                        schemaCtxHandler);
     }
 }
diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/handlers/NotificationServiceHandler.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/handlers/NotificationServiceHandler.java
new file mode 100644 (file)
index 0000000..3365582
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.restconf.handlers;
+
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
+
+public class NotificationServiceHandler implements Handler<DOMNotificationService> {
+
+    private final DOMNotificationService notificationService;
+
+    /**
+     * Set DOMNotificationService
+     *
+     * @param notificationService
+     *            - DOMNotificationService
+     */
+    public NotificationServiceHandler(final DOMNotificationService notificationService) {
+        this.notificationService = notificationService;
+    }
+
+    @Override
+    public DOMNotificationService get() {
+        return this.notificationService;
+    }
+
+}
index 0528d89c44d3740814fb007e945215f811afdc02..278a8f7af6ce85b0b386524eb3b7017df2474093 100644 (file)
@@ -12,8 +12,8 @@ import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.core.Context;
-import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriInfo;
+import org.opendaylight.netconf.sal.restconf.impl.NormalizedNodeContext;
 
 /**
  * Subscribing to streams
@@ -28,9 +28,10 @@ public interface RestconfStreamsSubscriptionService {
      *            - name of stream
      * @param uriInfo
      *            - URI info
-     * @return {@link Response}
+     * @return {@link NormalizedNodeContext}
      */
     @GET
     @Path("data/ietf-restconf-monitoring:restconf-state/streams/stream/{identifier:.+}")
-    Response subscribeToStream(@Encoded @PathParam("identifier") String identifier, @Context UriInfo uriInfo);
+    NormalizedNodeContext subscribeToStream(@Encoded @PathParam("identifier") String identifier,
+            @Context UriInfo uriInfo);
 }
index 222c13902d8c679bcf90c49286a79fed1cee0586..f885e78c2638762288a218a9cf02c57460532aee 100644 (file)
@@ -13,6 +13,9 @@ import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
 import org.opendaylight.netconf.sal.restconf.impl.InstanceIdentifierContext;
 import org.opendaylight.netconf.sal.restconf.impl.NormalizedNodeContext;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorTag;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorType;
 import org.opendaylight.restconf.common.references.SchemaContextRef;
 import org.opendaylight.restconf.handlers.RpcServiceHandler;
 import org.opendaylight.restconf.handlers.SchemaContextHandler;
@@ -51,7 +54,14 @@ public class RestconfInvokeOperationsServiceImpl implements RestconfInvokeOperat
 
         if (mountPoint == null) {
             if (namespace.toString().equals(RestconfStreamsConstants.SAL_REMOTE_NAMESPACE)) {
-                response = CreateStreamUtil.createStream(payload, refSchemaCtx);
+                if (identifier.contains(RestconfStreamsConstants.CREATE_DATA_SUBSCR)) {
+                    response = CreateStreamUtil.createDataChangeNotifiStream(payload, refSchemaCtx);
+                } else if (identifier.contains(RestconfStreamsConstants.CREATE_NOTIFICATION_STREAM)) {
+                    response = CreateStreamUtil.createYangNotifiStream(payload, refSchemaCtx);
+                } else {
+                    throw new RestconfDocumentedException("Not supported operation", ErrorType.RPC,
+                            ErrorTag.OPERATION_NOT_SUPPORTED);
+                }
             } else {
                 response = RestconfInvokeOperationsUtil.invokeRpc(payload.getData(), schemaPath,
                         this.rpcServiceHandler);
index c9d53ae91804896de6aca46b863d1eb5f9cf80ed..6b0cbe3c2b178e8baaabdf2664f1a48de67ad449 100644 (file)
@@ -8,22 +8,26 @@
 package org.opendaylight.restconf.restful.services.impl;
 
 import java.net.URI;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-import javax.ws.rs.core.UriBuilder;
+import java.util.Map.Entry;
 import javax.ws.rs.core.UriInfo;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.netconf.sal.restconf.impl.InstanceIdentifierContext;
+import org.opendaylight.netconf.sal.restconf.impl.NormalizedNodeContext;
 import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
-import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorTag;
-import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorType;
-import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
-import org.opendaylight.netconf.sal.streams.listeners.Notificator;
 import org.opendaylight.restconf.handlers.DOMDataBrokerHandler;
+import org.opendaylight.restconf.handlers.NotificationServiceHandler;
+import org.opendaylight.restconf.handlers.SchemaContextHandler;
 import org.opendaylight.restconf.restful.services.api.RestconfStreamsSubscriptionService;
 import org.opendaylight.restconf.restful.utils.RestconfStreamsConstants;
 import org.opendaylight.restconf.restful.utils.SubscribeToStreamUtil;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.NormalizedNodeAttrBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,41 +41,75 @@ public class RestconfStreamsSubscriptionServiceImpl implements RestconfStreamsSu
 
     private final DOMDataBrokerHandler domDataBrokerHandler;
 
-    public RestconfStreamsSubscriptionServiceImpl(final DOMDataBrokerHandler domDataBrokerHandler) {
+    private final NotificationServiceHandler notificationServiceHandler;
+
+    private final SchemaContextHandler schemaHandler;
+
+    public RestconfStreamsSubscriptionServiceImpl(final DOMDataBrokerHandler domDataBrokerHandler,
+            final NotificationServiceHandler notificationServiceHandler, final SchemaContextHandler schemaHandler) {
         this.domDataBrokerHandler = domDataBrokerHandler;
+        this.notificationServiceHandler = notificationServiceHandler;
+        this.schemaHandler = schemaHandler;
     }
 
     @Override
-    public Response subscribeToStream(final String identifier, final UriInfo uriInfo) {
-        final Map<String, String> mapOfValues = SubscribeToStreamUtil.mapValuesFromUri(identifier);
+    public NormalizedNodeContext subscribeToStream(final String identifier, final UriInfo uriInfo) {
+        boolean startTime_used = false;
+        boolean stopTime_used = false;
+        Date start = null;
+        Date stop = null;
 
-        final LogicalDatastoreType ds = SubscribeToStreamUtil.parseURIEnum(LogicalDatastoreType.class,
-                mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME));
-        if (ds == null) {
-            final String msg = "Stream name doesn't contains datastore value (pattern /datastore=)";
-            LOG.debug(msg);
-            throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
+        for (final Entry<String, List<String>> entry : uriInfo.getQueryParameters().entrySet()) {
+            switch (entry.getKey()) {
+                case "start-time":
+                    if (!startTime_used) {
+                        startTime_used = true;
+                        start = SubscribeToStreamUtil.parseDateFromQueryParam(entry);
+                    } else {
+                        throw new RestconfDocumentedException("Start-time parameter can be used only once.");
+                    }
+                    break;
+                case "stop-time":
+                    if (!stopTime_used) {
+                        stopTime_used = true;
+                        stop = SubscribeToStreamUtil.parseDateFromQueryParam(entry);
+                    } else {
+                        throw new RestconfDocumentedException("Stop-time parameter can be used only once.");
+                    }
+                    break;
+                default:
+                    throw new RestconfDocumentedException("Bad parameter used with notifications: " + entry.getKey());
+            }
         }
-
-        final DataChangeScope scope = SubscribeToStreamUtil.parseURIEnum(DataChangeScope.class,
-                mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME));
-        if (scope == null) {
-            final String msg = "Stream name doesn't contains datastore value (pattern /scope=)";
-            LOG.warn(msg);
-            throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
+        if (!startTime_used && stopTime_used) {
+            throw new RestconfDocumentedException("Stop-time parameter has to be used with start-time parameter.");
+        }
+        URI response = null;
+        if (identifier.contains(RestconfStreamsConstants.DATA_SUBSCR)) {
+            response = SubscribeToStreamUtil.dataSubs(identifier, uriInfo, start, stop, this.domDataBrokerHandler);
+        } else if (identifier.contains(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
+            response = SubscribeToStreamUtil.notifStream(identifier, uriInfo, start, stop,
+                    this.notificationServiceHandler);
         }
 
-        final String streamName = Notificator.createStreamNameFromUri(identifier);
-
-        final ListenerAdapter listener = Notificator.getListenerFor(streamName);
-        SubscribeToStreamUtil.registration(ds, scope, listener, this.domDataBrokerHandler.get());
+        if (response != null) {
+            // prepare node with value of location
+            final InstanceIdentifierContext<?> iid =
+                    SubscribeToStreamUtil.prepareIIDSubsStreamOutput(this.schemaHandler);
+            final NormalizedNodeAttrBuilder<NodeIdentifier, Object, LeafNode<Object>> builder =
+                    ImmutableLeafNodeBuilder.create().withValue(response.toString());
+            builder.withNodeIdentifier(
+                    NodeIdentifier.create(QName.create("subscribe:to:notification", "2016-10-28", "location")));
 
-        final int port = SubscribeToStreamUtil.prepareNotificationPort();
+            // prepare new header with location
+            final Map<String, Object> headers = new HashMap<>();
+            headers.put("Location", response);
 
-        final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
-        final UriBuilder uriToWebSocketServer = uriBuilder.port(port).scheme(RestconfStreamsConstants.SCHEMA_SUBSCIBRE_URI);
-        final URI uri = uriToWebSocketServer.replacePath(streamName).build();
+            return new NormalizedNodeContext(iid, builder.build(), headers);
+        }
 
-        return Response.status(Status.OK).location(uri).build();
+        final String msg = "Bad type of notification of sal-remote";
+        LOG.warn(msg);
+        throw new RestconfDocumentedException(msg);
     }
 }
index 261c4dba7ec2ddbe27dde6b6ff994d89bc6ceb4b..b513f81672799404369c18c74d7321e050dc2040 100644 (file)
@@ -8,7 +8,12 @@
 package org.opendaylight.restconf.restful.utils;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
@@ -28,9 +33,14 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgum
 import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -87,12 +97,12 @@ public final class CreateStreamUtil {
      *         </pre>
      *
      */
-    public static DOMRpcResult createStream(final NormalizedNodeContext payload,
+    public static DOMRpcResult createDataChangeNotifiStream(final NormalizedNodeContext payload,
             final SchemaContextRef refSchemaCtx) {
         final ContainerNode data = (ContainerNode) payload.getData();
         final QName qname = payload.getInstanceIdentifierContext().getSchemaNode().getQName();
         final YangInstanceIdentifier path = preparePath(data, qname);
-        final String streamName = prepareStream(path, refSchemaCtx.get(), data);
+        final String streamName = prepareDataChangeNotifiStreamName(path, refSchemaCtx.get(), data);
 
         final QName outputQname = QName.create(qname, "output");
         final QName streamNameQname = QName.create(qname, "stream-name");
@@ -119,7 +129,7 @@ public final class CreateStreamUtil {
         return outputType = outputType == null ? NotificationOutputType.XML : outputType;
     }
 
-    private static String prepareStream(final YangInstanceIdentifier path, final SchemaContext schemaContext,
+    private static String prepareDataChangeNotifiStreamName(final YangInstanceIdentifier path, final SchemaContext schemaContext,
             final ContainerNode data) {
         LogicalDatastoreType ds = parseEnum(data, LogicalDatastoreType.class,
                 RestconfStreamsConstants.DATASTORE_PARAM_NAME);
@@ -128,7 +138,8 @@ public final class CreateStreamUtil {
         DataChangeScope scope = parseEnum(data, DataChangeScope.class, RestconfStreamsConstants.SCOPE_PARAM_NAME);
         scope = scope == null ? RestconfStreamsConstants.DEFAULT_SCOPE : scope;
 
-        final String streamName = Notificator
+        final String streamName = RestconfStreamsConstants.DATA_SUBSCR + "/"
+                + Notificator
                 .createStreamNameFromUri(ParserIdentifier.stringFromYangInstanceIdentifier(path, schemaContext)
                 + RestconfStreamsConstants.DS_URI + ds + RestconfStreamsConstants.SCOPE_URI + scope);
         return streamName;
@@ -168,4 +179,69 @@ public final class CreateStreamUtil {
         }
         return (YangInstanceIdentifier) pathValue;
     }
+
+    /**
+     * Create stream with POST operation via RPC
+     *
+     * @param payload
+     *            - input of RPC
+     * @param refSchemaCtx
+     *            - schemaContext
+     * @return {@link DOMRpcResult}
+     */
+    public static DOMRpcResult createYangNotifiStream(final NormalizedNodeContext payload,
+            final SchemaContextRef refSchemaCtx) {
+        final ContainerNode data = (ContainerNode) payload.getData();
+        LeafSetNode leafSet = null;
+        String outputType = "XML";
+        for (final DataContainerChild<? extends PathArgument, ?> dataChild : data.getValue()) {
+            if (dataChild instanceof LeafSetNode) {
+                leafSet = (LeafSetNode) dataChild;
+            } else if (dataChild instanceof AugmentationNode) {
+                outputType = (String) (((AugmentationNode) dataChild).getValue()).iterator().next().getValue();
+            }
+        }
+
+        final Collection<LeafSetEntryNode> entryNodes = leafSet.getValue();
+        final List<SchemaPath> paths = new ArrayList<>();
+        String streamName = RestconfStreamsConstants.CREATE_NOTIFICATION_STREAM + "/";
+
+        final Iterator<LeafSetEntryNode> iterator = entryNodes.iterator();
+        while (iterator.hasNext()) {
+            final QName valueQName = QName.create((String) iterator.next().getValue());
+            final Module module = refSchemaCtx.findModuleByNamespaceAndRevision(valueQName.getModule().getNamespace(),
+                    valueQName.getModule().getRevision());
+            Preconditions.checkNotNull(module,
+                    "Module for namespace " + valueQName.getModule().getNamespace() + " does not exist");
+            NotificationDefinition notifiDef = null;
+            for (final NotificationDefinition notification : module.getNotifications()) {
+                if (notification.getQName().equals(valueQName)) {
+                    notifiDef = notification;
+                    break;
+                }
+            }
+            final String moduleName = module.getName();
+            Preconditions.checkNotNull(notifiDef,
+                    "Notification " + valueQName + "doesn't exist in module " + moduleName);
+            paths.add(notifiDef.getPath());
+            streamName = streamName + moduleName + ":" + valueQName.getLocalName();
+            if (iterator.hasNext()) {
+                streamName = streamName + ",";
+            }
+        }
+
+        final QName rpcQName = payload.getInstanceIdentifierContext().getSchemaNode().getQName();
+        final QName outputQname = QName.create(rpcQName, "output");
+        final QName streamNameQname = QName.create(rpcQName, "notification-stream-identifier");
+
+        final ContainerNode output =
+                ImmutableContainerNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(outputQname))
+                        .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build();
+
+        if (!Notificator.existNotificationListenerFor(streamName)) {
+            Notificator.createNotificationListener(paths, streamName, outputType);
+        }
+
+        return new DefaultDOMRpcResult(output);
+    }
 }
index 29cc47a8956d68843f8e8335220066203347c1e4..09da9e8983a61a5767f7c0920e9b914b1c680eaf 100644 (file)
@@ -59,6 +59,12 @@ public final class RestconfStreamsConstants {
 
     public static final String SCHEMA_SUBSCIBRE_URI = "ws";
 
+    public static final CharSequence DATA_SUBSCR = "data-change-event-subscription";
+    public static final CharSequence CREATE_DATA_SUBSCR = "create-" + DATA_SUBSCR;
+
+    public static final CharSequence NOTIFICATION_STREAM = "notification-stream";
+    public static final CharSequence CREATE_NOTIFICATION_STREAM = "create-" + NOTIFICATION_STREAM;
+
     static {
         Date eventSubscriptionAugRevision;
         try {
index c07551c182ed62e323ff455ca94621ed913b60f8..0904b02e461d4ba98b8a609859525f454716b03e 100644 (file)
@@ -7,17 +7,49 @@
  */
 package org.opendaylight.restconf.restful.utils;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import java.net.URI;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriInfo;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
+import org.opendaylight.netconf.sal.restconf.impl.InstanceIdentifierContext;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorTag;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorType;
 import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
+import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter;
+import org.opendaylight.netconf.sal.streams.listeners.Notificator;
 import org.opendaylight.netconf.sal.streams.websockets.WebSocketServer;
+import org.opendaylight.restconf.handlers.DOMDataBrokerHandler;
+import org.opendaylight.restconf.handlers.NotificationServiceHandler;
+import org.opendaylight.restconf.handlers.SchemaContextHandler;
 import org.opendaylight.restconf.utils.RestconfConstants;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
+import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Subscribe to stream util class
@@ -25,6 +57,8 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
  */
 public final class SubscribeToStreamUtil {
 
+    private static final Logger LOG = LoggerFactory.getLogger(SubscribeToStreamUtil.class);
+
     private SubscribeToStreamUtil() {
         throw new UnsupportedOperationException("Util class");
     }
@@ -77,7 +111,7 @@ public final class SubscribeToStreamUtil {
      * @param domDataBroker
      *            - data broker for register data change listener
      */
-    public static void registration(final LogicalDatastoreType ds, final DataChangeScope scope,
+    private static void registration(final LogicalDatastoreType ds, final DataChangeScope scope,
             final ListenerAdapter listener, final DOMDataBroker domDataBroker) {
         if (listener.isListening()) {
             return;
@@ -95,7 +129,7 @@ public final class SubscribeToStreamUtil {
      *
      * @return port
      */
-    public static int prepareNotificationPort() {
+    private static int prepareNotificationPort() {
         int port = RestconfStreamsConstants.NOTIFICATION_PORT;
         try {
             final WebSocketServer webSocketServer = WebSocketServer.getInstance();
@@ -106,4 +140,160 @@ public final class SubscribeToStreamUtil {
         return port;
     }
 
+    /**
+     * Register listeners by streamName in identifier to listen to yang notifications
+     *
+     * @param identifier
+     *            - identifier as stream name
+     * @param uriInfo
+     *            - for getting base URI information
+     * @param start
+     *            - start-time query parameter
+     * @param stop
+     *            - stop-time query parameter
+     * @param notifiServiceHandler
+     *            - DOMNotificationService handler for register listeners
+     * @return location for listening
+     */
+    public static URI notifStream(final String identifier, final UriInfo uriInfo, final Date start, final Date stop,
+            final NotificationServiceHandler notifiServiceHandler) {
+        final String streamName = Notificator.createStreamNameFromUri(identifier);
+        if (Strings.isNullOrEmpty(streamName)) {
+            throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
+        }
+        final List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName);
+        if ((listeners == null) || listeners.isEmpty()) {
+            throw new RestconfDocumentedException("Stream was not found.", ErrorType.PROTOCOL,
+                    ErrorTag.UNKNOWN_ELEMENT);
+        }
+
+        for (final NotificationListenerAdapter listener : listeners) {
+            registerToListenNotification(listener, notifiServiceHandler);
+            listener.setTime(start, stop);
+        }
+
+        final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
+        int notificationPort = RestconfStreamsConstants.NOTIFICATION_PORT;
+        try {
+            final WebSocketServer webSocketServerInstance = WebSocketServer.getInstance();
+            notificationPort = webSocketServerInstance.getPort();
+        } catch (final NullPointerException e) {
+            WebSocketServer.createInstance(RestconfStreamsConstants.NOTIFICATION_PORT);
+        }
+        final UriBuilder uriToWebsocketServerBuilder = uriBuilder.port(notificationPort).scheme("ws");
+        final URI uriToWebsocketServer = uriToWebsocketServerBuilder.replacePath(streamName).build();
+
+        return uriToWebsocketServer;
+    }
+
+    private static void registerToListenNotification(final NotificationListenerAdapter listener,
+            final NotificationServiceHandler notificationServiceHandler) {
+        if (listener.isListening()) {
+            return;
+        }
+
+        final SchemaPath path = listener.getSchemaPath();
+        final ListenerRegistration<DOMNotificationListener> registration =
+                notificationServiceHandler.get().registerNotificationListener(listener, path);
+
+        listener.setRegistration(registration);
+    }
+
+    /**
+     * Prepare InstanceIdentifierContext for Location leaf
+     *
+     * @param schemaHandler
+     *            - schemaContext handler
+     * @return InstanceIdentifier of Location leaf
+     */
+    public static InstanceIdentifierContext<?> prepareIIDSubsStreamOutput(final SchemaContextHandler schemaHandler) {
+        final QName qnameBase = QName.create("subscribe:to:notification", "2016-10-28", "notifi");
+        final DataSchemaNode location = ((ContainerSchemaNode) schemaHandler.get()
+                .findModuleByNamespaceAndRevision(qnameBase.getNamespace(), qnameBase.getRevision())
+                .getDataChildByName(qnameBase)).getDataChildByName(QName.create(qnameBase, "location"));
+        final List<PathArgument> path = new ArrayList<>();
+        path.add(NodeIdentifier.create(qnameBase));
+        path.add(NodeIdentifier.create(QName.create(qnameBase, "location")));
+
+        return new InstanceIdentifierContext<SchemaNode>(YangInstanceIdentifier.create(path), location, null,
+                schemaHandler.get());
+    }
+
+    /**
+     * Register listener by streamName in identifier to listen to yang notifications
+     *
+     * @param identifier
+     *            - identifier as stream name
+     * @param uriInfo
+     *            - for getting base URI information
+     * @param start
+     *            - start-time query parameter
+     * @param stop
+     *            - stop-time query parameter
+     * @param domDataBrokerHandler
+     *            - DOMDataBroker handler for register listener
+     * @return location for listening
+     */
+    public static URI dataSubs(final String identifier, final UriInfo uriInfo, final Date start, final Date stop,
+            final DOMDataBrokerHandler domDataBrokerHandler) {
+        final Map<String, String> mapOfValues = SubscribeToStreamUtil.mapValuesFromUri(identifier);
+
+        final LogicalDatastoreType ds = SubscribeToStreamUtil.parseURIEnum(LogicalDatastoreType.class,
+                mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME));
+        if (ds == null) {
+            final String msg = "Stream name doesn't contains datastore value (pattern /datastore=)";
+            LOG.debug(msg);
+            throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
+        }
+
+        final DataChangeScope scope = SubscribeToStreamUtil.parseURIEnum(DataChangeScope.class,
+                mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME));
+        if (scope == null) {
+            final String msg = "Stream name doesn't contains datastore value (pattern /scope=)";
+            LOG.warn(msg);
+            throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
+        }
+
+        final String streamName = Notificator.createStreamNameFromUri(identifier);
+
+        final ListenerAdapter listener = Notificator.getListenerFor(streamName);
+        Preconditions.checkNotNull(listener, "Listener doesn't exist : " + streamName);
+
+        listener.setTimer(start, stop);
+
+        SubscribeToStreamUtil.registration(ds, scope, listener, domDataBrokerHandler.get());
+
+        final int port = SubscribeToStreamUtil.prepareNotificationPort();
+
+        final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
+        final UriBuilder uriToWebSocketServer =
+                uriBuilder.port(port).scheme(RestconfStreamsConstants.SCHEMA_SUBSCIBRE_URI);
+        return uriToWebSocketServer.replacePath(streamName).build();
+    }
+
+    public static Date parseDateFromQueryParam(final Entry<String, List<String>> entry) {
+        final DateAndTime event = new DateAndTime(entry.getValue().iterator().next());
+        String numOf_ms = "";
+        final String value = event.getValue();
+        if (value.contains(".")) {
+            numOf_ms = numOf_ms + ".";
+            final int lastChar = value.contains("Z") ? value.indexOf("Z") : (value.contains("+") ? value.indexOf("+")
+                    : (value.contains("-") ? value.indexOf("-") : value.length()));
+            for (int i = 0; i < (lastChar - value.indexOf(".") - 1); i++) {
+                numOf_ms = numOf_ms + "S";
+            }
+        }
+        String zone = "";
+        if (!value.contains("Z")) {
+            zone = zone + "XXX";
+        }
+        final DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss" + numOf_ms + zone);
+
+        try {
+            return dateFormatter.parse(value.contains("Z") ? value.replace('T', ' ').substring(0, value.indexOf("Z"))
+                    : value.replace('T', ' '));
+        } catch (final ParseException e) {
+            throw new RestconfDocumentedException("Cannot parse of value in date: " + value + e);
+        }
+    }
 }
diff --git a/restconf/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/RestconfImplNotificationSubscribingTest.java b/restconf/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/RestconfImplNotificationSubscribingTest.java
new file mode 100644 (file)
index 0000000..2fdb0ba
--- /dev/null
@@ -0,0 +1,257 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+/**
+ *
+ */
+package org.opendaylight.controller.sal.restconf.impl.test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriInfo;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.rest.common.TestRestconfUtils;
+import org.opendaylight.netconf.sal.restconf.impl.BrokerFacade;
+import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfImpl;
+import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
+import org.opendaylight.netconf.sal.streams.listeners.Notificator;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public class RestconfImplNotificationSubscribingTest {
+
+    private final String identifier = "data-change-event-subscription/datastore=OPERATIONAL/scope=ONE";
+
+    @Mock
+    private BrokerFacade broker;
+
+    @Mock
+    private DOMDataBroker domDataBroker;
+
+    @Mock
+    private UriInfo uriInfo;
+
+    @Before
+    public void setup() throws Exception {
+        MockitoAnnotations.initMocks(this);
+
+        this.broker.setDomDataBroker(this.domDataBroker);
+        RestconfImpl.getInstance().setBroker(this.broker);
+        ControllerContext.getInstance().setGlobalSchema(TestRestconfUtils.loadSchemaContext("/notifications"));
+
+        final YangInstanceIdentifier path = Mockito.mock(YangInstanceIdentifier.class);
+        Notificator.createListener(path, this.identifier, NotificationOutputType.XML);
+    }
+
+    @Test
+    public void startTimeTest() {
+        final List<Entry<String, List<String>>> list = new ArrayList<>();
+        final Entry<String, List<String>> entry = Mockito.mock(Entry.class);
+        Mockito.when(entry.getKey()).thenReturn("start-time");
+        final List<String> time = new ArrayList<>();
+        time.add("2014-10-25T10:02:00Z");
+        Mockito.when(entry.getValue()).thenReturn(time);
+        list.add(entry);
+
+        subscribe(list);
+        Notificator.removeAllListeners();
+    }
+
+    @Test
+    public void milisecsTest() {
+        final List<Entry<String, List<String>>> list = new ArrayList<>();
+        final Entry<String, List<String>> entry = Mockito.mock(Entry.class);
+        Mockito.when(entry.getKey()).thenReturn("start-time");
+        final List<String> time = new ArrayList<>();
+        time.add("2014-10-25T10:02:00.12345Z");
+        Mockito.when(entry.getValue()).thenReturn(time);
+        list.add(entry);
+
+        subscribe(list);
+        Notificator.removeAllListeners();
+    }
+
+    @Test
+    public void zonesPlusTest() {
+        final List<Entry<String, List<String>>> list = new ArrayList<>();
+        final Entry<String, List<String>> entry = Mockito.mock(Entry.class);
+        Mockito.when(entry.getKey()).thenReturn("start-time");
+        final List<String> time = new ArrayList<>();
+        time.add("2014-10-25T10:02:00+01:00");
+        Mockito.when(entry.getValue()).thenReturn(time);
+        list.add(entry);
+
+        subscribe(list);
+        Notificator.removeAllListeners();
+    }
+
+    @Test
+    public void zonesMinusTest() {
+        final List<Entry<String, List<String>>> list = new ArrayList<>();
+        final Entry<String, List<String>> entry = Mockito.mock(Entry.class);
+        Mockito.when(entry.getKey()).thenReturn("start-time");
+        final List<String> time = new ArrayList<>();
+        time.add("2014-10-25T10:02:00-01:00");
+        Mockito.when(entry.getValue()).thenReturn(time);
+        list.add(entry);
+
+        subscribe(list);
+        Notificator.removeAllListeners();
+    }
+
+    @Test
+    public void startAndStopTimeTest() {
+        final List<Entry<String, List<String>>> list = new ArrayList<>();
+        final Entry<String, List<String>> entry = Mockito.mock(Entry.class);
+        Mockito.when(entry.getKey()).thenReturn("start-time");
+        final List<String> time = new ArrayList<>();
+        time.add("2014-10-25T10:02:00Z");
+        Mockito.when(entry.getValue()).thenReturn(time);
+
+        final Entry<String, List<String>> entry2 = Mockito.mock(Entry.class);
+        Mockito.when(entry2.getKey()).thenReturn("stop-time");
+        final List<String> time2 = new ArrayList<>();
+        time2.add("2014-10-25T12:31:00Z");
+        Mockito.when(entry2.getValue()).thenReturn(time2);
+
+        list.add(entry);
+        list.add(entry2);
+
+        subscribe(list);
+        Notificator.removeAllListeners();
+    }
+
+    @Test(expected = RestconfDocumentedException.class)
+    public void stopTimeTest() {
+        final List<Entry<String, List<String>>> list = new ArrayList<>();
+        final Entry<String, List<String>> entry = Mockito.mock(Entry.class);
+        Mockito.when(entry.getKey()).thenReturn("stop-time");
+        final List<String> time = new ArrayList<>();
+        time.add("2014-10-25T12:31:00Z");
+        Mockito.when(entry.getValue()).thenReturn(time);
+        list.add(entry);
+
+        subscribe(list);
+        Notificator.removeAllListeners();
+    }
+
+    @Test(expected = RestconfDocumentedException.class)
+    public void badParamTest() {
+        final List<Entry<String, List<String>>> list = new ArrayList<>();
+        final Entry<String, List<String>> entry = Mockito.mock(Entry.class);
+        Mockito.when(entry.getKey()).thenReturn("time");
+        final List<String> time = new ArrayList<>();
+        time.add("2014-10-25T12:31:00Z");
+        Mockito.when(entry.getValue()).thenReturn(time);
+        list.add(entry);
+
+        subscribe(list);
+        Notificator.removeAllListeners();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void badValueTest() {
+        final List<Entry<String, List<String>>> list = new ArrayList<>();
+        final Entry<String, List<String>> entry = Mockito.mock(Entry.class);
+        Mockito.when(entry.getKey()).thenReturn("start-time");
+        final List<String> time = new ArrayList<>();
+        time.add("badvalue");
+        Mockito.when(entry.getValue()).thenReturn(time);
+        list.add(entry);
+
+        subscribe(list);
+        Notificator.removeAllListeners();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void badZonesTest() {
+        final List<Entry<String, List<String>>> list = new ArrayList<>();
+        final Entry<String, List<String>> entry = Mockito.mock(Entry.class);
+        Mockito.when(entry.getKey()).thenReturn("start-time");
+        final List<String> time = new ArrayList<>();
+        time.add("2014-10-25T10:02:00Z+1:00");
+        Mockito.when(entry.getValue()).thenReturn(time);
+        list.add(entry);
+
+        subscribe(list);
+        Notificator.removeAllListeners();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void badMilisecsTest() {
+        final List<Entry<String, List<String>>> list = new ArrayList<>();
+        final Entry<String, List<String>> entry = Mockito.mock(Entry.class);
+        Mockito.when(entry.getKey()).thenReturn("start-time");
+        final List<String> time = new ArrayList<>();
+        time.add("2014-10-25T10:02:00:0026Z");
+        Mockito.when(entry.getValue()).thenReturn(time);
+        list.add(entry);
+
+        subscribe(list);
+        Notificator.removeAllListeners();
+    }
+
+    @Test
+    public void onNotifiTest() throws Exception {
+        final YangInstanceIdentifier path = Mockito.mock(YangInstanceIdentifier.class);
+        final ListenerAdapter listener = Notificator.createListener(path, this.identifier, NotificationOutputType.XML);
+
+        final List<Entry<String, List<String>>> list = new ArrayList<>();
+        final Entry<String, List<String>> entry = Mockito.mock(Entry.class);
+        Mockito.when(entry.getKey()).thenReturn("start-time");
+        final List<String> time = new ArrayList<>();
+        time.add("2014-10-25T10:02:00Z");
+        Mockito.when(entry.getValue()).thenReturn(time);
+        list.add(entry);
+
+        subscribe(list);
+
+        final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change =
+                Mockito.mock(AsyncDataChangeEvent.class);
+        Field start = listener.getClass().getDeclaredField("start");
+        start.setAccessible(true);
+        Date startOrig = (Date) start.get(listener);
+        Assert.assertNotNull(startOrig);
+        listener.onDataChanged(change);
+
+        start = listener.getClass().getDeclaredField("start");
+        start.setAccessible(true);
+        startOrig = (Date) start.get(listener);
+        Assert.assertNull(startOrig);
+    }
+
+    private void subscribe(final List<Entry<String, List<String>>> entries) {
+        final MultivaluedMap<String, String> map = Mockito.mock(MultivaluedMap.class);
+        Mockito.when(this.uriInfo.getQueryParameters()).thenReturn(map);
+        final UriBuilder uriBuilder = UriBuilder.fromPath("http://localhost:8181/" + this.identifier);
+        Mockito.when(this.uriInfo.getAbsolutePathBuilder()).thenReturn(uriBuilder);
+        final Set<Entry<String, List<String>>> set = new HashSet<>();
+        for(final Entry<String, List<String>> entry : entries){
+            set.add(entry);
+        }
+        Mockito.when(map.entrySet()).thenReturn(set);
+        RestconfImpl.getInstance().subscribeToStream(this.identifier, this.uriInfo);
+    }
+
+}
index b0996bff5c07f7cf60855fbdbbf833788ec361b7..dc7e2e9e9fc7038d45b0db93bc2cd075080f3ca4 100644 (file)
@@ -23,8 +23,12 @@ import com.google.common.util.concurrent.Futures;
 import java.io.FileNotFoundException;
 import java.net.URI;
 import java.text.ParseException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
 import java.util.Set;
 import javax.ws.rs.core.MultivaluedHashMap;
+import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.UriBuilder;
 import javax.ws.rs.core.UriInfo;
 import org.junit.Before;
@@ -173,6 +177,10 @@ public class RestconfImplTest {
         when(uriBuilder.build()).thenReturn(new URI(""));
         when(uriBuilder.scheme("ws")).thenReturn(uriBuilder);
         when(uriInfo.getAbsolutePathBuilder()).thenReturn(uriBuilder);
+        final MultivaluedMap<String, String> map = mock(MultivaluedMap.class);
+        final Set<Entry<String, List<String>>> set = new HashSet<>();
+        when(map.entrySet()).thenReturn(set);
+        when(uriInfo.getQueryParameters()).thenReturn(map);
 
         final BrokerFacade brokerFacade = mock(BrokerFacade.class);
         this.restconfImpl.setBroker(brokerFacade);
index 157fafc9995fd3dd9e03ba8bcbef79407a105547..c964ba36d33fabfbaae94fdf8702089b6ceafc36 100644 (file)
@@ -14,9 +14,13 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import java.lang.reflect.Field;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import javax.ws.rs.core.Response;
+import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.UriBuilder;
 import javax.ws.rs.core.UriInfo;
 import org.junit.AfterClass;
@@ -24,13 +28,18 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.controller.md.sal.rest.common.TestRestconfUtils;
+import org.opendaylight.netconf.sal.restconf.impl.NormalizedNodeContext;
 import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
 import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
 import org.opendaylight.netconf.sal.streams.listeners.Notificator;
 import org.opendaylight.restconf.handlers.DOMDataBrokerHandler;
+import org.opendaylight.restconf.handlers.NotificationServiceHandler;
+import org.opendaylight.restconf.handlers.SchemaContextHandler;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 
 public class RestconfStreamsSubscriptionServiceImplTest {
@@ -43,14 +52,23 @@ public class RestconfStreamsSubscriptionServiceImplTest {
     private DOMDataBrokerHandler dataBrokerHandler;
     @Mock
     private UriInfo uriInfo;
+    @Mock
+    private NotificationServiceHandler notificationServiceHandler;
+
+    private final SchemaContextHandler schemaHandler = new SchemaContextHandler();
 
     @Before
-    public void setUp() {
+    public void setUp() throws Exception {
         MockitoAnnotations.initMocks(this);
         final DOMDataBroker dataBroker = mock(DOMDataBroker.class);
         final ListenerRegistration<DOMDataChangeListener> listener = mock(ListenerRegistration.class);
         doReturn(dataBroker).when(this.dataBrokerHandler).get();
         doReturn(listener).when(dataBroker).registerDataChangeListener(any(), any(), any(), any());
+        final MultivaluedMap<String, String> map = Mockito.mock(MultivaluedMap.class);
+        final Set<Entry<String, List<String>>> set = new HashSet<>();
+        Mockito.when(map.entrySet()).thenReturn(set);
+        Mockito.when(this.uriInfo.getQueryParameters()).thenReturn(map);
+        this.schemaHandler.onGlobalContextUpdated(TestRestconfUtils.loadSchemaContext("/notifications"));
     }
 
     @BeforeClass
@@ -58,7 +76,9 @@ public class RestconfStreamsSubscriptionServiceImplTest {
         final Map<String, ListenerAdapter> listenersByStreamNameSetter = new HashMap<>();
         final ListenerAdapter adapter = mock(ListenerAdapter.class);
         doReturn(false).when(adapter).isListening();
-        listenersByStreamNameSetter.put("toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE", adapter);
+        listenersByStreamNameSetter.put(
+                "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
+                adapter);
         listenersByStreamName = Notificator.class.getDeclaredField("dataChangeListener");
 
         listenersByStreamName.setAccessible(true);
@@ -77,12 +97,15 @@ public class RestconfStreamsSubscriptionServiceImplTest {
         final UriBuilder uriBuilder = UriBuilder.fromUri(uri);
         doReturn(uriBuilder).when(this.uriInfo).getAbsolutePathBuilder();
         final RestconfStreamsSubscriptionServiceImpl streamsSubscriptionService =
-                new RestconfStreamsSubscriptionServiceImpl(this.dataBrokerHandler);
-        final Response response = streamsSubscriptionService
-                .subscribeToStream("toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE", this.uriInfo);
-        assertEquals(200, response.getStatus());
-        assertEquals("ws://:8181/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
-                response.getHeaderString("Location"));
+                new RestconfStreamsSubscriptionServiceImpl(this.dataBrokerHandler, this.notificationServiceHandler,
+                        this.schemaHandler);
+        final NormalizedNodeContext response = streamsSubscriptionService
+                .subscribeToStream(
+                        "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
+                        this.uriInfo);
+        assertEquals(
+                "ws://:8181/data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
+                response.getNewHeaders().get("Location").toString());
     }
 
     @Test(expected = RestconfDocumentedException.class)
@@ -90,7 +113,8 @@ public class RestconfStreamsSubscriptionServiceImplTest {
         final UriBuilder uriBuilder = UriBuilder.fromUri(uri);
         doReturn(uriBuilder).when(this.uriInfo).getAbsolutePathBuilder();
         final RestconfStreamsSubscriptionServiceImpl streamsSubscriptionService =
-                new RestconfStreamsSubscriptionServiceImpl(this.dataBrokerHandler);
+                new RestconfStreamsSubscriptionServiceImpl(this.dataBrokerHandler, this.notificationServiceHandler,
+                        this.schemaHandler);
         streamsSubscriptionService.subscribeToStream("toaster:toaster/toasterStatus/scope=ONE", this.uriInfo);
     }
 
@@ -99,7 +123,8 @@ public class RestconfStreamsSubscriptionServiceImplTest {
         final UriBuilder uriBuilder = UriBuilder.fromUri(uri);
         doReturn(uriBuilder).when(this.uriInfo).getAbsolutePathBuilder();
         final RestconfStreamsSubscriptionServiceImpl streamsSubscriptionService =
-                new RestconfStreamsSubscriptionServiceImpl(this.dataBrokerHandler);
+                new RestconfStreamsSubscriptionServiceImpl(this.dataBrokerHandler, this.notificationServiceHandler,
+                        this.schemaHandler);
         streamsSubscriptionService.subscribeToStream("toaster:toaster/toasterStatus/datastore=OPERATIONAL",
                 this.uriInfo);
     }
index 7ca762705b66cb7ef73b3084a35d6ce5d1502ea6..4e8875789b2f22963c0835b6b2129b26efd79cc3 100644 (file)
@@ -11,7 +11,6 @@ package org.opendaylight.restconf.restful.utils;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-
 import java.util.Collections;
 import java.util.Set;
 import org.junit.Before;
@@ -48,36 +47,38 @@ public class CreateStreamUtilTest {
     @Before
     public void setUp() throws Exception {
         MockitoAnnotations.initMocks(this);
-        refSchemaCtx = new SchemaContextRef(TestRestconfUtils.loadSchemaContext(PATH_FOR_NEW_SCHEMA_CONTEXT));
+        this.refSchemaCtx = new SchemaContextRef(TestRestconfUtils.loadSchemaContext(PATH_FOR_NEW_SCHEMA_CONTEXT));
     }
 
     @Test
     public void createStreamTest() {
-        payload = prepareDomPayload("create-data-change-event-subscription", "input", "toaster", "path");
-        final DOMRpcResult result = CreateStreamUtil.createStream(payload, refSchemaCtx);
+        this.payload = prepareDomPayload("create-data-change-event-subscription", "input", "toaster", "path");
+        final DOMRpcResult result = CreateStreamUtil.createDataChangeNotifiStream(this.payload, this.refSchemaCtx);
         assertEquals(result.getErrors(), Collections.emptyList());
         final NormalizedNode<?, ?> testedNn = result.getResult();
         assertNotNull(testedNn);
-        final NormalizedNodeContext contextRef = prepareDomPayload("create-data-change-event-subscription", "output", "toaster:toaster/datastore=CONFIGURATION/scope=BASE", "stream-name");
+        final NormalizedNodeContext contextRef = prepareDomPayload("create-data-change-event-subscription", "output",
+                "data-change-event-subscription/toaster:toaster/datastore=CONFIGURATION/scope=BASE", "stream-name");
         assertEquals(contextRef.getData(), testedNn);
     }
 
     @Test(expected = RestconfDocumentedException.class)
     public void createStreamWrongValueTest() {
-        payload = prepareDomPayload("create-data-change-event-subscription", "input", "String value", "path");
-        final DOMRpcResult result = CreateStreamUtil.createStream(payload, refSchemaCtx);
+        this.payload = prepareDomPayload("create-data-change-event-subscription", "input", "String value", "path");
+        final DOMRpcResult result = CreateStreamUtil.createDataChangeNotifiStream(this.payload, this.refSchemaCtx);
         assertEquals(result.getErrors(), Collections.emptyList());
     }
 
     @Test(expected = RestconfDocumentedException.class)
     public void createStreamWrongInputRpcTest() {
-        payload = prepareDomPayload("create-data-change-event-subscription2", "input", "toaster", "path2");
-        final DOMRpcResult result = CreateStreamUtil.createStream(payload, refSchemaCtx);
+        this.payload = prepareDomPayload("create-data-change-event-subscription2", "input", "toaster", "path2");
+        final DOMRpcResult result = CreateStreamUtil.createDataChangeNotifiStream(this.payload, this.refSchemaCtx);
         assertEquals(result.getErrors(), Collections.emptyList());
     }
 
-    private NormalizedNodeContext prepareDomPayload(final String rpcName, final String inputOutput, final String toasterValue, final String inputOutputName) {
-        final SchemaContext schema = refSchemaCtx.get();
+    private NormalizedNodeContext prepareDomPayload(final String rpcName, final String inputOutput,
+            final String toasterValue, final String inputOutputName) {
+        final SchemaContext schema = this.refSchemaCtx.get();
         final Module rpcModule = schema.findModuleByName("sal-remote", null);
         assertNotNull(rpcModule);
         final QName rpcQName = QName.create(rpcModule.getQNameModule(), rpcName);
@@ -92,7 +93,8 @@ public class CreateStreamUtilTest {
         }
         assertNotNull(rpcInputSchemaNode);
 
-        final DataContainerNodeAttrBuilder<YangInstanceIdentifier.NodeIdentifier, ContainerNode> container = Builders.containerBuilder(rpcInputSchemaNode);
+        final DataContainerNodeAttrBuilder<YangInstanceIdentifier.NodeIdentifier, ContainerNode> container =
+                Builders.containerBuilder(rpcInputSchemaNode);
 
         final QName lfQName = QName.create(rpcModule.getQNameModule(), inputOutputName);
         final DataSchemaNode lfSchemaNode = rpcInputSchemaNode.getDataChildByName(lfQName);
@@ -110,6 +112,7 @@ public class CreateStreamUtilTest {
                 .withValue(o)).build();
         container.withChild(lfNode);
 
-        return new NormalizedNodeContext(new InstanceIdentifierContext<>(null, rpcInputSchemaNode, null, schema), container.build());
+        return new NormalizedNodeContext(new InstanceIdentifierContext<>(null, rpcInputSchemaNode, null, schema),
+                container.build());
     }
 }
diff --git a/restconf/sal-rest-connector/src/test/resources/notifications/subscribe-to-notification.yang b/restconf/sal-rest-connector/src/test/resources/notifications/subscribe-to-notification.yang
new file mode 100644 (file)
index 0000000..5fe7df7
--- /dev/null
@@ -0,0 +1,18 @@
+module subscribe-to-notification {
+
+    yang-version 1;
+    namespace "subscribe:to:notification";
+    prefix "subs-to-notifi";
+
+    description
+        "Added input parameters to rpc create-data-change-event-subscription and to create-notification-stream";
+
+    revision "2016-10-28" {
+    }
+
+    container "notifi"{
+        leaf "location"{
+            type string;
+        }
+    }
+}