--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.listeners;
+
+import com.google.common.eventbus.AsyncEventBus;
+import com.google.common.eventbus.EventBus;
+import io.netty.channel.Channel;
+import io.netty.util.internal.ConcurrentSet;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Features of subscribing part of both notifications
+ */
+abstract class AbstractCommonSubscriber extends AbstractQueryParams implements BaseListenerInterface {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class);
+
+ private final Set<Channel> subscribers = new ConcurrentSet<>();
+ private final EventBus eventBus;
+
+ @SuppressWarnings("rawtypes")
+ private EventBusChangeRecorder eventBusChangeRecorder;
+ @SuppressWarnings("rawtypes")
+ private ListenerRegistration registration;
+
+ /**
+ * Creating {@link EventBus}
+ */
+ protected AbstractCommonSubscriber() {
+ this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
+ }
+
+ @Override
+ public final boolean hasSubscribers() {
+ return !this.subscribers.isEmpty();
+ }
+
+ @Override
+ public final Set<Channel> getSubscribers() {
+ return this.subscribers;
+ }
+
+ @Override
+ public final void close() throws Exception {
+ this.registration.close();
+ this.registration = null;
+
+ deleteDataInDS();
+ unregister();
+ }
+
+ /**
+ * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
+ * subscriber to the event and post event into event bus.
+ *
+ * @param subscriber
+ * Channel
+ */
+ public void addSubscriber(final Channel subscriber) {
+ if (!subscriber.isActive()) {
+ LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
+ }
+ final Event event = new Event(EventType.REGISTER);
+ event.setSubscriber(subscriber);
+ this.eventBus.post(event);
+ }
+
+ /**
+ * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
+ * subscriber to the event and posts event into event bus.
+ *
+ * @param subscriber
+ */
+ public void removeSubscriber(final Channel subscriber) {
+ LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
+ final Event event = new Event(EventType.DEREGISTER);
+ event.setSubscriber(subscriber);
+ this.eventBus.post(event);
+ }
+
+ /**
+ * Sets {@link ListenerRegistration} registration.
+ *
+ * @param registration
+ * DOMDataChangeListener registration
+ */
+ @SuppressWarnings("rawtypes")
+ public void setRegistration(final ListenerRegistration registration) {
+ this.registration = registration;
+ }
+
+ /**
+ * Checks if {@link ListenerRegistration} registration exist.
+ *
+ * @return True if exist, false otherwise.
+ */
+ public boolean isListening() {
+ return this.registration == null ? false : true;
+ }
+
+ /**
+ * Creating and registering {@link EventBusChangeRecorder} of specific
+ * listener on {@link EventBus}
+ *
+ * @param listener
+ * - specific listener of notifications
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ protected <T extends BaseListenerInterface> void register(final T listener) {
+ this.eventBusChangeRecorder = new EventBusChangeRecorder(listener);
+ this.eventBus.register(this.eventBusChangeRecorder);
+ }
+
+ /**
+ * Post event to event bus
+ *
+ * @param event
+ * - data of incoming notifications
+ */
+ protected void post(final Event event) {
+ this.eventBus.post(event);
+ }
+
+ /**
+ * Removes all subscribers and unregisters event bus change recorder form
+ * event bus
+ */
+ protected void unregister() {
+ this.subscribers.clear();
+ this.eventBus.unregister(this.eventBusChangeRecorder);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.listeners;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+import java.util.Date;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamWriter;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.dom.DOMResult;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.restconf.Draft18.MonitoringModule;
+import org.opendaylight.restconf.handlers.SchemaContextHandler;
+import org.opendaylight.restconf.handlers.TransactionChainHandler;
+import org.opendaylight.restconf.parser.IdentifierCodec;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
+import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+/**
+ * Abstract class for processing and preparing data
+ *
+ */
+abstract class AbstractNotificationsData {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractNotificationsData.class);
+
+ private TransactionChainHandler transactionChainHandler;
+ private SchemaContextHandler schemaHandler;
+ private String localName;
+
+ /**
+ * Transaction chain for delete data in DS on close()
+ *
+ * @param transactionChainHandler
+ * - creating new write transaction for delete data on close
+ * @param schemaHandler
+ * - for getting schema to deserialize
+ * {@link MonitoringModule#PATH_TO_STREAM_WITHOUT_KEY} to
+ * {@link YangInstanceIdentifier}
+ */
+ public void setCloseVars(final TransactionChainHandler transactionChainHandler,
+ final SchemaContextHandler schemaHandler) {
+ this.transactionChainHandler = transactionChainHandler;
+ this.schemaHandler = schemaHandler;
+ }
+
+ /**
+ * Delete data in DS
+ */
+ protected void deleteDataInDS() throws Exception {
+ final DOMDataWriteTransaction wTx = this.transactionChainHandler.get().newWriteOnlyTransaction();
+ wTx.delete(LogicalDatastoreType.OPERATIONAL, IdentifierCodec
+ .deserialize(MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + this.localName, this.schemaHandler.get()));
+ wTx.submit().checkedGet();
+ }
+
+ /**
+ * Set localName of last path element of specific listener
+ *
+ * @param localName
+ * - local name
+ */
+ protected void setLocalNameOfPath(final String localName) {
+ this.localName = localName;
+ }
+
+ /**
+ * Formats data specified by RFC3339.
+ *
+ * @param d
+ * Date
+ * @return Data specified by RFC3339.
+ */
+ protected static String toRFC3339(final Date d) {
+ return ListenersConstants.RFC3339_PATTERN.matcher(ListenersConstants.RFC3339.format(d)).replaceAll("$1:$2");
+ }
+
+ /**
+ * Creates {@link Document} document.
+ *
+ * @return {@link Document} document.
+ */
+ protected static Document createDocument() {
+ final DocumentBuilder bob;
+ try {
+ bob = ListenersConstants.DBF.newDocumentBuilder();
+ } catch (final ParserConfigurationException e) {
+ return null;
+ }
+ return bob.newDocument();
+ }
+
+ /**
+ * Write normalized node to {@link DOMResult}
+ *
+ * @param normalized
+ * - data
+ * @param context
+ * - actual schema context
+ * @param schemaPath
+ * - schema path of data
+ * @return {@link DOMResult}
+ */
+ protected DOMResult writeNormalizedNode(final NormalizedNode<?, ?> normalized, final SchemaContext context,
+ final SchemaPath schemaPath) throws IOException, XMLStreamException {
+ final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
+ final Document doc = XmlDocumentUtils.getDocument();
+ final DOMResult result = new DOMResult(doc);
+ NormalizedNodeWriter normalizedNodeWriter = null;
+ NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
+ XMLStreamWriter writer = null;
+
+ try {
+ writer = XML_FACTORY.createXMLStreamWriter(result);
+ normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context, schemaPath);
+ normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
+
+ normalizedNodeWriter.write(normalized);
+
+ normalizedNodeWriter.flush();
+ } finally {
+ if (normalizedNodeWriter != null) {
+ normalizedNodeWriter.close();
+ }
+ if (normalizedNodeStreamWriter != null) {
+ normalizedNodeStreamWriter.close();
+ }
+ if (writer != null) {
+ writer.close();
+ }
+ }
+
+ return result;
+ }
+
+ /**
+ * Generating base element of every notification
+ *
+ * @param doc
+ * - base {@link Document}
+ * @return element of {@link Document}
+ */
+ protected Element basePartDoc(final Document doc) {
+ final Element notificationElement =
+ doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0", "notification");
+
+ doc.appendChild(notificationElement);
+
+ final Element eventTimeElement = doc.createElement("eventTime");
+ eventTimeElement.setTextContent(toRFC3339(new Date()));
+ notificationElement.appendChild(eventTimeElement);
+
+ return notificationElement;
+ }
+
+ /**
+ * Generating of {@link Document} transforming to string
+ *
+ * @param doc
+ * - {@link Document} with data
+ * @return - string from {@link Document}
+ */
+ protected String transformDoc(final Document doc) {
+ try {
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ final Transformer transformer = ListenersConstants.FACTORY.newTransformer();
+ transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
+ transformer.setOutputProperty(OutputKeys.METHOD, "xml");
+ transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+ transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
+ transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
+ transformer.transform(new DOMSource(doc),
+ new StreamResult(new OutputStreamWriter(out, StandardCharsets.UTF_8)));
+ final byte[] charData = out.toByteArray();
+ return new String(charData, "UTF-8");
+ } catch (TransformerException | UnsupportedEncodingException e) {
+ final String msg = "Error during transformation of Document into String";
+ LOG.error(msg, e);
+ return msg;
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.listeners;
+
+import java.io.StringReader;
+import java.util.Date;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathFactory;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
+import org.w3c.dom.Document;
+import org.xml.sax.InputSource;
+
+/**
+ * Features of query parameters part of both notifications
+ *
+ */
+abstract class AbstractQueryParams extends AbstractNotificationsData {
+
+ protected Date start = null;
+ protected Date stop = null;
+ protected String filter = null;
+
+ private String xml;
+
+ /**
+ * Set query parameters for listener
+ *
+ * @param start
+ * - start-time of getting notification
+ * @param stop
+ * - stop-time of getting notification
+ * @param filter
+ * - indicate which subset of all possible events are of interest
+ */
+ public void setQueryParams(final Date start, final Date stop, final String filter) {
+ this.start = start;
+ this.stop = stop;
+ this.filter = filter;
+ }
+
+ /**
+ * Checking query parameters on specific notification
+ *
+ * @param xml
+ * - data of notification
+ * @param listener
+ * - listener of notification
+ * @return true if notification meets the requirements of query parameters,
+ * false otherwise
+ */
+ protected <T extends BaseListenerInterface> boolean checkQueryParams(final String xml, final T listener) {
+ this.xml = xml;
+ final Date now = new Date();
+ if (this.stop != null) {
+ if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) {
+ return checkFilter();
+ }
+ if (this.stop.compareTo(now) < 0) {
+ try {
+ listener.close();
+ } catch (final Exception e) {
+ throw new RestconfDocumentedException("Problem with unregister listener." + e);
+ }
+ }
+ } else if (this.start != null) {
+ if (this.start.compareTo(now) < 0) {
+ this.start = null;
+ return checkFilter();
+ }
+ } else {
+ return checkFilter();
+ }
+ return false;
+ }
+
+ /**
+ * Check if is filter used and then prepare and post data do client
+ *
+ * @param change
+ * - data of notification
+ */
+ private boolean checkFilter() {
+ if (this.filter == null) {
+ return true;
+ } else {
+ try {
+ return parseFilterParam();
+ } catch (final Exception e) {
+ throw new RestconfDocumentedException("Problem while parsing filter.", e);
+ }
+ }
+ }
+
+ /**
+ * Parse and evaluate filter value by xml
+ *
+ * @return true or false - depends on filter expression and data of
+ * notifiaction
+ * @throws Exception
+ */
+ private boolean parseFilterParam() throws Exception {
+ final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+ final DocumentBuilder builder = factory.newDocumentBuilder();
+ final Document docOfXml = builder.parse(new InputSource(new StringReader(this.xml)));
+ final XPath xPath = XPathFactory.newInstance().newXPath();
+ return (boolean) xPath.compile(this.filter).evaluate(docOfXml, XPathConstants.BOOLEAN);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.listeners;
+
+import io.netty.channel.Channel;
+import java.util.Set;
+
+/**
+ * Base interface for both listeners({@link ListenerAdapter},
+ * {@link NotificationListenerAdapter})
+ */
+interface BaseListenerInterface extends AutoCloseable {
+
+ /**
+ * Return all subscribers of listener
+ *
+ * @return set of subscribers
+ */
+ Set<Channel> getSubscribers();
+
+ /**
+ * Checks if exists at least one {@link Channel} subscriber.
+ *
+ * @return True if exist at least one {@link Channel} subscriber, false
+ * otherwise.
+ */
+ boolean hasSubscribers();
+
+ /**
+ * Get name of stream
+ *
+ * @return stream name
+ */
+ String getStreamName();
+
+ /**
+ * Get output type
+ *
+ * @return outputType
+ */
+ String getOutputType();
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.listeners;
+
+import io.netty.channel.Channel;
+
+/**
+ * Represents event of specific {@link EventType} type, holds data and
+ * {@link Channel} subscriber.
+ */
+class Event {
+ private final EventType type;
+ private Channel subscriber;
+ private String data;
+
+ /**
+ * Creates new event specified by {@link EventType} type.
+ *
+ * @param type
+ * EventType
+ */
+ public Event(final EventType type) {
+ this.type = type;
+ }
+
+ /**
+ * Gets the {@link Channel} subscriber.
+ *
+ * @return Channel
+ */
+ public Channel getSubscriber() {
+ return this.subscriber;
+ }
+
+ /**
+ * Sets subscriber for event.
+ *
+ * @param subscriber
+ * Channel
+ */
+ public void setSubscriber(final Channel subscriber) {
+ this.subscriber = subscriber;
+ }
+
+ /**
+ * Gets event String.
+ *
+ * @return String representation of event data.
+ */
+ public String getData() {
+ return this.data;
+ }
+
+ /**
+ * Sets event data.
+ *
+ * @param data
+ * String.
+ */
+ public void setData(final String data) {
+ this.data = data;
+ }
+
+ /**
+ * Gets event type.
+ *
+ * @return The type of the event.
+ */
+ public EventType getType() {
+ return this.type;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.listeners;
+
+import com.google.common.eventbus.Subscribe;
+import io.netty.channel.Channel;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class EventBusChangeRecorder<T extends BaseListenerInterface> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(EventBusChangeRecorder.class);
+ private final T listener;
+
+ /**
+ * Event bus change recorder of specific listener of notifications
+ *
+ * @param listener
+ * - specific listener
+ */
+ EventBusChangeRecorder(final T listener) {
+ this.listener = listener;
+ }
+
+ @Subscribe
+ public void recordCustomerChange(final Event event) {
+ if (event.getType() == EventType.REGISTER) {
+ final Channel subscriber = event.getSubscriber();
+ if (!this.listener.getSubscribers().contains(subscriber)) {
+ this.listener.getSubscribers().add(subscriber);
+ }
+ } else if (event.getType() == EventType.DEREGISTER) {
+ this.listener.getSubscribers().remove(event.getSubscriber());
+ Notificator.removeListenerIfNoSubscriberExists(this.listener);
+ } else if (event.getType() == EventType.NOTIFY) {
+ for (final Channel subscriber : this.listener.getSubscribers()) {
+ if (subscriber.isActive()) {
+ LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
+ subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
+ } else {
+ LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
+ this.listener.getSubscribers().remove(subscriber);
+ }
+ }
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.listeners;
+
+/**
+ * Type of the event.
+ */
+enum EventType {
+ REGISTER, DEREGISTER, NOTIFY;
+}
package org.opendaylight.netconf.sal.streams.listeners;
import com.google.common.base.Preconditions;
-import com.google.common.eventbus.AsyncEventBus;
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
-import io.netty.channel.Channel;
-import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
-import io.netty.util.internal.ConcurrentSet;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.StringReader;
-import java.io.UnsupportedEncodingException;
-import java.nio.charset.StandardCharsets;
-import java.text.SimpleDateFormat;
-import java.util.Date;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.regex.Pattern;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
-import javax.xml.stream.XMLStreamWriter;
-import javax.xml.transform.OutputKeys;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerException;
-import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMResult;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.stream.StreamResult;
-import javax.xml.xpath.XPath;
-import javax.xml.xpath.XPathConstants;
-import javax.xml.xpath.XPathFactory;
import org.json.JSONObject;
import org.json.XML;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
-import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
-import org.opendaylight.restconf.Draft18.MonitoringModule;
-import org.opendaylight.restconf.handlers.SchemaContextHandler;
-import org.opendaylight.restconf.handlers.TransactionChainHandler;
-import org.opendaylight.restconf.parser.IdentifierCodec;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
-import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
-import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
-import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree;
import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
-import org.xml.sax.InputSource;
/**
- * {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source.
+ * {@link ListenerAdapter} is responsible to track events, which occurred by
+ * changing data in data source.
*/
-public class ListenerAdapter implements DOMDataChangeListener {
+public class ListenerAdapter extends AbstractCommonSubscriber implements DOMDataChangeListener {
private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class);
- private static final DocumentBuilderFactory DBF = DocumentBuilderFactory.newInstance();
- private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
- private static final Pattern RFC3339_PATTERN = Pattern.compile("(\\d\\d)(\\d\\d)$");
-
- private static final SimpleDateFormat RFC3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ");
private final YangInstanceIdentifier path;
- private ListenerRegistration<DOMDataChangeListener> registration;
private final String streamName;
- private Set<Channel> subscribers = new ConcurrentSet<>();
- private final EventBus eventBus;
- private final EventBusChangeRecorder eventBusChangeRecorder;
private final NotificationOutputType outputType;
- private Date start = null;
- private Date stop = null;
- private String filter = null;
- private TransactionChainHandler transactionChainHandler;
- private SchemaContextHandler schemaHandler;
+
+ private AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change;
/**
* Creates new {@link ListenerAdapter} listener specified by path and stream
- * name.
+ * name and register for subscribing
*
* @param path
* Path to data in data store.
*/
ListenerAdapter(final YangInstanceIdentifier path, final String streamName,
final NotificationOutputType outputType) {
- this.outputType = outputType;
- Preconditions.checkNotNull(path);
+ super();
+ register(this);
+ setLocalNameOfPath(path.getLastPathArgument().getNodeType().getLocalName());
+
+ this.outputType = Preconditions.checkNotNull(outputType);
+ this.path = Preconditions.checkNotNull(path);
Preconditions.checkArgument((streamName != null) && !streamName.isEmpty());
- this.path = path;
this.streamName = streamName;
- this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
- this.eventBusChangeRecorder = new EventBusChangeRecorder();
- this.eventBus.register(this.eventBusChangeRecorder);
}
@Override
public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
- final Date now = new Date();
- if (this.stop != null) {
- if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) {
- checkFilter(change);
- }
- if (this.stop.compareTo(now) < 0) {
- try {
- this.close();
- } catch (final Exception e) {
- throw new RestconfDocumentedException("Problem with unregister listener." + e);
- }
- }
- } else if (this.start != null) {
- if (this.start.compareTo(now) < 0) {
- this.start = null;
- checkFilter(change);
- }
- } else {
- checkFilter(change);
+ this.change = change;
+ final String xml = prepareXml();
+ if (checkQueryParams(xml, this)) {
+ prepareAndPostData(xml);
}
}
/**
- * Check if is filter used and then prepare and post data do client
+ * Gets the name of the stream.
*
- * @param change
- * - data of notification
+ * @return The name of the stream.
*/
- private void checkFilter(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
- final String xml = prepareXmlFrom(change);
- if (this.filter == null) {
- prepareAndPostData(xml);
- } else {
- try {
- if (parseFilterParam(xml)) {
- prepareAndPostData(xml);
- }
- } catch (final Exception e) {
- throw new RestconfDocumentedException("Problem while parsing filter.", e);
- }
- }
+ @Override
+ public String getStreamName() {
+ return this.streamName;
+ }
+
+ @Override
+ public String getOutputType() {
+ return this.outputType.getName();
}
/**
- * Parse and evaluate filter value by xml
+ * Get path pointed to data in data store.
*
- * @param xml
- * - notification data in xml
- * @return true or false - depends on filter expression and data of
- * notifiaction
- * @throws Exception
+ * @return Path pointed to data in data store.
*/
- private boolean parseFilterParam(final String xml) throws Exception {
- final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
- final DocumentBuilder builder = factory.newDocumentBuilder();
- final Document docOfXml = builder.parse(new InputSource(new StringReader(xml)));
- final XPath xPath = XPathFactory.newInstance().newXPath();
- return (boolean) xPath.compile(this.filter).evaluate(docOfXml, XPathConstants.BOOLEAN);
+ public YangInstanceIdentifier getPath() {
+ return this.path;
}
/**
* @param xml
*/
private void prepareAndPostData(final String xml) {
- final Event event = new Event(EventType.NOTIFY);
- if (this.outputType.equals(NotificationOutputType.JSON)) {
- final JSONObject jsonObject = XML.toJSONObject(xml);
- event.setData(jsonObject.toString());
- } else {
- event.setData(xml);
- }
- this.eventBus.post(event);
- }
-
- /**
- * Tracks events of data change by customer.
- */
- private final class EventBusChangeRecorder {
- @Subscribe
- public void recordCustomerChange(final Event event) {
- if (event.getType() == EventType.REGISTER) {
- final Channel subscriber = event.getSubscriber();
- if (!ListenerAdapter.this.subscribers.contains(subscriber)) {
- ListenerAdapter.this.subscribers.add(subscriber);
- }
- } else if (event.getType() == EventType.DEREGISTER) {
- ListenerAdapter.this.subscribers.remove(event.getSubscriber());
- Notificator.removeListenerIfNoSubscriberExists(ListenerAdapter.this);
- } else if (event.getType() == EventType.NOTIFY) {
- for (final Channel subscriber : ListenerAdapter.this.subscribers) {
- if (subscriber.isActive()) {
- LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
- subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
- } else {
- LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
- ListenerAdapter.this.subscribers.remove(subscriber);
- }
- }
- }
- }
- }
-
- /**
- * Represents event of specific {@link EventType} type, holds data and {@link Channel} subscriber.
- */
- private final class Event {
- private final EventType type;
- private Channel subscriber;
- private String data;
-
- /**
- * Creates new event specified by {@link EventType} type.
- *
- * @param type
- * EventType
- */
- public Event(final EventType type) {
- this.type = type;
- }
-
- /**
- * Gets the {@link Channel} subscriber.
- *
- * @return Channel
- */
- public Channel getSubscriber() {
- return this.subscriber;
- }
-
- /**
- * Sets subscriber for event.
- *
- * @param subscriber
- * Channel
- */
- public void setSubscriber(final Channel subscriber) {
- this.subscriber = subscriber;
- }
-
- /**
- * Gets event String.
- *
- * @return String representation of event data.
- */
- public String getData() {
- return this.data;
- }
-
- /**
- * Sets event data.
- *
- * @param data String.
- */
- public void setData(final String data) {
- this.data = data;
- }
-
- /**
- * Gets event type.
- *
- * @return The type of the event.
- */
- public EventType getType() {
- return this.type;
+ final Event event = new Event(EventType.NOTIFY);
+ if (this.outputType.equals(NotificationOutputType.JSON)) {
+ final JSONObject jsonObject = XML.toJSONObject(xml);
+ event.setData(jsonObject.toString());
+ } else {
+ event.setData(xml);
}
+ post(event);
}
/**
- * Type of the event.
+ * Tracks events of data change by customer.
*/
- private enum EventType {
- REGISTER,
- DEREGISTER,
- NOTIFY;
- }
/**
* Prepare data in printable form and transform it to String.
* DataChangeEvent
* @return Data in printable form.
*/
- private String prepareXmlFrom(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+ private String prepareXml() {
final SchemaContext schemaContext = ControllerContext.getInstance().getGlobalSchema();
- final DataSchemaContextTree dataContextTree = DataSchemaContextTree.from(schemaContext);
+ final DataSchemaContextTree dataContextTree = DataSchemaContextTree.from(schemaContext);
final Document doc = createDocument();
- final Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
- "notification");
-
- doc.appendChild(notificationElement);
-
- final Element eventTimeElement = doc.createElement("eventTime");
- eventTimeElement.setTextContent(toRFC3339(new Date()));
- notificationElement.appendChild(eventTimeElement);
+ final Element notificationElement = basePartDoc(doc);
final Element dataChangedNotificationEventElement = doc.createElementNS(
"urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "data-changed-notification");
- addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, change,
+ addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, this.change,
schemaContext, dataContextTree);
notificationElement.appendChild(dataChangedNotificationEventElement);
-
- try {
- final ByteArrayOutputStream out = new ByteArrayOutputStream();
- final Transformer transformer = FACTORY.newTransformer();
- transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
- transformer.setOutputProperty(OutputKeys.METHOD, "xml");
- transformer.setOutputProperty(OutputKeys.INDENT, "yes");
- transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
- transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
- transformer.transform(new DOMSource(doc),
- new StreamResult(new OutputStreamWriter(out, StandardCharsets.UTF_8)));
- final byte[] charData = out.toByteArray();
- return new String(charData, "UTF-8");
- } catch (TransformerException | UnsupportedEncodingException e) {
- final String msg = "Error during transformation of Document into String";
- LOG.error(msg, e);
- return msg;
- }
- }
-
- /**
- * Formats data specified by RFC3339.
- *
- * @param d
- * Date
- * @return Data specified by RFC3339.
- */
- public static String toRFC3339(final Date d) {
- return RFC3339_PATTERN.matcher(RFC3339.format(d)).replaceAll("$1:$2");
- }
-
- /**
- * Creates {@link Document} document.
- * @return {@link Document} document.
- */
- public static Document createDocument() {
- final DocumentBuilder bob;
- try {
- bob = DBF.newDocumentBuilder();
- } catch (final ParserConfigurationException e) {
- return null;
- }
- return bob.newDocument();
+ return transformDoc(doc);
}
/**
private void addValuesToDataChangedNotificationEventElement(final Document doc,
final Element dataChangedNotificationEventElement,
final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change,
- final SchemaContext schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
+ final SchemaContext schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
addCreatedChangedValuesFromDataToElement(doc, change.getCreatedData().entrySet(),
- dataChangedNotificationEventElement,
- Operation.CREATED, schemaContext, dataSchemaContextTree);
+ dataChangedNotificationEventElement, Operation.CREATED, schemaContext, dataSchemaContextTree);
addCreatedChangedValuesFromDataToElement(doc, change.getUpdatedData().entrySet(),
- dataChangedNotificationEventElement,
- Operation.UPDATED, schemaContext, dataSchemaContextTree);
+ dataChangedNotificationEventElement, Operation.UPDATED, schemaContext, dataSchemaContextTree);
addValuesFromDataToElement(doc, change.getRemovedPaths(), dataChangedNotificationEventElement,
Operation.DELETED);
}
}
- private void addCreatedChangedValuesFromDataToElement(final Document doc, final Set<Entry<YangInstanceIdentifier,
- NormalizedNode<?,?>>> data, final Element element, final Operation operation, final SchemaContext
- schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
+ private void addCreatedChangedValuesFromDataToElement(final Document doc,
+ final Set<Entry<YangInstanceIdentifier, NormalizedNode<?, ?>>> data, final Element element,
+ final Operation operation, final SchemaContext schemaContext,
+ final DataSchemaContextTree dataSchemaContextTree) {
if ((data == null) || data.isEmpty()) {
return;
}
return dataChangeEventElement;
}
- private Node createCreatedChangedDataChangeEventElement(final Document doc, final Entry<YangInstanceIdentifier,
- NormalizedNode<?, ?>> entry, final Operation operation, final SchemaContext
- schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
+ private Node createCreatedChangedDataChangeEventElement(final Document doc,
+ final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry, final Operation operation,
+ final SchemaContext schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
final Element dataChangeEventElement = doc.createElement("data-change-event");
final Element pathElement = doc.createElement("path");
final YangInstanceIdentifier path = entry.getKey();
dataChangeEventElement.appendChild(operationElement);
try {
- final DOMResult domResult = writeNormalizedNode(entry.getValue(), path,
- schemaContext, dataSchemaContextTree);
+ SchemaPath nodePath;
+ final NormalizedNode<?, ?> normalized = entry.getValue();
+ if ((normalized instanceof MapEntryNode) || (normalized instanceof UnkeyedListEntryNode)) {
+ nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath();
+ } else {
+ nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath().getParent();
+ }
+ final DOMResult domResult = writeNormalizedNode(normalized, schemaContext, nodePath);
final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
final Element dataElement = doc.createElement("data");
dataElement.appendChild(result);
return dataChangeEventElement;
}
- private static DOMResult writeNormalizedNode(final NormalizedNode<?, ?> normalized,
- final YangInstanceIdentifier path, final SchemaContext context,
- final DataSchemaContextTree dataSchemaContextTree)
- throws IOException, XMLStreamException {
- final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
- final Document doc = XmlDocumentUtils.getDocument();
- final DOMResult result = new DOMResult(doc);
- NormalizedNodeWriter normalizedNodeWriter = null;
- NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
- XMLStreamWriter writer = null;
- final SchemaPath nodePath;
-
- if ((normalized instanceof MapEntryNode) || (normalized instanceof UnkeyedListEntryNode)) {
- nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath();
- } else {
- nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath().getParent();
- }
-
- try {
- writer = XML_FACTORY.createXMLStreamWriter(result);
- normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context, nodePath);
- normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
-
- normalizedNodeWriter.write(normalized);
-
- normalizedNodeWriter.flush();
- } finally {
- if (normalizedNodeWriter != null) {
- normalizedNodeWriter.close();
- }
- if (normalizedNodeStreamWriter != null) {
- normalizedNodeStreamWriter.close();
- }
- if (writer != null) {
- writer.close();
- }
- }
-
- return result;
- }
-
/**
* Adds path as value to element.
*
* @param element
* {@link Element}
*/
+ @SuppressWarnings("rawtypes")
private void addPathAsValueToElement(final YangInstanceIdentifier path, final Element element) {
final YangInstanceIdentifier normalizedPath = ControllerContext.getInstance().toXpathRepresentation(path);
final StringBuilder textContent = new StringBuilder();
}
/**
- * Gets path pointed to data in data store.
- *
- * @return Path pointed to data in data store.
- */
- public YangInstanceIdentifier getPath() {
- return this.path;
- }
-
- /**
- * Sets {@link ListenerRegistration} registration.
- *
- * @param registration DOMDataChangeListener registration
- */
- public void setRegistration(final ListenerRegistration<DOMDataChangeListener> registration) {
- this.registration = registration;
- }
-
- /**
- * Gets the name of the stream.
- *
- * @return The name of the stream.
- */
- public String getStreamName() {
- return this.streamName;
- }
-
- /**
- * Removes all subscribers and unregisters event bus change recorder form
- * event bus and delete data in DS
- */
- public void close() throws Exception {
- final DOMDataWriteTransaction wTx = this.transactionChainHandler.get().newWriteOnlyTransaction();
- wTx.delete(LogicalDatastoreType.OPERATIONAL, IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY
- + this.path.getLastPathArgument().getNodeType().getLocalName(), this.schemaHandler.get()));
- wTx.submit().checkedGet();
-
- this.subscribers = new ConcurrentSet<>();
- this.registration.close();
- this.registration = null;
- this.eventBus.unregister(this.eventBusChangeRecorder);
- }
-
- /**
- * Checks if {@link ListenerRegistration} registration exist.
- *
- * @return True if exist, false otherwise.
- */
- public boolean isListening() {
- return this.registration == null ? false : true;
- }
-
- /**
- * Creates event of type {@link EventType#REGISTER}, set {@link Channel} subscriber to the event and post event into
- * event bus.
- *
- * @param subscriber
- * Channel
- */
- public void addSubscriber(final Channel subscriber) {
- if (!subscriber.isActive()) {
- LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
- }
- final Event event = new Event(EventType.REGISTER);
- event.setSubscriber(subscriber);
- this.eventBus.post(event);
- }
-
- /**
- * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel} subscriber to the event and posts event
- * into event bus.
- *
- * @param subscriber
- */
- public void removeSubscriber(final Channel subscriber) {
- LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
- final Event event = new Event(EventType.DEREGISTER);
- event.setSubscriber(subscriber);
- this.eventBus.post(event);
- }
-
- /**
- * Checks if exists at least one {@link Channel} subscriber.
- *
- * @return True if exist at least one {@link Channel} subscriber, false otherwise.
- */
- public boolean hasSubscribers() {
- return !this.subscribers.isEmpty();
- }
-
- /**
- * Consists of two types {@link Store#CONFIG} and {@link Store#OPERATION}.
- */
- private static enum Store {
- CONFIG("config"),
- OPERATION("operation");
-
- private final String value;
-
- private Store(final String value) {
- this.value = value;
- }
- }
-
- /**
- * Consists of three types {@link Operation#CREATED}, {@link Operation#UPDATED} and {@link Operation#DELETED}.
+ * Consists of three types {@link Operation#CREATED},
+ * {@link Operation#UPDATED} and {@link Operation#DELETED}.
*/
private static enum Operation {
- CREATED("created"),
- UPDATED("updated"),
- DELETED("deleted");
+ CREATED("created"), UPDATED("updated"), DELETED("deleted");
private final String value;
this.value = value;
}
}
-
- /**
- * Set query parameters for listener
- *
- * @param start
- * - start-time of getting notification
- * @param stop
- * - stop-time of getting notification
- * @param filter
- * - indicate which subset of all possible events are of interest
- */
- public void setQueryParams(final Date start, final Date stop, final String filter) {
- this.start = start;
- this.stop = stop;
- this.filter = filter;
- }
-
- /**
- * Get output type
- *
- * @return outputType
- */
- public String getOutputType() {
- return this.outputType.getName();
- }
-
- /**
- * Transaction chain to delete data in DS on close()
- *
- * @param transactionChainHandler
- * - creating new write transaction to delete data on close
- * @param schemaHandler
- * - for getting schema to deserialize
- * {@link MonitoringModule#PATH_TO_STREAM_WITHOUT_KEY} to
- * {@link YangInstanceIdentifier}
- */
- public void setCloseVars(final TransactionChainHandler transactionChainHandler,
- final SchemaContextHandler schemaHandler) {
- this.transactionChainHandler = transactionChainHandler;
- this.schemaHandler = schemaHandler;
- }
-
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.listeners;
+
+import java.text.SimpleDateFormat;
+import java.util.regex.Pattern;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.transform.TransformerFactory;
+
+/**
+ * Constants of listeners
+ */
+class ListenersConstants {
+
+ static final DocumentBuilderFactory DBF = DocumentBuilderFactory.newInstance();
+ static final TransformerFactory FACTORY = TransformerFactory.newInstance();
+ static final Pattern RFC3339_PATTERN = Pattern.compile("(\\d\\d)(\\d\\d)$");
+
+ static final SimpleDateFormat RFC3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ");
+}
*/
package org.opendaylight.netconf.sal.streams.listeners;
-import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
-import com.google.common.eventbus.AsyncEventBus;
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
-import io.netty.channel.Channel;
-import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
-import io.netty.util.internal.ConcurrentSet;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.StringReader;
import java.io.StringWriter;
-import java.io.UnsupportedEncodingException;
import java.io.Writer;
import java.util.Collection;
import java.util.Date;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
-import javax.xml.stream.XMLStreamWriter;
-import javax.xml.transform.OutputKeys;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerException;
-import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMResult;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.stream.StreamResult;
-import javax.xml.xpath.XPath;
-import javax.xml.xpath.XPathConstants;
-import javax.xml.xpath.XPathFactory;
import org.json.JSONObject;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
-import org.opendaylight.restconf.Draft18.MonitoringModule;
-import org.opendaylight.restconf.handlers.SchemaContextHandler;
-import org.opendaylight.restconf.handlers.TransactionChainHandler;
-import org.opendaylight.restconf.parser.IdentifierCodec;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactory;
import org.opendaylight.yangtools.yang.data.codec.gson.JSONNormalizedNodeStreamWriter;
import org.opendaylight.yangtools.yang.data.codec.gson.JsonWriterFactory;
-import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
-import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
import org.slf4j.Logger;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
-import org.xml.sax.InputSource;
/**
* {@link NotificationListenerAdapter} is responsible to track events on
* notifications.
*
*/
-public class NotificationListenerAdapter implements DOMNotificationListener {
+public class NotificationListenerAdapter extends AbstractCommonSubscriber implements DOMNotificationListener {
private static final Logger LOG = LoggerFactory.getLogger(NotificationListenerAdapter.class);
- private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
private final String streamName;
- private final EventBus eventBus;
- private final EventBusChangeRecorder eventBusChangeRecorder;
-
private final SchemaPath path;
private final String outputType;
- private Date start = null;
- private Date stop = null;
- private String filter;
private SchemaContext schemaContext;
private DOMNotification notification;
- private ListenerRegistration<DOMNotificationListener> registration;
- private Set<Channel> subscribers = new ConcurrentSet<>();
- private TransactionChainHandler transactionChainHandler;
- private SchemaContextHandler schemaHandler;
/**
* Set path of listener and stream name, register event bus.
* - type of output on notification (JSON, XML)
*/
NotificationListenerAdapter(final SchemaPath path, final String streamName, final String outputType) {
- this.outputType = outputType;
+ super();
+ register(this);
+ setLocalNameOfPath(path.getLastComponent().getLocalName());
+
+ this.outputType = Preconditions.checkNotNull(outputType);
+ this.path = Preconditions.checkNotNull(path);
Preconditions.checkArgument((streamName != null) && !streamName.isEmpty());
- Preconditions.checkArgument(path != null);
- this.path = path;
this.streamName = streamName;
- this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
- this.eventBusChangeRecorder = new EventBusChangeRecorder();
- this.eventBus.register(this.eventBusChangeRecorder);
+ }
+
+ /**
+ * Get outputType of listenere
+ *
+ * @return the outputType
+ */
+ @Override
+ public String getOutputType() {
+ return this.outputType;
}
@Override
public void onNotification(final DOMNotification notification) {
this.schemaContext = ControllerContext.getInstance().getGlobalSchema();
this.notification = notification;
- final Date now = new Date();
- if (this.stop != null) {
- if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) {
- checkFilter();
- }
- if (this.stop.compareTo(now) < 0) {
- try {
- this.close();
- } catch (final Exception e) {
- throw new RestconfDocumentedException("Problem with unregister listener." + e);
- }
- }
- } else if (this.start != null) {
- if (this.start.compareTo(now) < 0) {
- this.start = null;
- checkFilter();
- }
- } else {
- checkFilter();
+
+ final String xml = prepareXml();
+ if (checkQueryParams(xml, this)) {
+ prepareAndPostData(xml);
}
}
/**
- * Check if is filter used and then prepare and post data do client
+ * Get stream name of this listener
*
+ * @return {@link String}
*/
- private void checkFilter() {
- final String xml = prepareXml();
- if (this.filter == null) {
- prepareAndPostData(xml);
- } else {
- try {
- if (parseFilterParam(xml)) {
- prepareAndPostData(xml);
- }
- } catch (final Exception e) {
- throw new RestconfDocumentedException("Problem while parsing filter.", e);
- }
- }
+ @Override
+ public String getStreamName() {
+ return this.streamName;
}
/**
- * Parse and evaluate filter value by xml
+ * Get schema path of notification
*
- * @param xml
- * - notification data in xml
- * @return true or false - depends on filter expression and data of
- * notifiaction
- * @throws Exception
+ * @return {@link SchemaPath}
*/
- private boolean parseFilterParam(final String xml) throws Exception {
- final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
- final DocumentBuilder builder = factory.newDocumentBuilder();
- final Document docOfXml = builder.parse(new InputSource(new StringReader(xml)));
- final XPath xPath = XPathFactory.newInstance().newXPath();
- return (boolean) xPath.compile(this.filter).evaluate(docOfXml, XPathConstants.BOOLEAN);
+ public SchemaPath getSchemaPath() {
+ return this.path;
}
/**
} else {
event.setData(xml);
}
- this.eventBus.post(event);
+ post(event);
}
/**
return writer.toString();
}
- /**
- * Checks if exists at least one {@link Channel} subscriber.
- *
- * @return True if exist at least one {@link Channel} subscriber, false
- * otherwise.
- */
- public boolean hasSubscribers() {
- return !this.subscribers.isEmpty();
- }
-
- /**
- * Reset lists, close registration and unregister bus event and delete data in DS.
- */
- public void close() {
- final DOMDataWriteTransaction wTx = this.transactionChainHandler.get().newWriteOnlyTransaction();
- wTx.delete(LogicalDatastoreType.OPERATIONAL,
- IdentifierCodec.deserialize(
- MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + this.path.getLastComponent().getLocalName(),
- this.schemaHandler.get()));
- try {
- wTx.submit().checkedGet();
- } catch (final TransactionCommitFailedException e) {
- throw new RestconfDocumentedException("Problem while deleting data from DS.", e);
- }
-
- this.subscribers = new ConcurrentSet<>();
- this.registration.close();
- this.registration = null;
- this.eventBus.unregister(this.eventBusChangeRecorder);
- }
-
- /**
- * Get stream name of this listener
- *
- * @return {@link String}
- */
- public String getStreamName() {
- return this.streamName;
- }
-
- /**
- * Check if is this listener registered.
- *
- * @return - true if is registered, otherwise null
- */
- public boolean isListening() {
- return this.registration == null ? false : true;
- }
-
- /**
- * Get schema path of notification
- *
- * @return {@link SchemaPath}
- */
- public SchemaPath getSchemaPath() {
- return this.path;
- }
-
- /**
- * Set registration for close after closing connection and check if this
- * listener is registered
- *
- * @param registration
- * - registered listener
- */
- public void setRegistration(final ListenerRegistration<DOMNotificationListener> registration) {
- Preconditions.checkNotNull(registration);
- this.registration = registration;
- }
-
- /**
- * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
- * subscriber to the event and post event into event bus.
- *
- * @param subscriber
- * Channel
- */
- public void addSubscriber(final Channel subscriber) {
- if (!subscriber.isActive()) {
- LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
- }
- final Event event = new Event(EventType.REGISTER);
- event.setSubscriber(subscriber);
- this.eventBus.post(event);
- }
-
- /**
- * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
- * subscriber to the event and posts event into event bus.
- *
- * @param subscriber
- */
- public void removeSubscriber(final Channel subscriber) {
- LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
- final Event event = new Event(EventType.DEREGISTER);
- event.setSubscriber(subscriber);
- this.eventBus.post(event);
- }
-
private String prepareXml() {
- final Document doc = ListenerAdapter.createDocument();
- final Element notificationElement =
- doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
- "notification");
- doc.appendChild(notificationElement);
-
- final Element eventTimeElement = doc.createElement("eventTime");
- eventTimeElement.setTextContent(ListenerAdapter.toRFC3339(new Date()));
- notificationElement.appendChild(eventTimeElement);
+ final Document doc = createDocument();
+ final Element notificationElement = basePartDoc(doc);
final Element notificationEventElement = doc.createElementNS(
"urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "create-notification-stream");
addValuesToNotificationEventElement(doc, notificationEventElement, this.notification, this.schemaContext);
notificationElement.appendChild(notificationEventElement);
- try {
- final ByteArrayOutputStream out = new ByteArrayOutputStream();
- final Transformer transformer = FACTORY.newTransformer();
- transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
- transformer.setOutputProperty(OutputKeys.METHOD, "xml");
- transformer.setOutputProperty(OutputKeys.INDENT, "yes");
- transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
- transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
- transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, Charsets.UTF_8)));
- final byte[] charData = out.toByteArray();
- return new String(charData, "UTF-8");
- } catch (TransformerException | UnsupportedEncodingException e) {
- final String msg = "Error during transformation of Document into String";
- LOG.error(msg, e);
- return msg;
- }
+ return transformDoc(doc);
}
private void addValuesToNotificationEventElement(final Document doc, final Element element,
return;
}
- final NormalizedNode<NodeIdentifier, Collection<DataContainerChild<? extends PathArgument, ?>>> body = notification
- .getBody();
+ final NormalizedNode<NodeIdentifier, Collection<DataContainerChild<? extends PathArgument, ?>>> body =
+ notification.getBody();
try {
- final DOMResult domResult = writeNormalizedNode(body,
- YangInstanceIdentifier.create(body.getIdentifier()), schemaContext);
+
+ final DOMResult domResult = writeNormalizedNode(body, schemaContext, this.path);
final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
final Element dataElement = doc.createElement("notification");
dataElement.appendChild(result);
LOG.error("Error processing stream", e);
}
}
-
- private DOMResult writeNormalizedNode(final NormalizedNode<?, ?> normalized, final YangInstanceIdentifier path,
- final SchemaContext context) throws IOException, XMLStreamException {
- final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
- final Document doc = XmlDocumentUtils.getDocument();
- final DOMResult result = new DOMResult(doc);
- NormalizedNodeWriter normalizedNodeWriter = null;
- NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
- XMLStreamWriter writer = null;
-
- try {
- writer = XML_FACTORY.createXMLStreamWriter(result);
- normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context,
- this.getSchemaPath());
- normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
-
- normalizedNodeWriter.write(normalized);
-
- normalizedNodeWriter.flush();
- } finally {
- if (normalizedNodeWriter != null) {
- normalizedNodeWriter.close();
- }
- if (normalizedNodeStreamWriter != null) {
- normalizedNodeStreamWriter.close();
- }
- if (writer != null) {
- writer.close();
- }
- }
-
- return result;
- }
-
- /**
- * Tracks events of data change by customer.
- */
- private final class EventBusChangeRecorder {
- @Subscribe
- public void recordCustomerChange(final Event event) {
- if (event.getType() == EventType.REGISTER) {
- final Channel subscriber = event.getSubscriber();
- if (!NotificationListenerAdapter.this.subscribers.contains(subscriber)) {
- NotificationListenerAdapter.this.subscribers.add(subscriber);
- }
- } else if (event.getType() == EventType.DEREGISTER) {
- NotificationListenerAdapter.this.subscribers.remove(event.getSubscriber());
- Notificator.removeNotificationListenerIfNoSubscriberExists(NotificationListenerAdapter.this);
- } else if (event.getType() == EventType.NOTIFY) {
- for (final Channel subscriber : NotificationListenerAdapter.this.subscribers) {
- if (subscriber.isActive()) {
- LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
- subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
- } else {
- LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
- NotificationListenerAdapter.this.subscribers.remove(subscriber);
- }
- }
- }
- }
- }
-
- /**
- * Represents event of specific {@link EventType} type, holds data and
- * {@link Channel} subscriber.
- */
- private final class Event {
- private final EventType type;
- private Channel subscriber;
- private String data;
-
- /**
- * Creates new event specified by {@link EventType} type.
- *
- * @param type
- * EventType
- */
- public Event(final EventType type) {
- this.type = type;
- }
-
- /**
- * Gets the {@link Channel} subscriber.
- *
- * @return Channel
- */
- public Channel getSubscriber() {
- return this.subscriber;
- }
-
- /**
- * Sets subscriber for event.
- *
- * @param subscriber
- * Channel
- */
- public void setSubscriber(final Channel subscriber) {
- this.subscriber = subscriber;
- }
-
- /**
- * Gets event String.
- *
- * @return String representation of event data.
- */
- public String getData() {
- return this.data;
- }
-
- /**
- * Sets event data.
- *
- * @param data
- * String.
- */
- public void setData(final String data) {
- this.data = data;
- }
-
- /**
- * Gets event type.
- *
- * @return The type of the event.
- */
- public EventType getType() {
- return this.type;
- }
- }
-
- /**
- * Type of the event.
- */
- private enum EventType {
- REGISTER, DEREGISTER, NOTIFY
- }
-
- /**
- * Set query parameters for listener
- *
- * @param start
- * - start-time of getting notification
- * @param stop
- * - stop-time of getting notification
- * @param filter
- * - indicate which subset of all possible events are of interest
- */
- public void setQueryParams(final Date start, final Date stop, final String filter) {
- this.start = start;
- this.stop = stop;
- this.filter = filter;
- }
-
- /**
- * Get outputType of listenere
- *
- * @return the outputType
- */
- public String getOutputType() {
- return this.outputType;
- }
-
- /**
- * Transaction chain to delete data in DS on close()
- *
- * @param transactionChainHandler
- * - creating new write transaction to delete data on close
- * @param schemaHandler
- * - for getting schema to deserialize
- * {@link MonitoringModule#PATH_TO_STREAM_WITHOUT_KEY} to
- * {@link YangInstanceIdentifier}
- */
- public void setCloseVars(final TransactionChainHandler transactionChainHandler,
- final SchemaContextHandler schemaHandler) {
- this.transactionChainHandler = transactionChainHandler;
- this.schemaHandler = schemaHandler;
-
- }
}
}
}
- /**
- * Checks if listener has at least one subscriber. In case it doesn't have any, delete listener.
- *
- * @param listener
- * ListenerAdapter
- */
- public static void removeListenerIfNoSubscriberExists(final ListenerAdapter listener) {
- if (!listener.hasSubscribers()) {
- deleteListener(listener);
- }
- }
-
/**
* Delete {@link ListenerAdapter} listener specified in parameter.
*
+ * @param <T>
+ *
* @param listener
* ListenerAdapter
*/
- private static void deleteListener(final ListenerAdapter listener) {
+ private static <T extends BaseListenerInterface> void deleteListener(final T listener) {
if (listener != null) {
try {
listener.close();
return listListeners;
}
- public static void removeNotificationListenerIfNoSubscriberExists(final NotificationListenerAdapter listener) {
+ public static <T extends BaseListenerInterface> void removeListenerIfNoSubscriberExists(final T listener) {
if (!listener.hasSubscribers()) {
- deleteNotificationListener(listener);
+ if (listener instanceof NotificationListenerAdapter) {
+ deleteNotificationListener(listener);
+ } else {
+ deleteListener(listener);
+ }
}
}
- private static void deleteNotificationListener(final NotificationListenerAdapter listener) {
+ private static <T extends BaseListenerInterface> void deleteNotificationListener(final T listener) {
if (listener != null) {
try {
listener.close();
ref.get(), Optional.of(this.domMountPointServiceHandler.get()));
mountPoint = mountPointIdentifier.getMountPoint();
modules = ref.getModules(mountPoint);
-
} else {
final String errMsg =
"URI has bad format. If operations behind mount point should be showed, URI has to end with ";
import java.util.Map;
import java.util.Map.Entry;
import javax.ws.rs.core.UriInfo;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
import org.opendaylight.netconf.sal.restconf.impl.InstanceIdentifierContext;
import org.opendaylight.netconf.sal.restconf.impl.NormalizedNodeContext;
import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.NormalizedNodeAttrBuilder;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger LOG = LoggerFactory.getLogger(RestconfStreamsSubscriptionServiceImpl.class);
- private final DOMDataBrokerHandler domDataBrokerHandler;
-
- private final NotificationServiceHandler notificationServiceHandler;
-
- private final SchemaContextHandler schemaHandler;
-
- private final TransactionChainHandler transactionChainHandler;
+ private final HandlersHolder handlersHolder;
+ /**
+ * Initialize holder of handlers with holders as parameters.
+ *
+ * @param domDataBrokerHandler
+ * - handler of {@link DOMDataBroker}
+ * @param notificationServiceHandler
+ * - handler of {@link DOMNotificationService}
+ * @param schemaHandler
+ * - handler of {@link SchemaContext}
+ * @param transactionChainHandler
+ * - handler of {@link DOMTransactionChain}
+ */
public RestconfStreamsSubscriptionServiceImpl(final DOMDataBrokerHandler domDataBrokerHandler,
final NotificationServiceHandler notificationServiceHandler, final SchemaContextHandler schemaHandler,
final TransactionChainHandler transactionChainHandler) {
- this.domDataBrokerHandler = domDataBrokerHandler;
- this.notificationServiceHandler = notificationServiceHandler;
- this.schemaHandler = schemaHandler;
- this.transactionChainHandler = transactionChainHandler;
+ this.handlersHolder = new HandlersHolder(domDataBrokerHandler, notificationServiceHandler,
+ transactionChainHandler, schemaHandler);
}
@Override
public NormalizedNodeContext subscribeToStream(final String identifier, final UriInfo uriInfo) {
- boolean startTime_used = false;
- boolean stopTime_used = false;
- boolean filter_used = false;
- Date start = null;
- Date stop = null;
- String filter = null;
-
- for (final Entry<String, List<String>> entry : uriInfo.getQueryParameters().entrySet()) {
- switch (entry.getKey()) {
- case "start-time":
- if (!startTime_used) {
- startTime_used = true;
- start = SubscribeToStreamUtil.parseDateFromQueryParam(entry);
- } else {
- throw new RestconfDocumentedException("Start-time parameter can be used only once.");
- }
- break;
- case "stop-time":
- if (!stopTime_used) {
- stopTime_used = true;
- stop = SubscribeToStreamUtil.parseDateFromQueryParam(entry);
- } else {
- throw new RestconfDocumentedException("Stop-time parameter can be used only once.");
- }
- break;
- case "filter":
- if (!filter_used) {
- filter_used = true;
- filter = entry.getValue().iterator().next();
- }
- break;
- default:
- throw new RestconfDocumentedException("Bad parameter used with notifications: " + entry.getKey());
- }
- }
- if (!startTime_used && stopTime_used) {
- throw new RestconfDocumentedException("Stop-time parameter has to be used with start-time parameter.");
- }
+ final NotificationQueryParams notificationQueryParams = new NotificationQueryParams();
+ notificationQueryParams.prepareParams(uriInfo);
+
URI response = null;
if (identifier.contains(RestconfStreamsConstants.DATA_SUBSCR)) {
- response =
- SubscribeToStreamUtil.notifiDataStream(identifier, uriInfo, start, stop, this.domDataBrokerHandler, filter,
- this.transactionChainHandler, this.schemaHandler);
+ response = SubscribeToStreamUtil.notifiDataStream(identifier, uriInfo, notificationQueryParams,
+ this.handlersHolder);
} else if (identifier.contains(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
- response = SubscribeToStreamUtil.notifYangStream(identifier, uriInfo, start, stop,
- this.notificationServiceHandler, filter, this.transactionChainHandler, this.schemaHandler);
+ response = SubscribeToStreamUtil.notifYangStream(identifier, uriInfo, notificationQueryParams,
+ this.handlersHolder);
}
if (response != null) {
// prepare node with value of location
final InstanceIdentifierContext<?> iid =
- SubscribeToStreamUtil.prepareIIDSubsStreamOutput(this.schemaHandler);
+ SubscribeToStreamUtil.prepareIIDSubsStreamOutput(this.handlersHolder.getSchemaHandler());
final NormalizedNodeAttrBuilder<NodeIdentifier, Object, LeafNode<Object>> builder =
ImmutableLeafNodeBuilder.create().withValue(response.toString());
builder.withNodeIdentifier(
LOG.warn(msg);
throw new RestconfDocumentedException(msg);
}
+
+ /**
+ * Holder of all handlers for notifications
+ */
+ public final class HandlersHolder {
+
+ private final DOMDataBrokerHandler domDataBrokerHandler;
+ private final NotificationServiceHandler notificationServiceHandler;
+ private final TransactionChainHandler transactionChainHandler;
+ private final SchemaContextHandler schemaHandler;
+
+ private HandlersHolder(final DOMDataBrokerHandler domDataBrokerHandler,
+ final NotificationServiceHandler notificationServiceHandler,
+ final TransactionChainHandler transactionChainHandler, final SchemaContextHandler schemaHandler) {
+ this.domDataBrokerHandler = domDataBrokerHandler;
+ this.notificationServiceHandler = notificationServiceHandler;
+ this.transactionChainHandler = transactionChainHandler;
+ this.schemaHandler = schemaHandler;
+ }
+
+ /**
+ * Get {@link DOMDataBrokerHandler}
+ *
+ * @return the domDataBrokerHandler
+ */
+ public DOMDataBrokerHandler getDomDataBrokerHandler() {
+ return this.domDataBrokerHandler;
+ }
+
+ /**
+ * Get {@link NotificationServiceHandler}
+ *
+ * @return the notificationServiceHandler
+ */
+ public NotificationServiceHandler getNotificationServiceHandler() {
+ return this.notificationServiceHandler;
+ }
+
+ /**
+ * Get {@link TransactionChainHandler}
+ *
+ * @return the transactionChainHandler
+ */
+ public TransactionChainHandler getTransactionChainHandler() {
+ return this.transactionChainHandler;
+ }
+
+ /**
+ * Get {@link SchemaContextHandler}
+ *
+ * @return the schemaHandler
+ */
+ public SchemaContextHandler getSchemaHandler() {
+ return this.schemaHandler;
+ }
+ }
+
+ /**
+ * Parser and holder of query paramteres from uriInfo for notifications
+ *
+ */
+ public final class NotificationQueryParams {
+
+ private Date start = null;
+ private Date stop = null;
+ private String filter = null;
+
+ private NotificationQueryParams() {
+
+ }
+
+ private void prepareParams(final UriInfo uriInfo) {
+ boolean startTime_used = false;
+ boolean stopTime_used = false;
+ boolean filter_used = false;
+
+ for (final Entry<String, List<String>> entry : uriInfo.getQueryParameters().entrySet()) {
+ switch (entry.getKey()) {
+ case "start-time":
+ if (!startTime_used) {
+ startTime_used = true;
+ this.start = SubscribeToStreamUtil.parseDateFromQueryParam(entry);
+ } else {
+ throw new RestconfDocumentedException("Start-time parameter can be used only once.");
+ }
+ break;
+ case "stop-time":
+ if (!stopTime_used) {
+ stopTime_used = true;
+ this.stop = SubscribeToStreamUtil.parseDateFromQueryParam(entry);
+ } else {
+ throw new RestconfDocumentedException("Stop-time parameter can be used only once.");
+ }
+ break;
+ case "filter":
+ if (!filter_used) {
+ filter_used = true;
+ this.filter = entry.getValue().iterator().next();
+ }
+ break;
+ default:
+ throw new RestconfDocumentedException(
+ "Bad parameter used with notifications: " + entry.getKey());
+ }
+ }
+ if (!startTime_used && stopTime_used) {
+ throw new RestconfDocumentedException("Stop-time parameter has to be used with start-time parameter.");
+ }
+
+ if (this.start == null) {
+ this.start = new Date();
+ }
+ }
+
+ /**
+ * Get start-time query parameter
+ *
+ * @return start-time
+ */
+ public Date getStart() {
+ return this.start;
+ }
+
+ /**
+ * Get stop-time query parameter
+ *
+ * @return stop-time
+ */
+ public Date getStop() {
+ return this.stop;
+ }
+
+ /**
+ * Get filter query parameter
+ *
+ * @return filter
+ */
+ public String getFilter() {
+ return this.filter;
+ }
+ }
+
}
final ContainerNode data = (ContainerNode) payload.getData();
final QName qname = payload.getInstanceIdentifierContext().getSchemaNode().getQName();
final YangInstanceIdentifier path = preparePath(data, qname);
- final String streamName = prepareDataChangeNotifiStreamName(path, refSchemaCtx.get(), data);
+ String streamName = prepareDataChangeNotifiStreamName(path, refSchemaCtx.get(), data);
final QName outputQname = QName.create(qname, "output");
final QName streamNameQname = QName.create(qname, "stream-name");
- final ContainerNode output = ImmutableContainerNodeBuilder.create()
- .withNodeIdentifier(new NodeIdentifier(outputQname))
- .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build();
final NotificationOutputType outputType = prepareOutputType(data);
+ if(outputType.equals(NotificationOutputType.JSON)){
+ streamName = streamName + "/JSON";
+ }
if (!Notificator.existListenerFor(streamName)) {
Notificator.createListener(path, streamName, outputType);
}
+ final ContainerNode output =
+ ImmutableContainerNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(outputQname))
+ .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build();
return new DefaultDOMRpcResult(output);
}
streamName = streamName + ",";
}
}
-
+ if (outputType.equals("JSON")) {
+ streamName = streamName + "/JSON";
+ }
final QName rpcQName = payload.getInstanceIdentifierContext().getSchemaNode().getQName();
final QName outputQname = QName.create(rpcQName, "output");
final QName streamNameQname = QName.create(rpcQName, "notification-stream-identifier");
import org.opendaylight.netconf.sal.streams.listeners.Notificator;
import org.opendaylight.netconf.sal.streams.websockets.WebSocketServer;
import org.opendaylight.restconf.Draft18.MonitoringModule;
-import org.opendaylight.restconf.handlers.DOMDataBrokerHandler;
import org.opendaylight.restconf.handlers.NotificationServiceHandler;
import org.opendaylight.restconf.handlers.SchemaContextHandler;
-import org.opendaylight.restconf.handlers.TransactionChainHandler;
import org.opendaylight.restconf.parser.IdentifierCodec;
+import org.opendaylight.restconf.restful.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
+import org.opendaylight.restconf.restful.services.impl.RestconfStreamsSubscriptionServiceImpl.NotificationQueryParams;
import org.opendaylight.restconf.utils.RestconfConstants;
import org.opendaylight.restconf.utils.mapping.RestconfMappingNodeUtil;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaNode;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
import org.slf4j.Logger;
throw new UnsupportedOperationException("Util class");
}
- /**
- * Parse enum from URI
- *
- * @param clazz
- * - enum type
- * @param value
- * - string of enum value
- * @return enum
- */
- private static <T> T parseURIEnum(final Class<T> clazz, final String value) {
- if ((value == null) || value.equals("")) {
- return null;
- }
- return ResolveEnumUtil.resolveEnum(clazz, value);
- }
-
- /**
- * Prepare map of values from URI
- *
- * @param identifier
- * - URI
- * @return {@link Map}
- */
- public static Map<String, String> mapValuesFromUri(final String identifier) {
- final HashMap<String, String> result = new HashMap<>();
- final String[] tokens = identifier.split(String.valueOf(RestconfConstants.SLASH));
- for (final String token : tokens) {
- final String[] paramToken = token.split(String.valueOf(RestconfStreamsConstants.EQUAL));
- if (paramToken.length == 2) {
- result.put(paramToken[0], paramToken[1]);
- }
- }
- return result;
- }
-
- /**
- * Register data change listener in dom data broker and set it to listener
- * on stream
- *
- * @param ds
- * - {@link LogicalDatastoreType}
- * @param scope
- * - {@link DataChangeScope}
- * @param listener
- * - listener on specific stream
- * @param domDataBroker
- * - data broker for register data change listener
- */
- private static void registration(final LogicalDatastoreType ds, final DataChangeScope scope,
- final ListenerAdapter listener, final DOMDataBroker domDataBroker) {
- if (listener.isListening()) {
- return;
- }
-
- final YangInstanceIdentifier path = listener.getPath();
- final ListenerRegistration<DOMDataChangeListener> registration = domDataBroker.registerDataChangeListener(ds,
- path, listener, scope);
-
- listener.setRegistration(registration);
- }
-
- /**
- * Get port from web socket server. If doesn't exit, create it.
- *
- * @return port
- */
- private static int prepareNotificationPort() {
- int port = RestconfStreamsConstants.NOTIFICATION_PORT;
- try {
- final WebSocketServer webSocketServer = WebSocketServer.getInstance();
- port = webSocketServer.getPort();
- } catch (final NullPointerException e) {
- WebSocketServer.createInstance(RestconfStreamsConstants.NOTIFICATION_PORT);
- }
- return port;
- }
-
/**
* Register listeners by streamName in identifier to listen to yang
* notifications, put or delete info about listener to DS according to
* - identifier as stream name
* @param uriInfo
* - for getting base URI information
- * @param start
- * - start-time query parameter
- * @param stop
- * - stop-time query parameter
- * @param notifiServiceHandler
- * - DOMNotificationService handler for register listeners
- * @param filter
- * - indicate which subset of all possible events are of interest
- * @param transactionChainHandler
- * - to put new data about stream to DS and delete after close
- * listener
- * @param schemaHandler
- * - for getting schema context
+ * @param notificationQueryParams
+ * - query parameters of notification
+ * @param handlersHolder
+ * - holder of handlers for notifications
* @return location for listening
*/
- public static URI notifYangStream(final String identifier, final UriInfo uriInfo, Date start, final Date stop,
- final NotificationServiceHandler notifiServiceHandler, final String filter,
- final TransactionChainHandler transactionChainHandler, final SchemaContextHandler schemaHandler) {
+ @SuppressWarnings("rawtypes")
+ public static URI notifYangStream(final String identifier, final UriInfo uriInfo,
+ final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
final String streamName = Notificator.createStreamNameFromUri(identifier);
if (Strings.isNullOrEmpty(streamName)) {
throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
ErrorTag.UNKNOWN_ELEMENT);
}
- final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
- int notificationPort = RestconfStreamsConstants.NOTIFICATION_PORT;
- try {
- final WebSocketServer webSocketServerInstance = WebSocketServer.getInstance();
- notificationPort = webSocketServerInstance.getPort();
- } catch (final NullPointerException e) {
- WebSocketServer.createInstance(RestconfStreamsConstants.NOTIFICATION_PORT);
- }
- final UriBuilder uriToWebsocketServerBuilder = uriBuilder.port(notificationPort).scheme("ws");
- final URI uriToWebsocketServer = uriToWebsocketServerBuilder.replacePath(streamName).build();
-
- final DOMDataReadWriteTransaction wTx = transactionChainHandler.get().newReadWriteTransaction();
- final boolean exist = checkExist(schemaHandler, wTx);
- final Module monitoringModule = schemaHandler.get()
- .findModuleByNamespaceAndRevision(MonitoringModule.URI_MODULE, MonitoringModule.DATE);
- if (start == null) {
- start = new Date();
- }
+ final DOMDataReadWriteTransaction wTx =
+ handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction();
+ final SchemaContext schemaContext = handlersHolder.getSchemaHandler().get();
+ final boolean exist = checkExist(schemaContext, wTx);
+
+ final URI uri = prepareUriByStreamName(uriInfo, streamName);
for (final NotificationListenerAdapter listener : listeners) {
- registerToListenNotification(listener, notifiServiceHandler);
- listener.setQueryParams(start, stop, filter);
- listener.setCloseVars(transactionChainHandler, schemaHandler);
+ registerToListenNotification(listener, handlersHolder.getNotificationServiceHandler());
+ listener.setQueryParams(notificationQueryParams.getStart(), notificationQueryParams.getStop(),
+ notificationQueryParams.getFilter());
+ listener.setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
final NormalizedNode mapToStreams =
RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(listener.getSchemaPath().getLastComponent(),
- schemaHandler.get().getNotifications(), start, listener.getOutputType(),
- uriToWebsocketServer, monitoringModule, exist);
- writeDataToDS(schemaHandler, listener.getSchemaPath().getLastComponent().getLocalName(), wTx, exist, mapToStreams);
+ schemaContext.getNotifications(), notificationQueryParams.getStart(),
+ listener.getOutputType(), uri, getMonitoringModule(schemaContext), exist);
+ writeDataToDS(schemaContext, listener.getSchemaPath().getLastComponent().getLocalName(), wTx, exist,
+ mapToStreams);
}
submitData(wTx);
- return uriToWebsocketServer;
- }
-
- private static boolean checkExist(final SchemaContextHandler schemaHandler, final DOMDataReadWriteTransaction wTx) {
- boolean exist;
- try {
- exist = wTx
- .exists(LogicalDatastoreType.OPERATIONAL, IdentifierCodec
- .deserialize(MonitoringModule.PATH_TO_STREAMS, schemaHandler.get()))
- .checkedGet();
- } catch (final ReadFailedException e1) {
- throw new RestconfDocumentedException("Problem while checking data if exists", e1);
- }
- return exist;
- }
-
- private static void registerToListenNotification(final NotificationListenerAdapter listener,
- final NotificationServiceHandler notificationServiceHandler) {
- if (listener.isListening()) {
- return;
- }
-
- final SchemaPath path = listener.getSchemaPath();
- final ListenerRegistration<DOMNotificationListener> registration =
- notificationServiceHandler.get().registerNotificationListener(listener, path);
-
- listener.setRegistration(registration);
+ return uri;
}
/**
* - identifier as stream name
* @param uriInfo
* - for getting base URI information
- * @param start
- * - start-time query parameter
- * @param stop
- * - stop-time query parameter
- * @param domDataBrokerHandler
- * - DOMDataBroker handler for register listener
- * @param filter
- * - indicate which subset of all possible events are of interest
- * @param schemaHandler
- * - for getting schema context
- * @param transactionChainHandler
- * - to put new data about stream to DS and delete after close
- * listener
+ * @param notificationQueryParams
+ * - query parameters of notification
+ * @param handlersHolder
+ * - holder of handlers for notifications
* @return location for listening
*/
- public static URI notifiDataStream(final String identifier, final UriInfo uriInfo, Date start, final Date stop,
- final DOMDataBrokerHandler domDataBrokerHandler, final String filter,
- final TransactionChainHandler transactionChainHandler, final SchemaContextHandler schemaHandler) {
+ @SuppressWarnings("rawtypes")
+ public static URI notifiDataStream(final String identifier, final UriInfo uriInfo,
+ final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
final Map<String, String> mapOfValues = SubscribeToStreamUtil.mapValuesFromUri(identifier);
final LogicalDatastoreType ds = SubscribeToStreamUtil.parseURIEnum(LogicalDatastoreType.class,
final ListenerAdapter listener = Notificator.getListenerFor(streamName);
Preconditions.checkNotNull(listener, "Listener doesn't exist : " + streamName);
- if (start == null) {
- start = new Date();
- }
+ listener.setQueryParams(notificationQueryParams.getStart(), notificationQueryParams.getStop(),
+ notificationQueryParams.getFilter());
+ listener.setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
- listener.setQueryParams(start, stop, filter);
- listener.setCloseVars(transactionChainHandler, schemaHandler);
+ registration(ds, scope, listener, handlersHolder.getDomDataBrokerHandler().get());
- SubscribeToStreamUtil.registration(ds, scope, listener, domDataBrokerHandler.get());
+ final URI uri = prepareUriByStreamName(uriInfo, streamName);
- final int port = SubscribeToStreamUtil.prepareNotificationPort();
+ final DOMDataReadWriteTransaction wTx =
+ handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction();
+ final SchemaContext schemaContext = handlersHolder.getSchemaHandler().get();
+ final boolean exist = checkExist(schemaContext, wTx);
- final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
- final UriBuilder uriToWebSocketServer =
- uriBuilder.port(port).scheme(RestconfStreamsConstants.SCHEMA_SUBSCIBRE_URI);
- final URI uri = uriToWebSocketServer.replacePath(streamName).build();
-
- final Module monitoringModule = schemaHandler.get()
- .findModuleByNamespaceAndRevision(MonitoringModule.URI_MODULE, MonitoringModule.DATE);
- final DOMDataReadWriteTransaction wTx = transactionChainHandler.get().newReadWriteTransaction();
- final boolean exist = checkExist(schemaHandler, wTx);
-
- final NormalizedNode mapToStreams = RestconfMappingNodeUtil.mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.getPath(), start,
- listener.getOutputType(), uri, monitoringModule, exist, schemaHandler.get());
- writeDataToDS(schemaHandler, listener.getPath().getLastPathArgument().getNodeType().getLocalName(), wTx, exist,
+ final NormalizedNode mapToStreams = RestconfMappingNodeUtil
+ .mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.getPath(),
+ notificationQueryParams.getStart(), listener.getOutputType(), uri,
+ getMonitoringModule(schemaContext), exist, schemaContext);
+ writeDataToDS(schemaContext, listener.getPath().getLastPathArgument().getNodeType().getLocalName(), wTx, exist,
mapToStreams);
submitData(wTx);
return uri;
}
- private static void writeDataToDS(final SchemaContextHandler schemaHandler, final String name,
- final DOMDataReadWriteTransaction wTx, final boolean exist, final NormalizedNode mapToStreams) {
- String pathId = "";
- if (exist) {
- pathId = MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + name;
- } else {
- pathId = MonitoringModule.PATH_TO_STREAMS;
- }
- wTx.merge(LogicalDatastoreType.OPERATIONAL, IdentifierCodec.deserialize(pathId, schemaHandler.get()),
- mapToStreams);
- }
-
- private static void submitData(final DOMDataReadWriteTransaction wTx) {
- try {
- wTx.submit().checkedGet();
- } catch (final TransactionCommitFailedException e) {
- throw new RestconfDocumentedException("Problem while putting data to DS.", e);
- }
+ public static Module getMonitoringModule(final SchemaContext schemaContext) {
+ final Module monitoringModule =
+ schemaContext.findModuleByNamespaceAndRevision(MonitoringModule.URI_MODULE, MonitoringModule.DATE);
+ return monitoringModule;
}
/**
throw new RestconfDocumentedException("Cannot parse of value in date: " + value + e);
}
}
+
+ @SuppressWarnings("rawtypes")
+ private static void writeDataToDS(final SchemaContext schemaContext, final String name,
+ final DOMDataReadWriteTransaction wTx, final boolean exist, final NormalizedNode mapToStreams) {
+ String pathId = "";
+ if (exist) {
+ pathId = MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + name;
+ } else {
+ pathId = MonitoringModule.PATH_TO_STREAMS;
+ }
+ wTx.merge(LogicalDatastoreType.OPERATIONAL, IdentifierCodec.deserialize(pathId, schemaContext),
+ mapToStreams);
+ }
+
+ private static void submitData(final DOMDataReadWriteTransaction wTx) {
+ try {
+ wTx.submit().checkedGet();
+ } catch (final TransactionCommitFailedException e) {
+ throw new RestconfDocumentedException("Problem while putting data to DS.", e);
+ }
+ }
+
+ /**
+ * Prepare map of values from URI
+ *
+ * @param identifier
+ * - URI
+ * @return {@link Map}
+ */
+ public static Map<String, String> mapValuesFromUri(final String identifier) {
+ final HashMap<String, String> result = new HashMap<>();
+ final String[] tokens = identifier.split(String.valueOf(RestconfConstants.SLASH));
+ for (final String token : tokens) {
+ final String[] paramToken = token.split(String.valueOf(RestconfStreamsConstants.EQUAL));
+ if (paramToken.length == 2) {
+ result.put(paramToken[0], paramToken[1]);
+ }
+ }
+ return result;
+ }
+
+ private static URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
+ final int port = SubscribeToStreamUtil.prepareNotificationPort();
+
+ final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
+ final UriBuilder uriToWebSocketServer =
+ uriBuilder.port(port).scheme(RestconfStreamsConstants.SCHEMA_SUBSCIBRE_URI);
+ final URI uri = uriToWebSocketServer.replacePath(streamName).build();
+ return uri;
+ }
+
+ /**
+ * Register data change listener in dom data broker and set it to listener
+ * on stream
+ *
+ * @param ds
+ * - {@link LogicalDatastoreType}
+ * @param scope
+ * - {@link DataChangeScope}
+ * @param listener
+ * - listener on specific stream
+ * @param domDataBroker
+ * - data broker for register data change listener
+ */
+ @SuppressWarnings("deprecation")
+ private static void registration(final LogicalDatastoreType ds, final DataChangeScope scope,
+ final ListenerAdapter listener, final DOMDataBroker domDataBroker) {
+ if (listener.isListening()) {
+ return;
+ }
+
+ final YangInstanceIdentifier path = listener.getPath();
+ final ListenerRegistration<DOMDataChangeListener> registration =
+ domDataBroker.registerDataChangeListener(ds, path, listener, scope);
+
+ listener.setRegistration(registration);
+ }
+
+ /**
+ * Get port from web socket server. If doesn't exit, create it.
+ *
+ * @return port
+ */
+ private static int prepareNotificationPort() {
+ int port = RestconfStreamsConstants.NOTIFICATION_PORT;
+ try {
+ final WebSocketServer webSocketServer = WebSocketServer.getInstance();
+ port = webSocketServer.getPort();
+ } catch (final NullPointerException e) {
+ WebSocketServer.createInstance(RestconfStreamsConstants.NOTIFICATION_PORT);
+ }
+ return port;
+ }
+
+ private static boolean checkExist(final SchemaContext schemaContext, final DOMDataReadWriteTransaction wTx) {
+ boolean exist;
+ try {
+ exist = wTx.exists(LogicalDatastoreType.OPERATIONAL,
+ IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAMS, schemaContext)).checkedGet();
+ } catch (final ReadFailedException e1) {
+ throw new RestconfDocumentedException("Problem while checking data if exists", e1);
+ }
+ return exist;
+ }
+
+ private static void registerToListenNotification(final NotificationListenerAdapter listener,
+ final NotificationServiceHandler notificationServiceHandler) {
+ if (listener.isListening()) {
+ return;
+ }
+
+ final SchemaPath path = listener.getSchemaPath();
+ final ListenerRegistration<DOMNotificationListener> registration =
+ notificationServiceHandler.get().registerNotificationListener(listener, path);
+
+ listener.setRegistration(registration);
+ }
+
+ /**
+ * Parse enum from URI
+ *
+ * @param clazz
+ * - enum type
+ * @param value
+ * - string of enum value
+ * @return enum
+ */
+ private static <T> T parseURIEnum(final Class<T> clazz, final String value) {
+ if ((value == null) || value.equals("")) {
+ return null;
+ }
+ return ResolveEnumUtil.resolveEnum(clazz, value);
+ }
+
}
listener.setCloseVars(transactionChainHandler, schemaHandler);
// close and remove test notification listener
listener.close();
- Notificator.removeNotificationListenerIfNoSubscriberExists(listener);
+ Notificator.removeListenerIfNoSubscriberExists(listener);
}
/**
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
+import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Collection;
import org.junit.Assert;
import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
import org.opendaylight.netconf.sal.streams.listeners.Notificator;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
+import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
public class ExpressionParserTest {
}
}
final YangInstanceIdentifier path = Mockito.mock(YangInstanceIdentifier.class);
+ final PathArgument pathValue = NodeIdentifier.create(QName.create("module", "2016-14-12", "localName"));
+ Mockito.when(path.getLastPathArgument()).thenReturn(pathValue);
final ListenerAdapter listener = Notificator.createListener(path, "streamName", NotificationOutputType.JSON);
listener.setQueryParams(null, null, filter);
- final Method m = listener.getClass().getDeclaredMethod("parseFilterParam", String.class);
+ final Class<?> superclass = listener.getClass().getSuperclass().getSuperclass();
+ Method m = null;
+ for (final Method method : superclass.getDeclaredMethods()) {
+ if (method.getName().equals("parseFilterParam")) {
+ m = method;
+ }
+ }
+ if (m == null) {
+ throw new Exception("Methode parseFilterParam doesn't exist in " + superclass.getName());
+ }
m.setAccessible(true);
-
- return (boolean) m.invoke(listener, readFile(xml));
+ final Field xmlField = superclass.getDeclaredField("xml");
+ xmlField.setAccessible(true);
+ xmlField.set(listener, readFile(xml));
+ return (boolean) m.invoke(listener, null);
}
private String readFile(final File xml) throws Exception {
import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
import org.opendaylight.netconf.sal.streams.listeners.Notificator;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
+import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
public class RestconfImplNotificationSubscribingTest {
ControllerContext.getInstance().setGlobalSchema(TestRestconfUtils.loadSchemaContext("/notifications"));
final YangInstanceIdentifier path = Mockito.mock(YangInstanceIdentifier.class);
+ final PathArgument pathValue = NodeIdentifier.create(QName.create("module", "2016-14-12", "localName"));
+ Mockito.when(path.getLastPathArgument()).thenReturn(pathValue);
Notificator.createListener(path, this.identifier, NotificationOutputType.XML);
}
@Test
public void onNotifiTest() throws Exception {
final YangInstanceIdentifier path = Mockito.mock(YangInstanceIdentifier.class);
+ final PathArgument pathValue = NodeIdentifier.create(QName.create("module", "2016-14-12", "localName"));
+ Mockito.when(path.getLastPathArgument()).thenReturn(pathValue);
final ListenerAdapter listener = Notificator.createListener(path, this.identifier, NotificationOutputType.XML);
final List<Entry<String, List<String>>> list = new ArrayList<>();
final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change =
Mockito.mock(AsyncDataChangeEvent.class);
- Field start = listener.getClass().getDeclaredField("start");
+ final Class<?> superclass = listener.getClass().getSuperclass().getSuperclass();
+ Field start = superclass.getDeclaredField("start");
start.setAccessible(true);
Date startOrig = (Date) start.get(listener);
Assert.assertNotNull(startOrig);
listener.onDataChanged(change);
- start = listener.getClass().getDeclaredField("start");
+ start = superclass.getDeclaredField("start");
start.setAccessible(true);
startOrig = (Date) start.get(listener);
Assert.assertNull(startOrig);
public static void setUpBeforeTest() throws Exception {
final Map<String, ListenerAdapter> listenersByStreamNameSetter = new HashMap<>();
final ListenerAdapter adapter = mock(ListenerAdapter.class);
- doReturn(false).when(adapter).isListening();
listenersByStreamNameSetter.put(
"data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
adapter);