import javax.xml.transform.dom.DOMResult;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
+import org.json.JSONObject;
+import org.json.XML;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
private Set<Channel> subscribers = new ConcurrentSet<>();
private final EventBus eventBus;
private final EventBusChangeRecorder eventBusChangeRecorder;
+ private final NotificationOutputType outputType;
+ private Date start = null;
+ private Date stop = null;
/**
- * Creates new {@link ListenerAdapter} listener specified by path and stream name.
+ * Creates new {@link ListenerAdapter} listener specified by path and stream
+ * name.
*
* @param path
* Path to data in data store.
* @param streamName
* The name of the stream.
+ * @param outputType
+ * - type of output on notification (JSON, XML)
*/
- ListenerAdapter(final YangInstanceIdentifier path, final String streamName) {
+ ListenerAdapter(final YangInstanceIdentifier path, final String streamName,
+ final NotificationOutputType outputType) {
+ this.outputType = outputType;
Preconditions.checkNotNull(path);
Preconditions.checkArgument((streamName != null) && !streamName.isEmpty());
this.path = path;
@Override
public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+ final Date now = new Date();
+ if (this.stop != null) {
+ if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) {
+ prepareAndPostData(change);
+ }
+ if (this.stop.compareTo(now) < 0) {
+ try {
+ this.close();
+ } catch (final Exception e) {
+ throw new RestconfDocumentedException("Problem with unregister listener." + e);
+ }
+ }
+ } else if (this.start != null) {
+ if (this.start.compareTo(now) < 0) {
+ this.start = null;
+ prepareAndPostData(change);
+ }
+ } else {
+ prepareAndPostData(change);
+ }
+ }
+
+ private void prepareAndPostData(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
if (!change.getCreatedData().isEmpty() || !change.getUpdatedData().isEmpty()
|| !change.getRemovedPaths().isEmpty()) {
final String xml = prepareXmlFrom(change);
final Event event = new Event(EventType.NOTIFY);
- event.setData(xml);
+ if (this.outputType.equals(NotificationOutputType.JSON)) {
+ final JSONObject jsonObject = XML.toJSONObject(xml);
+ event.setData(jsonObject.toString());
+ } else {
+ event.setData(xml);
+ }
this.eventBus.post(event);
}
}
final Document doc = createDocument();
final Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
"notification");
+
doc.appendChild(notificationElement);
final Element eventTimeElement = doc.createElement("eventTime");
final Element dataChangedNotificationEventElement = doc.createElementNS(
"urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "data-changed-notification");
+
addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, change,
schemaContext, dataContextTree);
notificationElement.appendChild(dataChangedNotificationEventElement);
transformer.setOutputProperty(OutputKeys.INDENT, "yes");
transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
- transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, StandardCharsets.UTF_8)));
+ transformer.transform(new DOMSource(doc),
+ new StreamResult(new OutputStreamWriter(out, StandardCharsets.UTF_8)));
final byte[] charData = out.toByteArray();
return new String(charData, "UTF-8");
} catch (TransformerException | UnsupportedEncodingException e) {
* @param operation
* {@link Operation}
*/
- private void addValuesFromDataToElement(final Document doc, final Set<YangInstanceIdentifier> data, final Element element,
- final Operation operation) {
+ private void addValuesFromDataToElement(final Document doc, final Set<YangInstanceIdentifier> data,
+ final Element element, final Operation operation) {
if ((data == null) || data.isEmpty()) {
return;
}
* {@link Operation}
* @return {@link Node} node represented by changed event element.
*/
- private Node createDataChangeEventElement(final Document doc, final YangInstanceIdentifier path, final Operation operation) {
+ private Node createDataChangeEventElement(final Document doc, final YangInstanceIdentifier path,
+ final Operation operation) {
final Element dataChangeEventElement = doc.createElement("data-change-event");
final Element pathElement = doc.createElement("path");
addPathAsValueToElement(path, pathElement);
}
}
+ /**
+ * Set query parameters for listener
+ *
+ * @param start
+ * - start-time of getting notification
+ * @param stop
+ * - stop-time of getting notification
+ */
+ public void setTimer(final Date start, final Date stop) {
+ this.start = start;
+ this.stop = stop;
+ }
+
}