import com.google.common.util.concurrent.Futures;
import java.net.URI;
import java.net.URISyntaxException;
+import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter;
import org.opendaylight.netconf.sal.streams.listeners.Notificator;
import org.opendaylight.netconf.sal.streams.websockets.WebSocketServer;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.QNameModule;
*/
@Override
public NormalizedNodeContext subscribeToStream(final String identifier, final UriInfo uriInfo) {
+ boolean startTime_used = false;
+ boolean stopTime_used = false;
+ Date start = null;
+ Date stop = null;
+
+ for (final Entry<String, List<String>> entry : uriInfo.getQueryParameters().entrySet()) {
+ switch (entry.getKey()) {
+ case "start-time":
+ if (!startTime_used) {
+ startTime_used = true;
+ start = parseDateFromQueryParam(entry);
+ } else {
+ throw new RestconfDocumentedException("Start-time parameter can be used only once.");
+ }
+ break;
+ case "stop-time":
+ if (!stopTime_used) {
+ stopTime_used = true;
+ stop = parseDateFromQueryParam(entry);
+ } else {
+ throw new RestconfDocumentedException("Stop-time parameter can be used only once.");
+ }
+ break;
+ default:
+ throw new RestconfDocumentedException("Bad parameter used with notifications: " + entry.getKey());
+ }
+ }
+ if(!startTime_used && stopTime_used){
+ throw new RestconfDocumentedException("Stop-time parameter has to be used with start-time parameter.");
+ }
URI response = null;
if (identifier.contains(DATA_SUBSCR)) {
- response = dataSubs(identifier, uriInfo);
+ response = dataSubs(identifier, uriInfo, start, stop);
} else if (identifier.contains(NOTIFICATION_STREAM)) {
- response = notifStream(identifier, uriInfo);
+ response = notifStream(identifier, uriInfo, start, stop);
}
if(response != null){
throw new RestconfDocumentedException(msg);
}
+ private Date parseDateFromQueryParam(final Entry<String, List<String>> entry) {
+ final DateAndTime event = new DateAndTime(entry.getValue().iterator().next());
+ String numOf_ms = "";
+ final String value = event.getValue();
+ if (value.contains(".")) {
+ numOf_ms = numOf_ms + ".";
+ final int lastChar = value.contains("Z") ? value.indexOf("Z") : (value.contains("+") ? value.indexOf("+")
+ : (value.contains("-") ? value.indexOf("-") : value.length()));
+ for (int i = 0; i < (lastChar - value.indexOf(".") - 1); i++) {
+ numOf_ms = numOf_ms + "S";
+ }
+ }
+ String zone = "";
+ if (!value.contains("Z")) {
+ zone = zone + "XXX";
+ }
+ final DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss" + numOf_ms + zone);
+
+ try {
+ return dateFormatter.parse(value.contains("Z") ? value.replace('T', ' ').substring(0, value.indexOf("Z"))
+ : value.replace('T', ' '));
+ } catch (final ParseException e) {
+ throw new RestconfDocumentedException("Cannot parse of value in date: " + value + e);
+ }
+ }
+
/**
* @return {@link InstanceIdentifierContext} of location leaf for
* notification
* - stream name
* @param uriInfo
* - uriInfo
+ * @param stop
+ * - stop-time of getting notification
+ * @param start
+ * - start-time of getting notification
* @return {@link Response}
*/
- private URI notifStream(final String identifier, final UriInfo uriInfo) {
+ private URI notifStream(final String identifier, final UriInfo uriInfo, final Date start, final Date stop) {
final String streamName = Notificator.createStreamNameFromUri(identifier);
if (Strings.isNullOrEmpty(streamName)) {
throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
for (final NotificationListenerAdapter listener : listeners) {
this.broker.registerToListenNotification(listener);
+ listener.setTime(start, stop);
}
final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
* - stream name
* @param uriInfo
* - uri info
+ * @param stop
+ * - start-time of getting notification
+ * @param start
+ * - stop-time of getting notification
* @return {@link Response}
*/
- private URI dataSubs(final String identifier, final UriInfo uriInfo) {
+ private URI dataSubs(final String identifier, final UriInfo uriInfo, final Date start, final Date stop) {
final String streamName = Notificator.createStreamNameFromUri(identifier);
if (Strings.isNullOrEmpty(streamName)) {
throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
if (listener == null) {
throw new RestconfDocumentedException("Stream was not found.", ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT);
}
+ listener.setTimer(start, stop);
final Map<String, String> paramToValues = resolveValuesFromUri(identifier);
final LogicalDatastoreType datastore = parserURIEnumParameter(LogicalDatastoreType.class,
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.common.QName;
private final EventBus eventBus;
private final EventBusChangeRecorder eventBusChangeRecorder;
private final NotificationOutputType outputType;
+ private Date start = null;
+ private Date stop = null;
/**
* Creates new {@link ListenerAdapter} listener specified by path and stream
@Override
public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+ final Date now = new Date();
+ if (this.stop != null) {
+ if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) {
+ prepareAndPostData(change);
+ }
+ if (this.stop.compareTo(now) < 0) {
+ try {
+ this.close();
+ } catch (final Exception e) {
+ throw new RestconfDocumentedException("Problem with unregister listener." + e);
+ }
+ }
+ } else if (this.start != null) {
+ if (this.start.compareTo(now) < 0) {
+ this.start = null;
+ prepareAndPostData(change);
+ }
+ } else {
+ prepareAndPostData(change);
+ }
+ }
+
+ private void prepareAndPostData(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
if (!change.getCreatedData().isEmpty() || !change.getUpdatedData().isEmpty()
|| !change.getRemovedPaths().isEmpty()) {
final String xml = prepareXmlFrom(change);
}
}
+ /**
+ * Set query parameters for listener
+ *
+ * @param start
+ * - start-time of getting notification
+ * @param stop
+ * - stop-time of getting notification
+ */
+ public void setTimer(final Date start, final Date stop) {
+ this.start = start;
+ this.stop = stop;
+ }
+
}
import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
private final SchemaPath path;
private final String outputType;
+ private Date start = null;
+ private Date stop = null;
/**
* Set path of listener and stream name, register event bus.
@Override
public void onNotification(final DOMNotification notification) {
+ final Date now = new Date();
+ if (this.stop != null) {
+ if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) {
+ prepareAndPostData(notification);
+ }
+ if (this.stop.compareTo(now) < 0) {
+ try {
+ this.close();
+ } catch (final Exception e) {
+ throw new RestconfDocumentedException("Problem with unregister listener." + e);
+ }
+ }
+ } else if (this.start != null) {
+ if (this.start.compareTo(now) < 0) {
+ this.start = null;
+ prepareAndPostData(notification);
+ }
+ } else {
+ prepareAndPostData(notification);
+ }
+ }
+
+ /**
+ * @param notification
+ */
+ private void prepareAndPostData(final DOMNotification notification) {
final String xml = prepareXmlFrom(notification);
final Event event = new Event(EventType.NOTIFY);
if (this.outputType.equals("JSON")) {
private enum EventType {
REGISTER, DEREGISTER, NOTIFY
}
+
+ /**
+ * Set query parameters for listener
+ *
+ * @param start
+ * - start-time of getting notification
+ * @param stop
+ * - stop-time of getting notification
+ */
+ public void setTime(final Date start, final Date stop) {
+ this.start = start;
+ this.stop = stop;
+ }
}
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
import org.opendaylight.controller.sal.core.api.Provider;
import org.opendaylight.restconf.common.wrapper.services.ServicesWrapperImpl;
import org.opendaylight.restconf.handlers.DOMDataBrokerHandler;
import org.opendaylight.restconf.handlers.DOMMountPointServiceHandler;
+import org.opendaylight.restconf.handlers.NotificationServiceHandler;
import org.opendaylight.restconf.handlers.RpcServiceHandler;
import org.opendaylight.restconf.handlers.SchemaContextHandler;
import org.opendaylight.restconf.handlers.TransactionChainHandler;
final DOMRpcService rpcService = session.getService(DOMRpcService.class);
final RpcServiceHandler rpcServiceHandler = new RpcServiceHandler(rpcService);
+ final DOMNotificationService notificationService = session.getService(DOMNotificationService.class);
+ final NotificationServiceHandler notificationServiceHandler =
+ new NotificationServiceHandler(notificationService);
+
wrapperServices.setHandlers(schemaCtxHandler, RestConnectorProvider.mountPointServiceHandler,
- RestConnectorProvider.transactionChainHandler, brokerHandler, rpcServiceHandler);
+ RestConnectorProvider.transactionChainHandler, brokerHandler, rpcServiceHandler,
+ notificationServiceHandler);
}
/**
import org.opendaylight.netconf.sal.restconf.impl.PATCHStatusContext;
import org.opendaylight.restconf.handlers.DOMDataBrokerHandler;
import org.opendaylight.restconf.handlers.DOMMountPointServiceHandler;
+import org.opendaylight.restconf.handlers.NotificationServiceHandler;
import org.opendaylight.restconf.handlers.RpcServiceHandler;
import org.opendaylight.restconf.handlers.SchemaContextHandler;
import org.opendaylight.restconf.handlers.TransactionChainHandler;
}
@Override
- public Response subscribeToStream(final String identifier, final UriInfo uriInfo) {
+ public NormalizedNodeContext subscribeToStream(final String identifier, final UriInfo uriInfo) {
return this.delegRestconfSubscrService.subscribeToStream(identifier, uriInfo);
}
final DOMMountPointServiceHandler domMountPointServiceHandler,
final TransactionChainHandler transactionChainHandler,
final DOMDataBrokerHandler domDataBrokerHandler,
- final RpcServiceHandler rpcServiceHandler) {
+ final RpcServiceHandler rpcServiceHandler, final NotificationServiceHandler notificationServiceHandler) {
this.delegRestModService = new RestconfModulesServiceImpl(schemaCtxHandler, domMountPointServiceHandler);
this.delegRestOpsService = new RestconfOperationsServiceImpl(schemaCtxHandler, domMountPointServiceHandler);
this.delegRestSchService = new RestconfSchemaServiceImpl(schemaCtxHandler, domMountPointServiceHandler);
domMountPointServiceHandler);
this.delegRestconfInvokeOpsService = new RestconfInvokeOperationsServiceImpl(rpcServiceHandler,
schemaCtxHandler);
- this.delegRestconfSubscrService = new RestconfStreamsSubscriptionServiceImpl(domDataBrokerHandler);
+ this.delegRestconfSubscrService =
+ new RestconfStreamsSubscriptionServiceImpl(domDataBrokerHandler, notificationServiceHandler,
+ schemaCtxHandler);
}
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.restconf.handlers;
+
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
+
+public class NotificationServiceHandler implements Handler<DOMNotificationService> {
+
+ private final DOMNotificationService notificationService;
+
+ /**
+ * Set DOMNotificationService
+ *
+ * @param notificationService
+ * - DOMNotificationService
+ */
+ public NotificationServiceHandler(final DOMNotificationService notificationService) {
+ this.notificationService = notificationService;
+ }
+
+ @Override
+ public DOMNotificationService get() {
+ return this.notificationService;
+ }
+
+}
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Context;
-import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
+import org.opendaylight.netconf.sal.restconf.impl.NormalizedNodeContext;
/**
* Subscribing to streams
* - name of stream
* @param uriInfo
* - URI info
- * @return {@link Response}
+ * @return {@link NormalizedNodeContext}
*/
@GET
@Path("data/ietf-restconf-monitoring:restconf-state/streams/stream/{identifier:.+}")
- Response subscribeToStream(@Encoded @PathParam("identifier") String identifier, @Context UriInfo uriInfo);
+ NormalizedNodeContext subscribeToStream(@Encoded @PathParam("identifier") String identifier,
+ @Context UriInfo uriInfo);
}
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.netconf.sal.restconf.impl.InstanceIdentifierContext;
import org.opendaylight.netconf.sal.restconf.impl.NormalizedNodeContext;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorTag;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorType;
import org.opendaylight.restconf.common.references.SchemaContextRef;
import org.opendaylight.restconf.handlers.RpcServiceHandler;
import org.opendaylight.restconf.handlers.SchemaContextHandler;
if (mountPoint == null) {
if (namespace.toString().equals(RestconfStreamsConstants.SAL_REMOTE_NAMESPACE)) {
- response = CreateStreamUtil.createStream(payload, refSchemaCtx);
+ if (identifier.contains(RestconfStreamsConstants.CREATE_DATA_SUBSCR)) {
+ response = CreateStreamUtil.createDataChangeNotifiStream(payload, refSchemaCtx);
+ } else if (identifier.contains(RestconfStreamsConstants.CREATE_NOTIFICATION_STREAM)) {
+ response = CreateStreamUtil.createYangNotifiStream(payload, refSchemaCtx);
+ } else {
+ throw new RestconfDocumentedException("Not supported operation", ErrorType.RPC,
+ ErrorTag.OPERATION_NOT_SUPPORTED);
+ }
} else {
response = RestconfInvokeOperationsUtil.invokeRpc(payload.getData(), schemaPath,
this.rpcServiceHandler);
package org.opendaylight.restconf.restful.services.impl;
import java.net.URI;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-import javax.ws.rs.core.UriBuilder;
+import java.util.Map.Entry;
import javax.ws.rs.core.UriInfo;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.netconf.sal.restconf.impl.InstanceIdentifierContext;
+import org.opendaylight.netconf.sal.restconf.impl.NormalizedNodeContext;
import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
-import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorTag;
-import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorType;
-import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
-import org.opendaylight.netconf.sal.streams.listeners.Notificator;
import org.opendaylight.restconf.handlers.DOMDataBrokerHandler;
+import org.opendaylight.restconf.handlers.NotificationServiceHandler;
+import org.opendaylight.restconf.handlers.SchemaContextHandler;
import org.opendaylight.restconf.restful.services.api.RestconfStreamsSubscriptionService;
import org.opendaylight.restconf.restful.utils.RestconfStreamsConstants;
import org.opendaylight.restconf.restful.utils.SubscribeToStreamUtil;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.NormalizedNodeAttrBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final DOMDataBrokerHandler domDataBrokerHandler;
- public RestconfStreamsSubscriptionServiceImpl(final DOMDataBrokerHandler domDataBrokerHandler) {
+ private final NotificationServiceHandler notificationServiceHandler;
+
+ private final SchemaContextHandler schemaHandler;
+
+ public RestconfStreamsSubscriptionServiceImpl(final DOMDataBrokerHandler domDataBrokerHandler,
+ final NotificationServiceHandler notificationServiceHandler, final SchemaContextHandler schemaHandler) {
this.domDataBrokerHandler = domDataBrokerHandler;
+ this.notificationServiceHandler = notificationServiceHandler;
+ this.schemaHandler = schemaHandler;
}
@Override
- public Response subscribeToStream(final String identifier, final UriInfo uriInfo) {
- final Map<String, String> mapOfValues = SubscribeToStreamUtil.mapValuesFromUri(identifier);
+ public NormalizedNodeContext subscribeToStream(final String identifier, final UriInfo uriInfo) {
+ boolean startTime_used = false;
+ boolean stopTime_used = false;
+ Date start = null;
+ Date stop = null;
- final LogicalDatastoreType ds = SubscribeToStreamUtil.parseURIEnum(LogicalDatastoreType.class,
- mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME));
- if (ds == null) {
- final String msg = "Stream name doesn't contains datastore value (pattern /datastore=)";
- LOG.debug(msg);
- throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
+ for (final Entry<String, List<String>> entry : uriInfo.getQueryParameters().entrySet()) {
+ switch (entry.getKey()) {
+ case "start-time":
+ if (!startTime_used) {
+ startTime_used = true;
+ start = SubscribeToStreamUtil.parseDateFromQueryParam(entry);
+ } else {
+ throw new RestconfDocumentedException("Start-time parameter can be used only once.");
+ }
+ break;
+ case "stop-time":
+ if (!stopTime_used) {
+ stopTime_used = true;
+ stop = SubscribeToStreamUtil.parseDateFromQueryParam(entry);
+ } else {
+ throw new RestconfDocumentedException("Stop-time parameter can be used only once.");
+ }
+ break;
+ default:
+ throw new RestconfDocumentedException("Bad parameter used with notifications: " + entry.getKey());
+ }
}
-
- final DataChangeScope scope = SubscribeToStreamUtil.parseURIEnum(DataChangeScope.class,
- mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME));
- if (scope == null) {
- final String msg = "Stream name doesn't contains datastore value (pattern /scope=)";
- LOG.warn(msg);
- throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
+ if (!startTime_used && stopTime_used) {
+ throw new RestconfDocumentedException("Stop-time parameter has to be used with start-time parameter.");
+ }
+ URI response = null;
+ if (identifier.contains(RestconfStreamsConstants.DATA_SUBSCR)) {
+ response = SubscribeToStreamUtil.dataSubs(identifier, uriInfo, start, stop, this.domDataBrokerHandler);
+ } else if (identifier.contains(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
+ response = SubscribeToStreamUtil.notifStream(identifier, uriInfo, start, stop,
+ this.notificationServiceHandler);
}
- final String streamName = Notificator.createStreamNameFromUri(identifier);
-
- final ListenerAdapter listener = Notificator.getListenerFor(streamName);
- SubscribeToStreamUtil.registration(ds, scope, listener, this.domDataBrokerHandler.get());
+ if (response != null) {
+ // prepare node with value of location
+ final InstanceIdentifierContext<?> iid =
+ SubscribeToStreamUtil.prepareIIDSubsStreamOutput(this.schemaHandler);
+ final NormalizedNodeAttrBuilder<NodeIdentifier, Object, LeafNode<Object>> builder =
+ ImmutableLeafNodeBuilder.create().withValue(response.toString());
+ builder.withNodeIdentifier(
+ NodeIdentifier.create(QName.create("subscribe:to:notification", "2016-10-28", "location")));
- final int port = SubscribeToStreamUtil.prepareNotificationPort();
+ // prepare new header with location
+ final Map<String, Object> headers = new HashMap<>();
+ headers.put("Location", response);
- final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
- final UriBuilder uriToWebSocketServer = uriBuilder.port(port).scheme(RestconfStreamsConstants.SCHEMA_SUBSCIBRE_URI);
- final URI uri = uriToWebSocketServer.replacePath(streamName).build();
+ return new NormalizedNodeContext(iid, builder.build(), headers);
+ }
- return Response.status(Status.OK).location(uri).build();
+ final String msg = "Bad type of notification of sal-remote";
+ LOG.warn(msg);
+ throw new RestconfDocumentedException(msg);
}
}
package org.opendaylight.restconf.restful.utils;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* </pre>
*
*/
- public static DOMRpcResult createStream(final NormalizedNodeContext payload,
+ public static DOMRpcResult createDataChangeNotifiStream(final NormalizedNodeContext payload,
final SchemaContextRef refSchemaCtx) {
final ContainerNode data = (ContainerNode) payload.getData();
final QName qname = payload.getInstanceIdentifierContext().getSchemaNode().getQName();
final YangInstanceIdentifier path = preparePath(data, qname);
- final String streamName = prepareStream(path, refSchemaCtx.get(), data);
+ final String streamName = prepareDataChangeNotifiStreamName(path, refSchemaCtx.get(), data);
final QName outputQname = QName.create(qname, "output");
final QName streamNameQname = QName.create(qname, "stream-name");
return outputType = outputType == null ? NotificationOutputType.XML : outputType;
}
- private static String prepareStream(final YangInstanceIdentifier path, final SchemaContext schemaContext,
+ private static String prepareDataChangeNotifiStreamName(final YangInstanceIdentifier path, final SchemaContext schemaContext,
final ContainerNode data) {
LogicalDatastoreType ds = parseEnum(data, LogicalDatastoreType.class,
RestconfStreamsConstants.DATASTORE_PARAM_NAME);
DataChangeScope scope = parseEnum(data, DataChangeScope.class, RestconfStreamsConstants.SCOPE_PARAM_NAME);
scope = scope == null ? RestconfStreamsConstants.DEFAULT_SCOPE : scope;
- final String streamName = Notificator
+ final String streamName = RestconfStreamsConstants.DATA_SUBSCR + "/"
+ + Notificator
.createStreamNameFromUri(ParserIdentifier.stringFromYangInstanceIdentifier(path, schemaContext)
+ RestconfStreamsConstants.DS_URI + ds + RestconfStreamsConstants.SCOPE_URI + scope);
return streamName;
}
return (YangInstanceIdentifier) pathValue;
}
+
+ /**
+ * Create stream with POST operation via RPC
+ *
+ * @param payload
+ * - input of RPC
+ * @param refSchemaCtx
+ * - schemaContext
+ * @return {@link DOMRpcResult}
+ */
+ public static DOMRpcResult createYangNotifiStream(final NormalizedNodeContext payload,
+ final SchemaContextRef refSchemaCtx) {
+ final ContainerNode data = (ContainerNode) payload.getData();
+ LeafSetNode leafSet = null;
+ String outputType = "XML";
+ for (final DataContainerChild<? extends PathArgument, ?> dataChild : data.getValue()) {
+ if (dataChild instanceof LeafSetNode) {
+ leafSet = (LeafSetNode) dataChild;
+ } else if (dataChild instanceof AugmentationNode) {
+ outputType = (String) (((AugmentationNode) dataChild).getValue()).iterator().next().getValue();
+ }
+ }
+
+ final Collection<LeafSetEntryNode> entryNodes = leafSet.getValue();
+ final List<SchemaPath> paths = new ArrayList<>();
+ String streamName = RestconfStreamsConstants.CREATE_NOTIFICATION_STREAM + "/";
+
+ final Iterator<LeafSetEntryNode> iterator = entryNodes.iterator();
+ while (iterator.hasNext()) {
+ final QName valueQName = QName.create((String) iterator.next().getValue());
+ final Module module = refSchemaCtx.findModuleByNamespaceAndRevision(valueQName.getModule().getNamespace(),
+ valueQName.getModule().getRevision());
+ Preconditions.checkNotNull(module,
+ "Module for namespace " + valueQName.getModule().getNamespace() + " does not exist");
+ NotificationDefinition notifiDef = null;
+ for (final NotificationDefinition notification : module.getNotifications()) {
+ if (notification.getQName().equals(valueQName)) {
+ notifiDef = notification;
+ break;
+ }
+ }
+ final String moduleName = module.getName();
+ Preconditions.checkNotNull(notifiDef,
+ "Notification " + valueQName + "doesn't exist in module " + moduleName);
+ paths.add(notifiDef.getPath());
+ streamName = streamName + moduleName + ":" + valueQName.getLocalName();
+ if (iterator.hasNext()) {
+ streamName = streamName + ",";
+ }
+ }
+
+ final QName rpcQName = payload.getInstanceIdentifierContext().getSchemaNode().getQName();
+ final QName outputQname = QName.create(rpcQName, "output");
+ final QName streamNameQname = QName.create(rpcQName, "notification-stream-identifier");
+
+ final ContainerNode output =
+ ImmutableContainerNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(outputQname))
+ .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build();
+
+ if (!Notificator.existNotificationListenerFor(streamName)) {
+ Notificator.createNotificationListener(paths, streamName, outputType);
+ }
+
+ return new DefaultDOMRpcResult(output);
+ }
}
public static final String SCHEMA_SUBSCIBRE_URI = "ws";
+ public static final CharSequence DATA_SUBSCR = "data-change-event-subscription";
+ public static final CharSequence CREATE_DATA_SUBSCR = "create-" + DATA_SUBSCR;
+
+ public static final CharSequence NOTIFICATION_STREAM = "notification-stream";
+ public static final CharSequence CREATE_NOTIFICATION_STREAM = "create-" + NOTIFICATION_STREAM;
+
static {
Date eventSubscriptionAugRevision;
try {
*/
package org.opendaylight.restconf.restful.utils;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import java.net.URI;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriInfo;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
+import org.opendaylight.netconf.sal.restconf.impl.InstanceIdentifierContext;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorTag;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorType;
import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
+import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter;
+import org.opendaylight.netconf.sal.streams.listeners.Notificator;
import org.opendaylight.netconf.sal.streams.websockets.WebSocketServer;
+import org.opendaylight.restconf.handlers.DOMDataBrokerHandler;
+import org.opendaylight.restconf.handlers.NotificationServiceHandler;
+import org.opendaylight.restconf.handlers.SchemaContextHandler;
import org.opendaylight.restconf.utils.RestconfConstants;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
+import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Subscribe to stream util class
*/
public final class SubscribeToStreamUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(SubscribeToStreamUtil.class);
+
private SubscribeToStreamUtil() {
throw new UnsupportedOperationException("Util class");
}
* @param domDataBroker
* - data broker for register data change listener
*/
- public static void registration(final LogicalDatastoreType ds, final DataChangeScope scope,
+ private static void registration(final LogicalDatastoreType ds, final DataChangeScope scope,
final ListenerAdapter listener, final DOMDataBroker domDataBroker) {
if (listener.isListening()) {
return;
*
* @return port
*/
- public static int prepareNotificationPort() {
+ private static int prepareNotificationPort() {
int port = RestconfStreamsConstants.NOTIFICATION_PORT;
try {
final WebSocketServer webSocketServer = WebSocketServer.getInstance();
return port;
}
+ /**
+ * Register listeners by streamName in identifier to listen to yang notifications
+ *
+ * @param identifier
+ * - identifier as stream name
+ * @param uriInfo
+ * - for getting base URI information
+ * @param start
+ * - start-time query parameter
+ * @param stop
+ * - stop-time query parameter
+ * @param notifiServiceHandler
+ * - DOMNotificationService handler for register listeners
+ * @return location for listening
+ */
+ public static URI notifStream(final String identifier, final UriInfo uriInfo, final Date start, final Date stop,
+ final NotificationServiceHandler notifiServiceHandler) {
+ final String streamName = Notificator.createStreamNameFromUri(identifier);
+ if (Strings.isNullOrEmpty(streamName)) {
+ throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
+ }
+ final List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName);
+ if ((listeners == null) || listeners.isEmpty()) {
+ throw new RestconfDocumentedException("Stream was not found.", ErrorType.PROTOCOL,
+ ErrorTag.UNKNOWN_ELEMENT);
+ }
+
+ for (final NotificationListenerAdapter listener : listeners) {
+ registerToListenNotification(listener, notifiServiceHandler);
+ listener.setTime(start, stop);
+ }
+
+ final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
+ int notificationPort = RestconfStreamsConstants.NOTIFICATION_PORT;
+ try {
+ final WebSocketServer webSocketServerInstance = WebSocketServer.getInstance();
+ notificationPort = webSocketServerInstance.getPort();
+ } catch (final NullPointerException e) {
+ WebSocketServer.createInstance(RestconfStreamsConstants.NOTIFICATION_PORT);
+ }
+ final UriBuilder uriToWebsocketServerBuilder = uriBuilder.port(notificationPort).scheme("ws");
+ final URI uriToWebsocketServer = uriToWebsocketServerBuilder.replacePath(streamName).build();
+
+ return uriToWebsocketServer;
+ }
+
+ private static void registerToListenNotification(final NotificationListenerAdapter listener,
+ final NotificationServiceHandler notificationServiceHandler) {
+ if (listener.isListening()) {
+ return;
+ }
+
+ final SchemaPath path = listener.getSchemaPath();
+ final ListenerRegistration<DOMNotificationListener> registration =
+ notificationServiceHandler.get().registerNotificationListener(listener, path);
+
+ listener.setRegistration(registration);
+ }
+
+ /**
+ * Prepare InstanceIdentifierContext for Location leaf
+ *
+ * @param schemaHandler
+ * - schemaContext handler
+ * @return InstanceIdentifier of Location leaf
+ */
+ public static InstanceIdentifierContext<?> prepareIIDSubsStreamOutput(final SchemaContextHandler schemaHandler) {
+ final QName qnameBase = QName.create("subscribe:to:notification", "2016-10-28", "notifi");
+ final DataSchemaNode location = ((ContainerSchemaNode) schemaHandler.get()
+ .findModuleByNamespaceAndRevision(qnameBase.getNamespace(), qnameBase.getRevision())
+ .getDataChildByName(qnameBase)).getDataChildByName(QName.create(qnameBase, "location"));
+ final List<PathArgument> path = new ArrayList<>();
+ path.add(NodeIdentifier.create(qnameBase));
+ path.add(NodeIdentifier.create(QName.create(qnameBase, "location")));
+
+ return new InstanceIdentifierContext<SchemaNode>(YangInstanceIdentifier.create(path), location, null,
+ schemaHandler.get());
+ }
+
+ /**
+ * Register listener by streamName in identifier to listen to yang notifications
+ *
+ * @param identifier
+ * - identifier as stream name
+ * @param uriInfo
+ * - for getting base URI information
+ * @param start
+ * - start-time query parameter
+ * @param stop
+ * - stop-time query parameter
+ * @param domDataBrokerHandler
+ * - DOMDataBroker handler for register listener
+ * @return location for listening
+ */
+ public static URI dataSubs(final String identifier, final UriInfo uriInfo, final Date start, final Date stop,
+ final DOMDataBrokerHandler domDataBrokerHandler) {
+ final Map<String, String> mapOfValues = SubscribeToStreamUtil.mapValuesFromUri(identifier);
+
+ final LogicalDatastoreType ds = SubscribeToStreamUtil.parseURIEnum(LogicalDatastoreType.class,
+ mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME));
+ if (ds == null) {
+ final String msg = "Stream name doesn't contains datastore value (pattern /datastore=)";
+ LOG.debug(msg);
+ throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
+ }
+
+ final DataChangeScope scope = SubscribeToStreamUtil.parseURIEnum(DataChangeScope.class,
+ mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME));
+ if (scope == null) {
+ final String msg = "Stream name doesn't contains datastore value (pattern /scope=)";
+ LOG.warn(msg);
+ throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
+ }
+
+ final String streamName = Notificator.createStreamNameFromUri(identifier);
+
+ final ListenerAdapter listener = Notificator.getListenerFor(streamName);
+ Preconditions.checkNotNull(listener, "Listener doesn't exist : " + streamName);
+
+ listener.setTimer(start, stop);
+
+ SubscribeToStreamUtil.registration(ds, scope, listener, domDataBrokerHandler.get());
+
+ final int port = SubscribeToStreamUtil.prepareNotificationPort();
+
+ final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
+ final UriBuilder uriToWebSocketServer =
+ uriBuilder.port(port).scheme(RestconfStreamsConstants.SCHEMA_SUBSCIBRE_URI);
+ return uriToWebSocketServer.replacePath(streamName).build();
+ }
+
+ public static Date parseDateFromQueryParam(final Entry<String, List<String>> entry) {
+ final DateAndTime event = new DateAndTime(entry.getValue().iterator().next());
+ String numOf_ms = "";
+ final String value = event.getValue();
+ if (value.contains(".")) {
+ numOf_ms = numOf_ms + ".";
+ final int lastChar = value.contains("Z") ? value.indexOf("Z") : (value.contains("+") ? value.indexOf("+")
+ : (value.contains("-") ? value.indexOf("-") : value.length()));
+ for (int i = 0; i < (lastChar - value.indexOf(".") - 1); i++) {
+ numOf_ms = numOf_ms + "S";
+ }
+ }
+ String zone = "";
+ if (!value.contains("Z")) {
+ zone = zone + "XXX";
+ }
+ final DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss" + numOf_ms + zone);
+
+ try {
+ return dateFormatter.parse(value.contains("Z") ? value.replace('T', ' ').substring(0, value.indexOf("Z"))
+ : value.replace('T', ' '));
+ } catch (final ParseException e) {
+ throw new RestconfDocumentedException("Cannot parse of value in date: " + value + e);
+ }
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+/**
+ *
+ */
+package org.opendaylight.controller.sal.restconf.impl.test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriInfo;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.rest.common.TestRestconfUtils;
+import org.opendaylight.netconf.sal.restconf.impl.BrokerFacade;
+import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfImpl;
+import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
+import org.opendaylight.netconf.sal.streams.listeners.Notificator;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public class RestconfImplNotificationSubscribingTest {
+
+ private final String identifier = "data-change-event-subscription/datastore=OPERATIONAL/scope=ONE";
+
+ @Mock
+ private BrokerFacade broker;
+
+ @Mock
+ private DOMDataBroker domDataBroker;
+
+ @Mock
+ private UriInfo uriInfo;
+
+ @Before
+ public void setup() throws Exception {
+ MockitoAnnotations.initMocks(this);
+
+ this.broker.setDomDataBroker(this.domDataBroker);
+ RestconfImpl.getInstance().setBroker(this.broker);
+ ControllerContext.getInstance().setGlobalSchema(TestRestconfUtils.loadSchemaContext("/notifications"));
+
+ final YangInstanceIdentifier path = Mockito.mock(YangInstanceIdentifier.class);
+ Notificator.createListener(path, this.identifier, NotificationOutputType.XML);
+ }
+
+ @Test
+ public void startTimeTest() {
+ final List<Entry<String, List<String>>> list = new ArrayList<>();
+ final Entry<String, List<String>> entry = Mockito.mock(Entry.class);
+ Mockito.when(entry.getKey()).thenReturn("start-time");
+ final List<String> time = new ArrayList<>();
+ time.add("2014-10-25T10:02:00Z");
+ Mockito.when(entry.getValue()).thenReturn(time);
+ list.add(entry);
+
+ subscribe(list);
+ Notificator.removeAllListeners();
+ }
+
+ @Test
+ public void milisecsTest() {
+ final List<Entry<String, List<String>>> list = new ArrayList<>();
+ final Entry<String, List<String>> entry = Mockito.mock(Entry.class);
+ Mockito.when(entry.getKey()).thenReturn("start-time");
+ final List<String> time = new ArrayList<>();
+ time.add("2014-10-25T10:02:00.12345Z");
+ Mockito.when(entry.getValue()).thenReturn(time);
+ list.add(entry);
+
+ subscribe(list);
+ Notificator.removeAllListeners();
+ }
+
+ @Test
+ public void zonesPlusTest() {
+ final List<Entry<String, List<String>>> list = new ArrayList<>();
+ final Entry<String, List<String>> entry = Mockito.mock(Entry.class);
+ Mockito.when(entry.getKey()).thenReturn("start-time");
+ final List<String> time = new ArrayList<>();
+ time.add("2014-10-25T10:02:00+01:00");
+ Mockito.when(entry.getValue()).thenReturn(time);
+ list.add(entry);
+
+ subscribe(list);
+ Notificator.removeAllListeners();
+ }
+
+ @Test
+ public void zonesMinusTest() {
+ final List<Entry<String, List<String>>> list = new ArrayList<>();
+ final Entry<String, List<String>> entry = Mockito.mock(Entry.class);
+ Mockito.when(entry.getKey()).thenReturn("start-time");
+ final List<String> time = new ArrayList<>();
+ time.add("2014-10-25T10:02:00-01:00");
+ Mockito.when(entry.getValue()).thenReturn(time);
+ list.add(entry);
+
+ subscribe(list);
+ Notificator.removeAllListeners();
+ }
+
+ @Test
+ public void startAndStopTimeTest() {
+ final List<Entry<String, List<String>>> list = new ArrayList<>();
+ final Entry<String, List<String>> entry = Mockito.mock(Entry.class);
+ Mockito.when(entry.getKey()).thenReturn("start-time");
+ final List<String> time = new ArrayList<>();
+ time.add("2014-10-25T10:02:00Z");
+ Mockito.when(entry.getValue()).thenReturn(time);
+
+ final Entry<String, List<String>> entry2 = Mockito.mock(Entry.class);
+ Mockito.when(entry2.getKey()).thenReturn("stop-time");
+ final List<String> time2 = new ArrayList<>();
+ time2.add("2014-10-25T12:31:00Z");
+ Mockito.when(entry2.getValue()).thenReturn(time2);
+
+ list.add(entry);
+ list.add(entry2);
+
+ subscribe(list);
+ Notificator.removeAllListeners();
+ }
+
+ @Test(expected = RestconfDocumentedException.class)
+ public void stopTimeTest() {
+ final List<Entry<String, List<String>>> list = new ArrayList<>();
+ final Entry<String, List<String>> entry = Mockito.mock(Entry.class);
+ Mockito.when(entry.getKey()).thenReturn("stop-time");
+ final List<String> time = new ArrayList<>();
+ time.add("2014-10-25T12:31:00Z");
+ Mockito.when(entry.getValue()).thenReturn(time);
+ list.add(entry);
+
+ subscribe(list);
+ Notificator.removeAllListeners();
+ }
+
+ @Test(expected = RestconfDocumentedException.class)
+ public void badParamTest() {
+ final List<Entry<String, List<String>>> list = new ArrayList<>();
+ final Entry<String, List<String>> entry = Mockito.mock(Entry.class);
+ Mockito.when(entry.getKey()).thenReturn("time");
+ final List<String> time = new ArrayList<>();
+ time.add("2014-10-25T12:31:00Z");
+ Mockito.when(entry.getValue()).thenReturn(time);
+ list.add(entry);
+
+ subscribe(list);
+ Notificator.removeAllListeners();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void badValueTest() {
+ final List<Entry<String, List<String>>> list = new ArrayList<>();
+ final Entry<String, List<String>> entry = Mockito.mock(Entry.class);
+ Mockito.when(entry.getKey()).thenReturn("start-time");
+ final List<String> time = new ArrayList<>();
+ time.add("badvalue");
+ Mockito.when(entry.getValue()).thenReturn(time);
+ list.add(entry);
+
+ subscribe(list);
+ Notificator.removeAllListeners();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void badZonesTest() {
+ final List<Entry<String, List<String>>> list = new ArrayList<>();
+ final Entry<String, List<String>> entry = Mockito.mock(Entry.class);
+ Mockito.when(entry.getKey()).thenReturn("start-time");
+ final List<String> time = new ArrayList<>();
+ time.add("2014-10-25T10:02:00Z+1:00");
+ Mockito.when(entry.getValue()).thenReturn(time);
+ list.add(entry);
+
+ subscribe(list);
+ Notificator.removeAllListeners();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void badMilisecsTest() {
+ final List<Entry<String, List<String>>> list = new ArrayList<>();
+ final Entry<String, List<String>> entry = Mockito.mock(Entry.class);
+ Mockito.when(entry.getKey()).thenReturn("start-time");
+ final List<String> time = new ArrayList<>();
+ time.add("2014-10-25T10:02:00:0026Z");
+ Mockito.when(entry.getValue()).thenReturn(time);
+ list.add(entry);
+
+ subscribe(list);
+ Notificator.removeAllListeners();
+ }
+
+ @Test
+ public void onNotifiTest() throws Exception {
+ final YangInstanceIdentifier path = Mockito.mock(YangInstanceIdentifier.class);
+ final ListenerAdapter listener = Notificator.createListener(path, this.identifier, NotificationOutputType.XML);
+
+ final List<Entry<String, List<String>>> list = new ArrayList<>();
+ final Entry<String, List<String>> entry = Mockito.mock(Entry.class);
+ Mockito.when(entry.getKey()).thenReturn("start-time");
+ final List<String> time = new ArrayList<>();
+ time.add("2014-10-25T10:02:00Z");
+ Mockito.when(entry.getValue()).thenReturn(time);
+ list.add(entry);
+
+ subscribe(list);
+
+ final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change =
+ Mockito.mock(AsyncDataChangeEvent.class);
+ Field start = listener.getClass().getDeclaredField("start");
+ start.setAccessible(true);
+ Date startOrig = (Date) start.get(listener);
+ Assert.assertNotNull(startOrig);
+ listener.onDataChanged(change);
+
+ start = listener.getClass().getDeclaredField("start");
+ start.setAccessible(true);
+ startOrig = (Date) start.get(listener);
+ Assert.assertNull(startOrig);
+ }
+
+ private void subscribe(final List<Entry<String, List<String>>> entries) {
+ final MultivaluedMap<String, String> map = Mockito.mock(MultivaluedMap.class);
+ Mockito.when(this.uriInfo.getQueryParameters()).thenReturn(map);
+ final UriBuilder uriBuilder = UriBuilder.fromPath("http://localhost:8181/" + this.identifier);
+ Mockito.when(this.uriInfo.getAbsolutePathBuilder()).thenReturn(uriBuilder);
+ final Set<Entry<String, List<String>>> set = new HashSet<>();
+ for(final Entry<String, List<String>> entry : entries){
+ set.add(entry);
+ }
+ Mockito.when(map.entrySet()).thenReturn(set);
+ RestconfImpl.getInstance().subscribeToStream(this.identifier, this.uriInfo);
+ }
+
+}
import java.io.FileNotFoundException;
import java.net.URI;
import java.text.ParseException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
import java.util.Set;
import javax.ws.rs.core.MultivaluedHashMap;
+import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
import org.junit.Before;
when(uriBuilder.build()).thenReturn(new URI(""));
when(uriBuilder.scheme("ws")).thenReturn(uriBuilder);
when(uriInfo.getAbsolutePathBuilder()).thenReturn(uriBuilder);
+ final MultivaluedMap<String, String> map = mock(MultivaluedMap.class);
+ final Set<Entry<String, List<String>>> set = new HashSet<>();
+ when(map.entrySet()).thenReturn(set);
+ when(uriInfo.getQueryParameters()).thenReturn(map);
final BrokerFacade brokerFacade = mock(BrokerFacade.class);
this.restconfImpl.setBroker(brokerFacade);
import static org.mockito.Mockito.mock;
import java.lang.reflect.Field;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import javax.ws.rs.core.Response;
+import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.controller.md.sal.rest.common.TestRestconfUtils;
+import org.opendaylight.netconf.sal.restconf.impl.NormalizedNodeContext;
import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
import org.opendaylight.netconf.sal.streams.listeners.Notificator;
import org.opendaylight.restconf.handlers.DOMDataBrokerHandler;
+import org.opendaylight.restconf.handlers.NotificationServiceHandler;
+import org.opendaylight.restconf.handlers.SchemaContextHandler;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
public class RestconfStreamsSubscriptionServiceImplTest {
private DOMDataBrokerHandler dataBrokerHandler;
@Mock
private UriInfo uriInfo;
+ @Mock
+ private NotificationServiceHandler notificationServiceHandler;
+
+ private final SchemaContextHandler schemaHandler = new SchemaContextHandler();
@Before
- public void setUp() {
+ public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
final DOMDataBroker dataBroker = mock(DOMDataBroker.class);
final ListenerRegistration<DOMDataChangeListener> listener = mock(ListenerRegistration.class);
doReturn(dataBroker).when(this.dataBrokerHandler).get();
doReturn(listener).when(dataBroker).registerDataChangeListener(any(), any(), any(), any());
+ final MultivaluedMap<String, String> map = Mockito.mock(MultivaluedMap.class);
+ final Set<Entry<String, List<String>>> set = new HashSet<>();
+ Mockito.when(map.entrySet()).thenReturn(set);
+ Mockito.when(this.uriInfo.getQueryParameters()).thenReturn(map);
+ this.schemaHandler.onGlobalContextUpdated(TestRestconfUtils.loadSchemaContext("/notifications"));
}
@BeforeClass
final Map<String, ListenerAdapter> listenersByStreamNameSetter = new HashMap<>();
final ListenerAdapter adapter = mock(ListenerAdapter.class);
doReturn(false).when(adapter).isListening();
- listenersByStreamNameSetter.put("toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE", adapter);
+ listenersByStreamNameSetter.put(
+ "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
+ adapter);
listenersByStreamName = Notificator.class.getDeclaredField("dataChangeListener");
listenersByStreamName.setAccessible(true);
final UriBuilder uriBuilder = UriBuilder.fromUri(uri);
doReturn(uriBuilder).when(this.uriInfo).getAbsolutePathBuilder();
final RestconfStreamsSubscriptionServiceImpl streamsSubscriptionService =
- new RestconfStreamsSubscriptionServiceImpl(this.dataBrokerHandler);
- final Response response = streamsSubscriptionService
- .subscribeToStream("toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE", this.uriInfo);
- assertEquals(200, response.getStatus());
- assertEquals("ws://:8181/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
- response.getHeaderString("Location"));
+ new RestconfStreamsSubscriptionServiceImpl(this.dataBrokerHandler, this.notificationServiceHandler,
+ this.schemaHandler);
+ final NormalizedNodeContext response = streamsSubscriptionService
+ .subscribeToStream(
+ "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
+ this.uriInfo);
+ assertEquals(
+ "ws://:8181/data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
+ response.getNewHeaders().get("Location").toString());
}
@Test(expected = RestconfDocumentedException.class)
final UriBuilder uriBuilder = UriBuilder.fromUri(uri);
doReturn(uriBuilder).when(this.uriInfo).getAbsolutePathBuilder();
final RestconfStreamsSubscriptionServiceImpl streamsSubscriptionService =
- new RestconfStreamsSubscriptionServiceImpl(this.dataBrokerHandler);
+ new RestconfStreamsSubscriptionServiceImpl(this.dataBrokerHandler, this.notificationServiceHandler,
+ this.schemaHandler);
streamsSubscriptionService.subscribeToStream("toaster:toaster/toasterStatus/scope=ONE", this.uriInfo);
}
final UriBuilder uriBuilder = UriBuilder.fromUri(uri);
doReturn(uriBuilder).when(this.uriInfo).getAbsolutePathBuilder();
final RestconfStreamsSubscriptionServiceImpl streamsSubscriptionService =
- new RestconfStreamsSubscriptionServiceImpl(this.dataBrokerHandler);
+ new RestconfStreamsSubscriptionServiceImpl(this.dataBrokerHandler, this.notificationServiceHandler,
+ this.schemaHandler);
streamsSubscriptionService.subscribeToStream("toaster:toaster/toasterStatus/datastore=OPERATIONAL",
this.uriInfo);
}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-
import java.util.Collections;
import java.util.Set;
import org.junit.Before;
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
- refSchemaCtx = new SchemaContextRef(TestRestconfUtils.loadSchemaContext(PATH_FOR_NEW_SCHEMA_CONTEXT));
+ this.refSchemaCtx = new SchemaContextRef(TestRestconfUtils.loadSchemaContext(PATH_FOR_NEW_SCHEMA_CONTEXT));
}
@Test
public void createStreamTest() {
- payload = prepareDomPayload("create-data-change-event-subscription", "input", "toaster", "path");
- final DOMRpcResult result = CreateStreamUtil.createStream(payload, refSchemaCtx);
+ this.payload = prepareDomPayload("create-data-change-event-subscription", "input", "toaster", "path");
+ final DOMRpcResult result = CreateStreamUtil.createDataChangeNotifiStream(this.payload, this.refSchemaCtx);
assertEquals(result.getErrors(), Collections.emptyList());
final NormalizedNode<?, ?> testedNn = result.getResult();
assertNotNull(testedNn);
- final NormalizedNodeContext contextRef = prepareDomPayload("create-data-change-event-subscription", "output", "toaster:toaster/datastore=CONFIGURATION/scope=BASE", "stream-name");
+ final NormalizedNodeContext contextRef = prepareDomPayload("create-data-change-event-subscription", "output",
+ "data-change-event-subscription/toaster:toaster/datastore=CONFIGURATION/scope=BASE", "stream-name");
assertEquals(contextRef.getData(), testedNn);
}
@Test(expected = RestconfDocumentedException.class)
public void createStreamWrongValueTest() {
- payload = prepareDomPayload("create-data-change-event-subscription", "input", "String value", "path");
- final DOMRpcResult result = CreateStreamUtil.createStream(payload, refSchemaCtx);
+ this.payload = prepareDomPayload("create-data-change-event-subscription", "input", "String value", "path");
+ final DOMRpcResult result = CreateStreamUtil.createDataChangeNotifiStream(this.payload, this.refSchemaCtx);
assertEquals(result.getErrors(), Collections.emptyList());
}
@Test(expected = RestconfDocumentedException.class)
public void createStreamWrongInputRpcTest() {
- payload = prepareDomPayload("create-data-change-event-subscription2", "input", "toaster", "path2");
- final DOMRpcResult result = CreateStreamUtil.createStream(payload, refSchemaCtx);
+ this.payload = prepareDomPayload("create-data-change-event-subscription2", "input", "toaster", "path2");
+ final DOMRpcResult result = CreateStreamUtil.createDataChangeNotifiStream(this.payload, this.refSchemaCtx);
assertEquals(result.getErrors(), Collections.emptyList());
}
- private NormalizedNodeContext prepareDomPayload(final String rpcName, final String inputOutput, final String toasterValue, final String inputOutputName) {
- final SchemaContext schema = refSchemaCtx.get();
+ private NormalizedNodeContext prepareDomPayload(final String rpcName, final String inputOutput,
+ final String toasterValue, final String inputOutputName) {
+ final SchemaContext schema = this.refSchemaCtx.get();
final Module rpcModule = schema.findModuleByName("sal-remote", null);
assertNotNull(rpcModule);
final QName rpcQName = QName.create(rpcModule.getQNameModule(), rpcName);
}
assertNotNull(rpcInputSchemaNode);
- final DataContainerNodeAttrBuilder<YangInstanceIdentifier.NodeIdentifier, ContainerNode> container = Builders.containerBuilder(rpcInputSchemaNode);
+ final DataContainerNodeAttrBuilder<YangInstanceIdentifier.NodeIdentifier, ContainerNode> container =
+ Builders.containerBuilder(rpcInputSchemaNode);
final QName lfQName = QName.create(rpcModule.getQNameModule(), inputOutputName);
final DataSchemaNode lfSchemaNode = rpcInputSchemaNode.getDataChildByName(lfQName);
.withValue(o)).build();
container.withChild(lfNode);
- return new NormalizedNodeContext(new InstanceIdentifierContext<>(null, rpcInputSchemaNode, null, schema), container.build());
+ return new NormalizedNodeContext(new InstanceIdentifierContext<>(null, rpcInputSchemaNode, null, schema),
+ container.build());
}
}
--- /dev/null
+module subscribe-to-notification {
+
+ yang-version 1;
+ namespace "subscribe:to:notification";
+ prefix "subs-to-notifi";
+
+ description
+ "Added input parameters to rpc create-data-change-event-subscription and to create-notification-stream";
+
+ revision "2016-10-28" {
+ }
+
+ container "notifi"{
+ leaf "location"{
+ type string;
+ }
+ }
+}