Bug 5679 - implement ietf-restconf-monitoring - cleanup 87/49187/6
authorJakub Toth <jatoth@cisco.com>
Fri, 9 Dec 2016 16:13:10 +0000 (17:13 +0100)
committerJakub Toth <jatoth@cisco.com>
Tue, 20 Dec 2016 17:07:01 +0000 (18:07 +0100)
  * cleanup of both listeners (data-change, yang)
    * create new common abstract classes with common methodes
    * fix broken tests
  * add support of listener to listen on stream with both output
    types (XML JSON)

Change-Id: I865a0547e57a1035921f207d9f96b5a4c57bc20c
Signed-off-by: Jakub Toth <jatoth@cisco.com>
19 files changed:
restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractCommonSubscriber.java [new file with mode: 0644]
restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractNotificationsData.java [new file with mode: 0644]
restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractQueryParams.java [new file with mode: 0644]
restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/BaseListenerInterface.java [new file with mode: 0644]
restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Event.java [new file with mode: 0644]
restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/EventBusChangeRecorder.java [new file with mode: 0644]
restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/EventType.java [new file with mode: 0644]
restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/ListenerAdapter.java
restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/ListenersConstants.java [new file with mode: 0644]
restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/NotificationListenerAdapter.java
restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Notificator.java
restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/base/services/impl/RestconfOperationsServiceImpl.java
restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/services/impl/RestconfStreamsSubscriptionServiceImpl.java
restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/utils/CreateStreamUtil.java
restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/utils/SubscribeToStreamUtil.java
restconf/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/BrokerFacadeTest.java
restconf/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/ExpressionParserTest.java
restconf/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/RestconfImplNotificationSubscribingTest.java
restconf/sal-rest-connector/src/test/java/org/opendaylight/restconf/restful/services/impl/RestconfStreamsSubscriptionServiceImplTest.java

diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractCommonSubscriber.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractCommonSubscriber.java
new file mode 100644 (file)
index 0000000..5dfc016
--- /dev/null
@@ -0,0 +1,141 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.listeners;
+
+import com.google.common.eventbus.AsyncEventBus;
+import com.google.common.eventbus.EventBus;
+import io.netty.channel.Channel;
+import io.netty.util.internal.ConcurrentSet;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Features of subscribing part of both notifications
+ */
+abstract class AbstractCommonSubscriber extends AbstractQueryParams implements BaseListenerInterface {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class);
+
+    private final Set<Channel> subscribers = new ConcurrentSet<>();
+    private final EventBus eventBus;
+
+    @SuppressWarnings("rawtypes")
+    private EventBusChangeRecorder eventBusChangeRecorder;
+    @SuppressWarnings("rawtypes")
+    private ListenerRegistration registration;
+
+    /**
+     * Creating {@link EventBus}
+     */
+    protected AbstractCommonSubscriber() {
+        this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
+    }
+
+    @Override
+    public final boolean hasSubscribers() {
+        return !this.subscribers.isEmpty();
+    }
+
+    @Override
+    public final Set<Channel> getSubscribers() {
+        return this.subscribers;
+    }
+
+    @Override
+    public final void close() throws Exception {
+        this.registration.close();
+        this.registration = null;
+
+        deleteDataInDS();
+        unregister();
+    }
+
+    /**
+     * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
+     * subscriber to the event and post event into event bus.
+     *
+     * @param subscriber
+     *            Channel
+     */
+    public void addSubscriber(final Channel subscriber) {
+        if (!subscriber.isActive()) {
+            LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
+        }
+        final Event event = new Event(EventType.REGISTER);
+        event.setSubscriber(subscriber);
+        this.eventBus.post(event);
+    }
+
+    /**
+     * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
+     * subscriber to the event and posts event into event bus.
+     *
+     * @param subscriber
+     */
+    public void removeSubscriber(final Channel subscriber) {
+        LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
+        final Event event = new Event(EventType.DEREGISTER);
+        event.setSubscriber(subscriber);
+        this.eventBus.post(event);
+    }
+
+    /**
+     * Sets {@link ListenerRegistration} registration.
+     *
+     * @param registration
+     *            DOMDataChangeListener registration
+     */
+    @SuppressWarnings("rawtypes")
+    public void setRegistration(final ListenerRegistration registration) {
+        this.registration = registration;
+    }
+
+    /**
+     * Checks if {@link ListenerRegistration} registration exist.
+     *
+     * @return True if exist, false otherwise.
+     */
+    public boolean isListening() {
+        return this.registration == null ? false : true;
+    }
+
+    /**
+     * Creating and registering {@link EventBusChangeRecorder} of specific
+     * listener on {@link EventBus}
+     *
+     * @param listener
+     *            - specific listener of notifications
+     */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    protected <T extends BaseListenerInterface> void register(final T listener) {
+        this.eventBusChangeRecorder = new EventBusChangeRecorder(listener);
+        this.eventBus.register(this.eventBusChangeRecorder);
+    }
+
+    /**
+     * Post event to event bus
+     *
+     * @param event
+     *            - data of incoming notifications
+     */
+    protected void post(final Event event) {
+        this.eventBus.post(event);
+    }
+
+    /**
+     * Removes all subscribers and unregisters event bus change recorder form
+     * event bus
+     */
+    protected void unregister() {
+        this.subscribers.clear();
+        this.eventBus.unregister(this.eventBusChangeRecorder);
+    }
+}
diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractNotificationsData.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractNotificationsData.java
new file mode 100644 (file)
index 0000000..14c70a6
--- /dev/null
@@ -0,0 +1,209 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.listeners;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+import java.util.Date;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamWriter;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.dom.DOMResult;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.restconf.Draft18.MonitoringModule;
+import org.opendaylight.restconf.handlers.SchemaContextHandler;
+import org.opendaylight.restconf.handlers.TransactionChainHandler;
+import org.opendaylight.restconf.parser.IdentifierCodec;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
+import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+/**
+ * Abstract class for processing and preparing data
+ *
+ */
+abstract class AbstractNotificationsData {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractNotificationsData.class);
+
+    private TransactionChainHandler transactionChainHandler;
+    private SchemaContextHandler schemaHandler;
+    private String localName;
+
+    /**
+     * Transaction chain for delete data in DS on close()
+     *
+     * @param transactionChainHandler
+     *            - creating new write transaction for delete data on close
+     * @param schemaHandler
+     *            - for getting schema to deserialize
+     *            {@link MonitoringModule#PATH_TO_STREAM_WITHOUT_KEY} to
+     *            {@link YangInstanceIdentifier}
+     */
+    public void setCloseVars(final TransactionChainHandler transactionChainHandler,
+            final SchemaContextHandler schemaHandler) {
+        this.transactionChainHandler = transactionChainHandler;
+        this.schemaHandler = schemaHandler;
+    }
+
+    /**
+     * Delete data in DS
+     */
+    protected void deleteDataInDS() throws Exception {
+        final DOMDataWriteTransaction wTx = this.transactionChainHandler.get().newWriteOnlyTransaction();
+        wTx.delete(LogicalDatastoreType.OPERATIONAL, IdentifierCodec
+                .deserialize(MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + this.localName, this.schemaHandler.get()));
+        wTx.submit().checkedGet();
+    }
+
+    /**
+     * Set localName of last path element of specific listener
+     *
+     * @param localName
+     *            - local name
+     */
+    protected void setLocalNameOfPath(final String localName) {
+        this.localName = localName;
+    }
+
+    /**
+     * Formats data specified by RFC3339.
+     *
+     * @param d
+     *            Date
+     * @return Data specified by RFC3339.
+     */
+    protected static String toRFC3339(final Date d) {
+        return ListenersConstants.RFC3339_PATTERN.matcher(ListenersConstants.RFC3339.format(d)).replaceAll("$1:$2");
+    }
+
+    /**
+     * Creates {@link Document} document.
+     *
+     * @return {@link Document} document.
+     */
+    protected static Document createDocument() {
+        final DocumentBuilder bob;
+        try {
+            bob = ListenersConstants.DBF.newDocumentBuilder();
+        } catch (final ParserConfigurationException e) {
+            return null;
+        }
+        return bob.newDocument();
+    }
+
+    /**
+     * Write normalized node to {@link DOMResult}
+     *
+     * @param normalized
+     *            - data
+     * @param context
+     *            - actual schema context
+     * @param schemaPath
+     *            - schema path of data
+     * @return {@link DOMResult}
+     */
+    protected DOMResult writeNormalizedNode(final NormalizedNode<?, ?> normalized, final SchemaContext context,
+            final SchemaPath schemaPath) throws IOException, XMLStreamException {
+        final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
+        final Document doc = XmlDocumentUtils.getDocument();
+        final DOMResult result = new DOMResult(doc);
+        NormalizedNodeWriter normalizedNodeWriter = null;
+        NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
+        XMLStreamWriter writer = null;
+
+        try {
+            writer = XML_FACTORY.createXMLStreamWriter(result);
+            normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context, schemaPath);
+            normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
+
+            normalizedNodeWriter.write(normalized);
+
+            normalizedNodeWriter.flush();
+        } finally {
+            if (normalizedNodeWriter != null) {
+                normalizedNodeWriter.close();
+            }
+            if (normalizedNodeStreamWriter != null) {
+                normalizedNodeStreamWriter.close();
+            }
+            if (writer != null) {
+                writer.close();
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Generating base element of every notification
+     *
+     * @param doc
+     *            - base {@link Document}
+     * @return element of {@link Document}
+     */
+    protected Element basePartDoc(final Document doc) {
+        final Element notificationElement =
+                doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0", "notification");
+
+        doc.appendChild(notificationElement);
+
+        final Element eventTimeElement = doc.createElement("eventTime");
+        eventTimeElement.setTextContent(toRFC3339(new Date()));
+        notificationElement.appendChild(eventTimeElement);
+
+        return notificationElement;
+    }
+
+    /**
+     * Generating of {@link Document} transforming to string
+     *
+     * @param doc
+     *            - {@link Document} with data
+     * @return - string from {@link Document}
+     */
+    protected String transformDoc(final Document doc) {
+        try {
+            final ByteArrayOutputStream out = new ByteArrayOutputStream();
+            final Transformer transformer = ListenersConstants.FACTORY.newTransformer();
+            transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
+            transformer.setOutputProperty(OutputKeys.METHOD, "xml");
+            transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+            transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
+            transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
+            transformer.transform(new DOMSource(doc),
+                    new StreamResult(new OutputStreamWriter(out, StandardCharsets.UTF_8)));
+            final byte[] charData = out.toByteArray();
+            return new String(charData, "UTF-8");
+        } catch (TransformerException | UnsupportedEncodingException e) {
+            final String msg = "Error during transformation of Document into String";
+            LOG.error(msg, e);
+            return msg;
+        }
+    }
+}
diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractQueryParams.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractQueryParams.java
new file mode 100644 (file)
index 0000000..f7a72bc
--- /dev/null
@@ -0,0 +1,116 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.listeners;
+
+import java.io.StringReader;
+import java.util.Date;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathFactory;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
+import org.w3c.dom.Document;
+import org.xml.sax.InputSource;
+
+/**
+ * Features of query parameters part of both notifications
+ *
+ */
+abstract class AbstractQueryParams extends AbstractNotificationsData {
+
+    protected Date start = null;
+    protected Date stop = null;
+    protected String filter = null;
+
+    private String xml;
+
+    /**
+     * Set query parameters for listener
+     *
+     * @param start
+     *            - start-time of getting notification
+     * @param stop
+     *            - stop-time of getting notification
+     * @param filter
+     *            - indicate which subset of all possible events are of interest
+     */
+    public void setQueryParams(final Date start, final Date stop, final String filter) {
+        this.start = start;
+        this.stop = stop;
+        this.filter = filter;
+    }
+
+    /**
+     * Checking query parameters on specific notification
+     *
+     * @param xml
+     *            - data of notification
+     * @param listener
+     *            - listener of notification
+     * @return true if notification meets the requirements of query parameters,
+     *         false otherwise
+     */
+    protected <T extends BaseListenerInterface> boolean checkQueryParams(final String xml, final T listener) {
+        this.xml = xml;
+        final Date now = new Date();
+        if (this.stop != null) {
+            if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) {
+                return checkFilter();
+            }
+            if (this.stop.compareTo(now) < 0) {
+                try {
+                    listener.close();
+                } catch (final Exception e) {
+                    throw new RestconfDocumentedException("Problem with unregister listener." + e);
+                }
+            }
+        } else if (this.start != null) {
+            if (this.start.compareTo(now) < 0) {
+                this.start = null;
+                return checkFilter();
+            }
+        } else {
+            return checkFilter();
+        }
+        return false;
+    }
+
+    /**
+     * Check if is filter used and then prepare and post data do client
+     *
+     * @param change
+     *            - data of notification
+     */
+    private boolean checkFilter() {
+        if (this.filter == null) {
+            return true;
+        } else {
+            try {
+                return parseFilterParam();
+            } catch (final Exception e) {
+                throw new RestconfDocumentedException("Problem while parsing filter.", e);
+            }
+        }
+    }
+
+    /**
+     * Parse and evaluate filter value by xml
+     *
+     * @return true or false - depends on filter expression and data of
+     *         notifiaction
+     * @throws Exception
+     */
+    private boolean parseFilterParam() throws Exception {
+        final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+        final DocumentBuilder builder = factory.newDocumentBuilder();
+        final Document docOfXml = builder.parse(new InputSource(new StringReader(this.xml)));
+        final XPath xPath = XPathFactory.newInstance().newXPath();
+        return (boolean) xPath.compile(this.filter).evaluate(docOfXml, XPathConstants.BOOLEAN);
+    }
+}
diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/BaseListenerInterface.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/BaseListenerInterface.java
new file mode 100644 (file)
index 0000000..50bb088
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.listeners;
+
+import io.netty.channel.Channel;
+import java.util.Set;
+
+/**
+ * Base interface for both listeners({@link ListenerAdapter},
+ * {@link NotificationListenerAdapter})
+ */
+interface BaseListenerInterface extends AutoCloseable {
+
+    /**
+     * Return all subscribers of listener
+     *
+     * @return set of subscribers
+     */
+    Set<Channel> getSubscribers();
+
+    /**
+     * Checks if exists at least one {@link Channel} subscriber.
+     *
+     * @return True if exist at least one {@link Channel} subscriber, false
+     *         otherwise.
+     */
+    boolean hasSubscribers();
+
+    /**
+     * Get name of stream
+     *
+     * @return stream name
+     */
+    String getStreamName();
+
+    /**
+     * Get output type
+     *
+     * @return outputType
+     */
+    String getOutputType();
+}
diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Event.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Event.java
new file mode 100644 (file)
index 0000000..fc0b118
--- /dev/null
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.listeners;
+
+import io.netty.channel.Channel;
+
+/**
+ * Represents event of specific {@link EventType} type, holds data and
+ * {@link Channel} subscriber.
+ */
+class Event {
+    private final EventType type;
+    private Channel subscriber;
+    private String data;
+
+    /**
+     * Creates new event specified by {@link EventType} type.
+     *
+     * @param type
+     *            EventType
+     */
+    public Event(final EventType type) {
+        this.type = type;
+    }
+
+    /**
+     * Gets the {@link Channel} subscriber.
+     *
+     * @return Channel
+     */
+    public Channel getSubscriber() {
+        return this.subscriber;
+    }
+
+    /**
+     * Sets subscriber for event.
+     *
+     * @param subscriber
+     *            Channel
+     */
+    public void setSubscriber(final Channel subscriber) {
+        this.subscriber = subscriber;
+    }
+
+    /**
+     * Gets event String.
+     *
+     * @return String representation of event data.
+     */
+    public String getData() {
+        return this.data;
+    }
+
+    /**
+     * Sets event data.
+     *
+     * @param data
+     *            String.
+     */
+    public void setData(final String data) {
+        this.data = data;
+    }
+
+    /**
+     * Gets event type.
+     *
+     * @return The type of the event.
+     */
+    public EventType getType() {
+        return this.type;
+    }
+}
diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/EventBusChangeRecorder.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/EventBusChangeRecorder.java
new file mode 100644 (file)
index 0000000..fd5dc36
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.listeners;
+
+import com.google.common.eventbus.Subscribe;
+import io.netty.channel.Channel;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class EventBusChangeRecorder<T extends BaseListenerInterface> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(EventBusChangeRecorder.class);
+    private final T listener;
+
+    /**
+     * Event bus change recorder of specific listener of notifications
+     *
+     * @param listener
+     *            - specific listener
+     */
+    EventBusChangeRecorder(final T listener) {
+        this.listener = listener;
+    }
+
+    @Subscribe
+    public void recordCustomerChange(final Event event) {
+        if (event.getType() == EventType.REGISTER) {
+            final Channel subscriber = event.getSubscriber();
+            if (!this.listener.getSubscribers().contains(subscriber)) {
+                this.listener.getSubscribers().add(subscriber);
+            }
+        } else if (event.getType() == EventType.DEREGISTER) {
+            this.listener.getSubscribers().remove(event.getSubscriber());
+            Notificator.removeListenerIfNoSubscriberExists(this.listener);
+        } else if (event.getType() == EventType.NOTIFY) {
+            for (final Channel subscriber : this.listener.getSubscribers()) {
+                if (subscriber.isActive()) {
+                    LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
+                    subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
+                } else {
+                    LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
+                    this.listener.getSubscribers().remove(subscriber);
+                }
+            }
+        }
+    }
+}
diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/EventType.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/EventType.java
new file mode 100644 (file)
index 0000000..2a4d8bc
--- /dev/null
@@ -0,0 +1,15 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.listeners;
+
+/**
+ * Type of the event.
+ */
+enum EventType {
+    REGISTER, DEREGISTER, NOTIFY;
+}
index 331daf7344dda15d7ee0a910697c6b4a9ec98a0b..0b2c1dd34d8811b2f6e79ba1b2754d29ff32ecd5 100644 (file)
@@ -8,55 +8,18 @@
 package org.opendaylight.netconf.sal.streams.listeners;
 
 import com.google.common.base.Preconditions;
-import com.google.common.eventbus.AsyncEventBus;
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
-import io.netty.channel.Channel;
-import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
-import io.netty.util.internal.ConcurrentSet;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.StringReader;
-import java.io.UnsupportedEncodingException;
-import java.nio.charset.StandardCharsets;
-import java.text.SimpleDateFormat;
-import java.util.Date;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.regex.Pattern;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.stream.XMLOutputFactory;
 import javax.xml.stream.XMLStreamException;
-import javax.xml.stream.XMLStreamWriter;
-import javax.xml.transform.OutputKeys;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerException;
-import javax.xml.transform.TransformerFactory;
 import javax.xml.transform.dom.DOMResult;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.stream.StreamResult;
-import javax.xml.xpath.XPath;
-import javax.xml.xpath.XPathConstants;
-import javax.xml.xpath.XPathFactory;
 import org.json.JSONObject;
 import org.json.XML;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
-import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
-import org.opendaylight.restconf.Draft18.MonitoringModule;
-import org.opendaylight.restconf.handlers.SchemaContextHandler;
-import org.opendaylight.restconf.handlers.TransactionChainHandler;
-import org.opendaylight.restconf.parser.IdentifierCodec;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
@@ -65,10 +28,6 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgum
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
-import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
-import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
-import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
 import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree;
 import org.opendaylight.yangtools.yang.model.api.Module;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -78,36 +37,24 @@ import org.slf4j.LoggerFactory;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
-import org.xml.sax.InputSource;
 
 /**
- * {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source.
+ * {@link ListenerAdapter} is responsible to track events, which occurred by
+ * changing data in data source.
  */
-public class ListenerAdapter implements DOMDataChangeListener {
+public class ListenerAdapter extends AbstractCommonSubscriber implements DOMDataChangeListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class);
-    private static final DocumentBuilderFactory DBF = DocumentBuilderFactory.newInstance();
-    private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
-    private static final Pattern RFC3339_PATTERN = Pattern.compile("(\\d\\d)(\\d\\d)$");
-
-    private static final SimpleDateFormat RFC3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ");
 
     private final YangInstanceIdentifier path;
-    private ListenerRegistration<DOMDataChangeListener> registration;
     private final String streamName;
-    private Set<Channel> subscribers = new ConcurrentSet<>();
-    private final EventBus eventBus;
-    private final EventBusChangeRecorder eventBusChangeRecorder;
     private final NotificationOutputType outputType;
-    private Date start = null;
-    private Date stop = null;
-    private String filter = null;
-    private TransactionChainHandler transactionChainHandler;
-    private SchemaContextHandler schemaHandler;
+
+    private AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change;
 
     /**
      * Creates new {@link ListenerAdapter} listener specified by path and stream
-     * name.
+     * name and register for subscribing
      *
      * @param path
      *            Path to data in data store.
@@ -118,76 +65,47 @@ public class ListenerAdapter implements DOMDataChangeListener {
      */
     ListenerAdapter(final YangInstanceIdentifier path, final String streamName,
             final NotificationOutputType outputType) {
-        this.outputType = outputType;
-        Preconditions.checkNotNull(path);
+        super();
+        register(this);
+        setLocalNameOfPath(path.getLastPathArgument().getNodeType().getLocalName());
+
+        this.outputType = Preconditions.checkNotNull(outputType);
+        this.path = Preconditions.checkNotNull(path);
         Preconditions.checkArgument((streamName != null) && !streamName.isEmpty());
-        this.path = path;
         this.streamName = streamName;
-        this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
-        this.eventBusChangeRecorder = new EventBusChangeRecorder();
-        this.eventBus.register(this.eventBusChangeRecorder);
     }
 
     @Override
     public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
-        final Date now = new Date();
-        if (this.stop != null) {
-            if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) {
-                checkFilter(change);
-            }
-            if (this.stop.compareTo(now) < 0) {
-                try {
-                    this.close();
-                } catch (final Exception e) {
-                    throw new RestconfDocumentedException("Problem with unregister listener." + e);
-                }
-            }
-        } else if (this.start != null) {
-            if (this.start.compareTo(now) < 0) {
-                this.start = null;
-                checkFilter(change);
-            }
-        } else {
-            checkFilter(change);
+        this.change = change;
+        final String xml = prepareXml();
+        if (checkQueryParams(xml, this)) {
+            prepareAndPostData(xml);
         }
     }
 
     /**
-     * Check if is filter used and then prepare and post data do client
+     * Gets the name of the stream.
      *
-     * @param change
-     *            - data of notification
+     * @return The name of the stream.
      */
-    private void checkFilter(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
-        final String xml = prepareXmlFrom(change);
-        if (this.filter == null) {
-            prepareAndPostData(xml);
-        } else {
-            try {
-                if (parseFilterParam(xml)) {
-                    prepareAndPostData(xml);
-                }
-            } catch (final Exception e) {
-                throw new RestconfDocumentedException("Problem while parsing filter.", e);
-            }
-        }
+    @Override
+    public String getStreamName() {
+        return this.streamName;
+    }
+
+    @Override
+    public String getOutputType() {
+        return this.outputType.getName();
     }
 
     /**
-     * Parse and evaluate filter value by xml
+     * Get path pointed to data in data store.
      *
-     * @param xml
-     *            - notification data in xml
-     * @return true or false - depends on filter expression and data of
-     *         notifiaction
-     * @throws Exception
+     * @return Path pointed to data in data store.
      */
-    private boolean parseFilterParam(final String xml) throws Exception {
-        final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
-        final DocumentBuilder builder = factory.newDocumentBuilder();
-        final Document docOfXml = builder.parse(new InputSource(new StringReader(xml)));
-        final XPath xPath = XPathFactory.newInstance().newXPath();
-        return (boolean) xPath.compile(this.filter).evaluate(docOfXml, XPathConstants.BOOLEAN);
+    public YangInstanceIdentifier getPath() {
+        return this.path;
     }
 
     /**
@@ -196,117 +114,19 @@ public class ListenerAdapter implements DOMDataChangeListener {
      * @param xml
      */
     private void prepareAndPostData(final String xml) {
-            final Event event = new Event(EventType.NOTIFY);
-            if (this.outputType.equals(NotificationOutputType.JSON)) {
-                final JSONObject jsonObject = XML.toJSONObject(xml);
-                event.setData(jsonObject.toString());
-            } else {
-                event.setData(xml);
-            }
-            this.eventBus.post(event);
-    }
-
-    /**
-     * Tracks events of data change by customer.
-     */
-    private final class EventBusChangeRecorder {
-        @Subscribe
-        public void recordCustomerChange(final Event event) {
-            if (event.getType() == EventType.REGISTER) {
-                final Channel subscriber = event.getSubscriber();
-                if (!ListenerAdapter.this.subscribers.contains(subscriber)) {
-                    ListenerAdapter.this.subscribers.add(subscriber);
-                }
-            } else if (event.getType() == EventType.DEREGISTER) {
-                ListenerAdapter.this.subscribers.remove(event.getSubscriber());
-                Notificator.removeListenerIfNoSubscriberExists(ListenerAdapter.this);
-            } else if (event.getType() == EventType.NOTIFY) {
-                for (final Channel subscriber : ListenerAdapter.this.subscribers) {
-                    if (subscriber.isActive()) {
-                        LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
-                        subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
-                    } else {
-                        LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
-                        ListenerAdapter.this.subscribers.remove(subscriber);
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Represents event of specific {@link EventType} type, holds data and {@link Channel} subscriber.
-     */
-    private final class Event {
-        private final EventType type;
-        private Channel subscriber;
-        private String data;
-
-        /**
-         * Creates new event specified by {@link EventType} type.
-         *
-         * @param type
-         *            EventType
-         */
-        public Event(final EventType type) {
-            this.type = type;
-        }
-
-        /**
-         * Gets the {@link Channel} subscriber.
-         *
-         * @return Channel
-         */
-        public Channel getSubscriber() {
-            return this.subscriber;
-        }
-
-        /**
-         * Sets subscriber for event.
-         *
-         * @param subscriber
-         *            Channel
-         */
-        public void setSubscriber(final Channel subscriber) {
-            this.subscriber = subscriber;
-        }
-
-        /**
-         * Gets event String.
-         *
-         * @return String representation of event data.
-         */
-        public String getData() {
-            return this.data;
-        }
-
-        /**
-         * Sets event data.
-         *
-         * @param data String.
-         */
-        public void setData(final String data) {
-            this.data = data;
-        }
-
-        /**
-         * Gets event type.
-         *
-         * @return The type of the event.
-         */
-        public EventType getType() {
-            return this.type;
+        final Event event = new Event(EventType.NOTIFY);
+        if (this.outputType.equals(NotificationOutputType.JSON)) {
+            final JSONObject jsonObject = XML.toJSONObject(xml);
+            event.setData(jsonObject.toString());
+        } else {
+            event.setData(xml);
         }
+        post(event);
     }
 
     /**
-     * Type of the event.
+     * Tracks events of data change by customer.
      */
-    private enum EventType {
-        REGISTER,
-        DEREGISTER,
-        NOTIFY;
-    }
 
     /**
      * Prepare data in printable form and transform it to String.
@@ -315,68 +135,19 @@ public class ListenerAdapter implements DOMDataChangeListener {
      *            DataChangeEvent
      * @return Data in printable form.
      */
-    private String prepareXmlFrom(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+    private String prepareXml() {
         final SchemaContext schemaContext = ControllerContext.getInstance().getGlobalSchema();
-        final DataSchemaContextTree dataContextTree =  DataSchemaContextTree.from(schemaContext);
+        final DataSchemaContextTree dataContextTree = DataSchemaContextTree.from(schemaContext);
         final Document doc = createDocument();
-        final Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
-                "notification");
-
-        doc.appendChild(notificationElement);
-
-        final Element eventTimeElement = doc.createElement("eventTime");
-        eventTimeElement.setTextContent(toRFC3339(new Date()));
-        notificationElement.appendChild(eventTimeElement);
+        final Element notificationElement = basePartDoc(doc);
 
         final Element dataChangedNotificationEventElement = doc.createElementNS(
                 "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "data-changed-notification");
 
-        addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, change,
+        addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, this.change,
                 schemaContext, dataContextTree);
         notificationElement.appendChild(dataChangedNotificationEventElement);
-
-        try {
-            final ByteArrayOutputStream out = new ByteArrayOutputStream();
-            final Transformer transformer = FACTORY.newTransformer();
-            transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
-            transformer.setOutputProperty(OutputKeys.METHOD, "xml");
-            transformer.setOutputProperty(OutputKeys.INDENT, "yes");
-            transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
-            transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
-            transformer.transform(new DOMSource(doc),
-                    new StreamResult(new OutputStreamWriter(out, StandardCharsets.UTF_8)));
-            final byte[] charData = out.toByteArray();
-            return new String(charData, "UTF-8");
-        } catch (TransformerException | UnsupportedEncodingException e) {
-            final String msg = "Error during transformation of Document into String";
-            LOG.error(msg, e);
-            return msg;
-        }
-    }
-
-    /**
-     * Formats data specified by RFC3339.
-     *
-     * @param d
-     *            Date
-     * @return Data specified by RFC3339.
-     */
-    public static String toRFC3339(final Date d) {
-        return RFC3339_PATTERN.matcher(RFC3339.format(d)).replaceAll("$1:$2");
-    }
-
-    /**
-     * Creates {@link Document} document.
-     * @return {@link Document} document.
-     */
-    public static Document createDocument() {
-        final DocumentBuilder bob;
-        try {
-            bob = DBF.newDocumentBuilder();
-        } catch (final ParserConfigurationException e) {
-            return null;
-        }
-        return bob.newDocument();
+        return transformDoc(doc);
     }
 
     /**
@@ -392,15 +163,13 @@ public class ListenerAdapter implements DOMDataChangeListener {
     private void addValuesToDataChangedNotificationEventElement(final Document doc,
             final Element dataChangedNotificationEventElement,
             final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change,
-            final SchemaContext  schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
+            final SchemaContext schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
 
         addCreatedChangedValuesFromDataToElement(doc, change.getCreatedData().entrySet(),
-                dataChangedNotificationEventElement,
-                Operation.CREATED, schemaContext, dataSchemaContextTree);
+                dataChangedNotificationEventElement, Operation.CREATED, schemaContext, dataSchemaContextTree);
 
         addCreatedChangedValuesFromDataToElement(doc, change.getUpdatedData().entrySet(),
-                    dataChangedNotificationEventElement,
-                    Operation.UPDATED, schemaContext, dataSchemaContextTree);
+                dataChangedNotificationEventElement, Operation.UPDATED, schemaContext, dataSchemaContextTree);
 
         addValuesFromDataToElement(doc, change.getRemovedPaths(), dataChangedNotificationEventElement,
                 Operation.DELETED);
@@ -431,9 +200,10 @@ public class ListenerAdapter implements DOMDataChangeListener {
         }
     }
 
-    private void addCreatedChangedValuesFromDataToElement(final Document doc, final Set<Entry<YangInstanceIdentifier,
-                NormalizedNode<?,?>>> data, final Element element, final Operation operation, final SchemaContext
-            schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
+    private void addCreatedChangedValuesFromDataToElement(final Document doc,
+            final Set<Entry<YangInstanceIdentifier, NormalizedNode<?, ?>>> data, final Element element,
+            final Operation operation, final SchemaContext schemaContext,
+            final DataSchemaContextTree dataSchemaContextTree) {
         if ((data == null) || data.isEmpty()) {
             return;
         }
@@ -471,9 +241,9 @@ public class ListenerAdapter implements DOMDataChangeListener {
         return dataChangeEventElement;
     }
 
-    private Node createCreatedChangedDataChangeEventElement(final Document doc, final Entry<YangInstanceIdentifier,
-            NormalizedNode<?, ?>> entry, final Operation operation, final SchemaContext
-            schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
+    private Node createCreatedChangedDataChangeEventElement(final Document doc,
+            final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry, final Operation operation,
+            final SchemaContext schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
         final Element dataChangeEventElement = doc.createElement("data-change-event");
         final Element pathElement = doc.createElement("path");
         final YangInstanceIdentifier path = entry.getKey();
@@ -485,8 +255,14 @@ public class ListenerAdapter implements DOMDataChangeListener {
         dataChangeEventElement.appendChild(operationElement);
 
         try {
-            final DOMResult domResult = writeNormalizedNode(entry.getValue(), path,
-                    schemaContext, dataSchemaContextTree);
+            SchemaPath nodePath;
+            final NormalizedNode<?, ?> normalized = entry.getValue();
+            if ((normalized instanceof MapEntryNode) || (normalized instanceof UnkeyedListEntryNode)) {
+                nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath();
+            } else {
+                nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath().getParent();
+            }
+            final DOMResult domResult = writeNormalizedNode(normalized, schemaContext, nodePath);
             final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
             final Element dataElement = doc.createElement("data");
             dataElement.appendChild(result);
@@ -500,47 +276,6 @@ public class ListenerAdapter implements DOMDataChangeListener {
         return dataChangeEventElement;
     }
 
-    private static DOMResult writeNormalizedNode(final NormalizedNode<?, ?> normalized,
-                                                 final YangInstanceIdentifier path, final SchemaContext context,
-                                                 final DataSchemaContextTree dataSchemaContextTree)
-            throws IOException, XMLStreamException {
-        final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
-        final Document doc = XmlDocumentUtils.getDocument();
-        final DOMResult result = new DOMResult(doc);
-        NormalizedNodeWriter normalizedNodeWriter = null;
-        NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
-        XMLStreamWriter writer = null;
-        final SchemaPath nodePath;
-
-        if ((normalized instanceof MapEntryNode) || (normalized instanceof UnkeyedListEntryNode)) {
-            nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath();
-        } else {
-            nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath().getParent();
-        }
-
-        try {
-            writer = XML_FACTORY.createXMLStreamWriter(result);
-            normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context, nodePath);
-            normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
-
-            normalizedNodeWriter.write(normalized);
-
-            normalizedNodeWriter.flush();
-        } finally {
-            if (normalizedNodeWriter != null) {
-                normalizedNodeWriter.close();
-            }
-            if (normalizedNodeStreamWriter != null) {
-                normalizedNodeStreamWriter.close();
-            }
-            if (writer != null) {
-                writer.close();
-            }
-        }
-
-        return result;
-    }
-
     /**
      * Adds path as value to element.
      *
@@ -549,6 +284,7 @@ public class ListenerAdapter implements DOMDataChangeListener {
      * @param element
      *            {@link Element}
      */
+    @SuppressWarnings("rawtypes")
     private void addPathAsValueToElement(final YangInstanceIdentifier path, final Element element) {
         final YangInstanceIdentifier normalizedPath = ControllerContext.getInstance().toXpathRepresentation(path);
         final StringBuilder textContent = new StringBuilder();
@@ -601,116 +337,11 @@ public class ListenerAdapter implements DOMDataChangeListener {
     }
 
     /**
-     * Gets path pointed to data in data store.
-     *
-     * @return Path pointed to data in data store.
-     */
-    public YangInstanceIdentifier getPath() {
-        return this.path;
-    }
-
-    /**
-     * Sets {@link ListenerRegistration} registration.
-     *
-     * @param registration DOMDataChangeListener registration
-     */
-    public void setRegistration(final ListenerRegistration<DOMDataChangeListener> registration) {
-        this.registration = registration;
-    }
-
-    /**
-     * Gets the name of the stream.
-     *
-     * @return The name of the stream.
-     */
-    public String getStreamName() {
-        return this.streamName;
-    }
-
-    /**
-     * Removes all subscribers and unregisters event bus change recorder form
-     * event bus and delete data in DS
-     */
-    public void close() throws Exception {
-        final DOMDataWriteTransaction wTx = this.transactionChainHandler.get().newWriteOnlyTransaction();
-        wTx.delete(LogicalDatastoreType.OPERATIONAL, IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY
-                        + this.path.getLastPathArgument().getNodeType().getLocalName(), this.schemaHandler.get()));
-        wTx.submit().checkedGet();
-
-        this.subscribers = new ConcurrentSet<>();
-        this.registration.close();
-        this.registration = null;
-        this.eventBus.unregister(this.eventBusChangeRecorder);
-    }
-
-    /**
-     * Checks if {@link ListenerRegistration} registration exist.
-     *
-     * @return True if exist, false otherwise.
-     */
-    public boolean isListening() {
-        return this.registration == null ? false : true;
-    }
-
-    /**
-     * Creates event of type {@link EventType#REGISTER}, set {@link Channel} subscriber to the event and post event into
-     * event bus.
-     *
-     * @param subscriber
-     *            Channel
-     */
-    public void addSubscriber(final Channel subscriber) {
-        if (!subscriber.isActive()) {
-            LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
-        }
-        final Event event = new Event(EventType.REGISTER);
-        event.setSubscriber(subscriber);
-        this.eventBus.post(event);
-    }
-
-    /**
-     * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel} subscriber to the event and posts event
-     * into event bus.
-     *
-     * @param subscriber
-     */
-    public void removeSubscriber(final Channel subscriber) {
-        LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
-        final Event event = new Event(EventType.DEREGISTER);
-        event.setSubscriber(subscriber);
-        this.eventBus.post(event);
-    }
-
-    /**
-     * Checks if exists at least one {@link Channel} subscriber.
-     *
-     * @return True if exist at least one {@link Channel} subscriber, false otherwise.
-     */
-    public boolean hasSubscribers() {
-        return !this.subscribers.isEmpty();
-    }
-
-    /**
-     * Consists of two types {@link Store#CONFIG} and {@link Store#OPERATION}.
-     */
-    private static enum Store {
-        CONFIG("config"),
-        OPERATION("operation");
-
-        private final String value;
-
-        private Store(final String value) {
-            this.value = value;
-        }
-    }
-
-    /**
-     * Consists of three types {@link Operation#CREATED}, {@link Operation#UPDATED} and {@link Operation#DELETED}.
+     * Consists of three types {@link Operation#CREATED},
+     * {@link Operation#UPDATED} and {@link Operation#DELETED}.
      */
     private static enum Operation {
-        CREATED("created"),
-        UPDATED("updated"),
-        DELETED("deleted");
+        CREATED("created"), UPDATED("updated"), DELETED("deleted");
 
         private final String value;
 
@@ -718,46 +349,4 @@ public class ListenerAdapter implements DOMDataChangeListener {
             this.value = value;
         }
     }
-
-    /**
-     * Set query parameters for listener
-     *
-     * @param start
-     *            - start-time of getting notification
-     * @param stop
-     *            - stop-time of getting notification
-     * @param filter
-     *            - indicate which subset of all possible events are of interest
-     */
-    public void setQueryParams(final Date start, final Date stop, final String filter) {
-        this.start = start;
-        this.stop = stop;
-        this.filter = filter;
-    }
-
-    /**
-     * Get output type
-     *
-     * @return outputType
-     */
-    public String getOutputType() {
-        return this.outputType.getName();
-    }
-
-    /**
-     * Transaction chain to delete data in DS on close()
-     *
-     * @param transactionChainHandler
-     *            - creating new write transaction to delete data on close
-     * @param schemaHandler
-     *            - for getting schema to deserialize
-     *            {@link MonitoringModule#PATH_TO_STREAM_WITHOUT_KEY} to
-     *            {@link YangInstanceIdentifier}
-     */
-    public void setCloseVars(final TransactionChainHandler transactionChainHandler,
-            final SchemaContextHandler schemaHandler) {
-        this.transactionChainHandler = transactionChainHandler;
-        this.schemaHandler = schemaHandler;
-    }
-
 }
diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/ListenersConstants.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/ListenersConstants.java
new file mode 100644 (file)
index 0000000..a92c3a2
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.listeners;
+
+import java.text.SimpleDateFormat;
+import java.util.regex.Pattern;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.transform.TransformerFactory;
+
+/**
+ * Constants of listeners
+ */
+class ListenersConstants {
+
+    static final DocumentBuilderFactory DBF = DocumentBuilderFactory.newInstance();
+    static final TransformerFactory FACTORY = TransformerFactory.newInstance();
+    static final Pattern RFC3339_PATTERN = Pattern.compile("(\\d\\d)(\\d\\d)$");
+
+    static final SimpleDateFormat RFC3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ");
+}
index 2d3ad3a10b3cc4306e0e15ad3abe0a41217dca56..843edafac688f3086c3fb3a6133fa45c7a224a63 100644 (file)
@@ -7,54 +7,19 @@
  */
 package org.opendaylight.netconf.sal.streams.listeners;
 
-import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
-import com.google.common.eventbus.AsyncEventBus;
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
-import io.netty.channel.Channel;
-import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
-import io.netty.util.internal.ConcurrentSet;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.StringReader;
 import java.io.StringWriter;
-import java.io.UnsupportedEncodingException;
 import java.io.Writer;
 import java.util.Collection;
 import java.util.Date;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.stream.XMLOutputFactory;
 import javax.xml.stream.XMLStreamException;
-import javax.xml.stream.XMLStreamWriter;
-import javax.xml.transform.OutputKeys;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerException;
-import javax.xml.transform.TransformerFactory;
 import javax.xml.transform.dom.DOMResult;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.stream.StreamResult;
-import javax.xml.xpath.XPath;
-import javax.xml.xpath.XPathConstants;
-import javax.xml.xpath.XPathFactory;
 import org.json.JSONObject;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
 import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
 import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
-import org.opendaylight.restconf.Draft18.MonitoringModule;
-import org.opendaylight.restconf.handlers.SchemaContextHandler;
-import org.opendaylight.restconf.handlers.TransactionChainHandler;
-import org.opendaylight.restconf.parser.IdentifierCodec;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
@@ -64,8 +29,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWrit
 import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactory;
 import org.opendaylight.yangtools.yang.data.codec.gson.JSONNormalizedNodeStreamWriter;
 import org.opendaylight.yangtools.yang.data.codec.gson.JsonWriterFactory;
-import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
-import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 import org.slf4j.Logger;
@@ -73,34 +36,22 @@ import org.slf4j.LoggerFactory;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
-import org.xml.sax.InputSource;
 
 /**
  * {@link NotificationListenerAdapter} is responsible to track events on
  * notifications.
  *
  */
-public class NotificationListenerAdapter implements DOMNotificationListener {
+public class NotificationListenerAdapter extends AbstractCommonSubscriber implements DOMNotificationListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(NotificationListenerAdapter.class);
-    private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
 
     private final String streamName;
-    private final EventBus eventBus;
-    private final EventBusChangeRecorder eventBusChangeRecorder;
-
     private final SchemaPath path;
     private final String outputType;
-    private Date start = null;
-    private Date stop = null;
-    private String filter;
 
     private SchemaContext schemaContext;
     private DOMNotification notification;
-    private ListenerRegistration<DOMNotificationListener> registration;
-    private Set<Channel> subscribers = new ConcurrentSet<>();
-    private TransactionChainHandler transactionChainHandler;
-    private SchemaContextHandler schemaHandler;
 
     /**
      * Set path of listener and stream name, register event bus.
@@ -113,76 +64,54 @@ public class NotificationListenerAdapter implements DOMNotificationListener {
      *            - type of output on notification (JSON, XML)
      */
     NotificationListenerAdapter(final SchemaPath path, final String streamName, final String outputType) {
-        this.outputType = outputType;
+        super();
+        register(this);
+        setLocalNameOfPath(path.getLastComponent().getLocalName());
+
+        this.outputType = Preconditions.checkNotNull(outputType);
+        this.path = Preconditions.checkNotNull(path);
         Preconditions.checkArgument((streamName != null) && !streamName.isEmpty());
-        Preconditions.checkArgument(path != null);
-        this.path = path;
         this.streamName = streamName;
-        this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
-        this.eventBusChangeRecorder = new EventBusChangeRecorder();
-        this.eventBus.register(this.eventBusChangeRecorder);
+    }
+
+    /**
+     * Get outputType of listenere
+     *
+     * @return the outputType
+     */
+    @Override
+    public String getOutputType() {
+        return this.outputType;
     }
 
     @Override
     public void onNotification(final DOMNotification notification) {
         this.schemaContext = ControllerContext.getInstance().getGlobalSchema();
         this.notification = notification;
-        final Date now = new Date();
-        if (this.stop != null) {
-            if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) {
-                checkFilter();
-            }
-            if (this.stop.compareTo(now) < 0) {
-                try {
-                    this.close();
-                } catch (final Exception e) {
-                    throw new RestconfDocumentedException("Problem with unregister listener." + e);
-                }
-            }
-        } else if (this.start != null) {
-            if (this.start.compareTo(now) < 0) {
-                this.start = null;
-                checkFilter();
-            }
-        } else {
-            checkFilter();
+
+        final String xml = prepareXml();
+        if (checkQueryParams(xml, this)) {
+            prepareAndPostData(xml);
         }
     }
 
     /**
-     * Check if is filter used and then prepare and post data do client
+     * Get stream name of this listener
      *
+     * @return {@link String}
      */
-    private void checkFilter() {
-        final String xml = prepareXml();
-        if (this.filter == null) {
-            prepareAndPostData(xml);
-        } else {
-            try {
-                if (parseFilterParam(xml)) {
-                    prepareAndPostData(xml);
-                }
-            } catch (final Exception e) {
-                throw new RestconfDocumentedException("Problem while parsing filter.", e);
-            }
-        }
+    @Override
+    public String getStreamName() {
+        return this.streamName;
     }
 
     /**
-     * Parse and evaluate filter value by xml
+     * Get schema path of notification
      *
-     * @param xml
-     *            - notification data in xml
-     * @return true or false - depends on filter expression and data of
-     *         notifiaction
-     * @throws Exception
+     * @return {@link SchemaPath}
      */
-    private boolean parseFilterParam(final String xml) throws Exception {
-        final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
-        final DocumentBuilder builder = factory.newDocumentBuilder();
-        final Document docOfXml = builder.parse(new InputSource(new StringReader(xml)));
-        final XPath xPath = XPathFactory.newInstance().newXPath();
-        return (boolean) xPath.compile(this.filter).evaluate(docOfXml, XPathConstants.BOOLEAN);
+    public SchemaPath getSchemaPath() {
+        return this.path;
     }
 
     /**
@@ -197,7 +126,7 @@ public class NotificationListenerAdapter implements DOMNotificationListener {
         } else {
             event.setData(xml);
         }
-        this.eventBus.post(event);
+        post(event);
     }
 
     /**
@@ -227,137 +156,16 @@ public class NotificationListenerAdapter implements DOMNotificationListener {
         return writer.toString();
     }
 
-    /**
-     * Checks if exists at least one {@link Channel} subscriber.
-     *
-     * @return True if exist at least one {@link Channel} subscriber, false
-     *         otherwise.
-     */
-    public boolean hasSubscribers() {
-        return !this.subscribers.isEmpty();
-    }
-
-    /**
-     * Reset lists, close registration and unregister bus event and delete data in DS.
-     */
-    public void close() {
-        final DOMDataWriteTransaction wTx = this.transactionChainHandler.get().newWriteOnlyTransaction();
-        wTx.delete(LogicalDatastoreType.OPERATIONAL,
-                IdentifierCodec.deserialize(
-                        MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + this.path.getLastComponent().getLocalName(),
-                        this.schemaHandler.get()));
-        try {
-            wTx.submit().checkedGet();
-        } catch (final TransactionCommitFailedException e) {
-            throw new RestconfDocumentedException("Problem while deleting data from DS.", e);
-        }
-
-        this.subscribers = new ConcurrentSet<>();
-        this.registration.close();
-        this.registration = null;
-        this.eventBus.unregister(this.eventBusChangeRecorder);
-    }
-
-    /**
-     * Get stream name of this listener
-     *
-     * @return {@link String}
-     */
-    public String getStreamName() {
-        return this.streamName;
-    }
-
-    /**
-     * Check if is this listener registered.
-     *
-     * @return - true if is registered, otherwise null
-     */
-    public boolean isListening() {
-        return this.registration == null ? false : true;
-    }
-
-    /**
-     * Get schema path of notification
-     *
-     * @return {@link SchemaPath}
-     */
-    public SchemaPath getSchemaPath() {
-        return this.path;
-    }
-
-    /**
-     * Set registration for close after closing connection and check if this
-     * listener is registered
-     *
-     * @param registration
-     *            - registered listener
-     */
-    public void setRegistration(final ListenerRegistration<DOMNotificationListener> registration) {
-        Preconditions.checkNotNull(registration);
-        this.registration = registration;
-    }
-
-    /**
-     * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
-     * subscriber to the event and post event into event bus.
-     *
-     * @param subscriber
-     *            Channel
-     */
-    public void addSubscriber(final Channel subscriber) {
-        if (!subscriber.isActive()) {
-            LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
-        }
-        final Event event = new Event(EventType.REGISTER);
-        event.setSubscriber(subscriber);
-        this.eventBus.post(event);
-    }
-
-    /**
-     * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
-     * subscriber to the event and posts event into event bus.
-     *
-     * @param subscriber
-     */
-    public void removeSubscriber(final Channel subscriber) {
-        LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
-        final Event event = new Event(EventType.DEREGISTER);
-        event.setSubscriber(subscriber);
-        this.eventBus.post(event);
-    }
-
     private String prepareXml() {
-        final Document doc = ListenerAdapter.createDocument();
-        final Element notificationElement =
-                doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
-                "notification");
-        doc.appendChild(notificationElement);
-
-        final Element eventTimeElement = doc.createElement("eventTime");
-        eventTimeElement.setTextContent(ListenerAdapter.toRFC3339(new Date()));
-        notificationElement.appendChild(eventTimeElement);
+        final Document doc = createDocument();
+        final Element notificationElement = basePartDoc(doc);
 
         final Element notificationEventElement = doc.createElementNS(
                 "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "create-notification-stream");
         addValuesToNotificationEventElement(doc, notificationEventElement, this.notification, this.schemaContext);
         notificationElement.appendChild(notificationEventElement);
 
-        try {
-            final ByteArrayOutputStream out = new ByteArrayOutputStream();
-            final Transformer transformer = FACTORY.newTransformer();
-            transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
-            transformer.setOutputProperty(OutputKeys.METHOD, "xml");
-            transformer.setOutputProperty(OutputKeys.INDENT, "yes");
-            transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
-            transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
-            transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, Charsets.UTF_8)));
-            final byte[] charData = out.toByteArray();
-            return new String(charData, "UTF-8");
-        } catch (TransformerException | UnsupportedEncodingException e) {
-            final String msg = "Error during transformation of Document into String";
-            LOG.error(msg, e);
-            return msg;
-        }
+        return transformDoc(doc);
     }
 
     private void addValuesToNotificationEventElement(final Document doc, final Element element,
@@ -366,11 +174,11 @@ public class NotificationListenerAdapter implements DOMNotificationListener {
             return;
         }
 
-        final NormalizedNode<NodeIdentifier, Collection<DataContainerChild<? extends PathArgument, ?>>> body = notification
-                .getBody();
+        final NormalizedNode<NodeIdentifier, Collection<DataContainerChild<? extends PathArgument, ?>>> body =
+                notification.getBody();
         try {
-            final DOMResult domResult = writeNormalizedNode(body,
-                    YangInstanceIdentifier.create(body.getIdentifier()), schemaContext);
+
+            final DOMResult domResult = writeNormalizedNode(body, schemaContext, this.path);
             final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
             final Element dataElement = doc.createElement("notification");
             dataElement.appendChild(result);
@@ -381,181 +189,4 @@ public class NotificationListenerAdapter implements DOMNotificationListener {
             LOG.error("Error processing stream", e);
         }
     }
-
-    private DOMResult writeNormalizedNode(final NormalizedNode<?, ?> normalized, final YangInstanceIdentifier path,
-            final SchemaContext context) throws IOException, XMLStreamException {
-        final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
-        final Document doc = XmlDocumentUtils.getDocument();
-        final DOMResult result = new DOMResult(doc);
-        NormalizedNodeWriter normalizedNodeWriter = null;
-        NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
-        XMLStreamWriter writer = null;
-
-        try {
-            writer = XML_FACTORY.createXMLStreamWriter(result);
-            normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context,
-                    this.getSchemaPath());
-            normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
-
-            normalizedNodeWriter.write(normalized);
-
-            normalizedNodeWriter.flush();
-        } finally {
-            if (normalizedNodeWriter != null) {
-                normalizedNodeWriter.close();
-            }
-            if (normalizedNodeStreamWriter != null) {
-                normalizedNodeStreamWriter.close();
-            }
-            if (writer != null) {
-                writer.close();
-            }
-        }
-
-        return result;
-    }
-
-    /**
-     * Tracks events of data change by customer.
-     */
-    private final class EventBusChangeRecorder {
-        @Subscribe
-        public void recordCustomerChange(final Event event) {
-            if (event.getType() == EventType.REGISTER) {
-                final Channel subscriber = event.getSubscriber();
-                if (!NotificationListenerAdapter.this.subscribers.contains(subscriber)) {
-                    NotificationListenerAdapter.this.subscribers.add(subscriber);
-                }
-            } else if (event.getType() == EventType.DEREGISTER) {
-                NotificationListenerAdapter.this.subscribers.remove(event.getSubscriber());
-                Notificator.removeNotificationListenerIfNoSubscriberExists(NotificationListenerAdapter.this);
-            } else if (event.getType() == EventType.NOTIFY) {
-                for (final Channel subscriber : NotificationListenerAdapter.this.subscribers) {
-                    if (subscriber.isActive()) {
-                        LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
-                        subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
-                    } else {
-                        LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
-                        NotificationListenerAdapter.this.subscribers.remove(subscriber);
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Represents event of specific {@link EventType} type, holds data and
-     * {@link Channel} subscriber.
-     */
-    private final class Event {
-        private final EventType type;
-        private Channel subscriber;
-        private String data;
-
-        /**
-         * Creates new event specified by {@link EventType} type.
-         *
-         * @param type
-         *            EventType
-         */
-        public Event(final EventType type) {
-            this.type = type;
-        }
-
-        /**
-         * Gets the {@link Channel} subscriber.
-         *
-         * @return Channel
-         */
-        public Channel getSubscriber() {
-            return this.subscriber;
-        }
-
-        /**
-         * Sets subscriber for event.
-         *
-         * @param subscriber
-         *            Channel
-         */
-        public void setSubscriber(final Channel subscriber) {
-            this.subscriber = subscriber;
-        }
-
-        /**
-         * Gets event String.
-         *
-         * @return String representation of event data.
-         */
-        public String getData() {
-            return this.data;
-        }
-
-        /**
-         * Sets event data.
-         *
-         * @param data
-         *            String.
-         */
-        public void setData(final String data) {
-            this.data = data;
-        }
-
-        /**
-         * Gets event type.
-         *
-         * @return The type of the event.
-         */
-        public EventType getType() {
-            return this.type;
-        }
-    }
-
-    /**
-     * Type of the event.
-     */
-    private enum EventType {
-        REGISTER, DEREGISTER, NOTIFY
-    }
-
-    /**
-     * Set query parameters for listener
-     *
-     * @param start
-     *            - start-time of getting notification
-     * @param stop
-     *            - stop-time of getting notification
-     * @param filter
-     *            - indicate which subset of all possible events are of interest
-     */
-    public void setQueryParams(final Date start, final Date stop, final String filter) {
-        this.start = start;
-        this.stop = stop;
-        this.filter = filter;
-    }
-
-    /**
-     * Get outputType of listenere
-     *
-     * @return the outputType
-     */
-    public String getOutputType() {
-        return this.outputType;
-    }
-
-    /**
-     * Transaction chain to delete data in DS on close()
-     *
-     * @param transactionChainHandler
-     *            - creating new write transaction to delete data on close
-     * @param schemaHandler
-     *            - for getting schema to deserialize
-     *            {@link MonitoringModule#PATH_TO_STREAM_WITHOUT_KEY} to
-     *            {@link YangInstanceIdentifier}
-     */
-    public void setCloseVars(final TransactionChainHandler transactionChainHandler,
-            final SchemaContextHandler schemaHandler) {
-        this.transactionChainHandler = transactionChainHandler;
-        this.schemaHandler = schemaHandler;
-
-    }
 }
index 7c809ad77fe74d49604c1d8c6b9e41c814aa8d42..4db067cd15df0e8151c95c978023d613f611cdc0 100644 (file)
@@ -131,25 +131,15 @@ public class Notificator {
         }
     }
 
-    /**
-     * Checks if listener has at least one subscriber. In case it doesn't have any, delete listener.
-     *
-     * @param listener
-     *            ListenerAdapter
-     */
-    public static void removeListenerIfNoSubscriberExists(final ListenerAdapter listener) {
-        if (!listener.hasSubscribers()) {
-            deleteListener(listener);
-        }
-    }
-
     /**
      * Delete {@link ListenerAdapter} listener specified in parameter.
      *
+     * @param <T>
+     *
      * @param listener
      *            ListenerAdapter
      */
-    private static void deleteListener(final ListenerAdapter listener) {
+    private static <T extends BaseListenerInterface> void deleteListener(final T listener) {
         if (listener != null) {
             try {
                 listener.close();
@@ -203,13 +193,17 @@ public class Notificator {
         return listListeners;
     }
 
-    public static void removeNotificationListenerIfNoSubscriberExists(final NotificationListenerAdapter listener) {
+    public static <T extends BaseListenerInterface> void removeListenerIfNoSubscriberExists(final T listener) {
         if (!listener.hasSubscribers()) {
-            deleteNotificationListener(listener);
+            if (listener instanceof NotificationListenerAdapter) {
+                deleteNotificationListener(listener);
+            } else {
+                deleteListener(listener);
+            }
         }
     }
 
-    private static void deleteNotificationListener(final NotificationListenerAdapter listener) {
+    private static <T extends BaseListenerInterface> void deleteNotificationListener(final T listener) {
         if (listener != null) {
             try {
                 listener.close();
index 612b03bdec1b96b477ca68f00a936fc78797a305..74df8b5422c3988ca3d0673f5ccd040973cc3b2c 100644 (file)
@@ -80,7 +80,6 @@ public class RestconfOperationsServiceImpl implements RestconfOperationsService
                     ref.get(), Optional.of(this.domMountPointServiceHandler.get()));
             mountPoint = mountPointIdentifier.getMountPoint();
             modules = ref.getModules(mountPoint);
-
         } else {
             final String errMsg =
                     "URI has bad format. If operations behind mount point should be showed, URI has to end with ";
index 3c81d2322cf86cc90ea333426678de67017cb23e..fb2c76ee49ef435d2f24e20704f8df2f99cdadee 100644 (file)
@@ -14,6 +14,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import javax.ws.rs.core.UriInfo;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
 import org.opendaylight.netconf.sal.restconf.impl.InstanceIdentifierContext;
 import org.opendaylight.netconf.sal.restconf.impl.NormalizedNodeContext;
 import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
@@ -29,6 +32,7 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdent
 import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.NormalizedNodeAttrBuilder;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,77 +44,45 @@ public class RestconfStreamsSubscriptionServiceImpl implements RestconfStreamsSu
 
     private static final Logger LOG = LoggerFactory.getLogger(RestconfStreamsSubscriptionServiceImpl.class);
 
-    private final DOMDataBrokerHandler domDataBrokerHandler;
-
-    private final NotificationServiceHandler notificationServiceHandler;
-
-    private final SchemaContextHandler schemaHandler;
-
-    private final TransactionChainHandler transactionChainHandler;
+    private final HandlersHolder handlersHolder;
 
+    /**
+     * Initialize holder of handlers with holders as parameters.
+     *
+     * @param domDataBrokerHandler
+     *            - handler of {@link DOMDataBroker}
+     * @param notificationServiceHandler
+     *            - handler of {@link DOMNotificationService}
+     * @param schemaHandler
+     *            - handler of {@link SchemaContext}
+     * @param transactionChainHandler
+     *            - handler of {@link DOMTransactionChain}
+     */
     public RestconfStreamsSubscriptionServiceImpl(final DOMDataBrokerHandler domDataBrokerHandler,
             final NotificationServiceHandler notificationServiceHandler, final SchemaContextHandler schemaHandler,
             final TransactionChainHandler transactionChainHandler) {
-        this.domDataBrokerHandler = domDataBrokerHandler;
-        this.notificationServiceHandler = notificationServiceHandler;
-        this.schemaHandler = schemaHandler;
-        this.transactionChainHandler = transactionChainHandler;
+        this.handlersHolder = new HandlersHolder(domDataBrokerHandler, notificationServiceHandler,
+                transactionChainHandler, schemaHandler);
     }
 
     @Override
     public NormalizedNodeContext subscribeToStream(final String identifier, final UriInfo uriInfo) {
-        boolean startTime_used = false;
-        boolean stopTime_used = false;
-        boolean filter_used = false;
-        Date start = null;
-        Date stop = null;
-        String filter = null;
-
-        for (final Entry<String, List<String>> entry : uriInfo.getQueryParameters().entrySet()) {
-            switch (entry.getKey()) {
-                case "start-time":
-                    if (!startTime_used) {
-                        startTime_used = true;
-                        start = SubscribeToStreamUtil.parseDateFromQueryParam(entry);
-                    } else {
-                        throw new RestconfDocumentedException("Start-time parameter can be used only once.");
-                    }
-                    break;
-                case "stop-time":
-                    if (!stopTime_used) {
-                        stopTime_used = true;
-                        stop = SubscribeToStreamUtil.parseDateFromQueryParam(entry);
-                    } else {
-                        throw new RestconfDocumentedException("Stop-time parameter can be used only once.");
-                    }
-                    break;
-                case "filter":
-                    if (!filter_used) {
-                        filter_used = true;
-                        filter = entry.getValue().iterator().next();
-                    }
-                    break;
-                default:
-                    throw new RestconfDocumentedException("Bad parameter used with notifications: " + entry.getKey());
-            }
-        }
-        if (!startTime_used && stopTime_used) {
-            throw new RestconfDocumentedException("Stop-time parameter has to be used with start-time parameter.");
-        }
+        final NotificationQueryParams notificationQueryParams = new NotificationQueryParams();
+        notificationQueryParams.prepareParams(uriInfo);
+
         URI response = null;
         if (identifier.contains(RestconfStreamsConstants.DATA_SUBSCR)) {
-            response =
-                    SubscribeToStreamUtil.notifiDataStream(identifier, uriInfo, start, stop, this.domDataBrokerHandler, filter,
-                            this.transactionChainHandler, this.schemaHandler);
+            response = SubscribeToStreamUtil.notifiDataStream(identifier, uriInfo, notificationQueryParams,
+                    this.handlersHolder);
         } else if (identifier.contains(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
-            response = SubscribeToStreamUtil.notifYangStream(identifier, uriInfo, start, stop,
-                    this.notificationServiceHandler, filter, this.transactionChainHandler, this.schemaHandler);
+            response = SubscribeToStreamUtil.notifYangStream(identifier, uriInfo, notificationQueryParams,
+                    this.handlersHolder);
         }
 
         if (response != null) {
             // prepare node with value of location
             final InstanceIdentifierContext<?> iid =
-                    SubscribeToStreamUtil.prepareIIDSubsStreamOutput(this.schemaHandler);
+                    SubscribeToStreamUtil.prepareIIDSubsStreamOutput(this.handlersHolder.getSchemaHandler());
             final NormalizedNodeAttrBuilder<NodeIdentifier, Object, LeafNode<Object>> builder =
                     ImmutableLeafNodeBuilder.create().withValue(response.toString());
             builder.withNodeIdentifier(
@@ -127,4 +99,146 @@ public class RestconfStreamsSubscriptionServiceImpl implements RestconfStreamsSu
         LOG.warn(msg);
         throw new RestconfDocumentedException(msg);
     }
+
+    /**
+     * Holder of all handlers for notifications
+     */
+    public final class HandlersHolder {
+
+        private final DOMDataBrokerHandler domDataBrokerHandler;
+        private final NotificationServiceHandler notificationServiceHandler;
+        private final TransactionChainHandler transactionChainHandler;
+        private final SchemaContextHandler schemaHandler;
+
+        private HandlersHolder(final DOMDataBrokerHandler domDataBrokerHandler,
+                final NotificationServiceHandler notificationServiceHandler,
+                final TransactionChainHandler transactionChainHandler, final SchemaContextHandler schemaHandler) {
+            this.domDataBrokerHandler = domDataBrokerHandler;
+            this.notificationServiceHandler = notificationServiceHandler;
+            this.transactionChainHandler = transactionChainHandler;
+            this.schemaHandler = schemaHandler;
+        }
+
+        /**
+         * Get {@link DOMDataBrokerHandler}
+         *
+         * @return the domDataBrokerHandler
+         */
+        public DOMDataBrokerHandler getDomDataBrokerHandler() {
+            return this.domDataBrokerHandler;
+        }
+
+        /**
+         * Get {@link NotificationServiceHandler}
+         *
+         * @return the notificationServiceHandler
+         */
+        public NotificationServiceHandler getNotificationServiceHandler() {
+            return this.notificationServiceHandler;
+        }
+
+        /**
+         * Get {@link TransactionChainHandler}
+         *
+         * @return the transactionChainHandler
+         */
+        public TransactionChainHandler getTransactionChainHandler() {
+            return this.transactionChainHandler;
+        }
+
+        /**
+         * Get {@link SchemaContextHandler}
+         *
+         * @return the schemaHandler
+         */
+        public SchemaContextHandler getSchemaHandler() {
+            return this.schemaHandler;
+        }
+    }
+
+    /**
+     * Parser and holder of query paramteres from uriInfo for notifications
+     *
+     */
+    public final class NotificationQueryParams {
+
+        private Date start = null;
+        private Date stop = null;
+        private String filter = null;
+
+        private NotificationQueryParams() {
+
+        }
+
+        private void prepareParams(final UriInfo uriInfo) {
+            boolean startTime_used = false;
+            boolean stopTime_used = false;
+            boolean filter_used = false;
+
+            for (final Entry<String, List<String>> entry : uriInfo.getQueryParameters().entrySet()) {
+                switch (entry.getKey()) {
+                    case "start-time":
+                        if (!startTime_used) {
+                            startTime_used = true;
+                            this.start = SubscribeToStreamUtil.parseDateFromQueryParam(entry);
+                        } else {
+                            throw new RestconfDocumentedException("Start-time parameter can be used only once.");
+                        }
+                        break;
+                    case "stop-time":
+                        if (!stopTime_used) {
+                            stopTime_used = true;
+                            this.stop = SubscribeToStreamUtil.parseDateFromQueryParam(entry);
+                        } else {
+                            throw new RestconfDocumentedException("Stop-time parameter can be used only once.");
+                        }
+                        break;
+                    case "filter":
+                        if (!filter_used) {
+                            filter_used = true;
+                            this.filter = entry.getValue().iterator().next();
+                        }
+                        break;
+                    default:
+                        throw new RestconfDocumentedException(
+                                "Bad parameter used with notifications: " + entry.getKey());
+                }
+            }
+            if (!startTime_used && stopTime_used) {
+                throw new RestconfDocumentedException("Stop-time parameter has to be used with start-time parameter.");
+            }
+
+            if (this.start == null) {
+                this.start = new Date();
+            }
+        }
+
+        /**
+         * Get start-time query parameter
+         *
+         * @return start-time
+         */
+        public Date getStart() {
+            return this.start;
+        }
+
+        /**
+         * Get stop-time query parameter
+         *
+         * @return stop-time
+         */
+        public Date getStop() {
+            return this.stop;
+        }
+
+        /**
+         * Get filter query parameter
+         *
+         * @return filter
+         */
+        public String getFilter() {
+            return this.filter;
+        }
+    }
+
 }
index d199604aff0180d55d3319aceb141591acbab85b..a6e76b56b472d6b4bb2fdd7acb2b0d29295acfad 100644 (file)
@@ -102,20 +102,23 @@ public final class CreateStreamUtil {
         final ContainerNode data = (ContainerNode) payload.getData();
         final QName qname = payload.getInstanceIdentifierContext().getSchemaNode().getQName();
         final YangInstanceIdentifier path = preparePath(data, qname);
-        final String streamName = prepareDataChangeNotifiStreamName(path, refSchemaCtx.get(), data);
+        String streamName = prepareDataChangeNotifiStreamName(path, refSchemaCtx.get(), data);
 
         final QName outputQname = QName.create(qname, "output");
         final QName streamNameQname = QName.create(qname, "stream-name");
 
-        final ContainerNode output = ImmutableContainerNodeBuilder.create()
-                .withNodeIdentifier(new NodeIdentifier(outputQname))
-                .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build();
         final NotificationOutputType outputType = prepareOutputType(data);
+        if(outputType.equals(NotificationOutputType.JSON)){
+            streamName = streamName + "/JSON";
+        }
 
         if (!Notificator.existListenerFor(streamName)) {
             Notificator.createListener(path, streamName, outputType);
         }
 
+        final ContainerNode output =
+                ImmutableContainerNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(outputQname))
+                        .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build();
         return new DefaultDOMRpcResult(output);
     }
 
@@ -229,7 +232,9 @@ public final class CreateStreamUtil {
                 streamName = streamName + ",";
             }
         }
-
+        if (outputType.equals("JSON")) {
+            streamName = streamName + "/JSON";
+        }
         final QName rpcQName = payload.getInstanceIdentifierContext().getSchemaNode().getQName();
         final QName outputQname = QName.create(rpcQName, "output");
         final QName streamNameQname = QName.create(rpcQName, "notification-stream-identifier");
index f387941b67b2a15612c7c78ba5a2dd207a2db5bf..d8df4d20c8d5ec62f2e31db9a7f038caf63645b6 100644 (file)
@@ -38,11 +38,11 @@ import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapte
 import org.opendaylight.netconf.sal.streams.listeners.Notificator;
 import org.opendaylight.netconf.sal.streams.websockets.WebSocketServer;
 import org.opendaylight.restconf.Draft18.MonitoringModule;
-import org.opendaylight.restconf.handlers.DOMDataBrokerHandler;
 import org.opendaylight.restconf.handlers.NotificationServiceHandler;
 import org.opendaylight.restconf.handlers.SchemaContextHandler;
-import org.opendaylight.restconf.handlers.TransactionChainHandler;
 import org.opendaylight.restconf.parser.IdentifierCodec;
+import org.opendaylight.restconf.restful.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
+import org.opendaylight.restconf.restful.services.impl.RestconfStreamsSubscriptionServiceImpl.NotificationQueryParams;
 import org.opendaylight.restconf.utils.RestconfConstants;
 import org.opendaylight.restconf.utils.mapping.RestconfMappingNodeUtil;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
@@ -55,6 +55,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 import org.slf4j.Logger;
@@ -72,83 +73,6 @@ public final class SubscribeToStreamUtil {
         throw new UnsupportedOperationException("Util class");
     }
 
-    /**
-     * Parse enum from URI
-     *
-     * @param clazz
-     *            - enum type
-     * @param value
-     *            - string of enum value
-     * @return enum
-     */
-    private static <T> T parseURIEnum(final Class<T> clazz, final String value) {
-        if ((value == null) || value.equals("")) {
-            return null;
-        }
-        return ResolveEnumUtil.resolveEnum(clazz, value);
-    }
-
-    /**
-     * Prepare map of values from URI
-     *
-     * @param identifier
-     *            - URI
-     * @return {@link Map}
-     */
-    public static Map<String, String> mapValuesFromUri(final String identifier) {
-        final HashMap<String, String> result = new HashMap<>();
-        final String[] tokens = identifier.split(String.valueOf(RestconfConstants.SLASH));
-        for (final String token : tokens) {
-            final String[] paramToken = token.split(String.valueOf(RestconfStreamsConstants.EQUAL));
-            if (paramToken.length == 2) {
-                result.put(paramToken[0], paramToken[1]);
-            }
-        }
-        return result;
-    }
-
-    /**
-     * Register data change listener in dom data broker and set it to listener
-     * on stream
-     *
-     * @param ds
-     *            - {@link LogicalDatastoreType}
-     * @param scope
-     *            - {@link DataChangeScope}
-     * @param listener
-     *            - listener on specific stream
-     * @param domDataBroker
-     *            - data broker for register data change listener
-     */
-    private static void registration(final LogicalDatastoreType ds, final DataChangeScope scope,
-            final ListenerAdapter listener, final DOMDataBroker domDataBroker) {
-        if (listener.isListening()) {
-            return;
-        }
-
-        final YangInstanceIdentifier path = listener.getPath();
-        final ListenerRegistration<DOMDataChangeListener> registration = domDataBroker.registerDataChangeListener(ds,
-                path, listener, scope);
-
-        listener.setRegistration(registration);
-    }
-
-    /**
-     * Get port from web socket server. If doesn't exit, create it.
-     *
-     * @return port
-     */
-    private static int prepareNotificationPort() {
-        int port = RestconfStreamsConstants.NOTIFICATION_PORT;
-        try {
-            final WebSocketServer webSocketServer = WebSocketServer.getInstance();
-            port = webSocketServer.getPort();
-        } catch (final NullPointerException e) {
-            WebSocketServer.createInstance(RestconfStreamsConstants.NOTIFICATION_PORT);
-        }
-        return port;
-    }
-
     /**
      * Register listeners by streamName in identifier to listen to yang
      * notifications, put or delete info about listener to DS according to
@@ -158,24 +82,15 @@ public final class SubscribeToStreamUtil {
      *            - identifier as stream name
      * @param uriInfo
      *            - for getting base URI information
-     * @param start
-     *            - start-time query parameter
-     * @param stop
-     *            - stop-time query parameter
-     * @param notifiServiceHandler
-     *            - DOMNotificationService handler for register listeners
-     * @param filter
-     *            - indicate which subset of all possible events are of interest
-     * @param transactionChainHandler
-     *            - to put new data about stream to DS and delete after close
-     *            listener
-     * @param schemaHandler
-     *            - for getting schema context
+     * @param notificationQueryParams
+     *            - query parameters of notification
+     * @param handlersHolder
+     *            - holder of handlers for notifications
      * @return location for listening
      */
-    public static URI notifYangStream(final String identifier, final UriInfo uriInfo, Date start, final Date stop,
-            final NotificationServiceHandler notifiServiceHandler, final String filter,
-            final TransactionChainHandler transactionChainHandler, final SchemaContextHandler schemaHandler) {
+    @SuppressWarnings("rawtypes")
+    public static URI notifYangStream(final String identifier, final UriInfo uriInfo,
+            final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
         final String streamName = Notificator.createStreamNameFromUri(identifier);
         if (Strings.isNullOrEmpty(streamName)) {
             throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
@@ -186,63 +101,27 @@ public final class SubscribeToStreamUtil {
                     ErrorTag.UNKNOWN_ELEMENT);
         }
 
-        final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
-        int notificationPort = RestconfStreamsConstants.NOTIFICATION_PORT;
-        try {
-            final WebSocketServer webSocketServerInstance = WebSocketServer.getInstance();
-            notificationPort = webSocketServerInstance.getPort();
-        } catch (final NullPointerException e) {
-            WebSocketServer.createInstance(RestconfStreamsConstants.NOTIFICATION_PORT);
-        }
-        final UriBuilder uriToWebsocketServerBuilder = uriBuilder.port(notificationPort).scheme("ws");
-        final URI uriToWebsocketServer = uriToWebsocketServerBuilder.replacePath(streamName).build();
-
-        final DOMDataReadWriteTransaction wTx = transactionChainHandler.get().newReadWriteTransaction();
-        final boolean exist = checkExist(schemaHandler, wTx);
-        final Module monitoringModule = schemaHandler.get()
-                .findModuleByNamespaceAndRevision(MonitoringModule.URI_MODULE, MonitoringModule.DATE);
-        if (start == null) {
-            start = new Date();
-        }
+        final DOMDataReadWriteTransaction wTx =
+                handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction();
+        final SchemaContext schemaContext = handlersHolder.getSchemaHandler().get();
+        final boolean exist = checkExist(schemaContext, wTx);
+
+        final URI uri = prepareUriByStreamName(uriInfo, streamName);
         for (final NotificationListenerAdapter listener : listeners) {
-            registerToListenNotification(listener, notifiServiceHandler);
-            listener.setQueryParams(start, stop, filter);
-            listener.setCloseVars(transactionChainHandler, schemaHandler);
+            registerToListenNotification(listener, handlersHolder.getNotificationServiceHandler());
+            listener.setQueryParams(notificationQueryParams.getStart(), notificationQueryParams.getStop(),
+                    notificationQueryParams.getFilter());
+            listener.setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
             final NormalizedNode mapToStreams =
                     RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(listener.getSchemaPath().getLastComponent(),
-                    schemaHandler.get().getNotifications(), start, listener.getOutputType(),
-                            uriToWebsocketServer, monitoringModule, exist);
-            writeDataToDS(schemaHandler, listener.getSchemaPath().getLastComponent().getLocalName(), wTx, exist, mapToStreams);
+                            schemaContext.getNotifications(), notificationQueryParams.getStart(),
+                            listener.getOutputType(), uri, getMonitoringModule(schemaContext), exist);
+            writeDataToDS(schemaContext, listener.getSchemaPath().getLastComponent().getLocalName(), wTx, exist,
+                    mapToStreams);
         }
         submitData(wTx);
 
-        return uriToWebsocketServer;
-    }
-
-    private static boolean checkExist(final SchemaContextHandler schemaHandler, final DOMDataReadWriteTransaction wTx) {
-        boolean exist;
-        try {
-            exist = wTx
-                    .exists(LogicalDatastoreType.OPERATIONAL, IdentifierCodec
-                            .deserialize(MonitoringModule.PATH_TO_STREAMS, schemaHandler.get()))
-                    .checkedGet();
-        } catch (final ReadFailedException e1) {
-            throw new RestconfDocumentedException("Problem while checking data if exists", e1);
-        }
-        return exist;
-    }
-
-    private static void registerToListenNotification(final NotificationListenerAdapter listener,
-            final NotificationServiceHandler notificationServiceHandler) {
-        if (listener.isListening()) {
-            return;
-        }
-
-        final SchemaPath path = listener.getSchemaPath();
-        final ListenerRegistration<DOMNotificationListener> registration =
-                notificationServiceHandler.get().registerNotificationListener(listener, path);
-
-        listener.setRegistration(registration);
+        return uri;
     }
 
     /**
@@ -274,24 +153,15 @@ public final class SubscribeToStreamUtil {
      *            - identifier as stream name
      * @param uriInfo
      *            - for getting base URI information
-     * @param start
-     *            - start-time query parameter
-     * @param stop
-     *            - stop-time query parameter
-     * @param domDataBrokerHandler
-     *            - DOMDataBroker handler for register listener
-     * @param filter
-     *            - indicate which subset of all possible events are of interest
-     * @param schemaHandler
-     *            - for getting schema context
-     * @param transactionChainHandler
-     *            - to put new data about stream to DS and delete after close
-     *            listener
+     * @param notificationQueryParams
+     *            - query parameters of notification
+     * @param handlersHolder
+     *            - holder of handlers for notifications
      * @return location for listening
      */
-    public static URI notifiDataStream(final String identifier, final UriInfo uriInfo, Date start, final Date stop,
-            final DOMDataBrokerHandler domDataBrokerHandler, final String filter,
-            final TransactionChainHandler transactionChainHandler, final SchemaContextHandler schemaHandler) {
+    @SuppressWarnings("rawtypes")
+    public static URI notifiDataStream(final String identifier, final UriInfo uriInfo,
+            final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
         final Map<String, String> mapOfValues = SubscribeToStreamUtil.mapValuesFromUri(identifier);
 
         final LogicalDatastoreType ds = SubscribeToStreamUtil.parseURIEnum(LogicalDatastoreType.class,
@@ -315,53 +185,33 @@ public final class SubscribeToStreamUtil {
         final ListenerAdapter listener = Notificator.getListenerFor(streamName);
         Preconditions.checkNotNull(listener, "Listener doesn't exist : " + streamName);
 
-        if (start == null) {
-            start = new Date();
-        }
+        listener.setQueryParams(notificationQueryParams.getStart(), notificationQueryParams.getStop(),
+                notificationQueryParams.getFilter());
+        listener.setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
 
-        listener.setQueryParams(start, stop, filter);
-        listener.setCloseVars(transactionChainHandler, schemaHandler);
+        registration(ds, scope, listener, handlersHolder.getDomDataBrokerHandler().get());
 
-        SubscribeToStreamUtil.registration(ds, scope, listener, domDataBrokerHandler.get());
+        final URI uri = prepareUriByStreamName(uriInfo, streamName);
 
-        final int port = SubscribeToStreamUtil.prepareNotificationPort();
+        final DOMDataReadWriteTransaction wTx =
+                handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction();
+        final SchemaContext schemaContext = handlersHolder.getSchemaHandler().get();
+        final boolean exist = checkExist(schemaContext, wTx);
 
-        final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
-        final UriBuilder uriToWebSocketServer =
-                uriBuilder.port(port).scheme(RestconfStreamsConstants.SCHEMA_SUBSCIBRE_URI);
-        final URI uri = uriToWebSocketServer.replacePath(streamName).build();
-
-        final Module monitoringModule = schemaHandler.get()
-                .findModuleByNamespaceAndRevision(MonitoringModule.URI_MODULE, MonitoringModule.DATE);
-        final DOMDataReadWriteTransaction wTx = transactionChainHandler.get().newReadWriteTransaction();
-        final boolean exist = checkExist(schemaHandler, wTx);
-
-        final NormalizedNode mapToStreams = RestconfMappingNodeUtil.mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.getPath(), start,
-                listener.getOutputType(), uri, monitoringModule, exist, schemaHandler.get());
-        writeDataToDS(schemaHandler, listener.getPath().getLastPathArgument().getNodeType().getLocalName(), wTx, exist,
+        final NormalizedNode mapToStreams = RestconfMappingNodeUtil
+                .mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.getPath(),
+                        notificationQueryParams.getStart(), listener.getOutputType(), uri,
+                        getMonitoringModule(schemaContext), exist, schemaContext);
+        writeDataToDS(schemaContext, listener.getPath().getLastPathArgument().getNodeType().getLocalName(), wTx, exist,
                 mapToStreams);
         submitData(wTx);
         return uri;
     }
 
-    private static void writeDataToDS(final SchemaContextHandler schemaHandler, final String name,
-            final DOMDataReadWriteTransaction wTx, final boolean exist, final NormalizedNode mapToStreams) {
-        String pathId = "";
-        if (exist) {
-            pathId = MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + name;
-        } else {
-            pathId = MonitoringModule.PATH_TO_STREAMS;
-        }
-        wTx.merge(LogicalDatastoreType.OPERATIONAL, IdentifierCodec.deserialize(pathId, schemaHandler.get()),
-                mapToStreams);
-    }
-
-    private static void submitData(final DOMDataReadWriteTransaction wTx) {
-        try {
-            wTx.submit().checkedGet();
-        } catch (final TransactionCommitFailedException e) {
-            throw new RestconfDocumentedException("Problem while putting data to DS.", e);
-        }
+    public static Module getMonitoringModule(final SchemaContext schemaContext) {
+        final Module monitoringModule =
+                schemaContext.findModuleByNamespaceAndRevision(MonitoringModule.URI_MODULE, MonitoringModule.DATE);
+        return monitoringModule;
     }
 
     /**
@@ -398,4 +248,138 @@ public final class SubscribeToStreamUtil {
             throw new RestconfDocumentedException("Cannot parse of value in date: " + value + e);
         }
     }
+
+    @SuppressWarnings("rawtypes")
+    private static void writeDataToDS(final SchemaContext schemaContext, final String name,
+            final DOMDataReadWriteTransaction wTx, final boolean exist, final NormalizedNode mapToStreams) {
+        String pathId = "";
+        if (exist) {
+            pathId = MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + name;
+        } else {
+            pathId = MonitoringModule.PATH_TO_STREAMS;
+        }
+        wTx.merge(LogicalDatastoreType.OPERATIONAL, IdentifierCodec.deserialize(pathId, schemaContext),
+                mapToStreams);
+    }
+
+    private static void submitData(final DOMDataReadWriteTransaction wTx) {
+        try {
+            wTx.submit().checkedGet();
+        } catch (final TransactionCommitFailedException e) {
+            throw new RestconfDocumentedException("Problem while putting data to DS.", e);
+        }
+    }
+
+    /**
+     * Prepare map of values from URI
+     *
+     * @param identifier
+     *            - URI
+     * @return {@link Map}
+     */
+    public static Map<String, String> mapValuesFromUri(final String identifier) {
+        final HashMap<String, String> result = new HashMap<>();
+        final String[] tokens = identifier.split(String.valueOf(RestconfConstants.SLASH));
+        for (final String token : tokens) {
+            final String[] paramToken = token.split(String.valueOf(RestconfStreamsConstants.EQUAL));
+            if (paramToken.length == 2) {
+                result.put(paramToken[0], paramToken[1]);
+            }
+        }
+        return result;
+    }
+
+    private static URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
+        final int port = SubscribeToStreamUtil.prepareNotificationPort();
+
+        final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
+        final UriBuilder uriToWebSocketServer =
+                uriBuilder.port(port).scheme(RestconfStreamsConstants.SCHEMA_SUBSCIBRE_URI);
+        final URI uri = uriToWebSocketServer.replacePath(streamName).build();
+        return uri;
+    }
+
+    /**
+     * Register data change listener in dom data broker and set it to listener
+     * on stream
+     *
+     * @param ds
+     *            - {@link LogicalDatastoreType}
+     * @param scope
+     *            - {@link DataChangeScope}
+     * @param listener
+     *            - listener on specific stream
+     * @param domDataBroker
+     *            - data broker for register data change listener
+     */
+    @SuppressWarnings("deprecation")
+    private static void registration(final LogicalDatastoreType ds, final DataChangeScope scope,
+            final ListenerAdapter listener, final DOMDataBroker domDataBroker) {
+        if (listener.isListening()) {
+            return;
+        }
+
+        final YangInstanceIdentifier path = listener.getPath();
+        final ListenerRegistration<DOMDataChangeListener> registration =
+                domDataBroker.registerDataChangeListener(ds, path, listener, scope);
+
+        listener.setRegistration(registration);
+    }
+
+    /**
+     * Get port from web socket server. If doesn't exit, create it.
+     *
+     * @return port
+     */
+    private static int prepareNotificationPort() {
+        int port = RestconfStreamsConstants.NOTIFICATION_PORT;
+        try {
+            final WebSocketServer webSocketServer = WebSocketServer.getInstance();
+            port = webSocketServer.getPort();
+        } catch (final NullPointerException e) {
+            WebSocketServer.createInstance(RestconfStreamsConstants.NOTIFICATION_PORT);
+        }
+        return port;
+    }
+
+    private static boolean checkExist(final SchemaContext schemaContext, final DOMDataReadWriteTransaction wTx) {
+        boolean exist;
+        try {
+            exist = wTx.exists(LogicalDatastoreType.OPERATIONAL,
+                    IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAMS, schemaContext)).checkedGet();
+        } catch (final ReadFailedException e1) {
+            throw new RestconfDocumentedException("Problem while checking data if exists", e1);
+        }
+        return exist;
+    }
+
+    private static void registerToListenNotification(final NotificationListenerAdapter listener,
+            final NotificationServiceHandler notificationServiceHandler) {
+        if (listener.isListening()) {
+            return;
+        }
+
+        final SchemaPath path = listener.getSchemaPath();
+        final ListenerRegistration<DOMNotificationListener> registration =
+                notificationServiceHandler.get().registerNotificationListener(listener, path);
+
+        listener.setRegistration(registration);
+    }
+
+    /**
+     * Parse enum from URI
+     *
+     * @param clazz
+     *            - enum type
+     * @param value
+     *            - string of enum value
+     * @return enum
+     */
+    private static <T> T parseURIEnum(final Class<T> clazz, final String value) {
+        if ((value == null) || value.equals("")) {
+            return null;
+        }
+        return ResolveEnumUtil.resolveEnum(clazz, value);
+    }
+
 }
index 35c6e1de33c609f56312c5416dac7f19362e4b37..8a11fa9c12bf0a3a78786d672e3f4d6a27e6731b 100644 (file)
@@ -366,7 +366,7 @@ public class BrokerFacadeTest {
         listener.setCloseVars(transactionChainHandler, schemaHandler);
         // close and remove test notification listener
         listener.close();
-        Notificator.removeNotificationListenerIfNoSubscriberExists(listener);
+        Notificator.removeListenerIfNoSubscriberExists(listener);
     }
 
     /**
index 61cd81d9e26fb6ee382e6b3020ef585adb5994c1..ab14b638080160d0546f4785dc1d1cb0ba458526 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.sal.restconf.impl.test;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
+import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.Collection;
 import org.junit.Assert;
@@ -20,7 +21,10 @@ import org.opendaylight.controller.md.sal.rest.common.TestRestconfUtils;
 import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
 import org.opendaylight.netconf.sal.streams.listeners.Notificator;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
+import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 
 public class ExpressionParserTest {
 
@@ -126,12 +130,25 @@ public class ExpressionParserTest {
             }
         }
         final YangInstanceIdentifier path = Mockito.mock(YangInstanceIdentifier.class);
+        final PathArgument pathValue = NodeIdentifier.create(QName.create("module", "2016-14-12", "localName"));
+        Mockito.when(path.getLastPathArgument()).thenReturn(pathValue);
         final ListenerAdapter listener = Notificator.createListener(path, "streamName", NotificationOutputType.JSON);
         listener.setQueryParams(null, null, filter);
-        final Method m = listener.getClass().getDeclaredMethod("parseFilterParam", String.class);
+        final Class<?> superclass = listener.getClass().getSuperclass().getSuperclass();
+        Method m = null;
+        for (final Method method : superclass.getDeclaredMethods()) {
+            if (method.getName().equals("parseFilterParam")) {
+                m = method;
+            }
+        }
+        if (m == null) {
+            throw new Exception("Methode parseFilterParam doesn't exist in " + superclass.getName());
+        }
         m.setAccessible(true);
-
-        return (boolean) m.invoke(listener, readFile(xml));
+        final Field xmlField = superclass.getDeclaredField("xml");
+        xmlField.setAccessible(true);
+        xmlField.set(listener, readFile(xml));
+        return (boolean) m.invoke(listener, null);
     }
 
     private String readFile(final File xml) throws Exception {
index 2fdb0ba7e46c174145300a5fc435ef63205ee5ff..c2e69f79888edee23b4da7e7b95d8e6568846384 100644 (file)
@@ -36,7 +36,10 @@ import org.opendaylight.netconf.sal.restconf.impl.RestconfImpl;
 import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
 import org.opendaylight.netconf.sal.streams.listeners.Notificator;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
+import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 public class RestconfImplNotificationSubscribingTest {
@@ -61,6 +64,8 @@ public class RestconfImplNotificationSubscribingTest {
         ControllerContext.getInstance().setGlobalSchema(TestRestconfUtils.loadSchemaContext("/notifications"));
 
         final YangInstanceIdentifier path = Mockito.mock(YangInstanceIdentifier.class);
+        final PathArgument pathValue = NodeIdentifier.create(QName.create("module", "2016-14-12", "localName"));
+        Mockito.when(path.getLastPathArgument()).thenReturn(pathValue);
         Notificator.createListener(path, this.identifier, NotificationOutputType.XML);
     }
 
@@ -215,6 +220,8 @@ public class RestconfImplNotificationSubscribingTest {
     @Test
     public void onNotifiTest() throws Exception {
         final YangInstanceIdentifier path = Mockito.mock(YangInstanceIdentifier.class);
+        final PathArgument pathValue = NodeIdentifier.create(QName.create("module", "2016-14-12", "localName"));
+        Mockito.when(path.getLastPathArgument()).thenReturn(pathValue);
         final ListenerAdapter listener = Notificator.createListener(path, this.identifier, NotificationOutputType.XML);
 
         final List<Entry<String, List<String>>> list = new ArrayList<>();
@@ -229,13 +236,14 @@ public class RestconfImplNotificationSubscribingTest {
 
         final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change =
                 Mockito.mock(AsyncDataChangeEvent.class);
-        Field start = listener.getClass().getDeclaredField("start");
+        final Class<?> superclass = listener.getClass().getSuperclass().getSuperclass();
+        Field start = superclass.getDeclaredField("start");
         start.setAccessible(true);
         Date startOrig = (Date) start.get(listener);
         Assert.assertNotNull(startOrig);
         listener.onDataChanged(change);
 
-        start = listener.getClass().getDeclaredField("start");
+        start = superclass.getDeclaredField("start");
         start.setAccessible(true);
         startOrig = (Date) start.get(listener);
         Assert.assertNull(startOrig);
index 619cab5013538f09ec4588af57e0425f63528db7..7a8fcd327f59342bc72b578e266695954862de1d 100644 (file)
@@ -104,7 +104,6 @@ public class RestconfStreamsSubscriptionServiceImplTest {
     public static void setUpBeforeTest() throws Exception {
         final Map<String, ListenerAdapter> listenersByStreamNameSetter = new HashMap<>();
         final ListenerAdapter adapter = mock(ListenerAdapter.class);
-        doReturn(false).when(adapter).isListening();
         listenersByStreamNameSetter.put(
                 "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
                 adapter);