Bug 6949 / Bug 6950 - Implementation of start-time and stop-time
[netconf.git] / restconf / sal-rest-connector / src / main / java / org / opendaylight / netconf / sal / streams / listeners / ListenerAdapter.java
index 2289d2e4dd63945b83bea38672be8764abdca3f0..0f784d11b4837e1db0b488cceb92b51587fdcac5 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.netconf.sal.streams.listeners;
 
-import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
 import com.google.common.eventbus.AsyncEventBus;
 import com.google.common.eventbus.EventBus;
@@ -19,6 +18,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
 import java.text.SimpleDateFormat;
 import java.util.Collection;
 import java.util.Date;
@@ -42,9 +42,13 @@ import javax.xml.transform.TransformerFactory;
 import javax.xml.transform.dom.DOMResult;
 import javax.xml.transform.dom.DOMSource;
 import javax.xml.transform.stream.StreamResult;
+import org.json.JSONObject;
+import org.json.XML;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
 import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -77,7 +81,7 @@ public class ListenerAdapter implements DOMDataChangeListener {
     private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
     private static final Pattern RFC3339_PATTERN = Pattern.compile("(\\d\\d)(\\d\\d)$");
 
-    private final SimpleDateFormat rfc3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ");
+    private static final SimpleDateFormat RFC3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ");
 
     private final YangInstanceIdentifier path;
     private ListenerRegistration<DOMDataChangeListener> registration;
@@ -85,33 +89,69 @@ public class ListenerAdapter implements DOMDataChangeListener {
     private Set<Channel> subscribers = new ConcurrentSet<>();
     private final EventBus eventBus;
     private final EventBusChangeRecorder eventBusChangeRecorder;
+    private final NotificationOutputType outputType;
+    private Date start = null;
+    private Date stop = null;
 
     /**
-     * Creates new {@link ListenerAdapter} listener specified by path and stream name.
+     * Creates new {@link ListenerAdapter} listener specified by path and stream
+     * name.
      *
      * @param path
      *            Path to data in data store.
      * @param streamName
      *            The name of the stream.
+     * @param outputType
+     *            - type of output on notification (JSON, XML)
      */
-    ListenerAdapter(final YangInstanceIdentifier path, final String streamName) {
+    ListenerAdapter(final YangInstanceIdentifier path, final String streamName,
+            final NotificationOutputType outputType) {
+        this.outputType = outputType;
         Preconditions.checkNotNull(path);
-        Preconditions.checkArgument(streamName != null && !streamName.isEmpty());
+        Preconditions.checkArgument((streamName != null) && !streamName.isEmpty());
         this.path = path;
         this.streamName = streamName;
-        eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
-        eventBusChangeRecorder = new EventBusChangeRecorder();
-        eventBus.register(eventBusChangeRecorder);
+        this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
+        this.eventBusChangeRecorder = new EventBusChangeRecorder();
+        this.eventBus.register(this.eventBusChangeRecorder);
     }
 
     @Override
     public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+        final Date now = new Date();
+        if (this.stop != null) {
+            if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) {
+                prepareAndPostData(change);
+            }
+            if (this.stop.compareTo(now) < 0) {
+                try {
+                    this.close();
+                } catch (final Exception e) {
+                    throw new RestconfDocumentedException("Problem with unregister listener." + e);
+                }
+            }
+        } else if (this.start != null) {
+            if (this.start.compareTo(now) < 0) {
+                this.start = null;
+                prepareAndPostData(change);
+            }
+        } else {
+            prepareAndPostData(change);
+        }
+    }
+
+    private void prepareAndPostData(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
         if (!change.getCreatedData().isEmpty() || !change.getUpdatedData().isEmpty()
                 || !change.getRemovedPaths().isEmpty()) {
             final String xml = prepareXmlFrom(change);
             final Event event = new Event(EventType.NOTIFY);
-            event.setData(xml);
-            eventBus.post(event);
+            if (this.outputType.equals(NotificationOutputType.JSON)) {
+                final JSONObject jsonObject = XML.toJSONObject(xml);
+                event.setData(jsonObject.toString());
+            } else {
+                event.setData(xml);
+            }
+            this.eventBus.post(event);
         }
     }
 
@@ -123,20 +163,20 @@ public class ListenerAdapter implements DOMDataChangeListener {
         public void recordCustomerChange(final Event event) {
             if (event.getType() == EventType.REGISTER) {
                 final Channel subscriber = event.getSubscriber();
-                if (!subscribers.contains(subscriber)) {
-                    subscribers.add(subscriber);
+                if (!ListenerAdapter.this.subscribers.contains(subscriber)) {
+                    ListenerAdapter.this.subscribers.add(subscriber);
                 }
             } else if (event.getType() == EventType.DEREGISTER) {
-                subscribers.remove(event.getSubscriber());
+                ListenerAdapter.this.subscribers.remove(event.getSubscriber());
                 Notificator.removeListenerIfNoSubscriberExists(ListenerAdapter.this);
             } else if (event.getType() == EventType.NOTIFY) {
-                for (final Channel subscriber : subscribers) {
+                for (final Channel subscriber : ListenerAdapter.this.subscribers) {
                     if (subscriber.isActive()) {
                         LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
                         subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
                     } else {
                         LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
-                        subscribers.remove(subscriber);
+                        ListenerAdapter.this.subscribers.remove(subscriber);
                     }
                 }
             }
@@ -167,7 +207,7 @@ public class ListenerAdapter implements DOMDataChangeListener {
          * @return Channel
          */
         public Channel getSubscriber() {
-            return subscriber;
+            return this.subscriber;
         }
 
         /**
@@ -186,7 +226,7 @@ public class ListenerAdapter implements DOMDataChangeListener {
          * @return String representation of event data.
          */
         public String getData() {
-            return data;
+            return this.data;
         }
 
         /**
@@ -204,7 +244,7 @@ public class ListenerAdapter implements DOMDataChangeListener {
          * @return The type of the event.
          */
         public EventType getType() {
-            return type;
+            return this.type;
         }
     }
 
@@ -230,6 +270,7 @@ public class ListenerAdapter implements DOMDataChangeListener {
         final Document doc = createDocument();
         final Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
                 "notification");
+
         doc.appendChild(notificationElement);
 
         final Element eventTimeElement = doc.createElement("eventTime");
@@ -238,6 +279,7 @@ public class ListenerAdapter implements DOMDataChangeListener {
 
         final Element dataChangedNotificationEventElement = doc.createElementNS(
                 "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "data-changed-notification");
+
         addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, change,
                 schemaContext, dataContextTree);
         notificationElement.appendChild(dataChangedNotificationEventElement);
@@ -250,7 +292,8 @@ public class ListenerAdapter implements DOMDataChangeListener {
             transformer.setOutputProperty(OutputKeys.INDENT, "yes");
             transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
             transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
-            transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, Charsets.UTF_8)));
+            transformer.transform(new DOMSource(doc),
+                    new StreamResult(new OutputStreamWriter(out, StandardCharsets.UTF_8)));
             final byte[] charData = out.toByteArray();
             return new String(charData, "UTF-8");
         } catch (TransformerException | UnsupportedEncodingException e) {
@@ -267,15 +310,15 @@ public class ListenerAdapter implements DOMDataChangeListener {
      *            Date
      * @return Data specified by RFC3339.
      */
-    private String toRFC3339(final Date d) {
-        return RFC3339_PATTERN.matcher(rfc3339.format(d)).replaceAll("$1:$2");
+    public static String toRFC3339(final Date d) {
+        return RFC3339_PATTERN.matcher(RFC3339.format(d)).replaceAll("$1:$2");
     }
 
     /**
      * Creates {@link Document} document.
      * @return {@link Document} document.
      */
-    private Document createDocument() {
+    public static Document createDocument() {
         final DocumentBuilder bob;
         try {
             bob = DBF.newDocumentBuilder();
@@ -324,9 +367,9 @@ public class ListenerAdapter implements DOMDataChangeListener {
      * @param operation
      *            {@link Operation}
      */
-    private void addValuesFromDataToElement(final Document doc, final Set<YangInstanceIdentifier> data, final Element element,
-            final Operation operation) {
-        if (data == null || data.isEmpty()) {
+    private void addValuesFromDataToElement(final Document doc, final Set<YangInstanceIdentifier> data,
+            final Element element, final Operation operation) {
+        if ((data == null) || data.isEmpty()) {
             return;
         }
         for (final YangInstanceIdentifier path : data) {
@@ -340,10 +383,10 @@ public class ListenerAdapter implements DOMDataChangeListener {
     private void addCreatedChangedValuesFromDataToElement(final Document doc, final Set<Entry<YangInstanceIdentifier,
                 NormalizedNode<?,?>>> data, final Element element, final Operation operation, final SchemaContext
             schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
-        if (data == null || data.isEmpty()) {
+        if ((data == null) || data.isEmpty()) {
             return;
         }
-        for (Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry : data) {
+        for (final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry : data) {
             if (!ControllerContext.getInstance().isNodeMixin(entry.getKey())) {
                 final Node node = createCreatedChangedDataChangeEventElement(doc, entry, operation, schemaContext,
                         dataSchemaContextTree);
@@ -363,7 +406,8 @@ public class ListenerAdapter implements DOMDataChangeListener {
      *            {@link Operation}
      * @return {@link Node} node represented by changed event element.
      */
-    private Node createDataChangeEventElement(final Document doc, final YangInstanceIdentifier path, final Operation operation) {
+    private Node createDataChangeEventElement(final Document doc, final YangInstanceIdentifier path,
+            final Operation operation) {
         final Element dataChangeEventElement = doc.createElement("data-change-event");
         final Element pathElement = doc.createElement("path");
         addPathAsValueToElement(path, pathElement);
@@ -396,18 +440,19 @@ public class ListenerAdapter implements DOMDataChangeListener {
             final Element dataElement = doc.createElement("data");
             dataElement.appendChild(result);
             dataChangeEventElement.appendChild(dataElement);
-        } catch (IOException e) {
+        } catch (final IOException e) {
             LOG.error("Error in writer ", e);
-        } catch (XMLStreamException e) {
+        } catch (final XMLStreamException e) {
             LOG.error("Error processing stream", e);
         }
 
         return dataChangeEventElement;
     }
 
-    private static DOMResult writeNormalizedNode(final NormalizedNode<?,?> normalized, final
-        YangInstanceIdentifier path, final SchemaContext context, final DataSchemaContextTree dataSchemaContextTree) throws
-            IOException, XMLStreamException {
+    private static DOMResult writeNormalizedNode(final NormalizedNode<?, ?> normalized,
+                                                 final YangInstanceIdentifier path, final SchemaContext context,
+                                                 final DataSchemaContextTree dataSchemaContextTree)
+            throws IOException, XMLStreamException {
         final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
         final Document doc = XmlDocumentUtils.getDocument();
         final DOMResult result = new DOMResult(doc);
@@ -416,7 +461,7 @@ public class ListenerAdapter implements DOMDataChangeListener {
         XMLStreamWriter writer = null;
         final SchemaPath nodePath;
 
-        if (normalized instanceof MapEntryNode || normalized instanceof UnkeyedListEntryNode) {
+        if ((normalized instanceof MapEntryNode) || (normalized instanceof UnkeyedListEntryNode)) {
             nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath();
         } else {
             nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath().getParent();
@@ -542,7 +587,7 @@ public class ListenerAdapter implements DOMDataChangeListener {
      * @return Path pointed to data in data store.
      */
     public YangInstanceIdentifier getPath() {
-        return path;
+        return this.path;
     }
 
     /**
@@ -560,17 +605,17 @@ public class ListenerAdapter implements DOMDataChangeListener {
      * @return The name of the stream.
      */
     public String getStreamName() {
-        return streamName;
+        return this.streamName;
     }
 
     /**
      * Removes all subscribers and unregisters event bus change recorder form event bus.
      */
     public void close() throws Exception {
-        subscribers = new ConcurrentSet<>();
-        registration.close();
-        registration = null;
-        eventBus.unregister(eventBusChangeRecorder);
+        this.subscribers = new ConcurrentSet<>();
+        this.registration.close();
+        this.registration = null;
+        this.eventBus.unregister(this.eventBusChangeRecorder);
     }
 
     /**
@@ -579,7 +624,7 @@ public class ListenerAdapter implements DOMDataChangeListener {
      * @return True if exist, false otherwise.
      */
     public boolean isListening() {
-        return registration == null ? false : true;
+        return this.registration == null ? false : true;
     }
 
     /**
@@ -595,7 +640,7 @@ public class ListenerAdapter implements DOMDataChangeListener {
         }
         final Event event = new Event(EventType.REGISTER);
         event.setSubscriber(subscriber);
-        eventBus.post(event);
+        this.eventBus.post(event);
     }
 
     /**
@@ -608,7 +653,7 @@ public class ListenerAdapter implements DOMDataChangeListener {
         LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
         final Event event = new Event(EventType.DEREGISTER);
         event.setSubscriber(subscriber);
-        eventBus.post(event);
+        this.eventBus.post(event);
     }
 
     /**
@@ -617,7 +662,7 @@ public class ListenerAdapter implements DOMDataChangeListener {
      * @return True if exist at least one {@link Channel} subscriber, false otherwise.
      */
     public boolean hasSubscribers() {
-        return !subscribers.isEmpty();
+        return !this.subscribers.isEmpty();
     }
 
     /**
@@ -649,4 +694,17 @@ public class ListenerAdapter implements DOMDataChangeListener {
         }
     }
 
+    /**
+     * Set query parameters for listener
+     *
+     * @param start
+     *            - start-time of getting notification
+     * @param stop
+     *            - stop-time of getting notification
+     */
+    public void setTimer(final Date start, final Date stop) {
+        this.start = start;
+        this.stop = stop;
+    }
+
 }