From 4a84711256caebbfd4b59aacf99f59ced55d16db Mon Sep 17 00:00:00 2001 From: Jakub Toth Date: Fri, 9 Dec 2016 17:13:10 +0100 Subject: [PATCH 1/1] Bug 5679 - implement ietf-restconf-monitoring - cleanup * cleanup of both listeners (data-change, yang) * create new common abstract classes with common methodes * fix broken tests * add support of listener to listen on stream with both output types (XML JSON) Change-Id: I865a0547e57a1035921f207d9f96b5a4c57bc20c Signed-off-by: Jakub Toth --- .../listeners/AbstractCommonSubscriber.java | 141 +++++ .../listeners/AbstractNotificationsData.java | 209 +++++++ .../listeners/AbstractQueryParams.java | 116 ++++ .../listeners/BaseListenerInterface.java | 47 ++ .../netconf/sal/streams/listeners/Event.java | 77 +++ .../listeners/EventBusChangeRecorder.java | 53 ++ .../sal/streams/listeners/EventType.java | 15 + .../streams/listeners/ListenerAdapter.java | 541 +++--------------- .../streams/listeners/ListenersConstants.java | 25 + .../NotificationListenerAdapter.java | 445 ++------------ .../sal/streams/listeners/Notificator.java | 26 +- .../impl/RestconfOperationsServiceImpl.java | 1 - ...estconfStreamsSubscriptionServiceImpl.java | 224 ++++++-- .../restful/utils/CreateStreamUtil.java | 15 +- .../restful/utils/SubscribeToStreamUtil.java | 384 ++++++------- .../restconf/impl/test/BrokerFacadeTest.java | 2 +- .../impl/test/ExpressionParserTest.java | 23 +- ...stconfImplNotificationSubscribingTest.java | 12 +- ...onfStreamsSubscriptionServiceImplTest.java | 1 - 19 files changed, 1190 insertions(+), 1167 deletions(-) create mode 100644 restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractCommonSubscriber.java create mode 100644 restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractNotificationsData.java create mode 100644 restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractQueryParams.java create mode 100644 restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/BaseListenerInterface.java create mode 100644 restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Event.java create mode 100644 restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/EventBusChangeRecorder.java create mode 100644 restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/EventType.java create mode 100644 restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/ListenersConstants.java diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractCommonSubscriber.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractCommonSubscriber.java new file mode 100644 index 0000000000..5dfc0164c7 --- /dev/null +++ b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractCommonSubscriber.java @@ -0,0 +1,141 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.sal.streams.listeners; + +import com.google.common.eventbus.AsyncEventBus; +import com.google.common.eventbus.EventBus; +import io.netty.channel.Channel; +import io.netty.util.internal.ConcurrentSet; +import java.util.Set; +import java.util.concurrent.Executors; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Features of subscribing part of both notifications + */ +abstract class AbstractCommonSubscriber extends AbstractQueryParams implements BaseListenerInterface { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class); + + private final Set subscribers = new ConcurrentSet<>(); + private final EventBus eventBus; + + @SuppressWarnings("rawtypes") + private EventBusChangeRecorder eventBusChangeRecorder; + @SuppressWarnings("rawtypes") + private ListenerRegistration registration; + + /** + * Creating {@link EventBus} + */ + protected AbstractCommonSubscriber() { + this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor()); + } + + @Override + public final boolean hasSubscribers() { + return !this.subscribers.isEmpty(); + } + + @Override + public final Set getSubscribers() { + return this.subscribers; + } + + @Override + public final void close() throws Exception { + this.registration.close(); + this.registration = null; + + deleteDataInDS(); + unregister(); + } + + /** + * Creates event of type {@link EventType#REGISTER}, set {@link Channel} + * subscriber to the event and post event into event bus. + * + * @param subscriber + * Channel + */ + public void addSubscriber(final Channel subscriber) { + if (!subscriber.isActive()) { + LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress()); + } + final Event event = new Event(EventType.REGISTER); + event.setSubscriber(subscriber); + this.eventBus.post(event); + } + + /** + * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel} + * subscriber to the event and posts event into event bus. + * + * @param subscriber + */ + public void removeSubscriber(final Channel subscriber) { + LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress()); + final Event event = new Event(EventType.DEREGISTER); + event.setSubscriber(subscriber); + this.eventBus.post(event); + } + + /** + * Sets {@link ListenerRegistration} registration. + * + * @param registration + * DOMDataChangeListener registration + */ + @SuppressWarnings("rawtypes") + public void setRegistration(final ListenerRegistration registration) { + this.registration = registration; + } + + /** + * Checks if {@link ListenerRegistration} registration exist. + * + * @return True if exist, false otherwise. + */ + public boolean isListening() { + return this.registration == null ? false : true; + } + + /** + * Creating and registering {@link EventBusChangeRecorder} of specific + * listener on {@link EventBus} + * + * @param listener + * - specific listener of notifications + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + protected void register(final T listener) { + this.eventBusChangeRecorder = new EventBusChangeRecorder(listener); + this.eventBus.register(this.eventBusChangeRecorder); + } + + /** + * Post event to event bus + * + * @param event + * - data of incoming notifications + */ + protected void post(final Event event) { + this.eventBus.post(event); + } + + /** + * Removes all subscribers and unregisters event bus change recorder form + * event bus + */ + protected void unregister() { + this.subscribers.clear(); + this.eventBus.unregister(this.eventBusChangeRecorder); + } +} diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractNotificationsData.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractNotificationsData.java new file mode 100644 index 0000000000..14c70a6994 --- /dev/null +++ b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractNotificationsData.java @@ -0,0 +1,209 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.sal.streams.listeners; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; +import java.util.Date; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.stream.XMLOutputFactory; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamWriter; +import javax.xml.transform.OutputKeys; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerException; +import javax.xml.transform.dom.DOMResult; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.restconf.Draft18.MonitoringModule; +import org.opendaylight.restconf.handlers.SchemaContextHandler; +import org.opendaylight.restconf.handlers.TransactionChainHandler; +import org.opendaylight.restconf.parser.IdentifierCodec; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter; +import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter; +import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter; +import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Document; +import org.w3c.dom.Element; + +/** + * Abstract class for processing and preparing data + * + */ +abstract class AbstractNotificationsData { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractNotificationsData.class); + + private TransactionChainHandler transactionChainHandler; + private SchemaContextHandler schemaHandler; + private String localName; + + /** + * Transaction chain for delete data in DS on close() + * + * @param transactionChainHandler + * - creating new write transaction for delete data on close + * @param schemaHandler + * - for getting schema to deserialize + * {@link MonitoringModule#PATH_TO_STREAM_WITHOUT_KEY} to + * {@link YangInstanceIdentifier} + */ + public void setCloseVars(final TransactionChainHandler transactionChainHandler, + final SchemaContextHandler schemaHandler) { + this.transactionChainHandler = transactionChainHandler; + this.schemaHandler = schemaHandler; + } + + /** + * Delete data in DS + */ + protected void deleteDataInDS() throws Exception { + final DOMDataWriteTransaction wTx = this.transactionChainHandler.get().newWriteOnlyTransaction(); + wTx.delete(LogicalDatastoreType.OPERATIONAL, IdentifierCodec + .deserialize(MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + this.localName, this.schemaHandler.get())); + wTx.submit().checkedGet(); + } + + /** + * Set localName of last path element of specific listener + * + * @param localName + * - local name + */ + protected void setLocalNameOfPath(final String localName) { + this.localName = localName; + } + + /** + * Formats data specified by RFC3339. + * + * @param d + * Date + * @return Data specified by RFC3339. + */ + protected static String toRFC3339(final Date d) { + return ListenersConstants.RFC3339_PATTERN.matcher(ListenersConstants.RFC3339.format(d)).replaceAll("$1:$2"); + } + + /** + * Creates {@link Document} document. + * + * @return {@link Document} document. + */ + protected static Document createDocument() { + final DocumentBuilder bob; + try { + bob = ListenersConstants.DBF.newDocumentBuilder(); + } catch (final ParserConfigurationException e) { + return null; + } + return bob.newDocument(); + } + + /** + * Write normalized node to {@link DOMResult} + * + * @param normalized + * - data + * @param context + * - actual schema context + * @param schemaPath + * - schema path of data + * @return {@link DOMResult} + */ + protected DOMResult writeNormalizedNode(final NormalizedNode normalized, final SchemaContext context, + final SchemaPath schemaPath) throws IOException, XMLStreamException { + final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory(); + final Document doc = XmlDocumentUtils.getDocument(); + final DOMResult result = new DOMResult(doc); + NormalizedNodeWriter normalizedNodeWriter = null; + NormalizedNodeStreamWriter normalizedNodeStreamWriter = null; + XMLStreamWriter writer = null; + + try { + writer = XML_FACTORY.createXMLStreamWriter(result); + normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context, schemaPath); + normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter); + + normalizedNodeWriter.write(normalized); + + normalizedNodeWriter.flush(); + } finally { + if (normalizedNodeWriter != null) { + normalizedNodeWriter.close(); + } + if (normalizedNodeStreamWriter != null) { + normalizedNodeStreamWriter.close(); + } + if (writer != null) { + writer.close(); + } + } + + return result; + } + + /** + * Generating base element of every notification + * + * @param doc + * - base {@link Document} + * @return element of {@link Document} + */ + protected Element basePartDoc(final Document doc) { + final Element notificationElement = + doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0", "notification"); + + doc.appendChild(notificationElement); + + final Element eventTimeElement = doc.createElement("eventTime"); + eventTimeElement.setTextContent(toRFC3339(new Date())); + notificationElement.appendChild(eventTimeElement); + + return notificationElement; + } + + /** + * Generating of {@link Document} transforming to string + * + * @param doc + * - {@link Document} with data + * @return - string from {@link Document} + */ + protected String transformDoc(final Document doc) { + try { + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + final Transformer transformer = ListenersConstants.FACTORY.newTransformer(); + transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no"); + transformer.setOutputProperty(OutputKeys.METHOD, "xml"); + transformer.setOutputProperty(OutputKeys.INDENT, "yes"); + transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8"); + transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4"); + transformer.transform(new DOMSource(doc), + new StreamResult(new OutputStreamWriter(out, StandardCharsets.UTF_8))); + final byte[] charData = out.toByteArray(); + return new String(charData, "UTF-8"); + } catch (TransformerException | UnsupportedEncodingException e) { + final String msg = "Error during transformation of Document into String"; + LOG.error(msg, e); + return msg; + } + } +} diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractQueryParams.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractQueryParams.java new file mode 100644 index 0000000000..f7a72bca4b --- /dev/null +++ b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/AbstractQueryParams.java @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.sal.streams.listeners; + +import java.io.StringReader; +import java.util.Date; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.xpath.XPath; +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathFactory; +import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException; +import org.w3c.dom.Document; +import org.xml.sax.InputSource; + +/** + * Features of query parameters part of both notifications + * + */ +abstract class AbstractQueryParams extends AbstractNotificationsData { + + protected Date start = null; + protected Date stop = null; + protected String filter = null; + + private String xml; + + /** + * Set query parameters for listener + * + * @param start + * - start-time of getting notification + * @param stop + * - stop-time of getting notification + * @param filter + * - indicate which subset of all possible events are of interest + */ + public void setQueryParams(final Date start, final Date stop, final String filter) { + this.start = start; + this.stop = stop; + this.filter = filter; + } + + /** + * Checking query parameters on specific notification + * + * @param xml + * - data of notification + * @param listener + * - listener of notification + * @return true if notification meets the requirements of query parameters, + * false otherwise + */ + protected boolean checkQueryParams(final String xml, final T listener) { + this.xml = xml; + final Date now = new Date(); + if (this.stop != null) { + if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) { + return checkFilter(); + } + if (this.stop.compareTo(now) < 0) { + try { + listener.close(); + } catch (final Exception e) { + throw new RestconfDocumentedException("Problem with unregister listener." + e); + } + } + } else if (this.start != null) { + if (this.start.compareTo(now) < 0) { + this.start = null; + return checkFilter(); + } + } else { + return checkFilter(); + } + return false; + } + + /** + * Check if is filter used and then prepare and post data do client + * + * @param change + * - data of notification + */ + private boolean checkFilter() { + if (this.filter == null) { + return true; + } else { + try { + return parseFilterParam(); + } catch (final Exception e) { + throw new RestconfDocumentedException("Problem while parsing filter.", e); + } + } + } + + /** + * Parse and evaluate filter value by xml + * + * @return true or false - depends on filter expression and data of + * notifiaction + * @throws Exception + */ + private boolean parseFilterParam() throws Exception { + final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + final DocumentBuilder builder = factory.newDocumentBuilder(); + final Document docOfXml = builder.parse(new InputSource(new StringReader(this.xml))); + final XPath xPath = XPathFactory.newInstance().newXPath(); + return (boolean) xPath.compile(this.filter).evaluate(docOfXml, XPathConstants.BOOLEAN); + } +} diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/BaseListenerInterface.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/BaseListenerInterface.java new file mode 100644 index 0000000000..50bb088a83 --- /dev/null +++ b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/BaseListenerInterface.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.sal.streams.listeners; + +import io.netty.channel.Channel; +import java.util.Set; + +/** + * Base interface for both listeners({@link ListenerAdapter}, + * {@link NotificationListenerAdapter}) + */ +interface BaseListenerInterface extends AutoCloseable { + + /** + * Return all subscribers of listener + * + * @return set of subscribers + */ + Set getSubscribers(); + + /** + * Checks if exists at least one {@link Channel} subscriber. + * + * @return True if exist at least one {@link Channel} subscriber, false + * otherwise. + */ + boolean hasSubscribers(); + + /** + * Get name of stream + * + * @return stream name + */ + String getStreamName(); + + /** + * Get output type + * + * @return outputType + */ + String getOutputType(); +} diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Event.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Event.java new file mode 100644 index 0000000000..fc0b1186ae --- /dev/null +++ b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Event.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.sal.streams.listeners; + +import io.netty.channel.Channel; + +/** + * Represents event of specific {@link EventType} type, holds data and + * {@link Channel} subscriber. + */ +class Event { + private final EventType type; + private Channel subscriber; + private String data; + + /** + * Creates new event specified by {@link EventType} type. + * + * @param type + * EventType + */ + public Event(final EventType type) { + this.type = type; + } + + /** + * Gets the {@link Channel} subscriber. + * + * @return Channel + */ + public Channel getSubscriber() { + return this.subscriber; + } + + /** + * Sets subscriber for event. + * + * @param subscriber + * Channel + */ + public void setSubscriber(final Channel subscriber) { + this.subscriber = subscriber; + } + + /** + * Gets event String. + * + * @return String representation of event data. + */ + public String getData() { + return this.data; + } + + /** + * Sets event data. + * + * @param data + * String. + */ + public void setData(final String data) { + this.data = data; + } + + /** + * Gets event type. + * + * @return The type of the event. + */ + public EventType getType() { + return this.type; + } +} diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/EventBusChangeRecorder.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/EventBusChangeRecorder.java new file mode 100644 index 0000000000..fd5dc36cfb --- /dev/null +++ b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/EventBusChangeRecorder.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.sal.streams.listeners; + +import com.google.common.eventbus.Subscribe; +import io.netty.channel.Channel; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class EventBusChangeRecorder { + + private static final Logger LOG = LoggerFactory.getLogger(EventBusChangeRecorder.class); + private final T listener; + + /** + * Event bus change recorder of specific listener of notifications + * + * @param listener + * - specific listener + */ + EventBusChangeRecorder(final T listener) { + this.listener = listener; + } + + @Subscribe + public void recordCustomerChange(final Event event) { + if (event.getType() == EventType.REGISTER) { + final Channel subscriber = event.getSubscriber(); + if (!this.listener.getSubscribers().contains(subscriber)) { + this.listener.getSubscribers().add(subscriber); + } + } else if (event.getType() == EventType.DEREGISTER) { + this.listener.getSubscribers().remove(event.getSubscriber()); + Notificator.removeListenerIfNoSubscriberExists(this.listener); + } else if (event.getType() == EventType.NOTIFY) { + for (final Channel subscriber : this.listener.getSubscribers()) { + if (subscriber.isActive()) { + LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress()); + subscriber.writeAndFlush(new TextWebSocketFrame(event.getData())); + } else { + LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress()); + this.listener.getSubscribers().remove(subscriber); + } + } + } + } +} diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/EventType.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/EventType.java new file mode 100644 index 0000000000..2a4d8bccbb --- /dev/null +++ b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/EventType.java @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.sal.streams.listeners; + +/** + * Type of the event. + */ +enum EventType { + REGISTER, DEREGISTER, NOTIFY; +} diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/ListenerAdapter.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/ListenerAdapter.java index 331daf7344..0b2c1dd34d 100644 --- a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/ListenerAdapter.java +++ b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/ListenerAdapter.java @@ -8,55 +8,18 @@ package org.opendaylight.netconf.sal.streams.listeners; import com.google.common.base.Preconditions; -import com.google.common.eventbus.AsyncEventBus; -import com.google.common.eventbus.EventBus; -import com.google.common.eventbus.Subscribe; -import io.netty.channel.Channel; -import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; -import io.netty.util.internal.ConcurrentSet; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.StringReader; -import java.io.UnsupportedEncodingException; -import java.nio.charset.StandardCharsets; -import java.text.SimpleDateFormat; -import java.util.Date; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.Executors; -import java.util.regex.Pattern; -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; -import javax.xml.parsers.ParserConfigurationException; -import javax.xml.stream.XMLOutputFactory; import javax.xml.stream.XMLStreamException; -import javax.xml.stream.XMLStreamWriter; -import javax.xml.transform.OutputKeys; -import javax.xml.transform.Transformer; -import javax.xml.transform.TransformerException; -import javax.xml.transform.TransformerFactory; import javax.xml.transform.dom.DOMResult; -import javax.xml.transform.dom.DOMSource; -import javax.xml.transform.stream.StreamResult; -import javax.xml.xpath.XPath; -import javax.xml.xpath.XPathConstants; -import javax.xml.xpath.XPathFactory; import org.json.JSONObject; import org.json.XML; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener; -import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; import org.opendaylight.netconf.sal.restconf.impl.ControllerContext; -import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException; -import org.opendaylight.restconf.Draft18.MonitoringModule; -import org.opendaylight.restconf.handlers.SchemaContextHandler; -import org.opendaylight.restconf.handlers.TransactionChainHandler; -import org.opendaylight.restconf.parser.IdentifierCodec; import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType; -import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; @@ -65,10 +28,6 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgum import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListEntryNode; -import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter; -import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter; -import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter; -import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils; import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree; import org.opendaylight.yangtools.yang.model.api.Module; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -78,36 +37,24 @@ import org.slf4j.LoggerFactory; import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.Node; -import org.xml.sax.InputSource; /** - * {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source. + * {@link ListenerAdapter} is responsible to track events, which occurred by + * changing data in data source. */ -public class ListenerAdapter implements DOMDataChangeListener { +public class ListenerAdapter extends AbstractCommonSubscriber implements DOMDataChangeListener { private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class); - private static final DocumentBuilderFactory DBF = DocumentBuilderFactory.newInstance(); - private static final TransformerFactory FACTORY = TransformerFactory.newInstance(); - private static final Pattern RFC3339_PATTERN = Pattern.compile("(\\d\\d)(\\d\\d)$"); - - private static final SimpleDateFormat RFC3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ"); private final YangInstanceIdentifier path; - private ListenerRegistration registration; private final String streamName; - private Set subscribers = new ConcurrentSet<>(); - private final EventBus eventBus; - private final EventBusChangeRecorder eventBusChangeRecorder; private final NotificationOutputType outputType; - private Date start = null; - private Date stop = null; - private String filter = null; - private TransactionChainHandler transactionChainHandler; - private SchemaContextHandler schemaHandler; + + private AsyncDataChangeEvent> change; /** * Creates new {@link ListenerAdapter} listener specified by path and stream - * name. + * name and register for subscribing * * @param path * Path to data in data store. @@ -118,76 +65,47 @@ public class ListenerAdapter implements DOMDataChangeListener { */ ListenerAdapter(final YangInstanceIdentifier path, final String streamName, final NotificationOutputType outputType) { - this.outputType = outputType; - Preconditions.checkNotNull(path); + super(); + register(this); + setLocalNameOfPath(path.getLastPathArgument().getNodeType().getLocalName()); + + this.outputType = Preconditions.checkNotNull(outputType); + this.path = Preconditions.checkNotNull(path); Preconditions.checkArgument((streamName != null) && !streamName.isEmpty()); - this.path = path; this.streamName = streamName; - this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor()); - this.eventBusChangeRecorder = new EventBusChangeRecorder(); - this.eventBus.register(this.eventBusChangeRecorder); } @Override public void onDataChanged(final AsyncDataChangeEvent> change) { - final Date now = new Date(); - if (this.stop != null) { - if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) { - checkFilter(change); - } - if (this.stop.compareTo(now) < 0) { - try { - this.close(); - } catch (final Exception e) { - throw new RestconfDocumentedException("Problem with unregister listener." + e); - } - } - } else if (this.start != null) { - if (this.start.compareTo(now) < 0) { - this.start = null; - checkFilter(change); - } - } else { - checkFilter(change); + this.change = change; + final String xml = prepareXml(); + if (checkQueryParams(xml, this)) { + prepareAndPostData(xml); } } /** - * Check if is filter used and then prepare and post data do client + * Gets the name of the stream. * - * @param change - * - data of notification + * @return The name of the stream. */ - private void checkFilter(final AsyncDataChangeEvent> change) { - final String xml = prepareXmlFrom(change); - if (this.filter == null) { - prepareAndPostData(xml); - } else { - try { - if (parseFilterParam(xml)) { - prepareAndPostData(xml); - } - } catch (final Exception e) { - throw new RestconfDocumentedException("Problem while parsing filter.", e); - } - } + @Override + public String getStreamName() { + return this.streamName; + } + + @Override + public String getOutputType() { + return this.outputType.getName(); } /** - * Parse and evaluate filter value by xml + * Get path pointed to data in data store. * - * @param xml - * - notification data in xml - * @return true or false - depends on filter expression and data of - * notifiaction - * @throws Exception + * @return Path pointed to data in data store. */ - private boolean parseFilterParam(final String xml) throws Exception { - final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); - final DocumentBuilder builder = factory.newDocumentBuilder(); - final Document docOfXml = builder.parse(new InputSource(new StringReader(xml))); - final XPath xPath = XPathFactory.newInstance().newXPath(); - return (boolean) xPath.compile(this.filter).evaluate(docOfXml, XPathConstants.BOOLEAN); + public YangInstanceIdentifier getPath() { + return this.path; } /** @@ -196,117 +114,19 @@ public class ListenerAdapter implements DOMDataChangeListener { * @param xml */ private void prepareAndPostData(final String xml) { - final Event event = new Event(EventType.NOTIFY); - if (this.outputType.equals(NotificationOutputType.JSON)) { - final JSONObject jsonObject = XML.toJSONObject(xml); - event.setData(jsonObject.toString()); - } else { - event.setData(xml); - } - this.eventBus.post(event); - } - - /** - * Tracks events of data change by customer. - */ - private final class EventBusChangeRecorder { - @Subscribe - public void recordCustomerChange(final Event event) { - if (event.getType() == EventType.REGISTER) { - final Channel subscriber = event.getSubscriber(); - if (!ListenerAdapter.this.subscribers.contains(subscriber)) { - ListenerAdapter.this.subscribers.add(subscriber); - } - } else if (event.getType() == EventType.DEREGISTER) { - ListenerAdapter.this.subscribers.remove(event.getSubscriber()); - Notificator.removeListenerIfNoSubscriberExists(ListenerAdapter.this); - } else if (event.getType() == EventType.NOTIFY) { - for (final Channel subscriber : ListenerAdapter.this.subscribers) { - if (subscriber.isActive()) { - LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress()); - subscriber.writeAndFlush(new TextWebSocketFrame(event.getData())); - } else { - LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress()); - ListenerAdapter.this.subscribers.remove(subscriber); - } - } - } - } - } - - /** - * Represents event of specific {@link EventType} type, holds data and {@link Channel} subscriber. - */ - private final class Event { - private final EventType type; - private Channel subscriber; - private String data; - - /** - * Creates new event specified by {@link EventType} type. - * - * @param type - * EventType - */ - public Event(final EventType type) { - this.type = type; - } - - /** - * Gets the {@link Channel} subscriber. - * - * @return Channel - */ - public Channel getSubscriber() { - return this.subscriber; - } - - /** - * Sets subscriber for event. - * - * @param subscriber - * Channel - */ - public void setSubscriber(final Channel subscriber) { - this.subscriber = subscriber; - } - - /** - * Gets event String. - * - * @return String representation of event data. - */ - public String getData() { - return this.data; - } - - /** - * Sets event data. - * - * @param data String. - */ - public void setData(final String data) { - this.data = data; - } - - /** - * Gets event type. - * - * @return The type of the event. - */ - public EventType getType() { - return this.type; + final Event event = new Event(EventType.NOTIFY); + if (this.outputType.equals(NotificationOutputType.JSON)) { + final JSONObject jsonObject = XML.toJSONObject(xml); + event.setData(jsonObject.toString()); + } else { + event.setData(xml); } + post(event); } /** - * Type of the event. + * Tracks events of data change by customer. */ - private enum EventType { - REGISTER, - DEREGISTER, - NOTIFY; - } /** * Prepare data in printable form and transform it to String. @@ -315,68 +135,19 @@ public class ListenerAdapter implements DOMDataChangeListener { * DataChangeEvent * @return Data in printable form. */ - private String prepareXmlFrom(final AsyncDataChangeEvent> change) { + private String prepareXml() { final SchemaContext schemaContext = ControllerContext.getInstance().getGlobalSchema(); - final DataSchemaContextTree dataContextTree = DataSchemaContextTree.from(schemaContext); + final DataSchemaContextTree dataContextTree = DataSchemaContextTree.from(schemaContext); final Document doc = createDocument(); - final Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0", - "notification"); - - doc.appendChild(notificationElement); - - final Element eventTimeElement = doc.createElement("eventTime"); - eventTimeElement.setTextContent(toRFC3339(new Date())); - notificationElement.appendChild(eventTimeElement); + final Element notificationElement = basePartDoc(doc); final Element dataChangedNotificationEventElement = doc.createElementNS( "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "data-changed-notification"); - addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, change, + addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, this.change, schemaContext, dataContextTree); notificationElement.appendChild(dataChangedNotificationEventElement); - - try { - final ByteArrayOutputStream out = new ByteArrayOutputStream(); - final Transformer transformer = FACTORY.newTransformer(); - transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no"); - transformer.setOutputProperty(OutputKeys.METHOD, "xml"); - transformer.setOutputProperty(OutputKeys.INDENT, "yes"); - transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8"); - transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4"); - transformer.transform(new DOMSource(doc), - new StreamResult(new OutputStreamWriter(out, StandardCharsets.UTF_8))); - final byte[] charData = out.toByteArray(); - return new String(charData, "UTF-8"); - } catch (TransformerException | UnsupportedEncodingException e) { - final String msg = "Error during transformation of Document into String"; - LOG.error(msg, e); - return msg; - } - } - - /** - * Formats data specified by RFC3339. - * - * @param d - * Date - * @return Data specified by RFC3339. - */ - public static String toRFC3339(final Date d) { - return RFC3339_PATTERN.matcher(RFC3339.format(d)).replaceAll("$1:$2"); - } - - /** - * Creates {@link Document} document. - * @return {@link Document} document. - */ - public static Document createDocument() { - final DocumentBuilder bob; - try { - bob = DBF.newDocumentBuilder(); - } catch (final ParserConfigurationException e) { - return null; - } - return bob.newDocument(); + return transformDoc(doc); } /** @@ -392,15 +163,13 @@ public class ListenerAdapter implements DOMDataChangeListener { private void addValuesToDataChangedNotificationEventElement(final Document doc, final Element dataChangedNotificationEventElement, final AsyncDataChangeEvent> change, - final SchemaContext schemaContext, final DataSchemaContextTree dataSchemaContextTree) { + final SchemaContext schemaContext, final DataSchemaContextTree dataSchemaContextTree) { addCreatedChangedValuesFromDataToElement(doc, change.getCreatedData().entrySet(), - dataChangedNotificationEventElement, - Operation.CREATED, schemaContext, dataSchemaContextTree); + dataChangedNotificationEventElement, Operation.CREATED, schemaContext, dataSchemaContextTree); addCreatedChangedValuesFromDataToElement(doc, change.getUpdatedData().entrySet(), - dataChangedNotificationEventElement, - Operation.UPDATED, schemaContext, dataSchemaContextTree); + dataChangedNotificationEventElement, Operation.UPDATED, schemaContext, dataSchemaContextTree); addValuesFromDataToElement(doc, change.getRemovedPaths(), dataChangedNotificationEventElement, Operation.DELETED); @@ -431,9 +200,10 @@ public class ListenerAdapter implements DOMDataChangeListener { } } - private void addCreatedChangedValuesFromDataToElement(final Document doc, final Set>> data, final Element element, final Operation operation, final SchemaContext - schemaContext, final DataSchemaContextTree dataSchemaContextTree) { + private void addCreatedChangedValuesFromDataToElement(final Document doc, + final Set>> data, final Element element, + final Operation operation, final SchemaContext schemaContext, + final DataSchemaContextTree dataSchemaContextTree) { if ((data == null) || data.isEmpty()) { return; } @@ -471,9 +241,9 @@ public class ListenerAdapter implements DOMDataChangeListener { return dataChangeEventElement; } - private Node createCreatedChangedDataChangeEventElement(final Document doc, final Entry> entry, final Operation operation, final SchemaContext - schemaContext, final DataSchemaContextTree dataSchemaContextTree) { + private Node createCreatedChangedDataChangeEventElement(final Document doc, + final Entry> entry, final Operation operation, + final SchemaContext schemaContext, final DataSchemaContextTree dataSchemaContextTree) { final Element dataChangeEventElement = doc.createElement("data-change-event"); final Element pathElement = doc.createElement("path"); final YangInstanceIdentifier path = entry.getKey(); @@ -485,8 +255,14 @@ public class ListenerAdapter implements DOMDataChangeListener { dataChangeEventElement.appendChild(operationElement); try { - final DOMResult domResult = writeNormalizedNode(entry.getValue(), path, - schemaContext, dataSchemaContextTree); + SchemaPath nodePath; + final NormalizedNode normalized = entry.getValue(); + if ((normalized instanceof MapEntryNode) || (normalized instanceof UnkeyedListEntryNode)) { + nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath(); + } else { + nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath().getParent(); + } + final DOMResult domResult = writeNormalizedNode(normalized, schemaContext, nodePath); final Node result = doc.importNode(domResult.getNode().getFirstChild(), true); final Element dataElement = doc.createElement("data"); dataElement.appendChild(result); @@ -500,47 +276,6 @@ public class ListenerAdapter implements DOMDataChangeListener { return dataChangeEventElement; } - private static DOMResult writeNormalizedNode(final NormalizedNode normalized, - final YangInstanceIdentifier path, final SchemaContext context, - final DataSchemaContextTree dataSchemaContextTree) - throws IOException, XMLStreamException { - final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory(); - final Document doc = XmlDocumentUtils.getDocument(); - final DOMResult result = new DOMResult(doc); - NormalizedNodeWriter normalizedNodeWriter = null; - NormalizedNodeStreamWriter normalizedNodeStreamWriter = null; - XMLStreamWriter writer = null; - final SchemaPath nodePath; - - if ((normalized instanceof MapEntryNode) || (normalized instanceof UnkeyedListEntryNode)) { - nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath(); - } else { - nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath().getParent(); - } - - try { - writer = XML_FACTORY.createXMLStreamWriter(result); - normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context, nodePath); - normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter); - - normalizedNodeWriter.write(normalized); - - normalizedNodeWriter.flush(); - } finally { - if (normalizedNodeWriter != null) { - normalizedNodeWriter.close(); - } - if (normalizedNodeStreamWriter != null) { - normalizedNodeStreamWriter.close(); - } - if (writer != null) { - writer.close(); - } - } - - return result; - } - /** * Adds path as value to element. * @@ -549,6 +284,7 @@ public class ListenerAdapter implements DOMDataChangeListener { * @param element * {@link Element} */ + @SuppressWarnings("rawtypes") private void addPathAsValueToElement(final YangInstanceIdentifier path, final Element element) { final YangInstanceIdentifier normalizedPath = ControllerContext.getInstance().toXpathRepresentation(path); final StringBuilder textContent = new StringBuilder(); @@ -601,116 +337,11 @@ public class ListenerAdapter implements DOMDataChangeListener { } /** - * Gets path pointed to data in data store. - * - * @return Path pointed to data in data store. - */ - public YangInstanceIdentifier getPath() { - return this.path; - } - - /** - * Sets {@link ListenerRegistration} registration. - * - * @param registration DOMDataChangeListener registration - */ - public void setRegistration(final ListenerRegistration registration) { - this.registration = registration; - } - - /** - * Gets the name of the stream. - * - * @return The name of the stream. - */ - public String getStreamName() { - return this.streamName; - } - - /** - * Removes all subscribers and unregisters event bus change recorder form - * event bus and delete data in DS - */ - public void close() throws Exception { - final DOMDataWriteTransaction wTx = this.transactionChainHandler.get().newWriteOnlyTransaction(); - wTx.delete(LogicalDatastoreType.OPERATIONAL, IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY - + this.path.getLastPathArgument().getNodeType().getLocalName(), this.schemaHandler.get())); - wTx.submit().checkedGet(); - - this.subscribers = new ConcurrentSet<>(); - this.registration.close(); - this.registration = null; - this.eventBus.unregister(this.eventBusChangeRecorder); - } - - /** - * Checks if {@link ListenerRegistration} registration exist. - * - * @return True if exist, false otherwise. - */ - public boolean isListening() { - return this.registration == null ? false : true; - } - - /** - * Creates event of type {@link EventType#REGISTER}, set {@link Channel} subscriber to the event and post event into - * event bus. - * - * @param subscriber - * Channel - */ - public void addSubscriber(final Channel subscriber) { - if (!subscriber.isActive()) { - LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress()); - } - final Event event = new Event(EventType.REGISTER); - event.setSubscriber(subscriber); - this.eventBus.post(event); - } - - /** - * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel} subscriber to the event and posts event - * into event bus. - * - * @param subscriber - */ - public void removeSubscriber(final Channel subscriber) { - LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress()); - final Event event = new Event(EventType.DEREGISTER); - event.setSubscriber(subscriber); - this.eventBus.post(event); - } - - /** - * Checks if exists at least one {@link Channel} subscriber. - * - * @return True if exist at least one {@link Channel} subscriber, false otherwise. - */ - public boolean hasSubscribers() { - return !this.subscribers.isEmpty(); - } - - /** - * Consists of two types {@link Store#CONFIG} and {@link Store#OPERATION}. - */ - private static enum Store { - CONFIG("config"), - OPERATION("operation"); - - private final String value; - - private Store(final String value) { - this.value = value; - } - } - - /** - * Consists of three types {@link Operation#CREATED}, {@link Operation#UPDATED} and {@link Operation#DELETED}. + * Consists of three types {@link Operation#CREATED}, + * {@link Operation#UPDATED} and {@link Operation#DELETED}. */ private static enum Operation { - CREATED("created"), - UPDATED("updated"), - DELETED("deleted"); + CREATED("created"), UPDATED("updated"), DELETED("deleted"); private final String value; @@ -718,46 +349,4 @@ public class ListenerAdapter implements DOMDataChangeListener { this.value = value; } } - - /** - * Set query parameters for listener - * - * @param start - * - start-time of getting notification - * @param stop - * - stop-time of getting notification - * @param filter - * - indicate which subset of all possible events are of interest - */ - public void setQueryParams(final Date start, final Date stop, final String filter) { - this.start = start; - this.stop = stop; - this.filter = filter; - } - - /** - * Get output type - * - * @return outputType - */ - public String getOutputType() { - return this.outputType.getName(); - } - - /** - * Transaction chain to delete data in DS on close() - * - * @param transactionChainHandler - * - creating new write transaction to delete data on close - * @param schemaHandler - * - for getting schema to deserialize - * {@link MonitoringModule#PATH_TO_STREAM_WITHOUT_KEY} to - * {@link YangInstanceIdentifier} - */ - public void setCloseVars(final TransactionChainHandler transactionChainHandler, - final SchemaContextHandler schemaHandler) { - this.transactionChainHandler = transactionChainHandler; - this.schemaHandler = schemaHandler; - } - } diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/ListenersConstants.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/ListenersConstants.java new file mode 100644 index 0000000000..a92c3a27e0 --- /dev/null +++ b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/ListenersConstants.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.sal.streams.listeners; + +import java.text.SimpleDateFormat; +import java.util.regex.Pattern; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.transform.TransformerFactory; + +/** + * Constants of listeners + */ +class ListenersConstants { + + static final DocumentBuilderFactory DBF = DocumentBuilderFactory.newInstance(); + static final TransformerFactory FACTORY = TransformerFactory.newInstance(); + static final Pattern RFC3339_PATTERN = Pattern.compile("(\\d\\d)(\\d\\d)$"); + + static final SimpleDateFormat RFC3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ"); +} diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/NotificationListenerAdapter.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/NotificationListenerAdapter.java index 2d3ad3a10b..843edafac6 100644 --- a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/NotificationListenerAdapter.java +++ b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/NotificationListenerAdapter.java @@ -7,54 +7,19 @@ */ package org.opendaylight.netconf.sal.streams.listeners; -import com.google.common.base.Charsets; import com.google.common.base.Preconditions; -import com.google.common.eventbus.AsyncEventBus; -import com.google.common.eventbus.EventBus; -import com.google.common.eventbus.Subscribe; -import io.netty.channel.Channel; -import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; -import io.netty.util.internal.ConcurrentSet; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.StringReader; import java.io.StringWriter; -import java.io.UnsupportedEncodingException; import java.io.Writer; import java.util.Collection; import java.util.Date; -import java.util.Set; -import java.util.concurrent.Executors; -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; -import javax.xml.stream.XMLOutputFactory; import javax.xml.stream.XMLStreamException; -import javax.xml.stream.XMLStreamWriter; -import javax.xml.transform.OutputKeys; -import javax.xml.transform.Transformer; -import javax.xml.transform.TransformerException; -import javax.xml.transform.TransformerFactory; import javax.xml.transform.dom.DOMResult; -import javax.xml.transform.dom.DOMSource; -import javax.xml.transform.stream.StreamResult; -import javax.xml.xpath.XPath; -import javax.xml.xpath.XPathConstants; -import javax.xml.xpath.XPathFactory; import org.json.JSONObject; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; -import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMNotification; import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener; import org.opendaylight.netconf.sal.restconf.impl.ControllerContext; import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException; -import org.opendaylight.restconf.Draft18.MonitoringModule; -import org.opendaylight.restconf.handlers.SchemaContextHandler; -import org.opendaylight.restconf.handlers.TransactionChainHandler; -import org.opendaylight.restconf.parser.IdentifierCodec; -import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; @@ -64,8 +29,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWrit import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactory; import org.opendaylight.yangtools.yang.data.codec.gson.JSONNormalizedNodeStreamWriter; import org.opendaylight.yangtools.yang.data.codec.gson.JsonWriterFactory; -import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter; -import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.opendaylight.yangtools.yang.model.api.SchemaPath; import org.slf4j.Logger; @@ -73,34 +36,22 @@ import org.slf4j.LoggerFactory; import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.Node; -import org.xml.sax.InputSource; /** * {@link NotificationListenerAdapter} is responsible to track events on * notifications. * */ -public class NotificationListenerAdapter implements DOMNotificationListener { +public class NotificationListenerAdapter extends AbstractCommonSubscriber implements DOMNotificationListener { private static final Logger LOG = LoggerFactory.getLogger(NotificationListenerAdapter.class); - private static final TransformerFactory FACTORY = TransformerFactory.newInstance(); private final String streamName; - private final EventBus eventBus; - private final EventBusChangeRecorder eventBusChangeRecorder; - private final SchemaPath path; private final String outputType; - private Date start = null; - private Date stop = null; - private String filter; private SchemaContext schemaContext; private DOMNotification notification; - private ListenerRegistration registration; - private Set subscribers = new ConcurrentSet<>(); - private TransactionChainHandler transactionChainHandler; - private SchemaContextHandler schemaHandler; /** * Set path of listener and stream name, register event bus. @@ -113,76 +64,54 @@ public class NotificationListenerAdapter implements DOMNotificationListener { * - type of output on notification (JSON, XML) */ NotificationListenerAdapter(final SchemaPath path, final String streamName, final String outputType) { - this.outputType = outputType; + super(); + register(this); + setLocalNameOfPath(path.getLastComponent().getLocalName()); + + this.outputType = Preconditions.checkNotNull(outputType); + this.path = Preconditions.checkNotNull(path); Preconditions.checkArgument((streamName != null) && !streamName.isEmpty()); - Preconditions.checkArgument(path != null); - this.path = path; this.streamName = streamName; - this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor()); - this.eventBusChangeRecorder = new EventBusChangeRecorder(); - this.eventBus.register(this.eventBusChangeRecorder); + } + + /** + * Get outputType of listenere + * + * @return the outputType + */ + @Override + public String getOutputType() { + return this.outputType; } @Override public void onNotification(final DOMNotification notification) { this.schemaContext = ControllerContext.getInstance().getGlobalSchema(); this.notification = notification; - final Date now = new Date(); - if (this.stop != null) { - if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) { - checkFilter(); - } - if (this.stop.compareTo(now) < 0) { - try { - this.close(); - } catch (final Exception e) { - throw new RestconfDocumentedException("Problem with unregister listener." + e); - } - } - } else if (this.start != null) { - if (this.start.compareTo(now) < 0) { - this.start = null; - checkFilter(); - } - } else { - checkFilter(); + + final String xml = prepareXml(); + if (checkQueryParams(xml, this)) { + prepareAndPostData(xml); } } /** - * Check if is filter used and then prepare and post data do client + * Get stream name of this listener * + * @return {@link String} */ - private void checkFilter() { - final String xml = prepareXml(); - if (this.filter == null) { - prepareAndPostData(xml); - } else { - try { - if (parseFilterParam(xml)) { - prepareAndPostData(xml); - } - } catch (final Exception e) { - throw new RestconfDocumentedException("Problem while parsing filter.", e); - } - } + @Override + public String getStreamName() { + return this.streamName; } /** - * Parse and evaluate filter value by xml + * Get schema path of notification * - * @param xml - * - notification data in xml - * @return true or false - depends on filter expression and data of - * notifiaction - * @throws Exception + * @return {@link SchemaPath} */ - private boolean parseFilterParam(final String xml) throws Exception { - final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); - final DocumentBuilder builder = factory.newDocumentBuilder(); - final Document docOfXml = builder.parse(new InputSource(new StringReader(xml))); - final XPath xPath = XPathFactory.newInstance().newXPath(); - return (boolean) xPath.compile(this.filter).evaluate(docOfXml, XPathConstants.BOOLEAN); + public SchemaPath getSchemaPath() { + return this.path; } /** @@ -197,7 +126,7 @@ public class NotificationListenerAdapter implements DOMNotificationListener { } else { event.setData(xml); } - this.eventBus.post(event); + post(event); } /** @@ -227,137 +156,16 @@ public class NotificationListenerAdapter implements DOMNotificationListener { return writer.toString(); } - /** - * Checks if exists at least one {@link Channel} subscriber. - * - * @return True if exist at least one {@link Channel} subscriber, false - * otherwise. - */ - public boolean hasSubscribers() { - return !this.subscribers.isEmpty(); - } - - /** - * Reset lists, close registration and unregister bus event and delete data in DS. - */ - public void close() { - final DOMDataWriteTransaction wTx = this.transactionChainHandler.get().newWriteOnlyTransaction(); - wTx.delete(LogicalDatastoreType.OPERATIONAL, - IdentifierCodec.deserialize( - MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + this.path.getLastComponent().getLocalName(), - this.schemaHandler.get())); - try { - wTx.submit().checkedGet(); - } catch (final TransactionCommitFailedException e) { - throw new RestconfDocumentedException("Problem while deleting data from DS.", e); - } - - this.subscribers = new ConcurrentSet<>(); - this.registration.close(); - this.registration = null; - this.eventBus.unregister(this.eventBusChangeRecorder); - } - - /** - * Get stream name of this listener - * - * @return {@link String} - */ - public String getStreamName() { - return this.streamName; - } - - /** - * Check if is this listener registered. - * - * @return - true if is registered, otherwise null - */ - public boolean isListening() { - return this.registration == null ? false : true; - } - - /** - * Get schema path of notification - * - * @return {@link SchemaPath} - */ - public SchemaPath getSchemaPath() { - return this.path; - } - - /** - * Set registration for close after closing connection and check if this - * listener is registered - * - * @param registration - * - registered listener - */ - public void setRegistration(final ListenerRegistration registration) { - Preconditions.checkNotNull(registration); - this.registration = registration; - } - - /** - * Creates event of type {@link EventType#REGISTER}, set {@link Channel} - * subscriber to the event and post event into event bus. - * - * @param subscriber - * Channel - */ - public void addSubscriber(final Channel subscriber) { - if (!subscriber.isActive()) { - LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress()); - } - final Event event = new Event(EventType.REGISTER); - event.setSubscriber(subscriber); - this.eventBus.post(event); - } - - /** - * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel} - * subscriber to the event and posts event into event bus. - * - * @param subscriber - */ - public void removeSubscriber(final Channel subscriber) { - LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress()); - final Event event = new Event(EventType.DEREGISTER); - event.setSubscriber(subscriber); - this.eventBus.post(event); - } - private String prepareXml() { - final Document doc = ListenerAdapter.createDocument(); - final Element notificationElement = - doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0", - "notification"); - doc.appendChild(notificationElement); - - final Element eventTimeElement = doc.createElement("eventTime"); - eventTimeElement.setTextContent(ListenerAdapter.toRFC3339(new Date())); - notificationElement.appendChild(eventTimeElement); + final Document doc = createDocument(); + final Element notificationElement = basePartDoc(doc); final Element notificationEventElement = doc.createElementNS( "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "create-notification-stream"); addValuesToNotificationEventElement(doc, notificationEventElement, this.notification, this.schemaContext); notificationElement.appendChild(notificationEventElement); - try { - final ByteArrayOutputStream out = new ByteArrayOutputStream(); - final Transformer transformer = FACTORY.newTransformer(); - transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no"); - transformer.setOutputProperty(OutputKeys.METHOD, "xml"); - transformer.setOutputProperty(OutputKeys.INDENT, "yes"); - transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8"); - transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4"); - transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, Charsets.UTF_8))); - final byte[] charData = out.toByteArray(); - return new String(charData, "UTF-8"); - } catch (TransformerException | UnsupportedEncodingException e) { - final String msg = "Error during transformation of Document into String"; - LOG.error(msg, e); - return msg; - } + return transformDoc(doc); } private void addValuesToNotificationEventElement(final Document doc, final Element element, @@ -366,11 +174,11 @@ public class NotificationListenerAdapter implements DOMNotificationListener { return; } - final NormalizedNode>> body = notification - .getBody(); + final NormalizedNode>> body = + notification.getBody(); try { - final DOMResult domResult = writeNormalizedNode(body, - YangInstanceIdentifier.create(body.getIdentifier()), schemaContext); + + final DOMResult domResult = writeNormalizedNode(body, schemaContext, this.path); final Node result = doc.importNode(domResult.getNode().getFirstChild(), true); final Element dataElement = doc.createElement("notification"); dataElement.appendChild(result); @@ -381,181 +189,4 @@ public class NotificationListenerAdapter implements DOMNotificationListener { LOG.error("Error processing stream", e); } } - - private DOMResult writeNormalizedNode(final NormalizedNode normalized, final YangInstanceIdentifier path, - final SchemaContext context) throws IOException, XMLStreamException { - final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory(); - final Document doc = XmlDocumentUtils.getDocument(); - final DOMResult result = new DOMResult(doc); - NormalizedNodeWriter normalizedNodeWriter = null; - NormalizedNodeStreamWriter normalizedNodeStreamWriter = null; - XMLStreamWriter writer = null; - - try { - writer = XML_FACTORY.createXMLStreamWriter(result); - normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context, - this.getSchemaPath()); - normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter); - - normalizedNodeWriter.write(normalized); - - normalizedNodeWriter.flush(); - } finally { - if (normalizedNodeWriter != null) { - normalizedNodeWriter.close(); - } - if (normalizedNodeStreamWriter != null) { - normalizedNodeStreamWriter.close(); - } - if (writer != null) { - writer.close(); - } - } - - return result; - } - - /** - * Tracks events of data change by customer. - */ - private final class EventBusChangeRecorder { - @Subscribe - public void recordCustomerChange(final Event event) { - if (event.getType() == EventType.REGISTER) { - final Channel subscriber = event.getSubscriber(); - if (!NotificationListenerAdapter.this.subscribers.contains(subscriber)) { - NotificationListenerAdapter.this.subscribers.add(subscriber); - } - } else if (event.getType() == EventType.DEREGISTER) { - NotificationListenerAdapter.this.subscribers.remove(event.getSubscriber()); - Notificator.removeNotificationListenerIfNoSubscriberExists(NotificationListenerAdapter.this); - } else if (event.getType() == EventType.NOTIFY) { - for (final Channel subscriber : NotificationListenerAdapter.this.subscribers) { - if (subscriber.isActive()) { - LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress()); - subscriber.writeAndFlush(new TextWebSocketFrame(event.getData())); - } else { - LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress()); - NotificationListenerAdapter.this.subscribers.remove(subscriber); - } - } - } - } - } - - /** - * Represents event of specific {@link EventType} type, holds data and - * {@link Channel} subscriber. - */ - private final class Event { - private final EventType type; - private Channel subscriber; - private String data; - - /** - * Creates new event specified by {@link EventType} type. - * - * @param type - * EventType - */ - public Event(final EventType type) { - this.type = type; - } - - /** - * Gets the {@link Channel} subscriber. - * - * @return Channel - */ - public Channel getSubscriber() { - return this.subscriber; - } - - /** - * Sets subscriber for event. - * - * @param subscriber - * Channel - */ - public void setSubscriber(final Channel subscriber) { - this.subscriber = subscriber; - } - - /** - * Gets event String. - * - * @return String representation of event data. - */ - public String getData() { - return this.data; - } - - /** - * Sets event data. - * - * @param data - * String. - */ - public void setData(final String data) { - this.data = data; - } - - /** - * Gets event type. - * - * @return The type of the event. - */ - public EventType getType() { - return this.type; - } - } - - /** - * Type of the event. - */ - private enum EventType { - REGISTER, DEREGISTER, NOTIFY - } - - /** - * Set query parameters for listener - * - * @param start - * - start-time of getting notification - * @param stop - * - stop-time of getting notification - * @param filter - * - indicate which subset of all possible events are of interest - */ - public void setQueryParams(final Date start, final Date stop, final String filter) { - this.start = start; - this.stop = stop; - this.filter = filter; - } - - /** - * Get outputType of listenere - * - * @return the outputType - */ - public String getOutputType() { - return this.outputType; - } - - /** - * Transaction chain to delete data in DS on close() - * - * @param transactionChainHandler - * - creating new write transaction to delete data on close - * @param schemaHandler - * - for getting schema to deserialize - * {@link MonitoringModule#PATH_TO_STREAM_WITHOUT_KEY} to - * {@link YangInstanceIdentifier} - */ - public void setCloseVars(final TransactionChainHandler transactionChainHandler, - final SchemaContextHandler schemaHandler) { - this.transactionChainHandler = transactionChainHandler; - this.schemaHandler = schemaHandler; - - } } diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Notificator.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Notificator.java index 7c809ad77f..4db067cd15 100644 --- a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Notificator.java +++ b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Notificator.java @@ -131,25 +131,15 @@ public class Notificator { } } - /** - * Checks if listener has at least one subscriber. In case it doesn't have any, delete listener. - * - * @param listener - * ListenerAdapter - */ - public static void removeListenerIfNoSubscriberExists(final ListenerAdapter listener) { - if (!listener.hasSubscribers()) { - deleteListener(listener); - } - } - /** * Delete {@link ListenerAdapter} listener specified in parameter. * + * @param + * * @param listener * ListenerAdapter */ - private static void deleteListener(final ListenerAdapter listener) { + private static void deleteListener(final T listener) { if (listener != null) { try { listener.close(); @@ -203,13 +193,17 @@ public class Notificator { return listListeners; } - public static void removeNotificationListenerIfNoSubscriberExists(final NotificationListenerAdapter listener) { + public static void removeListenerIfNoSubscriberExists(final T listener) { if (!listener.hasSubscribers()) { - deleteNotificationListener(listener); + if (listener instanceof NotificationListenerAdapter) { + deleteNotificationListener(listener); + } else { + deleteListener(listener); + } } } - private static void deleteNotificationListener(final NotificationListenerAdapter listener) { + private static void deleteNotificationListener(final T listener) { if (listener != null) { try { listener.close(); diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/base/services/impl/RestconfOperationsServiceImpl.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/base/services/impl/RestconfOperationsServiceImpl.java index 612b03bdec..74df8b5422 100644 --- a/restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/base/services/impl/RestconfOperationsServiceImpl.java +++ b/restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/base/services/impl/RestconfOperationsServiceImpl.java @@ -80,7 +80,6 @@ public class RestconfOperationsServiceImpl implements RestconfOperationsService ref.get(), Optional.of(this.domMountPointServiceHandler.get())); mountPoint = mountPointIdentifier.getMountPoint(); modules = ref.getModules(mountPoint); - } else { final String errMsg = "URI has bad format. If operations behind mount point should be showed, URI has to end with "; diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/services/impl/RestconfStreamsSubscriptionServiceImpl.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/services/impl/RestconfStreamsSubscriptionServiceImpl.java index 3c81d2322c..fb2c76ee49 100644 --- a/restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/services/impl/RestconfStreamsSubscriptionServiceImpl.java +++ b/restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/services/impl/RestconfStreamsSubscriptionServiceImpl.java @@ -14,6 +14,9 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import javax.ws.rs.core.UriInfo; +import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; +import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain; import org.opendaylight.netconf.sal.restconf.impl.InstanceIdentifierContext; import org.opendaylight.netconf.sal.restconf.impl.NormalizedNodeContext; import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException; @@ -29,6 +32,7 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdent import org.opendaylight.yangtools.yang.data.api.schema.LeafNode; import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.NormalizedNodeAttrBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,77 +44,45 @@ public class RestconfStreamsSubscriptionServiceImpl implements RestconfStreamsSu private static final Logger LOG = LoggerFactory.getLogger(RestconfStreamsSubscriptionServiceImpl.class); - private final DOMDataBrokerHandler domDataBrokerHandler; - - private final NotificationServiceHandler notificationServiceHandler; - - private final SchemaContextHandler schemaHandler; - - private final TransactionChainHandler transactionChainHandler; + private final HandlersHolder handlersHolder; + /** + * Initialize holder of handlers with holders as parameters. + * + * @param domDataBrokerHandler + * - handler of {@link DOMDataBroker} + * @param notificationServiceHandler + * - handler of {@link DOMNotificationService} + * @param schemaHandler + * - handler of {@link SchemaContext} + * @param transactionChainHandler + * - handler of {@link DOMTransactionChain} + */ public RestconfStreamsSubscriptionServiceImpl(final DOMDataBrokerHandler domDataBrokerHandler, final NotificationServiceHandler notificationServiceHandler, final SchemaContextHandler schemaHandler, final TransactionChainHandler transactionChainHandler) { - this.domDataBrokerHandler = domDataBrokerHandler; - this.notificationServiceHandler = notificationServiceHandler; - this.schemaHandler = schemaHandler; - this.transactionChainHandler = transactionChainHandler; + this.handlersHolder = new HandlersHolder(domDataBrokerHandler, notificationServiceHandler, + transactionChainHandler, schemaHandler); } @Override public NormalizedNodeContext subscribeToStream(final String identifier, final UriInfo uriInfo) { - boolean startTime_used = false; - boolean stopTime_used = false; - boolean filter_used = false; - Date start = null; - Date stop = null; - String filter = null; - - for (final Entry> entry : uriInfo.getQueryParameters().entrySet()) { - switch (entry.getKey()) { - case "start-time": - if (!startTime_used) { - startTime_used = true; - start = SubscribeToStreamUtil.parseDateFromQueryParam(entry); - } else { - throw new RestconfDocumentedException("Start-time parameter can be used only once."); - } - break; - case "stop-time": - if (!stopTime_used) { - stopTime_used = true; - stop = SubscribeToStreamUtil.parseDateFromQueryParam(entry); - } else { - throw new RestconfDocumentedException("Stop-time parameter can be used only once."); - } - break; - case "filter": - if (!filter_used) { - filter_used = true; - filter = entry.getValue().iterator().next(); - } - break; - default: - throw new RestconfDocumentedException("Bad parameter used with notifications: " + entry.getKey()); - } - } - if (!startTime_used && stopTime_used) { - throw new RestconfDocumentedException("Stop-time parameter has to be used with start-time parameter."); - } + final NotificationQueryParams notificationQueryParams = new NotificationQueryParams(); + notificationQueryParams.prepareParams(uriInfo); + URI response = null; if (identifier.contains(RestconfStreamsConstants.DATA_SUBSCR)) { - response = - SubscribeToStreamUtil.notifiDataStream(identifier, uriInfo, start, stop, this.domDataBrokerHandler, filter, - this.transactionChainHandler, this.schemaHandler); + response = SubscribeToStreamUtil.notifiDataStream(identifier, uriInfo, notificationQueryParams, + this.handlersHolder); } else if (identifier.contains(RestconfStreamsConstants.NOTIFICATION_STREAM)) { - response = SubscribeToStreamUtil.notifYangStream(identifier, uriInfo, start, stop, - this.notificationServiceHandler, filter, this.transactionChainHandler, this.schemaHandler); + response = SubscribeToStreamUtil.notifYangStream(identifier, uriInfo, notificationQueryParams, + this.handlersHolder); } if (response != null) { // prepare node with value of location final InstanceIdentifierContext iid = - SubscribeToStreamUtil.prepareIIDSubsStreamOutput(this.schemaHandler); + SubscribeToStreamUtil.prepareIIDSubsStreamOutput(this.handlersHolder.getSchemaHandler()); final NormalizedNodeAttrBuilder> builder = ImmutableLeafNodeBuilder.create().withValue(response.toString()); builder.withNodeIdentifier( @@ -127,4 +99,146 @@ public class RestconfStreamsSubscriptionServiceImpl implements RestconfStreamsSu LOG.warn(msg); throw new RestconfDocumentedException(msg); } + + /** + * Holder of all handlers for notifications + */ + public final class HandlersHolder { + + private final DOMDataBrokerHandler domDataBrokerHandler; + private final NotificationServiceHandler notificationServiceHandler; + private final TransactionChainHandler transactionChainHandler; + private final SchemaContextHandler schemaHandler; + + private HandlersHolder(final DOMDataBrokerHandler domDataBrokerHandler, + final NotificationServiceHandler notificationServiceHandler, + final TransactionChainHandler transactionChainHandler, final SchemaContextHandler schemaHandler) { + this.domDataBrokerHandler = domDataBrokerHandler; + this.notificationServiceHandler = notificationServiceHandler; + this.transactionChainHandler = transactionChainHandler; + this.schemaHandler = schemaHandler; + } + + /** + * Get {@link DOMDataBrokerHandler} + * + * @return the domDataBrokerHandler + */ + public DOMDataBrokerHandler getDomDataBrokerHandler() { + return this.domDataBrokerHandler; + } + + /** + * Get {@link NotificationServiceHandler} + * + * @return the notificationServiceHandler + */ + public NotificationServiceHandler getNotificationServiceHandler() { + return this.notificationServiceHandler; + } + + /** + * Get {@link TransactionChainHandler} + * + * @return the transactionChainHandler + */ + public TransactionChainHandler getTransactionChainHandler() { + return this.transactionChainHandler; + } + + /** + * Get {@link SchemaContextHandler} + * + * @return the schemaHandler + */ + public SchemaContextHandler getSchemaHandler() { + return this.schemaHandler; + } + } + + /** + * Parser and holder of query paramteres from uriInfo for notifications + * + */ + public final class NotificationQueryParams { + + private Date start = null; + private Date stop = null; + private String filter = null; + + private NotificationQueryParams() { + + } + + private void prepareParams(final UriInfo uriInfo) { + boolean startTime_used = false; + boolean stopTime_used = false; + boolean filter_used = false; + + for (final Entry> entry : uriInfo.getQueryParameters().entrySet()) { + switch (entry.getKey()) { + case "start-time": + if (!startTime_used) { + startTime_used = true; + this.start = SubscribeToStreamUtil.parseDateFromQueryParam(entry); + } else { + throw new RestconfDocumentedException("Start-time parameter can be used only once."); + } + break; + case "stop-time": + if (!stopTime_used) { + stopTime_used = true; + this.stop = SubscribeToStreamUtil.parseDateFromQueryParam(entry); + } else { + throw new RestconfDocumentedException("Stop-time parameter can be used only once."); + } + break; + case "filter": + if (!filter_used) { + filter_used = true; + this.filter = entry.getValue().iterator().next(); + } + break; + default: + throw new RestconfDocumentedException( + "Bad parameter used with notifications: " + entry.getKey()); + } + } + if (!startTime_used && stopTime_used) { + throw new RestconfDocumentedException("Stop-time parameter has to be used with start-time parameter."); + } + + if (this.start == null) { + this.start = new Date(); + } + } + + /** + * Get start-time query parameter + * + * @return start-time + */ + public Date getStart() { + return this.start; + } + + /** + * Get stop-time query parameter + * + * @return stop-time + */ + public Date getStop() { + return this.stop; + } + + /** + * Get filter query parameter + * + * @return filter + */ + public String getFilter() { + return this.filter; + } + } + } diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/utils/CreateStreamUtil.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/utils/CreateStreamUtil.java index d199604aff..a6e76b56b4 100644 --- a/restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/utils/CreateStreamUtil.java +++ b/restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/utils/CreateStreamUtil.java @@ -102,20 +102,23 @@ public final class CreateStreamUtil { final ContainerNode data = (ContainerNode) payload.getData(); final QName qname = payload.getInstanceIdentifierContext().getSchemaNode().getQName(); final YangInstanceIdentifier path = preparePath(data, qname); - final String streamName = prepareDataChangeNotifiStreamName(path, refSchemaCtx.get(), data); + String streamName = prepareDataChangeNotifiStreamName(path, refSchemaCtx.get(), data); final QName outputQname = QName.create(qname, "output"); final QName streamNameQname = QName.create(qname, "stream-name"); - final ContainerNode output = ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(new NodeIdentifier(outputQname)) - .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build(); final NotificationOutputType outputType = prepareOutputType(data); + if(outputType.equals(NotificationOutputType.JSON)){ + streamName = streamName + "/JSON"; + } if (!Notificator.existListenerFor(streamName)) { Notificator.createListener(path, streamName, outputType); } + final ContainerNode output = + ImmutableContainerNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(outputQname)) + .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build(); return new DefaultDOMRpcResult(output); } @@ -229,7 +232,9 @@ public final class CreateStreamUtil { streamName = streamName + ","; } } - + if (outputType.equals("JSON")) { + streamName = streamName + "/JSON"; + } final QName rpcQName = payload.getInstanceIdentifierContext().getSchemaNode().getQName(); final QName outputQname = QName.create(rpcQName, "output"); final QName streamNameQname = QName.create(rpcQName, "notification-stream-identifier"); diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/utils/SubscribeToStreamUtil.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/utils/SubscribeToStreamUtil.java index f387941b67..d8df4d20c8 100644 --- a/restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/utils/SubscribeToStreamUtil.java +++ b/restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/utils/SubscribeToStreamUtil.java @@ -38,11 +38,11 @@ import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapte import org.opendaylight.netconf.sal.streams.listeners.Notificator; import org.opendaylight.netconf.sal.streams.websockets.WebSocketServer; import org.opendaylight.restconf.Draft18.MonitoringModule; -import org.opendaylight.restconf.handlers.DOMDataBrokerHandler; import org.opendaylight.restconf.handlers.NotificationServiceHandler; import org.opendaylight.restconf.handlers.SchemaContextHandler; -import org.opendaylight.restconf.handlers.TransactionChainHandler; import org.opendaylight.restconf.parser.IdentifierCodec; +import org.opendaylight.restconf.restful.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder; +import org.opendaylight.restconf.restful.services.impl.RestconfStreamsSubscriptionServiceImpl.NotificationQueryParams; import org.opendaylight.restconf.utils.RestconfConstants; import org.opendaylight.restconf.utils.mapping.RestconfMappingNodeUtil; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime; @@ -55,6 +55,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode; import org.opendaylight.yangtools.yang.model.api.DataSchemaNode; import org.opendaylight.yangtools.yang.model.api.Module; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.opendaylight.yangtools.yang.model.api.SchemaNode; import org.opendaylight.yangtools.yang.model.api.SchemaPath; import org.slf4j.Logger; @@ -72,83 +73,6 @@ public final class SubscribeToStreamUtil { throw new UnsupportedOperationException("Util class"); } - /** - * Parse enum from URI - * - * @param clazz - * - enum type - * @param value - * - string of enum value - * @return enum - */ - private static T parseURIEnum(final Class clazz, final String value) { - if ((value == null) || value.equals("")) { - return null; - } - return ResolveEnumUtil.resolveEnum(clazz, value); - } - - /** - * Prepare map of values from URI - * - * @param identifier - * - URI - * @return {@link Map} - */ - public static Map mapValuesFromUri(final String identifier) { - final HashMap result = new HashMap<>(); - final String[] tokens = identifier.split(String.valueOf(RestconfConstants.SLASH)); - for (final String token : tokens) { - final String[] paramToken = token.split(String.valueOf(RestconfStreamsConstants.EQUAL)); - if (paramToken.length == 2) { - result.put(paramToken[0], paramToken[1]); - } - } - return result; - } - - /** - * Register data change listener in dom data broker and set it to listener - * on stream - * - * @param ds - * - {@link LogicalDatastoreType} - * @param scope - * - {@link DataChangeScope} - * @param listener - * - listener on specific stream - * @param domDataBroker - * - data broker for register data change listener - */ - private static void registration(final LogicalDatastoreType ds, final DataChangeScope scope, - final ListenerAdapter listener, final DOMDataBroker domDataBroker) { - if (listener.isListening()) { - return; - } - - final YangInstanceIdentifier path = listener.getPath(); - final ListenerRegistration registration = domDataBroker.registerDataChangeListener(ds, - path, listener, scope); - - listener.setRegistration(registration); - } - - /** - * Get port from web socket server. If doesn't exit, create it. - * - * @return port - */ - private static int prepareNotificationPort() { - int port = RestconfStreamsConstants.NOTIFICATION_PORT; - try { - final WebSocketServer webSocketServer = WebSocketServer.getInstance(); - port = webSocketServer.getPort(); - } catch (final NullPointerException e) { - WebSocketServer.createInstance(RestconfStreamsConstants.NOTIFICATION_PORT); - } - return port; - } - /** * Register listeners by streamName in identifier to listen to yang * notifications, put or delete info about listener to DS according to @@ -158,24 +82,15 @@ public final class SubscribeToStreamUtil { * - identifier as stream name * @param uriInfo * - for getting base URI information - * @param start - * - start-time query parameter - * @param stop - * - stop-time query parameter - * @param notifiServiceHandler - * - DOMNotificationService handler for register listeners - * @param filter - * - indicate which subset of all possible events are of interest - * @param transactionChainHandler - * - to put new data about stream to DS and delete after close - * listener - * @param schemaHandler - * - for getting schema context + * @param notificationQueryParams + * - query parameters of notification + * @param handlersHolder + * - holder of handlers for notifications * @return location for listening */ - public static URI notifYangStream(final String identifier, final UriInfo uriInfo, Date start, final Date stop, - final NotificationServiceHandler notifiServiceHandler, final String filter, - final TransactionChainHandler transactionChainHandler, final SchemaContextHandler schemaHandler) { + @SuppressWarnings("rawtypes") + public static URI notifYangStream(final String identifier, final UriInfo uriInfo, + final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) { final String streamName = Notificator.createStreamNameFromUri(identifier); if (Strings.isNullOrEmpty(streamName)) { throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE); @@ -186,63 +101,27 @@ public final class SubscribeToStreamUtil { ErrorTag.UNKNOWN_ELEMENT); } - final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder(); - int notificationPort = RestconfStreamsConstants.NOTIFICATION_PORT; - try { - final WebSocketServer webSocketServerInstance = WebSocketServer.getInstance(); - notificationPort = webSocketServerInstance.getPort(); - } catch (final NullPointerException e) { - WebSocketServer.createInstance(RestconfStreamsConstants.NOTIFICATION_PORT); - } - final UriBuilder uriToWebsocketServerBuilder = uriBuilder.port(notificationPort).scheme("ws"); - final URI uriToWebsocketServer = uriToWebsocketServerBuilder.replacePath(streamName).build(); - - final DOMDataReadWriteTransaction wTx = transactionChainHandler.get().newReadWriteTransaction(); - final boolean exist = checkExist(schemaHandler, wTx); - final Module monitoringModule = schemaHandler.get() - .findModuleByNamespaceAndRevision(MonitoringModule.URI_MODULE, MonitoringModule.DATE); - if (start == null) { - start = new Date(); - } + final DOMDataReadWriteTransaction wTx = + handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction(); + final SchemaContext schemaContext = handlersHolder.getSchemaHandler().get(); + final boolean exist = checkExist(schemaContext, wTx); + + final URI uri = prepareUriByStreamName(uriInfo, streamName); for (final NotificationListenerAdapter listener : listeners) { - registerToListenNotification(listener, notifiServiceHandler); - listener.setQueryParams(start, stop, filter); - listener.setCloseVars(transactionChainHandler, schemaHandler); + registerToListenNotification(listener, handlersHolder.getNotificationServiceHandler()); + listener.setQueryParams(notificationQueryParams.getStart(), notificationQueryParams.getStop(), + notificationQueryParams.getFilter()); + listener.setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler()); final NormalizedNode mapToStreams = RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(listener.getSchemaPath().getLastComponent(), - schemaHandler.get().getNotifications(), start, listener.getOutputType(), - uriToWebsocketServer, monitoringModule, exist); - writeDataToDS(schemaHandler, listener.getSchemaPath().getLastComponent().getLocalName(), wTx, exist, mapToStreams); + schemaContext.getNotifications(), notificationQueryParams.getStart(), + listener.getOutputType(), uri, getMonitoringModule(schemaContext), exist); + writeDataToDS(schemaContext, listener.getSchemaPath().getLastComponent().getLocalName(), wTx, exist, + mapToStreams); } submitData(wTx); - return uriToWebsocketServer; - } - - private static boolean checkExist(final SchemaContextHandler schemaHandler, final DOMDataReadWriteTransaction wTx) { - boolean exist; - try { - exist = wTx - .exists(LogicalDatastoreType.OPERATIONAL, IdentifierCodec - .deserialize(MonitoringModule.PATH_TO_STREAMS, schemaHandler.get())) - .checkedGet(); - } catch (final ReadFailedException e1) { - throw new RestconfDocumentedException("Problem while checking data if exists", e1); - } - return exist; - } - - private static void registerToListenNotification(final NotificationListenerAdapter listener, - final NotificationServiceHandler notificationServiceHandler) { - if (listener.isListening()) { - return; - } - - final SchemaPath path = listener.getSchemaPath(); - final ListenerRegistration registration = - notificationServiceHandler.get().registerNotificationListener(listener, path); - - listener.setRegistration(registration); + return uri; } /** @@ -274,24 +153,15 @@ public final class SubscribeToStreamUtil { * - identifier as stream name * @param uriInfo * - for getting base URI information - * @param start - * - start-time query parameter - * @param stop - * - stop-time query parameter - * @param domDataBrokerHandler - * - DOMDataBroker handler for register listener - * @param filter - * - indicate which subset of all possible events are of interest - * @param schemaHandler - * - for getting schema context - * @param transactionChainHandler - * - to put new data about stream to DS and delete after close - * listener + * @param notificationQueryParams + * - query parameters of notification + * @param handlersHolder + * - holder of handlers for notifications * @return location for listening */ - public static URI notifiDataStream(final String identifier, final UriInfo uriInfo, Date start, final Date stop, - final DOMDataBrokerHandler domDataBrokerHandler, final String filter, - final TransactionChainHandler transactionChainHandler, final SchemaContextHandler schemaHandler) { + @SuppressWarnings("rawtypes") + public static URI notifiDataStream(final String identifier, final UriInfo uriInfo, + final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) { final Map mapOfValues = SubscribeToStreamUtil.mapValuesFromUri(identifier); final LogicalDatastoreType ds = SubscribeToStreamUtil.parseURIEnum(LogicalDatastoreType.class, @@ -315,53 +185,33 @@ public final class SubscribeToStreamUtil { final ListenerAdapter listener = Notificator.getListenerFor(streamName); Preconditions.checkNotNull(listener, "Listener doesn't exist : " + streamName); - if (start == null) { - start = new Date(); - } + listener.setQueryParams(notificationQueryParams.getStart(), notificationQueryParams.getStop(), + notificationQueryParams.getFilter()); + listener.setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler()); - listener.setQueryParams(start, stop, filter); - listener.setCloseVars(transactionChainHandler, schemaHandler); + registration(ds, scope, listener, handlersHolder.getDomDataBrokerHandler().get()); - SubscribeToStreamUtil.registration(ds, scope, listener, domDataBrokerHandler.get()); + final URI uri = prepareUriByStreamName(uriInfo, streamName); - final int port = SubscribeToStreamUtil.prepareNotificationPort(); + final DOMDataReadWriteTransaction wTx = + handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction(); + final SchemaContext schemaContext = handlersHolder.getSchemaHandler().get(); + final boolean exist = checkExist(schemaContext, wTx); - final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder(); - final UriBuilder uriToWebSocketServer = - uriBuilder.port(port).scheme(RestconfStreamsConstants.SCHEMA_SUBSCIBRE_URI); - final URI uri = uriToWebSocketServer.replacePath(streamName).build(); - - final Module monitoringModule = schemaHandler.get() - .findModuleByNamespaceAndRevision(MonitoringModule.URI_MODULE, MonitoringModule.DATE); - final DOMDataReadWriteTransaction wTx = transactionChainHandler.get().newReadWriteTransaction(); - final boolean exist = checkExist(schemaHandler, wTx); - - final NormalizedNode mapToStreams = RestconfMappingNodeUtil.mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.getPath(), start, - listener.getOutputType(), uri, monitoringModule, exist, schemaHandler.get()); - writeDataToDS(schemaHandler, listener.getPath().getLastPathArgument().getNodeType().getLocalName(), wTx, exist, + final NormalizedNode mapToStreams = RestconfMappingNodeUtil + .mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.getPath(), + notificationQueryParams.getStart(), listener.getOutputType(), uri, + getMonitoringModule(schemaContext), exist, schemaContext); + writeDataToDS(schemaContext, listener.getPath().getLastPathArgument().getNodeType().getLocalName(), wTx, exist, mapToStreams); submitData(wTx); return uri; } - private static void writeDataToDS(final SchemaContextHandler schemaHandler, final String name, - final DOMDataReadWriteTransaction wTx, final boolean exist, final NormalizedNode mapToStreams) { - String pathId = ""; - if (exist) { - pathId = MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + name; - } else { - pathId = MonitoringModule.PATH_TO_STREAMS; - } - wTx.merge(LogicalDatastoreType.OPERATIONAL, IdentifierCodec.deserialize(pathId, schemaHandler.get()), - mapToStreams); - } - - private static void submitData(final DOMDataReadWriteTransaction wTx) { - try { - wTx.submit().checkedGet(); - } catch (final TransactionCommitFailedException e) { - throw new RestconfDocumentedException("Problem while putting data to DS.", e); - } + public static Module getMonitoringModule(final SchemaContext schemaContext) { + final Module monitoringModule = + schemaContext.findModuleByNamespaceAndRevision(MonitoringModule.URI_MODULE, MonitoringModule.DATE); + return monitoringModule; } /** @@ -398,4 +248,138 @@ public final class SubscribeToStreamUtil { throw new RestconfDocumentedException("Cannot parse of value in date: " + value + e); } } + + @SuppressWarnings("rawtypes") + private static void writeDataToDS(final SchemaContext schemaContext, final String name, + final DOMDataReadWriteTransaction wTx, final boolean exist, final NormalizedNode mapToStreams) { + String pathId = ""; + if (exist) { + pathId = MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + name; + } else { + pathId = MonitoringModule.PATH_TO_STREAMS; + } + wTx.merge(LogicalDatastoreType.OPERATIONAL, IdentifierCodec.deserialize(pathId, schemaContext), + mapToStreams); + } + + private static void submitData(final DOMDataReadWriteTransaction wTx) { + try { + wTx.submit().checkedGet(); + } catch (final TransactionCommitFailedException e) { + throw new RestconfDocumentedException("Problem while putting data to DS.", e); + } + } + + /** + * Prepare map of values from URI + * + * @param identifier + * - URI + * @return {@link Map} + */ + public static Map mapValuesFromUri(final String identifier) { + final HashMap result = new HashMap<>(); + final String[] tokens = identifier.split(String.valueOf(RestconfConstants.SLASH)); + for (final String token : tokens) { + final String[] paramToken = token.split(String.valueOf(RestconfStreamsConstants.EQUAL)); + if (paramToken.length == 2) { + result.put(paramToken[0], paramToken[1]); + } + } + return result; + } + + private static URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) { + final int port = SubscribeToStreamUtil.prepareNotificationPort(); + + final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder(); + final UriBuilder uriToWebSocketServer = + uriBuilder.port(port).scheme(RestconfStreamsConstants.SCHEMA_SUBSCIBRE_URI); + final URI uri = uriToWebSocketServer.replacePath(streamName).build(); + return uri; + } + + /** + * Register data change listener in dom data broker and set it to listener + * on stream + * + * @param ds + * - {@link LogicalDatastoreType} + * @param scope + * - {@link DataChangeScope} + * @param listener + * - listener on specific stream + * @param domDataBroker + * - data broker for register data change listener + */ + @SuppressWarnings("deprecation") + private static void registration(final LogicalDatastoreType ds, final DataChangeScope scope, + final ListenerAdapter listener, final DOMDataBroker domDataBroker) { + if (listener.isListening()) { + return; + } + + final YangInstanceIdentifier path = listener.getPath(); + final ListenerRegistration registration = + domDataBroker.registerDataChangeListener(ds, path, listener, scope); + + listener.setRegistration(registration); + } + + /** + * Get port from web socket server. If doesn't exit, create it. + * + * @return port + */ + private static int prepareNotificationPort() { + int port = RestconfStreamsConstants.NOTIFICATION_PORT; + try { + final WebSocketServer webSocketServer = WebSocketServer.getInstance(); + port = webSocketServer.getPort(); + } catch (final NullPointerException e) { + WebSocketServer.createInstance(RestconfStreamsConstants.NOTIFICATION_PORT); + } + return port; + } + + private static boolean checkExist(final SchemaContext schemaContext, final DOMDataReadWriteTransaction wTx) { + boolean exist; + try { + exist = wTx.exists(LogicalDatastoreType.OPERATIONAL, + IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAMS, schemaContext)).checkedGet(); + } catch (final ReadFailedException e1) { + throw new RestconfDocumentedException("Problem while checking data if exists", e1); + } + return exist; + } + + private static void registerToListenNotification(final NotificationListenerAdapter listener, + final NotificationServiceHandler notificationServiceHandler) { + if (listener.isListening()) { + return; + } + + final SchemaPath path = listener.getSchemaPath(); + final ListenerRegistration registration = + notificationServiceHandler.get().registerNotificationListener(listener, path); + + listener.setRegistration(registration); + } + + /** + * Parse enum from URI + * + * @param clazz + * - enum type + * @param value + * - string of enum value + * @return enum + */ + private static T parseURIEnum(final Class clazz, final String value) { + if ((value == null) || value.equals("")) { + return null; + } + return ResolveEnumUtil.resolveEnum(clazz, value); + } + } diff --git a/restconf/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/BrokerFacadeTest.java b/restconf/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/BrokerFacadeTest.java index 35c6e1de33..8a11fa9c12 100644 --- a/restconf/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/BrokerFacadeTest.java +++ b/restconf/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/BrokerFacadeTest.java @@ -366,7 +366,7 @@ public class BrokerFacadeTest { listener.setCloseVars(transactionChainHandler, schemaHandler); // close and remove test notification listener listener.close(); - Notificator.removeNotificationListenerIfNoSubscriberExists(listener); + Notificator.removeListenerIfNoSubscriberExists(listener); } /** diff --git a/restconf/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/ExpressionParserTest.java b/restconf/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/ExpressionParserTest.java index 61cd81d9e2..ab14b63808 100644 --- a/restconf/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/ExpressionParserTest.java +++ b/restconf/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/ExpressionParserTest.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.sal.restconf.impl.test; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; +import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.Collection; import org.junit.Assert; @@ -20,7 +21,10 @@ import org.opendaylight.controller.md.sal.rest.common.TestRestconfUtils; import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter; import org.opendaylight.netconf.sal.streams.listeners.Notificator; import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType; +import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; public class ExpressionParserTest { @@ -126,12 +130,25 @@ public class ExpressionParserTest { } } final YangInstanceIdentifier path = Mockito.mock(YangInstanceIdentifier.class); + final PathArgument pathValue = NodeIdentifier.create(QName.create("module", "2016-14-12", "localName")); + Mockito.when(path.getLastPathArgument()).thenReturn(pathValue); final ListenerAdapter listener = Notificator.createListener(path, "streamName", NotificationOutputType.JSON); listener.setQueryParams(null, null, filter); - final Method m = listener.getClass().getDeclaredMethod("parseFilterParam", String.class); + final Class superclass = listener.getClass().getSuperclass().getSuperclass(); + Method m = null; + for (final Method method : superclass.getDeclaredMethods()) { + if (method.getName().equals("parseFilterParam")) { + m = method; + } + } + if (m == null) { + throw new Exception("Methode parseFilterParam doesn't exist in " + superclass.getName()); + } m.setAccessible(true); - - return (boolean) m.invoke(listener, readFile(xml)); + final Field xmlField = superclass.getDeclaredField("xml"); + xmlField.setAccessible(true); + xmlField.set(listener, readFile(xml)); + return (boolean) m.invoke(listener, null); } private String readFile(final File xml) throws Exception { diff --git a/restconf/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/RestconfImplNotificationSubscribingTest.java b/restconf/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/RestconfImplNotificationSubscribingTest.java index 2fdb0ba7e4..c2e69f7988 100644 --- a/restconf/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/RestconfImplNotificationSubscribingTest.java +++ b/restconf/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/RestconfImplNotificationSubscribingTest.java @@ -36,7 +36,10 @@ import org.opendaylight.netconf.sal.restconf.impl.RestconfImpl; import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter; import org.opendaylight.netconf.sal.streams.listeners.Notificator; import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType; +import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; public class RestconfImplNotificationSubscribingTest { @@ -61,6 +64,8 @@ public class RestconfImplNotificationSubscribingTest { ControllerContext.getInstance().setGlobalSchema(TestRestconfUtils.loadSchemaContext("/notifications")); final YangInstanceIdentifier path = Mockito.mock(YangInstanceIdentifier.class); + final PathArgument pathValue = NodeIdentifier.create(QName.create("module", "2016-14-12", "localName")); + Mockito.when(path.getLastPathArgument()).thenReturn(pathValue); Notificator.createListener(path, this.identifier, NotificationOutputType.XML); } @@ -215,6 +220,8 @@ public class RestconfImplNotificationSubscribingTest { @Test public void onNotifiTest() throws Exception { final YangInstanceIdentifier path = Mockito.mock(YangInstanceIdentifier.class); + final PathArgument pathValue = NodeIdentifier.create(QName.create("module", "2016-14-12", "localName")); + Mockito.when(path.getLastPathArgument()).thenReturn(pathValue); final ListenerAdapter listener = Notificator.createListener(path, this.identifier, NotificationOutputType.XML); final List>> list = new ArrayList<>(); @@ -229,13 +236,14 @@ public class RestconfImplNotificationSubscribingTest { final AsyncDataChangeEvent> change = Mockito.mock(AsyncDataChangeEvent.class); - Field start = listener.getClass().getDeclaredField("start"); + final Class superclass = listener.getClass().getSuperclass().getSuperclass(); + Field start = superclass.getDeclaredField("start"); start.setAccessible(true); Date startOrig = (Date) start.get(listener); Assert.assertNotNull(startOrig); listener.onDataChanged(change); - start = listener.getClass().getDeclaredField("start"); + start = superclass.getDeclaredField("start"); start.setAccessible(true); startOrig = (Date) start.get(listener); Assert.assertNull(startOrig); diff --git a/restconf/sal-rest-connector/src/test/java/org/opendaylight/restconf/restful/services/impl/RestconfStreamsSubscriptionServiceImplTest.java b/restconf/sal-rest-connector/src/test/java/org/opendaylight/restconf/restful/services/impl/RestconfStreamsSubscriptionServiceImplTest.java index 619cab5013..7a8fcd327f 100644 --- a/restconf/sal-rest-connector/src/test/java/org/opendaylight/restconf/restful/services/impl/RestconfStreamsSubscriptionServiceImplTest.java +++ b/restconf/sal-rest-connector/src/test/java/org/opendaylight/restconf/restful/services/impl/RestconfStreamsSubscriptionServiceImplTest.java @@ -104,7 +104,6 @@ public class RestconfStreamsSubscriptionServiceImplTest { public static void setUpBeforeTest() throws Exception { final Map listenersByStreamNameSetter = new HashMap<>(); final ListenerAdapter adapter = mock(ListenerAdapter.class); - doReturn(false).when(adapter).isListening(); listenersByStreamNameSetter.put( "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE", adapter); -- 2.36.6