public NormalizedNodeContext subscribeToStream(final String identifier, final UriInfo uriInfo) {
boolean startTime_used = false;
boolean stopTime_used = false;
+ boolean filter_used = false;
Date start = null;
Date stop = null;
+ String filter = null;
for (final Entry<String, List<String>> entry : uriInfo.getQueryParameters().entrySet()) {
switch (entry.getKey()) {
throw new RestconfDocumentedException("Stop-time parameter can be used only once.");
}
break;
+ case "filter":
+ if (!filter_used) {
+ filter_used = true;
+ filter = entry.getValue().iterator().next();
+ } else {
+ throw new RestconfDocumentedException("Filter parameter can be used only once.");
+ }
+ break;
default:
throw new RestconfDocumentedException("Bad parameter used with notifications: " + entry.getKey());
}
}
URI response = null;
if (identifier.contains(DATA_SUBSCR)) {
- response = dataSubs(identifier, uriInfo, start, stop);
+ response = dataSubs(identifier, uriInfo, start, stop, filter);
} else if (identifier.contains(NOTIFICATION_STREAM)) {
- response = notifStream(identifier, uriInfo, start, stop);
+ response = notifStream(identifier, uriInfo, start, stop, filter);
}
if(response != null){
* - stop-time of getting notification
* @param start
* - start-time of getting notification
- * @return {@link Response}
+ * @param filter
+ * - indicate wh ich subset of allpossible events are of interest
+ * @return {@link URI} of location
*/
- private URI notifStream(final String identifier, final UriInfo uriInfo, final Date start, final Date stop) {
+ private URI notifStream(final String identifier, final UriInfo uriInfo, final Date start, final Date stop,
+ final String filter) {
final String streamName = Notificator.createStreamNameFromUri(identifier);
if (Strings.isNullOrEmpty(streamName)) {
throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
for (final NotificationListenerAdapter listener : listeners) {
this.broker.registerToListenNotification(listener);
- listener.setTime(start, stop);
+ listener.setQueryParams(start, stop, filter);
}
final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
* - start-time of getting notification
* @param start
* - stop-time of getting notification
- * @return {@link Response}
+ * @param filter
+ * - indicate which subset of all possible events are of interest
+ * @return {@link URI} of location
*/
- private URI dataSubs(final String identifier, final UriInfo uriInfo, final Date start, final Date stop) {
+ private URI dataSubs(final String identifier, final UriInfo uriInfo, final Date start, final Date stop,
+ final String filter) {
final String streamName = Notificator.createStreamNameFromUri(identifier);
if (Strings.isNullOrEmpty(streamName)) {
throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
if (listener == null) {
throw new RestconfDocumentedException("Stream was not found.", ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT);
}
- listener.setTimer(start, stop);
+ listener.setQueryParams(start, stop, filter);
final Map<String, String> paramToValues = resolveValuesFromUri(identifier);
final LogicalDatastoreType datastore = parserURIEnumParameter(LogicalDatastoreType.class,
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
+import java.io.StringReader;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;
-import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import javax.xml.transform.dom.DOMResult;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathFactory;
import org.json.JSONObject;
import org.json.XML;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree;
+import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
import org.slf4j.Logger;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
+import org.xml.sax.InputSource;
/**
* {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source.
private final NotificationOutputType outputType;
private Date start = null;
private Date stop = null;
+ private String filter = null;
/**
* Creates new {@link ListenerAdapter} listener specified by path and stream
final Date now = new Date();
if (this.stop != null) {
if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) {
- prepareAndPostData(change);
+ checkFilter(change);
}
if (this.stop.compareTo(now) < 0) {
try {
} else if (this.start != null) {
if (this.start.compareTo(now) < 0) {
this.start = null;
- prepareAndPostData(change);
+ checkFilter(change);
}
} else {
- prepareAndPostData(change);
+ checkFilter(change);
}
}
- private void prepareAndPostData(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
- if (!change.getCreatedData().isEmpty() || !change.getUpdatedData().isEmpty()
- || !change.getRemovedPaths().isEmpty()) {
- final String xml = prepareXmlFrom(change);
+ /**
+ * Check if is filter used and then prepare and post data do client
+ *
+ * @param change
+ * - data of notification
+ */
+ private void checkFilter(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+ final String xml = prepareXmlFrom(change);
+ if (this.filter == null) {
+ prepareAndPostData(xml);
+ } else {
+ try {
+ if (parseFilterParam(xml)) {
+ prepareAndPostData(xml);
+ }
+ } catch (final Exception e) {
+ throw new RestconfDocumentedException("Problem while parsing filter.", e);
+ }
+ }
+ }
+
+ /**
+ * Parse and evaluate filter value by xml
+ *
+ * @param xml
+ * - notification data in xml
+ * @return true or false - depends on filter expression and data of
+ * notifiaction
+ * @throws Exception
+ */
+ private boolean parseFilterParam(final String xml) throws Exception {
+ final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+ final DocumentBuilder builder = factory.newDocumentBuilder();
+ final Document docOfXml = builder.parse(new InputSource(new StringReader(xml)));
+ final XPath xPath = XPathFactory.newInstance().newXPath();
+ return (boolean) xPath.compile(this.filter).evaluate(docOfXml, XPathConstants.BOOLEAN);
+ }
+
+ /**
+ * Prepare data of notification and data to client
+ *
+ * @param xml
+ */
+ private void prepareAndPostData(final String xml) {
final Event event = new Event(EventType.NOTIFY);
if (this.outputType.equals(NotificationOutputType.JSON)) {
final JSONObject jsonObject = XML.toJSONObject(xml);
event.setData(xml);
}
this.eventBus.post(event);
- }
}
/**
* {@link Element}
*/
private void addPathAsValueToElement(final YangInstanceIdentifier path, final Element element) {
- // Map< key = namespace, value = prefix>
- final Map<String, String> prefixes = new HashMap<>();
final YangInstanceIdentifier normalizedPath = ControllerContext.getInstance().toXpathRepresentation(path);
final StringBuilder textContent = new StringBuilder();
- // FIXME: BUG-1281: this is duplicated code from yangtools (BUG-1275)
for (final PathArgument pathArgument : normalizedPath.getPathArguments()) {
if (pathArgument instanceof YangInstanceIdentifier.AugmentationIdentifier) {
continue;
}
textContent.append("/");
- writeIdentifierWithNamespacePrefix(element, textContent, pathArgument.getNodeType(), prefixes);
+ writeIdentifierWithNamespacePrefix(element, textContent, pathArgument.getNodeType());
if (pathArgument instanceof NodeIdentifierWithPredicates) {
final Map<QName, Object> predicates = ((NodeIdentifierWithPredicates) pathArgument).getKeyValues();
for (final QName keyValue : predicates.keySet()) {
final String predicateValue = String.valueOf(predicates.get(keyValue));
textContent.append("[");
- writeIdentifierWithNamespacePrefix(element, textContent, keyValue, prefixes);
+ writeIdentifierWithNamespacePrefix(element, textContent, keyValue);
textContent.append("='");
textContent.append(predicateValue);
textContent.append("'");
* StringBuilder
* @param qName
* QName
- * @param prefixes
- * Map of namespaces and prefixes.
*/
private static void writeIdentifierWithNamespacePrefix(final Element element, final StringBuilder textContent,
- final QName qName, final Map<String, String> prefixes) {
- final String namespace = qName.getNamespace().toString();
- String prefix = prefixes.get(namespace);
- if (prefix == null) {
- prefix = generateNewPrefix(prefixes.values());
- }
-
- element.setAttribute("xmlns:" + prefix, namespace);
- textContent.append(prefix);
- prefixes.put(namespace, prefix);
+ final QName qName) {
+ final Module module = ControllerContext.getInstance().getGlobalSchema()
+ .findModuleByNamespaceAndRevision(qName.getNamespace(), qName.getRevision());
+ textContent.append(module.getName());
textContent.append(":");
textContent.append(qName.getLocalName());
}
* - start-time of getting notification
* @param stop
* - stop-time of getting notification
+ * @param filter
+ * - indicate which subset of all possible events are of interest
*/
- public void setTimer(final Date start, final Date stop) {
+ public void setQueryParams(final Date start, final Date stop, final String filter) {
this.start = start;
this.stop = stop;
+ this.filter = filter;
}
}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
+import java.io.StringReader;
import java.io.UnsupportedEncodingException;
import java.util.Collection;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.Executors;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;
import javax.xml.transform.dom.DOMResult;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathFactory;
import org.json.JSONObject;
import org.json.XML;
import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
+import org.xml.sax.InputSource;
/**
* {@link NotificationListenerAdapter} is responsible to track events on
private final String outputType;
private Date start = null;
private Date stop = null;
+ private String filter;
/**
* Set path of listener and stream name, register event bus.
final Date now = new Date();
if (this.stop != null) {
if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) {
- prepareAndPostData(notification);
+ checkFilter(notification);
}
if (this.stop.compareTo(now) < 0) {
try {
} else if (this.start != null) {
if (this.start.compareTo(now) < 0) {
this.start = null;
- prepareAndPostData(notification);
+ checkFilter(notification);
}
} else {
- prepareAndPostData(notification);
+ checkFilter(notification);
}
}
/**
+ * Check if is filter used and then prepare and post data do client
+ *
* @param notification
+ * - data of notification
*/
- private void prepareAndPostData(final DOMNotification notification) {
+ private void checkFilter(final DOMNotification notification) {
final String xml = prepareXmlFrom(notification);
+ if (this.filter == null) {
+ prepareAndPostData(xml);
+ } else {
+ try {
+ if (parseFilterParam(xml)) {
+ prepareAndPostData(xml);
+ }
+ } catch (final Exception e) {
+ throw new RestconfDocumentedException("Problem while parsing filter.", e);
+ }
+ }
+ }
+
+ /**
+ * Parse and evaluate filter value by xml
+ *
+ * @param xml
+ * - notification data in xml
+ * @return true or false - depends on filter expression and data of
+ * notifiaction
+ * @throws Exception
+ */
+ private boolean parseFilterParam(final String xml) throws Exception {
+ final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+ final DocumentBuilder builder = factory.newDocumentBuilder();
+ final Document docOfXml = builder.parse(new InputSource(new StringReader(xml)));
+ final XPath xPath = XPathFactory.newInstance().newXPath();
+ return (boolean) xPath.compile(this.filter).evaluate(docOfXml, XPathConstants.BOOLEAN);
+ }
+
+ /**
+ * Prepare data of notification and data to client
+ *
+ * @param xml
+ */
+ private void prepareAndPostData(final String xml) {
final Event event = new Event(EventType.NOTIFY);
if (this.outputType.equals("JSON")) {
final JSONObject jsonObject = XML.toJSONObject(xml);
* - start-time of getting notification
* @param stop
* - stop-time of getting notification
+ * @param filter
+ * - indicate which subset of all possible events are of interest
*/
- public void setTime(final Date start, final Date stop) {
+ public void setQueryParams(final Date start, final Date stop, final String filter) {
this.start = start;
this.stop = stop;
+ this.filter = filter;
}
}
public NormalizedNodeContext subscribeToStream(final String identifier, final UriInfo uriInfo) {
boolean startTime_used = false;
boolean stopTime_used = false;
+ boolean filter_used = false;
Date start = null;
Date stop = null;
+ String filter = null;
for (final Entry<String, List<String>> entry : uriInfo.getQueryParameters().entrySet()) {
switch (entry.getKey()) {
throw new RestconfDocumentedException("Stop-time parameter can be used only once.");
}
break;
+ case "filter":
+ if (!filter_used) {
+ filter_used = true;
+ filter = entry.getValue().iterator().next();
+ }
+ break;
default:
throw new RestconfDocumentedException("Bad parameter used with notifications: " + entry.getKey());
}
}
URI response = null;
if (identifier.contains(RestconfStreamsConstants.DATA_SUBSCR)) {
- response = SubscribeToStreamUtil.dataSubs(identifier, uriInfo, start, stop, this.domDataBrokerHandler);
+ response =
+ SubscribeToStreamUtil.dataSubs(identifier, uriInfo, start, stop, this.domDataBrokerHandler, filter);
} else if (identifier.contains(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
response = SubscribeToStreamUtil.notifStream(identifier, uriInfo, start, stop,
- this.notificationServiceHandler);
+ this.notificationServiceHandler, filter);
}
if (response != null) {
* - stop-time query parameter
* @param notifiServiceHandler
* - DOMNotificationService handler for register listeners
+ * @param filter
+ * - indicate which subset of all possible events are of interest
* @return location for listening
*/
public static URI notifStream(final String identifier, final UriInfo uriInfo, final Date start, final Date stop,
- final NotificationServiceHandler notifiServiceHandler) {
+ final NotificationServiceHandler notifiServiceHandler, final String filter) {
final String streamName = Notificator.createStreamNameFromUri(identifier);
if (Strings.isNullOrEmpty(streamName)) {
throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
for (final NotificationListenerAdapter listener : listeners) {
registerToListenNotification(listener, notifiServiceHandler);
- listener.setTime(start, stop);
+ listener.setQueryParams(start, stop, filter);
}
final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
* - stop-time query parameter
* @param domDataBrokerHandler
* - DOMDataBroker handler for register listener
+ * @param filter
+ * - indicate which subset of all possible events are of interest
* @return location for listening
*/
public static URI dataSubs(final String identifier, final UriInfo uriInfo, final Date start, final Date stop,
- final DOMDataBrokerHandler domDataBrokerHandler) {
+ final DOMDataBrokerHandler domDataBrokerHandler, final String filter) {
final Map<String, String> mapOfValues = SubscribeToStreamUtil.mapValuesFromUri(identifier);
final LogicalDatastoreType ds = SubscribeToStreamUtil.parseURIEnum(LogicalDatastoreType.class,
final ListenerAdapter listener = Notificator.getListenerFor(streamName);
Preconditions.checkNotNull(listener, "Listener doesn't exist : " + streamName);
- listener.setTimer(start, stop);
+ listener.setQueryParams(start, stop, filter);
SubscribeToStreamUtil.registration(ds, scope, listener, domDataBrokerHandler.get());
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.restconf.impl.test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.lang.reflect.Method;
+import java.util.Collection;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.md.sal.rest.common.TestRestconfUtils;
+import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
+import org.opendaylight.netconf.sal.streams.listeners.Notificator;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+public class ExpressionParserTest {
+
+ private Collection<File> xmls;
+
+ @Before
+ public void setup() throws Exception {
+ this.xmls = TestRestconfUtils.loadFiles("/notifications/xml/output/");
+ }
+
+ @Test
+ public void trueDownFilterTest() throws Exception {
+ final boolean parser =
+ parser("notification/data-changed-notification/data-change-event/data/toasterStatus='down'",
+ "data_change_notification_toaster_status_DOWN.xml");
+ Assert.assertTrue(parser);
+ }
+
+ @Test
+ public void falseDownFilterTest() throws Exception {
+ final boolean parser =
+ parser("notification/data-changed-notification/data-change-event/data/toasterStatus='up'",
+ "data_change_notification_toaster_status_DOWN.xml");
+ Assert.assertFalse(parser);
+ }
+
+ @Test
+ public void trueNumberEqualsFilterTest() throws Exception {
+ final boolean parser = parser(
+ "notification/data-changed-notification/data-change-event/data/toasterStatus=1",
+ "data_change_notification_toaster_status_NUMBER.xml");
+ Assert.assertTrue(parser);
+ }
+
+ @Test
+ public void falseNumberEqualsFilterTest() throws Exception {
+ final boolean parser = parser("notification/data-changed-notification/data-change-event/data/toasterStatus=0",
+ "data_change_notification_toaster_status_NUMBER.xml");
+ Assert.assertFalse(parser);
+ }
+
+ @Test
+ public void trueNumberLessFilterTest() throws Exception {
+ final boolean parser = parser("notification/data-changed-notification/data-change-event/data/toasterStatus<2",
+ "data_change_notification_toaster_status_NUMBER.xml");
+ Assert.assertTrue(parser);
+ }
+
+ @Test
+ public void falseNumberLessFilterTest() throws Exception {
+ final boolean parser = parser("notification/data-changed-notification/data-change-event/data/toasterStatus<0",
+ "data_change_notification_toaster_status_NUMBER.xml");
+ Assert.assertFalse(parser);
+ }
+
+ @Test
+ public void trueNumberLessEqualsFilterTest() throws Exception {
+ final boolean parser = parser("notification/data-changed-notification/data-change-event/data/toasterStatus<=2",
+ "data_change_notification_toaster_status_NUMBER.xml");
+ Assert.assertTrue(parser);
+ }
+
+ @Test
+ public void falseNumberLessEqualsFilterTest() throws Exception {
+ final boolean parser = parser("notification/data-changed-notification/data-change-event/data/toasterStatus<=-1",
+ "data_change_notification_toaster_status_NUMBER.xml");
+ Assert.assertFalse(parser);
+ }
+
+ @Test
+ public void trueNumberGreaterFilterTest() throws Exception {
+ final boolean parser = parser("notification/data-changed-notification/data-change-event/data/toasterStatus>0",
+ "data_change_notification_toaster_status_NUMBER.xml");
+ Assert.assertTrue(parser);
+ }
+
+ @Test
+ public void falseNumberGreaterFilterTest() throws Exception {
+ final boolean parser = parser("notification/data-changed-notification/data-change-event/data/toasterStatus>5",
+ "data_change_notification_toaster_status_NUMBER.xml");
+ Assert.assertFalse(parser);
+ }
+
+ @Test
+ public void trueNumberGreaterEqualsFilterTest() throws Exception {
+ final boolean parser = parser("notification/data-changed-notification/data-change-event/data/toasterStatus>=0",
+ "data_change_notification_toaster_status_NUMBER.xml");
+ Assert.assertTrue(parser);
+ }
+
+ @Test
+ public void falseNumberGreaterEqualsFilterTest() throws Exception {
+ final boolean parser = parser("notification/data-changed-notification/data-change-event/data/toasterStatus>=5",
+ "data_change_notification_toaster_status_NUMBER.xml");
+ Assert.assertFalse(parser);
+ }
+
+ private boolean parser(final String filter, final String fileName) throws Exception {
+ File xml = null;
+ for (final File file : this.xmls) {
+ if (file.getName().equals(fileName)) {
+ xml = file;
+ }
+ }
+ final YangInstanceIdentifier path = Mockito.mock(YangInstanceIdentifier.class);
+ final ListenerAdapter listener = Notificator.createListener(path, "streamName", NotificationOutputType.JSON);
+ listener.setQueryParams(null, null, filter);
+ final Method m = listener.getClass().getDeclaredMethod("parseFilterParam", String.class);
+ m.setAccessible(true);
+
+ return (boolean) m.invoke(listener, readFile(xml));
+ }
+
+ private String readFile(final File xml) throws Exception {
+ final BufferedReader br = new BufferedReader(new FileReader(xml));
+ try {
+ final StringBuilder sb = new StringBuilder();
+ String line = br.readLine();
+
+ while (line != null) {
+ sb.append(line);
+ sb.append("\n");
+ line = br.readLine();
+ }
+ return sb.toString();
+ } finally {
+ br.close();
+ }
+ }
+
+}
--- /dev/null
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<notification xmlns="urn:ietf:params:xml:ns:netconf:notification:1.0">
+ <eventTime>2016-11-10T04:45:31+01:00</eventTime>
+ <data-changed-notification xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote">
+ <data-change-event>
+ <path>/toaster:toaster/toaster:toasterStatus</path>
+ <operation>updated</operation>
+ <data>
+ <toasterStatus xmlns="http://netconfcentral.org/ns/toaster">down</toasterStatus>
+ </data>
+ </data-change-event>
+ </data-changed-notification>
+</notification>
\ No newline at end of file
--- /dev/null
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<notification xmlns="urn:ietf:params:xml:ns:netconf:notification:1.0">
+ <eventTime>2016-11-10T04:45:31+01:00</eventTime>
+ <data-changed-notification xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote">
+ <data-change-event>
+ <path>/toaster:toaster/toaster:toasterStatus</path>
+ <operation>updated</operation>
+ <data>
+ <toasterStatus xmlns="http://netconfcentral.org/ns/toaster">1</toasterStatus>
+ </data>
+ </data-change-event>
+ </data-changed-notification>
+</notification>
\ No newline at end of file