*/
package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
-import com.google.common.base.Preconditions;
+import com.google.common.base.MoreObjects;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.checkerframework.checker.lock.qual.Holding;
import org.eclipse.jdt.annotation.NonNull;
-import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
+import org.opendaylight.restconf.common.formatters.EventFormatter;
+import org.opendaylight.restconf.common.formatters.EventFormatterFactory;
import org.opendaylight.restconf.nb.rfc8040.NotificationQueryParams;
import org.opendaylight.restconf.nb.rfc8040.streams.StreamSessionHandler;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.common.QName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Features of subscribing part of both notifications.
*/
-abstract class AbstractCommonSubscriber extends AbstractNotificationsData implements BaseListenerInterface {
+abstract class AbstractCommonSubscriber<P, T> extends AbstractNotificationsData implements BaseListenerInterface {
private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class);
private static final DateTimeFormatter FORMATTER = new DateTimeFormatterBuilder()
.appendValue(ChronoField.YEAR, 4).appendLiteral('-')
.appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
.appendOffset("+HH:MM", "Z").toFormatter();
+ private final EventFormatterFactory<T> formatterFactory;
+ private final NotificationOutputType outputType;
+ private final String streamName;
+ private final P path;
+
@GuardedBy("this")
private final Set<StreamSessionHandler> subscribers = new HashSet<>();
@GuardedBy("this")
private Instant stop = null;
private boolean leafNodesOnly = false;
private boolean skipNotificationData = false;
+ private EventFormatter<T> formatter;
+
+ AbstractCommonSubscriber(final QName lastQName, final String streamName, final P path,
+ final NotificationOutputType outputType, final EventFormatterFactory<T> formatterFactory) {
+ super(lastQName);
+ this.streamName = requireNonNull(streamName);
+ checkArgument(!streamName.isEmpty());
+ this.path = requireNonNull(path);
+
+ this.outputType = requireNonNull(outputType);
+ this.formatterFactory = requireNonNull(formatterFactory);
+ formatter = formatterFactory.getFormatter();
+ }
+
+ @Override
+ public final String getStreamName() {
+ return streamName;
+ }
+
+ @Override
+ public final String getOutputType() {
+ return outputType.getName();
+ }
@Override
public final synchronized boolean hasSubscribers() {
@Override
public synchronized void addSubscriber(final StreamSessionHandler subscriber) {
final boolean isConnected = subscriber.isConnected();
- Preconditions.checkState(isConnected);
+ checkState(isConnected);
LOG.debug("Subscriber {} is added.", subscriber);
subscribers.add(subscriber);
}
@Override
public synchronized void removeSubscriber(final StreamSessionHandler subscriber) {
final boolean isConnected = subscriber.isConnected();
- Preconditions.checkState(isConnected);
+ checkState(isConnected);
LOG.debug("Subscriber {} is removed", subscriber);
subscribers.remove(subscriber);
if (!hasSubscribers()) {
/**
* Set query parameters for listener.
*
- * @param params NotificationQueryParams to use.
+ * @param params NotificationQueryParams to use.
*/
public final void setQueryParams(final NotificationQueryParams params) {
final var startTime = params.startTime();
skipNotificationData = skipData == null ? false : skipData.value();
final var filter = params.filter();
- if (filter != null) {
+ final String filterValue = filter == null ? null : filter.paramValue();
+ if (filterValue != null && !filterValue.isEmpty()) {
try {
- setFilter(filter.paramValue());
+ formatter = formatterFactory.getFormatter(filterValue);
} catch (XPathExpressionException e) {
throw new IllegalArgumentException("Failed to get filter", e);
}
+ } else {
+ formatter = formatterFactory.getFormatter();
}
}
- abstract void setFilter(@Nullable String xpathString) throws XPathExpressionException;
+ final P path() {
+ return path;
+ }
/**
* Check whether this query should only notify about leaf node changes.
return skipNotificationData;
}
+ final EventFormatter<T> formatter() {
+ return formatter;
+ }
+
/**
* Sets {@link Registration} registration.
*
return false;
}
+ @Override
+ public final String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("path", path)
+ .add("stream-name", streamName)
+ .add("output-type", getOutputType())
+ .toString();
+ }
+
/**
* Parse input of query parameters - start-time or stop-time - from {@link DateAndTime} format
* to {@link Instant} format.
import org.opendaylight.restconf.nb.rfc8040.Rfc8040;
import org.opendaylight.restconf.nb.rfc8040.handlers.SchemaContextHandler;
import org.opendaylight.yangtools.util.xml.UntrustedXML;
+import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
/**
* Abstract class for processing and preparing data.
- *
*/
abstract class AbstractNotificationsData {
private static final Logger LOG = LoggerFactory.getLogger(AbstractNotificationsData.class);
private static final TransformerFactory TF = TransformerFactory.newInstance();
private static final XMLOutputFactory OF = XMLOutputFactory.newInstance();
- private DOMDataBroker dataBroker;
+ private final String localName;
+
protected SchemaContextHandler schemaHandler;
- private String localName;
+ private DOMDataBroker dataBroker;
+
+ AbstractNotificationsData(final QName lastQName) {
+ localName = lastQName.getLocalName();
+ }
/**
* Data broker for delete data in DS on close().
return wTx.commit();
}
- /**
- * Set localName of last path element of specific listener.
- *
- * @param localName
- * local name
- */
- @SuppressWarnings("checkstyle:hiddenField")
- protected void setLocalNameOfPath(final String localName) {
- this.localName = localName;
- }
-
/**
* Formats data specified by RFC3339.
*
*/
package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.Objects.requireNonNull;
-
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.MoreObjects;
import java.time.Instant;
import java.util.Collection;
import java.util.Optional;
import java.util.stream.Collectors;
-import javax.xml.xpath.XPathExpressionException;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.ClusteredDOMDataTreeChangeListener;
import org.opendaylight.mdsal.dom.api.DOMDataBroker;
import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.restconf.common.formatters.DataTreeCandidateFormatter;
import org.opendaylight.restconf.common.formatters.DataTreeCandidateFormatterFactory;
import org.opendaylight.restconf.common.formatters.JSONDataTreeCandidateFormatter;
import org.opendaylight.restconf.common.formatters.XMLDataTreeCandidateFormatter;
/**
* {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source.
*/
-public class ListenerAdapter extends AbstractCommonSubscriber implements ClusteredDOMDataTreeChangeListener {
+public class ListenerAdapter extends AbstractCommonSubscriber<YangInstanceIdentifier, Collection<DataTreeCandidate>>
+ implements ClusteredDOMDataTreeChangeListener {
private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class);
- private static final String PATH = "path";
private static final DataTreeCandidateFormatterFactory JSON_FORMATTER_FACTORY =
JSONDataTreeCandidateFormatter.createFactory(JSONCodecFactorySupplier.RFC7951);
- private final YangInstanceIdentifier path;
- private final String streamName;
- private final NotificationOutputType outputType;
-
- @VisibleForTesting DataTreeCandidateFormatter formatter;
-
/**
* Creates new {@link ListenerAdapter} listener specified by path and stream name and register for subscribing.
*
* @param streamName The name of the stream.
* @param outputType Type of output on notification (JSON, XML).
*/
- ListenerAdapter(final YangInstanceIdentifier path, final String streamName,
+ @VisibleForTesting
+ public ListenerAdapter(final YangInstanceIdentifier path, final String streamName,
final NotificationOutputType outputType) {
- setLocalNameOfPath(path.getLastPathArgument().getNodeType().getLocalName());
-
- this.outputType = requireNonNull(outputType);
- this.path = requireNonNull(path);
- this.streamName = requireNonNull(streamName);
- checkArgument(!streamName.isEmpty());
-
- formatter = getFormatterFactory().getFormatter();
+ super(path.getLastPathArgument().getNodeType(), streamName, path, outputType, getFormatterFactory(outputType));
}
- private DataTreeCandidateFormatterFactory getFormatterFactory() {
+ private static DataTreeCandidateFormatterFactory getFormatterFactory(final NotificationOutputType outputType) {
switch (outputType) {
case JSON:
return JSON_FORMATTER_FACTORY;
}
}
- @Override
- final void setFilter(final String filter) throws XPathExpressionException {
- final DataTreeCandidateFormatterFactory factory = getFormatterFactory();
- formatter = filter == null || filter.isEmpty() ? factory.getFormatter() : factory.getFormatter(filter);
- }
-
@Override
public void onInitialData() {
// No-op
final Optional<String> maybeData;
try {
- maybeData = formatter.eventData(schemaHandler.get(), dataTreeCandidates, now, getLeafNodesOnly(),
+ maybeData = formatter().eventData(schemaHandler.get(), dataTreeCandidates, now, getLeafNodesOnly(),
isSkipNotificationData());
} catch (final Exception e) {
LOG.error("Failed to process notification {}",
}
}
- /**
- * Gets the name of the stream.
- *
- * @return The name of the stream.
- */
- @Override
- public String getStreamName() {
- return streamName;
- }
-
- @Override
- public String getOutputType() {
- return outputType.getName();
- }
-
/**
* Get path pointed to data in data store.
*
* @return Path pointed to data in data store.
*/
public YangInstanceIdentifier getPath() {
- return path;
+ return path();
}
/**
new DOMDataTreeIdentifier(datastore, getPath()), this));
}
}
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add(PATH, path)
- .add("stream-name", streamName)
- .add("output-type", outputType)
- .toString();
- }
}
final long stamp = notificationListenersLock.writeLock();
try {
return notificationListeners.computeIfAbsent(streamName,
- stream -> new NotificationListenerAdapter(schemaPath, stream, outputType.getName()));
+ stream -> new NotificationListenerAdapter(schemaPath, stream, outputType));
} finally {
notificationListenersLock.unlockWrite(stamp);
}
*/
package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.Objects.requireNonNull;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.MoreObjects;
import java.time.Instant;
import java.util.Optional;
-import javax.xml.xpath.XPathExpressionException;
-import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.mdsal.dom.api.DOMNotification;
import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
import org.opendaylight.mdsal.dom.api.DOMNotificationService;
import org.opendaylight.restconf.common.formatters.JSONNotificationFormatter;
-import org.opendaylight.restconf.common.formatters.NotificationFormatter;
import org.opendaylight.restconf.common.formatters.NotificationFormatterFactory;
import org.opendaylight.restconf.common.formatters.XMLNotificationFormatter;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
/**
* {@link NotificationListenerAdapter} is responsible to track events on notifications.
*/
-public class NotificationListenerAdapter extends AbstractCommonSubscriber implements DOMNotificationListener {
+public class NotificationListenerAdapter extends AbstractCommonSubscriber<Absolute, DOMNotification>
+ implements DOMNotificationListener {
private static final Logger LOG = LoggerFactory.getLogger(NotificationListenerAdapter.class);
private static final NotificationFormatterFactory JSON_FORMATTER_FACTORY = JSONNotificationFormatter.createFactory(
JSONCodecFactorySupplier.RFC7951);
- private final String streamName;
- private final Absolute path;
- private final NotificationOutputType outputType;
-
- @VisibleForTesting NotificationFormatter formatter;
-
-
/**
* Set path of listener and stream name.
*
* @param streamName Name of the stream.
* @param outputType Type of output on notification (JSON or XML).
*/
- NotificationListenerAdapter(final Absolute path, final String streamName, final String outputType) {
- setLocalNameOfPath(path.lastNodeIdentifier().getLocalName());
-
- this.outputType = NotificationOutputType.forName(requireNonNull(outputType)).get();
- this.path = requireNonNull(path);
- this.streamName = requireNonNull(streamName);
- checkArgument(!streamName.isEmpty());
- formatter = getFormatterFactory().getFormatter();
-
- LOG.debug("output type: {}, {}", outputType, this.outputType);
+ NotificationListenerAdapter(final Absolute path, final String streamName, final NotificationOutputType outputType) {
+ super(path.lastNodeIdentifier(), streamName, path, outputType, getFormatterFactory(outputType));
}
- private NotificationFormatterFactory getFormatterFactory() {
+ private static NotificationFormatterFactory getFormatterFactory(final NotificationOutputType outputType) {
switch (outputType) {
case JSON:
return JSON_FORMATTER_FACTORY;
}
}
- @Override
- final void setFilter(final @Nullable String filter) throws XPathExpressionException {
- final NotificationFormatterFactory factory = getFormatterFactory();
- formatter = filter == null || filter.isEmpty() ? factory.getFormatter() : factory.getFormatter(filter);
- }
-
- /**
- * Get output type of this listener.
- *
- * @return The configured output type (JSON or XML).
- */
- @Override
- public String getOutputType() {
- return outputType.getName();
- }
-
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
public void onNotification(final DOMNotification notification) {
final Optional<String> maybeOutput;
try {
- maybeOutput = formatter.eventData(schemaHandler.get(), notification, now, getLeafNodesOnly(),
+ maybeOutput = formatter().eventData(schemaHandler.get(), notification, now, getLeafNodesOnly(),
isSkipNotificationData());
} catch (Exception e) {
LOG.error("Failed to process notification {}", notification, e);
}
}
- /**
- * Get stream name of this listener.
- *
- * @return The configured stream name.
- */
- @Override
- public String getStreamName() {
- return streamName;
- }
-
/**
* Get schema path of notification.
*
* @return The configured schema path that points to observing YANG notification schema node.
*/
public Absolute getSchemaPath() {
- return path;
+ return path();
}
public final synchronized void listen(final DOMNotificationService notificationService) {
setRegistration(notificationService.registerNotificationListener(this, getSchemaPath()));
}
}
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("path", path)
- .add("stream-name", streamName)
- .add("output-type", outputType)
- .toString();
- }
}
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
import static org.opendaylight.yangtools.util.concurrent.FluentFutures.immediateFailedFluentFuture;
import static org.opendaylight.yangtools.util.concurrent.FluentFutures.immediateFluentFuture;
YangParserTestUtils.parseYangFiles(TestRestconfUtils.loadFiles(PATH_FOR_NEW_SCHEMA_CONTEXT));
final DOMDataBroker dataBroker = mock(DOMDataBroker.class);
final DOMDataTreeWriteTransaction wTx = mock(DOMDataTreeWriteTransaction.class);
- when(dataBroker.newWriteOnlyTransaction()).thenReturn(wTx);
+ doReturn(wTx).when(dataBroker).newWriteOnlyTransaction();
doReturn(CommitInfo.emptyFluentFuture()).when(wTx).commit();
final SchemaContextHandler schemaContextHandler = new SchemaContextHandler(dataBroker,
mock(DOMSchemaService.class));
final InstanceIdentifierContext<?> context = mock(InstanceIdentifierContext.class);
final RpcDefinition schemaNode = mock(RpcDefinition.class);
final QName qname = QName.create("invoke:rpc:module", "2013-12-03", "rpcTest");
- when(schemaNode.getQName()).thenReturn(qname);
+ doReturn(qname).when(schemaNode).getQName();
doReturn(schemaNode).when(context).getSchemaNode();
final NormalizedNode data = mock(NormalizedNode.class);
final DOMRpcResult domRpcResult = mock(DOMRpcResult.class);
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.UriBuilder;
import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.test.util.YangParserTestUtils;
@RunWith(MockitoJUnitRunner.StrictStubs.class)
@BeforeClass
public static void setUpBeforeTest() {
- final Map<String, ListenerAdapter> listenersByStreamNameSetter = new HashMap<>();
- final ListenerAdapter adapter = mock(ListenerAdapter.class);
- final YangInstanceIdentifier yiid = mock(YangInstanceIdentifier.class);
- doReturn(yiid).when(adapter).getPath();
- doReturn("JSON").when(adapter).getOutputType();
- listenersByStreamNameSetter.put(
- "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
- adapter);
- ListenersBroker.getInstance().setDataChangeListeners(listenersByStreamNameSetter);
+ final String name =
+ "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE";
+ final ListenerAdapter adapter = new ListenerAdapter(YangInstanceIdentifier.create(new NodeIdentifier(
+ QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster"))),
+ name, NotificationOutputType.JSON);
+ ListenersBroker.getInstance().setDataChangeListeners(Map.of(name, adapter));
}
@AfterClass
throws Exception {
final NotificationListenerAdapter notifiAdapter = ListenersBroker.getInstance().registerNotificationListener(
schemaPathNotifi, "json-stream", NotificationOutputType.JSON);
- return notifiAdapter.formatter.eventData(SCHEMA_CONTEXT, notificationData, Instant.now(), false, false).get();
+ return notifiAdapter.formatter().eventData(SCHEMA_CONTEXT, notificationData, Instant.now(), false, false).get();
}
}
throws Exception {
final NotificationListenerAdapter notifiAdapter = ListenersBroker.getInstance().registerNotificationListener(
schemaPathNotifi, "xml-stream", NotificationOutputTypeGrouping.NotificationOutputType.XML);
- return notifiAdapter.formatter.eventData(SCHEMA_CONTEXT, notificationData, Instant.now(), false, false).get();
+ return notifiAdapter.formatter().eventData(SCHEMA_CONTEXT, notificationData, Instant.now(), false, false).get();
}
}