Bug 5679 - implement ietf-restconf-monitoring - streams
[netconf.git] / restconf / sal-rest-connector / src / main / java / org / opendaylight / netconf / sal / streams / listeners / NotificationListenerAdapter.java
index db8b27821b063665bb337c4ae3526759d024832d..2d3ad3a10b3cc4306e0e15ad3abe0a41217dca56 100644 (file)
@@ -18,11 +18,16 @@ import io.netty.util.internal.ConcurrentSet;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
+import java.io.StringReader;
+import java.io.StringWriter;
 import java.io.UnsupportedEncodingException;
+import java.io.Writer;
 import java.util.Collection;
 import java.util.Date;
 import java.util.Set;
 import java.util.concurrent.Executors;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.stream.XMLOutputFactory;
 import javax.xml.stream.XMLStreamException;
 import javax.xml.stream.XMLStreamWriter;
@@ -33,11 +38,21 @@ import javax.xml.transform.TransformerFactory;
 import javax.xml.transform.dom.DOMResult;
 import javax.xml.transform.dom.DOMSource;
 import javax.xml.transform.stream.StreamResult;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathFactory;
 import org.json.JSONObject;
-import org.json.XML;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
 import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
+import org.opendaylight.restconf.Draft18.MonitoringModule;
+import org.opendaylight.restconf.handlers.SchemaContextHandler;
+import org.opendaylight.restconf.handlers.TransactionChainHandler;
+import org.opendaylight.restconf.parser.IdentifierCodec;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
@@ -46,6 +61,9 @@ import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
+import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactory;
+import org.opendaylight.yangtools.yang.data.codec.gson.JSONNormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.data.codec.gson.JsonWriterFactory;
 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -55,6 +73,7 @@ import org.slf4j.LoggerFactory;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
+import org.xml.sax.InputSource;
 
 /**
  * {@link NotificationListenerAdapter} is responsible to track events on
@@ -67,13 +86,21 @@ public class NotificationListenerAdapter implements DOMNotificationListener {
     private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
 
     private final String streamName;
-    private ListenerRegistration<DOMNotificationListener> registration;
-    private Set<Channel> subscribers = new ConcurrentSet<>();
     private final EventBus eventBus;
     private final EventBusChangeRecorder eventBusChangeRecorder;
 
     private final SchemaPath path;
     private final String outputType;
+    private Date start = null;
+    private Date stop = null;
+    private String filter;
+
+    private SchemaContext schemaContext;
+    private DOMNotification notification;
+    private ListenerRegistration<DOMNotificationListener> registration;
+    private Set<Channel> subscribers = new ConcurrentSet<>();
+    private TransactionChainHandler transactionChainHandler;
+    private SchemaContextHandler schemaHandler;
 
     /**
      * Set path of listener and stream name, register event bus.
@@ -98,17 +125,108 @@ public class NotificationListenerAdapter implements DOMNotificationListener {
 
     @Override
     public void onNotification(final DOMNotification notification) {
-        final String xml = prepareXmlFrom(notification);
+        this.schemaContext = ControllerContext.getInstance().getGlobalSchema();
+        this.notification = notification;
+        final Date now = new Date();
+        if (this.stop != null) {
+            if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) {
+                checkFilter();
+            }
+            if (this.stop.compareTo(now) < 0) {
+                try {
+                    this.close();
+                } catch (final Exception e) {
+                    throw new RestconfDocumentedException("Problem with unregister listener." + e);
+                }
+            }
+        } else if (this.start != null) {
+            if (this.start.compareTo(now) < 0) {
+                this.start = null;
+                checkFilter();
+            }
+        } else {
+            checkFilter();
+        }
+    }
+
+    /**
+     * Check if is filter used and then prepare and post data do client
+     *
+     */
+    private void checkFilter() {
+        final String xml = prepareXml();
+        if (this.filter == null) {
+            prepareAndPostData(xml);
+        } else {
+            try {
+                if (parseFilterParam(xml)) {
+                    prepareAndPostData(xml);
+                }
+            } catch (final Exception e) {
+                throw new RestconfDocumentedException("Problem while parsing filter.", e);
+            }
+        }
+    }
+
+    /**
+     * Parse and evaluate filter value by xml
+     *
+     * @param xml
+     *            - notification data in xml
+     * @return true or false - depends on filter expression and data of
+     *         notifiaction
+     * @throws Exception
+     */
+    private boolean parseFilterParam(final String xml) throws Exception {
+        final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+        final DocumentBuilder builder = factory.newDocumentBuilder();
+        final Document docOfXml = builder.parse(new InputSource(new StringReader(xml)));
+        final XPath xPath = XPathFactory.newInstance().newXPath();
+        return (boolean) xPath.compile(this.filter).evaluate(docOfXml, XPathConstants.BOOLEAN);
+    }
+
+    /**
+     * Prepare data of notification and data to client
+     *
+     * @param xml
+     */
+    private void prepareAndPostData(final String xml) {
         final Event event = new Event(EventType.NOTIFY);
         if (this.outputType.equals("JSON")) {
-            final JSONObject jsonObject = XML.toJSONObject(xml);
-            event.setData(jsonObject.toString());
+            event.setData(prepareJson());
         } else {
             event.setData(xml);
         }
         this.eventBus.post(event);
     }
 
+    /**
+     * Prepare json from notification data
+     *
+     * @return json as {@link String}
+     */
+    private String prepareJson() {
+        final JSONObject json = new JSONObject();
+        json.put("ietf-restconf:notification",
+                new JSONObject(writeBodyToString()).put("event-time", ListenerAdapter.toRFC3339(new Date())));
+        return json.toString();
+    }
+
+    private String writeBodyToString() {
+        final Writer writer = new StringWriter();
+        final NormalizedNodeStreamWriter jsonStream =
+                JSONNormalizedNodeStreamWriter.createExclusiveWriter(JSONCodecFactory.create(this.schemaContext),
+                        this.notification.getType(), null, JsonWriterFactory.createJsonWriter(writer));
+        final NormalizedNodeWriter nodeWriter = NormalizedNodeWriter.forStreamWriter(jsonStream);
+        try {
+            nodeWriter.write(this.notification.getBody());
+            nodeWriter.close();
+        } catch (final IOException e) {
+            throw new RestconfDocumentedException("Problem while writing body of notification to JSON. ", e);
+        }
+        return writer.toString();
+    }
+
     /**
      * Checks if exists at least one {@link Channel} subscriber.
      *
@@ -120,9 +238,20 @@ public class NotificationListenerAdapter implements DOMNotificationListener {
     }
 
     /**
-     * Reset lists, close registration and unregister bus event.
+     * Reset lists, close registration and unregister bus event and delete data in DS.
      */
     public void close() {
+        final DOMDataWriteTransaction wTx = this.transactionChainHandler.get().newWriteOnlyTransaction();
+        wTx.delete(LogicalDatastoreType.OPERATIONAL,
+                IdentifierCodec.deserialize(
+                        MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + this.path.getLastComponent().getLocalName(),
+                        this.schemaHandler.get()));
+        try {
+            wTx.submit().checkedGet();
+        } catch (final TransactionCommitFailedException e) {
+            throw new RestconfDocumentedException("Problem while deleting data from DS.", e);
+        }
+
         this.subscribers = new ConcurrentSet<>();
         this.registration.close();
         this.registration = null;
@@ -197,20 +326,20 @@ public class NotificationListenerAdapter implements DOMNotificationListener {
         this.eventBus.post(event);
     }
 
-    private String prepareXmlFrom(final DOMNotification notification) {
-        final SchemaContext schemaContext = ControllerContext.getInstance().getGlobalSchema();
+    private String prepareXml() {
         final Document doc = ListenerAdapter.createDocument();
-        final Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:yang:ietf-restconf",
+        final Element notificationElement =
+                doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
                 "notification");
         doc.appendChild(notificationElement);
 
         final Element eventTimeElement = doc.createElement("eventTime");
         eventTimeElement.setTextContent(ListenerAdapter.toRFC3339(new Date()));
         notificationElement.appendChild(eventTimeElement);
-        final String notificationNamespace = notification.getType().getLastComponent().getNamespace().toString();
+
         final Element notificationEventElement = doc.createElementNS(
-                notificationNamespace, "event");
-        addValuesToNotificationEventElement(doc, notificationEventElement, notification, schemaContext);
+                "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "create-notification-stream");
+        addValuesToNotificationEventElement(doc, notificationEventElement, this.notification, this.schemaContext);
         notificationElement.appendChild(notificationEventElement);
 
         try {
@@ -243,7 +372,9 @@ public class NotificationListenerAdapter implements DOMNotificationListener {
             final DOMResult domResult = writeNormalizedNode(body,
                     YangInstanceIdentifier.create(body.getIdentifier()), schemaContext);
             final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
-            element.appendChild(result);
+            final Element dataElement = doc.createElement("notification");
+            dataElement.appendChild(result);
+            element.appendChild(dataElement);
         } catch (final IOException e) {
             LOG.error("Error in writer ", e);
         } catch (final XMLStreamException e) {
@@ -385,4 +516,46 @@ public class NotificationListenerAdapter implements DOMNotificationListener {
     private enum EventType {
         REGISTER, DEREGISTER, NOTIFY
     }
+
+    /**
+     * Set query parameters for listener
+     *
+     * @param start
+     *            - start-time of getting notification
+     * @param stop
+     *            - stop-time of getting notification
+     * @param filter
+     *            - indicate which subset of all possible events are of interest
+     */
+    public void setQueryParams(final Date start, final Date stop, final String filter) {
+        this.start = start;
+        this.stop = stop;
+        this.filter = filter;
+    }
+
+    /**
+     * Get outputType of listenere
+     *
+     * @return the outputType
+     */
+    public String getOutputType() {
+        return this.outputType;
+    }
+
+    /**
+     * Transaction chain to delete data in DS on close()
+     *
+     * @param transactionChainHandler
+     *            - creating new write transaction to delete data on close
+     * @param schemaHandler
+     *            - for getting schema to deserialize
+     *            {@link MonitoringModule#PATH_TO_STREAM_WITHOUT_KEY} to
+     *            {@link YangInstanceIdentifier}
+     */
+    public void setCloseVars(final TransactionChainHandler transactionChainHandler,
+            final SchemaContextHandler schemaHandler) {
+        this.transactionChainHandler = transactionChainHandler;
+        this.schemaHandler = schemaHandler;
+
+    }
 }