import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
+import java.io.StringReader;
+import java.io.StringWriter;
import java.io.UnsupportedEncodingException;
+import java.io.Writer;
import java.util.Collection;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.Executors;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;
import javax.xml.transform.dom.DOMResult;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathFactory;
import org.json.JSONObject;
-import org.json.XML;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
+import org.opendaylight.restconf.Draft18.MonitoringModule;
+import org.opendaylight.restconf.handlers.SchemaContextHandler;
+import org.opendaylight.restconf.handlers.TransactionChainHandler;
+import org.opendaylight.restconf.parser.IdentifierCodec;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
+import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactory;
+import org.opendaylight.yangtools.yang.data.codec.gson.JSONNormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.data.codec.gson.JsonWriterFactory;
import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
+import org.xml.sax.InputSource;
/**
* {@link NotificationListenerAdapter} is responsible to track events on
private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
private final String streamName;
- private ListenerRegistration<DOMNotificationListener> registration;
- private Set<Channel> subscribers = new ConcurrentSet<>();
private final EventBus eventBus;
private final EventBusChangeRecorder eventBusChangeRecorder;
private final SchemaPath path;
private final String outputType;
+ private Date start = null;
+ private Date stop = null;
+ private String filter;
+
+ private SchemaContext schemaContext;
+ private DOMNotification notification;
+ private ListenerRegistration<DOMNotificationListener> registration;
+ private Set<Channel> subscribers = new ConcurrentSet<>();
+ private TransactionChainHandler transactionChainHandler;
+ private SchemaContextHandler schemaHandler;
/**
* Set path of listener and stream name, register event bus.
@Override
public void onNotification(final DOMNotification notification) {
- final String xml = prepareXmlFrom(notification);
+ this.schemaContext = ControllerContext.getInstance().getGlobalSchema();
+ this.notification = notification;
+ final Date now = new Date();
+ if (this.stop != null) {
+ if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) {
+ checkFilter();
+ }
+ if (this.stop.compareTo(now) < 0) {
+ try {
+ this.close();
+ } catch (final Exception e) {
+ throw new RestconfDocumentedException("Problem with unregister listener." + e);
+ }
+ }
+ } else if (this.start != null) {
+ if (this.start.compareTo(now) < 0) {
+ this.start = null;
+ checkFilter();
+ }
+ } else {
+ checkFilter();
+ }
+ }
+
+ /**
+ * Check if is filter used and then prepare and post data do client
+ *
+ */
+ private void checkFilter() {
+ final String xml = prepareXml();
+ if (this.filter == null) {
+ prepareAndPostData(xml);
+ } else {
+ try {
+ if (parseFilterParam(xml)) {
+ prepareAndPostData(xml);
+ }
+ } catch (final Exception e) {
+ throw new RestconfDocumentedException("Problem while parsing filter.", e);
+ }
+ }
+ }
+
+ /**
+ * Parse and evaluate filter value by xml
+ *
+ * @param xml
+ * - notification data in xml
+ * @return true or false - depends on filter expression and data of
+ * notifiaction
+ * @throws Exception
+ */
+ private boolean parseFilterParam(final String xml) throws Exception {
+ final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+ final DocumentBuilder builder = factory.newDocumentBuilder();
+ final Document docOfXml = builder.parse(new InputSource(new StringReader(xml)));
+ final XPath xPath = XPathFactory.newInstance().newXPath();
+ return (boolean) xPath.compile(this.filter).evaluate(docOfXml, XPathConstants.BOOLEAN);
+ }
+
+ /**
+ * Prepare data of notification and data to client
+ *
+ * @param xml
+ */
+ private void prepareAndPostData(final String xml) {
final Event event = new Event(EventType.NOTIFY);
if (this.outputType.equals("JSON")) {
- final JSONObject jsonObject = XML.toJSONObject(xml);
- event.setData(jsonObject.toString());
+ event.setData(prepareJson());
} else {
event.setData(xml);
}
this.eventBus.post(event);
}
+ /**
+ * Prepare json from notification data
+ *
+ * @return json as {@link String}
+ */
+ private String prepareJson() {
+ final JSONObject json = new JSONObject();
+ json.put("ietf-restconf:notification",
+ new JSONObject(writeBodyToString()).put("event-time", ListenerAdapter.toRFC3339(new Date())));
+ return json.toString();
+ }
+
+ private String writeBodyToString() {
+ final Writer writer = new StringWriter();
+ final NormalizedNodeStreamWriter jsonStream =
+ JSONNormalizedNodeStreamWriter.createExclusiveWriter(JSONCodecFactory.create(this.schemaContext),
+ this.notification.getType(), null, JsonWriterFactory.createJsonWriter(writer));
+ final NormalizedNodeWriter nodeWriter = NormalizedNodeWriter.forStreamWriter(jsonStream);
+ try {
+ nodeWriter.write(this.notification.getBody());
+ nodeWriter.close();
+ } catch (final IOException e) {
+ throw new RestconfDocumentedException("Problem while writing body of notification to JSON. ", e);
+ }
+ return writer.toString();
+ }
+
/**
* Checks if exists at least one {@link Channel} subscriber.
*
}
/**
- * Reset lists, close registration and unregister bus event.
+ * Reset lists, close registration and unregister bus event and delete data in DS.
*/
public void close() {
+ final DOMDataWriteTransaction wTx = this.transactionChainHandler.get().newWriteOnlyTransaction();
+ wTx.delete(LogicalDatastoreType.OPERATIONAL,
+ IdentifierCodec.deserialize(
+ MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + this.path.getLastComponent().getLocalName(),
+ this.schemaHandler.get()));
+ try {
+ wTx.submit().checkedGet();
+ } catch (final TransactionCommitFailedException e) {
+ throw new RestconfDocumentedException("Problem while deleting data from DS.", e);
+ }
+
this.subscribers = new ConcurrentSet<>();
this.registration.close();
this.registration = null;
this.eventBus.post(event);
}
- private String prepareXmlFrom(final DOMNotification notification) {
- final SchemaContext schemaContext = ControllerContext.getInstance().getGlobalSchema();
+ private String prepareXml() {
final Document doc = ListenerAdapter.createDocument();
- final Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:yang:ietf-restconf",
+ final Element notificationElement =
+ doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
"notification");
doc.appendChild(notificationElement);
final Element eventTimeElement = doc.createElement("eventTime");
eventTimeElement.setTextContent(ListenerAdapter.toRFC3339(new Date()));
notificationElement.appendChild(eventTimeElement);
- final String notificationNamespace = notification.getType().getLastComponent().getNamespace().toString();
+
final Element notificationEventElement = doc.createElementNS(
- notificationNamespace, "event");
- addValuesToNotificationEventElement(doc, notificationEventElement, notification, schemaContext);
+ "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "create-notification-stream");
+ addValuesToNotificationEventElement(doc, notificationEventElement, this.notification, this.schemaContext);
notificationElement.appendChild(notificationEventElement);
try {
final DOMResult domResult = writeNormalizedNode(body,
YangInstanceIdentifier.create(body.getIdentifier()), schemaContext);
final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
- element.appendChild(result);
+ final Element dataElement = doc.createElement("notification");
+ dataElement.appendChild(result);
+ element.appendChild(dataElement);
} catch (final IOException e) {
LOG.error("Error in writer ", e);
} catch (final XMLStreamException e) {
private enum EventType {
REGISTER, DEREGISTER, NOTIFY
}
+
+ /**
+ * Set query parameters for listener
+ *
+ * @param start
+ * - start-time of getting notification
+ * @param stop
+ * - stop-time of getting notification
+ * @param filter
+ * - indicate which subset of all possible events are of interest
+ */
+ public void setQueryParams(final Date start, final Date stop, final String filter) {
+ this.start = start;
+ this.stop = stop;
+ this.filter = filter;
+ }
+
+ /**
+ * Get outputType of listenere
+ *
+ * @return the outputType
+ */
+ public String getOutputType() {
+ return this.outputType;
+ }
+
+ /**
+ * Transaction chain to delete data in DS on close()
+ *
+ * @param transactionChainHandler
+ * - creating new write transaction to delete data on close
+ * @param schemaHandler
+ * - for getting schema to deserialize
+ * {@link MonitoringModule#PATH_TO_STREAM_WITHOUT_KEY} to
+ * {@link YangInstanceIdentifier}
+ */
+ public void setCloseVars(final TransactionChainHandler transactionChainHandler,
+ final SchemaContextHandler schemaHandler) {
+ this.transactionChainHandler = transactionChainHandler;
+ this.schemaHandler = schemaHandler;
+
+ }
}