import javax.xml.transform.dom.DOMResult;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
+import org.json.JSONObject;
+import org.json.XML;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
private static final Pattern RFC3339_PATTERN = Pattern.compile("(\\d\\d)(\\d\\d)$");
- private final SimpleDateFormat rfc3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ");
+ private static final SimpleDateFormat RFC3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ");
private final YangInstanceIdentifier path;
private ListenerRegistration<DOMDataChangeListener> registration;
private Set<Channel> subscribers = new ConcurrentSet<>();
private final EventBus eventBus;
private final EventBusChangeRecorder eventBusChangeRecorder;
+ private final NotificationOutputType outputType;
+ private Date start = null;
+ private Date stop = null;
/**
- * Creates new {@link ListenerAdapter} listener specified by path and stream name.
+ * Creates new {@link ListenerAdapter} listener specified by path and stream
+ * name.
*
* @param path
* Path to data in data store.
* @param streamName
* The name of the stream.
+ * @param outputType
+ * - type of output on notification (JSON, XML)
*/
- ListenerAdapter(final YangInstanceIdentifier path, final String streamName) {
+ ListenerAdapter(final YangInstanceIdentifier path, final String streamName,
+ final NotificationOutputType outputType) {
+ this.outputType = outputType;
Preconditions.checkNotNull(path);
- Preconditions.checkArgument(streamName != null && !streamName.isEmpty());
+ Preconditions.checkArgument((streamName != null) && !streamName.isEmpty());
this.path = path;
this.streamName = streamName;
- eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
- eventBusChangeRecorder = new EventBusChangeRecorder();
- eventBus.register(eventBusChangeRecorder);
+ this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
+ this.eventBusChangeRecorder = new EventBusChangeRecorder();
+ this.eventBus.register(this.eventBusChangeRecorder);
}
@Override
public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+ final Date now = new Date();
+ if (this.stop != null) {
+ if ((this.start.compareTo(now) < 0) && (this.stop.compareTo(now) > 0)) {
+ prepareAndPostData(change);
+ }
+ if (this.stop.compareTo(now) < 0) {
+ try {
+ this.close();
+ } catch (final Exception e) {
+ throw new RestconfDocumentedException("Problem with unregister listener." + e);
+ }
+ }
+ } else if (this.start != null) {
+ if (this.start.compareTo(now) < 0) {
+ this.start = null;
+ prepareAndPostData(change);
+ }
+ } else {
+ prepareAndPostData(change);
+ }
+ }
+
+ private void prepareAndPostData(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
if (!change.getCreatedData().isEmpty() || !change.getUpdatedData().isEmpty()
|| !change.getRemovedPaths().isEmpty()) {
final String xml = prepareXmlFrom(change);
final Event event = new Event(EventType.NOTIFY);
- event.setData(xml);
- eventBus.post(event);
+ if (this.outputType.equals(NotificationOutputType.JSON)) {
+ final JSONObject jsonObject = XML.toJSONObject(xml);
+ event.setData(jsonObject.toString());
+ } else {
+ event.setData(xml);
+ }
+ this.eventBus.post(event);
}
}
public void recordCustomerChange(final Event event) {
if (event.getType() == EventType.REGISTER) {
final Channel subscriber = event.getSubscriber();
- if (!subscribers.contains(subscriber)) {
- subscribers.add(subscriber);
+ if (!ListenerAdapter.this.subscribers.contains(subscriber)) {
+ ListenerAdapter.this.subscribers.add(subscriber);
}
} else if (event.getType() == EventType.DEREGISTER) {
- subscribers.remove(event.getSubscriber());
+ ListenerAdapter.this.subscribers.remove(event.getSubscriber());
Notificator.removeListenerIfNoSubscriberExists(ListenerAdapter.this);
} else if (event.getType() == EventType.NOTIFY) {
- for (final Channel subscriber : subscribers) {
+ for (final Channel subscriber : ListenerAdapter.this.subscribers) {
if (subscriber.isActive()) {
LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
} else {
LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
- subscribers.remove(subscriber);
+ ListenerAdapter.this.subscribers.remove(subscriber);
}
}
}
* @return Channel
*/
public Channel getSubscriber() {
- return subscriber;
+ return this.subscriber;
}
/**
* @return String representation of event data.
*/
public String getData() {
- return data;
+ return this.data;
}
/**
* @return The type of the event.
*/
public EventType getType() {
- return type;
+ return this.type;
}
}
final Document doc = createDocument();
final Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
"notification");
+
doc.appendChild(notificationElement);
final Element eventTimeElement = doc.createElement("eventTime");
final Element dataChangedNotificationEventElement = doc.createElementNS(
"urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "data-changed-notification");
+
addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, change,
schemaContext, dataContextTree);
notificationElement.appendChild(dataChangedNotificationEventElement);
transformer.setOutputProperty(OutputKeys.INDENT, "yes");
transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
- transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, StandardCharsets.UTF_8)));
+ transformer.transform(new DOMSource(doc),
+ new StreamResult(new OutputStreamWriter(out, StandardCharsets.UTF_8)));
final byte[] charData = out.toByteArray();
return new String(charData, "UTF-8");
} catch (TransformerException | UnsupportedEncodingException e) {
* Date
* @return Data specified by RFC3339.
*/
- private String toRFC3339(final Date d) {
- return RFC3339_PATTERN.matcher(rfc3339.format(d)).replaceAll("$1:$2");
+ public static String toRFC3339(final Date d) {
+ return RFC3339_PATTERN.matcher(RFC3339.format(d)).replaceAll("$1:$2");
}
/**
* Creates {@link Document} document.
* @return {@link Document} document.
*/
- private Document createDocument() {
+ public static Document createDocument() {
final DocumentBuilder bob;
try {
bob = DBF.newDocumentBuilder();
* @param operation
* {@link Operation}
*/
- private void addValuesFromDataToElement(final Document doc, final Set<YangInstanceIdentifier> data, final Element element,
- final Operation operation) {
- if (data == null || data.isEmpty()) {
+ private void addValuesFromDataToElement(final Document doc, final Set<YangInstanceIdentifier> data,
+ final Element element, final Operation operation) {
+ if ((data == null) || data.isEmpty()) {
return;
}
for (final YangInstanceIdentifier path : data) {
private void addCreatedChangedValuesFromDataToElement(final Document doc, final Set<Entry<YangInstanceIdentifier,
NormalizedNode<?,?>>> data, final Element element, final Operation operation, final SchemaContext
schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
- if (data == null || data.isEmpty()) {
+ if ((data == null) || data.isEmpty()) {
return;
}
- for (Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry : data) {
+ for (final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry : data) {
if (!ControllerContext.getInstance().isNodeMixin(entry.getKey())) {
final Node node = createCreatedChangedDataChangeEventElement(doc, entry, operation, schemaContext,
dataSchemaContextTree);
* {@link Operation}
* @return {@link Node} node represented by changed event element.
*/
- private Node createDataChangeEventElement(final Document doc, final YangInstanceIdentifier path, final Operation operation) {
+ private Node createDataChangeEventElement(final Document doc, final YangInstanceIdentifier path,
+ final Operation operation) {
final Element dataChangeEventElement = doc.createElement("data-change-event");
final Element pathElement = doc.createElement("path");
addPathAsValueToElement(path, pathElement);
final Element dataElement = doc.createElement("data");
dataElement.appendChild(result);
dataChangeEventElement.appendChild(dataElement);
- } catch (IOException e) {
+ } catch (final IOException e) {
LOG.error("Error in writer ", e);
- } catch (XMLStreamException e) {
+ } catch (final XMLStreamException e) {
LOG.error("Error processing stream", e);
}
return dataChangeEventElement;
}
- private static DOMResult writeNormalizedNode(final NormalizedNode<?,?> normalized, final
- YangInstanceIdentifier path, final SchemaContext context, final DataSchemaContextTree dataSchemaContextTree) throws
- IOException, XMLStreamException {
+ private static DOMResult writeNormalizedNode(final NormalizedNode<?, ?> normalized,
+ final YangInstanceIdentifier path, final SchemaContext context,
+ final DataSchemaContextTree dataSchemaContextTree)
+ throws IOException, XMLStreamException {
final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
final Document doc = XmlDocumentUtils.getDocument();
final DOMResult result = new DOMResult(doc);
XMLStreamWriter writer = null;
final SchemaPath nodePath;
- if (normalized instanceof MapEntryNode || normalized instanceof UnkeyedListEntryNode) {
+ if ((normalized instanceof MapEntryNode) || (normalized instanceof UnkeyedListEntryNode)) {
nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath();
} else {
nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath().getParent();
* @return Path pointed to data in data store.
*/
public YangInstanceIdentifier getPath() {
- return path;
+ return this.path;
}
/**
* @return The name of the stream.
*/
public String getStreamName() {
- return streamName;
+ return this.streamName;
}
/**
* Removes all subscribers and unregisters event bus change recorder form event bus.
*/
public void close() throws Exception {
- subscribers = new ConcurrentSet<>();
- registration.close();
- registration = null;
- eventBus.unregister(eventBusChangeRecorder);
+ this.subscribers = new ConcurrentSet<>();
+ this.registration.close();
+ this.registration = null;
+ this.eventBus.unregister(this.eventBusChangeRecorder);
}
/**
* @return True if exist, false otherwise.
*/
public boolean isListening() {
- return registration == null ? false : true;
+ return this.registration == null ? false : true;
}
/**
}
final Event event = new Event(EventType.REGISTER);
event.setSubscriber(subscriber);
- eventBus.post(event);
+ this.eventBus.post(event);
}
/**
LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
final Event event = new Event(EventType.DEREGISTER);
event.setSubscriber(subscriber);
- eventBus.post(event);
+ this.eventBus.post(event);
}
/**
* @return True if exist at least one {@link Channel} subscriber, false otherwise.
*/
public boolean hasSubscribers() {
- return !subscribers.isEmpty();
+ return !this.subscribers.isEmpty();
}
/**
}
}
+ /**
+ * Set query parameters for listener
+ *
+ * @param start
+ * - start-time of getting notification
+ * @param stop
+ * - stop-time of getting notification
+ */
+ public void setTimer(final Date start, final Date stop) {
+ this.start = start;
+ this.stop = stop;
+ }
+
}